s3aioFileObject.py

#!python
#cython: language_level=3
__copyright__ = "(C) 2019-2021 Science and Technology Facilities Council"
__license__ = "BSD - see LICENSE file in top-level directory"
__authors__ = "Neil Massey"

import io
from fnmatch import fnmatch
from urllib.parse import urlparse
import asyncio

import aiobotocore
from botocore.exceptions import ClientError
import botocore.config

from S3netCDF4.Managers._ConnectionPool import ConnectionPool, ConnectionObject
from S3netCDF4.Managers._ConfigManager import Config
from S3netCDF4._Exceptions import APIException, IOException


class s3aioFileObject(object):
    """Custom file object class, inheriting from Python io.Base, to read from
    an S3 object store / AWS cloud storage."""

    """Static connection pool object - i.e. shared across the file objects."""
    _connection_pool = ConnectionPool()

    # The defaults for MAXIMUM_PART_SIZE etc. are now assigned in
    # __init__ if no values are found in ~/.s3nc.json

    """Static config object for the backend options"""
    _config = Config()

    def _get_server_bucket_object(uri):
        """Get the server name from the URI"""
        # First split the uri into the network location and path, and build the
        # server
        url_p = urlparse(uri)
        # check that the uri contains a scheme and a netloc
        if url_p.scheme == '' or url_p.netloc == '':
            raise APIException(
                "URI supplied to s3aioFileObject is not well-formed: {}".format(uri)
            )
        server = url_p.scheme + "://" + url_p.netloc
        split_path = url_p.path.split("/")
        # get the bucket
        try:
            bucket = split_path[1]
        except IndexError as e:
            raise APIException(
                "URI supplied has no bucket contained within it: {}".format(uri)
            )
        # get the path
        try:
            path = "/".join(split_path[2:])
        except IndexError as e:
            raise APIException(
                "URI supplied has no path contained within it: {}".format(uri)
            )
        return server, bucket, path

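    # Illustrative example of what the method above returns: for the URI
    #   "s3://s3.example.com/mybucket/path/to/object.nc"
    # it yields
    #   ("s3://s3.example.com", "mybucket", "path/to/object.nc")
    # i.e. scheme + netloc form the server, the first path component is the
    # bucket, and the remainder is the object key (the host name is made up
    # for this example).
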
    def __init__(self, uri, credentials, mode='r', create_bucket=True,
                 part_size=None, max_parts=None, multipart_upload=None,
                 multipart_download=None, connect_timeout=None,
                 read_timeout=None):
        """Initialise the file object by creating or reusing a connection in the
        connection pool."""
        # get the server, bucket and the key from the endpoint url
        self._server, self._bucket, self._path = s3aioFileObject._get_server_bucket_object(uri)
        self._closed = False            # set the file to be not closed
        self._mode = mode
        self._seek_pos = 0
        self._buffer = [io.BytesIO()]   # have a list of objects that can stream
        self._credentials = credentials
        self._create_bucket = create_bucket
        self._uri = uri

        """Either get the backend config from the parameters, or the config file
        or use defaults."""
        if "s3aioFileObject" in s3aioFileObject._config["backends"]:
            backend_config = s3aioFileObject._config["backends"]["s3aioFileObject"]
        else:
            backend_config = {}

        if part_size:
            self._part_size = int(part_size)
        elif "maximum_part_size" in backend_config:
            self._part_size = int(backend_config["maximum_part_size"])
        else:
            self._part_size = int(50 * 1024 * 1024)

        if max_parts:
            self._max_parts = int(max_parts)
        elif "maximum_parts" in backend_config:
            self._max_parts = int(backend_config["maximum_parts"])
        else:
            self._max_parts = 8

        if multipart_upload:
            self._multipart_upload = multipart_upload
        elif "multipart_upload" in backend_config:
            self._multipart_upload = backend_config["multipart_upload"]
        else:
            self._multipart_upload = True

        if multipart_download:
            self._multipart_download = multipart_download
        elif "multipart_download" in backend_config:
            self._multipart_download = backend_config["multipart_download"]
        else:
            self._multipart_download = True

        if connect_timeout:
            self._connect_timeout = connect_timeout
        elif "connect_timeout" in backend_config:
            self._connect_timeout = backend_config["connect_timeout"]
        else:
            self._connect_timeout = 30.0

        if read_timeout:
            self._read_timeout = read_timeout
        elif "read_timeout" in backend_config:
            self._read_timeout = backend_config["read_timeout"]
        else:
            self._read_timeout = 30.0

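    # A sketch of the backend section that __init__ looks for in ~/.s3nc.json
    # (via the Config manager). The keys mirror the lookups above; the exact
    # surrounding file layout is an assumption made for illustration.
    #
    #   {
    #       "backends": {
    #           "s3aioFileObject": {
    #               "maximum_part_size": 52428800,
    #               "maximum_parts": 8,
    #               "multipart_upload": true,
    #               "multipart_download": true,
    #               "connect_timeout": 30.0,
    #               "read_timeout": 30.0
    #           }
    #       }
    #   }
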
    async def __aenter__(self):
        """Async version of the enter context method."""
        await self.connect()
        return self

    async def __aexit__(self, exc_type, exc_value, exc_tb):
        """Close the file on the exit of a with statement, or by the garbage
        collector removing the object."""
        await self.close()
        # check for any exceptions
        if exc_type is not None:
            return False
        return True

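    # The context-manager methods above are the intended entry points; an
    # equivalent explicit form is sketched here (uri and creds are placeholders
    # for a real endpoint URI and credentials dict):
    #
    #   f = s3aioFileObject(uri, creds, mode='r')
    #   await f.connect()
    #   try:
    #       data = await f.read()
    #   finally:
    #       await f.close()
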
    async def _getsize(self):
        # Use the content length in the head object to determine the size of
        # the file / object
        # If we are writing then the size should be the buffer size
        try:
            if 'w' in self._mode:
                size = self._part_size
            else:
                response = await self._conn_obj.conn.head_object(
                    Bucket=self._bucket,
                    Key=self._path
                )
                size = response['ContentLength']
        except ClientError as e:
            raise IOException(
                "Could not get size of object {}".format(self._path)
            )
        except AttributeError as e:
            self._handle_connection_exception(e)
        return size

    async def _get_bucket_list(self):
        # get the names of the buckets in a list
        try:
            bl = await self._conn_obj.conn.list_buckets()
            bucket_list = [b['Name'] for b in bl['Buckets']]
        except AttributeError as e:
            self._handle_connection_exception(e)
        return bucket_list

    def _handle_connection_exception(self, e):
        # Check if connection made
        if ("_conn_obj" in e.args[0] or "_current_part" in e.args[0]):
            raise APIException(
                "Connection to S3 server is not established. Use either the "
                ".connect method or a with statement."
            )
        else:
            # other AttributeError - handle that separately
            raise e

    async def connect(self):
        """Connect to the s3 server with the details passed in via the __init__
        method."""
        # if the connection returns None then either there isn't a connection to
        # the server in the pool, or there is no connection that is available
        self._conn_obj = s3aioFileObject._connection_pool.get(self._server)
        if self._conn_obj is None:
            try:
                session = aiobotocore.get_session()
                config = botocore.config.Config(
                    connect_timeout=self._connect_timeout,
                    read_timeout=self._read_timeout
                )
                s3c = session.create_client(
                    "s3",
                    endpoint_url=self._server,
                    aws_access_key_id=self._credentials["accessKey"],
                    aws_secret_access_key=self._credentials["secretKey"],
                    config=config
                )
                # call await s3c.__aenter__ : this is needed for newer versions
                # of aiobotocore
                s3c = await s3c.__aenter__()
                # add the connection to the connection pool
                self._conn_obj = s3aioFileObject._connection_pool.add(
                    s3c, self._server
                )
            except ClientError as e:
                raise IOException(
                    "Could not connect to S3 endpoint {} {}".format(
                        self._server, e)
                )

        if ('r' in self._mode and '*' not in self._path and
            '?' not in self._path):
            # if this is a read method then check the file exists
            response = await self._conn_obj.conn.list_objects_v2(
                Bucket=self._bucket,
                Prefix=self._path
            )
            exists = False
            for obj in response.get('Contents', []):
                if obj['Key'] == self._path:
                    exists = True
            if not exists:
                raise IOException(
                    "Object does not exist: {}/{}/{}".format(
                        self._server, self._bucket, self._path
                    )
                )
        if 'w' in self._mode:
            # if this is a write method then create a bytes array
            self._current_part = 1
        if 'a' in self._mode or '+' in self._mode:
            raise APIException(
                "Appending to files is not supported {}".format(self._path)
            )
        return True

    def detach(self):
        """Separate the underlying raw stream from the buffer and return it.
        Not supported in S3."""
        raise io.UnsupportedOperation

    async def _read_partial_file(self, part_num, part_size):
        s = int(part_num*part_size)
        e = int((part_num+1)*part_size)-1
        range_fmt = 'bytes={}-{}'.format(s, e)
        s3_object = await self._conn_obj.conn.get_object(
            Bucket = self._bucket,
            Key = self._path,
            Range = range_fmt
        )
        body = s3_object['Body']
        return await body.read()

    async def read(self, size=-1):
        """Read and return up to size bytes. For the S3 implementation the size
        can be used for RangeGet. If size==-1 then the whole object is streamed
        into memory."""
        # read the object using the bucket and path already determined in
        # __init__, and using the connection object
        try:
            # get the file size first
            file_size = await self._getsize()
            if size == -1:
                range_start = 0
                range_end = file_size
                range_size = file_size
            else:
                range_start = self._seek_pos
                range_end = self._seek_pos+size-1
                if range_end > file_size:
                    range_end = file_size-1
                range_size = range_end-range_start+1

            # if multipart download is not supported
            if not self._multipart_download:
                # get the full file
                s3_object = await self._conn_obj.conn.get_object(
                    Bucket = self._bucket,
                    Key = self._path,
                )
                body = s3_object['Body']
                data = await body.read()
            # if the file is smaller than the MAXIMUM_PART_SIZE
            elif (range_size < self._part_size):
                # the requested range is the full file, it is fastest to
                # not specify the range
                if (range_start == 0 and range_size == file_size):
                    # get the full file
                    s3_object = await self._conn_obj.conn.get_object(
                        Bucket = self._bucket,
                        Key = self._path,
                    )
                # a portion of the file is requested
                else:
                    s3_object = await self._conn_obj.conn.get_object(
                        Bucket = self._bucket,
                        Key = self._path,
                        Range = 'bytes={}-{}'.format(
                            range_start, range_end
                        )
                    )
                body = s3_object['Body']
                data = await body.read()
            # multipart download version
            else:
                """Use range get to split up a file into the MAXIMUM_PART_SIZE
                and download each part asynchronously."""
                # calculate the number of necessary parts
                n_parts = int(range_size / self._part_size + 1)
                # don't go above the maximum number downloadable
                if n_parts > self._max_parts:
                    n_parts = self._max_parts
                # (re)calculate the download size
                part_size = float(range_size) / n_parts
                # create the tasks and assign the return data buffer
                tasks = []
                data_buf = io.BytesIO()
                for p in range(0, n_parts):
                    event_loop = asyncio.get_event_loop()
                    task = event_loop.create_task(self._read_partial_file(
                        p, part_size
                    ))
                    tasks.append(task)
                # wait for all the tasks to finish
                results = await asyncio.gather(*tasks)
                # read each chunk of data and write into the global buffer
                for r in results:
                    data_buf.write(r)
                    r = None    # indicate ready for garbage collection
                data_buf.seek(0)
                data = data_buf.read()
        except ClientError as e:
            raise IOException(
                "Could not read from object {} {}".format(self._path, e)
            )
        except AttributeError as e:
            self._handle_connection_exception(e)
        return data

    async def read1(self, size=-1):
        """Just call read."""
        return await self.read(size=size)

    async def readinto(self, b):
        """Read bytes into a pre-allocated, writable bytes-like object b and
        return the number of bytes read.
        In S3 the entire file is read into the bytesbuffer. It is important
        that the bytesbuffer is big enough to hold the entire file."""
        # get the size of the file
        size = await self._getsize()
        b[:size] = await self.read(size)
        return size

    async def readinto1(self, b):
        """Just call readinto"""
        return await self.readinto(b)

    async def _multipart_upload_from_buffer(self):
        """Do a multipart upload from the buffer.
        There are three cases:
            1. The size is exactly the same size as the MAXIMUM_PART_SIZE
            2. The size is greater than the MAXIMUM_PART_SIZE
            3. The size is multiple times greater than the MAX_UPLOAD_SIZE and
               requires splitting into smaller chunks
        """
        # check to see if bucket needs to be created
        if self._create_bucket:
            # check whether the bucket exists
            bucket_list = await self._get_bucket_list()
            if not self._bucket in bucket_list:
                await self._conn_obj.conn.create_bucket(Bucket=self._bucket)

        # if the current part is 1 we have to create the multipart upload
        if self._current_part == 1:
            response = await self._conn_obj.conn.create_multipart_upload(
                Bucket = self._bucket,
                Key = self._path
            )
            self._upload_id = response['UploadId']
            # we need to keep a track of the multipart info
            self._multipart_info = {'Parts' : []}

        # upload from a buffer - do we need to split into more than one
        # multiparts?
        new_buffer = []
        for buffer_part in range(0, len(self._buffer)):
            # is the current part of the buffer larger than the maximum
            # upload size? split if it is
            data_buf = self._buffer[buffer_part]
            data_len = data_buf.tell()
            if data_len >= self._part_size:
                data_buf.seek(0)
                data_pos = 0
                # split the file up
                while data_pos < data_len:
                    new_buffer.append(io.BytesIO())
                    # copy the data - don't overstep the buffer
                    if data_pos + self._part_size >= data_len:
                        sub_data = data_buf.read(data_len-data_pos)
                    else:
                        sub_data = data_buf.read(
                            self._part_size
                        )
                    new_buffer[-1].write(sub_data)
                    # increment to next
                    data_pos += self._part_size
                # free the old memory
                self._buffer[buffer_part].close()
            else:
                # copy the old buffer into a new one
                self._buffer[buffer_part].seek(0)
                new_buffer.append(io.BytesIO(self._buffer[buffer_part].read()))

        # close other buffers first
        for b in self._buffer:
            b.close()
        self._buffer = new_buffer

        tasks = []
        for buffer_part in range(0, len(self._buffer)):
            # seek in the BytesIO buffer to get to the beginning after the
            # writing
            self._buffer[buffer_part].seek(0)
            # upload here
            # schedule the uploads
            event_loop = asyncio.get_event_loop()
            task = event_loop.create_task(self._conn_obj.conn.upload_part(
                Bucket=self._bucket,
                Key=self._path,
                UploadId=self._upload_id,
                PartNumber=self._current_part + buffer_part,
                Body=self._buffer[buffer_part]
            ))
            tasks.append(task)
        # await the completion of the uploads
        res = await asyncio.gather(*tasks)
        for buffer_part in range(0, len(self._buffer)):
            # insert into the multipart info list of dictionaries
            part = res[buffer_part]
            self._multipart_info['Parts'].append(
                {
                    'PartNumber' : self._current_part + buffer_part,
                    'ETag' : part['ETag']
                }
            )

        # add the total number of uploads to the current part
        self._current_part += len(self._buffer)

        # reset all the byte buffers and their positions
        for buffer_part in range(0, len(self._buffer)):
            self._buffer[buffer_part].close()
        self._buffer = [io.BytesIO()]
        self._seek_pos = 0

    async def write(self, b):
        """Write the given bytes-like object, b, and return the number of bytes
        written (always equal to the length of b in bytes, since if the write
        fails an OSError will be raised).
        For the S3 file object we just write the file to a temporary bytearray
        and increment the seek_pos.
        This data will be uploaded to an object when .flush is called.
        """
        if "w" not in self._mode:
            raise APIException(
                "Trying to write to a read only file, where mode != 'w'."
            )
        try:
            # add to local, temporary bytearray
            size = len(b)
            self._buffer[-1].write(b)
            self._seek_pos += size
            # test to see whether we should do a multipart upload now
            # this occurs when the number of buffers is > the maximum number of
            # parts. self._current_part is indexed from 1
            if (self._multipart_upload and
                self._seek_pos > self._part_size):
                if len(self._buffer) == self._max_parts:
                    await self._multipart_upload_from_buffer()
                else:
                    # add another buffer to write to
                    self._buffer.append(io.BytesIO())
        except ClientError as e:
            raise IOException(
                "Could not write to object {} {}".format(self._path, e)
            )
        except AttributeError as e:
            self._handle_connection_exception(e)
        return size

    async def close(self):
        """Flush and close this stream. This method has no effect if the file is
        already closed. Once the file is closed, any operation on the file (e.g.
        reading or writing) will raise a ValueError.
        As a convenience, it is allowed to call this method more than once; only
        the first call, however, will have an effect."""
        try:
            if not self._closed:
                # self.flush will upload the bytesarray to the S3 store
                await self.flush()
                s3aioFileObject._connection_pool.release(self._conn_obj)
                self._closed = True
        except AttributeError as e:
            self._handle_connection_exception(e)
        return True

    async def seek(self, offset, whence=io.SEEK_SET):
        """Change the stream position to the given byte offset. offset is
        interpreted relative to the position indicated by whence. The default
        value for whence is SEEK_SET. Values for whence are:
            SEEK_SET or 0 – start of the stream (the default); offset should be
                zero or positive
            SEEK_CUR or 1 – current stream position; offset may be negative
            SEEK_END or 2 – end of the stream; offset is usually negative
        Return the new absolute position.
        Note: currently cannot seek when writing a file.
        """
        if self._mode == 'w':
            raise IOException(
                "Cannot seek within a file that is being written to."
            )
        size = await self._getsize()
        error_string = "Seek {} is outside file size bounds 0->{} for file {}"
        seek_pos = self._seek_pos
        if whence == io.SEEK_SET:
            # range check
            seek_pos = offset
        elif whence == io.SEEK_CUR:
            seek_pos += offset
        elif whence == io.SEEK_END:
            seek_pos = size + offset
        # range checks
        if (seek_pos >= size):
            raise IOException(error_string.format(
                seek_pos,
                size,
                self._path)
            )
        elif (seek_pos < 0):
            raise IOException(error_string.format(
                seek_pos,
                size,
                self._path)
            )
        self._seek_pos = seek_pos
        return self._seek_pos

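    # Illustrative only: reading 100 bytes starting at byte 2048 of an
    # already-connected, read-mode instance `f`:
    #
    #   await f.seek(2048)           # absolute position (whence=io.SEEK_SET)
    #   chunk = await f.read(100)    # ranged GET for bytes 2048-2147
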
    def seekable(self):
        """Return True if the stream supports random access. We can seek in s3
        streams using the range get and range put features."""
        return True

    def tell(self):
        """Return the current stream position."""
        return self._seek_pos

    def fileno(self):
        """Return the underlying file descriptor (an integer) of the stream if
        it exists. An IOError is raised if the IO object does not use a file
        descriptor."""
        raise io.UnsupportedOperation

    async def flush(self):
        """Flush the write buffers of the stream. This will upload the contents
        of the final multipart upload of self._buffer to the S3 store."""
        try:
            if 'w' in self._mode:
                # if the size is less than the MAXIMUM UPLOAD SIZE
                # then just write the data
                size = self._buffer[0].tell()
                if ((self._current_part == 1 and
                     size < self._part_size) or
                     not self._multipart_upload
                   ):
                    if self._create_bucket:
                        # check whether the bucket exists and create if not
                        bucket_list = await self._get_bucket_list()
                        if not self._bucket in bucket_list:
                            await self._conn_obj.conn.create_bucket(
                                Bucket=self._bucket
                            )
                    # upload the whole buffer - seek back to the start first
                    self._buffer[0].seek(0)
                    await self._conn_obj.conn.put_object(
                        Bucket=self._bucket,
                        Key=self._path,
                        Body=self._buffer[0].read(size)
                    )
                else:
                    # upload as multipart
                    await self._multipart_upload_from_buffer()
                    # finalise the multipart upload
                    await self._conn_obj.conn.complete_multipart_upload(
                        Bucket=self._bucket,
                        Key=self._path,
                        UploadId=self._upload_id,
                        MultipartUpload=self._multipart_info
                    )
            # clear the buffers
            for b in self._buffer:
                b.close()
        except AttributeError as e:
            self._handle_connection_exception(e)
        return True

    def readable(self):
        """Return True if the stream can be read from. If False, read() will
        raise IOError."""
        return 'r' in self._mode or '+' in self._mode

    async def readline(self, size=-1):
        """Read and return one line from the stream.
        If size is specified, at most size bytes will be read."""
        if 'b' in self._mode:
            raise APIException(
                "readline on a binary file is not permitted: {}".format(
                    self._uri)
            )
        # only read a set number of bytes if size is passed in, otherwise
        # read up to the file size
        if size == -1:
            size = await self._getsize()
        # use the BytesIO readline methods
        if self.tell() == 0:
            buffer = await self.read(size=size)
            self._buffer[-1].write(buffer)
            self._buffer[-1].seek(0)
        line = self._buffer[-1].readline().decode().strip()
        return line

    async def readlines(self, hint=-1):
        """Read and return a list of lines from the stream. hint can be
        specified to control the number of lines read: no more lines will be
        read if the total size (in bytes/characters) of all lines so far exceeds
        hint."""
        if 'b' in self._mode:
            raise APIException(
                "readlines on a binary file is not permitted: {}".format(
                    self._uri)
            )
        # read the entire file in and decode it
        data = await self.read()
        lines = data.decode().split("\n")
        return lines

    def truncate(self, size=None):
        """Not supported"""
        raise io.UnsupportedOperation

    def writable(self):
        """Return True if the stream supports writing. If False, write() and
        truncate() will raise IOError."""
        return 'w' in self._mode

    async def writelines(self, lines):
        """Write a list of lines to the stream."""
        # first check if the file is binary or not
        if 'b' in self._mode:
            raise APIException(
                "writelines on a binary file is not permitted: {}".format(
                    self._uri)
            )
        # write each line with a trailing line break
        for l in lines:
            await self.write((l+"\n").encode('utf-8'))
        return True

    async def glob(self):
        """Emulate glob on an open bucket. The glob has been passed in via
        self._path, created on connection to the server and bucket."""
        # get the path string up to the wildcards
        try:
            pi1 = self._path.index("*")
        except ValueError:
            pi1 = len(self._path)
        try:
            pi2 = self._path.index("?")
        except ValueError:
            pi2 = len(self._path)
        pi = min(pi1, pi2)
        # using the prefix will cut down on the search space
        prefix = self._path[:pi]
        # get the wildcard
        wildcard = self._path[pi:]
        # set up the paginator
        paginator = self._conn_obj.conn.get_paginator("list_objects_v2")
        parameters = {
            'Bucket': self._bucket,
            'Prefix': prefix
        }
        page_iterator = paginator.paginate(**parameters)
        files = []
        async for page in page_iterator:
            for item in page.get('Contents', []):
                fname = item['Key']
                # check that it matches against wildcard
                if fnmatch(fname, wildcard):
                    files.append(item['Key'])
        return files
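

# A minimal end-to-end usage sketch. It assumes a reachable S3-compatible
# endpoint and a credentials dict with "accessKey" and "secretKey" entries;
# the endpoint, bucket and object names below are made up for illustration
# and are not part of the class itself.
async def _example_usage():
    creds = {"accessKey": "my-access-key", "secretKey": "my-secret-key"}
    uri = "s3://s3.example.com/mybucket/example.txt"
    # write an object - the data is uploaded when the file is closed / flushed
    async with s3aioFileObject(uri, creds, mode='w') as f:
        await f.write(b"hello world\n")
    # read the whole object back into memory
    async with s3aioFileObject(uri, creds, mode='r') as f:
        data = await f.read()
    # list objects in the bucket matching a wildcard
    async with s3aioFileObject("s3://s3.example.com/mybucket/*.txt",
                               creds, mode='r') as g:
        matches = await g.glob()
    return data, matches

# To run the sketch:
#   asyncio.run(_example_usage())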