  1. """
  2. This library helps populate a series of JSON files from a
  3. running Zulip instance.
  4. Conceptually it just moves data in one direction:
  5. Zulip -> file system (JSON files)
  6. This is probably the most technical part of the archive codebase
  7. for now. Conceptually, it's just connecting to Zulip with the
  8. Python API for Zulip and getting recent messages.
  9. Some of the details are about getting incremental updates from
  10. Zulip. See `populate_incremental`, but the gist of it is that
  11. we read `latest_id` from the JSON and then use that as the
  12. `anchor` in the API request to Zulip.
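
As a rough sketch of how these functions are driven (hypothetical
driver code, not part of this module; the arguments mirror the
function signatures below):

    import zulip

    client = zulip.Client(config_file="zuliprc")
    populate_all(client, json_root, is_valid_stream_name)
    # ...and on later runs, to fetch only new traffic:
    populate_incremental(client, json_root, is_valid_stream_name)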

About the data:

The JSON format for stream_index.json is something like below:

    {
        'time': <the last time stream_index.json was updated>,
        'streams': {
            stream_name: {
                'id': stream_id,
                'latest_id': id of latest post in stream,
                'topic_data': {
                    topic_name: {
                        'size': num posts in topic,
                        'latest_date': time of latest post }}}}}

stream_index.json is created in the top level of the JSON directory.

This directory also contains a subdirectory for each archived stream.

In each stream subdirectory, there is a JSON file for each topic in
that stream. This JSON file is a list of message objects, as
described at https://zulip.com/api/get-messages
"""

import json
import time

from pathlib import Path

from .common import (
    exit_immediately,
    open_outfile,
)

from .url import (
    sanitize_stream,
    sanitize,
)
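

# Writes js to outfile as JSON. Keys are sorted so that repeated runs
# produce stable, diffable output.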
def dump_json(js, outfile):
    json.dump(js, outfile, ensure_ascii=False, sort_keys=True, indent=4)


# Takes a list of messages. Returns a dict mapping topic names to the
# list of messages in that topic.
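# For example, messages with subjects "lunch" and "emacs" separate into
# {"lunch": [<lunch messages>], "emacs": [<emacs messages>]}.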
def separate_results(messages):
    by_topic = {}
    for m in messages:
        if m["subject"] not in by_topic:
            by_topic[m["subject"]] = [m]
        else:
            by_topic[m["subject"]].append(m)
    return by_topic


# Retrieves all messages matching request from Zulip, starting at post
# id anchor. As recommended in the Zulip API docs, requests 1000
# messages at a time. Returns a list of messages.
def request_all(client, request, anchor=0):
    request["anchor"] = anchor
    request["num_before"] = 0
    request["num_after"] = 1000
    response = safe_request(client.get_messages, request)
    msgs = response["messages"]
    while not response["found_newest"]:
        # Page forward from just past the newest message we have so far.
        request["anchor"] = response["messages"][-1]["id"] + 1
        response = safe_request(client.get_messages, request)
        msgs = msgs + response["messages"]
    return msgs


# Runs cmd(*args, **kwargs). If the response is a rate-limit error,
# waits the requested time and then retries the request.
def safe_request(cmd, *args, **kwargs):
    rsp = cmd(*args, **kwargs)
    while rsp["result"] == "error":
        if "retry-after" in rsp:
            print("timeout hit: {}".format(rsp["retry-after"]))
            # Sleep slightly longer than the server requested, then retry.
            time.sleep(float(rsp["retry-after"]) + 1)
            rsp = cmd(*args, **kwargs)
        else:
            exit_immediately(rsp["msg"])
    return rsp
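

# Returns the list of public streams, as returned by the Zulip API.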
def get_streams(client):
    # In the future, we may want to change this to
    # include_web_public=True, for organizations that might want to
    # use the upcoming web_public flag; but at the very least we
    # should only include public streams.
    response = safe_request(
        client.get_streams, include_public=True, include_subscribed=False
    )
    return response["streams"]


# Retrieves all messages from Zulip and builds a cache at json_root.
def populate_all(
    client,
    json_root,
    is_valid_stream_name,
):
    all_streams = get_streams(client)
    streams = [s for s in all_streams if is_valid_stream_name(s["name"])]
    streams_data = {}
    for s in streams:
        stream_name = s["name"]
        stream_id = s["stream_id"]
        print(stream_name)
        topics = safe_request(client.get_stream_topics, stream_id)["topics"]
        latest_id = 0  # until we know better
        topic_data = {}
        for t in topics:
            topic_name = t["name"]
            request = {
                "narrow": [
                    {"operator": "stream", "operand": stream_name},
                    {"operator": "topic", "operand": topic_name},
                ],
                "client_gravatar": True,
                "apply_markdown": True,
            }
            messages = request_all(client, request)
            topic_count = len(messages)
            last_message = messages[-1]
            latest_date = last_message["timestamp"]
            topic_data[topic_name] = dict(size=topic_count, latest_date=latest_date)
            latest_id = max(latest_id, last_message["id"])
            dump_topic_messages(json_root, s, topic_name, messages)
        stream_data = dict(
            id=stream_id,
            latest_id=latest_id,
            topic_data=topic_data,
        )
        streams_data[stream_name] = stream_data
    js = dict(streams=streams_data, time=time.time())
    dump_stream_index(json_root, js)


# Retrieves only new messages from Zulip, picking up from the latest
# message id recorded by the last update. Exits with an error if there
# is no index at json_root/stream_index.json.
def populate_incremental(
    client,
    json_root,
    is_valid_stream_name,
):
    streams = get_streams(client)
    stream_index = json_root / Path("stream_index.json")
    if not stream_index.exists():
        error_msg = """
You are trying to incrementally update your index, but we cannot find
a stream index at {}.

Most likely, you have never built the index. You can use the -t option
of this script to build a full index one time.

(It's also possible that you have built the index but modified the
configuration or moved files in your file system.)
""".format(
            stream_index
        )
        exit_immediately(error_msg)
    with stream_index.open("r", encoding="utf-8") as f:
        js = json.load(f)
    for s in (s for s in streams if is_valid_stream_name(s["name"])):
        print(s["name"])
        if s["name"] not in js["streams"]:
            js["streams"][s["name"]] = {
                "id": s["stream_id"],
                "latest_id": 0,
                "topic_data": {},
            }
        request = {
            "narrow": [{"operator": "stream", "operand": s["name"]}],
            "client_gravatar": True,
            "apply_markdown": True,
        }
        new_msgs = request_all(
            client, request, js["streams"][s["name"]]["latest_id"] + 1
        )
        if len(new_msgs) > 0:
            js["streams"][s["name"]]["latest_id"] = new_msgs[-1]["id"]
        nm = separate_results(new_msgs)
        for topic_name in nm:
            p = (
                json_root
                / Path(sanitize_stream(s["name"], s["stream_id"]))
                / Path(sanitize(topic_name) + ".json")
            )
            old = []
            if p.exists():
                with p.open("r", encoding="utf-8") as f:
                    old = json.load(f)
            m = nm[topic_name]
            new_topic_data = {
                "size": len(m) + len(old),
                "latest_date": m[-1]["timestamp"],
            }
            js["streams"][s["name"]]["topic_data"][topic_name] = new_topic_data
            dump_topic_messages(json_root, s, topic_name, old + m)
    js["time"] = time.time()
    dump_stream_index(json_root, js)
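

# Writes the stream index to json_root/stream_index.json.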
def dump_stream_index(json_root, js):
    if not ("streams" in js and "time" in js):
        raise Exception("programming error")
    out = open_outfile(json_root, Path("stream_index.json"), "w")
    dump_json(js, out)
    out.close()
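

# Writes the (slimmed) messages for a single topic to
# <json_root>/<sanitized stream name>/<sanitized topic name>.json.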
def dump_topic_messages(json_root, stream_data, topic_name, message_data):
    stream_name = stream_data["name"]
    stream_id = stream_data["stream_id"]
    sanitized_stream_name = sanitize_stream(stream_name, stream_id)
    stream_dir = json_root / Path(sanitized_stream_name)
    sanitized_topic_name = sanitize(topic_name)
    topic_fn = sanitized_topic_name + ".json"
    out = open_outfile(stream_dir, topic_fn, "w")
    msgs = [slim_message(m) for m in message_data]
    dump_json(msgs, out)
    out.close()
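

# Strips a raw Zulip message down to the few fields the archive needs,
# which keeps the per-topic JSON files much smaller.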
def slim_message(msg):
    fields = [
        "content",
        "id",
        "sender_full_name",
        "timestamp",
    ]
    return {k: v for k, v in msg.items() if k in fields}