- """This package enables saving and loading of python objects to disk
- while also backing to S3 storage. """
import os
import datetime
import ntpath  # to extract file name from path, OS-independent
import traceback  # for printing full stacktraces of errors
import concurrent.futures  # for asynchronous file uploads
import pickle  # for pickling files
try:  # for automatic caching of return values of functions
    from functools import lru_cache
except ImportError:
    from functools32 import lru_cache  # pylint: disable=E0401
import pandas as pd
import boto3  # to interact with AWS S3
from botocore.exceptions import ClientError
import dateutil.tz  # to make local change-time datetime objects time-aware
import yaml  # to read the s3bp config
import feather  # to read/write pandas dataframes as feather objects

CFG_FILE_NAME = 's3bp_cfg.yml'
DEFAULT_MAX_WORKERS = 5
EXECUTOR = None


# === Reading configuration ===

def _s3bp_cfg_file_path():
    return os.path.abspath(os.path.join(
        os.path.dirname(os.path.realpath(__file__)),
        CFG_FILE_NAME))


def _get_s3bp_cfg():
    try:
        with open(_s3bp_cfg_file_path(), 'r') as cfg_file:
            cfg = yaml.safe_load(cfg_file)
            if not isinstance(cfg, dict):
                cfg = {'base_dir_to_bucket_map': {}}
            return cfg
    except FileNotFoundError:
        with open(_s3bp_cfg_file_path(), 'w') as outfile:
            outfile.write(yaml.dump(
                {'base_dir_to_bucket_map': {}},
                default_flow_style=False
            ))
        return _get_s3bp_cfg()


def _max_workers():
    try:
        return _get_s3bp_cfg()['max_workers']
    except KeyError:
        return DEFAULT_MAX_WORKERS


def _default_bucket():
    return _get_s3bp_cfg()['default_bucket']


def _base_dir_to_bucket_map():
    return _get_s3bp_cfg()['base_dir_to_bucket_map']


def _base_dirs():
    return list(_get_s3bp_cfg()['base_dir_to_bucket_map'].keys())


# === Setting configuration ===

def _set_s3bp_cfg(cfg):
    with open(_s3bp_cfg_file_path(), 'w') as outfile:
        outfile.write(yaml.dump(cfg, default_flow_style=False))


def set_max_workers(max_workers):
    """Sets the maximum number of workers in the thread pool used to
    asynchronously upload files. NOTE: Resets the current thread pool!"""
    cfg = _get_s3bp_cfg()
    cfg['max_workers'] = max_workers
    _set_s3bp_cfg(cfg)
    _get_executor(reset=True)


def set_default_bucket(bucket_name):
    """Sets the given string as the default bucket name."""
    cfg = _get_s3bp_cfg()
    cfg['default_bucket'] = bucket_name
    _set_s3bp_cfg(cfg)


def unset_default_bucket():
    """Unsets the currently set default bucket, if set."""
    cfg = _get_s3bp_cfg()
    cfg.pop('default_bucket', None)
    _set_s3bp_cfg(cfg)


def _parse_dir_path(dir_path):
    if '~' in dir_path:
        return os.path.expanduser(dir_path)
    return dir_path


def set_default_base_directory(base_directory):
    """Sets the given string as the default base directory name."""
    cfg = _get_s3bp_cfg()
    cfg['default_base_dir'] = _parse_dir_path(base_directory)
    _set_s3bp_cfg(cfg)


def map_base_directory_to_bucket(base_directory, bucket_name):
    """Maps the given directory as a base directory of the given bucket.

    Arguments
    ---------
    base_directory : str
        The full path, from root, to the desired base directory.
    bucket_name : str
        The name of the bucket to map the given directory to.
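
    Example
    -------
    A minimal sketch of configuring a mapping; the directory and bucket
    names below are placeholders, not defaults shipped with the package:

    >>> import s3bp
    >>> s3bp.map_base_directory_to_bucket('~/data_store', 'my-data-bucket')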
- """
- cfg = _get_s3bp_cfg()
- parsed_path = _parse_dir_path(base_directory)
- if not isinstance(cfg['base_dir_to_bucket_map'], dict):
- cfg['base_dir_to_bucket_map'] = {}
- cfg['base_dir_to_bucket_map'][parsed_path] = bucket_name
- _set_s3bp_cfg(cfg)
- def remove_base_directory_mapping(base_directory):
- """Remove the mapping associated with the given directory, if exists."""
- cfg = _get_s3bp_cfg()
- parsed_path = _parse_dir_path(base_directory)
- cfg['base_dir_to_bucket_map'].pop(parsed_path, None)
- _set_s3bp_cfg(cfg)


# === Getting parameters ===

def _get_executor(reset=False):
    if reset:
        _get_executor.executor = concurrent.futures.ThreadPoolExecutor(
            _max_workers())
    try:
        return _get_executor.executor
    except AttributeError:
        _get_executor.executor = concurrent.futures.ThreadPoolExecutor(
            _max_workers())
        return _get_executor.executor


@lru_cache(maxsize=32)
def _get_bucket_by_name(bucket_name):
    s3_rsc = boto3.resource('s3')
    return s3_rsc.Bucket(bucket_name)


@lru_cache(maxsize=32)
def _get_base_dir_by_file_path_and_bucket_name(filepath, bucket_name):
    try:
        for directory in _base_dirs():
            if (directory in filepath) and (
                    _base_dir_to_bucket_map()[directory] == bucket_name):
                return directory
    except (KeyError, AttributeError):
        return None
    return None


def _bucket_name_and_base_dir_by_filepath(filepath):
    try:
        for directory in _base_dirs():
            if directory in filepath:
                return _base_dir_to_bucket_map()[directory], directory
    except (KeyError, AttributeError):
        pass
    try:
        return _default_bucket(), None
    except KeyError:
        raise ValueError(
            "No bucket name was given, and neither a default was defined "
            "nor could one be interpreted from the file path. Please "
            "provide one explicitly, or define an appropriate bucket.")


def _get_key(filepath, namekey, base_directory):
    if namekey or not base_directory:
        return ntpath.basename(filepath)
    index = filepath.find(base_directory[base_directory.rfind('/'):])
    return filepath[index + 1:]


@lru_cache(maxsize=32)
def _get_bucket_and_key(filepath, bucket_name, namekey):
    base_directory = None
    if bucket_name is None:
        bucket_name, base_directory = _bucket_name_and_base_dir_by_filepath(
            filepath)
    elif not namekey:
        base_directory = _get_base_dir_by_file_path_and_bucket_name(
            filepath, bucket_name)
    if base_directory:
        os.makedirs(base_directory, exist_ok=True)
    bucket = _get_bucket_by_name(bucket_name)
    key = _get_key(filepath, namekey, base_directory)
    return bucket, key


# === Uploading/Downloading files ===

def _parse_file_path(filepath):
    if '~' in filepath:
        return os.path.expanduser(filepath)
    return filepath


def _file_upload_thread(bucket, filepath, key):
    try:
        bucket.upload_file(filepath, key)
    except BaseException as exc:  # pylint: disable=W0703
        print(
            'File upload failed with the following exception:\n{}'.format(exc),
            flush=True
        )


def upload_file(filepath, bucket_name=None, namekey=None, wait=False):
    """Uploads the given file to S3 storage.

    Arguments
    ---------
    filepath : str
        The full path, from root, to the desired file.
    bucket_name (optional) : str
        The name of the bucket to upload the file to. If not given, it will
        be inferred from any defined base directory that is present on the
        path (there is no guarantee which base directory will be used if
        several are present in the given path). If base directory inference
        fails, the default bucket will be used, if defined; otherwise the
        operation will fail.
    namekey (optional) : bool
        Indicates whether to use the name of the file as the key when
        uploading to the bucket. If set, or if no base directory is found in
        the filepath, the file name will be used as key. Otherwise, the path
        rooted at the detected base directory will be used, resulting in a
        directory-like structure in the S3 bucket.
    wait (optional) : bool
        Defaults to False. If set to True, the function will wait on the
        upload operation. Otherwise, the upload will be performed
        asynchronously in a separate thread.
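
    Example
    -------
    An illustrative call; the path and bucket name are placeholders and
    assume AWS credentials are already configured for boto3:

    >>> import s3bp
    >>> s3bp.upload_file('~/Desktop/data.csv', 'my-bucket')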
- """
- filepath = _parse_file_path(filepath)
- bucket, key = _get_bucket_and_key(filepath, bucket_name, namekey)
- if wait:
- bucket.upload_file(filepath, key)
- else:
- _get_executor().submit(_file_upload_thread, bucket, filepath, key)


def _file_time_modified(filepath):
    timestamp = os.path.getmtime(filepath)
    dt_obj = datetime.datetime.utcfromtimestamp(timestamp)
    # this is correct only because the non-time-aware obj is in UTC!
    dt_obj = dt_obj.replace(tzinfo=dateutil.tz.tzutc())
    return dt_obj


def download_file(filepath, bucket_name=None, namekey=None, verbose=False):
    """Downloads the most recent version of the given file from S3, if needed.

    Arguments
    ---------
    filepath : str
        The full path, from root, to the desired file.
    bucket_name (optional) : str
        The name of the bucket to download the file from. If not given, it
        will be inferred from any defined base directory that is present on
        the path (there is no guarantee which base directory will be used if
        several are present in the given path). If base directory inference
        fails, the default bucket will be used, if defined; otherwise the
        operation will fail.
    namekey (optional) : bool
        Indicates whether to use the name of the file as the key when
        downloading from the bucket. If set, or if no base directory is found
        in the filepath, the file name will be used as key. Otherwise, the
        path rooted at the detected base directory will be used, resulting in
        a directory-like structure in the S3 bucket.
    verbose (optional) : bool
        Defaults to False. If set to True, some informative messages will be
        printed.
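
    Example
    -------
    An illustrative call; the path and bucket name are placeholders:

    >>> import s3bp
    >>> s3bp.download_file('~/Desktop/data.csv', 'my-bucket', verbose=True)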
- """
- filepath = _parse_file_path(filepath)
- bucket, key = _get_bucket_and_key(filepath, bucket_name, namekey)
- try:
- if os.path.isfile(filepath):
- if verbose:
- print('File %s found on disk.' % key)
- # this datetime object has tzinfo=dateutil.tz.utc()
- s3_last_modified = bucket.Object(key).get()['LastModified']
- if s3_last_modified > _file_time_modified(filepath):
- if verbose:
- print('But S3 has an updated version. Downloading...')
- bucket.download_file(key, filepath)
- else:
- if verbose:
- print('File %s NOT found on disk. Downloading...' % key)
- # creating non-existing dirs on the path
- if not os.path.exists(filepath):
- os.makedirs(filepath[:filepath.rfind('/')])
- bucket.download_file(key, filepath)
- except ClientError:
- if verbose:
- print('Loading dataframe failed with the following exception:')
- print(traceback.format_exc())
- raise ValueError('No dataframe found with key %s' % key)


# === Saving/loading Python objects ===

def _pickle_serializer(pyobject, filepath):
    with open(filepath, 'wb') as dump_file:
        pickle.dump(pyobject, dump_file)


def save_object(pyobject, filepath, bucket_name=None,
                serializer=_pickle_serializer, namekey=None, wait=False):
    """Saves the given object to S3 storage, caching it as the given file.

    Arguments
    ---------
    pyobject : object
        The Python object to save.
    filepath : str
        The full path, from root, to the desired cache file.
    bucket_name (optional) : str
        The name of the bucket to upload the file to. If not given, it will
        be inferred from any defined base directory that is present on the
        path (there is no guarantee which base directory will be used if
        several are present in the given path). If base directory inference
        fails, the default bucket will be used, if defined; otherwise the
        operation will fail.
    serializer (optional) : callable
        A callable that takes two positional arguments, a Python object and a
        path to a file, and dumps the object to the given file. Defaults to a
        wrapper of pickle.dump.
    namekey (optional) : bool
        Indicates whether to use the name of the file as the key when
        uploading to the bucket. If set, or if no base directory is found in
        the filepath, the file name will be used as key. Otherwise, the path
        rooted at the detected base directory will be used, resulting in a
        directory-like structure in the S3 bucket.
    wait (optional) : bool
        Defaults to False. If set to True, the function will wait on the
        upload operation. Otherwise, the upload will be performed
        asynchronously in a separate thread.
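
    Example
    -------
    An illustrative call; the object, path and bucket name are placeholders:

    >>> import s3bp
    >>> model = {'name': 'example', 'version': 1}
    >>> s3bp.save_object(model, '~/models/example.pkl', 'my-bucket')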
- """
- serializer(pyobject, filepath)
- upload_file(filepath, bucket_name, namekey, wait)
- def _picke_deserializer(filepath):
- return pickle.load(open(filepath, 'rb'))


def load_object(filepath, bucket_name=None, deserializer=_pickle_deserializer,
                namekey=None, verbose=False):
    """Loads the most recent version of the object cached in the given file.

    Arguments
    ---------
    filepath : str
        The full path, from root, to the desired file.
    bucket_name (optional) : str
        The name of the bucket to download the file from. If not given, it
        will be inferred from any defined base directory that is present on
        the path (there is no guarantee which base directory will be used if
        several are present in the given path). If base directory inference
        fails, the default bucket will be used, if defined; otherwise the
        operation will fail.
    deserializer (optional) : callable
        A callable that takes one positional argument, a path to a file, and
        returns the object stored in it. Defaults to a wrapper of pickle.load.
    namekey (optional) : bool
        Indicates whether to use the name of the file as the key when
        downloading from the bucket. If set, or if no base directory is found
        in the filepath, the file name will be used as key. Otherwise, the
        path rooted at the detected base directory will be used, resulting in
        a directory-like structure in the S3 bucket.
    verbose (optional) : bool
        Defaults to False. If set to True, some informative messages will be
        printed.
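
    Example
    -------
    An illustrative call; the path and bucket name are placeholders:

    >>> import s3bp
    >>> model = s3bp.load_object('~/models/example.pkl', 'my-bucket')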
- """
- download_file(filepath, bucket_name=bucket_name, namekey=namekey,
- verbose=verbose)
- return deserializer(filepath)


# === Saving/loading dataframes ===

def _pandas_df_csv_serializer(pyobject, filepath):
    pyobject.to_csv(filepath)


def _pandas_df_excel_serializer(pyobject, filepath):
    pyobject.to_excel(filepath)


def _pandas_df_feather_serializer(pyobject, filepath):
    feather.write_dataframe(pyobject, filepath)


def _get_pandas_df_serializer(dformat):
    dformat = dformat.lower()
    if dformat == 'csv':
        return _pandas_df_csv_serializer
    if dformat == 'excel':
        return _pandas_df_excel_serializer
    if dformat == 'feather':
        return _pandas_df_feather_serializer
    raise ValueError("Unsupported dataframe format: %s" % dformat)


def save_dataframe(df, filepath, bucket_name=None, dformat='csv', namekey=None,
                   wait=False):
    """Writes the given dataframe to disk in the given format and uploads it
    to S3 storage.

    Arguments
    ---------
    df : pandas.DataFrame
        The pandas DataFrame object to save.
    filepath : str
        The full path, from root, to the desired file.
    bucket_name (optional) : str
        The name of the bucket to upload the file to. If not given, it will
        be inferred from any defined base directory that is present on the
        path (there is no guarantee which base directory will be used if
        several are present in the given path). If base directory inference
        fails, the default bucket will be used, if defined; otherwise the
        operation will fail.
    dformat (optional) : str
        The storage format for the dataframe. One of 'csv', 'excel' and
        'feather'. Defaults to 'csv'.
    namekey (optional) : bool
        Indicates whether to use the name of the file as the key when
        uploading to the bucket. If set, or if no base directory is found in
        the filepath, the file name will be used as key. Otherwise, the path
        rooted at the detected base directory will be used, resulting in a
        directory-like structure in the S3 bucket.
    wait (optional) : bool
        Defaults to False. If set to True, the function will wait on the
        upload operation. Otherwise, the upload will be performed
        asynchronously in a separate thread.
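
    Example
    -------
    An illustrative call; the dataframe, path and bucket name are
    placeholders:

    >>> import pandas as pd
    >>> import s3bp
    >>> df = pd.DataFrame([[1, 2], [3, 4]], columns=['a', 'b'])
    >>> s3bp.save_dataframe(df, '~/data/small.csv', 'my-bucket')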
- """
- save_object(df, filepath, serializer=_get_pandas_df_serializer(dformat),
- bucket_name=bucket_name, namekey=namekey, wait=wait)


def _pandas_df_csv_deserializer(filepath):
    return pd.read_csv(filepath)


def _pandas_df_excel_deserializer(filepath):
    return pd.read_excel(filepath)


def _pandas_df_feather_deserializer(filepath):
    return feather.read_dataframe(filepath)


def _get_pandas_df_deserializer(dformat):
    dformat = dformat.lower()
    if dformat == 'csv':
        return _pandas_df_csv_deserializer
    if dformat == 'excel':
        return _pandas_df_excel_deserializer
    if dformat == 'feather':
        return _pandas_df_feather_deserializer
    raise ValueError("Unsupported dataframe format: %s" % dformat)


def load_dataframe(filepath, bucket_name=None, dformat='csv', namekey=None,
                   verbose=False):
    """Loads the most updated version of a dataframe from file, fetching it
    from S3 storage if necessary.

    Arguments
    ---------
    filepath : str
        The full path, from root, to the desired file.
    bucket_name (optional) : str
        The name of the bucket to download the file from. If not given, it
        will be inferred from any defined base directory that is present on
        the path (there is no guarantee which base directory will be used if
        several are present in the given path). If base directory inference
        fails, the default bucket will be used, if defined; otherwise the
        operation will fail.
    dformat (optional) : str
        The storage format for the dataframe. One of 'csv', 'excel' and
        'feather'. Defaults to 'csv'.
    namekey (optional) : bool
        Indicates whether to use the name of the file as the key when
        downloading from the bucket. If set, or if no base directory is found
        in the filepath, the file name will be used as key. Otherwise, the
        path rooted at the detected base directory will be used, resulting in
        a directory-like structure in the S3 bucket.
    verbose (optional) : bool
        Defaults to False. If set to True, some informative messages will be
        printed.
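
    Example
    -------
    An illustrative call; the path and bucket name are placeholders:

    >>> import s3bp
    >>> df = s3bp.load_dataframe('~/data/small.csv', 'my-bucket')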
- """
- return load_object(
- filepath, deserializer=_get_pandf_defserializer(dformat),
- bucket_name=bucket_name, namekey=namekey, verbose=verbose)