twitter_archiver-backup.py 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102
  1. import html, re, requests
  2. from datetime import datetime
  3. from loguru import logger
  4. from snscrape.modules.twitter import TwitterTweetScraper, Video, Gif, Photo
  5. from .base_archiver import Archiver, ArchiveResult
  6. class TwitterArchiver(Archiver):
  7. """
  8. This Twitter Archiver uses unofficial scraping methods, and it works as
  9. an alternative to TwitterApiArchiver when no API credentials are provided.
  10. """
  11. name = "twitter"
  12. link_pattern = re.compile(r"twitter.com\/(?:\#!\/)?(\w+)\/status(?:es)?\/(\d+)")
  13. def get_username_tweet_id(self, url):
  14. # detect URLs that we definitely cannot handle
  15. matches = self.link_pattern.findall(url)
  16. if not len(matches): return False, False
  17. username, tweet_id = matches[0] # only one URL supported
  18. logger.debug(f"Found {username=} and {tweet_id=} in {url=}")
  19. return username, tweet_id
  20. def download(self, url, check_if_exists=False):
  21. username, tweet_id = self.get_username_tweet_id(url)
  22. if not username: return False
  23. scr = TwitterTweetScraper(tweet_id)
  24. try:
  25. tweet = next(scr.get_items())
  26. except Exception as ex:
  27. logger.warning(f"can't get tweet: {type(ex).__name__} occurred. args: {ex.args}")
  28. return self.download_alternative(url, tweet_id)
  29. if tweet.media is None:
  30. logger.debug(f'No media found, archiving tweet text only')
  31. screenshot = self.get_screenshot(url)
  32. page_cdn, page_hash, _ = self.generate_media_page_html(url, [], html.escape(tweet.json()))
  33. return ArchiveResult(status="success", cdn_url=page_cdn, title=tweet.content, timestamp=tweet.date, hash=page_hash, screenshot=screenshot)
  34. urls = []
  35. for media in tweet.media:
  36. if type(media) == Video:
  37. variant = max(
  38. [v for v in media.variants if v.bitrate], key= v.bitrate)
  39. urls.append(variant.url)
  40. elif type(media) == Gif:
  41. urls.append(media.variants[0].url)
  42. elif type(media) == Photo:
  43. urls.append(media.fullUrl.replace('name=large', 'name=orig'))
  44. else:
  45. logger.warning(f"Could not get media URL of {media}")
  46. page_cdn, page_hash, thumbnail = self.generate_media_page(urls, url, tweet.json())
  47. screenshot = self.get_screenshot(url)
  48. return ArchiveResult(status="success", cdn_url=page_cdn, screenshot=screenshot, hash=page_hash, thumbnail=thumbnail, timestamp=tweet.date, title=tweet.content)
  49. def download_alternative(self, url, tweet_id):
  50. # https://stackoverflow.com/a/71867055/6196010
  51. logger.debug(f"Trying twitter hack for {url=}")
  52. hack_url = f"https://cdn.syndication.twimg.com/tweet?id={tweet_id}"
  53. r = requests.get(hack_url)
  54. if r.status_code != 200: return False
  55. tweet = r.json()
  56. urls = []
  57. for p in tweet["photos"]:
  58. urls.append(p["url"])
  59. # 1 tweet has 1 video max
  60. if "video" in tweet:
  61. v = tweet["video"]
  62. urls.append(self.choose_variant(v.get("variants", [])))
  63. logger.debug(f"Twitter hack got {urls=}")
  64. timestamp = datetime.strptime(tweet["created_at"], "%Y-%m-%dT%H:%M:%S.%fZ")
  65. screenshot = self.get_screenshot(url)
  66. page_cdn, page_hash, thumbnail = self.generate_media_page(urls, url, r.text)
  67. return ArchiveResult(status="success", cdn_url=page_cdn, screenshot=screenshot, hash=page_hash, thumbnail=thumbnail, timestamp=timestamp, title=tweet["text"])
  68. def choose_variant(self, variants):
  69. # choosing the highest quality possible
  70. variant, width, height = None, 0, 0
  71. for var in variants:
  72. if var["type"] == "video/mp4":
  73. width_height = re.search(r"\/(\d+)x(\d+)\/", var["src"])
  74. if width_height:
  75. w, h = int(width_height[1]), int(width_height[2])
  76. if w > width or h > height:
  77. width, height = w, h
  78. variant = var.get("src", variant)
  79. else:
  80. variant = var.get("src") if not variant else variant
  81. return variant