youtube.py 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778
  1. #!/usr/bin/env python3
  2. import logging
  3. import tempfile
  4. from pathlib import Path
  5. from typing import Callable, Optional
  6. import yt_dlp
  7. from praw.models import Submission
  8. from bdfr.exceptions import NotADownloadableLinkError, SiteDownloaderError
  9. from bdfr.resource import Resource
  10. from bdfr.site_authenticator import SiteAuthenticator
  11. from bdfr.site_downloaders.base_downloader import BaseDownloader
  12. logger = logging.getLogger(__name__)
  13. class Youtube(BaseDownloader):
  14. def __init__(self, post: Submission):
  15. super().__init__(post)
  16. def find_resources(self, authenticator: Optional[SiteAuthenticator] = None) -> list[Resource]:
  17. ytdl_options = {
  18. 'format': 'best',
  19. 'playlistend': 1,
  20. 'nooverwrites': True,
  21. }
  22. download_function = self._download_video(ytdl_options)
  23. extension = self.get_video_attributes(self.post.url)['ext']
  24. res = Resource(self.post, self.post.url, download_function, extension)
  25. return [res]
  26. def _download_video(self, ytdl_options: dict) -> Callable:
  27. yt_logger = logging.getLogger('youtube-dl')
  28. yt_logger.setLevel(logging.CRITICAL)
  29. ytdl_options['quiet'] = True
  30. ytdl_options['logger'] = yt_logger
  31. def download(_: dict) -> bytes:
  32. with tempfile.TemporaryDirectory() as temp_dir:
  33. download_path = Path(temp_dir).resolve()
  34. ytdl_options['outtmpl'] = str(download_path) + '/' + 'test.%(ext)s'
  35. try:
  36. with yt_dlp.YoutubeDL(ytdl_options) as ydl:
  37. ydl.download([self.post.url])
  38. except yt_dlp.DownloadError as e:
  39. raise SiteDownloaderError(f'Youtube download failed: {e}')
  40. downloaded_files = list(download_path.iterdir())
  41. if len(downloaded_files) > 0:
  42. downloaded_file = downloaded_files[0]
  43. else:
  44. raise NotADownloadableLinkError(f"No media exists in the URL {self.post.url}")
  45. with open(downloaded_file, 'rb') as file:
  46. content = file.read()
  47. return content
  48. return download
  49. @staticmethod
  50. def get_video_data(url: str) -> dict:
  51. yt_logger = logging.getLogger('youtube-dl')
  52. yt_logger.setLevel(logging.CRITICAL)
  53. with yt_dlp.YoutubeDL({'logger': yt_logger, }) as ydl:
  54. try:
  55. result = ydl.extract_info(url, download=False)
  56. except Exception as e:
  57. logger.exception(e)
  58. raise NotADownloadableLinkError(f'Video info extraction failed for {url}')
  59. return result
  60. @staticmethod
  61. def get_video_attributes(url: str) -> dict:
  62. result = Youtube.get_video_data(url)
  63. if 'ext' in result:
  64. return result
  65. else:
  66. raise NotADownloadableLinkError(f'Video info extraction failed for {url}')