import html, re, requests
from datetime import datetime
from loguru import logger
from snscrape.modules.twitter import TwitterTweetScraper, Video, Gif, Photo
from .base_archiver import Archiver, ArchiveResult
class TwitterArchiver(Archiver):
    """
    This Twitter Archiver uses unofficial scraping methods, and it works as
    an alternative to TwitterApiArchiver when no API credentials are provided.
    """
    name = "twitter"
    # Captures (username, tweet_id) from tweet URLs, including the legacy
    # "#!/" prefix and the "statuses" spelling. The dot is escaped so that
    # only a literal "twitter.com" matches.
    link_pattern = re.compile(r"twitter\.com\/(?:\#!\/)?(\w+)\/status(?:es)?\/(\d+)")
    # Seconds to wait on the syndication endpoint; without a timeout a
    # stalled connection would block the archiver indefinitely.
    syndication_timeout = 30

    def get_username_tweet_id(self, url):
        """
        Extract the username and tweet id from a tweet URL.

        Returns a `(username, tweet_id)` tuple of strings, or `(False, False)`
        when `url` does not match `link_pattern`.
        """
        # detect URLs that we definitely cannot handle
        match = self.link_pattern.search(url)
        if match is None:
            return False, False

        username, tweet_id = match.groups()  # only the first tweet link in the URL is used
        logger.debug(f"Found {username=} and {tweet_id=} in {url=}")
        return username, tweet_id

    def download(self, url, check_if_exists=False):
        """
        Archive the tweet at `url` together with its media.

        The tweet is fetched with snscrape; if that fails for any reason the
        result of `download_alternative` is returned instead.
        `check_if_exists` is accepted for interface compatibility but is not
        used by this method.

        Returns False when `url` is not a tweet URL, otherwise an
        ArchiveResult with status "success" (or whatever
        `download_alternative` returns on the fallback path).
        """
        username, tweet_id = self.get_username_tweet_id(url)
        if not username:
            return False

        scr = TwitterTweetScraper(tweet_id)
        try:
            tweet = next(scr.get_items())
        except Exception as ex:
            # Scraping is best-effort: any failure, including an empty result
            # (StopIteration), falls back to the syndication endpoint.
            logger.warning(f"can't get tweet: {type(ex).__name__} occurred. args: {ex.args}")
            return self.download_alternative(url, tweet_id)

        if tweet.media is None:
            logger.debug('No media found, archiving tweet text only')
            screenshot = self.get_screenshot(url)
            page_cdn, page_hash, _ = self.generate_media_page_html(url, [], html.escape(tweet.json()))
            return ArchiveResult(status="success", cdn_url=page_cdn, title=tweet.content, timestamp=tweet.date, hash=page_hash, screenshot=screenshot)

        urls = []
        for media in tweet.media:
            media_url = self._media_url(media)
            if media_url:
                urls.append(media_url)
            else:
                # unknown media type, or a known type with no usable variant
                logger.warning(f"Could not get media URL of {media}")

        page_cdn, page_hash, thumbnail = self.generate_media_page(urls, url, tweet.json())
        screenshot = self.get_screenshot(url)
        return ArchiveResult(status="success", cdn_url=page_cdn, screenshot=screenshot, hash=page_hash, thumbnail=thumbnail, timestamp=tweet.date, title=tweet.content)

    def _media_url(self, media):
        """Return the preferred URL for one snscrape media item, or None if there is none."""
        if isinstance(media, Video):
            # Only variants that report a bitrate are candidates; `default`
            # keeps max() from raising when there are none.
            with_bitrate = (v for v in (media.variants or []) if v.bitrate)
            best = max(with_bitrate, key=lambda v: v.bitrate, default=None)
            return best.url if best is not None else None
        if isinstance(media, Gif):
            return media.variants[0].url if media.variants else None
        if isinstance(media, Photo):
            # request the original-size image instead of the "large" rendition
            return media.fullUrl.replace('name=large', 'name=orig')
        return None

    def download_alternative(self, url, tweet_id):
        """
        Archive a tweet through the public syndication endpoint.

        Used as a fallback when snscrape cannot fetch the tweet. Collects the
        photo URLs and, if present, the best video URL from the JSON response.

        Returns False when the endpoint does not answer with HTTP 200,
        otherwise an ArchiveResult with status "success". Network errors and
        timeouts raised by `requests` propagate to the caller.
        """
        # https://stackoverflow.com/a/71867055/6196010
        logger.debug(f"Trying twitter hack for {url=}")

        hack_url = f"https://cdn.syndication.twimg.com/tweet?id={tweet_id}"
        r = requests.get(hack_url, timeout=self.syndication_timeout)
        if r.status_code != 200:
            return False
        tweet = r.json()

        # "photos" may be absent (e.g. text-only or video-only tweets)
        urls = [p["url"] for p in (tweet.get("photos") or [])]

        # 1 tweet has 1 video max
        if "video" in tweet:
            video_url = self.choose_variant(tweet["video"].get("variants", []))
            if video_url:
                urls.append(video_url)
            else:
                # never pass a None URL on to the media page generator
                logger.warning(f"Could not choose a video variant for {url=}")

        logger.debug(f"Twitter hack got {urls=}")

        # NOTE: the trailing "Z" is matched literally, so this datetime is naive (no tzinfo)
        timestamp = datetime.strptime(tweet["created_at"], "%Y-%m-%dT%H:%M:%S.%fZ")
        screenshot = self.get_screenshot(url)
        page_cdn, page_hash, thumbnail = self.generate_media_page(urls, url, r.text)
        return ArchiveResult(status="success", cdn_url=page_cdn, screenshot=screenshot, hash=page_hash, thumbnail=thumbnail, timestamp=timestamp, title=tweet["text"])

    def choose_variant(self, variants):
        """
        Pick the highest-quality source URL from a list of video variants.

        Each variant is a dict with optional "type" and "src" keys. Among
        "video/mp4" variants whose `src` contains a "/<width>x<height>/" path
        segment, the one with the most pixels wins (the first one on ties).
        If there is no such variant, the first non-mp4 variant with a `src`
        is returned as a fallback.

        Returns the chosen URL, or None when no variant is usable.
        """
        best_src, best_area = None, 0
        fallback_src = None

        for var in variants:
            src = var.get("src")
            if var.get("type") == "video/mp4":
                dimensions = re.search(r"\/(\d+)x(\d+)\/", src or "")
                if dimensions:
                    # compare by pixel count so the result does not depend on
                    # variant order or on mixed aspect ratios
                    area = int(dimensions[1]) * int(dimensions[2])
                    if area > best_area:
                        best_src, best_area = src, area
            elif not fallback_src:
                fallback_src = src

        return best_src or fallback_src