record_upload_manager-backup.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273
  1. import asyncio
  2. import os.path
  3. import sys
  4. import threading
  5. import time
  6. import traceback
  7. from queue import Queue
  8. from string import Template
  9. import dateutil.parser
  10. import yaml
  11. from bilibili_api import video
  12. from comment_task import CommentTask
  13. from recorder_config import RecorderConfig, UploaderAccount
  14. from recorder_manager import RecorderManager
  15. from session import Session, Video
  16. from subtitle_task import SubtitleTask
  17. from task_save import TaskSave
  18. from upload_task import UploadTask
  19. CONTINUE_SESSION_MINUTES = 5
  20. WAIT_SESSION_MINUTES = 6
  21. WAIT_BEFORE_SESSION_MINUTES = 1
  22. class RecordUploadManager:
  23. def __init__(self, config_path, save_path):
  24. self.config_path = config_path
  25. self.save_path = save_path
  26. with open(config_path, 'r') as file:
  27. self.config = RecorderConfig(yaml.load(file, Loader=yaml.FullLoader))
  28. if os.path.isfile(save_path):
  29. with open(save_path, 'r') as file:
  30. self.save = TaskSave.from_dict(yaml.load(file, Loader=yaml.FullLoader))
  31. else:
  32. print("Creating save file")
  33. self.save = TaskSave()
  34. self.save_progress()
  35. self.recorder_manager = RecorderManager([room.id for room in self.config.rooms])
  36. self.sessions: dict()
  37. self.video_upload_queue: Queue[UploadTask] = Queue()
  38. self.comment_post_queue: Queue[CommentTask] = Queue()
  39. self.subtitle_post_queue: Queue[SubtitleTask] = Queue()
  40. self.save_lock = threading.Lock()
  41. self.video_upload_thread = threading.Thread(target=self.video_uploader)
  42. self.comment_post_thread = threading.Thread(target=self.comment_poster)
  43. self.subtitle_post_thread = threading.Thread(target=self.subtitle_poster)
  44. self.video_processing_loop = asyncio.new_event_loop()
  45. self.video_uploading_loop = asyncio.new_event_loop()
  46. self.subtitle_posting_loop = asyncio.new_event_loop()
  47. self.video_upload_thread.start()
  48. self.comment_post_thread.start()
  49. self.subtitle_post_thread.start()
  50. self.video_uploading_thread = threading.Thread(target= self.video_processing_loop.run_forever())
  51. self.video_uploading_thread.start()
  52. def save_progress(self):
  53. with open(self.save_path, 'w') as file:
  54. yaml.dump(self.save.to_dict(), file, Dumper=yaml.Dumper)
  55. def video_uploader(self):
  56. asyncio.set_event_loop(self.video_uploading_loop)
  57. while True:
  58. upload_task = self.video_upload_queue.get()
  59. try:
  60. first_video_comment = upload_task.session_id not in self.save.session_id_map
  61. bv_id = upload_task.upload(self.save.session_id_map)
  62. sys.stdout.flush()
  63. with self.save_lock:
  64. self.save.session_id_map[upload_task.session_id] = bv_id
  65. self.save_progress()
  66. if first_video_comment:
  67. self.comment_post_queue.put(
  68. CommentTask.from_upload_task(upload_task)
  69. )
  70. v_info = video.get_video_info(bvid=bv_id, is_simple=False, is_member=True, verify=upload_task.verify)
  71. cid = v_info['videos'][0]['cid']
  72. print("adding subtitle task to queue")
  73. self.subtitle_post_queue.put(
  74. SubtitleTask.from_upload_task(upload_task, bv_id, cid)
  75. )
  76. except Exception:
  77. if upload_task.trial < 5:
  78. upload_task.trial += 1
  79. self.video_upload_queue.put(upload_task)
  80. print(f"Upload failed: {upload_task.title}, retrying")
  81. else:
  82. print(f"Upload failed too many times: {upload_task.title}")
  83. print(traceback.format_exc())
  84. def comment_poster(self):
  85. asyncio.set_event_loop(self.video_uploading_loop)
  86. while True:
  87. with self.save_lock:
  88. while not self.comment_post_queue.empty():
  89. self.save.active_comment_tasks += [self.comment_post_queue.get()]
  90. self.save_progress()
  91. try:
  92. if len(self.save.active_comment_tasks) != 0:
  93. task_to_remove = []
  94. for idx, task in enumerate(self.save.active_comment_tasks):
  95. task: CommentTask
  96. if task.post_comment(self.save.session_id_map):
  97. task_to_remove += [idx]
  98. if task_to_remove != 0:
  99. with self.save_lock:
  100. self.save.active_comment_tasks = [
  101. comment_task
  102. for idx, comment_task in enumerate(self.save.active_comment_tasks)
  103. if idx not in task_to_remove
  104. ]
  105. self.save_progress()
  106. except Exception as err:
  107. print(f"Unknown posting exception: {err}")
  108. print(traceback.format_exc())
  109. finally:
  110. time.sleep(60)
  111. def subtitle_poster(self):
  112. asyncio.set_event_loop(self.subtitle_posting_loop)
  113. while True:
  114. with self.save_lock:
  115. while not self.subtitle_post_queue.empty():
  116. self.save.active_subtitle_tasks += [self.subtitle_post_queue.get()]
  117. self.save_progress()
  118. try:
  119. if len(self.save.active_subtitle_tasks) != 0:
  120. task_to_remove = []
  121. for idx, task in enumerate(self.save.active_subtitle_tasks):
  122. task: SubtitleTask
  123. print("try posting subtitle")
  124. if task.post_subtitle():
  125. task_to_remove += [idx]
  126. if task_to_remove != 0:
  127. with self.save_lock:
  128. new_subtitle_tasks = []
  129. for idx, subtitle_task in enumerate(self.save.active_subtitle_tasks):
  130. subtitle_task: SubtitleTask
  131. append = True
  132. if idx in task_to_remove:
  133. append = False
  134. else:
  135. for j in task_to_remove:
  136. removing_task = self.save.active_subtitle_tasks[j]
  137. if subtitle_task.is_earlier_task_of(removing_task):
  138. append = False
  139. break
  140. if append:
  141. new_subtitle_tasks += [subtitle_task]
  142. self.save.active_subtitle_tasks = new_subtitle_tasks
  143. self.save_progress()
  144. except Exception as err:
  145. print(f"Unknown posting exception: {err}")
  146. print(traceback.format_exc())
  147. finally:
  148. time.sleep(60)
  149. async def upload_video(self, session: Session):
  150. await asyncio.sleep(WAIT_BEFORE_SESSION_MINUTES * 60)
  151. if len(session.videos) == 0:
  152. print(f"No video in session: {session.room_id}@{session.session_id}")
  153. return
  154. room_config = session.room_config
  155. if room_config.uploader is None:
  156. print(f"No need to upload for {room_config.id}")
  157. await session.gen_early_video()
  158. await asyncio.sleep(WAIT_SESSION_MINUTES * 60)
  159. await session.gen_danmaku_video()
  160. return
  161. uploader: UploaderAccount = self.config.accounts[room_config.uploader]
  162. substitute_dict = {
  163. "name": session.room_name,
  164. "title": session.room_title,
  165. "uploader_name": uploader.name,
  166. "y": session.start_time.year,
  167. "m": session.start_time.month,
  168. "d": session.start_time.day,
  169. "yy": f"{session.start_time.year:04d}",
  170. "mm": f"{session.start_time.month:02d}",
  171. "dd": f"{session.start_time.day:02d}",
  172. "flv_path": session.videos[0].flv_file_path()
  173. }
  174. title = Template(room_config.title).substitute(substitute_dict)
  175. temp_title = title
  176. i = 1
  177. other_video_titles = [
  178. name for session_id, name in self.save.video_name_history.items()
  179. if session_id != session.session_id
  180. ]
  181. while temp_title in other_video_titles:
  182. i += 1
  183. temp_title = f"{temp_title}{i}"
  184. title = temp_title
  185. with self.save_lock:
  186. self.save.video_name_history[session.session_id] = title
  187. description = Template(room_config.description).substitute(substitute_dict)
  188. await session.gen_early_video()
  189. early_upload_task = None
  190. if session.early_video_path is not None:
  191. early_upload_task = UploadTask(
  192. session_id=session.session_id,
  193. video_path=session.early_video_path,
  194. thumbnail_path=session.output_path()['thumbnail'],
  195. sc_path=session.output_path()['sc_file'],
  196. he_path=session.output_path()['he_file'],
  197. subtitle_path=session.output_path()['sc_srt'],
  198. title=title,
  199. source=room_config.source,
  200. description=description,
  201. tag=room_config.tags,
  202. channel_id=room_config.channel_id,
  203. danmaku=False,
  204. account=uploader
  205. )
  206. self.video_upload_queue.put(early_upload_task)
  207. await asyncio.sleep(WAIT_SESSION_MINUTES * 60)
  208. await session.gen_danmaku_video()
  209. danmaku_upload_task = UploadTask(
  210. session_id=session.session_id,
  211. video_path=session.output_path()['danmaku_video'],
  212. thumbnail_path=session.output_path()['thumbnail'],
  213. sc_path=session.output_path()['sc_file'],
  214. he_path=session.output_path()['he_file'],
  215. subtitle_path=session.output_path()['sc_srt'],
  216. title=title,
  217. source=room_config.source,
  218. description=description,
  219. tag=room_config.tags,
  220. channel_id=room_config.channel_id,
  221. danmaku=True,
  222. account=uploader
  223. )
  224. self.video_upload_queue.put(
  225. danmaku_upload_task
  226. )
  227. if early_upload_task is None:
  228. self.comment_post_queue.put(
  229. CommentTask.from_upload_task(danmaku_upload_task)
  230. )
  231. async def handle_update(self, update_json: dict):
  232. room_id = update_json["EventData"]["RoomId"]
  233. session_id = update_json["EventData"]["SessionId"]
  234. event_timestamp = dateutil.parser.isoparse(update_json["EventTimestamp"])
  235. room_config = None
  236. for room in self.config.rooms:
  237. if room.id == room_id:
  238. room_config = room
  239. if room_config is None:
  240. print(f"Cannot find room config for {room_id}!")
  241. return
  242. if update_json["EventType"] == "SessionStarted":
  243. for session in self.sessions.values():
  244. if session.room_id == room_id and \
  245. (event_timestamp - session.end_time).total_seconds() / 60 < CONTINUE_SESSION_MINUTES:
  246. self.sessions[session_id] = session
  247. if session.upload_task is not None:
  248. session.upload_task.cancel()
  249. return
  250. self.sessions[session_id] = Session(update_json, room_config=room_config)
  251. else:
  252. if session_id not in self.sessions:
  253. print(f"Cannot find {room_id}/{session_id} for: {update_json}")
  254. return
  255. current_session: Session = self.sessions[session_id]
  256. current_session.process_update(update_json)
  257. if update_json["EventType"] == "FileClosed":
  258. new_video = Video(update_json)
  259. await current_session.add_video(new_video)
  260. elif update_json["EventType"] == "SessionEnded":
  261. current_session.upload_task = \
  262. asyncio.run_coroutine_threadsafe(self.upload_video(current_session), self.video_processing_loop)