#!python
#cython: language_level=3

__copyright__ = "(C) 2019-2021 Science and Technology Facilities Council"
__license__ = "BSD - see LICENSE file in top-level directory"
__authors__ = "Neil Massey"

import io
from fnmatch import fnmatch
from urllib.parse import urlparse

from botocore.exceptions import ClientError
import botocore.session
import botocore.config

from S3netCDF4.Managers._ConnectionPool import ConnectionPool
from S3netCDF4.Managers._ConfigManager import Config
from S3netCDF4._Exceptions import APIException, IOException


class s3FileObject(io.BufferedIOBase):
    """Custom file object class, inheriting from Python io.BufferedIOBase, to
    read from an S3 object store / AWS cloud storage."""

    """Static connection pool object - i.e. shared across the file objects."""
    _connection_pool = ConnectionPool()

    # The defaults for MAXIMUM_PART_SIZE etc. are now assigned in
    # __init__ if no values are found in ~/.s3nc.json

    """Static config object for the backend options"""
    _config = Config()

    def _get_server_bucket_object(uri):
        """Get the server, bucket and object path from the URI."""
        # First split the uri into the network location and path, and build the
        # server
        url_p = urlparse(uri)
        # check that the uri contains a scheme and a netloc
        if url_p.scheme == '' or url_p.netloc == '':
            raise APIException(
                "URI supplied to s3FileObject is not well-formed: {}".format(uri)
            )
        server = url_p.scheme + "://" + url_p.netloc
        split_path = url_p.path.split("/")
        # get the bucket
        try:
            bucket = split_path[1]
        except IndexError as e:
            raise APIException(
                "URI supplied has no bucket contained within it: {}".format(uri)
            )
        # get the path
        try:
            path = "/".join(split_path[2:])
        except IndexError as e:
            raise APIException(
                "URI supplied has no path contained within it: {}".format(uri)
            )
        return server, bucket, path
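
    # Illustrative example (hypothetical endpoint and object names):
    #   _get_server_bucket_object("http://s3.example.com/mybucket/dir/file.nc")
    #   returns ("http://s3.example.com", "mybucket", "dir/file.nc")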

    def __init__(self, uri, credentials, mode='r', create_bucket=True,
                 part_size=None, max_parts=None, multipart_upload=None,
                 multipart_download=None, connect_timeout=None,
                 read_timeout=None):
        """Initialise the file object by creating or reusing a connection in the
        connection pool."""
        # get the server, bucket and the key from the endpoint url
        self._server, self._bucket, self._path = s3FileObject._get_server_bucket_object(uri)
        self._closed = False            # set the file to be not closed
        self._mode = mode
        self._seek_pos = 0
        self._buffer = [io.BytesIO()]   # have a list of objects that can stream
        self._credentials = credentials
        self._create_bucket = create_bucket
        self._uri = uri

        """Either get the backend config from the parameters, or the config file
        or use defaults."""
        if "s3FileObject" in s3FileObject._config["backends"]:
            backend_config = s3FileObject._config["backends"]["s3FileObject"]
        else:
            backend_config = {}

        if part_size:
            self._part_size = int(part_size)
        elif "maximum_part_size" in backend_config:
            self._part_size = int(backend_config["maximum_part_size"])
        else:
            self._part_size = int(50 * 1024 * 1024)

        if max_parts:
            self._max_parts = int(max_parts)
        elif "maximum_parts" in backend_config:
            self._max_parts = int(backend_config["maximum_parts"])
        else:
            self._max_parts = 8

        if multipart_upload:
            self._multipart_upload = multipart_upload
        elif "multipart_upload" in backend_config:
            self._multipart_upload = backend_config["multipart_upload"]
        else:
            self._multipart_upload = True

        if multipart_download:
            self._multipart_download = multipart_download
        elif "multipart_download" in backend_config:
            self._multipart_download = backend_config["multipart_download"]
        else:
            self._multipart_download = True

        if connect_timeout:
            self._connect_timeout = connect_timeout
        elif "connect_timeout" in backend_config:
            self._connect_timeout = backend_config["connect_timeout"]
        else:
            self._connect_timeout = 30.0

        if read_timeout:
            self._read_timeout = read_timeout
        elif "read_timeout" in backend_config:
            self._read_timeout = backend_config["read_timeout"]
        else:
            self._read_timeout = 30.0
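
    # The defaults above can also be supplied via the "backends" section of the
    # user config file (~/.s3nc.json), read through the Config manager.  A
    # hypothetical sketch of that section, using the keys that __init__ looks
    # up (values shown are the in-code defaults; the exact file layout is
    # defined by the Config manager):
    #
    #   "backends": {
    #       "s3FileObject": {
    #           "maximum_part_size": 52428800,
    #           "maximum_parts": 8,
    #           "multipart_upload": true,
    #           "multipart_download": true,
    #           "connect_timeout": 30.0,
    #           "read_timeout": 30.0
    #       }
    #   }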

    def __enter__(self):
        """Create the connection on an enter."""
        self.connect()
        return self

    def __exit__(self, exc_type, exc_value, exc_tb):
        """Close the file on the exit of a with statement, or by the garbage
        collector removing the object."""
        self.close()
        # check for any exceptions
        if exc_type is not None:
            return False
        return True

    def _getsize(self):
        # Use content length in the head object to determine the size of
        # the file / object
        # If we are writing then the size should be the buffer size
        try:
            if 'w' in self._mode:
                size = self._part_size
            else:
                response = self._conn_obj.conn.head_object(
                    Bucket=self._bucket,
                    Key=self._path
                )
                size = response['ContentLength']
        except ClientError as e:
            raise IOException(
                "Could not get size of object {}".format(self._path)
            )
        except AttributeError as e:
            self._handle_connection_exception(e)
        return size

    def _get_bucket_list(self):
        # get the names of the buckets in a list
        try:
            bl = self._conn_obj.conn.list_buckets()['Buckets']  # this returns a dict
            bucket_list = [b['Name'] for b in bl]
        except AttributeError as e:
            self._handle_connection_exception(e)
        return bucket_list

    def _handle_connection_exception(self, e):
        # Check if connection made
        if ("_conn_obj" in e.args[0] or "_current_part" in e.args[0]):
            raise APIException(
                "Connection to S3 server is not established. Use either the "
                ".connect method or a with statement."
            )
        else:
            # other AttributeError - handle that separately
            raise e

    def connect(self):
        """Connect to the s3 server with the details passed in via the __init__
        method."""
        # if the connection returns None then either there isn't a connection to
        # the server in the pool, or there is no connection that is available
        self._conn_obj = s3FileObject._connection_pool.get(self._server)
        if self._conn_obj is None:
            try:
                session = botocore.session.get_session()
                config = botocore.config.Config(
                    connect_timeout=self._connect_timeout,
                    read_timeout=self._read_timeout
                )
                s3c = session.create_client(
                    "s3",
                    endpoint_url=self._server,
                    aws_access_key_id=self._credentials["accessKey"],
                    aws_secret_access_key=self._credentials["secretKey"],
                    config=config
                )
                # add the connection to the connection pool
                self._conn_obj = s3FileObject._connection_pool.add(
                    s3c, self._server
                )
            except ClientError as e:
                raise IOException(
                    "Could not connect to S3 endpoint {} {}".format(
                        self._server, e)
                )
        if ('r' in self._mode and '*' not in self._path and
                '?' not in self._path):
            # if this is a read method then check the file exists
            response = self._conn_obj.conn.list_objects_v2(
                Bucket=self._bucket,
                Prefix=self._path
            )
            exists = False
            for obj in response.get('Contents', []):
                if obj['Key'] == self._path:
                    exists = True
                    break
            if not exists:
                raise IOException(
                    "Object does not exist: {}/{}/{}".format(
                        self._server, self._bucket, self._path
                    )
                )
        if 'w' in self._mode:
            # if this is a write method then create a bytes array
            self._current_part = 1
        if 'a' in self._mode or '+' in self._mode:
            raise APIException(
                "Appending to files is not supported {}".format(self._path)
            )
        return True

    def detach(self):
        """Separate the underlying raw stream from the buffer and return it.
        Not supported in S3."""
        raise io.UnsupportedOperation

    def read(self, size=-1):
        """Read and return up to size bytes. For the S3 implementation the size
        can be used for RangeGet. If size==-1 then the whole object is streamed
        into memory."""
        # read the object using the bucket and path already determined in
        # __init__, and using the connection object
        try:
            if size == -1:
                s3_object = self._conn_obj.conn.get_object(
                    Bucket=self._bucket,
                    Key=self._path
                )
                body = s3_object['Body']
            else:
                # do the partial / range get version, and increment the seek
                # pointer
                range_end = self._seek_pos + size - 1
                file_size = self._getsize()
                if range_end >= file_size:
                    range_end = file_size - 1
                if not self._multipart_download:
                    s3_object = self._conn_obj.conn.get_object(
                        Bucket=self._bucket,
                        Key=self._path,
                    )
                    body = s3_object['Body']
                else:
                    s3_object = self._conn_obj.conn.get_object(
                        Bucket=self._bucket,
                        Key=self._path,
                        Range='bytes={}-{}'.format(
                            self._seek_pos, range_end
                        )
                    )
                    self._seek_pos += size
                    body = s3_object['Body']
        except ClientError as e:
            raise IOException(
                "Could not read from object {} {}".format(self._path, e)
            )
        except AttributeError as e:
            self._handle_connection_exception(e)
        return body.read()

    def read1(self, size=-1):
        """Just call read."""
        return self.read(size=size)

    def readinto(self, b):
        """Read bytes into a pre-allocated, writable bytes-like object b and
        return the number of bytes read.
        In S3 the entire file is read into the bytesbuffer. It is important
        that the bytesbuffer is big enough to hold the entire file."""
        # get the size of the file
        size = self._getsize()
        b[:size] = self.read(size)
        return size

    def readinto1(self, b):
        """Just call readinto."""
        return self.readinto(b)

    def _multipart_upload_from_buffer(self):
        """Do a multipart upload from the buffer.
        There are two cases:
            1. The size is exactly the same size as the self._part_size
            2. The size is greater than the self._part_size
        """
        # check to see if bucket needs to be created
        if self._create_bucket:
            # check whether the bucket exists
            bucket_list = self._get_bucket_list()
            if self._bucket not in bucket_list:
                self._conn_obj.conn.create_bucket(Bucket=self._bucket)

        # if the current part is 1 we have to create the multipart upload
        if self._current_part == 1:
            response = self._conn_obj.conn.create_multipart_upload(
                Bucket=self._bucket,
                Key=self._path
            )
            self._upload_id = response['UploadId']
            # we need to keep a track of the multipart info
            self._multipart_info = {'Parts': []}

        # upload from a buffer - do we need to split into more than one
        # multiparts?  Remember: self._buffer is a list of BytesIO objects
        new_buffer = []
        for buffer_part in range(0, len(self._buffer)):
            # is the current part of the buffer larger than the maximum
            # upload size? split if it is
            data_buf = self._buffer[buffer_part]
            data_len = data_buf.tell()
            if data_len >= self._part_size:
                data_buf.seek(0)
                data_pos = 0
                # split the file up
                while data_pos < data_len:
                    new_buffer.append(io.BytesIO())
                    # copy the data - don't overstep the buffer
                    if data_pos + self._part_size >= data_len:
                        sub_data = data_buf.read(data_len - data_pos)
                    else:
                        sub_data = data_buf.read(self._part_size)
                    new_buffer[-1].write(sub_data)
                    # increment to next
                    data_pos += self._part_size
                # free the old memory
                self._buffer[buffer_part].close()
            else:
                self._buffer[buffer_part].seek(0)
                new_buffer.append(io.BytesIO(self._buffer[buffer_part].read()))
        self._buffer = new_buffer

        for buffer_part in range(0, len(self._buffer)):
            # seek in the BytesIO buffer to get to the beginning after the
            # writing
            self._buffer[buffer_part].seek(0)
            # upload here
            part = self._conn_obj.conn.upload_part(
                Bucket=self._bucket,
                Key=self._path,
                UploadId=self._upload_id,
                PartNumber=self._current_part,
                Body=self._buffer[buffer_part]
            )
            # insert into the multipart info list of dictionaries
            self._multipart_info['Parts'].append(
                {
                    'PartNumber': self._current_part,
                    'ETag': part['ETag']
                }
            )
            self._current_part += 1

        # reset all the byte buffers and their positions
        for buffer_part in range(0, len(self._buffer)):
            self._buffer[buffer_part].close()
        self._buffer = [io.BytesIO()]
        self._seek_pos = 0
        self._current_part += 1
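
    # For illustration: with the default part_size of 50 MiB, a single buffer
    # holding 120 MiB of data would be split by the loop above into parts of
    # 50, 50 and 20 MiB, each uploaded under its own part number.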

    def write(self, b):
        """Write the given bytes-like object, b, and return the number of bytes
        written (always equal to the length of b in bytes, since if the write
        fails an OSError will be raised).
        For the S3 file object we just write the file to a temporary bytearray
        and increment the seek_pos.
        This data will be uploaded to an object when .flush is called.
        """
        if "w" not in self._mode:
            raise APIException(
                "Trying to write to a read only file, where mode != 'w'."
            )
        try:
            # add to local, temporary bytearray
            size = len(b)
            self._buffer[-1].write(b)
            self._seek_pos += size
            # test to see whether we should do a multipart upload now
            # this occurs when the number of buffers is > the maximum number of
            # parts.  self._current_part is indexed from 1
            if (self._multipart_upload and
                    self._seek_pos > self._part_size):
                if len(self._buffer) == self._max_parts:
                    self._multipart_upload_from_buffer()
                else:
                    # add another buffer to write to
                    self._buffer.append(io.BytesIO())
        except ClientError as e:
            raise IOException(
                "Could not write to object {} {}".format(self._path, e)
            )
        except AttributeError as e:
            self._handle_connection_exception(e)
        return size
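
    # Note: write() only accumulates data in in-memory BytesIO buffers; nothing
    # reaches the object store until _multipart_upload_from_buffer() is
    # triggered above, or until flush() / close() is called.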

    def close(self):
        """Flush and close this stream. This method has no effect if the file is
        already closed. Once the file is closed, any operation on the file (e.g.
        reading or writing) will raise a ValueError.
        As a convenience, it is allowed to call this method more than once; only
        the first call, however, will have an effect."""
        try:
            if not self._closed:
                # self.flush will upload the buffer to the S3 store
                self.flush()
                s3FileObject._connection_pool.release(self._conn_obj)
                self._closed = True
        except AttributeError as e:
            self._handle_connection_exception(e)
        return True

    def seek(self, offset, whence=io.SEEK_SET):
        """Change the stream position to the given byte offset. offset is
        interpreted relative to the position indicated by whence. The default
        value for whence is SEEK_SET. Values for whence are:
        SEEK_SET or 0 – start of the stream (the default); offset should be zero
        or positive
        SEEK_CUR or 1 – current stream position; offset may be negative
        SEEK_END or 2 – end of the stream; offset is usually negative
        Return the new absolute position.
        Note: currently cannot seek when writing a file.
        """
        if self._mode == 'w':
            raise IOException(
                "Cannot seek within a file that is being written to."
            )
        size = self._getsize()
        error_string = "Seek {} is outside file size bounds 0->{} for file {}"
        seek_pos = self._seek_pos
        if whence == io.SEEK_SET:
            seek_pos = offset
        elif whence == io.SEEK_CUR:
            seek_pos += offset
        elif whence == io.SEEK_END:
            # offset is relative to the end of the stream and is usually
            # negative
            seek_pos = size + offset
        # range checks
        if (seek_pos >= size):
            raise IOException(error_string.format(
                seek_pos,
                size,
                self._path)
            )
        elif (seek_pos < 0):
            raise IOException(error_string.format(
                seek_pos,
                size,
                self._path)
            )
        self._seek_pos = seek_pos
        return self._seek_pos
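
    # Illustrative read pattern: with multipart_download enabled, seek(1024)
    # followed by read(512) issues a ranged GET for bytes 1024-1535 of the
    # object, rather than streaming the whole object into memory.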

    def seekable(self):
        """Return True if the stream supports random access.
        We can seek in s3 streams using the range get and range put features."""
        return True

    def tell(self):
        """Return the current stream position."""
        return self._seek_pos

    def fileno(self):
        """Return the underlying file descriptor (an integer) of the stream if
        it exists. An IOError is raised if the IO object does not use a file
        descriptor."""
        raise io.UnsupportedOperation

    def flush(self):
        """Flush the write buffers of the stream. This will upload the contents
        of the final multipart upload of self._buffer to the S3 store."""
        try:
            if 'w' in self._mode:
                # if the size is less than the MAXIMUM UPLOAD SIZE
                # then just write the data
                size = self._buffer[0].tell()
                if self._current_part == 1 and size < self._part_size:
                    if self._create_bucket:
                        # check whether the bucket exists and create if not
                        bucket_list = self._get_bucket_list()
                        if self._bucket not in bucket_list:
                            self._conn_obj.conn.create_bucket(
                                Bucket=self._bucket
                            )
                    # upload the whole buffer - seek back to the start first
                    self._buffer[0].seek(0)
                    self._conn_obj.conn.put_object(
                        Bucket=self._bucket,
                        Key=self._path,
                        Body=self._buffer[0].read(size)
                    )
                else:
                    # upload as multipart
                    self._multipart_upload_from_buffer()
                    # finalise the multipart upload
                    self._conn_obj.conn.complete_multipart_upload(
                        Bucket=self._bucket,
                        Key=self._path,
                        UploadId=self._upload_id,
                        MultipartUpload=self._multipart_info
                    )
        except AttributeError as e:
            self._handle_connection_exception(e)
        return True

    def readable(self):
        """Return True if the stream can be read from. If False, read() will
        raise IOError."""
        return 'r' in self._mode or '+' in self._mode

    def readline(self, size=-1):
        """Read and return one line from the stream.
        If size is specified, at most size bytes will be read."""
        if 'b' in self._mode:
            raise APIException(
                "readline on a binary file is not permitted: {}".format(
                    self._uri)
            )
        # only read a set number of bytes if size is passed in, otherwise
        # read up to the file size
        if size == -1:
            size = self._getsize()
        # use the BytesIO readline methods
        if self.tell() == 0:
            buffer = self.read(size=size)
            self._buffer[-1].write(buffer)
            self._buffer[-1].seek(0)
        line = self._buffer[-1].readline().decode().strip()
        return line

    def readlines(self, hint=-1):
        """Read and return a list of lines from the stream. hint can be
        specified to control the number of lines read: no more lines will be
        read if the total size (in bytes/characters) of all lines so far exceeds
        hint."""
        if 'b' in self._mode:
            raise APIException(
                "readlines on a binary file is not permitted: {}".format(
                    self._uri)
            )
        # read the entire file in and decode it
        lines = self.read().decode().split("\n")
        return lines

    def truncate(self, size=None):
        """Not supported"""
        raise io.UnsupportedOperation

    def writable(self):
        """Return True if the stream supports writing. If False, write() and
        truncate() will raise IOError."""
        return 'w' in self._mode

    def writelines(self, lines):
        """Write a list of lines to the stream."""
        # first check if the file is binary or not
        if 'b' in self._mode:
            raise APIException(
                "writelines on a binary file is not permitted: {}".format(
                    self._uri)
            )
        # write each line with a line break appended
        for line in lines:
            self.write((line + "\n").encode('utf-8'))
        return True

    def glob(self):
        """Emulate glob on an open bucket. The glob pattern has been passed in
        via self._path, created on connection to the server and bucket."""
        # get the path string up to the wildcards
        try:
            pi1 = self._path.index("*")
        except ValueError:
            pi1 = len(self._path)
        try:
            pi2 = self._path.index("?")
        except ValueError:
            pi2 = len(self._path)
        pi = min(pi1, pi2)
        # using the prefix will cut down on the search space
        prefix = self._path[:pi]
        # get the wildcard
        wildcard = self._path[pi:]
        # set up the paginator
        paginator = self._conn_obj.conn.get_paginator("list_objects_v2")
        parameters = {
            'Bucket': self._bucket,
            'Prefix': prefix
        }
        page_iterator = paginator.paginate(**parameters)
        files = []
        for page in page_iterator:
            for item in page.get('Contents', []):
                fname = item['Key']
                # check that it matches against wildcard
                if fnmatch(fname, wildcard):
                    files.append(item['Key'])
        return files
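

# Minimal usage sketch (hypothetical endpoint, bucket and credentials; the
# placeholder values below are illustrative and not taken from this module):
#
#   credentials = {"accessKey": "<access-key>", "secretKey": "<secret-key>"}
#   uri = "http://s3.example.com/mybucket/data.bin"
#
#   # write an object
#   with s3FileObject(uri, credentials, mode='w') as fo:
#       fo.write(b"some bytes")
#
#   # read it back
#   with s3FileObject(uri, credentials, mode='r') as fo:
#       data = fo.read()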