# scraping comments import datetime import json import os import threading import time # Type “pip install websocket-client” to install. import websocket # this is to record comments on real time import math import logging from json import JSONDecodeError from websocket import ABNF from websocket import WebSocketConnectionClosedException from showroom.constants import TOKYO_TZ, FULL_DATE_FMT from showroom.utils import format_name from requests.exceptions import HTTPError # TODO: save comments, stats, telop(s) # { # "comment_log": [], # "telop": { # "latest": { # "text": "", # "created_at": "" # }, # "older": [ # { # "text": "", # "created_at": "" # } # ] # }, # "live_info": { # # stuff like view count over time etc. # } # } ''' Option 1: 2 separate "loggers", one for comments, one for stats/telop The *only* reason to do this is to allow grabbing just stats and telop instead of all three. So I'm not going to do that. What's option 2. Options 2: StatsLogger, CommentsLogger, RoomLogger: StatsLogger records just stats and telop ''' cmt_logger = logging.getLogger('showroom.comments') def convert_comments_to_danmaku(startTime, commentList, fontsize=18, fontname='MS PGothic', alpha='1A', width=640, height=360): """ Convert comments to danmaku (弾幕 / bullets) subtitles :param startTime: comments recording start time (timestamp in milliseconds) :param commentList: list of showroom messages :param fontsize = 18 :param fontname = 'MS PGothic' :param alpha = '1A' # transparency '00' to 'FF' (hex string) :param width = 640 # video screen height :param height = 360 # video screen width :return a string of danmaku subtitles """ # slotsNum: max number of comment line vertically shown on screen slotsNum = math.floor(height / fontsize) travelTime = 8 * 1000 # 8 sec, bullet comment flight time on screen # ass subtitle file header danmaku = "[Script Info]\n" danmaku += "ScriptType: v4.00+\n" danmaku += "Collisions: Normal\n" danmaku += "PlayResX: " + str(width) + "\n" danmaku += "PlayResY: " + str(height) + "\n\n" danmaku += "[V4+ Styles]\n" danmaku += "Format: Name, Fontname, Fontsize, PrimaryColour, SecondaryColour, OutlineColour, BackColour, Bold, Italic, Underline, StrikeOut, ScaleX, ScaleY, Spacing, Angle, BorderStyle, Outline, Shadow, Alignment, MarginL, MarginR, MarginV, Encoding\n" danmaku += "Style: danmakuFont, " + fontname + ", " + str(fontsize) + \ ", &H00FFFFFF, &H00FFFFFF, &H00000000, &H00000000, 1, 0, 0, 0, 100, 100, 0.00, 0.00, 1, 1, 0, 2, 20, 20, 20, 0\n\n" danmaku += "[Events]\n" danmaku += "Format: Layer, Start, End, Style, Name, MarginL, MarginR, MarginV, Effect, Text\n" # each comment line on screen can be seen as a slot # each slot will be filled with the time which indicates when the bullet comment will disappear on screen # slot[0], slot[1], slot[2], ...: for the comment lines from top to down slots = [] for i in range(slotsNum): slots.append(0) previousTelop = '' for data in commentList: m_type = str(data['t']) comment = '' if m_type == '1': # comment comment = data['cm'] elif m_type == '3': # voting start poll = data['l'] if len(poll) < 1: continue comment = 'Poll Started: 【({})'.format(poll[0]['id'] % 10000) for k in range(1, len(poll)): if k > 4: comment += ', ...' break comment += ', ({})'.format(poll[k]['id'] % 10000) comment += '】' elif m_type == '4': # voting result poll = data['l'] if len(poll) < 1: continue comment = 'Poll: 【({}) {}%'.format(poll[0]['id'] % 10000, poll[0]['r']) for k in range(1, len(poll)): if k > 4: comment += ', ...' break comment += ', ({}) {}%'.format(poll[k]['id'] % 10000, poll[k]['r']) comment += '】' elif m_type == '8': # telop telop = data['telop'] if telop is not None and telop != previousTelop: previousTelop = telop # show telop as a comment comment = 'Telop: 【' + telop + '】' else: continue else: # not comment, telop, or voting result continue # compute current relative time t = data['received_at'] - startTime # find available slot vertically from up to down selectedSlot = 0 isSlotFound = False for j in range(slotsNum): if slots[j] <= t: slots[j] = t + travelTime # replaced with the time that it will finish isSlotFound = True selectedSlot = j break # when all slots have larger times, find the smallest time and replace the slot if not isSlotFound: minIdx = 0 for j in range(1, slotsNum): if slots[j] < slots[minIdx]: minIdx = j slots[minIdx] = t + travelTime selectedSlot = minIdx # calculate bullet comment flight positions, from (x1,y1) to (x2,y2) on screen # extra flight length so a comment appears and disappears outside of the screen extraLen = math.ceil(len(comment) / 2.0) x1 = width + extraLen * fontsize y1 = (selectedSlot + 1) * fontsize x2 = 0 - extraLen * fontsize y2 = y1 def msecToAssTime(uTime): """ convert milliseconds to ass subtitle format """ msec = uTime % 1000 msec = int(round(msec / 10.0)) uTime = math.floor(uTime / 1000.0) s = int(uTime % 60) uTime = math.floor(uTime / 60.0) m = int(uTime % 60) h = int(math.floor(uTime / 60.0)) msf = ("00" + str(msec))[-2:] sf = ("00" + str(s))[-2:] mf = ("00" + str(m))[-2:] hf = ("00" + str(h))[-2:] return hf + ":" + mf + ":" + sf + "." + msf # build ass subtitle script sub = "Dialogue: 3," + msecToAssTime(t) + "," + msecToAssTime(t + travelTime) # alpha: 00 means fully visible, and FF (ie. 255 in decimal) is fully transparent. sub += ",danmakuFont,,0000,0000,0000,,{\\alpha&H" + alpha + "&\\move(" sub += str(x1) + "," + str(y1) + "," + str(x2) + "," + str(y2) sub += ")}" + comment + "\n" danmaku += sub # end of for return danmaku class CommentLogger(object): comment_id_pattern = "{created_at}_{user_id}" def __init__(self, room, client, settings, watcher): self.room = room self.client = client self.settings = settings self.watcher = watcher self.last_update = datetime.datetime.fromtimestamp(10000, tz=TOKYO_TZ) self.update_interval = self.settings.comments.default_update_interval self.comment_log = [] self.comment_ids = set() self._thread = None self.comment_count = 0 self.ws = None self.ws_startTime = 0 self.ws_send_txt = '' self._thread_interval = None self._isQuit = False self._isRecording = False @property def isRecording(self): return self._isRecording def start(self): if not self._thread: self._thread = threading.Thread(target=self.run, name='{} Comment Log'.format(self.room.name)) self._thread.start() def run(self): """ Record comments and save as niconico danmaku (弾幕 / bullets) subtitle ass file """ def ws_on_message(ws, message): """ WebSocket callback """ # "created at" has no millisecond part, so we record the precise time here now = int(time.time() * 1000) idx = message.find("{") if idx < 0: cmt_logger.error('no JSON message - {}'.format(message)) return message = message[idx:] try: data = json.loads(message) except JSONDecodeError as e: # cmt_logger.debug('JSONDecodeError, broken message: {}'.format(message)) # try to fix message += '","t":"1"}' try: data = json.loads(message) except JSONDecodeError: cmt_logger.error('JSONDecodeError, failed to fix broken message: {}'.format(message)) return cmt_logger.debug('broken message, JSONDecodeError is fixed: {}'.format(message)) # add current time data['received_at'] = now # Some useful info in the message: # ['t'] message type, determine the message is comment, telop, or gift # ['cm'] comment # ['ac'] name # ['u'] user_id # ['av'] avatar_id # ['g'] gift_id # ['n'] gift_num # type of the message m_type = str(data['t']) # could be integer or string if m_type == '1': # comment comment = data['cm'] # skip counting for 50 if len(comment) < 3 and comment.isdecimal() and int(comment) <= 50: # s1 = '⑷'; s2 = u'²'; s3 = '❹' # print(s1.isdigit()) # True # print(s2.isdigit()) # True # print(s1.isdecimal()) # False # print(s2.isdecimal()) # False # int(s1) # ValueError # int(s2) # ValueError pass else: comment = comment.replace('\n', ' ') # replace line break to a space # cmt_logger.info('{}: {}'.format(self.room.name, comment)) data['cm'] = comment self.comment_log.append(data) self.comment_count += 1 elif m_type == '2': # gift pass elif m_type == '3': # voting start self.comment_log.append(data) elif m_type == '4': # voting result self.comment_log.append(data) cmt_logger.debug('{}: has voting result'.format(self.room.name)) elif m_type == '8': # telop self.comment_log.append(data) if data['telop'] is not None: # could be null # cmt_logger.info('{}: telop = {}'.format(self.room.name, data['telop'])) pass elif m_type == '11': # cumulated gifts report pass elif m_type == '101': # indicating live finished self.comment_log.append(data) self._isQuit = True else: self.comment_log.append(data) def ws_on_error(ws, error): """ WebSocket callback """ cmt_logger.error('websocket on error: {} - {}'.format(type(error).__name__, error)) def ws_on_close(ws): """ WebSocket callback """ # cmt_logger.debug('websocket closed') self._isQuit = True def interval_send(ws): """ interval thread to send message and to close WebSocket """ count = 60 while True: # check whether to quit every sec if self._isQuit: break # send bcsvr_key every 60 secs if count >= 60: count = 0 try: # cmt_logger.debug('sending {}'.format(self.ws_send_txt)) ws.send(self.ws_send_txt) except WebSocketConnectionClosedException as e: cmt_logger.debug( 'WebSocket closed before sending message. {} Closing interval thread now...'.format(e)) break time.sleep(1) count += 1 # close WebSocket if ws is not None: ws.close() ws = None # cmt_logger.debug('interval thread finished') def ws_on_open(ws): """ WebSocket callback """ self.ws_startTime = int(time.time() * 1000) # cmt_logger.debug('websocket on open') # keep sending bcsvr_key to prevent disconnection self._thread_interval = threading.Thread(target=interval_send, name='{} Comment Log interval'.format(self.room.name), args=(ws,)) self._thread_interval.start() def ws_start(ws_uri, on_open=ws_on_open, on_message=ws_on_message, on_error=ws_on_error, on_close=ws_on_close): """ WebSocket main loop """ self.ws = websocket.WebSocket() # connect try: self.ws.connect(ws_uri) except Exception as e: on_error(self.ws, e) return on_open(self.ws) buffer = b"" buffered_opcode = ABNF.OPCODE_TEXT while not self._isQuit: try: frame = self.ws.recv_frame() except WebSocketConnectionClosedException as e: cmt_logger.debug('ws_start: WebSocket Closed') break except Exception as e: on_error(self.ws, e) break """ Fragmented frame example: For a text message sent as three fragments, the 1st fragment: opcode = 0x1 (OPCODE_TEXT) and FIN bit = 0, the 2nd fragment: opcode = 0x0 (OPCODE_CONT) and FIN bit = 0, the last fragment: opcode = 0x0 (OPCODE_CONT) and FIN bit = 1. """ if frame.opcode in (ABNF.OPCODE_TEXT, ABNF.OPCODE_BINARY, ABNF.OPCODE_CONT): buffer += frame.data if frame.opcode != ABNF.OPCODE_CONT: buffered_opcode = frame.opcode else: cmt_logger.debug('ws_start: fragment message: {}'.format(frame.data)) # it's either a last fragmented frame, or a non-fragmented single message frame if frame.fin == 1: data = buffer buffer = b"" if buffered_opcode == ABNF.OPCODE_TEXT: message = "" try: message = data.decode('utf-8') except UnicodeDecodeError as e: message = data.decode('latin-1') cmt_logger.debug('ws_start: UnicodeDecodeError, decoded as latin-1: {}'.format(message)) except Exception as e: on_error(self.ws, e) on_message(self.ws, message) elif buffered_opcode == ABNF.OPCODE_BINARY: cmt_logger.debug('ws_start: received unknown binary data: {}'.format(data)) elif frame.opcode == ABNF.OPCODE_CLOSE: # cmt_logger.debug('ws_start: received close opcode') # self.ws.close() will try to send close frame, so we skip sending close frame here break elif frame.opcode == ABNF.OPCODE_PING: cmt_logger.debug('ws_start: received ping, sending pong') if len(frame.data) < 126: self.ws.pong(frame.data) else: cmt_logger.debug('ws_start: ping message too big to send') elif frame.opcode == ABNF.OPCODE_PONG: cmt_logger.debug('ws_start: received pong') else: cmt_logger.error('ws_start: unknown frame opcode = {}'.format(frame.opcode)) on_close(self.ws) self.ws.close() # Get live info from https://www.showroom-live.com/api/live/live_info?room_id=xxx # If a room closes and then reopen on live within 30 seconds (approximately), # the broadcast_key from https://www.showroom-live.com/api/live/onlives # will not be updated with the new key. It's the same situation that when a # room live is finished, /api/live/onlives will not update its onlives list within # about 30 seconds. So here it's better to get accurate broadcast_key # from /api/live/live_info try: info = self.client.live_info(self.room.room_id) or [] except HTTPError as e: # TODO: log/handle properly cmt_logger.error('HTTP Error while getting live_info for {}: {}'.format(self.room.handle, e)) return if len(info['bcsvr_key']) == 0: cmt_logger.debug('not on live, no bcsvr_key.') return # # TODO: allow comment_logger to trigger get_live_status ? # last_counts = [] # max_interval = self.settings.comments.max_update_interval # min_interval = self.settings.comments.min_update_interval _, destdir, filename = format_name(self.settings.directory.data, self.watcher.start_time.strftime(FULL_DATE_FMT), self.room, ext=self.settings.ffmpeg.container) # TODO: modify format_name so it doesn't require so much hackery for this filename = filename.replace(self.settings.ffmpeg.container, ' comments.json') filenameAss = filename.replace(' comments.json', 'ass') destdir += '/comments' # TODO: only call this once per group per day os.makedirs(destdir, exist_ok=True) outfile = '/'.join((destdir, filename)) outfileAss = '/'.join((destdir, filenameAss)) # def add_counts(count): # return [count] + last_counts[:2] cmt_logger.info("Recording comments for {}".format(self.room.name)) # while self.watcher.is_live(): # count = 0 # seen = 0 # # update comments # try: # data = self.client.comment_log(self.room.room_id) or [] # except HTTPError as e: # # TODO: log/handle properly # print('HTTP Error while getting comments for {}: {}'.format(self.room.handle, e)) # break # for comment in data: # if len(comment['comment']) < 4 and comment['comment'].isdigit(): # continue # cid = self.comment_id_pattern.format(**comment) # if cid not in self.comment_ids: # self.comment_log.append(comment) # self.comment_ids.add(cid) # count += 1 # else: # seen += 1 # # if seen > 5: # last_counts = add_counts(count) # break # # # update update_interval if needed # highest_count = max(last_counts, default=10) # if highest_count < 7 and self.update_interval < max_interval: # self.update_interval += 1.0 # elif highest_count > 50 and self.update_interval > min_interval: # self.update_interval *= 0.5 # elif highest_count > 20 and self.update_interval > min_interval: # self.update_interval -= 1.0 # # current_time = datetime.datetime.now(tz=TOKYO_TZ) # timediff = (current_time - self.last_update).total_seconds() # self.last_update = current_time # # sleep_timer = max(0.5, self.update_interval - timediff) # time.sleep(sleep_timer) self._isRecording = True self.ws_send_txt = 'SUB\t' + info['bcsvr_key'] websocket.enableTrace(False) # False: disable trace outputs ws_start('ws://' + info['bcsvr_host'] + ':' + str(info['bcsvr_port']), on_open=ws_on_open, on_message=ws_on_message, on_error=ws_on_error, on_close=ws_on_close) if self._thread_interval is not None: self._thread_interval.join() # sorting self.comment_log = sorted(self.comment_log, key=lambda x: x['received_at']) with open(outfile, 'w', encoding='utf8') as outfp: # json.dump({"comment_log": sorted(self.comment_log, key=lambda x: x['created_at'], reverse=True)}, # outfp, indent=2, ensure_ascii=False) json.dump(self.comment_log, outfp, indent=2, ensure_ascii=False) if len(self.comment_log) > 0: # convert comments to danmaku assTxt = convert_comments_to_danmaku(self.ws_startTime, self.comment_log, fontsize=18, fontname='MS PGothic', alpha='1A', width=640, height=360) with open(outfileAss, 'w', encoding='utf8') as outfpAss: outfpAss.write(assTxt) cmt_logger.info('Completed {}'.format(outfileAss)) else: cmt_logger.info('No comments to save for {}'.format(self.room.name)) self._isRecording = False def quit(self): """ To quit comment logger anytime (to close WebSocket, save file and finish job) """ self._isQuit = True self._thread.join() if self._thread_interval is not None: self._thread_interval.join() class RoomScraper: comment_id_pattern = "{created_at}_{user_id}" def __init__(self, room, client, settings, watcher, record_comments=False): self.room = room self.client = client self.settings = settings self.watcher = watcher self.last_update = datetime.datetime.fromtimestamp(10000, tz=TOKYO_TZ) self.update_interval = self.settings.comments.default_update_interval self.comment_log = [] self.comment_ids = set() self._thread = None self.record_comments = record_comments def start(self): if not self._thread: if self.record_comments: self._thread = threading.Thread(target=self.record_with_comments, name='{} Room Log'.format(self.room.name)) else: self._thread = threading.Thread(target=self.record, name='{} Room Log'.format(self.room.name)) self._thread.start() def _fetch_comments(self): pass def _parse_comments(self, comment_log): pass def _fetch_info(self): "https://www.showroom-live.com/room/get_live_data?room_id=76535" pass def _parse_info(self, info): result = { # TODO: check for differences between result and stored data # some of this stuff should never change and/or is useful in the Watcher "live_info": { "created_at": info['live_res'].get('created_at'), "started_at": info['live_res'].get('started_at'), "live_id": info['live_res'].get('live_id'), "comment_num": info['live_res'].get('comment_num'), # oooohhhhhh # "chat_token": info['live_res'].get('chat_token'), "hot_point": "", "gift_num": "", "live_type": "", "ended_at": "", "view_uu": "", "bcsvr_key": "", }, "telop": info['telop'], "broadcast_key": "", # same as live_res.bcsvr_key "online_user_num": "", # same as live_res.view_uu "room": { "last_live_id": "", }, "broadcast_port": 8080, "broadcast_host": "onlive.showroom-live.com", } pass def record_with_comments(self): # TODO: allow comment_logger to trigger get_live_status ? last_counts = [] max_interval = self.settings.comments.max_update_interval min_interval = self.settings.comments.min_update_interval _, destdir, filename = format_name(self.settings.directory.data, self.watcher.start_time.strftime(FULL_DATE_FMT), self.room, self.settings.ffmpeg.container) # TODO: modify format_name so it doesn't require so much hackery for this filename = filename.replace('.{}'.format(self.settings.ffmpeg.container), ' comments.json') destdir += '/comments' # TODO: only call this once per group per day os.makedirs(destdir, exist_ok=True) outfile = '/'.join((destdir, filename)) def add_counts(count): return [count] + last_counts[:2] print("Recording comments for {}".format(self.room.name)) while self.watcher.is_live(): count = 0 seen = 0 # update comments try: data = self.client.comment_log(self.room.room_id) or [] except HTTPError as e: # TODO: log/handle properly print('HTTP Error while getting comments for {}: {}\n{}'.format(self.room.handle, e, e.response.content)) break for comment in data: cid = self.comment_id_pattern.format(**comment) if cid not in self.comment_ids: self.comment_log.append(comment) self.comment_ids.add(cid) count += 1 else: seen += 1 if seen > 5: last_counts = add_counts(count) break # update update_interval if needed highest_count = max(last_counts, default=10) if highest_count < 7 and self.update_interval < max_interval: self.update_interval += 1.0 elif highest_count > 50 and self.update_interval > min_interval: self.update_interval *= 0.5 elif highest_count > 20 and self.update_interval > min_interval: self.update_interval -= 1.0 current_time = datetime.datetime.now(tz=TOKYO_TZ) timediff = (current_time - self.last_update).total_seconds() self.last_update = current_time sleep_timer = max(0.5, self.update_interval - timediff) time.sleep(sleep_timer) with open(outfile, 'w', encoding='utf8') as outfp: json.dump({"comment_log": sorted(self.comment_log, key=lambda x: x['created_at'], reverse=True)}, outfp, indent=2, ensure_ascii=False) def record(self): pass def join(self): pass