# downloader.py
  1. # Showroom Downloader
  2. import subprocess
  3. import threading
  4. import datetime
  5. import logging
  6. import time
  7. import os
  8. import shutil
  9. from .constants import TOKYO_TZ, FULL_DATE_FMT
  10. from .utils import format_name, strftime
  11. download_logger = logging.getLogger('showroom.downloader')
  12. class Downloader(object):
  13. """
  14. Handles downloads for a parent Watcher.
  15. Created with a room, a client, and an output directory. Started with start(),
  16. then call wait() to wait on the underlying Popen process. Wait will return when the
  17. download process ends, either because the stream has completed, because it timed out,
  18. or because it was terminated from outside the thread. On POSIX systems, a negative
  19. return code from wait() signals termination/timeout, however this is not portable.
  20. Regardless of why the download finished, Watcher still needs to check live status, so
  21. the only reason why termination vs. completion matters is potentially responding to
  22. repeated timeouts (e.g. like that time all the streams failed for 4 hours)
  23. Attributes:
  24. destdir: final destination for the download
  25. tempdir: temporary ("active") directory
  26. outfile: name of the file being written to
  27. all_files: list of files this downloader has written, eventually will be logged
  28. when the stream completes
  29. NOTE: all_files is the only attribute that has any reason to be public
  30. Properties:
  31. stream_data: stream data returned by showroom
  32. protocol: protocol in use, either rtmp or hls (use enum?)
  33. rtmp_url, lhls_url, hls_url: separate handles for rtmp and hls urls
  34. timed_out: whether the last wait() timed out
  35. Methods: (remove this before release)
  36. start
  37. wait
  38. get_info
  39. is_running -- whether the child process is running
  40. stop, kill -- usually called from outside the current thread
  41. update_stream_url -- internal use only
  42. move_to_dest -- internal use only
  43. switch_protocol -- don't change protocol, change downloaders
  44. TODO:
  45. Logging (e.g. "download has started" or let Watcher handle this)
  46. Fix ffmpeg logging on Windows without pulling in PATH
  47. DONE:
  48. For now, instead of the below, just use rtmp streams:
  49. Separate downloaders for rtmp and hls streams? That is, if one is failing
  50. instead of switching the protocol, have Watcher pop off the failing stream
  51. and make a new downloader, handing the failing downloader off to some
  52. cleanup thread via queue. Or can we handle all cleanup here?
  53. Add a wait() function that wraps the internal Popen process and checks for fail
  54. states without bothering the wrapping Watcher. Raise on failure?
  55. TESTING:
  56. hls recording fails awfully. find out why
  57. For the failure detection to work properly, must ffmpeg be compiled with librtmp? (yes)
  58. """
  59. def __init__(self, room, client, settings, default_protocol='rtmp'):
  60. self._room = room
  61. self._client = client
  62. self._rootdir = settings.directory.output
  63. self._logging = settings.ffmpeg.logging
  64. self._ffmpeg_path = settings.ffmpeg.path
  65. self._ffmpeg_container = settings.ffmpeg.container
  66. self.destdir, self.tempdir, self.outfile = "", "", ""
  67. self._protocol = default_protocol
  68. self._rtmp_url = ""
  69. self._hls_url = ""
  70. self._lhls_url = ""
  71. self._stream_data = []
  72. self._process = None
  73. # self._timeouts = 0
  74. # self._timed_out = False
  75. self._pingouts = 0
  76. self._lock = threading.Lock()
  77. # Index of dead processes, list of tuples
  78. # (outfile, destdir, tempdir, process)
  79. self._dead_files = []
  80. # keep a list of previous outfile names
  81. self.all_files = []
  82. @property
  83. def rtmp_url(self):
  84. return self._rtmp_url
  85. @property
  86. def hls_url(self):
  87. return self._hls_url
  88. @property
  89. def lhls_url(self):
  90. return self._lhls_url
  91. @property
  92. def stream_url(self):
  93. return getattr(self, '_{}_url'.format(self.protocol))
  94. @property
  95. def protocol(self):
  96. return self._protocol
  97. def get_info(self):
  98. with self._lock:
  99. return {"streaming_urls": self._stream_data,
  100. "protocol": self._protocol,
  101. "filename": self.outfile,
  102. "dest_dir": self.destdir,
  103. "active": self.is_running(),
  104. "timeouts": 0,
  105. "pingouts": self._pingouts,
  106. "completed_files": self.all_files.copy()}
  107. def is_running(self):
  108. """Checks if the child process is running."""
  109. if self._process:
  110. return self._process.poll() is None
  111. else:
  112. return False
  113. def switch_protocol(self):
  114. """Switches protocol between rtmp and hls."""
  115. with self._lock:
  116. if self.protocol == 'rtmp':
  117. self._protocol = 'hls'
  118. else:
  119. self._protocol = 'rtmp'
  120. def wait(self):
  121. """
  122. Waits for a download to finish.
  123. Returns:
  124. returncode of the child process, or None if a ping loop of death was detected.
  125. On POSIX systems, this will be a negative value if the process
  126. was terminated (e.g. by timeout) rather than exiting normally.
  127. Will wait progressively longer if the download keeps timing out.
  128. TODO:
  129. Detect ping loop of death ? Or is timeout sufficient?
  130. Check for other issues, e.g. black 540p
  131. Logging
  132. Reset _pingouts?
  133. I need to check for both pinging and a timeout
  134. Because the ping message comes from librtmp, and that might not be part
  135. of ffmpeg
  136. Check periodically that the stream is still live:
  137. I've had a couple zombie streams even with the new system
  138. (no ffmpeg logs, so no idea what happened)
  139. """
  140. num_pings = 0
  141. # Some streams seem to start fine with up to 4 pings before beginning download?
  142. # More investigation is needed
  143. max_pings = 1 + self._pingouts
  144. # timeout after 1 minute
  145. timeout = datetime.datetime.now() + datetime.timedelta(minutes=1)
  146. try:
  147. for line in self._process.stderr:
  148. # TODO: add mpegts or other variants depending on the container settings? or no?
  149. # if "Output #0, mp4" in line:
  150. if "Output #0" in line:
  151. self._process.communicate()
  152. self.move_to_dest()
  153. self._pingouts = 0
  154. break
  155. elif "HandleCtrl, Ping" in line:
  156. num_pings += 1
  157. if num_pings > max_pings:
  158. # The main issue with this is that the slain processes will not have their files moved
  159. # But I think this is preferable to the other solutions I've come up with.
  160. # For future reference, those were:
  161. #
  162. # 1) Sending SIGINT then continuing to read stderr until it exited (sometimes it doesn't)
  163. # 2) Sending SIGINT, storing a reference to the process, then restarting the download.
  164. # This prevents the process from being garbage collected until the Watcher is
  165. # 3) Sending SIGINT, then storing info about src and dest paths for the stopped download.
  166. # If a reference to the process is NOT stored, there's no way to be sure it has finished writing
  167. # (if it's writing at all). The only way was to give them a grace period and then just start
  168. # moving, but this adds undesirable time to the cleanup phase, when we may want to restart
  169. # a falsely completed Watcher asap.
  170. # 4) Just moving the file straightaway. This is obviously bad since ffmpeg takes a few moments to
  171. # finish.
  172. # NOTE: only option #1 was actually tried, the others were partially written before being
  173. # abandoned as their problems became clear
  174. #
  175. # Two additional options exist (not mutually exclusive):
  176. # 1) Passing the dead processes off to a queue and having another thread clean up.
  177. # 2) Having regular maintenance sweep the active folder and move files it can be sure are done
  178. # to their proper folders.
  179. #
  180. # I *probably* need to use 1) eventually, especially once I figure out how to actually end
  181. # stuck processes without killing the parent. But it requires a lot more code.
  182. # Until then let's just see how this works.
  183. #
  184. # When that time does come, a Downloader copy constructor may be useful.
  185. download_logger.debug("Download pinged {} times: Stopping".format(num_pings))
  186. self._pingouts += 1
  187. self.stop()
  188. # close stderr to force the loop to exit
  189. time.sleep(0.1)
  190. self._process.stderr.close()
  191. time.sleep(0.1)
  192. # process will be garbage collected when the next one is started, or the Watcher dies
  193. # self._process = None
  194. # This *should* work for newer builds of FFmpeg without librtmp.
  195. # Only question is whether 1 minute is too long (or too short).
  196. # UPDATE: Why doesn't this ever seem to work?
  197. # is it because FFmpeg freezes output and hangs now? so we're never getting another line to iterate over
  198. # elif datetime.datetime.now() > timeout:
  199. # download_logger.debug("Download of {} timed out".format(self.outfile))
  200. # self.stop()
  201. # time.sleep(0.1)
  202. # self._process.stderr.close()
  203. # time.sleep(0.1)
  204. else:
  205. time.sleep(0.2)
  206. except ValueError:
  207. download_logger.debug('ffmpeg stderr closed unexpectedly')
  208. # Is it possible for the process to end prematurely?
  209. return self._process.returncode
  210. def stop(self):
  211. """Stop an active download.
  212. Returns immediately, check is_running() for success.
  213. """
  214. # trying this instead of SIGTERM
  215. # http://stackoverflow.com/a/6659191/3380530
  216. # self._process.send_signal(SIGINT)
  217. # Or not. SIGINT doesn't exist on Windows
  218. self._process.terminate()
  219. def kill(self):
  220. """Kill an active download.
  221. Like stop, only tries to kill the process instead of just terminating it.
  222. Only use this as a last resort, as it will render any video unusable."""
  223. self._process.kill()
  224. def move_to_dest(self):
  225. """Moves output file to its final destination."""
  226. destpath = self._move_to_dest(self.outfile, self.tempdir, self.destdir)
  227. if destpath:
  228. self.all_files.append(destpath)
  229. download_logger.info('Completed {}'.format(destpath))
  230. with self._lock:
  231. self.outfile = ""
  232. @staticmethod
  233. def _move_to_dest(outfile, tempdir, destdir):
  234. srcpath = '{}/{}'.format(tempdir, outfile)
  235. destpath = '{}/{}'.format(destdir, outfile)
  236. download_logger.debug('File transfer: {} -> {}'.format(srcpath, destpath))
  237. if os.path.exists(destpath):
  238. raise FileExistsError
  239. else:
  240. try:
  241. shutil.move(srcpath, destpath)
  242. except FileNotFoundError:
  243. download_logger.debug('File not found: {} -> {}'.format(srcpath, destpath))
  244. return
  245. else:
  246. return destpath
  247. def update_streaming_url(self):
  248. data = self._client.streaming_url(self._room.room_id)
  249. self._stream_data = data
  250. download_logger.debug('{}'.format(self._stream_data))
  251. # TODO: it shouldn't still attempt to start up without a fresh url
  252. if not data:
  253. return
  254. rtmp_streams = []
  255. hls_streams = []
  256. lhls_streams = []
  257. # TODO: sort according to a priority list defined in config file
  258. # e.g. ('rtmp', 'lhls', 'hls'), or just "rtmp" (infer the others from defaults)
  259. #
  260. for stream in data:
  261. if stream['type'] == 'rtmp':
  262. rtmp_streams.append((int(stream['quality']), '/'.join((stream['url'], stream['stream_name']))))
  263. elif stream['type'] == 'hls':
  264. hls_streams.append((int(stream['quality']), stream['url']))
  265. elif stream['type'] == 'lhls':
  266. lhls_streams.append((int(stream['quality']), stream['url']))
  267. try:
  268. new_rtmp_url = sorted(rtmp_streams)[-1][1]
  269. except IndexError as e:
  270. # download_logger.warn("Caught IndexError while reading RTMP url: {}\n{}".format(e, data))
  271. new_rtmp_url = ""
  272. try:
  273. new_hls_url = sorted(hls_streams)[-1][1]
  274. except IndexError as e:
  275. # download_logger.warn("Caught IndexError while reading HLS url: {}\n{}".format(e, data))
  276. new_hls_url = ""
  277. try:
  278. new_lhls_url = sorted(lhls_streams)[-1][1]
  279. except IndexError as e:
  280. # download_logger.warn("Caught IndexError while reading HLS url: {}\n{}".format(e, data))
  281. new_lhls_url = ""
  282. with self._lock:
  283. self._rtmp_url = new_rtmp_url
  284. self._hls_url = new_hls_url
  285. self._lhls_url = new_lhls_url
  286. # def update_streaming_url_web(self):
  287. # """Updates streaming urls from the showroom website.
  288. # Fallback if api changes again
  289. # But pretty sure this doesn't work anymore
  290. # """
  291. # # TODO: add an endpoint for fetching the browser page
  292. # r = self._client._session.get(self._room.long_url)
  293. # if r.ok:
  294. # match = hls_url_re1.search(r.text)
  295. # # TODO: check if there was a match
  296. # if not match:
  297. # # no url found in the page
  298. # # probably the stream has ended but is_live returned true
  299. # # just don't update the urls
  300. # # except what happens if they are still "" ?
  301. # return
  302. # hls_url = match.group(0)
  303. # rtmps_url = match.group(1).replace('https', 'rtmps')
  304. # rtmp_url = "rtmp://{}.{}.{}.{}:1935/liveedge/{}".format(*match.groups()[1:])
  305. # with self._lock:
  306. # self._rtmp_url = rtmp_url
  307. # self._hls_url = hls_url
  308. # self._rtmps_url = rtmps_url
  309. # def update_streaming_url_old(self):
  310. # """Updates streaming urls from the showroom website."""
  311. # data = self.client.json('https://www.showroom-live.com/room/get_live_data',
  312. # params={'room_id': self._room.room_id},
  313. # headers={'Referer': self._room.long_url})
  314. # if not data:
  315. # pass # how to resolve this? can it even happen without throwing an exception earlier?
  316. #
  317. # # TODO: Check that strings aren't empty
  318. # stream_name = data['streaming_name_rtmp']
  319. # stream_url = data["streaming_url_rtmp"]
  320. # new_rtmp_url = '{}/{}'.format(stream_url, stream_name)
  321. # new_hls_url = data["streaming_url_hls"]
  322. #
  323. # with self._lock:
  324. # if new_rtmp_url != self.rtmp_url:
  325. # # TODO: log url change
  326. # # TODO: Trigger this message when the stream first goes live, from elsewhere
  327. # # print('Downloading {}\'s Showroom'.format(self.room.name))
  328. # # self.announce((self.web_url, self.stream_url))
  329. # pass
  330. #
  331. # if new_hls_url != self.hls_url:
  332. # # TODO: log url change
  333. # pass
  334. #
  335. # self._rtmp_url = new_rtmp_url
  336. # self._hls_url = new_hls_url
  337. def start(self):
  338. """
  339. Starts the download.
  340. Refreshes the streaming url, generates a new file name, and starts a new ffmpeg
  341. process.
  342. Returns:
  343. datetime object representing the time the download started
  344. """
  345. tokyo_time = datetime.datetime.now(tz=TOKYO_TZ)
  346. # TODO: Does this work on Windows now?
  347. env = os.environ.copy()
  348. # remove proxy information
  349. for key in ('http_proxy', 'https_proxy', 'HTTP_PROXY', 'HTTPS_PROXY'):
  350. env.pop(key, None)
  351. self.update_streaming_url()
  352. # TODO: rework this whole process to include lhls, and make it configurable
  353. # and less braindead
  354. if not self._protocol:
  355. self._protocol = 'rtmp'
  356. if not self._ffmpeg_container:
  357. self._ffmpeg_container = 'mp4'
  358. extra_args = []
  359. # Fall back to HLS if no RTMP stream available
  360. # Better to do this here or in update_streaming_url?
  361. # There's a possible race condition here, if some external thread modifies either of these
  362. if not self._rtmp_url and self._protocol == 'rtmp':
  363. download_logger.warn('Using HLS downloader for {}'.format(self._room.handle))
  364. self._protocol = 'hls'
  365. # extra_args = []
  366. # force using TS container with HLS
  367. # this is causing more problems than it solves
  368. # if self.protocol in ('hls', 'lhls'):
  369. # self._ffmpeg_container = 'ts'
  370. # 2020-01-10: those problems were preferrable to completely unwatchable streams
  371. if self.protocol in ('hls', 'lhls'):
  372. extra_args = ["-copyts"]
  373. if self._ffmpeg_container == 'mp4':
  374. extra_args.extend(["-bsf:a", "aac_adtstoasc"])
  375. # I don't think this is needed?
  376. # if self._ffmpeg_container == 'ts':
  377. # extra_args.extend(['-bsf:v', 'h264_mp4toannexb'])
  378. # elif self._ffmpeg_container != 'mp4':
  379. # # TODO: support additional container formats, e.g. FLV
  380. # self._ffmpeg_container = 'mp4'
  381. temp, dest, out = format_name(self._rootdir,
  382. strftime(tokyo_time, FULL_DATE_FMT),
  383. self._room, ext=self._ffmpeg_container)
  384. with self._lock:
  385. self.tempdir, self.destdir, self.outfile = temp, dest, out
  386. if self._logging is True:
  387. log_file = os.path.normpath('{}/logs/{}.log'.format(self.destdir, self.outfile))
  388. env.update({'FFREPORT': 'file={}:level=40'.format(log_file)})
  389. # level=48 is debug mode, with lots and lots of extra information
  390. # maybe too much
  391. normed_outpath = os.path.normpath('{}/{}'.format(self.tempdir, self.outfile))
  392. self._process = subprocess.Popen([
  393. self._ffmpeg_path,
  394. # '-nostdin',
  395. # '-nostats', # will this omit any useful information?
  396. '-loglevel', '40', # 40+ required for wait() to check output
  397. '-copytb', '1',
  398. '-i', self.stream_url,
  399. '-c', 'copy',
  400. *extra_args,
  401. normed_outpath
  402. ],
  403. stdin=subprocess.DEVNULL,
  404. stderr=subprocess.PIPE, # ffmpeg sends all output to stderr
  405. universal_newlines=True,
  406. bufsize=1,
  407. env=env)