123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273 |
- import asyncio
- import os.path
- import sys
- import threading
- import time
- import traceback
- from queue import Queue
- from string import Template
- import dateutil.parser
- import yaml
- from bilibili_api import video
- from comment_task import CommentTask
- from recorder_config import RecorderConfig, UploaderAccount
- from recorder_manager import RecorderManager
- from session import Session, Video
- from subtitle_task import SubtitleTask
- from task_save import TaskSave
- from upload_task import UploadTask
- CONTINUE_SESSION_MINUTES = 5
- WAIT_SESSION_MINUTES = 6
- WAIT_BEFORE_SESSION_MINUTES = 1
- class RecordUploadManager:
- def __init__(self, config_path, save_path):
- self.config_path = config_path
- self.save_path = save_path
- with open(config_path, 'r') as file:
- self.config = RecorderConfig(yaml.load(file, Loader=yaml.FullLoader))
- if os.path.isfile(save_path):
- with open(save_path, 'r') as file:
- self.save = TaskSave.from_dict(yaml.load(file, Loader=yaml.FullLoader))
- else:
- print("Creating save file")
- self.save = TaskSave()
- self.save_progress()
- self.recorder_manager = RecorderManager([room.id for room in self.config.rooms])
- self.sessions: dict()
- self.video_upload_queue: Queue[UploadTask] = Queue()
- self.comment_post_queue: Queue[CommentTask] = Queue()
- self.subtitle_post_queue: Queue[SubtitleTask] = Queue()
- self.save_lock = threading.Lock()
- self.video_upload_thread = threading.Thread(target=self.video_uploader)
- self.comment_post_thread = threading.Thread(target=self.comment_poster)
- self.subtitle_post_thread = threading.Thread(target=self.subtitle_poster)
- self.video_processing_loop = asyncio.new_event_loop()
- self.video_uploading_loop = asyncio.new_event_loop()
- self.subtitle_posting_loop = asyncio.new_event_loop()
- self.video_upload_thread.start()
- self.comment_post_thread.start()
- self.subtitle_post_thread.start()
- self.video_uploading_thread = threading.Thread(target= self.video_processing_loop.run_forever())
- self.video_uploading_thread.start()
- def save_progress(self):
- with open(self.save_path, 'w') as file:
- yaml.dump(self.save.to_dict(), file, Dumper=yaml.Dumper)
- def video_uploader(self):
- asyncio.set_event_loop(self.video_uploading_loop)
- while True:
- upload_task = self.video_upload_queue.get()
- try:
- first_video_comment = upload_task.session_id not in self.save.session_id_map
- bv_id = upload_task.upload(self.save.session_id_map)
- sys.stdout.flush()
- with self.save_lock:
- self.save.session_id_map[upload_task.session_id] = bv_id
- self.save_progress()
- if first_video_comment:
- self.comment_post_queue.put(
- CommentTask.from_upload_task(upload_task)
- )
- v_info = video.get_video_info(bvid=bv_id, is_simple=False, is_member=True, verify=upload_task.verify)
- cid = v_info['videos'][0]['cid']
- print("adding subtitle task to queue")
- self.subtitle_post_queue.put(
- SubtitleTask.from_upload_task(upload_task, bv_id, cid)
- )
- except Exception:
- if upload_task.trial < 5:
- upload_task.trial += 1
- self.video_upload_queue.put(upload_task)
- print(f"Upload failed: {upload_task.title}, retrying")
- else:
- print(f"Upload failed too many times: {upload_task.title}")
- print(traceback.format_exc())
- def comment_poster(self):
- asyncio.set_event_loop(self.video_uploading_loop)
- while True:
- with self.save_lock:
- while not self.comment_post_queue.empty():
- self.save.active_comment_tasks += [self.comment_post_queue.get()]
- self.save_progress()
- try:
- if len(self.save.active_comment_tasks) != 0:
- task_to_remove = []
- for idx, task in enumerate(self.save.active_comment_tasks):
- task: CommentTask
- if task.post_comment(self.save.session_id_map):
- task_to_remove += [idx]
- if task_to_remove != 0:
- with self.save_lock:
- self.save.active_comment_tasks = [
- comment_task
- for idx, comment_task in enumerate(self.save.active_comment_tasks)
- if idx not in task_to_remove
- ]
- self.save_progress()
- except Exception as err:
- print(f"Unknown posting exception: {err}")
- print(traceback.format_exc())
- finally:
- time.sleep(60)
- def subtitle_poster(self):
- asyncio.set_event_loop(self.subtitle_posting_loop)
- while True:
- with self.save_lock:
- while not self.subtitle_post_queue.empty():
- self.save.active_subtitle_tasks += [self.subtitle_post_queue.get()]
- self.save_progress()
- try:
- if len(self.save.active_subtitle_tasks) != 0:
- task_to_remove = []
- for idx, task in enumerate(self.save.active_subtitle_tasks):
- task: SubtitleTask
- print("try posting subtitle")
- if task.post_subtitle():
- task_to_remove += [idx]
- if task_to_remove != 0:
- with self.save_lock:
- new_subtitle_tasks = []
- for idx, subtitle_task in enumerate(self.save.active_subtitle_tasks):
- subtitle_task: SubtitleTask
- append = True
- if idx in task_to_remove:
- append = False
- else:
- for j in task_to_remove:
- removing_task = self.save.active_subtitle_tasks[j]
- if subtitle_task.is_earlier_task_of(removing_task):
- append = False
- break
- if append:
- new_subtitle_tasks += [subtitle_task]
- self.save.active_subtitle_tasks = new_subtitle_tasks
- self.save_progress()
- except Exception as err:
- print(f"Unknown posting exception: {err}")
- print(traceback.format_exc())
- finally:
- time.sleep(60)
- async def upload_video(self, session: Session):
- await asyncio.sleep(WAIT_BEFORE_SESSION_MINUTES * 60)
- if len(session.videos) == 0:
- print(f"No video in session: {session.room_id}@{session.session_id}")
- return
- room_config = session.room_config
- if room_config.uploader is None:
- print(f"No need to upload for {room_config.id}")
- await session.gen_early_video()
- await asyncio.sleep(WAIT_SESSION_MINUTES * 60)
- await session.gen_danmaku_video()
- return
- uploader: UploaderAccount = self.config.accounts[room_config.uploader]
- substitute_dict = {
- "name": session.room_name,
- "title": session.room_title,
- "uploader_name": uploader.name,
- "y": session.start_time.year,
- "m": session.start_time.month,
- "d": session.start_time.day,
- "yy": f"{session.start_time.year:04d}",
- "mm": f"{session.start_time.month:02d}",
- "dd": f"{session.start_time.day:02d}",
- "flv_path": session.videos[0].flv_file_path()
- }
- title = Template(room_config.title).substitute(substitute_dict)
- temp_title = title
- i = 1
- other_video_titles = [
- name for session_id, name in self.save.video_name_history.items()
- if session_id != session.session_id
- ]
- while temp_title in other_video_titles:
- i += 1
- temp_title = f"{temp_title}{i}"
- title = temp_title
- with self.save_lock:
- self.save.video_name_history[session.session_id] = title
- description = Template(room_config.description).substitute(substitute_dict)
- await session.gen_early_video()
- early_upload_task = None
- if session.early_video_path is not None:
- early_upload_task = UploadTask(
- session_id=session.session_id,
- video_path=session.early_video_path,
- thumbnail_path=session.output_path()['thumbnail'],
- sc_path=session.output_path()['sc_file'],
- he_path=session.output_path()['he_file'],
- subtitle_path=session.output_path()['sc_srt'],
- title=title,
- source=room_config.source,
- description=description,
- tag=room_config.tags,
- channel_id=room_config.channel_id,
- danmaku=False,
- account=uploader
- )
- self.video_upload_queue.put(early_upload_task)
- await asyncio.sleep(WAIT_SESSION_MINUTES * 60)
- await session.gen_danmaku_video()
- danmaku_upload_task = UploadTask(
- session_id=session.session_id,
- video_path=session.output_path()['danmaku_video'],
- thumbnail_path=session.output_path()['thumbnail'],
- sc_path=session.output_path()['sc_file'],
- he_path=session.output_path()['he_file'],
- subtitle_path=session.output_path()['sc_srt'],
- title=title,
- source=room_config.source,
- description=description,
- tag=room_config.tags,
- channel_id=room_config.channel_id,
- danmaku=True,
- account=uploader
- )
- self.video_upload_queue.put(
- danmaku_upload_task
- )
- if early_upload_task is None:
- self.comment_post_queue.put(
- CommentTask.from_upload_task(danmaku_upload_task)
- )
- async def handle_update(self, update_json: dict):
- room_id = update_json["EventData"]["RoomId"]
- session_id = update_json["EventData"]["SessionId"]
- event_timestamp = dateutil.parser.isoparse(update_json["EventTimestamp"])
- room_config = None
- for room in self.config.rooms:
- if room.id == room_id:
- room_config = room
- if room_config is None:
- print(f"Cannot find room config for {room_id}!")
- return
- if update_json["EventType"] == "SessionStarted":
- for session in self.sessions.values():
- if session.room_id == room_id and \
- (event_timestamp - session.end_time).total_seconds() / 60 < CONTINUE_SESSION_MINUTES:
- self.sessions[session_id] = session
- if session.upload_task is not None:
- session.upload_task.cancel()
- return
- self.sessions[session_id] = Session(update_json, room_config=room_config)
- else:
- if session_id not in self.sessions:
- print(f"Cannot find {room_id}/{session_id} for: {update_json}")
- return
- current_session: Session = self.sessions[session_id]
- current_session.process_update(update_json)
- if update_json["EventType"] == "FileClosed":
- new_video = Video(update_json)
- await current_session.add_video(new_video)
- elif update_json["EventType"] == "SessionEnded":
- current_session.upload_task = \
- asyncio.run_coroutine_threadsafe(self.upload_video(current_session), self.video_processing_loop)
|