utils.py 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106
  1. import concurrent.futures
  2. import logging
  3. import re
# Module-level logger, named after this module per logging convention.
logger = logging.getLogger(__name__)

# Binary size units (powers of 1024).
KB = 1024
MB = KB**2
GB = KB**3
TB = KB**4
# S3 multi-part upload parts must be larger than 5mb
MIN_S3_SIZE = 5 * MB
  11. def _thread_run(item, callback):
  12. for _ in range(3):
  13. # re try 3 times before giving up
  14. try:
  15. response = callback(item)
  16. return response
  17. except Exception:
  18. logger.exception("Retry failed batch of: {}".format(item))
  19. def _threads(num_threads, data, callback):
  20. results = []
  21. with concurrent.futures.ThreadPoolExecutor(max_workers=num_threads) as executor: # noqa: E501
  22. futures = (
  23. executor.submit(_thread_run, d, callback)
  24. for d in data
  25. )
  26. for future in concurrent.futures.as_completed(futures):
  27. result = future.result()
  28. if not result:
  29. raise Exception("no response gotten from callback")
  30. results.append(result)
  31. return results
  32. def _create_s3_client(session, s3_client_kwargs=None):
  33. if s3_client_kwargs is None:
  34. s3_client_kwargs = {}
  35. return session.client('s3', **s3_client_kwargs)
  36. def _chunk_by_size(file_list, min_file_size):
  37. """Split list by size of file
  38. Arguments:
  39. file_list {list} -- List of tuples as (<filename>, <file_size>)
  40. min_file_size {int} -- Min part file size in bytes
  41. Returns:
  42. list -- Each list of files is the min file size
  43. """
  44. grouped_list = []
  45. current_list = []
  46. current_size = 0
  47. current_index = 1
  48. for p in file_list:
  49. current_size += p[1]
  50. current_list.append(p)
  51. if min_file_size is not None and current_size > min_file_size:
  52. grouped_list.append((current_index, current_list))
  53. current_list = []
  54. current_size = 0
  55. current_index += 1
  56. # Get anything left over
  57. if current_size != 0:
  58. grouped_list.append((current_index, current_list))
  59. return grouped_list
  60. def _convert_to_bytes(value):
  61. """Convert the input value to bytes
  62. Arguments:
  63. value {string} -- Value and size of the input with no spaces
  64. Returns:
  65. float -- The value converted to bytes as a float
  66. Raises:
  67. ValueError -- if the input value is not a valid type to convert
  68. """
  69. if value is None:
  70. return None
  71. value = value.strip()
  72. sizes = {'KB': 1024,
  73. 'MB': 1024**2,
  74. 'GB': 1024**3,
  75. 'TB': 1024**4,
  76. }
  77. if value[-2:].upper() in sizes:
  78. return float(value[:-2].strip()) * sizes[value[-2:].upper()]
  79. elif re.match(r'^\d+(\.\d+)?$', value):
  80. return float(value)
  81. elif re.match(r'^\d+(\.\d+)?\s?B$', value):
  82. return float(value[:-1])
  83. else:
  84. raise ValueError("Value {} is not a valid size".format(value))