livestream.py 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215
  1. import copy
  2. import prawcore
  3. import time
  4. import traceback
  5. from . import common
  6. from . import exceptions
  7. from . import tsdb
  8. from voussoirkit import vlogging
  9. log = vlogging.get_logger(__name__)
  10. def _listify(x):
  11. '''
  12. The user may have given us a string containing multiple subreddits / users.
  13. Try to split that up into a list of names.
  14. '''
  15. if not x:
  16. return []
  17. if isinstance(x, str):
  18. return common.split_any(x, ['+', ' ', ','])
  19. return x
  20. def generator_printer(generator):
  21. '''
  22. Given a generator that produces livestream update steps, print them out.
  23. This yields None because print returns None.
  24. '''
  25. prev_message_length = 0
  26. for step in generator:
  27. newtext = '%s: +%ds, %dc' % (step['tsdb'].filepath.basename, step['new_submissions'], step['new_comments'])
  28. totalnew = step['new_submissions'] + step['new_comments']
  29. status = '{now} {new}'.format(now=common.human(common.get_now()), new=newtext)
  30. clear_prev = (' ' * prev_message_length) + '\r'
  31. print(clear_prev + status, end='')
  32. prev_message_length = len(status)
  33. if totalnew == 0 and log.level == 0 or log.level > vlogging.DEBUG:
  34. # Since there were no news, allow the next line to overwrite status
  35. print('\r', end='', flush=True)
  36. else:
  37. print()
  38. yield None
  39. def cycle_generators(generators, only_once, sleepy):
  40. '''
  41. Given multiple generators, yield an item from each one, cycling through
  42. them in a round-robin fashion.
  43. This is useful if you want to convert multiple livestream generators into a
  44. single generator that take turns updating each of them and yields all of
  45. their items.
  46. '''
  47. while True:
  48. for generator in generators:
  49. yield next(generator)
  50. if only_once:
  51. break
  52. time.sleep(sleepy)
  53. def livestream(
  54. subreddit=None,
  55. username=None,
  56. as_a_generator=False,
  57. do_submissions=True,
  58. do_comments=True,
  59. limit=100,
  60. only_once=False,
  61. sleepy=30,
  62. ):
  63. '''
  64. Continuously get posts from this source and insert them into the database.
  65. as_a_generator:
  66. Return a generator where every iteration does a single livestream loop
  67. and yields the return value of TSDB.insert (A summary of new
  68. submission & comment count).
  69. This is useful if you want to manage the generator yourself.
  70. Otherwise, this function will run the generator forever.
  71. '''
  72. subreddits = _listify(subreddit)
  73. usernames = _listify(username)
  74. kwargs = {
  75. 'do_submissions': do_submissions,
  76. 'do_comments': do_comments,
  77. 'limit': limit,
  78. 'params': {'show': 'all'},
  79. }
  80. subreddit_generators = [
  81. _livestream_as_a_generator(subreddit=subreddit, username=None, **kwargs) for subreddit in subreddits
  82. ]
  83. user_generators = [
  84. _livestream_as_a_generator(subreddit=None, username=username, **kwargs) for username in usernames
  85. ]
  86. generators = subreddit_generators + user_generators
  87. if as_a_generator:
  88. if len(generators) == 1:
  89. return generators[0]
  90. return generators
  91. generator = cycle_generators(generators, only_once=only_once, sleepy=sleepy)
  92. generator = generator_printer(generator)
  93. try:
  94. for step in generator:
  95. pass
  96. except KeyboardInterrupt:
  97. print()
  98. return
  99. hangman = lambda: livestream(
  100. username='gallowboob',
  101. do_submissions=True,
  102. do_comments=True,
  103. sleepy=60,
  104. )
  105. def _livestream_as_a_generator(
  106. subreddit,
  107. username,
  108. do_submissions,
  109. do_comments,
  110. limit,
  111. params,
  112. ):
  113. if not common.is_xor(subreddit, username):
  114. raise exceptions.NotExclusive(['subreddit', 'username'])
  115. if not any([do_submissions, do_comments]):
  116. raise TypeError('Required do_submissions and/or do_comments parameter')
  117. common.login()
  118. if subreddit:
  119. log.debug('Getting subreddit %s', subreddit)
  120. (database, subreddit) = tsdb.TSDB.for_subreddit(subreddit, fix_name=True)
  121. subreddit = common.r.subreddit(subreddit)
  122. submission_function = subreddit.new if do_submissions else None
  123. comment_function = subreddit.comments if do_comments else None
  124. else:
  125. log.debug('Getting redditor %s', username)
  126. (database, username) = tsdb.TSDB.for_user(username, fix_name=True)
  127. user = common.r.redditor(username)
  128. submission_function = user.submissions.new if do_submissions else None
  129. comment_function = user.comments.new if do_comments else None
  130. while True:
  131. try:
  132. items = _livestream_helper(
  133. submission_function=submission_function,
  134. comment_function=comment_function,
  135. limit=limit,
  136. params=params,
  137. )
  138. newitems = database.insert(items)
  139. yield newitems
  140. except prawcore.exceptions.NotFound:
  141. print(database.filepath.basename, '404 not found')
  142. step = {'tsdb': database, 'new_comments': 0, 'new_submissions': 0}
  143. yield step
  144. except Exception:
  145. traceback.print_exc()
  146. print('Retrying...')
  147. step = {'tsdb': database, 'new_comments': 0, 'new_submissions': 0}
  148. yield step
  149. def _livestream_helper(
  150. submission_function=None,
  151. comment_function=None,
  152. *args,
  153. **kwargs,
  154. ):
  155. '''
  156. Given a submission-retrieving function and/or a comment-retrieving function,
  157. collect submissions and comments in a list together and return that.
  158. args and kwargs go into the collecting functions.
  159. '''
  160. if not any([submission_function, comment_function]):
  161. raise TypeError('Required submissions and/or comments parameter')
  162. results = []
  163. if submission_function:
  164. log.debug('Getting submissions %s %s', args, kwargs)
  165. this_kwargs = copy.deepcopy(kwargs)
  166. submission_batch = submission_function(*args, **this_kwargs)
  167. results.extend(submission_batch)
  168. if comment_function:
  169. log.debug('Getting comments %s %s', args, kwargs)
  170. this_kwargs = copy.deepcopy(kwargs)
  171. comment_batch = comment_function(*args, **this_kwargs)
  172. results.extend(comment_batch)
  173. log.debug('Got %d posts', len(results))
  174. return results
  175. def livestream_argparse(args):
  176. if args.submissions is args.comments is False:
  177. args.submissions = True
  178. args.comments = True
  179. if args.limit is None:
  180. limit = 100
  181. else:
  182. limit = int(args.limit)
  183. return livestream(
  184. subreddit=args.subreddit,
  185. username=args.username,
  186. do_comments=args.comments,
  187. do_submissions=args.submissions,
  188. limit=limit,
  189. only_once=args.once,
  190. sleepy=int(args.sleepy),
  191. )