import html
import os
import re
from urllib.parse import urlparse, urlunparse

import requests
from bs4 import BeautifulSoup
from loguru import logger

from storages import Storage

from .base_archiver import Archiver, ArchiveResult


class TelegramArchiver(Archiver):
    """Archiver for ``t.me`` post URLs.

    Fetches the embed view of a post and archives either its video or,
    when no ``<video>`` tag is present, a generated media page built from
    the post's photos.
    """

    name = "telegram"

    @staticmethod
    def _embed_url(url):
        """Return *url* with ``embed=1`` present in its query string.

        Existing query parameters and the fragment are kept, so a URL that
        already carries a query gets ``&embed=1`` rather than a second ``?``.
        """
        parsed = urlparse(url)
        params = parsed.query.split("&") if parsed.query else []
        if "embed=1" in params:
            return url
        params.append("embed=1")
        return urlunparse(parsed._replace(query="&".join(params)))

    @staticmethod
    def _parse_duration(text):
        """Convert ``ss``, ``mm:ss`` or ``hh:mm:ss`` text to seconds.

        Raises:
            ValueError: if any ``:``-separated field is not numeric.
        """
        seconds = 0.0
        for part in str(text).split(":"):
            seconds = seconds * 60 + float(part)
        return seconds

    def download(self, url, check_if_exists=False):
        """Archive the Telegram post at *url*.

        Args:
            url: URL of the post; only the ``t.me`` netloc is handled.
            check_if_exists: when true and the storage already holds the
                video key, the upload is skipped and the result status is
                ``'already archived'``.

        Returns:
            ``False`` when the URL's netloc is not ``t.me``; otherwise an
            ``ArchiveResult`` describing the archived video or media page.
        """
        # detect URLs that we definitely cannot handle
        if 't.me' != self.get_netloc(url):
            return False

        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/81.0.4044.138 Safari/537.36'
        }
        status = "success"

        original_url = url
        url = self._embed_url(url)

        screenshot = self.get_screenshot(url)

        t = requests.get(url, headers=headers)
        s = BeautifulSoup(t.content, 'html.parser')
        # Looked up once; both branches below read from it.
        time_elements = s.find_all('time')
        video = s.find("video")

        if video is None:
            logger.warning("could not find video")

            images = []
            for im in s.find_all(class_="js-message_photo"):
                # A photo tag without a style attribute contributes no URLs.
                style = im.get('style') or ''
                images += [u.replace("'", "") for u in re.findall(r'url\((.*?)\)', style)]

            # NOTE(review): str() on bytes yields the b'...' repr rather than
            # decoded text; kept as-is so the generated page is unchanged.
            page_cdn, page_hash, thumbnail = self.generate_media_page(
                images, url, html.escape(str(t.content)))

            timestamp = time_elements[0].get('datetime') if time_elements else None
            return ArchiveResult(status="success", cdn_url=page_cdn, screenshot=screenshot,
                                 hash=page_hash, thumbnail=thumbnail, timestamp=timestamp)

        video_url = video.get('src')
        video_id = video_url.split('/')[-1].split('?')[0]
        key = self.get_key(video_id)

        filename = os.path.join(Storage.TMP_FOLDER, key)

        if check_if_exists and self.storage.exists(key):
            status = 'already archived'

        # The file is fetched even when already archived: the hash and the
        # thumbnails below are computed from the local copy.
        v = requests.get(video_url, headers=headers)

        try:
            with open(filename, 'wb') as f:
                f.write(v.content)

            if status != 'already archived':
                self.storage.upload(filename, key)

            file_hash = self.get_hash(filename)

            # extract duration from HTML; "" signals that it is unknown
            try:
                duration = self._parse_duration(time_elements[0].contents[0])
            except (IndexError, ValueError):
                duration = ""

            # process thumbnails
            key_thumb, thumb_index = self.get_thumbnails(
                filename, key, duration=duration)
        finally:
            # Always drop the temporary copy, even if a step above failed.
            if os.path.exists(filename):
                os.remove(filename)

        cdn_url = self.storage.get_cdn_url(key)
        # The second <time> tag is read for the timestamp; fall back to None
        # instead of raising when it is missing.
        timestamp = time_elements[1].get('datetime') if len(time_elements) > 1 else None

        return ArchiveResult(status=status, cdn_url=cdn_url, thumbnail=key_thumb,
                             thumbnail_index=thumb_index, duration=duration,
                             title=original_url, timestamp=timestamp,
                             hash=file_hash, screenshot=screenshot)