12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225 |
- # Copyright (c) 2010 OpenStack, LLC.
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- # implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- """
- SwiftS3 middleware emulates AWS S3 REST api on top of Swift.
- The following operations are currently supported:
- * GET Service
- * DELETE Bucket (Delete bucket; abort running MPUs)
- * GET Bucket (List Objects; List in-progress multipart uploads)
- * PUT Bucket
- * DELETE Object
- * GET Object
- * HEAD Object
- * PUT Object
- * PUT Object (Copy)
- To add this middleware to your configuration, add the swifts3 middleware
- before the auth middleware and before any other middleware that
- waits for swift requests (like rate limiting).
- To set up your client, the access key will be the concatenation of the
- account and user strings that should look like test:tester, and the
- secret access key is the account password. The host should also point
- to the swift storage hostname and it should use the old style
- calling format, not the hostname based container format.
- An example client using the python boto library might look like the
- following for a SAIO setup:
- connection = boto.s3.Connection(
- aws_access_key_id='test:tester',
- aws_secret_access_key='testing',
- port=8080,
- host='127.0.0.1',
- is_secure=False,
- calling_format=boto.s3.connection.OrdinaryCallingFormat())
- Note that all operations on multipart upload buckets are denied to the
- user, and multipart buckets are not shown in the list of all buckets.
- For GET/DELETE a NoSuchBucket error is returned;
- for PUT/POST an InvalidBucketName error is returned.
- """
- from urllib import unquote, quote
- import rfc822
- import hmac
- import base64
- import uuid
- import errno
- from xml.sax.saxutils import escape as xml_escape
- import urlparse
- from webob import Request as WebObRequest, Response
- from webob.exc import HTTPNotFound
- from webob.multidict import MultiDict
- import simplejson as json
- from swift.common.utils import split_path, get_logger
- #XXX: In webob-1.9b copied environment contained link to the original
- # instance of a TrackableMultiDict which reflects to original
- # request.
class Request(WebObRequest):
    """
    WebOb request whose copies drop the cached parsed query variables.

    Both ``copy`` and ``copy_get`` remove the
    ``webob._parsed_query_vars`` entry from the environ of the copy they
    return, so the copy does not keep using the cache entry taken over
    from the request it was copied from.
    """

    def _remove_query_vars(self):
        """Remove the cached parsed query variables from the environ."""
        self.environ.pop('webob._parsed_query_vars', None)

    def copy(self):
        """Return a copy of the request without cached query variables."""
        duplicate = super(Request, self).copy()
        duplicate._remove_query_vars()
        return duplicate

    def copy_get(self):
        """Return a GET copy of the request without cached query variables."""
        duplicate = super(Request, self).copy_get()
        duplicate._remove_query_vars()
        return duplicate
# Upper bounds applied to the 'max-keys' / 'max-uploads' listing arguments.
MAX_BUCKET_LISTING = 1000
MAX_UPLOADS_LISTING = 1000

# Name prefix of the containers that hold multipart upload data.
MULTIPART_UPLOAD_PREFIX = 'MPU.'

# List of Query String Arguments of Interest
qsa_of_interest = [
    'acl',
    'defaultObjectAcl',
    'location',
    'logging',
    'partNumber',
    'policy',
    'requestPayment',
    'torrent',
    'versioning',
    'versionId',
    'versions',
    'website',
    'uploads',
    'uploadId',
    'response-content-type',
    'response-content-language',
    'response-expires',
    'response-cache-control',
    'response-content-disposition',
    'response-content-encoding',
    'delete',
    'lifecycle',
]
def get_err_response(code):
    """
    Build an XML error response for the given error code.

    The HTTP status and the human readable message are looked up in a
    fixed table keyed by the error code; a code missing from the table
    raises KeyError.

    :param code: error code
    :returns: webob.response object
    """
    error_table = {
        'AccessDenied':
            (403, 'Access denied'),
        'BucketAlreadyExists':
            (409, 'The requested bucket name is not available'),
        'BucketNotEmpty':
            (409, 'The bucket you tried to delete is not empty'),
        'InvalidArgument':
            (400, 'Invalid Argument'),
        'InvalidBucketName':
            (400, 'The specified bucket is not valid'),
        'InvalidURI':
            (400, 'Could not parse the specified URI'),
        'NoSuchBucket':
            (404, 'The specified bucket does not exist'),
        'SignatureDoesNotMatch':
            (403, 'The calculated request signature does not match '
                  'your provided one'),
        'NoSuchKey':
            (404, 'The resource you requested does not exist'),
        'NoSuchUpload':
            (404, 'The specified multipart upload does not exist.'),
    }

    http_status, message = error_table[code]
    xml_body = ('<?xml version="1.0" encoding="UTF-8"?>\r\n<Error>\r\n '
                '<Code>%s</Code>\r\n <Message>%s</Message>\r\n</Error>\r\n'
                % (code, message))

    resp = Response(content_type='text/xml')
    resp.status = http_status
    resp.body = xml_body
    return resp
def get_acl(account_name):
    """
    Build the access control policy response for an account.

    The policy names the account as owner and grants it FULL_CONTROL.
    The account name is XML-escaped before it is placed into the document
    so that names containing '&', '<' or '>' still yield well-formed XML.

    :param account_name: name of the account owning the resource
    :returns: webob.response object
    """
    owner = xml_escape(account_name)
    body = ('<AccessControlPolicy>'
            '<Owner>'
            '<ID>%s</ID>'
            '</Owner>'
            '<AccessControlList>'
            '<Grant>'
            '<Grantee xmlns:xsi="http://www.w3.org/2001/'
            'XMLSchema-instance" xsi:type="CanonicalUser">'
            '<ID>%s</ID>'
            '</Grantee>'
            '<Permission>FULL_CONTROL</Permission>'
            '</Grant>'
            '</AccessControlList>'
            '</AccessControlPolicy>' %
            (owner, owner))
    return Response(body=body, content_type="text/plain")
def canonical_string(req):
    """
    Build the canonical string of a request, i.e. the token that is signed.

    The string consists of the method, the Content-MD5 and Content-Type
    headers, the Date header (an empty line when an x-amz-date header is
    present), the sorted x-amz-* headers and the path, followed by those
    query string arguments that are listed in qsa_of_interest.
    """
    def decode_value(pair):
        # an argument given without '=' has no value to unquote
        if len(pair) == 1:
            return pair
        return (pair[0], unquote(pair[1]))

    pieces = ["%s\n%s\n%s\n" % (req.method,
                                req.headers.get('Content-MD5', ''),
                                req.headers.get('Content-Type') or '')]

    amz_names = sorted(name.lower() for name in req.headers
                       if name.lower().startswith('x-amz-'))
    amz_headers = dict((name, req.headers[name]) for name in amz_names)

    if 'x-amz-date' in amz_headers:
        pieces.append("\n")
    elif 'Date' in req.headers:
        pieces.append("%s\n" % req.headers['Date'])

    for name in sorted(amz_headers):
        pieces.append("%s:%s\n" % (name, amz_headers[name]))

    # don't include anything after the first ? in the resource...
    # unless it is one of the QSA of interest, defined above
    segments = req.path_qs.split('?')
    pieces.append(segments[0])
    if len(segments) > 1:
        selected = []
        for raw_arg in segments[1].split('&'):
            pair = raw_arg.split('=', 1)
            if pair[0] in qsa_of_interest:
                selected.append(decode_value(pair))
        if selected:
            selected.sort(key=lambda pair: pair[0])
            pieces.append('?')
            pieces.append('&'.join('='.join(pair) for pair in selected))
    return ''.join(pieces)
def check_container_name_no_such_bucket_error(container_name):
    """
    Check that the user does not try to operate on an MPU container.

    :returns: a NoSuchBucket error response when the name starts with the
              multipart upload prefix, None otherwise
    """
    if not container_name.startswith(MULTIPART_UPLOAD_PREFIX):
        return None
    return get_err_response('NoSuchBucket')
def check_container_name_invalid_bucket_name_error(container_name):
    """
    Check that the user does not try to operate on an MPU container.

    :returns: an InvalidBucketName error response when the name starts with
              the multipart upload prefix, None otherwise
    """
    if not container_name.startswith(MULTIPART_UPLOAD_PREFIX):
        return None
    return get_err_response('InvalidBucketName')
def meta_request_head(req, meta_path, app):
    """
    Send a HEAD request for the meta file of a multipart upload.

    The request is a copy of ``req`` with an empty body, no query
    arguments and ``meta_path`` as its path; callers use the status of the
    response to tell whether the multipart upload is in progress.

    :param req: request to copy
    :param meta_path: path of the meta file
    :param app: application the HEAD request is sent to
    :returns: response of the application to the HEAD request
    """
    head_req = req.copy()
    head_req.method = 'HEAD'
    head_req.body = ''
    head_req.upath_info = meta_path
    head_req.GET.clear()
    response = head_req.get_response(app)
    return response
class ServiceController(object):
    """
    Controller for requests made on the account (S3 service) level.
    """

    def __init__(self, env, app, account_name, token, **kwargs):
        """Store the app and point the environ at the account path."""
        self.app = app
        env['HTTP_X_AUTH_TOKEN'] = token
        env['PATH_INFO'] = '/v1/%s' % account_name

    def GET(self, req):
        """
        Handle GET Service request: list the buckets of the account.
        """
        req.GET.clear()
        req.GET['format'] = 'json'
        resp = req.get_response(self.app)
        listing = resp.body

        if resp.status_int == 401:
            return get_err_response('AccessDenied')
        if resp.status_int != 200:
            return get_err_response('InvalidURI')

        # we don't keep the creation time of a bucket (s3cmd doesn't
        # work without that) so we use some bogus
        bucket_template = ('<Bucket><Name>%s</Name><CreationDate>'
                           '2009-02-03T16:45:09.000Z</CreationDate></Bucket>')
        buckets = []
        for container in json.loads(listing):
            # we should not show multipart buckets here
            if container['name'].startswith(MULTIPART_UPLOAD_PREFIX):
                continue
            buckets.append(bucket_template % xml_escape(container['name']))

        body = ('<?xml version="1.0" encoding="UTF-8"?>'
                '<ListAllMyBucketsResult '
                'xmlns="http://s3.amazonaws.com/doc/2006-03-01/">'
                '<Buckets>%s</Buckets>'
                '</ListAllMyBucketsResult>'
                % "".join(buckets))
        return Response(status=200, content_type='application/xml', body=body)
class BucketController(object):
    """
    Handles bucket requests.

    Buckets are mapped onto containers. The data of multipart uploads of
    a bucket lives in a second container whose name is the bucket name
    prefixed with MULTIPART_UPLOAD_PREFIX; users may not address those
    containers directly.
    """

    def __init__(self, env, app, account_name, token, container_name,
                 **kwargs):
        """Store names and point the environ at the container path."""
        self.app = app
        self.container_name = unquote(container_name)
        self.account_name = unquote(account_name)
        env['HTTP_X_AUTH_TOKEN'] = token
        env['PATH_INFO'] = '/v1/%s/%s' % (account_name, container_name)

    def get_uploads(self, req):
        """
        Handles listing of in-progress multipart uploads.

        An upload is in progress when its '<key>/<uploadId>/meta' object
        is present in the multipart container of the bucket.

        :param req: the GET Bucket request carrying the 'uploads' argument
        :returns: webob.response object
        """
        acl = req.GET.get('acl')
        params = MultiDict([('format', 'json')])
        max_uploads = req.GET.get('max-uploads')
        if max_uploads is not None and max_uploads.isdigit():
            max_uploads = min(int(max_uploads), MAX_UPLOADS_LISTING)
        else:
            max_uploads = MAX_UPLOADS_LISTING
        # ask for one entry more to find out whether the list is truncated
        params['limit'] = str(max_uploads + 1)
        for param_name in ('key-marker', 'prefix', 'delimiter',
                           'upload-id-marker'):
            if param_name in req.GET:
                params[param_name] = req.GET[param_name]

        cont_name = MULTIPART_UPLOAD_PREFIX + self.container_name
        cont_path = "/v1/%s/%s/" % (self.account_name, cont_name)
        req.upath_info = cont_path
        req.GET.clear()
        req.GET.update(params)

        resp = req.get_response(self.app)
        status = resp.status_int
        if status != 200:
            if status == 401:
                return get_err_response('AccessDenied')
            elif status == 404:
                return get_err_response('InvalidBucketName')
            else:
                return get_err_response('InvalidURI')

        if acl is not None:
            return get_acl(self.account_name)

        listing = json.loads(resp.body)
        owner = xml_escape(self.account_name)
        uploads = []
        last_key = last_upload_id = ''
        for obj in listing:
            # only meta files denote uploads; the listing itself is left
            # untouched while it is being iterated
            if not obj['name'].endswith('/meta'):
                continue
            # split from the right: the key itself may contain '/'
            name_parts = obj['name'].rsplit('/', 2)
            last_key, last_upload_id = name_parts[0], name_parts[1]
            uploads.append(
                "<Upload>"
                "<Key>%s</Key>"
                "<UploadId>%s</UploadId>"
                "<Initiator>"
                "<ID>%s</ID>"
                "<DisplayName>%s</DisplayName>"
                "</Initiator>"
                "<Owner>"
                "<ID>%s</ID>"
                "<DisplayName>%s</DisplayName>"
                "</Owner>"
                "<StorageClass>STANDARD</StorageClass>"
                "<Initiated>%sZ</Initiated>"
                "</Upload>" % (
                    xml_escape(last_key),
                    xml_escape(last_upload_id),
                    owner,
                    owner,
                    owner,
                    owner,
                    obj['last_modified'][:-3]))

        #TODO: Currently there are less then max_uploads results
        # in a response; Amount of uploads == amount of meta files
        # received in a request for a list of objects in a bucket.
        if len(listing) == (max_uploads + 1) and uploads:
            is_truncated = 'true'
            next_key_marker = xml_escape(last_key)
            next_uploadId_marker = xml_escape(last_upload_id)
        else:
            is_truncated = 'false'
            next_key_marker = next_uploadId_marker = ''

        body = ('<?xml version="1.0" encoding="UTF-8"?>'
                '<ListMultipartUploadsResult '
                'xmlns="http://s3.amazonaws.com/doc/2006-03-01/">'
                '<Bucket>%s</Bucket>'
                '<KeyMarker>%s</KeyMarker>'
                '<UploadIdMarker>%s</UploadIdMarker>'
                '<NextKeyMarker>%s</NextKeyMarker>'
                '<NextUploadIdMarker>%s</NextUploadIdMarker>'
                '<MaxUploads>%s</MaxUploads>'
                '<IsTruncated>%s</IsTruncated>'
                '%s'
                '</ListMultipartUploadsResult>' %
                (
                    xml_escape(self.container_name),
                    xml_escape(params.get('key-marker', '')),
                    xml_escape(params.get('upload-id-marker', '')),
                    next_key_marker,
                    next_uploadId_marker,
                    max_uploads,
                    is_truncated,
                    ''.join(uploads)
                )
                )
        return Response(body=body, content_type='application/xml')

    def GET(self, req):
        """
        Handles listing of in-progress multipart uploads,
        handles list objects request.
        """
        # any operations with multipart buckets are not allowed to user
        error = check_container_name_no_such_bucket_error(self.container_name)
        if error is not None:
            return error

        if 'uploads' in req.GET:
            return self.get_uploads(req)

        acl = req.GET.get('acl')
        params = MultiDict([('format', 'json')])
        max_keys = req.GET.get('max-keys')
        if max_keys is not None and max_keys.isdigit():
            max_keys = min(int(max_keys), MAX_BUCKET_LISTING)
        else:
            max_keys = MAX_BUCKET_LISTING
        # ask for one entry more to find out whether the list is truncated
        params['limit'] = str(max_keys + 1)
        for param_name in ('marker', 'prefix', 'delimiter'):
            if param_name in req.GET:
                params[param_name] = req.GET[param_name]

        req.GET.clear()
        req.GET.update(params)

        resp = req.get_response(self.app)
        status = resp.status_int
        if status != 200:
            if status == 401:
                return get_err_response('AccessDenied')
            elif status == 404:
                return get_err_response('InvalidBucketName')
            else:
                return get_err_response('InvalidURI')

        if acl is not None:
            return get_acl(self.account_name)

        objects = json.loads(resp.body)
        shown = objects[:max_keys]
        contents = "".join(
            ['<Contents><Key>%s</Key><LastModified>%sZ</Last'
             'Modified><ETag>%s</ETag><Size>%s</Size><Storage'
             'Class>STANDARD</StorageClass></Contents>' %
             (xml_escape(i['name']), i['last_modified'][:-3],
              i['hash'], i['bytes'])
             for i in shown if 'subdir' not in i])
        common_prefixes = "".join(
            ['<CommonPrefixes><Prefix>%s</Prefix></Common'
             'Prefixes>' % xml_escape(i['subdir'])
             for i in shown if 'subdir' in i])

        body = ('<?xml version="1.0" encoding="UTF-8"?>'
                '<ListBucketResult '
                'xmlns="http://s3.amazonaws.com/doc/2006-03-01/">'
                '<Prefix>%s</Prefix>'
                '<Marker>%s</Marker>'
                '<Delimiter>%s</Delimiter>'
                '<IsTruncated>%s</IsTruncated>'
                '<MaxKeys>%s</MaxKeys>'
                '<Name>%s</Name>'
                '%s'
                '%s'
                '</ListBucketResult>' %
                (
                    xml_escape(params.get('prefix', '')),
                    xml_escape(params.get('marker', '')),
                    xml_escape(params.get('delimiter', '')),
                    'true' if len(objects) == (max_keys + 1) else 'false',
                    max_keys,
                    xml_escape(self.container_name),
                    contents,
                    common_prefixes))
        return Response(body=body, content_type='application/xml')

    def PUT(self, req):
        """
        Handles PUT Bucket request.
        """
        # any operations with multipart buckets are not allowed to user
        error = check_container_name_invalid_bucket_name_error(
            self.container_name)
        if error is not None:
            return error

        resp = req.get_response(self.app)
        status = resp.status_int

        if status != 201:
            if status == 401:
                return get_err_response('AccessDenied')
            elif status == 202:
                return get_err_response('BucketAlreadyExists')
            else:
                return get_err_response('InvalidURI')

        resp = Response()
        resp.headers.add('Location', self.container_name)
        resp.status = 200
        return resp

    def mpu_bucket_deletion_list_request(self, req, cont_path):
        """This method returns listing of MPU bucket for deletion"""
        list_req = req.copy()
        list_req.method = 'GET'
        list_req.upath_info = cont_path
        list_req.GET.clear()
        list_req.GET['format'] = 'json'
        return list_req.get_response(self.app)

    def mpu_bucket_deletion(self, req):
        """
        This method checks if MPU bucket exists and
        if there are any active MPUs are in it.
        MPUs are aborted, uploaded parts are deleted.

        :param req: the DELETE Bucket request; copies of it are used for
                    the individual delete requests
        :returns: 204 response on success, error response otherwise
        """
        cont_name = MULTIPART_UPLOAD_PREFIX + self.container_name
        cont_path = "/v1/%s/%s/" % (self.account_name, cont_name)

        list_resp = self.mpu_bucket_deletion_list_request(req, cont_path)
        status = list_resp.status_int

        if status == 401:
            return get_err_response('AccessDenied')
        elif status == 404:
            # there is no MPU bucket, it's OK, there is only regular bucket
            return Response(status=204)
        elif status != 200:
            return get_err_response('InvalidURI')

        # aborting multipart uploads by deleting meta and other files
        objects = json.loads(list_resp.body)
        # '<key>/<uploadId>/' of every upload that still has a meta file;
        # the trailing slash keeps one upload id from matching another
        upload_prefixes = tuple(obj['name'][:-4] for obj in objects
                                if obj['name'].endswith('/meta'))
        for mpu_obj in objects:
            # every object is deleted at most once, so a second delete of
            # the same object cannot fail with 404
            if not mpu_obj['name'].startswith(upload_prefixes):
                continue
            obj_req = req.copy()
            obj_req.upath_info = "%s%s" % (cont_path, mpu_obj['name'])
            obj_req.GET.clear()

            obj_resp = obj_req.get_response(self.app)
            status = obj_resp.status_int
            #TODO: Add some logs here
            if status not in (200, 204):
                if status == 401:
                    return get_err_response('AccessDenied')
                elif status == 404:
                    return get_err_response('NoSuchKey')
                else:
                    return get_err_response('InvalidURI')

        # deleting multipart bucket
        del_mpu_req = req.copy()
        del_mpu_req.upath_info = cont_path
        del_mpu_req.GET.clear()
        del_mpu_resp = del_mpu_req.get_response(self.app)
        status = del_mpu_resp.status_int

        if status != 204:
            if status == 401:
                return get_err_response('AccessDenied')
            elif status == 404:
                return get_err_response('InvalidBucketName')
            elif status == 409:
                return get_err_response('BucketNotEmpty')
            else:
                return get_err_response('InvalidURI')

        return Response(status=204)

    def DELETE(self, req):
        """
        Handles DELETE Bucket request.
        Also deletes multipart bucket if it exists.
        Aborts all multipart uploads initiated for this bucket.
        """
        # any operations with multipart buckets are not allowed to user
        error = check_container_name_no_such_bucket_error(self.container_name)
        if error is not None:
            return error

        # deleting regular bucket,
        # request is copied to save valid authorization
        del_req = req.copy()
        resp = del_req.get_response(self.app)
        status = resp.status_int

        if status != 204:
            if status == 401:
                return get_err_response('AccessDenied')
            elif status == 404:
                return get_err_response('InvalidBucketName')
            elif status == 409:
                return get_err_response('BucketNotEmpty')
            else:
                return get_err_response('InvalidURI')

        # check if there is a multipart bucket and
        # return 204 when everything is deleted
        return self.mpu_bucket_deletion(req)
class NormalObjectController(object):
    """
    Controller for plain (non multipart) requests on objects.
    """

    def __init__(self, env, app, account_name, token, container_name,
                 object_name, **kwargs):
        """Store names and point the environ at the object path."""
        self.app = app
        self.account_name = unquote(account_name)
        self.container_name = unquote(container_name)
        self.object_name = unquote(object_name)
        env['HTTP_X_AUTH_TOKEN'] = token
        env['PATH_INFO'] = '/v1/%s/%s/%s' % (account_name, container_name,
                                             object_name)

    def GETorHEAD(self, req):
        """
        Forward a GET or HEAD request and translate the response.

        Object metadata headers are renamed to their x-amz-meta-
        counterparts and only a fixed set of entity headers is passed on.
        """
        resp = req.get_response(self.app)
        status = resp.status_int
        headers = resp.headers
        app_iter = resp.app_iter

        if status == 401:
            return get_err_response('AccessDenied')
        if status == 404:
            return get_err_response('NoSuchKey')
        if not 200 <= status < 300:
            return get_err_response('InvalidURI')

        if 'acl' in req.GET:
            return get_acl(self.account_name)

        passed_through = ('content-length', 'content-type',
                          'content-encoding', 'etag', 'last-modified')
        translated = {}
        for name, value in headers.iteritems():
            lowered = name.lower()
            if lowered.startswith('x-object-meta-'):
                translated['x-amz-meta-' + name[14:]] = value
            elif lowered in passed_through:
                translated[name] = value
        return Response(status=status, headers=translated,
                        app_iter=app_iter)

    def HEAD(self, req):
        """
        Handles HEAD Object request.
        """
        return self.GETorHEAD(req)

    def GET(self, req):
        """
        Handles GET Object request.
        """
        return self.GETorHEAD(req)

    def PUT(self, req):
        """
        Handles PUT Object and PUT Object (Copy) request.

        Before the request is forwarded, x-amz-meta- headers are renamed
        to object metadata headers, Content-MD5 is converted into an ETag
        and the copy source header is translated.
        """
        environ = req.environ
        for name, value in environ.items():
            if name.startswith('HTTP_X_AMZ_META_'):
                del environ[name]
                environ['HTTP_X_OBJECT_META_' + name[16:]] = value
            elif name == 'HTTP_CONTENT_MD5':
                environ['HTTP_ETAG'] = value.decode('base64').encode('hex')
            elif name == 'HTTP_X_AMZ_COPY_SOURCE':
                environ['HTTP_X_COPY_FROM'] = value

        resp = req.get_response(self.app)
        status = resp.status_int
        headers = resp.headers

        if status == 401:
            return get_err_response('AccessDenied')
        if status == 404:
            return get_err_response('InvalidBucketName')
        if status != 201:
            return get_err_response('InvalidURI')

        if 'HTTP_X_COPY_FROM' not in environ:
            return Response(status=200, etag=headers['ETag'])

        body = ('<CopyObjectResult>'
                '<ETag>"%s"</ETag>'
                '</CopyObjectResult>' % headers['ETag'])
        return Response(status=200, body=body)

    def DELETE(self, req):
        """
        Handles DELETE Object request.
        """
        status = req.get_response(self.app).status_int

        if status in (200, 204):
            return Response(status=204)
        if status == 401:
            return get_err_response('AccessDenied')
        if status == 404:
            return get_err_response('NoSuchKey')
        return get_err_response('InvalidURI')
- class MultiPartObjectController(object):
def __init__(self, env, app, account_name, token, container_name,
             object_name, **kwargs):
    """
    Store the unquoted names and rewrite the environ for the backend.

    The PATH_INFO found in the environ is remembered in
    ``orig_path_info`` before it is replaced by the object path.
    """
    # remember the path of the incoming request before it is replaced
    self.orig_path_info = env['PATH_INFO']
    self.app = app
    self.account_name = unquote(account_name)
    self.container_name = unquote(container_name)
    self.object_name = unquote(object_name)
    env['HTTP_X_AUTH_TOKEN'] = token
    env['PATH_INFO'] = '/v1/%s/%s/%s' % (account_name, container_name,
                                         object_name)
def GET(self, req):
    """
    Lists the uploaded parts of a multipart upload given by uploadId.

    Parts are the objects named '<key>/<uploadId>/part/<partNumber>' in
    the multipart container of the bucket; the upload must still have its
    meta file, otherwise NoSuchUpload is returned.

    :param req: request carrying 'uploadId' and optionally 'max-parts'
                and 'part-number-marker'
    :returns: webob.response object
    """
    # any operations with multipart buckets are not allowed to user
    error = check_container_name_no_such_bucket_error(self.container_name)
    if error is not None:
        return error

    upload_id = req.GET.get('uploadId')
    max_parts = req.GET.get('max-parts', '1000')
    part_number_marker = req.GET.get('part-number-marker', '')

    try:
        int(upload_id, 16)
        max_parts = int(max_parts)
        if max_parts < 0:
            raise ValueError('max-parts must not be negative')
        if part_number_marker:
            part_number_marker = int(part_number_marker)
    except (TypeError, ValueError):
        return get_err_response('InvalidURI')

    cont_name = MULTIPART_UPLOAD_PREFIX + self.container_name
    cont_path = "/v1/%s/%s/" % (self.account_name, cont_name)
    # names of the part objects inside the multipart container
    parts_prefix = "%s/%s/part/" % (self.object_name, upload_id)
    meta_path = "%s%s/%s/meta" % (cont_path,
                                  self.object_name,
                                  upload_id)

    meta_resp = meta_request_head(req, meta_path, self.app)
    status = meta_resp.status_int

    if status != 200:
        return get_err_response('NoSuchUpload')

    list_req = req.copy()
    list_req.upath_info = cont_path
    list_req.GET.clear()
    list_req.GET['format'] = 'json'
    # the prefix is matched against object names, which do not include
    # the container name
    list_req.GET['prefix'] = parts_prefix
    # ask for one entry more to find out whether the list is truncated
    list_req.GET['limit'] = str(max_parts + 1)
    if part_number_marker:
        list_req.GET['marker'] = "%s%s" % (parts_prefix,
                                           part_number_marker)

    resp = list_req.get_response(self.app)
    status = resp.status_int

    if status != 200:
        if status == 401:
            return get_err_response('AccessDenied')
        elif status == 404:
            return get_err_response('InvalidBucketName')
        else:
            return get_err_response('InvalidURI')

    objects = json.loads(resp.body)
    prefix_len = len(parts_prefix)

    next_marker = ''
    if len(objects) > max_parts:
        # drop the extra entry that was only requested to detect truncation
        objects = objects[:max_parts]
        is_truncated = 'true'
        if objects:
            next_marker = ("<NextPartNumberMarker>%s</NextPartNumberMarker>"
                           % xml_escape(objects[-1]['name'][prefix_len:]))
    else:
        is_truncated = 'false'

    if part_number_marker:
        part_number_marker = "<PartNumberMarker>%s</PartNumberMarker>" % \
                             part_number_marker

    parts = ''.join(("<Part>"
                     "<PartNumber>%s</PartNumber>"
                     "<LastModified>%sZ</LastModified>"
                     "<ETag>\"%s\"</ETag>"
                     "<Size>%s</Size>"
                     "</Part>" % (
                         xml_escape(obj['name'][prefix_len:]),
                         obj['last_modified'][:-3],
                         obj['hash'],
                         obj['bytes']) for obj in objects))

    owner = xml_escape(self.account_name)
    body = (
        "<?xml version=\"1.0\" encoding=\"UTF-8\"?>"
        "<ListPartsResult xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\">"
        "<Bucket>%s</Bucket>"
        "<Key>%s</Key>"
        "<UploadId>%s</UploadId>"
        "<Initiator>"
        "<ID>%s</ID>"
        "<DisplayName>%s</DisplayName>"
        "</Initiator>"
        "<Owner>"
        "<ID>%s</ID>"
        "<DisplayName>%s</DisplayName>"
        "</Owner>"
        "<StorageClass>STANDARD</StorageClass>"
        "%s%s"
        "<MaxParts>%s</MaxParts>"
        "<IsTruncated>%s</IsTruncated>"
        "%s"
        "</ListPartsResult>" % (
            xml_escape(self.container_name),
            xml_escape(self.object_name),
            upload_id,
            owner,
            owner,
            owner,
            owner,
            part_number_marker,
            next_marker,
            max_parts,
            is_truncated,
            parts,
        ))
    return Response(status=200,
                    body=body,
                    content_type='application/xml')
def post_uploads_container_request(self, req, cont_path):
    """
    Create the container used for multipart uploads.

    :param req: request to copy
    :param cont_path: path of the container to create
    :returns: response of the application to the PUT request
    """
    create_req = req.copy()
    create_req.method = 'PUT'
    create_req.upath_info = cont_path
    create_req.GET.clear()
    response = create_req.get_response(self.app)
    return response
def post_uploads_put_meta_req(self, req, cont_path, upload_id):
    """
    Create the meta file of a multipart upload.

    Every x-amz-meta- header of the request is additionally stored under
    an X-Object-Meta-Amz- name on the meta file request.

    :param req: request to copy
    :param cont_path: path of the multipart container
    :param upload_id: id of the multipart upload
    :returns: response of the application to the PUT request
    """
    put_req = req.copy()
    put_req.method = 'PUT'
    put_req.upath_info = "%s%s/%s/meta" % (cont_path,
                                           self.object_name,
                                           upload_id)
    for name, value in put_req.headers.items():
        if not name.lower().startswith('x-amz-meta-'):
            continue
        put_req.headers['X-Object-Meta-Amz-' + name[11:]] = value
    return put_req.get_response(self.app)
def post_uploads(self, req):
    """
    Initiates a multipart upload.

    Called if POST with 'uploads' query string was received. Makes sure
    the multipart container exists and creates the meta file that flags
    the upload as uncompleted.
    """
    mpu_container = MULTIPART_UPLOAD_PREFIX + self.container_name
    cont_path = "/v1/%s/%s/" % (self.account_name, mpu_container)

    head_req = req.copy()
    head_req.method = 'HEAD'
    head_req.upath_info = cont_path
    head_req.GET.clear()
    status = head_req.get_response(self.app).status_int

    if status == 404:
        # creating container for MPU
        status = self.post_uploads_container_request(
            req, cont_path).status_int

    if status == 401:
        return get_err_response('AccessDenied')
    if status == 404:
        return get_err_response('InvalidBucketName')
    if status not in (201, 204):
        return get_err_response('InvalidURI')

    upload_id = uuid.uuid4().hex

    status = self.post_uploads_put_meta_req(
        req, cont_path, upload_id).status_int

    if status == 401:
        return get_err_response('AccessDenied')
    if status == 404:
        return get_err_response('InvalidBucketName')
    if status != 201:
        return get_err_response('InvalidURI')

    template = ('<?xml version="1.0" encoding="UTF-8"?>'
                '<InitiateMultipartUploadResult '
                'xmlns="http://s3.amazonaws.com/doc/2006-03-01/">'
                '<Bucket>%s</Bucket>'
                '<Key>%s</Key>'
                '<UploadId>%s</UploadId>'
                '</InitiateMultipartUploadResult>')
    body = template % (self.container_name, self.object_name, upload_id)
    return Response(status=200,
                    body=body,
                    content_type='application/xml')
def post_uploadId(self, req):
    """
    Completes a multipart upload.

    Called if POST with 'uploadId' query string was received. Puts a
    manifest object that points at the uploaded parts and deletes the
    meta file once the manifest was created.
    """
    upload_id = req.GET.get('uploadId')
    try:
        int(upload_id, 16)
    except (TypeError, ValueError):
        return get_err_response('InvalidURI')

    mpu_container = MULTIPART_UPLOAD_PREFIX + self.container_name
    cont_path = "/v1/%s/%s/" % (self.account_name, mpu_container)
    meta_path = "%s%s/%s/meta" % (cont_path, self.object_name, upload_id)

    meta_resp = meta_request_head(req, meta_path, self.app)
    meta_status = meta_resp.status_int
    if meta_status == 401:
        return get_err_response('AccessDenied')
    if meta_status == 404:
        return get_err_response('NoSuchUpload')
    if meta_status != 200:
        return get_err_response('InvalidURI')

    # TODO: Validate uploaded parts.

    manifest_path = MULTIPART_UPLOAD_PREFIX + \
        "%s/%s/%s/part/" % (self.container_name,
                            self.object_name,
                            upload_id)

    manifest_req = req.copy()
    manifest_req.method = 'PUT'
    manifest_req.GET.clear()
    manifest_req.headers['X-Object-Manifest'] = manifest_path
    # hand the metadata kept on the meta file over to the manifest request
    for name, value in meta_resp.headers.iteritems():
        if name.lower().startswith('x-object-meta-amz-'):
            manifest_req.headers['x-amz-meta-' + name[18:]] = value

    manifest_resp = manifest_req.get_response(self.app)
    status = manifest_resp.status_int

    if status == 201:
        # the manifest exists, the upload is no longer in progress
        finish_req = req.copy()
        finish_req.method = 'DELETE'
        finish_req.upath_info = meta_path
        finish_req.body = ''
        finish_req.GET.clear()
        status = finish_req.get_response(self.app).status_int

    if status == 401:
        return get_err_response('AccessDenied')
    if status == 404:
        return get_err_response('InvalidBucketName')
    if status not in (201, 204):
        return get_err_response('InvalidURI')

    template = ('<?xml version="1.0" encoding="UTF-8"?>'
                '<CompleteMultipartUploadResult '
                'xmlns="http://s3.amazonaws.com/doc/2006-03-01/">'
                '<Location>%s</Location>'
                '<Bucket>%s</Bucket>'
                '<Key>%s</Key>'
                '<ETag>%s</ETag>'
                '</CompleteMultipartUploadResult>')
    body = template % (self.orig_path_info,
                       self.container_name,
                       self.object_name,
                       manifest_resp.headers['ETag'])
    return Response(status=200,
                    body=body,
                    content_type='application/xml')
def POST(self, req):
    """
    Initiate and complete multipart upload.

    A request with the 'uploads' argument initiates an upload, one with
    the 'uploadId' argument completes it; anything else is answered with
    InvalidURI. Requests addressing a multipart container directly are
    rejected with InvalidBucketName.
    """
    # any operations with multipart buckets are not allowed to user
    error = check_container_name_invalid_bucket_name_error(
        self.container_name)
    if error is not None:
        return error

    if 'uploads' in req.GET:
        return self.post_uploads(req)
    elif 'uploadId' in req.GET:
        return self.post_uploadId(req)

    return get_err_response('InvalidURI')
def PUT(self, req):
    """
    Upload part of a multipart upload.

    The query string must carry ``uploadId`` (accepted when it parses as a
    base-16 integer) and ``partNumber`` (decimal digits only).  The upload's
    ``meta`` object is HEADed first; any status other than 200 is reported
    as ``NoSuchUpload``.  The request is then copied, re-targeted at
    ``<cont_path><object>/<uploadId>/part/<partNumber>`` and sent to the
    wrapped app.

    :param req: incoming request object
    :returns: a 200 ``Response`` carrying the stored part's ETag (wrapped
              in a ``CopyObjectResult`` body when the request environ has
              ``HTTP_X_COPY_FROM``), or an error response built by
              ``get_err_response``
    """
    upload_id = req.GET.get('uploadId')
    part_number = req.GET.get('partNumber', '')

    try:
        # A missing uploadId is None, which int() rejects with TypeError.
        int(upload_id, 16)
    except (TypeError, ValueError):
        return get_err_response('InvalidURI')

    if not part_number.isdigit():
        return get_err_response('InvalidURI')

    # any operations with multipart buckets are not allowed to user
    # NOTE(review): the helper's result is not used here; presumably it
    # raises for a forbidden container name -- confirm against its definition.
    check_container_name_invalid_bucket_name_error(self.container_name)

    cont_name = MULTIPART_UPLOAD_PREFIX + self.container_name
    cont_path = "/v1/%s/%s/" % (self.account_name, cont_name)
    meta_path = "%s%s/%s/meta" % (cont_path, self.object_name, upload_id)

    # The meta object must exist for the upload to be considered open.
    meta_resp = meta_request_head(req, meta_path, self.app)
    if meta_resp.status_int != 200:
        return get_err_response('NoSuchUpload')

    # NOTE(review): part_number is used verbatim as the last path component.
    # If the parts are later assembled in lexical name order, "10" would
    # sort before "2" -- confirm how the manifest orders its segments.
    req = req.copy()
    req.upath_info = "%s%s/%s/part/%s" % (cont_path, self.object_name,
                                          upload_id, part_number)
    req.GET.clear()
    resp = req.get_response(self.app)
    status = resp.status_int

    if status != 201:
        if status == 401:
            return get_err_response('AccessDenied')
        elif status == 404:
            return get_err_response('InvalidBucketName')
        else:
            return get_err_response('InvalidURI')

    if 'HTTP_X_COPY_FROM' in req.environ:
        body = '<CopyObjectResult>' \
               '<ETag>"%s"</ETag>' \
               '</CopyObjectResult>' % resp.headers['ETag']
        return Response(status=200, body=body)

    return Response(status=200, etag=resp.headers['ETag'])
def DELETE(self, req):
    """
    Aborts multipart upload by uploadId.

    Lists the multipart container with the prefix
    ``<object>/<uploadId>/`` and issues a DELETE for every object in the
    listing, stopping at the first failure.

    :param req: incoming request object; ``uploadId`` is read from its
                query string and must parse as a base-16 integer
    :returns: a 204 ``Response`` when every listed object was deleted, or
              an error response built by ``get_err_response``
    """
    upload_id = req.GET.get('uploadId')
    try:
        # A missing uploadId is None, which int() rejects with TypeError.
        int(upload_id, 16)
    except (TypeError, ValueError):
        return get_err_response('InvalidURI')

    # Users are not allowed to operate on the multipart buckets directly.
    # NOTE(review): the helper's result is not used here; presumably it
    # raises for a forbidden container name -- confirm against its definition.
    check_container_name_no_such_bucket_error(self.container_name)

    cont_path = "/v1/%s/%s/" % (
        self.account_name, MULTIPART_UPLOAD_PREFIX + self.container_name)
    listing_prefix = "%s/%s/" % (self.object_name, upload_id)

    # Step 1: enumerate everything stored under this upload.
    listing_req = req.copy_get()
    listing_req.upath_info = cont_path
    listing_req.GET.clear()
    listing_req.GET['format'] = 'json'
    listing_req.GET['prefix'] = listing_prefix
    listing_resp = listing_req.get_response(self.app)

    listing_status = listing_resp.status_int
    if listing_status != 200:
        listing_errors = {401: 'AccessDenied', 404: 'InvalidBucketName'}
        return get_err_response(
            listing_errors.get(listing_status, 'InvalidURI'))

    # Step 2: delete each listed object, bailing out on the first failure.
    removal_errors = {401: 'AccessDenied', 404: 'NoSuchKey'}
    for entry in json.loads(listing_resp.body):
        removal_req = req.copy()
        removal_req.method = 'DELETE'
        removal_req.upath_info = "%s%s" % (cont_path, entry['name'])
        removal_req.GET.clear()
        removal_status = removal_req.get_response(self.app).status_int
        if removal_status in (200, 204):
            continue
        return get_err_response(
            removal_errors.get(removal_status, 'InvalidURI'))

    return Response(status=204)
class ObjectController(NormalObjectController, MultiPartObjectController):
    """Manages requests on normal and multipart objects"""

    def __init__(self, *args, **kwargs):
        # Only the multipart controller's initializer is invoked.
        MultiPartObjectController.__init__(self, *args, **kwargs)

    def GET(self, req):
        """Use the multipart GET when ``uploadId`` is given, else the normal one."""
        if 'uploadId' in req.GET:
            return MultiPartObjectController.GET(self, req)
        return NormalObjectController.GET(self, req)

    def PUT(self, req):
        """Use the multipart PUT when ``uploadId`` is given, else the normal one."""
        if 'uploadId' in req.GET:
            return MultiPartObjectController.PUT(self, req)
        return NormalObjectController.PUT(self, req)

    def POST(self, req):
        """Use the multipart POST for ``uploadId``/``uploads``, else the normal one."""
        if 'uploadId' in req.GET or 'uploads' in req.GET:
            return MultiPartObjectController.POST(self, req)
        return NormalObjectController.POST(self, req)

    @staticmethod
    def _upload_id_from_manifest(manifest):
        """
        Extract the upload id from an ``X-Object-Manifest`` value.

        The expected layout is ``<container>/<object>/<upload_id>/part/``.
        The value is parsed from the right so that ``/`` characters inside
        the object name cannot shift the position of the upload id.

        :param manifest: value of the ``X-Object-Manifest`` header
        :returns: the upload id, or None when the value does not end in
                  ``/<upload_id>/part``
        """
        pieces = manifest.rstrip('/').rsplit('/', 2)
        if len(pieces) != 3 or pieces[2] != 'part':
            return None
        return pieces[1] or None

    def DELETE(self, req):
        """
        Delete an object, or abort an upload when ``uploadId`` is given.

        Without ``uploadId`` the object is HEADed first.  If it carries an
        ``X-Object-Manifest`` header in the multipart layout, the objects
        stored for that upload are removed through the multipart DELETE
        before the object itself is deleted.

        :param req: incoming request object
        :returns: the response of the multipart DELETE (abort case) or of
                  the normal DELETE
        """
        if 'uploadId' in req.GET:
            return MultiPartObjectController.DELETE(self, req)

        obj_req = req.copy_get()
        obj_req.method = 'HEAD'
        obj_req.GET.clear()
        obj_resp = obj_req.get_response(self.app)

        if obj_resp.status_int == 200 and \
                'X-Object-Manifest' in obj_resp.headers:
            upload_id = self._upload_id_from_manifest(
                obj_resp.headers['X-Object-Manifest'])
            # Manifests that do not look like ours are left alone.
            if upload_id is not None:
                del_req = req.copy()
                del_req.GET['uploadId'] = upload_id
                # Best effort, as before: the outcome of the cleanup does
                # not stop the object itself from being deleted.
                MultiPartObjectController.DELETE(self, del_req)

        return NormalObjectController.DELETE(self, req)
class Swift3Middleware(object):
    """Swift3 S3 compatibility middleware"""

    def __init__(self, app, conf, *args, **kwargs):
        """
        :param app: the wrapped WSGI application
        :param conf: configuration mapping (accepted but not stored)
        """
        self.app = app

    def get_controller(self, path, params):
        """
        Choose the controller class for a request path.

        :param path: request path, split into at most container and object
        :param params: query parameters (not consulted here)
        :returns: tuple of (controller class, dict with ``container_name``
                  and ``object_name``)
        :raises ValueError: propagated from ``split_path``; the caller in
                            ``__call__`` maps it to ``InvalidURI``
        """
        container, obj = split_path(path, 0, 2, True)
        d = dict(container_name=container, object_name=obj)
        if container and obj:
            return ObjectController, d
        elif container:
            return BucketController, d
        return ServiceController, d

    @staticmethod
    def _is_http_verb(method):
        """
        Tell whether ``method`` may be used to look up a handler.

        The request method comes straight from the client, so only names
        made of uppercase letters are accepted.  That keeps lowercase,
        mixed-case and underscore-containing attribute names (for example
        ``app`` or ``post_uploadId``) out of reach of the dispatch.
        """
        return method.isalpha() and method.isupper()

    def __call__(self, env, start_response):
        """
        WSGI entry point.

        Requests without an ``Authorization`` header (after query-string
        credentials have been folded into the headers) are passed to the
        wrapped app untouched.  All others are dispatched to the method of
        the selected controller that is named after the request method.
        """
        req = Request(env)

        # Query-string credentials are converted into the header form.
        if 'AWSAccessKeyId' in req.GET:
            try:
                req.headers['Date'] = req.GET['Expires']
                req.headers['Authorization'] = \
                    'AWS %(AWSAccessKeyId)s:%(Signature)s' % req.GET
            except KeyError:
                return get_err_response('InvalidArgument')(env, start_response)

        if 'Authorization' not in req.headers:
            return self.app(env, start_response)

        try:
            # Unpacking fails with ValueError when the last token of the
            # header has no ':' separator.
            account, signature = \
                req.headers['Authorization'].split(' ')[-1].rsplit(':', 1)
        except ValueError:
            return get_err_response('InvalidArgument')(env, start_response)

        try:
            controller, path_parts = self.get_controller(req.path, req.GET)
        except ValueError:
            return get_err_response('InvalidURI')(env, start_response)

        token = base64.urlsafe_b64encode(canonical_string(req))

        controller = controller(req.environ,
                                self.app,
                                account,
                                token,
                                **path_parts)

        method = req.method
        if not (self._is_http_verb(method) and hasattr(controller, method)):
            return get_err_response('InvalidURI')(env, start_response)

        res = getattr(controller, method)(req)
        return res(env, start_response)
def filter_factory(global_conf, **local_conf):
    """
    Standard filter factory to use the middleware with paste.deploy

    :param global_conf: global configuration mapping; left unmodified
    :param local_conf: filter-specific settings, which override entries of
                       ``global_conf`` with the same key
    :returns: a callable that wraps an app in ``Swift3Middleware``
    """
    merged_conf = dict(global_conf, **local_conf)

    def swifts3_filter(app):
        return Swift3Middleware(app, merged_conf)

    return swifts3_filter
|