twitter_api_archiver.py

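"""Archiver that retrieves tweets through the official Twitter API (via
pytwitter) and falls back to the scraping-based TwitterArchiver when a media
URL cannot be resolved."""
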
import json
from datetime import datetime

from loguru import logger
from pytwitter import Api

from storages.base_storage import Storage
from configs import TwitterApiConfig
from .base_archiver import ArchiveResult
from .twitter_archiver import TwitterArchiver


class TwitterApiArchiver(TwitterArchiver):
    name = "twitter_api"

    def __init__(self, storage: Storage, driver, config: TwitterApiConfig):
        super().__init__(storage, driver)
        if config.bearer_token:
            self.api = Api(bearer_token=config.bearer_token)
        elif config.consumer_key and config.consumer_secret and config.access_token and config.access_secret:
            self.api = Api(
                consumer_key=config.consumer_key,
                consumer_secret=config.consumer_secret,
                access_token=config.access_token,
                access_secret=config.access_secret)
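
    # NOTE: pytwitter's Api supports app-only auth (bearer_token) as well as
    # OAuth 1.0a user-context auth (consumer key/secret + access token/secret);
    # the bearer token takes precedence when both are configured. If neither
    # credential set is present, self.api is never assigned and download()
    # below returns False.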

    def download(self, url, check_if_exists=False):
        if not hasattr(self, "api"):
            logger.warning('Missing Twitter API config')
            return False
        username, tweet_id = self.get_username_tweet_id(url)
        if not username:
            return False

        # "geo" and "lang" are requested explicitly so that tweet.data.geo and
        # tweet.data.lang are populated for the JSON dump below
        tweet = self.api.get_tweet(
            tweet_id,
            expansions=["attachments.media_keys"],
            media_fields=["type", "duration_ms", "url", "variants"],
            tweet_fields=["attachments", "author_id", "created_at", "entities", "geo", "id", "lang", "text", "possibly_sensitive"])
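        # the "attachments.media_keys" expansion asks the v2 API to hydrate the
        # attached media objects into tweet.includes.media; created_at comes
        # back as an ISO-8601 UTC string (e.g. "2022-01-01T12:34:56.000Z"),
        # matching the strptime format below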
        timestamp = datetime.strptime(tweet.data.created_at, "%Y-%m-%dT%H:%M:%S.%fZ")

        # skip the download if this tweet was already archived
        key = self.get_html_key(url)
        if check_if_exists and self.storage.exists(key):
            # only the S3 storage implements storage.exists; the Google Drive storage does not
            cdn_url = self.storage.get_cdn_url(key)
            screenshot = self.get_screenshot(url)
            return ArchiveResult(status='already archived', cdn_url=cdn_url, title=tweet.data.text,
                                 timestamp=timestamp, screenshot=screenshot)

        urls = []
        if tweet.includes:
            for m in tweet.includes.media:
                if m.url:
                    urls.append(m.url)
                elif hasattr(m, "variants"):
                    var_url = self.choose_variant(m.variants)
                    urls.append(var_url)
                else:
                    urls.append(None)  # sentinel: triggers the error + fallback below
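        # choose_variant is inherited from TwitterArchiver and not shown here;
        # it presumably picks a single URL out of the media variants, e.g. the
        # highest-bitrate video/mp4 rendition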

        for u in urls:
            if u is None:
                logger.error(f"Should not have gotten None url for {tweet.includes.media=}")
                return self.download_alternative(url, tweet_id)
        logger.debug(f"found {urls=}")

        output = json.dumps({
            "id": tweet.data.id,
            "text": tweet.data.text,
            "created_at": tweet.data.created_at,
            "author_id": tweet.data.author_id,
            "geo": tweet.data.geo,
            "lang": tweet.data.lang,
            "media": urls
        }, ensure_ascii=False, indent=4)
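        # this JSON metadata is handed to generate_media_page (defined in the
        # base archiver), which presumably embeds it alongside the downloaded
        # media in the generated HTML page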

        screenshot = self.get_screenshot(url)
        page_cdn, page_hash, thumbnail = self.generate_media_page(urls, url, output)
        return ArchiveResult(status="success", cdn_url=page_cdn, screenshot=screenshot, hash=page_hash,
                             thumbnail=thumbnail, timestamp=timestamp, title=tweet.data.text)
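

# Usage sketch (assumptions: a concrete Storage implementation, a Selenium
# webdriver, and TwitterApiConfig keyword arguments as named below; none of
# this wiring appears in the original module):
#
#   config = TwitterApiConfig(bearer_token="AAAA...")  # or the four OAuth 1.0a keys
#   archiver = TwitterApiArchiver(storage=my_storage, driver=my_driver, config=config)
#   result = archiver.download("https://twitter.com/user/status/1234567890123456789",
#                              check_if_exists=True)
#   if result:
#       print(result.status, result.cdn_url)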