123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186 |
- # encoding=utf-8
- '''Base class for URL shortener services'''
- import datetime
- import logging
- import re
- import sys
- import time
- from requests.exceptions import ConnectionError
- from terroroftinytown.client import alphabet, VERSION
- from terroroftinytown.client.errors import (UnhandledStatusCode,
- UnexpectedNoResult, ScraperError, PleaseRetry, MalformedResponse)
- from terroroftinytown.services.status import URLStatus
- from terroroftinytown.six.moves import html_parser
- import terroroftinytown
# Names exported by a star-import of this module.
# NOTE(review): 'registry' is not defined anywhere in the visible part of
# this file -- confirm it is provided elsewhere (e.g. as a submodule) or
# drop it, otherwise a star-import fails with AttributeError.
__all__ = ['BaseService', 'registry']

# Value of the User-Agent header that BaseService instances send by default.
# The client version is substituted into the template once, at import time.
DEFAULT_USER_AGENT = (
    'URLTeam TerrorOfTinyTown/{version} (ArchiveTeam; '
    '+http://archiveteam.org/index.php?title=URLTeam/Appeal)'
).format(version=VERSION)
class BaseService(object):
    '''Generic scraper for one URL shortener service.

    A shortcode is derived from a sequence number, the short URL is
    requested without following redirects, and the response is classified
    by its HTTP status code into a ``(url_status, result_url, encoding)``
    tuple.  All service-specific settings are read from ``params``.
    '''

    def __init__(self, params):
        '''Store the settings and derive per-instance state.

        ``params`` is a mapping.  Methods of this class read the keys
        ``request_delay``, ``alphabet``, ``url_template``, ``method``,
        ``redirect_codes``, ``no_redirect_codes``, ``unavailable_codes``
        and ``banned_codes``, plus the optional ``location_anti_regex``
        and ``body_regex``.
        '''
        self.params = params
        self.logger = logging.getLogger(self.__class__.__name__)
        self.current_shortcode = None
        self.user_agent = DEFAULT_USER_AGENT
        # When the anti-regex also matches the empty string, a redirect
        # status without a Location header is reported as "not found"
        # instead of raising UnexpectedNoResult (see process_redirect).
        self.tolerate_missing_location_header = bool(
            self.params.get('location_anti_regex') and
            re.search(self.params['location_anti_regex'], ''))

    def prepare(self):
        '''Do nothing; presumably a setup hook for subclasses to override.'''
        pass

    def wait(self):
        '''Sleep for ``params['request_delay']`` seconds.'''
        sleep_time = self.params['request_delay']
        time.sleep(sleep_time)

    def transform_sequence_num(self, sequence_number):
        '''Return the shortcode for ``sequence_number``.

        The conversion is delegated to ``alphabet.int_to_str`` using the
        alphabet configured in ``params['alphabet']``.
        '''
        return alphabet.int_to_str(
            sequence_number, self.params['alphabet']
        )

    def scrape_one(self, sequence_number):
        '''Request the short URL for ``sequence_number`` and classify it.

        Returns a dict with the keys ``shortcode``, ``url`` and
        ``encoding`` when the status is ``URLStatus.ok``; otherwise
        returns ``None``.  ``encoding`` falls back to ``'latin-1'`` when
        the response handler reported none.  Exceptions raised by
        ``fetch_url`` and ``process_response`` propagate.
        '''
        shortcode = self.transform_sequence_num(sequence_number)
        self.current_shortcode = shortcode
        url = self.params['url_template'].format(shortcode=shortcode)

        self.logger.info('Requesting %s', url)

        response = self.fetch_url(url)
        url_status, result_url, encoding = self.process_response(response)

        if url_status != URLStatus.ok:
            return None

        assert result_url is not None
        self.logger.info('Got a result.')
        self.logger.debug('%s %s', result_url, response.encoding)

        return {
            'shortcode': shortcode,
            'url': result_url,
            'encoding': encoding or 'latin-1'
        }

    def _wants_get(self, method):
        '''Return True if the request should be a GET, False for a HEAD.

        An explicit ``method`` always wins; ``None`` defers to
        ``params['method']``.
        '''
        if method is None:
            method = self.params['method']
        return method == 'get'

    def fetch_url(self, url, method=None):
        '''Fetch ``url`` without following redirects and return the response.

        ``method`` may be ``'get'``, ``'head'`` or ``None``; ``None`` uses
        ``params['method']``.  Connection errors and ``ValueError`` are
        handed to ``process_connection_error``, which in this class always
        raises ``MalformedResponse`` or ``PleaseRetry``.
        '''
        # this import is moved here so that tracker can import
        # registry without installing requests
        import requests

        assert method in (None, 'get', 'head'), method

        headers = {
            'User-Agent': self.user_agent,
        }
        # An explicitly requested 'head' must not be overridden by the
        # service default, so the choice is made before the request.
        request = requests.get if self._wants_get(method) else requests.head

        try:
            response = request(
                url, allow_redirects=False, headers=headers, timeout=60)
        except (requests.exceptions.ConnectionError, ValueError) as e:
            return self.process_connection_error(e)

        return response

    def process_response(self, response):
        '''Dispatch ``response`` to a handler chosen by its status code.

        The code is looked up, in order, in ``params['redirect_codes']``,
        ``params['no_redirect_codes']``, ``params['unavailable_codes']``
        and ``params['banned_codes']``; anything else goes to
        ``process_unknown_code``.  Returns the handler's result.
        '''
        status_code = response.status_code

        if status_code in self.params['redirect_codes']:
            return self.process_redirect(response)
        elif status_code in self.params['no_redirect_codes']:
            return self.process_no_redirect(response)
        elif status_code in self.params['unavailable_codes']:
            return self.process_unavailable(response)
        elif status_code in self.params['banned_codes']:
            return self.process_banned(response)
        else:
            return self.process_unknown_code(response)

    def process_redirect(self, response):
        '''Extract the target URL from a redirect response.

        The ``Location`` header is preferred.  Without it, the body is
        searched when ``params['body_regex']`` is set, or the response is
        treated as "not found" when ``tolerate_missing_location_header``
        is true.  Raises ``UnexpectedNoResult`` if none of these apply.
        '''
        if 'Location' in response.headers:
            result_url = response.headers['Location']

            if sys.version_info[0] == 2 and \
                    isinstance(result_url, terroroftinytown.six.binary_type):
                # Headers are treated as latin-1
                # This is needed so that unit tests don't need to
                # do implicit unicode conversion. Ick!
                result_url = result_url.decode('latin-1')

            response.content  # read the response to allow connection reuse
            return self.check_anti_regex(response, result_url, None)
        elif self.params.get('body_regex'):
            return self.process_redirect_body(response)
        elif self.tolerate_missing_location_header:
            response.content  # read the response to allow connection reuse
            return self.process_no_redirect(response)
        else:
            response.content  # read the response to allow connection reuse
            raise UnexpectedNoResult(
                'Unexpectedly did not get a redirect result for {0}'
                .format(repr(response.url))
            )

    def process_redirect_body(self, response):
        '''Extract the target URL from the response body.

        ``params['body_regex']`` is searched in the HTML-unescaped body
        text and its first group is used as the URL, with the response's
        encoding.  Raises ``UnexpectedNoResult`` when nothing matches.
        '''
        pattern = self.params['body_regex']
        match = re.search(pattern, html_unescape(response.text))

        if match:
            return self.check_anti_regex(
                response, match.group(1), response.encoding)
        else:
            raise UnexpectedNoResult(
                'Unexpectedly did not get a body result for {0}'
                .format(repr(response.url))
            )

    def process_no_redirect(self, response):
        '''Return the "not found" result tuple.'''
        return (URLStatus.not_found, None, None)

    def process_unavailable(self, response):
        '''Return the "unavailable" result tuple.'''
        return (URLStatus.unavailable, None, None)

    def process_banned(self, response):
        '''Raise ``PleaseRetry`` carrying the server's reason phrase.'''
        raise PleaseRetry('Server said: {0}'.format(repr(response.reason)))

    def process_unknown_code(self, response):
        '''Raise ``UnhandledStatusCode`` for an unconfigured status code.'''
        raise UnhandledStatusCode(
            'Unknown status code {0} for {1}'.format(response.status_code,
                                                     repr(response.url))
        )

    def process_connection_error(self, exception):
        '''Translate a failed request into a scraper exception.

        Raises ``MalformedResponse`` when the exception arguments mention
        ``ProtocolError`` or ``Invalid IPv6 URL``, and ``PleaseRetry`` for
        everything else.
        '''
        ex_args = repr(exception.args)

        if 'ProtocolError' in ex_args or 'Invalid IPv6 URL' in ex_args:
            raise MalformedResponse(
                'Malformed response: {0}'.format(ex_args))
        else:
            raise PleaseRetry('Connection error: {0}'.format(ex_args))

    def check_anti_regex(self, response, result_url, encoding):
        '''Return the result tuple for a candidate ``result_url``.

        An empty URL, or one matching the anti-regex, is reported through
        ``process_no_redirect``; anything else yields
        ``(URLStatus.ok, result_url, encoding)``.
        '''
        if not result_url or self.matches_anti_regex(result_url):
            return self.process_no_redirect(response)
        else:
            return (URLStatus.ok, result_url, encoding)

    def matches_anti_regex(self, result_url):
        '''Return True if ``result_url`` matches ``location_anti_regex``.

        Returns False when no anti-regex is configured.
        '''
        anti_regex = self.params.get('location_anti_regex')
        # bool() keeps the predicate from leaking None or a match object.
        return bool(anti_regex and re.search(anti_regex, result_url))
class DefaultService(BaseService):
    '''Service that uses the behaviour of BaseService unchanged.'''
# HTMLParser.unescape() was deprecated in Python 3.4 and removed in 3.9, so
# prefer html.unescape() and only fall back to the parser method on
# interpreters that lack it (Python 2 and Python 3.0-3.3).
try:
    from html import unescape as _unescape_entities
except ImportError:
    _html_parser_unescaper = html_parser.HTMLParser()
    _unescape_entities = _html_parser_unescaper.unescape


def html_unescape(text):
    '''Return ``text`` with HTML character references decoded.

    For example ``'&amp;'`` becomes ``'&'``.
    '''
    return _unescape_entities(text)
|