|
- """
- Representation and parsing of HTTP-style status + headers
- """
- from six.moves import range
- from six import iteritems
- from warcio.utils import to_native_str, headers_to_str_headers
- import uuid
- from six.moves.urllib.parse import quote
- import re
- #=================================================================
- class StatusAndHeaders(object):
- ENCODE_HEADER_RX = re.compile(r'[=]["\']?([^;"]+)["\']?(?=[;]?)')
- """
- Representation of parsed http-style status line and headers
- Status Line if first line of request/response
- Headers is a list of (name, value) tuples
- An optional protocol which appears on first line may be specified
- If is_http_request is true, split http verb (instead of protocol) from start of statusline
- """
- def __init__(self, statusline, headers, protocol='', total_len=0, is_http_request=False):
- if is_http_request:
- protocol, statusline = statusline.split(' ', 1)
- self.statusline = statusline
- self.headers = headers_to_str_headers(headers)
- self.protocol = protocol
- self.total_len = total_len
- self.headers_buff = None
- def get_header(self, name, default_value=None):
- """
- return header (name, value)
- if found
- """
- name_lower = name.lower()
- for value in self.headers:
- if value[0].lower() == name_lower:
- return value[1]
- return default_value
- def add_header(self, name, value):
- self.headers.append((name, value))
- def replace_header(self, name, value):
- """
- replace header with new value or add new header
- return old header value, if any
- """
- name_lower = name.lower()
- for index in range(len(self.headers) - 1, -1, -1):
- curr_name, curr_value = self.headers[index]
- if curr_name.lower() == name_lower:
- self.headers[index] = (curr_name, value)
- return curr_value
- self.headers.append((name, value))
- return None
- def remove_header(self, name):
- """
- Remove header (case-insensitive)
- return True if header removed, False otherwise
- """
- name_lower = name.lower()
- for index in range(len(self.headers) - 1, -1, -1):
- if self.headers[index][0].lower() == name_lower:
- del self.headers[index]
- return True
- return False
- def get_statuscode(self):
- """
- Return the statuscode part of the status response line
- (Assumes no protocol in the statusline)
- """
- code = self.statusline.split(' ', 1)[0]
- return code
- def validate_statusline(self, valid_statusline):
- """
- Check that the statusline is valid, eg. starts with a numeric
- code. If not, replace with passed in valid_statusline
- """
- code = self.get_statuscode()
- try:
- code = int(code)
- assert(code > 0)
- return True
- except(ValueError, AssertionError):
- self.statusline = valid_statusline
- return False
- def add_range(self, start, part_len, total_len):
- """
- Add range headers indicating that this a partial response
- """
- content_range = 'bytes {0}-{1}/{2}'.format(start,
- start + part_len - 1,
- total_len)
- self.statusline = '206 Partial Content'
- self.replace_header('Content-Range', content_range)
- self.replace_header('Content-Length', str(part_len))
- self.replace_header('Accept-Ranges', 'bytes')
- return self
- def compute_headers_buffer(self, header_filter=None):
- """
- Set buffer representing headers
- """
- # HTTP headers %-encoded as ascii (see to_ascii_bytes for more info)
- self.headers_buff = self.to_ascii_bytes(header_filter)
- def __repr__(self):
- return "StatusAndHeaders(protocol = '{0}', statusline = '{1}', \
- headers = {2})".format(self.protocol, self.statusline, self.headers)
- def __ne__(self, other):
- return not (self == other)
- def __eq__(self, other):
- if not other:
- return False
- return (self.statusline == other.statusline and
- self.headers == other.headers and
- self.protocol == other.protocol)
- def __str__(self, exclude_list=None):
- return self.to_str(exclude_list)
- def __bool__(self):
- return bool(self.statusline or self.headers)
- __nonzero__ = __bool__
- def to_str(self, filter_func=None):
- string = self.protocol
- if string and self.statusline:
- string += ' '
- if self.statusline:
- string += self.statusline
- if string:
- string += '\r\n'
- for h in self.headers:
- if filter_func:
- h = filter_func(h)
- if not h:
- continue
- string += ': '.join(h) + '\r\n'
- return string
- def to_bytes(self, filter_func=None, encoding='utf-8'):
- return self.to_str(filter_func).encode(encoding) + b'\r\n'
- def to_ascii_bytes(self, filter_func=None):
- """ Attempt to encode the headers block as ascii
- If encoding fails, call percent_encode_non_ascii_headers()
- to encode any headers per RFCs
- """
- try:
- string = self.to_str(filter_func)
- string = string.encode('ascii')
- except (UnicodeEncodeError, UnicodeDecodeError):
- self.percent_encode_non_ascii_headers()
- string = self.to_str(filter_func)
- string = string.encode('ascii')
- return string + b'\r\n'
- def percent_encode_non_ascii_headers(self, encoding='UTF-8'):
- """ Encode any headers that are not plain ascii
- as UTF-8 as per:
- https://tools.ietf.org/html/rfc8187#section-3.2.3
- https://tools.ietf.org/html/rfc5987#section-3.2.2
- """
- def do_encode(m):
- return "*={0}''".format(encoding) + quote(to_native_str(m.group(1)))
- for index in range(len(self.headers) - 1, -1, -1):
- curr_name, curr_value = self.headers[index]
- try:
- # test if header is ascii encodable, no action needed
- curr_value.encode('ascii')
- except:
- # if single value header, (eg. no ';'), %-encode entire header
- if ';' not in curr_value:
- new_value = quote(curr_value)
- else:
- # %-encode value in ; name="value"
- new_value = self.ENCODE_HEADER_RX.sub(do_encode, curr_value)
- if new_value == curr_value:
- new_value = quote(curr_value)
- self.headers[index] = (curr_name, new_value)
- # act like a (case-insensitive) dictionary of headers, much like other
- # python http headers apis including http.client.HTTPMessage
- # and requests.structures.CaseInsensitiveDict
- get = get_header
- __getitem__ = get_header
- __setitem__ = replace_header
- __delitem__ = remove_header
- def __contains__(self, key):
- return bool(self[key])
- #=================================================================
- def _strip_count(string, total_read):
- length = len(string)
- return string.rstrip(), total_read + length
- #=================================================================
- class StatusAndHeadersParser(object):
- """
- Parser which consumes a stream support readline() to read
- status and headers and return a StatusAndHeaders object
- """
- def __init__(self, statuslist, verify=True):
- self.statuslist = statuslist
- self.verify = verify
- def parse(self, stream, full_statusline=None):
- """
- parse stream for status line and headers
- return a StatusAndHeaders object
- support continuation headers starting with space or tab
- """
- # status line w newlines intact
- if full_statusline is None:
- full_statusline = stream.readline()
- full_statusline = self.decode_header(full_statusline)
- statusline, total_read = _strip_count(full_statusline, 0)
- headers = []
- # at end of stream
- if total_read == 0:
- raise EOFError()
- elif not statusline:
- return StatusAndHeaders(statusline=statusline,
- headers=headers,
- protocol='',
- total_len=total_read)
- # validate only if verify is set
- if self.verify:
- protocol_status = self.split_prefix(statusline, self.statuslist)
- if not protocol_status:
- msg = 'Expected Status Line starting with {0} - Found: {1}'
- msg = msg.format(self.statuslist, statusline)
- raise StatusAndHeadersParserException(msg, full_statusline)
- else:
- protocol_status = statusline.split(' ', 1)
- line, total_read = _strip_count(self.decode_header(stream.readline()), total_read)
- while line:
- result = line.split(':', 1)
- if len(result) == 2:
- name = result[0].rstrip(' \t')
- value = result[1].lstrip()
- else:
- name = result[0]
- value = None
- next_line, total_read = _strip_count(self.decode_header(stream.readline()),
- total_read)
- # append continuation lines, if any
- while next_line and next_line.startswith((' ', '\t')):
- if value is not None:
- value += next_line
- next_line, total_read = _strip_count(self.decode_header(stream.readline()),
- total_read)
- if value is not None:
- header = (name, value)
- headers.append(header)
- line = next_line
- if len(protocol_status) > 1:
- statusline = protocol_status[1].strip()
- else:
- statusline = ''
- return StatusAndHeaders(statusline=statusline,
- headers=headers,
- protocol=protocol_status[0],
- total_len=total_read)
- @staticmethod
- def split_prefix(key, prefixs):
- """
- split key string into prefix and remainder
- for first matching prefix from a list
- """
- key_upper = key.upper()
- for prefix in prefixs:
- if key_upper.startswith(prefix):
- plen = len(prefix)
- return (key_upper[:plen], key[plen:])
- @staticmethod
- def make_warc_id(id_=None):
- if not id_:
- id_ = uuid.uuid4()
- return '<urn:uuid:{0}>'.format(id_)
- @staticmethod
- def decode_header(line):
- try:
- # attempt to decode as utf-8 first
- return to_native_str(line, 'utf-8')
- except:
- # if fails, default to ISO-8859-1
- return to_native_str(line, 'iso-8859-1')
- #=================================================================
- class StatusAndHeadersParserException(Exception):
- """
- status + headers parsing exception
- """
- def __init__(self, msg, statusline):
- super(StatusAndHeadersParserException, self).__init__(msg)
- self.statusline = statusline
|