123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138 |
- import os, time
- from loguru import logger
- from .base_storage import Storage
- from dataclasses import dataclass
- from googleapiclient.discovery import build
- from googleapiclient.http import MediaFileUpload
- from google.oauth2 import service_account
- @dataclass
- class GDConfig:
- root_folder_id: str
- folder: str = "default"
- service_account: str = "service_account.json"
- class GDStorage(Storage):
- def __init__(self, config: GDConfig):
- self.folder = config.folder
- self.root_folder_id = config.root_folder_id
- creds = service_account.Credentials.from_service_account_file(
- config.service_account, scopes=['https://www.googleapis.com/auth/drive'])
- self.service = build('drive', 'v3', credentials=creds)
- def get_cdn_url(self, key):
- """
- only support files saved in a folder for GD
- S3 supports folder and all stored in the root
- """
- full_name = os.path.join(self.folder, key)
- parent_id, folder_id = self.root_folder_id, None
- path_parts = full_name.split(os.path.sep)
- filename = path_parts[-1]
- logger.info(f"looking for folders for {path_parts[0:-1]} before uploading {filename=}")
- for folder in path_parts[0:-1]:
- folder_id = self._get_id_from_parent_and_name(parent_id, folder, use_mime_type=True, raise_on_missing=True)
- parent_id = folder_id
-
- file_id = self._get_id_from_parent_and_name(folder_id, filename)
- return f"https://drive.google.com/file/d/{file_id}/view?usp=sharing"
- def exists(self, key):
- try:
- self.get_cdn_url(key)
- return True
- except: return False
- def uploadf(self, file: str, key: str, **_kwargs):
- """
- 1. for each sub-folder in the path check if exists or create
- 2. upload file to root_id/other_paths.../filename
- """
- full_name = os.path.join(self.folder, key)
- parent_id, upload_to = self.root_folder_id, None
- path_parts = full_name.split(os.path.sep)
- filename = path_parts[-1]
- logger.info(f"checking folders {path_parts[0:-1]} exist (or creating) before uploading {filename=}")
- for folder in path_parts[0:-1]:
- upload_to = self._get_id_from_parent_and_name(parent_id, folder, use_mime_type=True, raise_on_missing=False)
- if upload_to is None:
- upload_to = self._mkdir(folder, parent_id)
- parent_id = upload_to
-
- logger.debug(f'uploading {filename=} to folder id {upload_to}')
- file_metadata = {
- 'name': [filename],
- 'parents': [upload_to]
- }
- media = MediaFileUpload(file, resumable=True)
- gd_file = self.service.files().create(body=file_metadata, media_body=media, fields='id').execute()
- logger.debug(f'uploadf: uploaded file {gd_file["id"]} succesfully in folder={upload_to}')
- def upload(self, filename: str, key: str, **kwargs):
-
- self.uploadf(filename, key, **kwargs)
- def _get_id_from_parent_and_name(self, parent_id: str, name: str, retries: int = 1, sleep_seconds: int = 10, use_mime_type: bool = False, raise_on_missing: bool = True, use_cache=True):
- """
- Retrieves the id of a folder or file from its @name and the @parent_id folder
- Optionally does multiple @retries and sleeps @sleep_seconds between them
- If @use_mime_type will restrict search to "mimeType='application/vnd.google-apps.folder'"
- If @raise_on_missing will throw error when not found, or returns None
- Will remember previous calls to avoid duplication if @use_cache
- Returns the id of the file or folder from its name as a string
- """
-
- if use_cache:
- self.api_cache = getattr(self, "api_cache", {})
- cache_key = f"{parent_id}_{name}_{use_mime_type}"
- if cache_key in self.api_cache:
- logger.debug(f"cache hit for {cache_key=}")
- return self.api_cache[cache_key]
-
- debug_header: str = f"[searching {name=} in {parent_id=}]"
- query_string = f"'{parent_id}' in parents and name = '{name}' "
- if use_mime_type:
- query_string += f" and mimeType='application/vnd.google-apps.folder' "
- for attempt in range(retries):
- results = self.service.files().list(
- q=query_string,
- spaces='drive',
- fields='files(id, name)'
- ).execute()
- items = results.get('files', [])
- if len(items) > 0:
- logger.debug(f"{debug_header} found {len(items)} matches, returning last of {','.join([i['id'] for i in items])}")
- _id = items[-1]['id']
- if use_cache: self.api_cache[cache_key] = _id
- return _id
- else:
- logger.debug(f'{debug_header} not found, attempt {attempt+1}/{retries}.')
- if attempt < retries - 1:
- logger.debug(f'sleeping for {sleep_seconds} second(s)')
- time.sleep(sleep_seconds)
- if raise_on_missing:
- raise ValueError(f'{debug_header} not found after {retries} attempt(s)')
- return None
- def _mkdir(self, name: str, parent_id: str):
- """
- Creates a new GDrive folder @name inside folder @parent_id
- Returns id of the created folder
- """
- logger.debug(f'Creating new folder with {name=} inside {parent_id=}')
- file_metadata = {
- 'name': [name],
- 'mimeType': 'application/vnd.google-apps.folder',
- 'parents': [parent_id]
- }
- gd_folder = self.service.files().create(body=file_metadata, fields='id').execute()
- return gd_folder.get('id')
|