- """
- This library helps populate a series of JSON files from a
- running Zulip instance.
- Conceptually it just moves data in one direction:
- Zulip -> file system (JSON files)
- This is probably the most technical part of the archive codebase
- for now. Conceptually, it's just connecting to Zulip with the
- Python API for Zulip and getting recent messages.
- Some of the details are about getting incremental updates from
- Zulip. See `populate_incremental`, but the gist of it is that
- we read `latest_id` from the JSON and then use that as the
- `anchor` in the API request to Zulip.
- About the data:
- The json format for stream_index.json is something like below:
- {
- 'time': <the last time stream_index.md was updated>,
- 'streams': {
- stream_name: {
- 'id': stream_id,
- 'latest_id': id of latest post in stream,
- 'topic_data': {
- topic_name: {
- topic_size: num posts in topic,
- latest_date: time of latest post }}}}}
- stream_index.json is created in the top level of the JSON directory.
- This directory also contains a subdirectory for each archived stream.
- In each stream subdirectory, there is a json file for each topic in that stream.
- This json file is a list of message objects,
- as desribed at https://zulip.com/api/get-messages
- """
import json
import time
from datetime import datetime
from pathlib import Path

from .common import (
    exit_immediately,
    open_outfile,
)

from .url import (
    sanitize_stream,
    sanitize,
)

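# A minimal usage sketch (not executed here).  It assumes a `zuliprc` file
# with bot credentials and an output directory named "json"; both names are
# illustrative, and real callers pass these in from the archive configuration.
#
#     import zulip
#     from pathlib import Path
#
#     client = zulip.Client(config_file="zuliprc")
#     json_root = Path("json")
#     # Full build the first time:
#     populate_all(client, json_root, is_valid_stream_name=lambda name: True)
#     # Later runs only fetch messages newer than the stored latest_id:
#     populate_incremental(client, json_root, is_valid_stream_name=lambda name: True)
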
def dump_json(js, outfile):
    json.dump(js, outfile, ensure_ascii=False, sort_keys=True, indent=4)

# Takes a list of messages.  Returns a dict mapping topic names to lists
# of messages in that topic.
def separate_results(list):
    map = {}
    for m in list:
        if m["subject"] not in map:
            map[m["subject"]] = [m]
        else:
            map[m["subject"]].append(m)
    return map

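# For example (illustrative data only): three messages whose "subject" fields
# are "a", "b", "a" come back as {"a": [msg1, msg3], "b": [msg2]}.
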
# Retrieves all messages matching request from Zulip, starting at post id anchor.
# As recommended in the Zulip API docs, requests 1000 messages at a time.
# Returns a list of messages.
def request_all(client, request, anchor=0):
    request["anchor"] = anchor
    request["num_before"] = 0
    request["num_after"] = 1000
    response = safe_request(client.get_messages, request)
    msgs = response["messages"]
    while not response["found_newest"]:
        request["anchor"] = response["messages"][-1]["id"] + 1
        response = safe_request(client.get_messages, request)
        msgs = msgs + response["messages"]
    return msgs

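# Pagination sketch: the first call asks for up to 1000 messages after `anchor`;
# while the response's `found_newest` flag is false, the next call uses
# anchor = <id of last message received> + 1, so each page starts exactly where
# the previous one ended.  This is also how incremental updates work:
# populate_incremental passes the stored latest_id + 1 as the anchor.
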
# Runs cmd(*args, **kwargs).  If the response is a rate limit error, waits
# the requested time and then retries the request.
def safe_request(cmd, *args, **kwargs):
    rsp = cmd(*args, **kwargs)
    while rsp["result"] == "error":
        if "retry-after" in rsp:
            print("timeout hit: {}".format(rsp["retry-after"]))
            time.sleep(float(rsp["retry-after"]) + 1)
            rsp = cmd(*args, **kwargs)
        else:
            exit_immediately(rsp["msg"])
    return rsp

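# A rate-limited error response looks roughly like the following (shape assumed
# from the fields read above; see the Zulip API docs for the exact payload):
#     {"result": "error", "msg": "API usage exceeded rate limit", "retry-after": 1.5}
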
def get_streams(client):
    # In the future, we may want to change this to
    # include_web_public=True, for organizations that might want to
    # use the upcoming web_public flag; but at the very least we
    # should only include public streams.
    response = safe_request(
        client.get_streams, include_public=True, include_subscribed=False
    )
    return response["streams"]

# Retrieves all messages from Zulip and builds a cache at json_root.
def populate_all(
    client,
    json_root,
    is_valid_stream_name,
):
    all_streams = get_streams(client)
    streams = [s for s in all_streams if is_valid_stream_name(s["name"])]
    streams_data = {}
    for s in streams:
        stream_name = s["name"]
        stream_id = s["stream_id"]
        print(stream_name)
        topics = safe_request(client.get_stream_topics, stream_id)["topics"]
        latest_id = 0  # till we know better
        topic_data = {}
        for t in topics:
            topic_name = t["name"]
            request = {
                "narrow": [
                    {"operator": "stream", "operand": stream_name},
                    {"operator": "topic", "operand": topic_name},
                ],
                "client_gravatar": True,
                "apply_markdown": True,
            }
            messages = request_all(client, request)
            topic_count = len(messages)
            last_message = messages[-1]
            latest_date = last_message["timestamp"]
            topic_data[topic_name] = dict(size=topic_count, latest_date=latest_date)
            latest_id = max(latest_id, last_message["id"])
            dump_topic_messages(json_root, s, topic_name, messages)

        stream_data = dict(
            id=stream_id,
            latest_id=latest_id,
            topic_data=topic_data,
        )
        streams_data[stream_name] = stream_data

    js = dict(streams=streams_data, time=time.time())
    dump_stream_index(json_root, js)

# Retrieves only new messages from Zulip, based on the latest_id values stored
# by the last update.
# Exits with an error if there is no index at json_root/stream_index.json.
def populate_incremental(
    client,
    json_root,
    is_valid_stream_name,
):
    streams = get_streams(client)
    stream_index = json_root / Path("stream_index.json")

    if not stream_index.exists():
        error_msg = """
    You are trying to incrementally update your index, but we cannot find
    a stream index at {}.

    Most likely, you have never built the index.  You can use the -t option
    of this script to build a full index one time.

    (It's also possible that you have built the index but modified the
    configuration or moved files in your file system.)
    """.format(
            stream_index
        )
        exit_immediately(error_msg)

    f = stream_index.open("r", encoding="utf-8")
    js = json.load(f)
    f.close()

    for s in (s for s in streams if is_valid_stream_name(s["name"])):
        print(s["name"])
        if s["name"] not in js["streams"]:
            js["streams"][s["name"]] = {
                "id": s["stream_id"],
                "latest_id": 0,
                "topic_data": {},
            }
        request = {
            "narrow": [{"operator": "stream", "operand": s["name"]}],
            "client_gravatar": True,
            "apply_markdown": True,
        }
        new_msgs = request_all(
            client, request, js["streams"][s["name"]]["latest_id"] + 1
        )
        if len(new_msgs) > 0:
            js["streams"][s["name"]]["latest_id"] = new_msgs[-1]["id"]
        nm = separate_results(new_msgs)
        for topic_name in nm:
            p = (
                json_root
                / Path(sanitize_stream(s["name"], s["stream_id"]))
                / Path(sanitize(topic_name) + ".json")
            )
            topic_exists = p.exists()
            old = []
            if topic_exists:
                f = p.open("r", encoding="utf-8")
                old = json.load(f)
                f.close()
            m = nm[topic_name]
            new_topic_data = {
                "size": len(m) + len(old),
                "latest_date": m[-1]["timestamp"],
            }
            js["streams"][s["name"]]["topic_data"][topic_name] = new_topic_data
            dump_topic_messages(json_root, s, topic_name, old + m)

    js["time"] = time.time()
    dump_stream_index(json_root, js)

def dump_stream_index(json_root, js):
    if not ("streams" in js and "time" in js):
        raise Exception("programming error")
    out = open_outfile(json_root, Path("stream_index.json"), "w")
    dump_json(js, out)
    out.close()

def dump_topic_messages(json_root, stream_data, topic_name, message_data):
    stream_name = stream_data["name"]
    stream_id = stream_data["stream_id"]
    sanitized_stream_name = sanitize_stream(stream_name, stream_id)
    stream_dir = json_root / Path(sanitized_stream_name)
    sanitized_topic_name = sanitize(topic_name)
    topic_fn = sanitized_topic_name + ".json"
    out = open_outfile(stream_dir, topic_fn, "w")
    msgs = [slim_message(m) for m in message_data]
    dump_json(msgs, out)
    out.close()

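# The resulting on-disk layout under json_root (directory and file names are
# produced by sanitize_stream and sanitize):
#
#     json_root/
#         stream_index.json
#         <sanitized stream name>/
#             <sanitized topic name>.json
#             ...
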
# Reduces a full Zulip message object to just the fields the archive needs.
def slim_message(msg):
    fields = [
        "content",
        "id",
        "sender_full_name",
        "timestamp",
    ]
    return {k: v for k, v in msg.items() if k in fields}