core.py

  1. """This package enables saving and loading of python objects to disk
  2. while also backing to S3 storage. """
  3. import os
  4. import datetime
  5. import ntpath # to extract file name from path, OS-independent
  6. import traceback # for printing full stacktraces of errors
  7. import concurrent.futures # for asynchronous file uploads
  8. import pickle # for pickling files
  9. try: # for automatic caching of return values of functions
  10. from functools import lru_cache
  11. except ImportError:
  12. from functools32 import lru_cache # pylint: disable=E0401
  13. import pandas as pd
  14. import boto3 # to interact with AWS S3
  15. from botocore.exceptions import ClientError
  16. import dateutil # to make local change-time datetime objects time-aware
  17. import yaml # to read the s3bp config
  18. import feather # to read/write pandas dataframes as feather objects
  19. CFG_FILE_NAME = 's3bp_cfg.yml'
  20. DEFAULT_MAX_WORKERS = 5
  21. EXECUTOR = None

# === Reading configuration ===

def _s3bp_cfg_file_path():
    return os.path.abspath(os.path.join(
        os.path.dirname(os.path.realpath(__file__)),
        CFG_FILE_NAME))


def _get_s3bp_cfg():
    try:
        with open(_s3bp_cfg_file_path(), 'r') as cfg_file:
            cfg = yaml.safe_load(cfg_file)
            if not isinstance(cfg, dict):
                # fall back to an empty mapping if the file is empty or malformed
                cfg = {'base_dir_to_bucket_map': {}}
            return cfg
    except FileNotFoundError:
        with open(_s3bp_cfg_file_path(), 'w') as outfile:
            outfile.write(yaml.dump(
                {'base_dir_to_bucket_map': {}},
                default_flow_style=False
            ))
        return _get_s3bp_cfg()


def _max_workers():
    try:
        return _get_s3bp_cfg()['max_workers']
    except KeyError:
        return DEFAULT_MAX_WORKERS


def _default_bucket():
    return _get_s3bp_cfg()['default_bucket']


def _base_dir_to_bucket_map():
    return _get_s3bp_cfg()['base_dir_to_bucket_map']


def _base_dirs():
    return list(_get_s3bp_cfg()['base_dir_to_bucket_map'].keys())


# === Setting configuration ===

def _set_s3bp_cfg(cfg):
    with open(_s3bp_cfg_file_path(), 'w') as outfile:
        outfile.write(yaml.dump(cfg, default_flow_style=False))


def set_max_workers(max_workers):
    """Sets the maximum number of workers in the thread pool used to
    asynchronously upload files. NOTE: Resets the current thread pool!"""
    cfg = _get_s3bp_cfg()
    cfg['max_workers'] = max_workers
    _set_s3bp_cfg(cfg)
    _get_executor(reset=True)
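
# Example (illustrative; assumes this function is re-exported at package
# level as s3bp.set_max_workers):
#
#     import s3bp
#     s3bp.set_max_workers(10)  # use up to 10 upload threads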


def set_default_bucket(bucket_name):
    """Sets the given string as the default bucket name."""
    cfg = _get_s3bp_cfg()
    cfg['default_bucket'] = bucket_name
    _set_s3bp_cfg(cfg)


def unset_default_bucket():
    """Unsets the currently set default bucket, if one is set."""
    cfg = _get_s3bp_cfg()
    cfg.pop('default_bucket', None)
    _set_s3bp_cfg(cfg)


def _parse_dir_path(dir_path):
    if '~' in dir_path:
        return os.path.expanduser(dir_path)
    return dir_path


def set_default_base_directory(base_directory):
    """Sets the given string as the default base directory name."""
    cfg = _get_s3bp_cfg()
    cfg['default_base_dir'] = _parse_dir_path(base_directory)
    _set_s3bp_cfg(cfg)


def map_base_directory_to_bucket(base_directory, bucket_name):
    """Maps the given directory as a base directory of the given bucket.

    Arguments
    ---------
    base_directory : str
        The full path, from root, to the desired base directory.
    bucket_name : str
        The name of the bucket to map the given directory to.
    """
    cfg = _get_s3bp_cfg()
    parsed_path = _parse_dir_path(base_directory)
    if not isinstance(cfg['base_dir_to_bucket_map'], dict):
        cfg['base_dir_to_bucket_map'] = {}
    cfg['base_dir_to_bucket_map'][parsed_path] = bucket_name
    _set_s3bp_cfg(cfg)


def remove_base_directory_mapping(base_directory):
    """Removes the mapping associated with the given directory, if it exists."""
    cfg = _get_s3bp_cfg()
    parsed_path = _parse_dir_path(base_directory)
    cfg['base_dir_to_bucket_map'].pop(parsed_path, None)
    _set_s3bp_cfg(cfg)
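
# Example configuration (illustrative; the bucket names and the path are
# hypothetical, and package-level access via `import s3bp` is assumed):
#
#     import s3bp
#     s3bp.set_default_bucket('my-default-bucket')
#     s3bp.map_base_directory_to_bucket('~/Desktop/project_data',
#                                       'project-bucket')
#
# Files under ~/Desktop/project_data are then keyed by their path rooted at
# the base directory and synced with 'project-bucket'; files outside any
# mapped base directory fall back to 'my-default-bucket'.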


# === Getting parameters ===

def _get_executor(reset=False):
    if reset:
        _get_executor.executor = concurrent.futures.ThreadPoolExecutor(
            _max_workers())
    try:
        # the thread pool is cached as an attribute on the function itself
        return _get_executor.executor
    except AttributeError:
        _get_executor.executor = concurrent.futures.ThreadPoolExecutor(
            _max_workers())
        return _get_executor.executor


@lru_cache(maxsize=32)
def _get_bucket_by_name(bucket_name):
    s3_rsc = boto3.resource('s3')
    return s3_rsc.Bucket(bucket_name)


@lru_cache(maxsize=32)
def _get_base_dir_by_file_path_and_bucket_name(filepath, bucket_name):
    try:
        for directory in _base_dirs():
            if (directory in filepath) and (
                    _base_dir_to_bucket_map()[directory] == bucket_name):
                return directory
    except (KeyError, AttributeError):
        return None
    return None


def _bucket_name_and_base_dir_by_filepath(filepath):
    try:
        for directory in _base_dirs():
            if directory in filepath:
                return _base_dir_to_bucket_map()[directory], directory
    except (KeyError, AttributeError):
        pass
    try:
        return _default_bucket(), None
    except KeyError:
        raise ValueError(
            "No bucket name was given, and neither a default was defined "
            "nor could one be interpreted from the file path. Please "
            "provide one explicitly, or define an appropriate bucket.")


def _get_key(filepath, namekey, base_directory):
    if namekey or not base_directory:
        return ntpath.basename(filepath)
    # the key is the file path rooted at the base directory's name
    index = filepath.find(base_directory[base_directory.rfind('/'):])
    return filepath[index + 1:]
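
# Illustration (hypothetical paths): with '/home/user/Desktop/data' mapped as
# a base directory, the key mirrors the local path rooted at that directory:
#
#     _get_key('/home/user/Desktop/data/models/model.pkl', None,
#              '/home/user/Desktop/data')
#     # -> 'data/models/model.pkl'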


@lru_cache(maxsize=32)
def _get_bucket_and_key(filepath, bucket_name, namekey):
    base_directory = None
    if bucket_name is None:
        bucket_name, base_directory = _bucket_name_and_base_dir_by_filepath(
            filepath)
    elif not namekey:
        base_directory = _get_base_dir_by_file_path_and_bucket_name(
            filepath, bucket_name)
        if base_directory:  # only create local dirs if a base dir was found
            os.makedirs(base_directory, exist_ok=True)
    bucket = _get_bucket_by_name(bucket_name)
    key = _get_key(filepath, namekey, base_directory)
    return bucket, key


# === Uploading/Downloading files ===

def _parse_file_path(filepath):
    if '~' in filepath:
        return os.path.expanduser(filepath)
    return filepath


def _file_upload_thread(bucket, filepath, key):
    try:
        bucket.upload_file(filepath, key)
    except BaseException as exc:  # pylint: disable=W0703
        print(
            'File upload failed with the following exception:\n{}'.format(exc),
            flush=True
        )


def upload_file(filepath, bucket_name=None, namekey=None, wait=False):
    """Uploads the given file to S3 storage.

    Arguments
    ---------
    filepath : str
        The full path, from root, to the desired file.
    bucket_name (optional) : str
        The name of the bucket to upload the file to. If not given, it will
        be inferred from any defined base directory that is present on the
        path (there is no guarantee which base directory will be used if
        several are present in the given path). If base directory inference
        fails, the default bucket will be used, if defined; otherwise the
        operation will fail.
    namekey (optional) : bool
        Indicates whether to use the name of the file as the key when
        uploading to the bucket. If set, or if no base directory is found in
        the filepath, the file name will be used as the key. Otherwise, the
        path rooted at the detected base directory will be used, resulting
        in a directory-like structure in the S3 bucket.
    wait (optional) : bool
        Defaults to False. If set to True, the function will wait on the
        upload operation. Otherwise, the upload will be performed
        asynchronously in a separate thread.
    """
    filepath = _parse_file_path(filepath)
    bucket, key = _get_bucket_and_key(filepath, bucket_name, namekey)
    if wait:
        bucket.upload_file(filepath, key)
    else:
        _get_executor().submit(_file_upload_thread, bucket, filepath, key)
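
# Example (illustrative; the paths and bucket name are hypothetical):
#
#     upload_file('~/Desktop/project_data/labels.csv')    # asynchronous upload
#     upload_file('/tmp/report.xlsx', bucket_name='my-bucket',
#                 namekey=True, wait=True)                 # blocking, keyed by file name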


def _file_time_modified(filepath):
    timestamp = os.path.getmtime(filepath)
    dt_obj = datetime.datetime.utcfromtimestamp(timestamp)
    # this is correct only because the non-time-aware obj is in UTC!
    dt_obj = dt_obj.replace(tzinfo=dateutil.tz.tzutc())
    return dt_obj


def download_file(filepath, bucket_name=None, namekey=None, verbose=False):
    """Downloads the most recent version of the given file from S3, if needed.

    Arguments
    ---------
    filepath : str
        The full path, from root, to the desired file.
    bucket_name (optional) : str
        The name of the bucket to download the file from. If not given, it
        will be inferred from any defined base directory that is present on
        the path (there is no guarantee which base directory will be used if
        several are present in the given path). If base directory inference
        fails, the default bucket will be used, if defined; otherwise the
        operation will fail.
    namekey (optional) : bool
        Indicates whether to use the name of the file as the key when
        downloading from the bucket. If set, or if no base directory is
        found in the filepath, the file name will be used as the key.
        Otherwise, the path rooted at the detected base directory will be
        used, resulting in a directory-like structure in the S3 bucket.
    verbose (optional) : bool
        Defaults to False. If set to True, some informative messages will be
        printed.
    """
    filepath = _parse_file_path(filepath)
    bucket, key = _get_bucket_and_key(filepath, bucket_name, namekey)
    try:
        if os.path.isfile(filepath):
            if verbose:
                print('File %s found on disk.' % key)
            # this datetime object has tzinfo=dateutil.tz.tzutc()
            s3_last_modified = bucket.Object(key).get()['LastModified']
            if s3_last_modified > _file_time_modified(filepath):
                if verbose:
                    print('But S3 has an updated version. Downloading...')
                bucket.download_file(key, filepath)
        else:
            if verbose:
                print('File %s NOT found on disk. Downloading...' % key)
            # create any missing directories on the path
            os.makedirs(os.path.dirname(filepath), exist_ok=True)
            bucket.download_file(key, filepath)
    except ClientError:
        if verbose:
            print('Downloading the file failed with the following exception:')
            print(traceback.format_exc())
        raise ValueError('No file found with key %s' % key)
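
# Example (illustrative; paths and bucket are hypothetical): fetch the S3
# copy only if no local copy exists or the S3 version is more recent.
#
#     download_file('~/Desktop/project_data/labels.csv', verbose=True)
#     download_file('/tmp/report.xlsx', bucket_name='my-bucket', namekey=True)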


# === Saving/loading Python objects ===

def _pickle_serializer(pyobject, filepath):
    with open(filepath, 'wb') as dump_file:
        pickle.dump(pyobject, dump_file)


def save_object(pyobject, filepath, bucket_name=None,
                serializer=_pickle_serializer, namekey=None, wait=False):
    """Saves the given object to S3 storage, caching it as the given file.

    Arguments
    ---------
    pyobject : object
        The python object to save.
    filepath : str
        The full path, from root, to the desired cache file.
    bucket_name (optional) : str
        The name of the bucket to upload the file to. If not given, it will
        be inferred from any defined base directory that is present on the
        path (there is no guarantee which base directory will be used if
        several are present in the given path). If base directory inference
        fails, the default bucket will be used, if defined; otherwise the
        operation will fail.
    serializer (optional) : callable
        A callable that takes two positional arguments, a Python object and
        a path to a file, and dumps the object to the given file. Defaults
        to a wrapper of pickle.dump.
    namekey (optional) : bool
        Indicates whether to use the name of the file as the key when
        uploading to the bucket. If set, or if no base directory is found in
        the filepath, the file name will be used as the key. Otherwise, the
        path rooted at the detected base directory will be used, resulting
        in a directory-like structure in the S3 bucket.
    wait (optional) : bool
        Defaults to False. If set to True, the function will wait on the
        upload operation. Otherwise, the upload will be performed
        asynchronously in a separate thread.
    """
    serializer(pyobject, filepath)
    upload_file(filepath, bucket_name, namekey, wait)


def _pickle_deserializer(filepath):
    with open(filepath, 'rb') as load_file:
        return pickle.load(load_file)


def load_object(filepath, bucket_name=None, deserializer=_pickle_deserializer,
                namekey=None, verbose=False):
    """Loads the most recent version of the object cached in the given file.

    Arguments
    ---------
    filepath : str
        The full path, from root, to the desired file.
    bucket_name (optional) : str
        The name of the bucket to download the file from. If not given, it
        will be inferred from any defined base directory that is present on
        the path (there is no guarantee which base directory will be used if
        several are present in the given path). If base directory inference
        fails, the default bucket will be used, if defined; otherwise the
        operation will fail.
    deserializer (optional) : callable
        A callable that takes one positional argument, a path to a file, and
        returns the object stored in it. Defaults to a wrapper of
        pickle.load.
    namekey (optional) : bool
        Indicates whether to use the name of the file as the key when
        downloading from the bucket. If set, or if no base directory is
        found in the filepath, the file name will be used as the key.
        Otherwise, the path rooted at the detected base directory will be
        used, resulting in a directory-like structure in the S3 bucket.
    verbose (optional) : bool
        Defaults to False. If set to True, some informative messages will be
        printed.
    """
    download_file(filepath, bucket_name=bucket_name, namekey=namekey,
                  verbose=verbose)
    return deserializer(filepath)
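
# Example round trip (illustrative; the object and path are hypothetical, and
# a default bucket or base directory mapping is assumed to be configured):
#
#     model = {'weights': [0.3, 0.7], 'bias': 0.1}
#     save_object(model, '~/Desktop/project_data/model.pkl', wait=True)
#     model = load_object('~/Desktop/project_data/model.pkl')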


# === Saving/loading dataframes ===

def _pandas_df_csv_serializer(pyobject, filepath):
    pyobject.to_csv(filepath)


def _pandas_df_excel_serializer(pyobject, filepath):
    pyobject.to_excel(filepath)


def _pandas_df_feather_serializer(pyobject, filepath):
    feather.write_dataframe(pyobject, filepath)


def _get_pandas_df_serializer(dformat):
    dformat = dformat.lower()
    if dformat == 'csv':
        return _pandas_df_csv_serializer
    if dformat == 'excel':
        return _pandas_df_excel_serializer
    if dformat == 'feather':
        return _pandas_df_feather_serializer


def save_dataframe(df, filepath, bucket_name=None, dformat='csv', namekey=None,
                   wait=False):
    """Writes the given dataframe to disk in the given format and uploads it
    to S3 storage.

    Arguments
    ---------
    df : pandas.DataFrame
        The pandas DataFrame object to save.
    filepath : str
        The full path, from root, to the desired file.
    bucket_name (optional) : str
        The name of the bucket to upload the file to. If not given, it will
        be inferred from any defined base directory that is present on the
        path (there is no guarantee which base directory will be used if
        several are present in the given path). If base directory inference
        fails, the default bucket will be used, if defined; otherwise the
        operation will fail.
    dformat (optional) : str
        The storage format for the dataframe. One of 'csv', 'excel' and
        'feather'. Defaults to 'csv'.
    namekey (optional) : bool
        Indicates whether to use the name of the file as the key when
        uploading to the bucket. If set, or if no base directory is found in
        the filepath, the file name will be used as the key. Otherwise, the
        path rooted at the detected base directory will be used, resulting
        in a directory-like structure in the S3 bucket.
    wait (optional) : bool
        Defaults to False. If set to True, the function will wait on the
        upload operation. Otherwise, the upload will be performed
        asynchronously in a separate thread.
    """
    save_object(df, filepath, serializer=_get_pandas_df_serializer(dformat),
                bucket_name=bucket_name, namekey=namekey, wait=wait)


def _pandas_df_csv_deserializer(filepath):
    return pd.read_csv(filepath)


def _pandas_df_excel_deserializer(filepath):
    return pd.read_excel(filepath)


def _pandas_df_feather_deserializer(filepath):
    return feather.read_dataframe(filepath)


def _get_pandas_df_deserializer(dformat):
    dformat = dformat.lower()
    if dformat == 'csv':
        return _pandas_df_csv_deserializer
    if dformat == 'excel':
        return _pandas_df_excel_deserializer
    if dformat == 'feather':
        return _pandas_df_feather_deserializer


def load_dataframe(filepath, bucket_name=None, dformat='csv', namekey=None,
                   verbose=False):
    """Loads the most updated version of a dataframe from file, fetching it
    from S3 storage if necessary.

    Arguments
    ---------
    filepath : str
        The full path, from root, to the desired file.
    bucket_name (optional) : str
        The name of the bucket to download the file from. If not given, it
        will be inferred from any defined base directory that is present on
        the path (there is no guarantee which base directory will be used if
        several are present in the given path). If base directory inference
        fails, the default bucket will be used, if defined; otherwise the
        operation will fail.
    dformat (optional) : str
        The storage format for the dataframe. One of 'csv', 'excel' and
        'feather'. Defaults to 'csv'.
    namekey (optional) : bool
        Indicates whether to use the name of the file as the key when
        downloading from the bucket. If set, or if no base directory is
        found in the filepath, the file name will be used as the key.
        Otherwise, the path rooted at the detected base directory will be
        used, resulting in a directory-like structure in the S3 bucket.
    verbose (optional) : bool
        Defaults to False. If set to True, some informative messages will be
        printed.
    """
    return load_object(
        filepath, deserializer=_get_pandas_df_deserializer(dformat),
        bucket_name=bucket_name, namekey=namekey, verbose=verbose)
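
# Example round trip (illustrative; the path is hypothetical and a default
# bucket or base directory mapping is assumed to be configured):
#
#     df = pd.DataFrame({'a': [1, 2], 'b': [3, 4]})
#     save_dataframe(df, '~/Desktop/project_data/ab.feather',
#                    dformat='feather', wait=True)
#     df = load_dataframe('~/Desktop/project_data/ab.feather',
#                         dformat='feather')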