|
- #!/usr/bin/env python3
- """
- URLs of note
- https://www.showroom-live.com/room/is_live?room_id=61879
- {"ok": 0} == not live
- {"ok": 1} == live (may be other keys)
- !!!! As of 2017-10-23, this url no longer works.
- ~~~ https://www.showroom-live.com/room/get_live_data?room_id=61747
- https://www.showroom-live.com/event/akb48_sousenkyo_45th poll for time schedule
- https://www.showroom-live.com/event/all_rooms?event_id=1364
- List of all current live broadcasts w/ streaming links
- https://www.showroom-live.com/api/live/onlives
- All upcoming lives in the Idol genre
- https://www.showroom-live.com/api/live/upcoming?genre_id=102
- https://www.showroom-live.com/api/time_table/time_tables?order=asc&ended_at=1493621999&_=1492566692848
- Find the next live for a room
- https://www.showroom-live.com/api/room/next_live?room_id=61576
- Some basic info about a live broadcast
- https://www.showroom-live.com/api/live/live_info?room_id=44010
- {
- "age_verification_status": 0,
- "video_type": 0,
- "enquete_gift_num": 0,
- "is_enquete": false,
- "bcsvr_port": 8080,
- "live_type": 0,
- "is_free_gift_only": false,
- "bcsvr_host": "online.showroom-live.com",
- "live_id": 2741506,
- "is_enquete_result": false,
- "live_status": 2,
- "room_id": 44010,
- "bcsvr_key": "29d502:AJ4IiAqb",
- "background_image_url": null
- }
- https://www.showroom-live.com/api/live/live_info?room_id=75207
- {
- "age_verification_status": 0,
- "video_type": 0,
- "enquete_gift_num": 0,
- "is_enquete": false,
- "bcsvr_port": 8080,
- "live_type": 0,
- "is_free_gift_only": false,
- "bcsvr_host": "online.showroom-live.com",
- "live_id": 0,
- "is_enquete_result": false,
- "live_status": 1,
- "room_id": 75207,
- "bcsvr_key": "",
- "background_image_url": null
- }
- Video Banner (the strip of text across the top of the video)
- https://www.showroom-live.com/api/live/telop?room_id=61627
- Comment Log
- https://www.showroom-live.com/api/live/comment_log?room_id=61627
- Ranking Summary
- https://www.showroom-live.com/api/live/summary_ranking?room_id=61627
- NetworkInterface sources:
- http://stackoverflow.com/a/14671133/3380530
- KeyboardInterface sources:
- read loop http://stackoverflow.com/a/19655992/3380530
- Faster strptime
- http://ze.phyr.us/faster-strptime/
- I don't use strptime in this program, but it might be useful elsewhere
- Also I can probably do something similar for strftime
- Other observations:
- It appears that (dt-dt).total_seconds() calls are extremely fast.
- About 0.003 seconds per 10000 operations.
- What is peculiar is creating the timedelta object doesn't seem to take any time?
- Anyway, this means there's no reason not to move these calls to Watcher.check()
- Still need to run cProfile over a long run.
- A common feature of failed recordings is an overabundance of
- HandleCtrl, Ping <number>
- in the output log
- FFmpeg compile requirements:
- openssl
- librtmp
- libx264 (?)
- """
- # from sys import stdout, stdin, exit
- import datetime
- import itertools
- import json
- import logging
- import re
- import threading
- # import glob
- import time
- from collections import OrderedDict
- # import argparse
- from heapq import heapify, heappush, heappop
- from json.decoder import JSONDecodeError
- from queue import Queue # Empty as QueueEmpty
- from requests.exceptions import HTTPError
- from showroom.api import ShowroomClient
- from showroom.downloader import Downloader
- # from .message import ShowroomMessage
- # from .exceptions import ShowroomDownloadError
- from .comments import CommentLogger
- from .constants import TOKYO_TZ, HHMM_FMT, FULL_DATE_FMT, MODE_TO_STATUS
- from .index import ShowroomIndex, Room
- from .settings import ShowroomSettings
- from .utils import strftime
- # The times and dates reported on the website are screwy, but when fetched
- # through BeautifulSoup they *seem* to come in JST
- # If you're getting incorrect times you probably need to mess with
- # Schedule.convert_time()
- # Or add custom headers to the requests.get() call in Scheduler.tick()
- # TODO: create a separate loggers.py or something, set the levels there
- # and other important details, e.g. define custom levels for finer verbosity control
- core_logger = logging.getLogger('showroom.core')
- hls_url_re1 = re.compile(r'(https://edge-(\d*)-(\d*)-(\d*)-(\d*).showroom-live.com:443/liveedge/(\w*))/playlist.m3u8')
- # TODO: Make this a config file option
- STREAM_PREFERENCE = ("rtmp", "lhls", "hls")
- WATCHSECONDS = (600, 420, 360, 360, 300, 300, 240, 240, 180, 150)
- # TODO: handle genre/category by individual rooms
- # currently this checks the onlive list for each of Music, Idol, and Talent/Model
- # schedules are still Idol only
- GENRE_IDS = {101, 102, 103, 104, 105, 106, 107, 200}
- def watch_seconds(priority: int):
- """
- Translates priority to a watch duration in seconds.
- Looks up the priority in a tuple, returns number of seconds before
- start_time to begin watching a room with the given priority.
- Args:
- priority: An int representing the room's priority.
- Returns:
- Seconds as an int. For all priorities over 10, returns 120,
- else looks up priority in WATCHSECONDS.
- TODO:
- Make this a feature of Watcher objects, calculated on creation or
- updated when (watch) duration or (room) priority is updated.
- """
- if priority > len(WATCHSECONDS):
- return 120
- elif priority < 0:
- return 600
- else:
- return WATCHSECONDS[priority-1]
- class Watcher(object):
- """Manages downloads for a single room/stream.
- TODO:
- docstrings
- logging
- flow analysis for run()
- review end states
- option to download all streams but only keep wanted
- instead of default of only downloading wanted
- """
- def __init__(self, room: Room, client: ShowroomClient, settings: ShowroomSettings,
- update_flag: threading.Event=None, start_time: datetime.datetime=None,
- watch_duration: int=None):
- self._lock = threading.RLock()
- if update_flag:
- self._update_flag = update_flag
- else:
- self._update_flag = threading.Event()
- self._room = room
- self._client = client
- self._settings = settings
- self._download = Downloader(room, client, settings)
- if self._settings.comments.record and self.priority < self._settings.comments.max_priority:
- self.comment_logger = CommentLogger(self.room, self._client, self._settings, self)
- else:
- self.comment_logger = None
- # originally start_time was the time the stream began recording
- # now however i'm using the start_time according to Showroom
- # so Watcher is always created with a start_time
- if start_time:
- self.__start_time = start_time
- else:
- self.__start_time = datetime.datetime.now(tz=TOKYO_TZ)
- self._end_time = None
- self._watch_duration = watch_duration
- self._watch_start_time = self._watch_end_time = None
- self.set_watch_time(self.__start_time, self.watch_duration)
- self._live = False
- self.__live_time = datetime.datetime.fromtimestamp(0.0, tz=TOKYO_TZ)
- self.__mode = "schedule"
- # mainly used by hacked together priority heapq
- def __bool__(self):
- return bool(self._room)
- # access to internal objects
- @property
- def room(self):
- return self._room
- @property
- def download(self):
- return self._download
- # informational properties
- @property
- def name(self):
- return self._room.name
- @property
- def web_url(self):
- return self.room.long_url
-
- @property
- def room_id(self):
- return self._room.room_id
- @property
- def priority(self):
- return self._room['priority']
- @property
- def watch_duration(self):
- """
- Time in seconds to start watching a room ahead of when it is scheduled to go live.
- Will keep watching for watch_duration*2 after scheduled start_time
- """
- if self._watch_duration:
- return self._watch_duration
- else:
- return watch_seconds(self.priority)
- @property
- def mode(self):
- return self.__mode
- @property
- def formatted_start_time(self):
- return strftime(self.__start_time, HHMM_FMT)
- @property
- def start_time(self):
- return self.__start_time
- # internal properties
- @property
- def _start_time(self):
- return self.__start_time
- @_start_time.setter
- def _start_time(self, new_time):
- with self._lock:
- self.__start_time = new_time
- @property
- def _mode(self):
- return self.__mode
- @_mode.setter
- def _mode(self, new_mode):
- with self._lock:
- self.__mode = new_mode
- @property
- def __watch_rate(self):
- return self._settings.throttle.rate.watch
- @property
- def __live_rate(self):
- return self._settings.throttle.rate.live
- @property
- def __download_timeout(self):
- return self._settings.throttle.timeout.downloads
- def get_info(self):
- """Returns a dictionary describing the Watcher's state.
- Also returns info for child Downloader and Room objects."""
- with self._lock:
- room_info = self.room.get_info()
- return {
- # TODO: fix the write_completed method below to handle datetime
- "start_time": self._start_time,
- "end_time": self._end_time,
- "live": self.is_live(),
- "mode": self._mode,
- # this is kinda hokey, but it's needed often enough so...
- "name": room_info['name'],
- "room": room_info,
- "download": self.download.get_info()}
- # TODO: review uses and functionality of these two methods
- def reschedule(self, new_time):
- with self._lock:
- if self._mode == "schedule":
- self._start_time = new_time
- self.set_watch_time(new_time)
- self._update_flag.set()
- def set_watch_time(self, watch_time, watch_duration: int=None):
- with self._lock:
- if watch_duration is None:
- watch_duration = self.watch_duration
- self._watch_start_time = watch_time - datetime.timedelta(seconds=watch_duration)
- self._watch_end_time = watch_time + datetime.timedelta(seconds=watch_duration*2.0)
- def is_live(self):
- """Returns whether the stream is live or not.
- May be stale"""
- return self._live
- def _watch_ready(self):
- # start watch_seconds before start_time
- # finish watch_seconds * 2 after start_time
- curr_time = datetime.datetime.now(tz=TOKYO_TZ)
- # TODO: is this noticeably slower than the old (int > (curr - start).totalseconds() > int)
- if (self._watch_start_time
- < curr_time
- < self._watch_end_time):
- return True
- else:
- return False
- def _live_ready(self):
- curr_time = datetime.datetime.now(tz=TOKYO_TZ)
- if (curr_time - self.__live_time).total_seconds() > self.__live_rate:
- self.__live_time = curr_time
- return True
- else:
- return False
- def check_live_status(self):
- """Checks if the stream is live or not.
- This actually checks the website"""
- try:
- self._live = self._client.is_live(self.room_id)
- except HTTPError as e:
- core_logger.warn('Caught HTTPError while checking room\'s live status: {}'.format(e))
- self._live = False
- return self._live
- def stop(self):
- self._mode = "quitting"
- if self._download.is_running():
- self._download.stop()
- self.comment_logger.quit()
- def kill(self):
- with self._lock:
- if self._mode == "quitting" and self.download.is_running():
- self._download.kill()
- def run(self):
- """
- Watcher flow:
- Case 1: Scheduled Live
- 1) New Watcher is created at step "schedule" and started
- 2) Enter schedule loop:
- Check if curr_time is close enough to start_time to begin watching.
- Manager may update start_time if the schedule changes.
- If curr_time is close enough, change mode to watch
- Else, sleep for a short period of time
- 3) Enter watch loop:
- Check if stream is live
- If live, update start_time and check room.is_wanted()
- WARNING: ensure Watcher and Manager don't overwrite each other's start_times
- Manager should only update start_time if in schedule mode
- e.g. use a reschedule() method that locks mode until updated? I don't think that's sufficient
- Watcher should only update start_time if in watch mode
- If room.is_wanted()
- 3) When the room goes live, Watcher starts the download and switches to "download"
- 4) When the stream ends, Watcher completes the download and switches to "complete"
- and the thread ends (returns a completed watcher?)
- FLOW: schedule -> watch -> download -> completed
- Returns:
- Nothing
- """
- self._update_flag.set()
- # core_logger.debug('Entering {} mode for {}'.format(self.mode, self.name))
- while self._mode == "schedule":
- if self._watch_ready():
- core_logger.info('Watching {}'.format(self.name))
- self._mode = "watch"
- else:
- time.sleep(1.0)
- # core_logger.debug('Entering {} mode for {}'.format(self.mode, self.name))
- while self._mode == "watch":
- if self._watch_ready():
- if self.check_live_status():
- self._start_time = datetime.datetime.now(tz=TOKYO_TZ)
- core_logger.info('{} is now live'.format(self.name))
- if self.room.is_wanted():
- self._mode = "download"
- else:
- self.download.update_streaming_url()
- self._mode = "live"
- else:
- # This is okay as long as watch rate is a short period of time
- time.sleep(self.__watch_rate)
- else:
- self._mode = "expired"
- if self.mode in ("live", "download"):
- self._update_flag.set()
- if self.comment_logger:
- self.comment_logger.start()
- # core_logger.debug('Entering {} mode for {}'.format(self.mode, self.name))
- while self._mode in ("live", "download"):
- # These are together so that users can toggle
- # "wanted" status and switch between them, though it would almost be better
- # if we just automatically recorded everything and discarded unwanted files...
- # except when stuff like New Year's happens.
- # TODO: add an optional flag (to settings) that does exactly that
- while self._mode == "live":
- if self._live_ready():
- # TODO: periodically update the streaming urls
- if self.check_live_status():
- if self.room.is_wanted():
- self._mode = "download"
- else:
- self._end_time = datetime.datetime.now(tz=TOKYO_TZ)
- self._mode = "completed"
- time.sleep(1.0)
- while self._mode == "download":
- # this happens at the top here so that changing mode to "quitting"
- # will cause the loop to break before the download is resumed
- # check_live_status was moved to the end to avoid
- # pinging the site twice whenever a download starts
- if self.is_live():
- if self.room.is_wanted():
- self.download.start()
- else:
- self._mode = "live"
- else:
- self._end_time = datetime.datetime.now(tz=TOKYO_TZ)
- self._mode = 'completed'
- # self.download.wait(timeout=self.__download_timeout)
- self.download.wait()
- time.sleep(0.5)
- self.check_live_status()
- # core_logger.debug('Entering {} mode for {}'.format(self.mode, self.name))
- # TODO: decide what to do with the three end states
- if self._mode == "quitting":
- # what is this mode? it's presumably a way to break out of the download loop
- # to quit:
- # change mode to quitting
- # and call stop() on the downloader
- # actually we should never make it to this block while the download is active, right?
- # unless it times out. but if it times out and the process doesn't end, wait() will
- # never return, so again we'll never reach this block. Thus, all I need to do is...
- self._mode = "completed"
- self._update_flag.set()
- if self._mode == "expired":
- # download never started
- # watcher needs to vacate the premises as fast as possible
- # how it does that I'm not really sure... it depends what WatchManager is doing
- # If watchqueue has a window that returns all expired watchers, that would work
- return
- elif self._mode == "completed":
- # download started and finished, and has already been moved to dest
- return
- class WatchQueue(object):
- """Priority heap queue that also permits iteration.
- TODO:
- Review need for this object
- Write a proper docstring
- Decide how to quit
- Deal with downloads and lives in remove/prune methods.
- Review need for prune
- Review need for list_info
- """
- REMOVED = None
- MODE_GROUPS = {"upcoming": ("schedule",),
- "working": ("schedule", "watch", "download", "live"),
- "active": ("watch", "download", "live"),
- "live": ("live", "download"),
- "ending": ("quitting",),
- "done": ("expired", "completed")}
- def __init__(self):
- self.queue = []
- self.entry_map = {}
- self._counter = itertools.count()
- self._dirty = False
- self._rlock = threading.RLock()
- def __len__(self):
- return len(self.entry_map)
- def __iter__(self):
- with self._rlock:
- _index = 0
- while _index < len(self.queue):
- val = self.queue[_index][2]
- if val is not None:
- yield val
- _index += 1
- def __bool__(self):
- return len(self.entry_map) > 0
- def __getitem__(self, key):
- return self.entry_map[key][2]
- def __contains__(self, room_id):
- with self._rlock:
- if room_id in self.entry_map:
- return True
- else:
- return False
- # This looks nicer than ~7 different methods, but is it clearer?
- def get_by_mode(self, mode):
- """Returns an iterator through all Watchers with the given mode or mode group.
- Modes:
- schedule
- watch
- live (both live and download)
- download
- quitting (or ending)
- expired
- completed
- Groups:
- upcoming: schedule
- live: live, download
- working: schedule, watch, live, download, quitting
- active: watch, live, download
- done: expired, completed
- """
- if mode in self.MODE_GROUPS:
- mode = self.MODE_GROUPS[mode]
- else:
- mode = (mode,)
- with self._rlock:
- yield from (i for i in self if i.mode in mode)
- def ids(self):
- """Returns all room ids in the queue.
- Of debatable utility."""
- with self._rlock:
- return self.entry_map.copy().keys()
-
- def add(self, item):
- """Adds an item to the queue.
- Preserves heap invariant."""
- if item.room_id in self.entry_map:
- return False # do nothing
- else:
- with self._rlock:
- count = next(self._counter)
- entry = [item.priority, count, item]
- self.entry_map[item.room_id] = entry
- heappush(self.queue, entry)
- return True
- def pop(self):
- """Pops the item at the front of the queue.
- Preserves heap invariant."""
- with self._rlock:
- while self.queue:
- priority, count, item = heappop(self.queue)
- if item is not self.REMOVED:
- del self.entry_map[item.room_id]
- return item
- def replace(self, item):
- """Places an item on the queue and pops another from the front of the queue.
- Preserves heap invariant.
- Args:
- Item to place on the queue.
- Returns:
- Item formerly at the front of the queue.
- Issues:
- Unneeded.
- """
- with self._rlock:
- if item.room_id in self.entry_map:
- return self.pop()
- else:
- count = next(self._counter)
- entry = [item.priority, count, item]
- self.entry_map[item.room_id] = entry
- heappush(self.queue, entry)
- return self.pop()
- def peek(self):
- """Peeks at the front of the queue without popping.
- May reentrant lock while rebuilding the queue."""
- if len(self) > 0:
- result = self.queue[0][2]
- if not result:
- self.rebuild()
- result = self.queue[0][2]
- return result
- else:
- return None
- def rebuild(self):
- """Rebuilds the queue, removing dead items left by dirty operations.
- Dirty operations:
- remove
- dirty_pop
- pop_end
- prune
- """
- if self._dirty:
- with self._rlock:
- self.queue = [e for e in self.queue if e[2] != self.REMOVED]
- heapify(self.queue)
- def remove(self, item):
- """Removes item from queue.
- Heap should be rebuilt afterwards."""
- with self._rlock:
- entry = self.entry_map.pop(item.room_id)
- entry[2] = self.REMOVED
- self._dirty = True
- def dirty_pop(self, item):
- """Pops a specific item from anywhere in the queue.
- Heap should be rebuilt afterwards."""
- with self._rlock:
- entry = self.entry_map.pop(item.room_id)
- if entry:
- result = entry[2]
- entry[2] = self.REMOVED
- self._dirty = True
- return result
- else:
- return None
- def pop_end(self):
- """Removes the last item from the queue.
- Heap should be rebuilt afterwards."""
- with self._rlock:
- while self.queue:
- priority, count, item = self.queue.pop(-1)
- if item is not self.REMOVED:
- del self.entry_map[item.room_id]
- self._dirty = True
- return item
- def prune(self, priority):
- """Removes a low priority item from the end of the queue.
- Heap should be rebuilt afterwards.
- Note that "low" priority is a slight misnomer, since the lowest
- *value* priorities are actually the "highest", most resistant to pruning.
- In general this method should be avoided, and rooms changed to wanted/unwanted
- instead.
- """
- with self._rlock:
- while self.queue:
- if self.queue[-1][2] is None:
- self.queue.pop(-1)
- elif self.queue[-1][2].priority > priority:
- self.remove(self.queue[-1][2])
- return True
- else:
- return False
- class WatchManager(object):
- def __init__(self, index: ShowroomIndex, settings: ShowroomSettings):
- """
-
- """
- # maintains a list?
- # does it still need a priority queue?
- # various permutations of the base list
- self.index = index
- self.client = ShowroomClient()
- self.settings = settings
- self.watchers = WatchQueue()
- self.completed = []
- self._threads = {}
- self._counter = itertools.count()
- self._undead_threads = Queue()
- # TODO: undead thread handler?
- self._completed_lock = threading.RLock()
- self.__schedule_time = datetime.datetime.fromtimestamp(0.0, tz=TOKYO_TZ)
- self.__lives_time = self.__schedule_time
- self.update_flag = threading.Event()
-
- self._next_maintenance = None
- self.schedule_next_maintenance()
- # TODO: design better organised configuration/settings
- if self.settings.feedback.write_schedules_to_file:
- self._schedule_update_thread = threading.Thread(target=self.write_schedules, name="ScheduleWriter")
- self._schedule_update_thread.daemon = True
- self._schedule_update_thread.start()
- self.__schedule_warned = False
- self.__onlives_warned = False
- def __len__(self):
- return len(self.watchers)
- def list_ids(self):
- """Returns a list of room ids in the child WatchQueue.
- List may become stale, check that item is still in the queue before
- operating on it."""
- return self.watchers.ids()
- # TODO: This is unneeded, I think, but I do need to reset other state overnight
- # And print a list of completed lives
- '''
- def reset_ticks(self):
- self._tick_count = 0
- '''
- # These three properties are not in use at the moment
- @property
- def output_dir(self):
- return self.settings.directory.output
-
- @property
- def max_watches(self):
- return self.settings.throttle.max.watches
-
- @property
- def max_downloads(self):
- return self.settings.throttle.max.downloads
- def _setup_thread(self, watcher):
- """
- Sets up, names, and starts a thread for the watcher.
- Args:
- A Watcher object ready to start.
- Returns:
- Nothing
- """
- if watcher.room_id in self._threads:
- if self._threads[watcher.room_id].is_alive():
- # TODO: handle this error
- pass
- thread_name = "Watcher-{count}-{name}".format(name=watcher.name, count=next(self._counter))
- t = threading.Thread(target=watcher.run, name=thread_name)
- t.start()
- self._threads[watcher.room_id] = t
- def update_lives(self):
- """Looks for unexpected live rooms."""
- try:
- onlives = self.client.onlives() or []
- except HTTPError as e:
- if not self.__onlives_warned:
- if e.response.status_code >= 400:
- core_logger.warn('Fetching onlives failed with error: {}'.format(e))
- else:
- # I don't think any of these would actually raise?
- core_logger.warn('Fetching onlives failed unexpectedly: {}'.format(e))
- self.__onlives_warned = True
- return
- self.__onlives_warned = False
- # temporary fix for getting multiple genres
- for livelist in onlives:
- if livelist['genre_id'] in GENRE_IDS:
- for item in [e for e in livelist['lives'] if 'room_id' in e and str(e['room_id']) in self.index]:
- room_id = str(item['room_id'])
- # TODO: incorporate live_id into watchers
- # either as '{room_id}_{live_id}' or as (room_id, live_id)
- # TODO: store room_id and live_id as integers instead of strings
- live_id = str(item['live_id'])
- start_time = datetime.datetime.fromtimestamp(float(item['started_at']), tz=TOKYO_TZ)
- # core_logger.debug('Checking live room id {}'.format(room_id))
- if room_id in self.watchers:
- if self.watchers[room_id].mode == "schedule":
- self.watchers[room_id].reschedule(start_time)
- self.watchers[room_id].set_watch_time(datetime.datetime.now(tz=TOKYO_TZ))
- core_logger.debug('Early live for {} at {}'.format(self.watchers[room_id].name,
- self.watchers[
- room_id].formatted_start_time))
- else:
- new = Watcher(self.index[room_id], self.client, self.settings,
- update_flag=self.update_flag, start_time=start_time)
- new.set_watch_time(datetime.datetime.now(tz=TOKYO_TZ))
- info = new.get_info()
- core_logger.debug(
- 'Unscheduled live for {} starting at {}'.format(info['name'], info['start_time']))
- self.add(new)
- # lives is a list of json objects (dicts)
- # representing items to include in the page
- # both rooms and groups
- # entries with:
- # "cell_type": 0
- # represent page formatting clues, e.g. the header for mainichi idol/onlives
- # other useful in each entry:
- # "started_at": 1484559975
- # "room_url_key": "48_YUNA_EGO"
- # "follower_num": 9690
- # "view_num": 9662
- # core_logger.debug('Checking idol lives')
- def update_schedule(self):
- """Checks the schedule and adds watchers for any new rooms found."""
- # TODO: get multiple genres
- try:
- upcoming = self.client.upcoming(genre_id=102) or []
- except HTTPError as e:
- if not self.__schedule_warned:
- if e.response.status_code >= 500:
- core_logger.warn('Fetching schedule failed temporarily: {}'.format(e))
- elif e.response.status_code >= 400:
- core_logger.warn('Fetching schedule failed permanently: {}'.format(e))
- else:
- # I don't think any of these would actually raise?
- core_logger.warn('Fetching onlives failed unexpectedly: {}'.format(e))
- self.__schedule_warned = True
- return
- self.__schedule_warned = False
- for item in [e for e in upcoming if str(e['room_id']) in self.index]:
- start_time = datetime.datetime.fromtimestamp(float(item['next_live_start_at']),
- tz=TOKYO_TZ)
- room_id = str(item['room_id'])
- if room_id in self.watchers:
- if (self.watchers[room_id].mode == 'schedule' and
- start_time != self.watchers[room_id].start_time):
- # Update start_time if still in schedule mode, otherwise
- # defer this until the room finishes its current live
- # This seems like way too many things to check tbh
- # but then the schedule won't be updated that often.
- # Will occasionally get false positives if the schedule is updated
- # right as the room goes live, but that's fine because
- # reschedule won't let start_time be changed if it's not still
- # in schedule mode.
- self.watchers[room_id].reschedule(start_time)
- core_logger.debug('{} rescheduled for {}'.format(self.watchers[room_id].name,
- self.watchers[room_id].formatted_start_time))
- else:
- new = Watcher(self.index[room_id], self.client, self.settings,
- update_flag=self.update_flag, start_time=start_time)
- core_logger.info('{} scheduled for {}'.format(new.name, new.formatted_start_time))
- self.add(new)
- def update_completed(self):
- for watch in self.watchers.get_by_mode("done"):
- with self._completed_lock:
- watch = self.watchers.dirty_pop(watch)
- self.completed.append(watch)
- try:
- thread = self._threads.pop(watch.room_id)
- except KeyError:
- # TODO: handle this error
- pass
- # else:
- # is this check necessary?
- # is it too fast?
- # if thread.is_alive():
- # TODO: log undead threads
- # TODO: handle undead threads elsewhere
- # self._undead_threads.put(thread)
- self.watchers.rebuild()
- def write_schedules(self):
- outfile = self.settings.file.schedule
- def lookup_mode(mode):
- return MODE_TO_STATUS[mode]
- # TODO: toggle this on/off
- while self.update_flag.wait():
- # TODO: allow exit
- # it's a daemon thread so it shouldn't matter
- watchers = self.get_working_list()
- # index_filter = self.index.filter_get_list()
- self.update_flag.clear()
- schedules = []
- for item in watchers:
- status = lookup_mode(item['mode'])
- new_schedule = OrderedDict([('name', item['name']),
- ('live', False if status not in ('live', 'downloading') else True),
- ('status', status),
- ('start_time', strftime(item['start_time'], FULL_DATE_FMT)),
- ('streaming_urls', (item['download']['streaming_urls'] or []).copy()),
- ('room', item['room'])])
- schedules.append(new_schedule)
- # TODO: add filter management to index
- # TODO: add a way to verify that the filters are set correctly
- # schedules should already be sorted
- core_logger.debug('Writing schedules to file')
- with open(outfile, 'w', encoding='utf8') as outfp:
- json.dump(schedules, outfp, ensure_ascii=False, indent=2)
- # even if the update_flag gets set again we don't want to spit out another update so fast
- # TODO: make the sleep time here configurable?
- time.sleep(4.0)
- def write_completed(self):
- """Called by the manager?"""
- # TODO: add today's date to the completed file, change it during nightly maintenance
- # dirty hack: no timezone, so we get the "correct" date even after midnight JST
- datestr = datetime.datetime.now().strftime(FULL_DATE_FMT)[:10]
- outfile = self.settings.file.completed.replace('.json', '_{}.json'.format(datestr))
- try:
- with open(outfile, 'r', encoding='utf8') as infp:
- completed = json.load(infp)
- except FileNotFoundError:
- completed = []
- except JSONDecodeError:
- # TODO: backups
- raise
- with self._completed_lock:
- for item in self.completed:
- info = item.get_info()
- for key in ('start_time', 'end_time'):
- info[key] = str(info[key])
- completed.append(info)
- self.completed = []
- with open(outfile, 'w', encoding='utf8') as outfp:
- json.dump(completed, outfp, indent=2, ensure_ascii=False)
- def schedule_next_maintenance(self, minutes=None):
- if minutes:
- maint_time = self._next_maintenance + datetime.timedelta(minutes=minutes)
- if not minutes or maint_time.hour > 5:
- maint_time = (datetime.datetime.now(tz=TOKYO_TZ) + datetime.timedelta(days=1)).replace(hour=0, minute=5, second=0, microsecond=0)
- self._next_maintenance = maint_time
- def add(self, watcher):
- if watcher.room_id not in self.watchers:
- # TODO: tell the logger about newly scheduled rooms
- self._setup_thread(watcher)
- self.watchers.add(watcher)
- def get_working_list(self):
- """Returns a list of all currently scheduled and live rooms"""
- # Watchers that aren't in one of these 4 modes shouldn't be in the WatchQueue any more.
- return sorted((e.get_info() for e in self.watchers.get_by_mode("working")),
- key=lambda x: (x['start_time'], x['room']['name']))
- @property
- def __schedule_rate(self):
- return self.settings.throttle.rate.upcoming
- @property
- def __lives_rate(self):
- return self.settings.throttle.rate.onlives
- def _schedule_ready(self):
- curr_time = datetime.datetime.now(tz=TOKYO_TZ)
- time_diff = (curr_time - self.__schedule_time).total_seconds()
- if time_diff > self.__schedule_rate:
- # core_logger.debug('Time difference of {} is greater than schedule rate of {}, '
- # 'beginning schedule check'.format(time_diff, self.__schedule_rate))
- self.__schedule_time = curr_time
- return True
- else:
- # core_logger.debug('Skipping schedule check')
- return False
- def _lives_ready(self):
- curr_time = datetime.datetime.now(tz=TOKYO_TZ)
- time_diff = (curr_time - self.__lives_time).total_seconds()
- if time_diff > self.__lives_rate:
- # core_logger.debug('Time difference of {} is greater than live rate of {}, '
- # 'beginning live check'.format(time_diff, self.__lives_rate))
- self.__lives_time = curr_time
- return True
- else:
- # core_logger.debug('Skipping live check')
- return False
- def _maintenance_ready(self):
- curr_time = datetime.datetime.now(tz=TOKYO_TZ)
- if self._next_maintenance < curr_time:
- if len(list(self.watchers.get_by_mode("live"))) < 1:
- return True
- else:
- # core_logger.debug('Live watcher prevents maintenance, rescheduling')
- # TODO: print live watchers that are preventing maintenance
- self.schedule_next_maintenance(30)
- return False
- def do_maintenance(self):
- self.write_completed()
- self.schedule_next_maintenance()
- def tick(self):
- """Periodic live and schedule check"""
- if self._lives_ready():
- # core_logger.debug('Checking lives')
- # completed is checked before lives so that a completed room
- # doesn't prevent a new live from being added
- self.update_completed()
- self.update_lives()
- if self._schedule_ready():
- # core_logger.debug('Checking schedule')
- self.update_schedule()
- # core_logger.debug('{} active watchers'.format(len(self.watchers)))
- if self._maintenance_ready():
- self.do_maintenance()
- def stop(self):
- for watch in self.watchers:
- watch.stop()
- while self.watchers:
- self.update_completed()
- time.sleep(0.5)
- # TODO: handle zombie threads/watchers
|