from __future__ import absolute_import
from __future__ import print_function
from __future__ import unicode_literals

__all__ = ["S3FS"]

import contextlib
from datetime import datetime
import io
import itertools
import os
from ssl import SSLError
import tempfile
import threading
import mimetypes

import boto3
from botocore.exceptions import ClientError, EndpointConnectionError

import six
from six import text_type

from fs import ResourceType
from fs.base import FS
from fs.info import Info
from fs import errors
from fs.mode import Mode
from fs.subfs import SubFS
from fs.path import basename, dirname, forcedir, join, normpath, relpath
from fs.time import datetime_to_epoch


def _make_repr(class_name, *args, **kwargs):
    """
    Generate a repr string.

    Positional arguments should be the positional arguments used to
    construct the class. Keyword arguments should consist of tuples of
    the attribute value and default. If the value is the default, then
    it won't be rendered in the output.

    Here's an example::

        def __repr__(self):
            return _make_repr('MyClass', 'foo', name=(self.name, None))

    The output of this would be something like ``MyClass('foo',
    name='Will')``.

    """
    arguments = [repr(arg) for arg in args]
    arguments.extend(
        "{}={!r}".format(name, value)
        for name, (value, default) in sorted(kwargs.items())
        if value != default
    )
    return "{}({})".format(class_name, ", ".join(arguments))
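

# A quick sanity check of ``_make_repr`` (hypothetical values; ``delimiter``
# is dropped from the output because it equals its default):
#
#     >>> _make_repr("S3FS", "mybucket", delimiter=("/", "/"), region=("eu-west-1", None))
#     "S3FS('mybucket', region='eu-west-1')"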


class S3File(io.IOBase):
    """Proxy for an S3 file."""

    @classmethod
    def factory(cls, filename, mode, on_close):
        """Create an S3File backed with a temporary file."""
        _temp_file = tempfile.TemporaryFile()
        proxy = cls(_temp_file, filename, mode, on_close=on_close)
        return proxy

    def __repr__(self):
        return _make_repr(
            self.__class__.__name__, self.__filename, text_type(self.__mode)
        )

    def __init__(self, f, filename, mode, on_close=None):
        self._f = f
        self.__filename = filename
        self.__mode = mode
        self._on_close = on_close

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    @property
    def raw(self):
        return self._f

    def close(self):
        if self._on_close is not None:
            self._on_close(self)

    @property
    def closed(self):
        return self._f.closed

    def fileno(self):
        return self._f.fileno()

    def flush(self):
        return self._f.flush()

    def isatty(self):
        return self._f.isatty()

    def readable(self):
        return self.__mode.reading

    def readline(self, limit=-1):
        return self._f.readline(limit)

    def readlines(self, hint=-1):
        if hint == -1:
            return self._f.readlines(hint)
        else:
            size = 0
            lines = []
            for line in iter(self._f.readline, b""):
                lines.append(line)
                size += len(line)
                if size > hint:
                    break
            return lines

    def seek(self, offset, whence=os.SEEK_SET):
        if whence not in (os.SEEK_CUR, os.SEEK_END, os.SEEK_SET):
            raise ValueError("invalid value for 'whence'")
        self._f.seek(offset, whence)
        return self._f.tell()

    def seekable(self):
        return True

    def tell(self):
        return self._f.tell()

    def writable(self):
        return self.__mode.writing

    def writelines(self, lines):
        return self._f.writelines(lines)

    def read(self, n=-1):
        if not self.__mode.reading:
            raise IOError("not open for reading")
        return self._f.read(n)

    def readall(self):
        return self._f.readall()

    def readinto(self, b):
        return self._f.readinto(b)

    def write(self, b):
        if not self.__mode.writing:
            raise IOError("not open for writing")
        self._f.write(b)
        return len(b)

    def truncate(self, size=None):
        if size is None:
            size = self._f.tell()
        self._f.truncate(size)
        return size
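

# ``S3File`` itself never talks to S3: reads and writes hit the temporary
# file, and the ``on_close`` callback supplied by ``S3FS.openbin`` performs
# the actual transfer. A sketch of the pattern (hypothetical callback, for
# illustration only):
#
#     def _upload(s3file):
#         s3file.raw.seek(0)
#         ...  # push s3file.raw to the object store
#         s3file.raw.close()
#
#     f = S3File.factory("example.txt", Mode("wb"), on_close=_upload)
#     f.write(b"hello")
#     f.close()  # invokes _upload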


@contextlib.contextmanager
def s3errors(path):
    """Translate S3 errors to FSErrors."""
    try:
        yield
    except ClientError as error:
        _error = error.response.get("Error", {})
        error_code = _error.get("Code", None)
        response_meta = error.response.get("ResponseMetadata", {})
        http_status = response_meta.get("HTTPStatusCode", 200)
        error_msg = _error.get("Message", None)
        if error_code == "NoSuchBucket":
            raise errors.ResourceError(path, exc=error, msg=error_msg)
        if http_status == 404:
            raise errors.ResourceNotFound(path)
        elif http_status == 403:
            raise errors.PermissionDenied(path=path, msg=error_msg)
        else:
            raise errors.OperationFailed(path=path, exc=error)
    except SSLError as error:
        raise errors.OperationFailed(path, exc=error)
    except EndpointConnectionError as error:
        raise errors.RemoteConnectionError(path, exc=error, msg="{}".format(error))
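

# ``s3errors`` is wrapped around every boto3 call below so that botocore
# exceptions surface as the PyFilesystem error hierarchy. A minimal sketch
# (the client and names are assumptions for illustration):
#
#     with s3errors("/example.txt"):
#         client.head_object(Bucket="mybucket", Key="example.txt")
#     # a 404 from S3 re-raises here as fs.errors.ResourceNotFound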


@six.python_2_unicode_compatible
class S3FS(FS):
    """
    Construct an Amazon S3 filesystem for
    `PyFilesystem <https://pyfilesystem.org>`_

    :param str bucket_name: The S3 bucket name.
    :param str dir_path: The root directory within the S3 Bucket.
        Defaults to ``"/"``.
    :param str aws_access_key_id: The access key, or ``None`` to read
        the key from standard configuration files.
    :param str aws_secret_access_key: The secret key, or ``None`` to
        read the key from standard configuration files.
    :param str endpoint_url: Alternative endpoint URL (``None`` to use
        the default).
    :param str aws_session_token: The session token, or ``None`` if
        temporary credentials are not used.
    :param str region: Optional S3 region.
    :param str delimiter: The delimiter that separates folders; defaults
        to a forward slash.
    :param bool strict: When ``True`` (default) S3FS will follow the
        PyFilesystem specification exactly. Set to ``False`` to disable
        validation of destination paths, which may speed up uploads /
        downloads.
    :param str cache_control: Sets the 'Cache-Control' header for uploads.
    :param str acl: Sets the Access Control List header for uploads.
    :param dict upload_args: A dictionary of additional upload arguments.
        See https://boto3.readthedocs.io/en/latest/reference/services/s3.html#S3.Object.put
        for details.
    :param dict download_args: A dictionary of extra arguments passed to
        the S3 client's download methods.

    """

    _meta = {
        "case_insensitive": False,
        "invalid_path_chars": "\0",
        "network": True,
        "read_only": False,
        "thread_safe": True,
        "unicode_paths": True,
        "virtual": False,
    }

    _object_attributes = [
        "accept_ranges",
        "cache_control",
        "content_disposition",
        "content_encoding",
        "content_language",
        "content_length",
        "content_type",
        "delete_marker",
        "e_tag",
        "expiration",
        "expires",
        "last_modified",
        "metadata",
        "missing_meta",
        "parts_count",
        "replication_status",
        "request_charged",
        "restore",
        "server_side_encryption",
        "sse_customer_algorithm",
        "sse_customer_key_md5",
        "ssekms_key_id",
        "storage_class",
        "version_id",
        "website_redirect_location",
    ]

    def __init__(
        self,
        bucket_name,
        dir_path="/",
        aws_access_key_id=None,
        aws_secret_access_key=None,
        aws_session_token=None,
        endpoint_url=None,
        region=None,
        delimiter="/",
        strict=True,
        cache_control=None,
        acl=None,
        upload_args=None,
        download_args=None,
    ):
        _creds = (aws_access_key_id, aws_secret_access_key)
        if any(_creds) and not all(_creds):
            raise ValueError(
                "aws_access_key_id and aws_secret_access_key "
                "must be set together if specified"
            )
        self._bucket_name = bucket_name
        self.dir_path = dir_path
        self._prefix = relpath(normpath(dir_path)).rstrip("/")
        self.aws_access_key_id = aws_access_key_id
        self.aws_secret_access_key = aws_secret_access_key
        self.aws_session_token = aws_session_token
        self.endpoint_url = endpoint_url
        self.region = region
        self.delimiter = delimiter
        self.strict = strict
        self._tlocal = threading.local()
        if cache_control or acl:
            upload_args = upload_args or {}
            if cache_control:
                upload_args["CacheControl"] = cache_control
            if acl:
                upload_args["ACL"] = acl
        self.upload_args = upload_args
        self.download_args = download_args
        super(S3FS, self).__init__()
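
    # Typical construction (hypothetical bucket and prefix; credentials fall
    # back to boto3's standard configuration chain when omitted):
    #
    #     s3fs = S3FS("mybucket", dir_path="/data")
    #     s3fs.listdir("/")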

    def __repr__(self):
        return _make_repr(
            self.__class__.__name__,
            self._bucket_name,
            dir_path=(self.dir_path, "/"),
            region=(self.region, None),
            delimiter=(self.delimiter, "/"),
        )

    def __str__(self):
        return "<s3fs '{}'>".format(join(self._bucket_name, relpath(self.dir_path)))

    def _path_to_key(self, path):
        """Converts an fs path to an S3 key."""
        _path = relpath(normpath(path))
        _key = (
            "{}/{}".format(self._prefix, _path).lstrip("/").replace("/", self.delimiter)
        )
        return _key

    def _path_to_dir_key(self, path):
        """Converts an fs path to an S3 directory key (trailing delimiter)."""
        _path = relpath(normpath(path))
        _key = (
            forcedir("{}/{}".format(self._prefix, _path))
            .lstrip("/")
            .replace("/", self.delimiter)
        )
        return _key

    def _key_to_path(self, key):
        return key.replace(self.delimiter, "/")

    def _get_object(self, path, key):
        _key = key.rstrip(self.delimiter)
        try:
            with s3errors(path):
                obj = self.s3.Object(self._bucket_name, _key)
                obj.load()
        except errors.ResourceNotFound:
            with s3errors(path):
                obj = self.s3.Object(self._bucket_name, _key + self.delimiter)
                obj.load()
                return obj
        else:
            return obj

    def _get_upload_args(self, key):
        upload_args = self.upload_args.copy() if self.upload_args else {}
        if "ContentType" not in upload_args:
            mime_type, _encoding = mimetypes.guess_type(key)
            if six.PY2 and mime_type is not None:
                mime_type = mime_type.decode("utf-8", "replace")
            upload_args["ContentType"] = mime_type or "binary/octet-stream"
        return upload_args
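
    # When the caller doesn't pin a ContentType, it is guessed from the key's
    # extension; unknown extensions fall back to 'binary/octet-stream'.
    # Standard-library behaviour, shown for reference:
    #
    #     >>> import mimetypes
    #     >>> mimetypes.guess_type("photo.jpg")
    #     ('image/jpeg', None)
    #     >>> mimetypes.guess_type("archive")
    #     (None, None)        # -> 'binary/octet-stream'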

    @property
    def s3(self):
        if not hasattr(self._tlocal, "s3"):
            self._tlocal.s3 = boto3.resource(
                "s3",
                region_name=self.region,
                aws_access_key_id=self.aws_access_key_id,
                aws_secret_access_key=self.aws_secret_access_key,
                aws_session_token=self.aws_session_token,
                endpoint_url=self.endpoint_url,
            )
        return self._tlocal.s3

    @property
    def client(self):
        if not hasattr(self._tlocal, "client"):
            self._tlocal.client = boto3.client(
                "s3",
                region_name=self.region,
                aws_access_key_id=self.aws_access_key_id,
                aws_secret_access_key=self.aws_secret_access_key,
                aws_session_token=self.aws_session_token,
                endpoint_url=self.endpoint_url,
            )
        return self._tlocal.client
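
    # Both properties lazily build one boto3 resource/client per thread and
    # cache it on ``threading.local()``; boto3 resources are documented as not
    # thread safe, so the per-thread instances are what back the "thread_safe"
    # claim in ``_meta``. A rough illustration (``s3fs`` as above):
    #
    #     import threading
    #     seen = set()
    #     def probe():
    #         seen.add(id(s3fs.client))
    #     workers = [threading.Thread(target=probe) for _ in range(2)]
    #     for t in workers: t.start()
    #     for t in workers: t.join()
    #     # each thread constructed and used its own client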

    def _info_from_object(self, obj, namespaces):
        """Make an info dict from an s3 Object."""
        key = obj.key
        path = self._key_to_path(key)
        name = basename(path.rstrip("/"))
        is_dir = key.endswith(self.delimiter)
        info = {"basic": {"name": name, "is_dir": is_dir}}
        if "details" in namespaces:
            _type = int(ResourceType.directory if is_dir else ResourceType.file)
            info["details"] = {
                "accessed": None,
                "modified": datetime_to_epoch(obj.last_modified),
                "size": obj.content_length,
                "type": _type,
            }
        if "s3" in namespaces:
            s3info = info["s3"] = {}
            for name in self._object_attributes:
                value = getattr(obj, name, None)
                if isinstance(value, datetime):
                    value = datetime_to_epoch(value)
                s3info[name] = value
        if "urls" in namespaces:
            url = self.client.generate_presigned_url(
                ClientMethod="get_object",
                Params={"Bucket": self._bucket_name, "Key": key},
            )
            info["urls"] = {"download": url}
        return info

    def isdir(self, path):
        _path = self.validatepath(path)
        try:
            return self._getinfo(_path).is_dir
        except errors.ResourceNotFound:
            return False

    def getinfo(self, path, namespaces=None):
        self.check()
        namespaces = namespaces or ()
        _path = self.validatepath(path)
        _key = self._path_to_key(_path)

        try:
            dir_path = dirname(_path)
            if dir_path != "/":
                _dir_key = self._path_to_dir_key(dir_path)
                with s3errors(path):
                    obj = self.s3.Object(self._bucket_name, _dir_key)
                    obj.load()
        except errors.ResourceNotFound:
            raise errors.ResourceNotFound(path)

        if _path == "/":
            return Info(
                {
                    "basic": {"name": "", "is_dir": True},
                    "details": {"type": int(ResourceType.directory)},
                }
            )

        obj = self._get_object(path, _key)
        info = self._info_from_object(obj, namespaces)
        return Info(info)
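
    # The optional namespaces expose S3-specific metadata and presigned URLs
    # (hypothetical path; values depend on the object):
    #
    #     info = s3fs.getinfo("/example.txt", namespaces=["details", "s3", "urls"])
    #     info.size                      # from the 'details' namespace
    #     info.raw["s3"]["e_tag"]        # raw S3 object attribute
    #     info.raw["urls"]["download"]   # presigned download URL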

    def _getinfo(self, path, namespaces=None):
        """Gets info without checking for parent dir."""
        namespaces = namespaces or ()
        _path = self.validatepath(path)
        _key = self._path_to_key(_path)
        if _path == "/":
            return Info(
                {
                    "basic": {"name": "", "is_dir": True},
                    "details": {"type": int(ResourceType.directory)},
                }
            )

        obj = self._get_object(path, _key)
        info = self._info_from_object(obj, namespaces)
        return Info(info)

    def listdir(self, path):
        _path = self.validatepath(path)
        _s3_key = self._path_to_dir_key(_path)
        prefix_len = len(_s3_key)

        paginator = self.client.get_paginator("list_objects")
        with s3errors(path):
            _paginate = paginator.paginate(
                Bucket=self._bucket_name, Prefix=_s3_key, Delimiter=self.delimiter
            )
            _directory = []
            for result in _paginate:
                common_prefixes = result.get("CommonPrefixes", ())
                for prefix in common_prefixes:
                    _prefix = prefix.get("Prefix")
                    _name = _prefix[prefix_len:]
                    if _name:
                        _directory.append(_name.rstrip(self.delimiter))
                for obj in result.get("Contents", ()):
                    name = obj["Key"][prefix_len:]
                    if name:
                        _directory.append(name)

        if not _directory:
            if not self.getinfo(_path).is_dir:
                raise errors.DirectoryExpected(path)

        return _directory

    def makedir(self, path, permissions=None, recreate=False):
        self.check()
        _path = self.validatepath(path)
        _key = self._path_to_dir_key(_path)

        if not self.isdir(dirname(_path)):
            raise errors.ResourceNotFound(path)

        try:
            self._getinfo(path)
        except errors.ResourceNotFound:
            pass
        else:
            if recreate:
                return self.opendir(_path)
            else:
                raise errors.DirectoryExists(path)
        with s3errors(path):
            _obj = self.s3.Object(self._bucket_name, _key)
            _obj.put(**self._get_upload_args(_key))
        return SubFS(self, path)
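
    # Directories are emulated with empty objects whose keys end in the
    # delimiter, so ``makedir`` boils down to a single ``put`` (hypothetical
    # paths):
    #
    #     sub = s3fs.makedir("/reports")         # creates key 'reports/'
    #     sub.writebytes("jan.csv", b"a,b\n")    # key 'reports/jan.csv'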

    def openbin(self, path, mode="r", buffering=-1, **options):
        _mode = Mode(mode)
        _mode.validate_bin()
        self.check()
        _path = self.validatepath(path)
        _key = self._path_to_key(_path)

        if _mode.create:

            def on_close_create(s3file):
                """Called when the S3 file closes, to upload data."""
                try:
                    s3file.raw.seek(0)
                    with s3errors(path):
                        self.client.upload_fileobj(
                            s3file.raw,
                            self._bucket_name,
                            _key,
                            ExtraArgs=self._get_upload_args(_key),
                        )
                finally:
                    s3file.raw.close()

            try:
                dir_path = dirname(_path)
                if dir_path != "/":
                    _dir_key = self._path_to_dir_key(dir_path)
                    self._get_object(dir_path, _dir_key)
            except errors.ResourceNotFound:
                raise errors.ResourceNotFound(path)

            try:
                info = self._getinfo(path)
            except errors.ResourceNotFound:
                pass
            else:
                if _mode.exclusive:
                    raise errors.FileExists(path)
                if info.is_dir:
                    raise errors.FileExpected(path)

            s3file = S3File.factory(path, _mode, on_close=on_close_create)
            if _mode.appending:
                try:
                    with s3errors(path):
                        self.client.download_fileobj(
                            self._bucket_name,
                            _key,
                            s3file.raw,
                            ExtraArgs=self.download_args,
                        )
                except errors.ResourceNotFound:
                    pass
                else:
                    s3file.seek(0, os.SEEK_END)

            return s3file

        if self.strict:
            info = self.getinfo(path)
            if info.is_dir:
                raise errors.FileExpected(path)

        def on_close(s3file):
            """Called when the S3 file closes, to upload the data."""
            try:
                if _mode.writing:
                    s3file.raw.seek(0, os.SEEK_SET)
                    with s3errors(path):
                        self.client.upload_fileobj(
                            s3file.raw,
                            self._bucket_name,
                            _key,
                            ExtraArgs=self._get_upload_args(_key),
                        )
            finally:
                s3file.raw.close()

        s3file = S3File.factory(path, _mode, on_close=on_close)
        with s3errors(path):
            self.client.download_fileobj(
                self._bucket_name, _key, s3file.raw, ExtraArgs=self.download_args
            )
        s3file.seek(0, os.SEEK_SET)
        return s3file
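
    # In practice: writes are spooled to the local temp file and uploaded in
    # one shot on close; appends first download the existing object and then
    # re-upload the whole payload (hypothetical path):
    #
    #     with s3fs.openbin("/log.txt", "ab") as f:
    #         f.write(b"one more line\n")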

    def remove(self, path):
        self.check()
        _path = self.validatepath(path)
        _key = self._path_to_key(_path)

        if self.strict:
            info = self.getinfo(path)
            if info.is_dir:
                raise errors.FileExpected(path)
        self.client.delete_object(Bucket=self._bucket_name, Key=_key)

    def isempty(self, path):
        self.check()
        _path = self.validatepath(path)
        _key = self._path_to_dir_key(_path)
        response = self.client.list_objects(
            Bucket=self._bucket_name, Prefix=_key, MaxKeys=2
        )
        contents = response.get("Contents", ())
        for obj in contents:
            if obj["Key"] != _key:
                return False
        return True

    def removedir(self, path):
        self.check()
        _path = self.validatepath(path)
        if _path == "/":
            raise errors.RemoveRootError()
        info = self.getinfo(_path)
        if not info.is_dir:
            raise errors.DirectoryExpected(path)
        if not self.isempty(path):
            raise errors.DirectoryNotEmpty(path)
        _key = self._path_to_dir_key(_path)
        self.client.delete_object(Bucket=self._bucket_name, Key=_key)

    def setinfo(self, path, info):
        # No S3 metadata is written here; the call just verifies that the
        # resource exists (raising ResourceNotFound otherwise).
        self.getinfo(path)

    def readbytes(self, path):
        self.check()
        if self.strict:
            info = self.getinfo(path)
            if not info.is_file:
                raise errors.FileExpected(path)
        _path = self.validatepath(path)
        _key = self._path_to_key(_path)
        bytes_file = io.BytesIO()
        with s3errors(path):
            self.client.download_fileobj(
                self._bucket_name, _key, bytes_file, ExtraArgs=self.download_args
            )
        return bytes_file.getvalue()

    def download(self, path, file, chunk_size=None, **options):
        self.check()
        if self.strict:
            info = self.getinfo(path)
            if not info.is_file:
                raise errors.FileExpected(path)
        _path = self.validatepath(path)
        _key = self._path_to_key(_path)
        with s3errors(path):
            self.client.download_fileobj(
                self._bucket_name, _key, file, ExtraArgs=self.download_args
            )

    def exists(self, path):
        self.check()
        _path = self.validatepath(path)
        if _path == "/":
            return True
        _key = self._path_to_dir_key(_path)
        try:
            self._get_object(path, _key)
        except errors.ResourceNotFound:
            return False
        else:
            return True

    def scandir(self, path, namespaces=None, page=None):
        _path = self.validatepath(path)
        namespaces = namespaces or ()
        _s3_key = self._path_to_dir_key(_path)
        prefix_len = len(_s3_key)

        info = self.getinfo(path)
        if not info.is_dir:
            raise errors.DirectoryExpected(path)

        paginator = self.client.get_paginator("list_objects")
        _paginate = paginator.paginate(
            Bucket=self._bucket_name, Prefix=_s3_key, Delimiter=self.delimiter
        )

        def gen_info():
            for result in _paginate:
                common_prefixes = result.get("CommonPrefixes", ())
                for prefix in common_prefixes:
                    _prefix = prefix.get("Prefix")
                    _name = _prefix[prefix_len:]
                    if _name:
                        info = {
                            "basic": {
                                "name": _name.rstrip(self.delimiter),
                                "is_dir": True,
                            }
                        }
                        yield Info(info)
                for _obj in result.get("Contents", ()):
                    name = _obj["Key"][prefix_len:]
                    if name:
                        with s3errors(path):
                            obj = self.s3.Object(self._bucket_name, _obj["Key"])
                        info = self._info_from_object(obj, namespaces)
                        yield Info(info)

        iter_info = iter(gen_info())
        if page is not None:
            start, end = page
            iter_info = itertools.islice(iter_info, start, end)

        for info in iter_info:
            yield info
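
    # ``page`` is a ``(start, end)`` slice over the generated entries,
    # applied with ``itertools.islice`` (hypothetical directory):
    #
    #     for entry in s3fs.scandir("/", namespaces=["details"], page=(0, 10)):
    #         print(entry.name, entry.is_dir)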

    def writebytes(self, path, contents):
        if not isinstance(contents, bytes):
            raise TypeError("contents must be bytes")

        _path = self.validatepath(path)
        _key = self._path_to_key(_path)

        if self.strict:
            if not self.isdir(dirname(path)):
                raise errors.ResourceNotFound(path)
            try:
                info = self._getinfo(path)
                if info.is_dir:
                    raise errors.FileExpected(path)
            except errors.ResourceNotFound:
                pass

        bytes_file = io.BytesIO(contents)
        with s3errors(path):
            self.client.upload_fileobj(
                bytes_file,
                self._bucket_name,
                _key,
                ExtraArgs=self._get_upload_args(_key),
            )

    def upload(self, path, file, chunk_size=None, **options):
        _path = self.validatepath(path)
        _key = self._path_to_key(_path)

        if self.strict:
            if not self.isdir(dirname(path)):
                raise errors.ResourceNotFound(path)
            try:
                info = self._getinfo(path)
                if info.is_dir:
                    raise errors.FileExpected(path)
            except errors.ResourceNotFound:
                pass

        with s3errors(path):
            self.client.upload_fileobj(
                file, self._bucket_name, _key, ExtraArgs=self._get_upload_args(_key)
            )

    def copy(self, src_path, dst_path, overwrite=False):
        if not overwrite and self.exists(dst_path):
            raise errors.DestinationExists(dst_path)
        _src_path = self.validatepath(src_path)
        _dst_path = self.validatepath(dst_path)
        if self.strict:
            if not self.isdir(dirname(_dst_path)):
                raise errors.ResourceNotFound(dst_path)
        _src_key = self._path_to_key(_src_path)
        _dst_key = self._path_to_key(_dst_path)
        try:
            with s3errors(src_path):
                self.client.copy_object(
                    Bucket=self._bucket_name,
                    Key=_dst_key,
                    CopySource={"Bucket": self._bucket_name, "Key": _src_key},
                )
        except errors.ResourceNotFound:
            if self.exists(src_path):
                raise errors.FileExpected(src_path)
            raise

    def move(self, src_path, dst_path, overwrite=False):
        self.copy(src_path, dst_path, overwrite=overwrite)
        self.remove(src_path)

    def geturl(self, path, purpose="download"):
        _path = self.validatepath(path)
        _key = self._path_to_key(_path)
        if _path == "/":
            raise errors.NoURL(path, purpose)
        if purpose == "download":
            url = self.client.generate_presigned_url(
                ClientMethod="get_object",
                Params={"Bucket": self._bucket_name, "Key": _key},
            )
            return url
        else:
            raise errors.NoURL(path, purpose)
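
    # The result is a time-limited presigned GET URL that can be fetched
    # without AWS credentials (hypothetical path; boto3's default expiry
    # applies unless configured otherwise):
    #
    #     url = s3fs.geturl("/example.txt")
    #     # e.g. 'https://mybucket.s3.amazonaws.com/example.txt?X-Amz-...'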