# gd_storage.py — Google Drive storage backend
  1. import os, time
  2. from loguru import logger
  3. from .base_storage import Storage
  4. from dataclasses import dataclass
  5. from googleapiclient.discovery import build
  6. from googleapiclient.http import MediaFileUpload
  7. from google.oauth2 import service_account
@dataclass
class GDConfig:
    """Configuration for GDStorage (Google Drive storage backend)."""
    # id of the Drive folder that acts as the root for all uploads
    root_folder_id: str
    # sub-folder (under the root) that keys are nested in
    folder: str = "default"
    # path to the Google service-account JSON credentials file
    service_account: str = "service_account.json"
  13. class GDStorage(Storage):
  14. def __init__(self, config: GDConfig):
  15. self.folder = config.folder
  16. self.root_folder_id = config.root_folder_id
  17. creds = service_account.Credentials.from_service_account_file(
  18. config.service_account, scopes=['https://www.googleapis.com/auth/drive'])
  19. self.service = build('drive', 'v3', credentials=creds)
  20. def get_cdn_url(self, key):
  21. """
  22. only support files saved in a folder for GD
  23. S3 supports folder and all stored in the root
  24. """
  25. full_name = os.path.join(self.folder, key)
  26. parent_id, folder_id = self.root_folder_id, None
  27. path_parts = full_name.split(os.path.sep)
  28. filename = path_parts[-1]
  29. logger.info(f"looking for folders for {path_parts[0:-1]} before uploading {filename=}")
  30. for folder in path_parts[0:-1]:
  31. folder_id = self._get_id_from_parent_and_name(parent_id, folder, use_mime_type=True, raise_on_missing=True)
  32. parent_id = folder_id
  33. # get id of file inside folder (or sub folder)
  34. file_id = self._get_id_from_parent_and_name(folder_id, filename)
  35. return f"https://drive.google.com/file/d/{file_id}/view?usp=sharing"
  36. def exists(self, key):
  37. try:
  38. self.get_cdn_url(key)
  39. return True
  40. except: return False
  41. def uploadf(self, file: str, key: str, **_kwargs):
  42. """
  43. 1. for each sub-folder in the path check if exists or create
  44. 2. upload file to root_id/other_paths.../filename
  45. """
  46. full_name = os.path.join(self.folder, key)
  47. parent_id, upload_to = self.root_folder_id, None
  48. path_parts = full_name.split(os.path.sep)
  49. filename = path_parts[-1]
  50. logger.info(f"checking folders {path_parts[0:-1]} exist (or creating) before uploading {filename=}")
  51. for folder in path_parts[0:-1]:
  52. upload_to = self._get_id_from_parent_and_name(parent_id, folder, use_mime_type=True, raise_on_missing=False)
  53. if upload_to is None:
  54. upload_to = self._mkdir(folder, parent_id)
  55. parent_id = upload_to
  56. # upload file to gd
  57. logger.debug(f'uploading {filename=} to folder id {upload_to}')
  58. file_metadata = {
  59. 'name': [filename],
  60. 'parents': [upload_to]
  61. }
  62. media = MediaFileUpload(file, resumable=True)
  63. gd_file = self.service.files().create(body=file_metadata, media_body=media, fields='id').execute()
  64. logger.debug(f'uploadf: uploaded file {gd_file["id"]} succesfully in folder={upload_to}')
  65. def upload(self, filename: str, key: str, **kwargs):
  66. # GD only requires the filename not a file reader
  67. self.uploadf(filename, key, **kwargs)
  68. def _get_id_from_parent_and_name(self, parent_id: str, name: str, retries: int = 1, sleep_seconds: int = 10, use_mime_type: bool = False, raise_on_missing: bool = True, use_cache=True):
  69. """
  70. Retrieves the id of a folder or file from its @name and the @parent_id folder
  71. Optionally does multiple @retries and sleeps @sleep_seconds between them
  72. If @use_mime_type will restrict search to "mimeType='application/vnd.google-apps.folder'"
  73. If @raise_on_missing will throw error when not found, or returns None
  74. Will remember previous calls to avoid duplication if @use_cache
  75. Returns the id of the file or folder from its name as a string
  76. """
  77. # cache logic
  78. if use_cache:
  79. self.api_cache = getattr(self, "api_cache", {})
  80. cache_key = f"{parent_id}_{name}_{use_mime_type}"
  81. if cache_key in self.api_cache:
  82. logger.debug(f"cache hit for {cache_key=}")
  83. return self.api_cache[cache_key]
  84. # API logic
  85. debug_header: str = f"[searching {name=} in {parent_id=}]"
  86. query_string = f"'{parent_id}' in parents and name = '{name}' "
  87. if use_mime_type:
  88. query_string += f" and mimeType='application/vnd.google-apps.folder' "
  89. for attempt in range(retries):
  90. results = self.service.files().list(
  91. q=query_string,
  92. spaces='drive', # ie not appDataFolder or photos
  93. fields='files(id, name)'
  94. ).execute()
  95. items = results.get('files', [])
  96. if len(items) > 0:
  97. logger.debug(f"{debug_header} found {len(items)} matches, returning last of {','.join([i['id'] for i in items])}")
  98. _id = items[-1]['id']
  99. if use_cache: self.api_cache[cache_key] = _id
  100. return _id
  101. else:
  102. logger.debug(f'{debug_header} not found, attempt {attempt+1}/{retries}.')
  103. if attempt < retries - 1:
  104. logger.debug(f'sleeping for {sleep_seconds} second(s)')
  105. time.sleep(sleep_seconds)
  106. if raise_on_missing:
  107. raise ValueError(f'{debug_header} not found after {retries} attempt(s)')
  108. return None
  109. def _mkdir(self, name: str, parent_id: str):
  110. """
  111. Creates a new GDrive folder @name inside folder @parent_id
  112. Returns id of the created folder
  113. """
  114. logger.debug(f'Creating new folder with {name=} inside {parent_id=}')
  115. file_metadata = {
  116. 'name': [name],
  117. 'mimeType': 'application/vnd.google-apps.folder',
  118. 'parents': [parent_id]
  119. }
  120. gd_folder = self.service.files().create(body=file_metadata, fields='id').execute()
  121. return gd_folder.get('id')