core.py 39 KB


  1. #!/usr/bin/env python3
  2. """
  3. URLs of note
  4. https://www.showroom-live.com/room/is_live?room_id=61879
  5. {"ok": 0} == not live
  6. {"ok": 1} == live (may be other keys)
  7. !!!! As of 2017-10-23, this url no longer works.
  8. ~~~ https://www.showroom-live.com/room/get_live_data?room_id=61747
  9. https://www.showroom-live.com/event/akb48_sousenkyo_45th poll for time schedule
  10. https://www.showroom-live.com/event/all_rooms?event_id=1364
  11. List of all current live broadcasts w/ streaming links
  12. https://www.showroom-live.com/api/live/onlives
  13. All upcoming lives in the Idol genre
  14. https://www.showroom-live.com/api/live/upcoming?genre_id=102
  15. https://www.showroom-live.com/api/time_table/time_tables?order=asc&ended_at=1493621999&_=1492566692848
  16. Find the next live for a room
  17. https://www.showroom-live.com/api/room/next_live?room_id=61576
  18. Some basic info about a live broadcast
  19. https://www.showroom-live.com/api/live/live_info?room_id=44010
  20. {
  21. "age_verification_status": 0,
  22. "video_type": 0,
  23. "enquete_gift_num": 0,
  24. "is_enquete": false,
  25. "bcsvr_port": 8080,
  26. "live_type": 0,
  27. "is_free_gift_only": false,
  28. "bcsvr_host": "online.showroom-live.com",
  29. "live_id": 2741506,
  30. "is_enquete_result": false,
  31. "live_status": 2,
  32. "room_id": 44010,
  33. "bcsvr_key": "29d502:AJ4IiAqb",
  34. "background_image_url": null
  35. }
  36. https://www.showroom-live.com/api/live/live_info?room_id=75207
  37. {
  38. "age_verification_status": 0,
  39. "video_type": 0,
  40. "enquete_gift_num": 0,
  41. "is_enquete": false,
  42. "bcsvr_port": 8080,
  43. "live_type": 0,
  44. "is_free_gift_only": false,
  45. "bcsvr_host": "online.showroom-live.com",
  46. "live_id": 0,
  47. "is_enquete_result": false,
  48. "live_status": 1,
  49. "room_id": 75207,
  50. "bcsvr_key": "",
  51. "background_image_url": null
  52. }
  53. Video Banner (the strip of text across the top of the video)
  54. https://www.showroom-live.com/api/live/telop?room_id=61627
  55. Comment Log
  56. https://www.showroom-live.com/api/live/comment_log?room_id=61627
  57. Ranking Summary
  58. https://www.showroom-live.com/api/live/summary_ranking?room_id=61627
  59. NetworkInterface sources:
  60. http://stackoverflow.com/a/14671133/3380530
  61. KeyboardInterface sources:
  62. read loop http://stackoverflow.com/a/19655992/3380530
  63. Faster strptime
  64. http://ze.phyr.us/faster-strptime/
  65. I don't use strptime in this program, but it might be useful elsewhere
  66. Also I can probably do something similar for strftime
  67. Other observations:
  68. It appears that (dt-dt).total_seconds() calls are extremely fast.
  69. About 0.003 seconds per 10000 operations.
  70. What is peculiar is creating the timedelta object doesn't seem to take any time?
  71. Anyway, this means there's no reason not to move these calls to Watcher.check()
  72. Still need to run cProfile over a long run.
  73. A common feature of failed recordings is an overabundance of
  74. HandleCtrl, Ping <number>
  75. in the output log
  76. FFmpeg compile requirements:
  77. openssl
  78. librtmp
  79. libx264 (?)
  80. """
  81. # from sys import stdout, stdin, exit
  82. import datetime
  83. import itertools
  84. import json
  85. import logging
  86. import re
  87. import threading
  88. # import glob
  89. import time
  90. from collections import OrderedDict
  91. # import argparse
  92. from heapq import heapify, heappush, heappop
  93. from json.decoder import JSONDecodeError
  94. from queue import Queue # Empty as QueueEmpty
  95. from requests.exceptions import HTTPError
  96. from showroom.api import ShowroomClient
  97. from showroom.downloader import Downloader
  98. # from .message import ShowroomMessage
  99. # from .exceptions import ShowroomDownloadError
  100. from .comments import CommentLogger
  101. from .constants import TOKYO_TZ, HHMM_FMT, FULL_DATE_FMT, MODE_TO_STATUS
  102. from .index import ShowroomIndex, Room
  103. from .settings import ShowroomSettings
  104. from .utils import strftime
  105. # The times and dates reported on the website are screwy, but when fetched
  106. # through BeautifulSoup they *seem* to come in JST
  107. # If you're getting incorrect times you probably need to mess with
  108. # Schedule.convert_time()
  109. # Or add custom headers to the requests.get() call in Scheduler.tick()
  110. # TODO: create a separate loggers.py or something, set the levels there
  111. # and other important details, e.g. define custom levels for finer verbosity control
  112. core_logger = logging.getLogger('showroom.core')
  113. hls_url_re1 = re.compile(r'(https://edge-(\d*)-(\d*)-(\d*)-(\d*).showroom-live.com:443/liveedge/(\w*))/playlist.m3u8')
  114. # TODO: Make this a config file option
  115. STREAM_PREFERENCE = ("rtmp", "lhls", "hls")
  116. WATCHSECONDS = (600, 420, 360, 360, 300, 300, 240, 240, 180, 150)
  117. # TODO: handle genre/category by individual rooms
  118. # currently this checks the onlive list for each of Music, Idol, and Talent/Model
  119. # schedules are still Idol only
  120. GENRE_IDS = {101, 102, 103, 104, 105, 106, 107, 200}
  121. def watch_seconds(priority: int):
  122. """
  123. Translates priority to a watch duration in seconds.
  124. Looks up the priority in a tuple, returns number of seconds before
  125. start_time to begin watching a room with the given priority.
  126. Args:
  127. priority: An int representing the room's priority.
  128. Returns:
  129. Seconds as an int. For all priorities over 10, returns 120,
  130. else looks up priority in WATCHSECONDS.
  131. TODO:
  132. Make this a feature of Watcher objects, calculated on creation or
  133. updated when (watch) duration or (room) priority is updated.
  134. """
  135. if priority > len(WATCHSECONDS):
  136. return 120
  137. elif priority < 0:
  138. return 600
  139. else:
  140. return WATCHSECONDS[priority-1]
  141. class Watcher(object):
  142. """Manages downloads for a single room/stream.
  143. TODO:
  144. docstrings
  145. logging
  146. flow analysis for run()
  147. review end states
  148. option to download all streams but only keep wanted
  149. instead of default of only downloading wanted
  150. """
  151. def __init__(self, room: Room, client: ShowroomClient, settings: ShowroomSettings,
  152. update_flag: threading.Event=None, start_time: datetime.datetime=None,
  153. watch_duration: int=None):
  154. self._lock = threading.RLock()
  155. if update_flag:
  156. self._update_flag = update_flag
  157. else:
  158. self._update_flag = threading.Event()
  159. self._room = room
  160. self._client = client
  161. self._settings = settings
  162. self._download = Downloader(room, client, settings)
  163. if self._settings.comments.record and self.priority < self._settings.comments.max_priority:
  164. self.comment_logger = CommentLogger(self.room, self._client, self._settings, self)
  165. else:
  166. self.comment_logger = None
  167. # originally start_time was the time the stream began recording
  168. # now however i'm using the start_time according to Showroom
  169. # so Watcher is always created with a start_time
  170. if start_time:
  171. self.__start_time = start_time
  172. else:
  173. self.__start_time = datetime.datetime.now(tz=TOKYO_TZ)
  174. self._end_time = None
  175. self._watch_duration = watch_duration
  176. self._watch_start_time = self._watch_end_time = None
  177. self.set_watch_time(self.__start_time, self.watch_duration)
  178. self._live = False
  179. self.__live_time = datetime.datetime.fromtimestamp(0.0, tz=TOKYO_TZ)
  180. self.__mode = "schedule"
  181. # mainly used by hacked together priority heapq
  182. def __bool__(self):
  183. return bool(self._room)
  184. # access to internal objects
  185. @property
  186. def room(self):
  187. return self._room
  188. @property
  189. def download(self):
  190. return self._download
  191. # informational properties
  192. @property
  193. def name(self):
  194. return self._room.name
  195. @property
  196. def web_url(self):
  197. return self.room.long_url
  198. @property
  199. def room_id(self):
  200. return self._room.room_id
  201. @property
  202. def priority(self):
  203. return self._room['priority']
  204. @property
  205. def watch_duration(self):
  206. """
  207. Time in seconds to start watching a room ahead of when it is scheduled to go live.
  208. Will keep watching for watch_duration*2 after scheduled start_time
  209. """
  210. if self._watch_duration:
  211. return self._watch_duration
  212. else:
  213. return watch_seconds(self.priority)
  214. @property
  215. def mode(self):
  216. return self.__mode
  217. @property
  218. def formatted_start_time(self):
  219. return strftime(self.__start_time, HHMM_FMT)
  220. @property
  221. def start_time(self):
  222. return self.__start_time
  223. # internal properties
  224. @property
  225. def _start_time(self):
  226. return self.__start_time
  227. @_start_time.setter
  228. def _start_time(self, new_time):
  229. with self._lock:
  230. self.__start_time = new_time
  231. @property
  232. def _mode(self):
  233. return self.__mode
  234. @_mode.setter
  235. def _mode(self, new_mode):
  236. with self._lock:
  237. self.__mode = new_mode
  238. @property
  239. def __watch_rate(self):
  240. return self._settings.throttle.rate.watch
  241. @property
  242. def __live_rate(self):
  243. return self._settings.throttle.rate.live
  244. @property
  245. def __download_timeout(self):
  246. return self._settings.throttle.timeout.downloads
  247. def get_info(self):
  248. """Returns a dictionary describing the Watcher's state.
  249. Also returns info for child Downloader and Room objects."""
  250. with self._lock:
  251. room_info = self.room.get_info()
  252. return {
  253. # TODO: fix the write_completed method below to handle datetime
  254. "start_time": self._start_time,
  255. "end_time": self._end_time,
  256. "live": self.is_live(),
  257. "mode": self._mode,
  258. # this is kinda hokey, but it's needed often enough so...
  259. "name": room_info['name'],
  260. "room": room_info,
  261. "download": self.download.get_info()}
  262. # TODO: review uses and functionality of these two methods
  263. def reschedule(self, new_time):
  264. with self._lock:
  265. if self._mode == "schedule":
  266. self._start_time = new_time
  267. self.set_watch_time(new_time)
  268. self._update_flag.set()
  269. def set_watch_time(self, watch_time, watch_duration: int=None):
  270. with self._lock:
  271. if watch_duration is None:
  272. watch_duration = self.watch_duration
  273. self._watch_start_time = watch_time - datetime.timedelta(seconds=watch_duration)
  274. self._watch_end_time = watch_time + datetime.timedelta(seconds=watch_duration*2.0)
  275. def is_live(self):
  276. """Returns whether the stream is live or not.
  277. May be stale"""
  278. return self._live
  279. def _watch_ready(self):
  280. # start watch_seconds before start_time
  281. # finish watch_seconds * 2 after start_time
  282. curr_time = datetime.datetime.now(tz=TOKYO_TZ)
  283. # TODO: is this noticeably slower than the old (int > (curr - start).totalseconds() > int)
  284. if (self._watch_start_time
  285. < curr_time
  286. < self._watch_end_time):
  287. return True
  288. else:
  289. return False
  290. def _live_ready(self):
  291. curr_time = datetime.datetime.now(tz=TOKYO_TZ)
  292. if (curr_time - self.__live_time).total_seconds() > self.__live_rate:
  293. self.__live_time = curr_time
  294. return True
  295. else:
  296. return False
  297. def check_live_status(self):
  298. """Checks if the stream is live or not.
  299. This actually checks the website"""
  300. try:
  301. self._live = self._client.is_live(self.room_id)
  302. except HTTPError as e:
  303. core_logger.warn('Caught HTTPError while checking room\'s live status: {}'.format(e))
  304. self._live = False
  305. return self._live
  306. def stop(self):
  307. self._mode = "quitting"
  308. if self._download.is_running():
  309. self._download.stop()
  310. self.comment_logger.quit()
  311. def kill(self):
  312. with self._lock:
  313. if self._mode == "quitting" and self.download.is_running():
  314. self._download.kill()
  315. def run(self):
  316. """
  317. Watcher flow:
  318. Case 1: Scheduled Live
  319. 1) New Watcher is created at step "schedule" and started
  320. 2) Enter schedule loop:
  321. Check if curr_time is close enough to start_time to begin watching.
  322. Manager may update start_time if the schedule changes.
  323. If curr_time is close enough, change mode to watch
  324. Else, sleep for a short period of time
  325. 3) Enter watch loop:
  326. Check if stream is live
  327. If live, update start_time and check room.is_wanted()
  328. WARNING: ensure Watcher and Manager don't overwrite each other's start_times
  329. Manager should only update start_time if in schedule mode
  330. e.g. use a reschedule() method that locks mode until updated? I don't think that's sufficient
  331. Watcher should only update start_time if in watch mode
  332. If room.is_wanted()
  333. 3) When the room goes live, Watcher starts the download and switches to "download"
  334. 4) When the stream ends, Watcher completes the download and switches to "complete"
  335. and the thread ends (returns a completed watcher?)
  336. FLOW: schedule -> watch -> download -> completed
  337. Returns:
  338. Nothing
  339. """
  340. self._update_flag.set()
  341. # core_logger.debug('Entering {} mode for {}'.format(self.mode, self.name))
  342. while self._mode == "schedule":
  343. if self._watch_ready():
  344. core_logger.info('Watching {}'.format(self.name))
  345. self._mode = "watch"
  346. else:
  347. time.sleep(1.0)
  348. # core_logger.debug('Entering {} mode for {}'.format(self.mode, self.name))
  349. while self._mode == "watch":
  350. if self._watch_ready():
  351. if self.check_live_status():
  352. self._start_time = datetime.datetime.now(tz=TOKYO_TZ)
  353. core_logger.info('{} is now live'.format(self.name))
  354. if self.room.is_wanted():
  355. self._mode = "download"
  356. else:
  357. self.download.update_streaming_url()
  358. self._mode = "live"
  359. else:
  360. # This is okay as long as watch rate is a short period of time
  361. time.sleep(self.__watch_rate)
  362. else:
  363. self._mode = "expired"
  364. if self.mode in ("live", "download"):
  365. self._update_flag.set()
  366. if self.comment_logger:
  367. self.comment_logger.start()
  368. # core_logger.debug('Entering {} mode for {}'.format(self.mode, self.name))
  369. while self._mode in ("live", "download"):
  370. # These are together so that users can toggle
  371. # "wanted" status and switch between them, though it would almost be better
  372. # if we just automatically recorded everything and discarded unwanted files...
  373. # except when stuff like New Year's happens.
  374. # TODO: add an optional flag (to settings) that does exactly that
  375. while self._mode == "live":
  376. if self._live_ready():
  377. # TODO: periodically update the streaming urls
  378. if self.check_live_status():
  379. if self.room.is_wanted():
  380. self._mode = "download"
  381. else:
  382. self._end_time = datetime.datetime.now(tz=TOKYO_TZ)
  383. self._mode = "completed"
  384. time.sleep(1.0)
  385. while self._mode == "download":
  386. # this happens at the top here so that changing mode to "quitting"
  387. # will cause the loop to break before the download is resumed
  388. # check_live_status was moved to the end to avoid
  389. # pinging the site twice whenever a download starts
  390. if self.is_live():
  391. if self.room.is_wanted():
  392. self.download.start()
  393. else:
  394. self._mode = "live"
  395. else:
  396. self._end_time = datetime.datetime.now(tz=TOKYO_TZ)
  397. self._mode = 'completed'
  398. # self.download.wait(timeout=self.__download_timeout)
  399. self.download.wait()
  400. time.sleep(0.5)
  401. self.check_live_status()
  402. # core_logger.debug('Entering {} mode for {}'.format(self.mode, self.name))
  403. # TODO: decide what to do with the three end states
  404. if self._mode == "quitting":
  405. # what is this mode? it's presumably a way to break out of the download loop
  406. # to quit:
  407. # change mode to quitting
  408. # and call stop() on the downloader
  409. # actually we should never make it to this block while the download is active, right?
  410. # unless it times out. but if it times out and the process doesn't end, wait() will
  411. # never return, so again we'll never reach this block. Thus, all I need to do is...
  412. self._mode = "completed"
  413. self._update_flag.set()
  414. if self._mode == "expired":
  415. # download never started
  416. # watcher needs to vacate the premises as fast as possible
  417. # how it does that I'm not really sure... it depends what WatchManager is doing
  418. # If watchqueue has a window that returns all expired watchers, that would work
  419. return
  420. elif self._mode == "completed":
  421. # download started and finished, and has already been moved to dest
  422. return
  423. class WatchQueue(object):
  424. """Priority heap queue that also permits iteration.
  425. TODO:
  426. Review need for this object
  427. Write a proper docstring
  428. Decide how to quit
  429. Deal with downloads and lives in remove/prune methods.
  430. Review need for prune
  431. Review need for list_info
  432. """
  433. REMOVED = None
  434. MODE_GROUPS = {"upcoming": ("schedule",),
  435. "working": ("schedule", "watch", "download", "live"),
  436. "active": ("watch", "download", "live"),
  437. "live": ("live", "download"),
  438. "ending": ("quitting",),
  439. "done": ("expired", "completed")}
  440. def __init__(self):
  441. self.queue = []
  442. self.entry_map = {}
  443. self._counter = itertools.count()
  444. self._dirty = False
  445. self._rlock = threading.RLock()
  446. def __len__(self):
  447. return len(self.entry_map)
  448. def __iter__(self):
  449. with self._rlock:
  450. _index = 0
  451. while _index < len(self.queue):
  452. val = self.queue[_index][2]
  453. if val is not None:
  454. yield val
  455. _index += 1
  456. def __bool__(self):
  457. return len(self.entry_map) > 0
  458. def __getitem__(self, key):
  459. return self.entry_map[key][2]
  460. def __contains__(self, room_id):
  461. with self._rlock:
  462. if room_id in self.entry_map:
  463. return True
  464. else:
  465. return False
  466. # This looks nicer than ~7 different methods, but is it clearer?
  467. def get_by_mode(self, mode):
  468. """Returns an iterator through all Watchers with the given mode or mode group.
  469. Modes:
  470. schedule
  471. watch
  472. live (both live and download)
  473. download
  474. quitting (or ending)
  475. expired
  476. completed
  477. Groups:
  478. upcoming: schedule
  479. live: live, download
  480. working: schedule, watch, live, download, quitting
  481. active: watch, live, download
  482. done: expired, completed
  483. """
  484. if mode in self.MODE_GROUPS:
  485. mode = self.MODE_GROUPS[mode]
  486. else:
  487. mode = (mode,)
  488. with self._rlock:
  489. yield from (i for i in self if i.mode in mode)
  490. def ids(self):
  491. """Returns all room ids in the queue.
  492. Of debatable utility."""
  493. with self._rlock:
  494. return self.entry_map.copy().keys()
  495. def add(self, item):
  496. """Adds an item to the queue.
  497. Preserves heap invariant."""
  498. if item.room_id in self.entry_map:
  499. return False # do nothing
  500. else:
  501. with self._rlock:
  502. count = next(self._counter)
  503. entry = [item.priority, count, item]
  504. self.entry_map[item.room_id] = entry
  505. heappush(self.queue, entry)
  506. return True
  507. def pop(self):
  508. """Pops the item at the front of the queue.
  509. Preserves heap invariant."""
  510. with self._rlock:
  511. while self.queue:
  512. priority, count, item = heappop(self.queue)
  513. if item is not self.REMOVED:
  514. del self.entry_map[item.room_id]
  515. return item
  516. def replace(self, item):
  517. """Places an item on the queue and pops another from the front of the queue.
  518. Preserves heap invariant.
  519. Args:
  520. Item to place on the queue.
  521. Returns:
  522. Item formerly at the front of the queue.
  523. Issues:
  524. Unneeded.
  525. """
  526. with self._rlock:
  527. if item.room_id in self.entry_map:
  528. return self.pop()
  529. else:
  530. count = next(self._counter)
  531. entry = [item.priority, count, item]
  532. self.entry_map[item.room_id] = entry
  533. heappush(self.queue, entry)
  534. return self.pop()
  535. def peek(self):
  536. """Peeks at the front of the queue without popping.
  537. May reentrant lock while rebuilding the queue."""
  538. if len(self) > 0:
  539. result = self.queue[0][2]
  540. if not result:
  541. self.rebuild()
  542. result = self.queue[0][2]
  543. return result
  544. else:
  545. return None
  546. def rebuild(self):
  547. """Rebuilds the queue, removing dead items left by dirty operations.
  548. Dirty operations:
  549. remove
  550. dirty_pop
  551. pop_end
  552. prune
  553. """
  554. if self._dirty:
  555. with self._rlock:
  556. self.queue = [e for e in self.queue if e[2] != self.REMOVED]
  557. heapify(self.queue)
  558. def remove(self, item):
  559. """Removes item from queue.
  560. Heap should be rebuilt afterwards."""
  561. with self._rlock:
  562. entry = self.entry_map.pop(item.room_id)
  563. entry[2] = self.REMOVED
  564. self._dirty = True
  565. def dirty_pop(self, item):
  566. """Pops a specific item from anywhere in the queue.
  567. Heap should be rebuilt afterwards."""
  568. with self._rlock:
  569. entry = self.entry_map.pop(item.room_id)
  570. if entry:
  571. result = entry[2]
  572. entry[2] = self.REMOVED
  573. self._dirty = True
  574. return result
  575. else:
  576. return None
  577. def pop_end(self):
  578. """Removes the last item from the queue.
  579. Heap should be rebuilt afterwards."""
  580. with self._rlock:
  581. while self.queue:
  582. priority, count, item = self.queue.pop(-1)
  583. if item is not self.REMOVED:
  584. del self.entry_map[item.room_id]
  585. self._dirty = True
  586. return item
  587. def prune(self, priority):
  588. """Removes a low priority item from the end of the queue.
  589. Heap should be rebuilt afterwards.
  590. Note that "low" priority is a slight misnomer, since the lowest
  591. *value* priorities are actually the "highest", most resistant to pruning.
  592. In general this method should be avoided, and rooms changed to wanted/unwanted
  593. instead.
  594. """
  595. with self._rlock:
  596. while self.queue:
  597. if self.queue[-1][2] is None:
  598. self.queue.pop(-1)
  599. elif self.queue[-1][2].priority > priority:
  600. self.remove(self.queue[-1][2])
  601. return True
  602. else:
  603. return False
  604. class WatchManager(object):
  605. def __init__(self, index: ShowroomIndex, settings: ShowroomSettings):
  606. """
  607. """
  608. # maintains a list?
  609. # does it still need a priority queue?
  610. # various permutations of the base list
  611. self.index = index
  612. self.client = ShowroomClient()
  613. self.settings = settings
  614. self.watchers = WatchQueue()
  615. self.completed = []
  616. self._threads = {}
  617. self._counter = itertools.count()
  618. self._undead_threads = Queue()
  619. # TODO: undead thread handler?
  620. self._completed_lock = threading.RLock()
  621. self.__schedule_time = datetime.datetime.fromtimestamp(0.0, tz=TOKYO_TZ)
  622. self.__lives_time = self.__schedule_time
  623. self.update_flag = threading.Event()
  624. self._next_maintenance = None
  625. self.schedule_next_maintenance()
  626. # TODO: design better organised configuration/settings
  627. if self.settings.feedback.write_schedules_to_file:
  628. self._schedule_update_thread = threading.Thread(target=self.write_schedules, name="ScheduleWriter")
  629. self._schedule_update_thread.daemon = True
  630. self._schedule_update_thread.start()
  631. self.__schedule_warned = False
  632. self.__onlives_warned = False
  633. def __len__(self):
  634. return len(self.watchers)
  635. def list_ids(self):
  636. """Returns a list of room ids in the child WatchQueue.
  637. List may become stale, check that item is still in the queue before
  638. operating on it."""
  639. return self.watchers.ids()
  640. # TODO: This is unneeded, I think, but I do need to reset other state overnight
  641. # And print a list of completed lives
  642. '''
  643. def reset_ticks(self):
  644. self._tick_count = 0
  645. '''
  646. # These three properties are not in use at the moment
  647. @property
  648. def output_dir(self):
  649. return self.settings.directory.output
  650. @property
  651. def max_watches(self):
  652. return self.settings.throttle.max.watches
  653. @property
  654. def max_downloads(self):
  655. return self.settings.throttle.max.downloads
  656. def _setup_thread(self, watcher):
  657. """
  658. Sets up, names, and starts a thread for the watcher.
  659. Args:
  660. A Watcher object ready to start.
  661. Returns:
  662. Nothing
  663. """
  664. if watcher.room_id in self._threads:
  665. if self._threads[watcher.room_id].is_alive():
  666. # TODO: handle this error
  667. pass
  668. thread_name = "Watcher-{count}-{name}".format(name=watcher.name, count=next(self._counter))
  669. t = threading.Thread(target=watcher.run, name=thread_name)
  670. t.start()
  671. self._threads[watcher.room_id] = t
  672. def update_lives(self):
  673. """Looks for unexpected live rooms."""
  674. try:
  675. onlives = self.client.onlives() or []
  676. except HTTPError as e:
  677. if not self.__onlives_warned:
  678. if e.response.status_code >= 400:
  679. core_logger.warn('Fetching onlives failed with error: {}'.format(e))
  680. else:
  681. # I don't think any of these would actually raise?
  682. core_logger.warn('Fetching onlives failed unexpectedly: {}'.format(e))
  683. self.__onlives_warned = True
  684. return
  685. self.__onlives_warned = False
  686. # temporary fix for getting multiple genres
  687. for livelist in onlives:
  688. if livelist['genre_id'] in GENRE_IDS:
  689. for item in [e for e in livelist['lives'] if 'room_id' in e and str(e['room_id']) in self.index]:
  690. room_id = str(item['room_id'])
  691. # TODO: incorporate live_id into watchers
  692. # either as '{room_id}_{live_id}' or as (room_id, live_id)
  693. # TODO: store room_id and live_id as integers instead of strings
  694. live_id = str(item['live_id'])
  695. start_time = datetime.datetime.fromtimestamp(float(item['started_at']), tz=TOKYO_TZ)
  696. # core_logger.debug('Checking live room id {}'.format(room_id))
  697. if room_id in self.watchers:
  698. if self.watchers[room_id].mode == "schedule":
  699. self.watchers[room_id].reschedule(start_time)
  700. self.watchers[room_id].set_watch_time(datetime.datetime.now(tz=TOKYO_TZ))
  701. core_logger.debug('Early live for {} at {}'.format(self.watchers[room_id].name,
  702. self.watchers[
  703. room_id].formatted_start_time))
  704. else:
  705. new = Watcher(self.index[room_id], self.client, self.settings,
  706. update_flag=self.update_flag, start_time=start_time)
  707. new.set_watch_time(datetime.datetime.now(tz=TOKYO_TZ))
  708. info = new.get_info()
  709. core_logger.debug(
  710. 'Unscheduled live for {} starting at {}'.format(info['name'], info['start_time']))
  711. self.add(new)
  712. # lives is a list of json objects (dicts)
  713. # representing items to include in the page
  714. # both rooms and groups
  715. # entries with:
  716. # "cell_type": 0
  717. # represent page formatting clues, e.g. the header for mainichi idol/onlives
  718. # other useful in each entry:
  719. # "started_at": 1484559975
  720. # "room_url_key": "48_YUNA_EGO"
  721. # "follower_num": 9690
  722. # "view_num": 9662
  723. # core_logger.debug('Checking idol lives')
  724. def update_schedule(self):
  725. """Checks the schedule and adds watchers for any new rooms found."""
  726. # TODO: get multiple genres
  727. try:
  728. upcoming = self.client.upcoming(genre_id=102) or []
  729. except HTTPError as e:
  730. if not self.__schedule_warned:
  731. if e.response.status_code >= 500:
  732. core_logger.warn('Fetching schedule failed temporarily: {}'.format(e))
  733. elif e.response.status_code >= 400:
  734. core_logger.warn('Fetching schedule failed permanently: {}'.format(e))
  735. else:
  736. # I don't think any of these would actually raise?
  737. core_logger.warn('Fetching onlives failed unexpectedly: {}'.format(e))
  738. self.__schedule_warned = True
  739. return
  740. self.__schedule_warned = False
  741. for item in [e for e in upcoming if str(e['room_id']) in self.index]:
  742. start_time = datetime.datetime.fromtimestamp(float(item['next_live_start_at']),
  743. tz=TOKYO_TZ)
  744. room_id = str(item['room_id'])
  745. if room_id in self.watchers:
  746. if (self.watchers[room_id].mode == 'schedule' and
  747. start_time != self.watchers[room_id].start_time):
  748. # Update start_time if still in schedule mode, otherwise
  749. # defer this until the room finishes its current live
  750. # This seems like way too many things to check tbh
  751. # but then the schedule won't be updated that often.
  752. # Will occasionally get false positives if the schedule is updated
  753. # right as the room goes live, but that's fine because
  754. # reschedule won't let start_time be changed if it's not still
  755. # in schedule mode.
  756. self.watchers[room_id].reschedule(start_time)
  757. core_logger.debug('{} rescheduled for {}'.format(self.watchers[room_id].name,
  758. self.watchers[room_id].formatted_start_time))
  759. else:
  760. new = Watcher(self.index[room_id], self.client, self.settings,
  761. update_flag=self.update_flag, start_time=start_time)
  762. core_logger.info('{} scheduled for {}'.format(new.name, new.formatted_start_time))
  763. self.add(new)
  764. def update_completed(self):
  765. for watch in self.watchers.get_by_mode("done"):
  766. with self._completed_lock:
  767. watch = self.watchers.dirty_pop(watch)
  768. self.completed.append(watch)
  769. try:
  770. thread = self._threads.pop(watch.room_id)
  771. except KeyError:
  772. # TODO: handle this error
  773. pass
  774. # else:
  775. # is this check necessary?
  776. # is it too fast?
  777. # if thread.is_alive():
  778. # TODO: log undead threads
  779. # TODO: handle undead threads elsewhere
  780. # self._undead_threads.put(thread)
  781. self.watchers.rebuild()
  782. def write_schedules(self):
  783. outfile = self.settings.file.schedule
  784. def lookup_mode(mode):
  785. return MODE_TO_STATUS[mode]
  786. # TODO: toggle this on/off
  787. while self.update_flag.wait():
  788. # TODO: allow exit
  789. # it's a daemon thread so it shouldn't matter
  790. watchers = self.get_working_list()
  791. # index_filter = self.index.filter_get_list()
  792. self.update_flag.clear()
  793. schedules = []
  794. for item in watchers:
  795. status = lookup_mode(item['mode'])
  796. new_schedule = OrderedDict([('name', item['name']),
  797. ('live', False if status not in ('live', 'downloading') else True),
  798. ('status', status),
  799. ('start_time', strftime(item['start_time'], FULL_DATE_FMT)),
  800. ('streaming_urls', (item['download']['streaming_urls'] or []).copy()),
  801. ('room', item['room'])])
  802. schedules.append(new_schedule)
  803. # TODO: add filter management to index
  804. # TODO: add a way to verify that the filters are set correctly
  805. # schedules should already be sorted
  806. core_logger.debug('Writing schedules to file')
  807. with open(outfile, 'w', encoding='utf8') as outfp:
  808. json.dump(schedules, outfp, ensure_ascii=False, indent=2)
  809. # even if the update_flag gets set again we don't want to spit out another update so fast
  810. # TODO: make the sleep time here configurable?
  811. time.sleep(4.0)
  812. def write_completed(self):
  813. """Called by the manager?"""
  814. # TODO: add today's date to the completed file, change it during nightly maintenance
  815. # dirty hack: no timezone, so we get the "correct" date even after midnight JST
  816. datestr = datetime.datetime.now().strftime(FULL_DATE_FMT)[:10]
  817. outfile = self.settings.file.completed.replace('.json', '_{}.json'.format(datestr))
  818. try:
  819. with open(outfile, 'r', encoding='utf8') as infp:
  820. completed = json.load(infp)
  821. except FileNotFoundError:
  822. completed = []
  823. except JSONDecodeError:
  824. # TODO: backups
  825. raise
  826. with self._completed_lock:
  827. for item in self.completed:
  828. info = item.get_info()
  829. for key in ('start_time', 'end_time'):
  830. info[key] = str(info[key])
  831. completed.append(info)
  832. self.completed = []
  833. with open(outfile, 'w', encoding='utf8') as outfp:
  834. json.dump(completed, outfp, indent=2, ensure_ascii=False)
  835. def schedule_next_maintenance(self, minutes=None):
  836. if minutes:
  837. maint_time = self._next_maintenance + datetime.timedelta(minutes=minutes)
  838. if not minutes or maint_time.hour > 5:
  839. maint_time = (datetime.datetime.now(tz=TOKYO_TZ) + datetime.timedelta(days=1)).replace(hour=0, minute=5, second=0, microsecond=0)
  840. self._next_maintenance = maint_time
  841. def add(self, watcher):
  842. if watcher.room_id not in self.watchers:
  843. # TODO: tell the logger about newly scheduled rooms
  844. self._setup_thread(watcher)
  845. self.watchers.add(watcher)
  846. def get_working_list(self):
  847. """Returns a list of all currently scheduled and live rooms"""
  848. # Watchers that aren't in one of these 4 modes shouldn't be in the WatchQueue any more.
  849. return sorted((e.get_info() for e in self.watchers.get_by_mode("working")),
  850. key=lambda x: (x['start_time'], x['room']['name']))
  851. @property
  852. def __schedule_rate(self):
  853. return self.settings.throttle.rate.upcoming
  854. @property
  855. def __lives_rate(self):
  856. return self.settings.throttle.rate.onlives
  857. def _schedule_ready(self):
  858. curr_time = datetime.datetime.now(tz=TOKYO_TZ)
  859. time_diff = (curr_time - self.__schedule_time).total_seconds()
  860. if time_diff > self.__schedule_rate:
  861. # core_logger.debug('Time difference of {} is greater than schedule rate of {}, '
  862. # 'beginning schedule check'.format(time_diff, self.__schedule_rate))
  863. self.__schedule_time = curr_time
  864. return True
  865. else:
  866. # core_logger.debug('Skipping schedule check')
  867. return False
  868. def _lives_ready(self):
  869. curr_time = datetime.datetime.now(tz=TOKYO_TZ)
  870. time_diff = (curr_time - self.__lives_time).total_seconds()
  871. if time_diff > self.__lives_rate:
  872. # core_logger.debug('Time difference of {} is greater than live rate of {}, '
  873. # 'beginning live check'.format(time_diff, self.__lives_rate))
  874. self.__lives_time = curr_time
  875. return True
  876. else:
  877. # core_logger.debug('Skipping live check')
  878. return False
  879. def _maintenance_ready(self):
  880. curr_time = datetime.datetime.now(tz=TOKYO_TZ)
  881. if self._next_maintenance < curr_time:
  882. if len(list(self.watchers.get_by_mode("live"))) < 1:
  883. return True
  884. else:
  885. # core_logger.debug('Live watcher prevents maintenance, rescheduling')
  886. # TODO: print live watchers that are preventing maintenance
  887. self.schedule_next_maintenance(30)
  888. return False
  889. def do_maintenance(self):
  890. self.write_completed()
  891. self.schedule_next_maintenance()
  892. def tick(self):
  893. """Periodic live and schedule check"""
  894. if self._lives_ready():
  895. # core_logger.debug('Checking lives')
  896. # completed is checked before lives so that a completed room
  897. # doesn't prevent a new live from being added
  898. self.update_completed()
  899. self.update_lives()
  900. if self._schedule_ready():
  901. # core_logger.debug('Checking schedule')
  902. self.update_schedule()
  903. # core_logger.debug('{} active watchers'.format(len(self.watchers)))
  904. if self._maintenance_ready():
  905. self.do_maintenance()
  906. def stop(self):
  907. for watch in self.watchers:
  908. watch.stop()
  909. while self.watchers:
  910. self.update_completed()
  911. time.sleep(0.5)
  912. # TODO: handle zombie threads/watchers