# s3_storage.py
import os
import posixpath
import uuid
from dataclasses import dataclass, field

import boto3
from botocore.errorfactory import ClientError
from loguru import logger

from .base_storage import Storage
  8. @dataclass
  9. class S3Config:
  10. bucket: str
  11. region: str
  12. key: str
  13. secret: str
  14. folder: str = ""
  15. endpoint_url: str = "https://{region}.digitaloceanspaces.com"
  16. cdn_url: str = "https://{bucket}.{region}.cdn.digitaloceanspaces.com/{key}"
  17. private: bool = False
  18. key_path: str = "default" # 'default' uses full naming, 'random' uses generated uuid
  19. class S3Storage(Storage):
  20. def __init__(self, config: S3Config):
  21. self.bucket = config.bucket
  22. self.region = config.region
  23. self.folder = config.folder
  24. self.private = config.private
  25. self.cdn_url = config.cdn_url
  26. self.key_path = config.key_path
  27. self.key_dict = {}
  28. self.s3 = boto3.client(
  29. 's3',
  30. region_name=config.region,
  31. endpoint_url=config.endpoint_url.format(region=config.region),
  32. aws_access_key_id=config.key,
  33. aws_secret_access_key=config.secret
  34. )
  35. def _get_path(self, key):
  36. """
  37. Depends on the self.key_path configuration:
  38. * random - assigns a random UUID which can be used in conjunction with "private=false" to have unguessable documents publicly available -> self.folder/randomUUID
  39. * default -> defaults to self.folder/key
  40. """
  41. # defaults to /key
  42. final_key = key
  43. if self.key_path == "random":
  44. if key not in self.key_dict:
  45. ext = os.path.splitext(key)[1]
  46. self.key_dict[key] = f"{str(uuid.uuid4())}{ext}"
  47. final_key = self.key_dict[key]
  48. return os.path.join(self.folder, final_key)
  49. def get_cdn_url(self, key):
  50. return self.cdn_url.format(bucket=self.bucket, region=self.region, key=self._get_path(key))
  51. def exists(self, key):
  52. try:
  53. self.s3.head_object(Bucket=self.bucket, Key=self._get_path(key))
  54. return True
  55. except ClientError:
  56. return False
  57. def uploadf(self, file, key, **kwargs):
  58. if self.private:
  59. extra_args = kwargs.get("extra_args", {})
  60. else:
  61. extra_args = kwargs.get("extra_args", {'ACL': 'public-read'})
  62. self.s3.upload_fileobj(file, Bucket=self.bucket, Key=self._get_path(key), ExtraArgs=extra_args)