vk_archiver.py

import re, json, mimetypes, os
from loguru import logger
from vk_url_scraper import VkScraper, DateTimeEncoder
from storages import Storage
from .base_archiver import Archiver, ArchiveResult
from configs import VkConfig


class VkArchiver(Archiver):
    """
    VK videos are handled by YTDownloader; this archiver retrieves post text and images.
    Currently only works for /wall posts.
    """
    name = "vk"
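    # VK post ids look like "wall<owner>_<post>" / "photo<owner>_<photo>"; the optional
    # character after the prefix accommodates the leading "-" of group-owned ids.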
    wall_pattern = re.compile(r"(wall.{0,1}\d+_\d+)")
    photo_pattern = re.compile(r"(photo.{0,1}\d+_\d+)")

    def __init__(self, storage: Storage, driver, config: VkConfig):
        super().__init__(storage, driver)
        if config is not None:
            self.vks = VkScraper(config.username, config.password)
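
    # When no VkConfig is supplied, self.vks is never created, so download()
    # bails out early instead of raising an AttributeError.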
    def download(self, url, check_if_exists=False):
        if not hasattr(self, "vks") or self.vks is None:
            logger.debug("VK archiver was not supplied with credentials.")
            return False

        key = self.get_html_key(url)
        # if check_if_exists and self.storage.exists(key):
        #     screenshot = self.get_screenshot(url)
        #     cdn_url = self.storage.get_cdn_url(key)
        #     return ArchiveResult(status="already archived", cdn_url=cdn_url, screenshot=screenshot)

        results = self.vks.scrape(url)  # some urls can contain multiple wall/photo/... parts and all will be fetched
        if len(results) == 0:
            return False
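
        # Helper to embed the raw scraper payload as pretty-printed JSON;
        # DateTimeEncoder handles the datetime objects it contains.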
        def dump_payload(p): return json.dumps(p, ensure_ascii=False, indent=4, cls=DateTimeEncoder)

        textual_output = ""
        title, datetime = results[0]["text"], results[0]["datetime"]
        urls_found = []
        for res in results:
            textual_output += f"id: {res['id']}<br>time utc: {res['datetime']}<br>text: {res['text']}<br>payload: {dump_payload(res['payload'])}<br><hr/><br>"
            title = res["text"] if len(title) == 0 else title
            datetime = res["datetime"] if not datetime else datetime
            for attachments in res["attachments"].values():
                urls_found.extend(attachments)

        # we don't call generate_media_page which downloads urls because it cannot download vk video urls
        thumbnail, thumbnail_index = None, None
        uploaded_media = []
        filenames = self.vks.download_media(results, Storage.TMP_FOLDER)
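        # upload every downloaded file; the first image (or the thumbnails of the
        # first video) becomes the thumbnail of the generated page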
        for filename in filenames:
            key = self.get_key(filename)
            self.storage.upload(filename, key)
            hash = self.get_hash(filename)
            cdn_url = self.storage.get_cdn_url(key)
            try:
                _type = mimetypes.guess_type(filename)[0].split("/")[0]
                if _type == "image" and thumbnail is None:
                    thumbnail = cdn_url
                if _type == "video" and (thumbnail is None or thumbnail_index is None):
                    thumbnail, thumbnail_index = self.get_thumbnails(filename, key)
            except Exception as e:
                logger.warning(f"failed to get thumb for {filename=} with {e=}")
            uploaded_media.append({'cdn_url': cdn_url, 'key': key, 'hash': hash})

        page_cdn, page_hash, thumbnail = self.generate_media_page_html(url, uploaded_media, textual_output, thumbnail=thumbnail)
        # if multiple wall/photos/videos are present the screenshot will only grab the 1st
        screenshot = self.get_screenshot(url)
        return ArchiveResult(status="success", cdn_url=page_cdn, screenshot=screenshot, hash=page_hash, thumbnail=thumbnail, thumbnail_index=thumbnail_index, timestamp=datetime, title=title)
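

# ---------------------------------------------------------------------------
# Usage sketch (illustrative only, not part of the archiver): roughly how this
# class might be wired up by hand. `my_storage`, `my_driver` and the VkConfig
# constructor arguments shown below are assumptions, not APIs of this module.
# ---------------------------------------------------------------------------
# config = VkConfig(username="vk_user", password="vk_pass")  # hypothetical kwargs
# archiver = VkArchiver(my_storage, my_driver, config)       # Storage impl + selenium driver
# result = archiver.download("https://vk.com/wall-1_1")      # ArchiveResult or False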