__init__.py 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566
  1. import boto3
  2. import logging
  3. from .utils import _create_s3_client, _convert_to_bytes, _chunk_by_size
  4. from .multipart_upload_job import MultipartUploadJob
  5. logger = logging.getLogger(__name__)
  6. class S3Concat:
  7. def __init__(self, bucket, key, min_file_size,
  8. content_type='application/octet-stream',
  9. session=boto3.session.Session(),
  10. s3_client_kwargs=None,
  11. s3_client=None):
  12. self.bucket = bucket
  13. self.key = key
  14. self.min_file_size = _convert_to_bytes(min_file_size)
  15. self.content_type = content_type
  16. self.all_files = []
  17. self.s3 = s3_client or _create_s3_client(session, s3_client_kwargs=s3_client_kwargs) # noqa: E501
  18. def concat(self, small_parts_threads=1):
  19. grouped_parts_list = _chunk_by_size(self.all_files, self.min_file_size)
  20. logger.info("Created {} concatenation groups"
  21. .format(len(grouped_parts_list)))
  22. part_keys = []
  23. for part_data in grouped_parts_list:
  24. upload_resp = MultipartUploadJob(
  25. self.bucket,
  26. self.key,
  27. part_data,
  28. self.s3,
  29. small_parts_threads=small_parts_threads,
  30. add_part_number=self.min_file_size is not None,
  31. content_type=self.content_type,
  32. )
  33. part_keys.append(upload_resp.result_filepath)
  34. return part_keys
  35. def add_files(self, prefix):
  36. def resp_to_filelist(resp):
  37. return [(x['Key'], x['Size']) for x in resp['Contents']]
  38. objects_list = []
  39. resp = self.s3.list_objects(Bucket=self.bucket, Prefix=prefix)
  40. objects_list.extend(resp_to_filelist(resp))
  41. logger.info("Found {} objects so far".format(len(objects_list)))
  42. while resp['IsTruncated']:
  43. last_key = objects_list[-1][0]
  44. resp = self.s3.list_objects(Bucket=self.bucket,
  45. Prefix=prefix,
  46. Marker=last_key)
  47. objects_list.extend(resp_to_filelist(resp))
  48. logger.info("Found {} objects so far".format(len(objects_list)))
  49. self.all_files.extend(objects_list)
  50. def add_file(self, key):
  51. resp = self.s3.head_object(Bucket=self.bucket, Key=key)
  52. self.all_files.append((key, resp['ContentLength']))