fetch_links.py

#! /usr/bin/env python

import time
from time import mktime
from datetime import datetime, timedelta
import argparse
from pprint import pprint
import json
import csv
import os
from psaw import PushshiftAPI

pushshift_rate_limit_per_minute = 20   # throttle requests to the Pushshift API
max_comments_per_query = 150           # max comment ids fetched per search_comments call
write_every = 10                       # flush fetched links to disk every N links

link_fields = ['author', 'created_utc', 'domain', 'id', 'is_self',
               'num_comments', 'over_18', 'permalink', 'retrieved_on', 'score',
               'selftext', 'stickied', 'subreddit_id', 'title', 'url']
comment_fields = ['author', 'body', 'created_utc', 'id', 'link_id',
                  'parent_id', 'score', 'stickied', 'subreddit_id']
def fetch_links(subreddit=None, date_start=None, date_stop=None, limit=None, score=None, self_only=False):
    if subreddit is None or date_start is None or date_stop is None:
        print('ERROR: missing required arguments')
        exit()

    api = PushshiftAPI(rate_limit_per_minute=pushshift_rate_limit_per_minute, detect_local_tz=False)

    # get links
    links = []
    print('fetching submissions %s to %s...' % (time.strftime('%Y-%m-%d', date_start), time.strftime('%Y-%m-%d', date_stop)))
    params = {
        'after': int(mktime(date_start)) - 86400,   # make date inclusive, adjust for UTC
        'before': int(mktime(date_stop)) + 86400,
        'subreddit': subreddit,
        'filter': link_fields,
        'sort': 'asc',
        'sort_type': 'created_utc',
    }
    if limit:
        params['limit'] = int(limit)
    if score:
        params['score'] = score
    if self_only:
        params['is_self'] = True
    link_results = list(api.search_submissions(**params))
    print('processing %s links' % len(link_results))

    for s in link_results:
        # print('%s %s' % (datetime.utcfromtimestamp(int(s.d_['created_utc'])), s.d_['title']))
        # pprint(s)

        # get comment ids, skipping links whose comments csv already exists
        comments = []
        if s.d_['num_comments'] > 0 and not comment_data_exists(subreddit, s.d_['created_utc'], s.d_['id']):
            comment_ids = list(api._get_submission_comment_ids(s.d_['id']))
            # print('%s comment_ids: %s' % (s.d_['id'], comment_ids))

            # get comments in batches of max_comments_per_query
            if len(comment_ids) > 0:
                mychunks = []
                if len(comment_ids) > max_comments_per_query:
                    mychunks = chunks(comment_ids, max_comments_per_query)
                else:
                    mychunks = [comment_ids]
                for chunk in mychunks:
                    comment_params = {
                        'filter': comment_fields,
                        'ids': ','.join(chunk),
                        'limit': max_comments_per_query,
                    }
                    comments_results = list(api.search_comments(**comment_params))
                    print('%s fetch link %s comments %s/%s' % (datetime.utcfromtimestamp(int(s.d_['created_utc'])), s.d_['id'], len(comments_results), len(comment_ids)))
                    for c in comments_results:
                        comments.append(c.d_)
        s.d_['comments'] = comments
        links.append(s.d_)

        # write results in batches of write_every
        if len(links) >= write_every:
            success = write_links(subreddit, links)
            if success:
                links = []

    # write remaining results
    if len(links):
        write_links(subreddit, links)
# csvs are not guaranteed to be sorted by date, but you can resume broken runs
# and change sort criteria later to add more posts without getting duplicates.
# delete csvs and re-run to update existing posts.
def write_links(subreddit, links):
    if links and len(links) > 0:
        writing_day = None
        file = None
        writer = None
        existing_link_ids = []
        wrote_links = 0
        wrote_comments = 0
        for r in links:
            # print('%s link %s' % (r['id'], r['title']))

            # grab link comments
            existing_comment_ids = []
            comments = r['comments']
            # print('%s comments %s' % (r['id'], comments))

            created_ts = int(r['created_utc'])
            created = datetime.utcfromtimestamp(created_ts).strftime('%Y-%m-%d')
            created_path = datetime.utcfromtimestamp(created_ts).strftime('%Y/%m/%d')
            if created != writing_day:
                if file:
                    file.close()
                writing_day = created
                path = 'data/' + subreddit + '/' + created_path
                os.makedirs(path, exist_ok=True)

                # create links.csv for the day, or parse existing link ids from it
                filename = 'links.csv'
                filepath = path + '/' + filename
                if not os.path.isfile(filepath):
                    file = open(filepath, 'a', encoding='utf-8')
                    writer = csv.DictWriter(file, fieldnames=link_fields)
                    writer.writeheader()
                    # print('created %s' % filepath)
                else:
                    with open(filepath, 'r', encoding='utf-8') as file:
                        reader = csv.DictReader(file)
                        for row in reader:
                            existing_link_ids.append(row['id'])
                    file = open(filepath, 'a', encoding='utf-8')
                    writer = csv.DictWriter(file, fieldnames=link_fields)

            # create the comments csv for this link, or parse existing comment ids from it.
            # writing an empty comments csv supports resuming via comment_data_exists()
            filename = r['id'] + '.csv'
            filepath = path + '/' + filename
            if not os.path.isfile(filepath):
                comments_file = open(filepath, 'a', encoding='utf-8')
                comments_writer = csv.DictWriter(comments_file, fieldnames=comment_fields)
                comments_writer.writeheader()
                # print('created %s' % filepath)
            else:
                with open(filepath, 'r', encoding='utf-8') as comments_file:
                    reader = csv.DictReader(comments_file)
                    for row in reader:
                        existing_comment_ids.append(row['id'])
                comments_file = open(filepath, 'a', encoding='utf-8')
                comments_writer = csv.DictWriter(comments_file, fieldnames=comment_fields)

            # write link row, dropping any fields not in link_fields
            if r['id'] not in existing_link_ids:
                for field in list(r):
                    if field not in link_fields:
                        del r[field]
                writer.writerow(r)
                wrote_links += 1

            # write comments, dropping any fields not in comment_fields
            for c in comments:
                if c['id'] not in existing_comment_ids:
                    for field in list(c):
                        if field not in comment_fields:
                            del c[field]
                    comments_writer.writerow(c)
                    wrote_comments += 1
            comments_file.close()

        print('got %s links, wrote %s and %s comments' % (len(links), wrote_links, wrote_comments))
        return True
def link_data_exists(subreddit, date):
    created_path = time.strftime('%Y/%m/%d', date)
    path = 'data/' + subreddit + '/' + created_path + '/links.csv'
    if not os.path.isfile(path):
        return False
    return True

def comment_data_exists(subreddit, link_created_utc, link_id):
    created_ts = int(link_created_utc)
    created_path = datetime.utcfromtimestamp(created_ts).strftime('%Y/%m/%d')
    path = 'data/' + subreddit + '/' + created_path + '/' + link_id + '.csv'
    if os.path.isfile(path):
        return True
    return False

def chunks(l, n):
    """Yield successive n-sized chunks from l."""
    for i in range(0, len(l), n):
        yield l[i:i + n]

def mkdate(datestr):
    try:
        return time.strptime(datestr, '%Y-%m-%d')
    except ValueError:
        raise argparse.ArgumentTypeError(datestr + ' is not a proper date string')
if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('subreddit', help='subreddit to archive')
    parser.add_argument('date_start', type=mkdate, help='start archiving at date, e.g. 2005-1-1')
    parser.add_argument('date_stop', type=mkdate, help='stop archiving at date, inclusive, cannot be date_start')
    parser.add_argument('--limit', default=None, help='pushshift api limit param, default None')
    parser.add_argument('--score', default=None, help='pushshift api score param, e.g. "> 10", default None')
    parser.add_argument('--self_only', action="store_true", help='only fetch selftext submissions, default False')
    args = parser.parse_args()

    self_only = False
    if args.self_only:
        self_only = True
    args.subreddit = args.subreddit.lower()

    fetch_links(args.subreddit, args.date_start, args.date_stop, args.limit, args.score, self_only)
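
# Example invocation (illustrative; substitute your own subreddit and date range):
#
#   python fetch_links.py news 2017-01-01 2017-01-07 --score "> 5"
#
# Results land under data/<subreddit>/YYYY/MM/DD/: one links.csv per day plus
# one <link_id>.csv of comments per fetched submission.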