# Copyright (c) 2010 OpenStack, LLC. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or # implied. # See the License for the specific language governing permissions and # limitations under the License. """ SwiftS3 middleware emulates AWS S3 REST api on top of Swift. The following opperations are currently supported: * GET Service * DELETE Bucket (Delete bucket; abort running MPUs) * GET Bucket (List Objects; List in-progress multipart uploads) * PUT Bucket * DELETE Object * GET Object * HEAD Object * PUT Object * PUT Object (Copy) To add this middleware to your configuration, add the swifts3 middleware before the auth middleware and before any other middleware that waits for swift requests (like rate limiting). To set up your client, the access key will be the concatenation of the account and user strings that should look like test:tester, and the secret access key is the account password. The host should also point to the swift storage hostname and it should use the old style calling format, not the hostname based container format. An example client using the python boto library might look like the following for a SAIO setup: connection = boto.s3.Connection( aws_access_key_id='test:tester', aws_secret_access_key='testing', port=8080, host='127.0.0.1', is_secure=False, calling_format=boto.s3.connection.OrdinaryCallingFormat()) Note that all the operations with multipart upload buckets are denied to user, as well as multipart buckets are not listed in all buckets list. In case of GET/DELETE - NoSuchBucket error is returned; In case of PUT/POST - InvalidBucketName error is returned. """ from urllib import unquote, quote import rfc822 import hmac import base64 import uuid import errno from xml.sax.saxutils import escape as xml_escape import urlparse from webob import Request as WebObRequest, Response from webob.exc import HTTPNotFound from webob.multidict import MultiDict import simplejson as json from swift.common.utils import split_path, get_logger #XXX: In webob-1.9b copied environment contained link to the original # instance of a TrackableMultiDict which reflects to original # request. class Request(WebObRequest): def _remove_query_vars(self): if 'webob._parsed_query_vars' in self.environ: del self.environ['webob._parsed_query_vars'] def copy(self): req = super(Request, self).copy() req._remove_query_vars() return req def copy_get(self): req = super(Request, self).copy_get() req._remove_query_vars() return req MAX_BUCKET_LISTING = 1000 MAX_UPLOADS_LISTING = 1000 MULTIPART_UPLOAD_PREFIX = 'MPU.' # List of Query String Arguments of Interest qsa_of_interest = ['acl', 'defaultObjectAcl', 'location', 'logging', 'partNumber', 'policy', 'requestPayment', 'torrent', 'versioning', 'versionId', 'versions', 'website', 'uploads', 'uploadId', 'response-content-type', 'response-content-language', 'response-expires', 'response-cache-control', 'response-content-disposition', 'response-content-encoding', 'delete', 'lifecycle'] def get_err_response(code): """ Creates a properly formatted xml error response by a given HTTP response code, :param code: error code :returns: webob.response object """ error_table = { 'AccessDenied': (403, 'Access denied'), 'BucketAlreadyExists': (409, 'The requested bucket name is not available'), 'BucketNotEmpty': (409, 'The bucket you tried to delete is not empty'), 'InvalidArgument': (400, 'Invalid Argument'), 'InvalidBucketName': (400, 'The specified bucket is not valid'), 'InvalidURI': (400, 'Could not parse the specified URI'), 'NoSuchBucket': (404, 'The specified bucket does not exist'), 'SignatureDoesNotMatch': (403, 'The calculated request signature does not match '\ 'your provided one'), 'NoSuchKey': (404, 'The resource you requested does not exist'), 'NoSuchUpload': (404, 'The specified multipart upload does not exist.'), } resp = Response(content_type='text/xml') resp.status = error_table[code][0] resp.body = error_table[code][1] resp.body = '\r\n\r\n ' \ '%s\r\n %s\r\n\r\n' \ % (code, error_table[code][1]) return resp def get_acl(account_name): body = ('' '' '%s' '' '' '' '' '%s' '' 'FULL_CONTROL' '' '' '' % (account_name, account_name)) return Response(body=body, content_type="text/plain") def canonical_string(req): """ Canonicalize a request to a token that can be signed. """ def unquote_v(nv): if len(nv) == 1: return nv else: return (nv[0], unquote(nv[1])) amz_headers = {} buf = "%s\n%s\n%s\n" % (req.method, req.headers.get('Content-MD5', ''), req.headers.get('Content-Type') or '') for amz_header in sorted((key.lower() for key in req.headers if key.lower().startswith('x-amz-'))): amz_headers[amz_header] = req.headers[amz_header] if 'x-amz-date' in amz_headers: buf += "\n" elif 'Date' in req.headers: buf += "%s\n" % req.headers['Date'] for k in sorted(key.lower() for key in amz_headers): buf += "%s:%s\n" % (k, amz_headers[k]) # don't include anything after the first ? in the resource... # unless it is one of the QSA of interest, defined above parts = req.path_qs.split('?') buf += parts[0] if len(parts) > 1: qsa = parts[1].split('&') qsa = [a.split('=', 1) for a in qsa] qsa = [unquote_v(a) for a in qsa if a[0] in qsa_of_interest] if len(qsa) > 0: qsa.sort(cmp=lambda x, y: cmp(x[0], y[0])) qsa = ['='.join(a) for a in qsa] buf += '?' buf += '&'.join(qsa) return buf def check_container_name_no_such_bucket_error(container_name): """Checks that user do not tries to operate with MPU container""" if container_name.startswith(MULTIPART_UPLOAD_PREFIX): return get_err_response('NoSuchBucket') def check_container_name_invalid_bucket_name_error(container_name): """Checks that user do not tries to operate with MPU container""" if container_name.startswith(MULTIPART_UPLOAD_PREFIX): return get_err_response('InvalidBucketName') def meta_request_head(req, meta_path, app): """ HEAD request to check that meta file presents and multipart upload is in progress. """ meta_req = req.copy() meta_req.method = 'HEAD' meta_req.body = '' meta_req.upath_info = meta_path meta_req.GET.clear() return meta_req.get_response(app) class ServiceController(object): """ Handles account level requests. """ def __init__(self, env, app, account_name, token, **kwargs): self.app = app env['HTTP_X_AUTH_TOKEN'] = token env['PATH_INFO'] = '/v1/%s' % account_name def GET(self, req): """ Handle GET Service request """ req.GET.clear() req.GET['format'] = 'json' resp = req.get_response(self.app) status = resp.status_int body = resp.body if status != 200: if status == 401: return get_err_response('AccessDenied') else: return get_err_response('InvalidURI') containers = json.loads(body) # we don't keep the creation time of a bucket (s3cmd doesn't # work without that) so we use some bogus body = '' \ '' \ '%s' \ '' \ % ("".join(['%s' \ '2009-02-03T16:45:09.000Z' % xml_escape(i['name']) for i in containers if \ not i['name'].startswith(MULTIPART_UPLOAD_PREFIX)])) # we shold not show multipart buckets here return Response(status=200, content_type='application/xml', body=body) class BucketController(object): """ Handles bucket requests. """ def __init__(self, env, app, account_name, token, container_name, **kwargs): self.app = app self.container_name = unquote(container_name) self.account_name = unquote(account_name) env['HTTP_X_AUTH_TOKEN'] = token env['PATH_INFO'] = '/v1/%s/%s' % (account_name, container_name) def get_uploads(self, req): """Handles listing of in-progress multipart uploads""" acl = req.GET.get('acl') params = MultiDict([('format', 'json')]) max_uploads = req.GET.get('max-uploads') if (max_uploads is not None and max_uploads.isdigit()): max_uploads = min(int(max_uploads), MAX_UPLOADS_LISTING) else: max_uploads = MAX_UPLOADS_LISTING params['limit'] = str(max_uploads + 1) for param_name in ('key-marker', 'prefix', 'delimiter', 'upload-id-marker'): if param_name in req.GET: params[param_name] = req.GET[param_name] cont_name = MULTIPART_UPLOAD_PREFIX + self.container_name cont_path = "/v1/%s/%s/" % (self.account_name, cont_name) req.upath_info = cont_path req.GET.clear() req.GET.update(params) resp = req.get_response(self.app) status = resp.status_int if status != 200: if status == 401: return get_err_response('AccessDenied') elif status == 404: return get_err_response('InvalidBucketName') else: return get_err_response('InvalidURI') if acl is not None: return get_acl(self.account_name) objects = json.loads(resp.body) uploads = '' splited_name = '' for obj in objects: if obj['name'].endswith('/meta'): splited_name = obj['name'].split('/') uploads = uploads.join( "" "%s" "%s" "" "%s" "%s" "" "" "%s" "%s" "" "STANDARD" "%sZ" "" % ( splited_name[0], splited_name[1], self.account_name, self.account_name, self.account_name, self.account_name, obj['last_modified'][:-3])) else: objects.remove(obj) #TODO: Currently there are less then max_uploads results # in a response; Amount of uploads == amount of meta files # received in a request for a list of objects in a bucket. if len(objects) == (max_uploads + 1): is_truncated = 'true' next_key_marker = splited_name[0] next_uploadId_marker = splited_name[1] else: is_truncated = 'false' next_key_marker = next_uploadId_marker = '' body = ('' '' '%s' '%s' '%s' '%s' '%s' '%s' '%s' '%s' '' % ( xml_escape(self.container_name), xml_escape(params.get('key-marker', '')), xml_escape(params.get('upload-id-marker', '')), next_key_marker, next_uploadId_marker, max_uploads, is_truncated, uploads ) ) return Response(body=body, content_type='application/xml') def GET(self, req): """ Handles listing of in-progress multipart uploads, handles list objects request. """ # any operations with multipart buckets are not allowed to user check_container_name_no_such_bucket_error(self.container_name) if 'uploads' in req.GET: return self.get_uploads(req) else: acl = req.GET.get('acl') params = MultiDict([('format', 'json')]) max_keys = req.GET.get('max-keys') if (max_keys is not None and max_keys.isdigit()): max_keys = min(int(max_keys), MAX_BUCKET_LISTING) else: max_keys = MAX_BUCKET_LISTING params['limit'] = str(max_keys + 1) for param_name in ('marker', 'prefix', 'delimiter'): if param_name in req.GET: params[param_name] = req.GET[param_name] req.GET.clear() req.GET.update(params) resp = req.get_response(self.app) status = resp.status_int body = resp.body if status != 200: if status == 401: return get_err_response('AccessDenied') elif status == 404: return get_err_response('InvalidBucketName') else: return get_err_response('InvalidURI') if acl is not None: return get_acl(self.account_name) objects = json.loads(resp.body) body = ('' '' '%s' '%s' '%s' '%s' '%s' '%s' '%s' '%s' '' % ( xml_escape(params.get('prefix', '')), xml_escape(params.get('marker', '')), xml_escape(params.get('delimiter', '')), 'true' if len(objects) == (max_keys + 1) else 'false', max_keys, xml_escape(self.container_name), "".join(['%s%sZ%s%sSTANDARD' % (xml_escape(i['name']), i['last_modified'][:-3], i['hash'], i['bytes']) for i in objects[:max_keys] if 'subdir' not in i]), "".join(['%s' % xml_escape(i['subdir']) for i in objects[:max_keys] if 'subdir' in i]))) return Response(body=body, content_type='application/xml') def PUT(self, req): """ Handles PUT Bucket request. """ # any operations with multipart buckets are not allowed to user check_container_name_invalid_bucket_name_error(self.container_name) resp = req.get_response(self.app) status = resp.status_int if status != 201: if status == 401: return get_err_response('AccessDenied') elif status == 202: return get_err_response('BucketAlreadyExists') else: return get_err_response('InvalidURI') resp = Response() resp.headers.add('Location', self.container_name) resp.status = 200 return resp def mpu_bucket_deletion_list_request(self, req, cont_path): """This method returns listing of MPU bucket for deletion""" list_req = req.copy() list_req.method = 'GET' list_req.upath_info = cont_path list_req.GET.clear() list_req.GET['format'] = 'json' return list_req.get_response(self.app) def mpu_bucket_deletion(self, req): """ This method checks if MPU bucket exists and if there are any active MPUs are in it. MPUs are aborted, uploaded parts are deleted. """ cont_name = MULTIPART_UPLOAD_PREFIX + self.container_name cont_path = "/v1/%s/%s/" % (self.account_name, cont_name) list_resp = self.mpu_bucket_deletion_list_request(req, cont_path) status = list_resp.status_int if status != 200: if status == 401: return get_err_response('AccessDenied') elif status == 404: # there is no MPU bucket, it's OK, there is only regular bucket pass else: return get_err_response('InvalidURI') elif status == 200: # aborting multipart uploads by deleting meta and other files objects = json.loads(list_resp.body) for obj in objects: if obj['name'].endswith('/meta'): for mpu_obj in objects: if mpu_obj['name'].startswith(obj['name'][:-5]): obj_req = req.copy() obj_req.upath_info = "%s%s" % (cont_path, mpu_obj['name']) obj_req.GET.clear() obj_resp = obj_req.get_response(self.app) status = obj_resp.status_int #TODO: Add some logs here if status not in (200, 204): if status == 401: return get_err_response('AccessDenied') elif status == 404: return get_err_response('NoSuchKey') else: return get_err_response('InvalidURI') # deleting multipart bucket del_mpu_req = req.copy() del_mpu_req.upath_info = cont_path del_mpu_req.GET.clear() del_mpu_resp = del_mpu_req.get_response(self.app) status = del_mpu_resp.status_int if status != 204: if status == 401: return get_err_response('AccessDenied') elif status == 404: return get_err_response('InvalidBucketName') elif status == 409: return get_err_response('BucketNotEmpty') else: return get_err_response('InvalidURI') return Response(status=204) def DELETE(self, req): """ Handles DELETE Bucket request. Also deletes multipart bucket if it exists. Aborts all multipart uploads initiated for this bucket. """ # any operations with multipart buckets are not allowed to user check_container_name_no_such_bucket_error(self.container_name) # deleting regular bucket, # request is copied to save valid authorization del_req = req.copy() resp = del_req.get_response(self.app) status = resp.status_int if status != 204: if status == 401: return get_err_response('AccessDenied') elif status == 404: return get_err_response('InvalidBucketName') elif status == 409: return get_err_response('BucketNotEmpty') else: return get_err_response('InvalidURI') # check if there is a multipart bucket and # return 204 when everything is deleted return self.mpu_bucket_deletion(req) class NormalObjectController(object): """ Handles requests on objects. """ def __init__(self, env, app, account_name, token, container_name, object_name, **kwargs): self.app = app self.account_name = unquote(account_name) self.container_name = unquote(container_name) self.object_name = unquote(object_name) env['HTTP_X_AUTH_TOKEN'] = token env['PATH_INFO'] = '/v1/%s/%s/%s' % (account_name, container_name, object_name) def GETorHEAD(self, req): resp = req.get_response(self.app) status = resp.status_int headers = resp.headers app_iter = resp.app_iter if 200 <= status < 300: if 'acl' in req.GET: return get_acl(self.account_name) new_hdrs = {} for key, val in headers.iteritems(): _key = key.lower() if _key.startswith('x-object-meta-'): new_hdrs['x-amz-meta-' + key[14:]] = val elif _key in ('content-length', 'content-type', 'content-encoding', 'etag', 'last-modified'): new_hdrs[key] = val return Response(status=status, headers=new_hdrs, app_iter=app_iter) elif status == 401: return get_err_response('AccessDenied') elif status == 404: return get_err_response('NoSuchKey') else: return get_err_response('InvalidURI') def HEAD(self, req): """ Handles HEAD Object request. """ return self.GETorHEAD(req) def GET(self, req): """ Handles GET Object request. """ return self.GETorHEAD(req) def PUT(self, req): """ Handles PUT Object and PUT Object (Copy) request. """ environ = req.environ for key, value in environ.items(): if key.startswith('HTTP_X_AMZ_META_'): del environ[key] environ['HTTP_X_OBJECT_META_' + key[16:]] = value elif key == 'HTTP_CONTENT_MD5': environ['HTTP_ETAG'] = value.decode('base64').encode('hex') elif key == 'HTTP_X_AMZ_COPY_SOURCE': environ['HTTP_X_COPY_FROM'] = value resp = req.get_response(self.app) status = resp.status_int headers = resp.headers if status != 201: if status == 401: return get_err_response('AccessDenied') elif status == 404: return get_err_response('InvalidBucketName') else: return get_err_response('InvalidURI') if 'HTTP_X_COPY_FROM' in environ: body = '' \ '"%s"' \ '' % headers['ETag'] return Response(status=200, body=body) return Response(status=200, etag=headers['ETag']) def DELETE(self, req): """ Handles DELETE Object request. """ resp = req.get_response(self.app) status = resp.status_int if status not in (200, 204): if status == 401: return get_err_response('AccessDenied') elif status == 404: return get_err_response('NoSuchKey') else: return get_err_response('InvalidURI') return Response(status=204) class MultiPartObjectController(object): def __init__(self, env, app, account_name, token, container_name, object_name, **kwargs): self.app = app self.account_name = unquote(account_name) self.container_name = unquote(container_name) self.object_name = unquote(object_name) self.orig_path_info = env['PATH_INFO'] env['HTTP_X_AUTH_TOKEN'] = token env['PATH_INFO'] = '/v1/%s/%s/%s' % (account_name, container_name, object_name) def GET(self, req): """ Lists multipart uploads by uploadId. """ # any operations with multipart buckets are not allowed to user check_container_name_no_such_bucket_error(self.container_name) upload_id = req.GET.get('uploadId') max_parts = req.GET.get('max-parts', '1000') part_number_marker = req.GET.get('part-number-marker', '') try: int(upload_id, 16) max_parts = int(max_parts) if part_number_marker: part_number_marker = int(part_number_marker) except (TypeError, ValueError): return get_err_response('InvalidURI') object_name_prefix_len = len(self.object_name) + 1 cont_name = MULTIPART_UPLOAD_PREFIX + self.container_name cont_path = "/v1/%s/%s/" % (self.account_name, cont_name) meta_path = "%s%s/%s/meta" % (cont_path, self.object_name, upload_id) meta_resp = meta_request_head(req, meta_path, self.app) status = meta_resp.status_int if status != 200: return get_err_response('NoSuchUpload') list_req = req.copy() list_req.upath_info = cont_path list_req.GET.clear() list_req.GET['format'] = 'json' list_req.GET['prefix'] = "%s/%s/%s/part/" % (cont_name, self.object_name, upload_id) list_req.GET['limit'] = str(max_parts + 1) if part_number_marker: list_req.GET['marker'] = "%s/%s/part/%s" % (self.object_name, upload_id, part_number_marker) resp = list_req.get_response(self.app) status = resp.status_int if status != 200: if status == 401: return get_err_response('AccessDenied') elif status == 404: return get_err_response('InvalidBucketName') else: return get_err_response('InvalidURI') objects = json.loads(resp.body) if len(objects) > max_parts: objects = objects.pop(-1) next_marker = objects[-1]['name'][object_name_prefix_len:] is_truncated = 'true' else: next_marker = '' is_truncated = 'false' if next_marker: next_marker = "%" % \ next_marker if part_number_marker: part_number_marker = "%" % \ part_number_marker parts = ''.join(("" "%s" "%sZ" "\"%s\"" "%s" "" % ( obj['name'][object_name_prefix_len:], obj['last_modified'][:-3], obj['hash'], obj['bytes']) for obj in objects)) body = ( "" "" "%s" "%s" "%s" "" "%s" "%s" "" "" "%s" "%s" "" "STANDARD" "%s%s" "%s" "%s" "%s" "" % ( self.container_name, self.object_name, upload_id, self.account_name, self.account_name, self.account_name, self.account_name, part_number_marker, next_marker, max_parts, is_truncated, parts, )) return Response(status=200, body=body, content_type='application/xml') def post_uploads_container_request(self, req, cont_path): """Method used to create a container for MPU.""" cont_req = req.copy() cont_req.method = 'PUT' cont_req.upath_info = cont_path cont_req.GET.clear() return cont_req.get_response(self.app) def post_uploads_put_meta_req(self, req, cont_path, upload_id): """Method to create a MPU metafile.""" meta_req = req.copy() meta_req.method = 'PUT' meta_req.upath_info = "%s%s/%s/meta" % (cont_path, self.object_name, upload_id) for header, value in meta_req.headers.items(): if header.lower().startswith('x-amz-meta-'): meta_req.headers['X-Object-Meta-Amz-' + header[11:]] = \ value return meta_req.get_response(self.app) def post_uploads(self, req): """ Called if POST with 'uploads' query string was received. Creates metafile which is used as a flag on uncompleted MPU. Initiates multipart upload. """ cont_name = MULTIPART_UPLOAD_PREFIX + self.container_name cont_path = "/v1/%s/%s/" % (self.account_name, cont_name) cont_req = req.copy() cont_req.method = 'HEAD' cont_req.upath_info = cont_path cont_req.GET.clear() cont_resp = cont_req.get_response(self.app) status = cont_resp.status_int if status == 404: # creating container for MPU cont_resp = self.post_uploads_container_request(req, cont_path) status = cont_resp.status_int if status not in (201, 204): if status == 401: return get_err_response('AccessDenied') elif status == 404: return get_err_response('InvalidBucketName') else: return get_err_response('InvalidURI') upload_id = uuid.uuid4().hex meta_resp = self.post_uploads_put_meta_req(req, cont_path, upload_id) status = meta_resp.status_int if status != 201: if status == 401: return get_err_response('AccessDenied') elif status == 404: return get_err_response('InvalidBucketName') else: return get_err_response('InvalidURI') body = ('' '' '%s' '%s' '%s' '' % (self.container_name, self.object_name, upload_id)) return Response(status=200, body=body, content_type='application/xml') def post_uploadId(self, req): """ Called if POST with 'uploadId' query string was received. Deletes metafile after completion of MPU. Completes multipart upload. """ upload_id = req.GET.get('uploadId') try: int(upload_id, 16) except (TypeError, ValueError): return get_err_response('InvalidURI') cont_name = MULTIPART_UPLOAD_PREFIX + self.container_name cont_path = "/v1/%s/%s/" % (self.account_name, cont_name) meta_path = "%s%s/%s/meta" % (cont_path, self.object_name, upload_id) meta_resp = meta_request_head(req, meta_path, self.app) status = meta_resp.status_int if status != 200: if status == 401: return get_err_response('AccessDenied') elif status == 404: return get_err_response('NoSuchUpload') else: return get_err_response('InvalidURI') # TODO: Validate uploaded parts. manifest_path = MULTIPART_UPLOAD_PREFIX + \ "%s/%s/%s/part/" % (self.container_name, self.object_name, upload_id) manifest_req = req.copy() manifest_req.method = 'PUT' manifest_req.GET.clear() manifest_req.headers['X-Object-Manifest'] = manifest_path for header, value in meta_resp.headers.iteritems(): if header.lower().startswith('x-object-meta-amz-'): manifest_req.headers['x-amz-meta-' + header[18:]] = value manifest_resp = manifest_req.get_response(self.app) status = manifest_resp.status_int if status == 201: finish_req = req.copy() finish_req.method = 'DELETE' finish_req.upath_info = meta_path finish_req.body = '' finish_req.GET.clear() finish_resp = finish_req.get_response(self.app) status = finish_resp.status_int if status not in (201, 204): if status == 401: return get_err_response('AccessDenied') elif status == 404: return get_err_response('InvalidBucketName') else: return get_err_response('InvalidURI') body = ('' '' '%s' '%s' '%s' '%s' '' % (self.orig_path_info, self.container_name, self.object_name, manifest_resp.headers['ETag'])) return Response(status=200, body=body, content_type='application/xml') def POST(self, req): """ Initiate and complete multipart upload. """ # any operations with multipart buckets are not allowed to user check_container_name_invalid_bucket_name_error(self.container_name) if 'uploads' in req.GET: return self.post_uploads(req) elif 'uploadId' in req.GET: return self.post_uploadId(req) return get_err_response('InvalidURI') def PUT(self, req): """ Upload part of a multipart upload. """ upload_id = req.GET.get('uploadId') part_number = req.GET.get('partNumber', '') try: int(upload_id, 16) except (TypeError, ValueError): return get_err_response('InvalidURI') if not part_number.isdigit(): return get_err_response('InvalidURI') # any operations with multipart buckets are not allowed to user check_container_name_invalid_bucket_name_error(self.container_name) cont_name = MULTIPART_UPLOAD_PREFIX + self.container_name cont_path = "/v1/%s/%s/" % (self.account_name, cont_name) meta_path = "%s%s/%s/meta" % (cont_path, self.object_name, upload_id) meta_resp = meta_request_head(req, meta_path, self.app) status = meta_resp.status_int if status != 200: return get_err_response('NoSuchUpload') req = req.copy() req.upath_info = "%s%s/%s/part/%s" % (cont_path, self.object_name, upload_id, part_number) req.GET.clear() resp = req.get_response(self.app) status = resp.status_int headers = resp.headers if status != 201: if status == 401: return get_err_response('AccessDenied') elif status == 404: return get_err_response('InvalidBucketName') else: return get_err_response('InvalidURI') if 'HTTP_X_COPY_FROM' in req.environ: body = '' \ '"%s"' \ '' % resp.headers['ETag'] return Response(status=200, body=body) return Response(status=200, etag=resp.headers['ETag']) def DELETE(self, req): """ Aborts multipart upload by uploadId. """ upload_id = req.GET.get('uploadId') try: int(upload_id, 16) except (TypeError, ValueError): return get_err_response('InvalidURI') # any operations with multipart buckets are not allowed to user check_container_name_no_such_bucket_error(self.container_name) cont_name = MULTIPART_UPLOAD_PREFIX + self.container_name cont_path = "/v1/%s/%s/" % (self.account_name, cont_name) prefix = "%s/%s/" % (self.object_name, upload_id) list_req = req.copy_get() list_req.upath_info = cont_path list_req.GET.clear() list_req.GET['format'] = 'json' list_req.GET['prefix'] = prefix list_resp = list_req.get_response(self.app) status = list_resp.status_int if status != 200: if status == 401: return get_err_response('AccessDenied') elif status == 404: return get_err_response('InvalidBucketName') else: return get_err_response('InvalidURI') objects = json.loads(list_resp.body) for obj in objects: obj_req = req.copy() obj_req.method = 'DELETE' obj_req.upath_info = "%s%s" % (cont_path, obj['name']) obj_req.GET.clear() obj_resp = obj_req.get_response(self.app) status = obj_resp.status_int if status not in (200, 204): if status == 401: return get_err_response('AccessDenied') elif status == 404: return get_err_response('NoSuchKey') else: return get_err_response('InvalidURI') return Response(status=204) class ObjectController(NormalObjectController, MultiPartObjectController): """Manages requests on normal and multipart objects""" def __init__(self, *args, **kwargs): MultiPartObjectController.__init__(self, *args, **kwargs) def GET(self, req): if 'uploadId' in req.GET: return MultiPartObjectController.GET(self, req) return NormalObjectController.GET(self, req) def PUT(self, req): if 'uploadId' in req.GET: return MultiPartObjectController.PUT(self, req) return NormalObjectController.PUT(self, req) def POST(self, req): if 'uploadId' in req.GET or 'uploads' in req.GET: return MultiPartObjectController.POST(self, req) return NormalObjectController.POST(self, req) def DELETE(self, req): if 'uploadId' in req.GET: return MultiPartObjectController.DELETE(self, req) obj_req = req.copy_get() obj_req.method = 'HEAD' obj_req.GET.clear() obj_resp = obj_req.get_response(self.app) status = obj_resp.status_int if status == 200 and 'X-Object-Manifest' in obj_resp.headers: manifest = obj_resp.headers['X-Object-Manifest'] upload_id = manifest.split('/')[2] del_req = req.copy() del_req.GET['uploadId'] = upload_id MultiPartObjectController.DELETE(self, del_req) return NormalObjectController.DELETE(self, req) class Swift3Middleware(object): """Swift3 S3 compatibility midleware""" def __init__(self, app, conf, *args, **kwargs): self.app = app def get_controller(self, path, params): container, obj = split_path(path, 0, 2, True) d = dict(container_name=container, object_name=obj) if container and obj: return ObjectController, d elif container: return BucketController, d return ServiceController, d def __call__(self, env, start_response): req = Request(env) if 'AWSAccessKeyId' in req.GET: try: req.headers['Date'] = req.GET['Expires'] req.headers['Authorization'] = \ 'AWS %(AWSAccessKeyId)s:%(Signature)s' % req.GET except KeyError: return get_err_response('InvalidArgument')(env, start_response) if not 'Authorization' in req.headers: return self.app(env, start_response) try: account, signature = \ req.headers['Authorization'].split(' ')[-1].rsplit(':', 1) except Exception: return get_err_response('InvalidArgument')(env, start_response) try: controller, path_parts = self.get_controller(req.path, req.GET) except ValueError: return get_err_response('InvalidURI')(env, start_response) token = base64.urlsafe_b64encode(canonical_string(req)) controller = controller(req.environ, self.app, account, token, **path_parts) if hasattr(controller, req.method): res = getattr(controller, req.method)(req) else: return get_err_response('InvalidURI')(env, start_response) return res(env, start_response) def filter_factory(global_conf, **local_conf): """Standard filter factory to use the middleware with paste.deploy""" conf = global_conf.copy() conf.update(local_conf) def swifts3_filter(app): return Swift3Middleware(app, conf) return swifts3_filter