# telegram_archiver.py
  1. import os, requests, re
  2. import html
  3. from bs4 import BeautifulSoup
  4. from loguru import logger
  5. from .base_archiver import Archiver, ArchiveResult
  6. from storages import Storage
  7. class TelegramArchiver(Archiver):
  8. name = "telegram"
  9. def download(self, url, check_if_exists=False):
  10. # detect URLs that we definitely cannot handle
  11. if 't.me' != self.get_netloc(url):
  12. return False
  13. headers = {
  14. 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/81.0.4044.138 Safari/537.36'
  15. }
  16. status = "success"
  17. original_url = url
  18. # TODO: check if we can do this more resilient to variable URLs
  19. if url[-8:] != "?embed=1":
  20. url += "?embed=1"
  21. screenshot = self.get_screenshot(url)
  22. t = requests.get(url, headers=headers)
  23. s = BeautifulSoup(t.content, 'html.parser')
  24. video = s.find("video")
  25. if video is None:
  26. logger.warning("could not find video")
  27. image_tags = s.find_all(class_="js-message_photo")
  28. images = []
  29. for im in image_tags:
  30. urls = [u.replace("'", "") for u in re.findall(r'url\((.*?)\)', im['style'])]
  31. images += urls
  32. page_cdn, page_hash, thumbnail = self.generate_media_page(images, url, html.escape(str(t.content)))
  33. time_elements = s.find_all('time')
  34. timestamp = time_elements[0].get('datetime') if len(time_elements) else None
  35. return ArchiveResult(status="success", cdn_url=page_cdn, screenshot=screenshot, hash=page_hash, thumbnail=thumbnail, timestamp=timestamp)
  36. video_url = video.get('src')
  37. video_id = video_url.split('/')[-1].split('?')[0]
  38. key = self.get_key(video_id)
  39. filename = os.path.join(Storage.TMP_FOLDER, key)
  40. if check_if_exists and self.storage.exists(key):
  41. status = 'already archived'
  42. v = requests.get(video_url, headers=headers)
  43. with open(filename, 'wb') as f:
  44. f.write(v.content)
  45. if status != 'already archived':
  46. self.storage.upload(filename, key)
  47. hash = self.get_hash(filename)
  48. # extract duration from HTML
  49. try:
  50. duration = s.find_all('time')[0].contents[0]
  51. if ':' in duration:
  52. duration = float(duration.split(
  53. ':')[0]) * 60 + float(duration.split(':')[1])
  54. else:
  55. duration = float(duration)
  56. except:
  57. duration = ""
  58. # process thumbnails
  59. key_thumb, thumb_index = self.get_thumbnails(
  60. filename, key, duration=duration)
  61. os.remove(filename)
  62. cdn_url = self.storage.get_cdn_url(key)
  63. return ArchiveResult(status=status, cdn_url=cdn_url, thumbnail=key_thumb, thumbnail_index=thumb_index,
  64. duration=duration, title=original_url, timestamp=s.find_all('time')[1].get('datetime'), hash=hash, screenshot=screenshot)