import os, requests, re
import html
from bs4 import BeautifulSoup
from loguru import logger

from .base_archiver import Archiver, ArchiveResult
from storages import Storage


class TelegramArchiver(Archiver):
    name = "telegram"

    def download(self, url, check_if_exists=False):
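        """Archive a t.me post: the video if one is present, otherwise its photos."""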
        # detect URLs that we definitely cannot handle
        if 't.me' != self.get_netloc(url):
            return False
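        # identify as a regular desktop browser when requesting the page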
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/81.0.4044.138 Safari/537.36'
        }
        status = "success"
        original_url = url

        # TODO: check if we can make this more resilient to variable URLs
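        # request the embed view of the post so the media markup is present in the HTML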
        if url[-8:] != "?embed=1":
            url += "?embed=1"

        screenshot = self.get_screenshot(url)
        t = requests.get(url, headers=headers)
        s = BeautifulSoup(t.content, 'html.parser')
        video = s.find("video")
        if video is None:
            logger.warning("could not find video")
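            # no video in the post: fall back to archiving its photos as a media page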
            image_tags = s.find_all(class_="js-message_photo")
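            # photo URLs appear as CSS background images in each tag's inline style attribute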
            images = []
            for im in image_tags:
                urls = [u.replace("'", "") for u in re.findall(r'url\((.*?)\)', im['style'])]
                images += urls

            page_cdn, page_hash, thumbnail = self.generate_media_page(images, url, html.escape(str(t.content)))

            time_elements = s.find_all('time')
            timestamp = time_elements[0].get('datetime') if len(time_elements) else None

            return ArchiveResult(status="success", cdn_url=page_cdn, screenshot=screenshot, hash=page_hash, thumbnail=thumbnail, timestamp=timestamp)
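        # a <video> tag is present: download and archive the video file itself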
        video_url = video.get('src')
        video_id = video_url.split('/')[-1].split('?')[0]
        key = self.get_key(video_id)
        filename = os.path.join(Storage.TMP_FOLDER, key)

        if check_if_exists and self.storage.exists(key):
            status = 'already archived'
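        # download the file locally either way; it is needed below for hashing and thumbnails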
        v = requests.get(video_url, headers=headers)
        with open(filename, 'wb') as f:
            f.write(v.content)

        if status != 'already archived':
            self.storage.upload(filename, key)

        hash = self.get_hash(filename)
        # extract duration from HTML
        try:
            duration = s.find_all('time')[0].contents[0]
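            # durations are displayed as "mm:ss"; convert to seconds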
            if ':' in duration:
                duration = float(duration.split(':')[0]) * 60 + float(duration.split(':')[1])
            else:
                duration = float(duration)
        except:
            duration = ""
        # process thumbnails
        key_thumb, thumb_index = self.get_thumbnails(filename, key, duration=duration)
        os.remove(filename)
        cdn_url = self.storage.get_cdn_url(key)

        return ArchiveResult(status=status, cdn_url=cdn_url, thumbnail=key_thumb, thumbnail_index=thumb_index,
                             duration=duration, title=original_url, timestamp=s.find_all('time')[1].get('datetime'),
                             hash=hash, screenshot=screenshot)