# encoding=utf-8
import threading
import unittest
import tornado.ioloop
import tornado.testing
import tornado.web
from terroroftinytown.client.scraper import Scraper
from terroroftinytown.client.errors import ScraperError
class ExampleApp(tornado.web.Application):
    """Tornado application that sends every alphanumeric path to ExampleHandler."""

    def __init__(self):
        # Single route: the captured path segment is handed to the handler
        # as its ``shortcode`` argument.
        routes = [
            (r'/([a-zA-Z0-9]+)', ExampleHandler),
        ]
        super(ExampleApp, self).__init__(routes, debug=True)
class ExampleHandler(tornado.web.RequestHandler):
    """Fake URL-shortener endpoint whose response depends on the shortcode.

    * ``a`` -- 301 redirect to ``http://archive.land``.
    * ``b`` -- writes an HTML body (no explicit status) that carries the
      target URL in an anchor with ``id="contlink"``.
    * ``d`` -- status 420 with reason ``banned``.
    * ``e`` -- 301 redirect to ``/404.php``.
    * anything else -- 303 redirect to ``http://example.com``.
    """

    def get(self, shortcode):
        """Answer a GET for ``shortcode`` as described on the class."""
        if shortcode == 'a':
            self.redirect('http://archive.land', status=301)
        elif shortcode == 'b':
            # NOTE(review): the markup inside the first two literals was lost
            # (the bytes literals were left unterminated across lines). It is
            # reconstructed so that the body matches the tests' body_regex
            # (id="contlink" href="...") and yields http://yahoo.city --
            # confirm the exact tags against the upstream file.
            self.write(b'<html><body><p>Please watch this ad.</p>')
            self.write(b'<a id="contlink" href="http://yahoo.city">.</a>')
            self.write(b'continue')
        elif shortcode == 'd':
            self.set_status(420, 'banned')
        elif shortcode == 'e':
            self.redirect('/404.php', status=301)
        else:
            self.redirect('http://example.com', status=303)

    def head(self, shortcode):
        """Answer HEAD by running the same logic as GET."""
        self.get(shortcode)
class IOLoopThread(threading.Thread):
    """Daemon thread that owns a Tornado IOLoop and runs it until stopped."""

    def __init__(self):
        super(IOLoopThread, self).__init__()
        self.io_loop = tornado.ioloop.IOLoop()
        # Daemon, so a loop that is still running does not block interpreter exit.
        self.daemon = True

    def run(self):
        """Thread body: start the loop and return once it has been stopped."""
        loop = self.io_loop
        loop.start()

    def stop(self):
        """Request shutdown by queueing the loop's own ``stop`` as a callback."""
        loop = self.io_loop
        loop.add_callback(loop.stop)
class TestTracker(unittest.TestCase):
    """Run Scraper against a local ExampleApp served from a background IOLoop thread."""

    def setUp(self):
        """Bind an unused port, attach an HTTP server to it and start the loop thread."""
        self.io_loop_thread = IOLoopThread()
        app = ExampleApp()
        socket_obj, self.port = tornado.testing.bind_unused_port()
        # Kept on the instance so tearDown can close the listening socket.
        self.http_server = tornado.httpserver.HTTPServer(
            app, io_loop=self.io_loop_thread.io_loop
        )
        self.http_server.add_socket(socket_obj)
        self.io_loop_thread.start()

    def tearDown(self):
        """Close the server, stop the loop and wait for its thread to finish."""
        loop = self.io_loop_thread.io_loop
        # Queue the server shutdown ahead of the loop stop so it runs on the
        # loop's own thread and the listening socket is released per test.
        loop.add_callback(self.http_server.stop)
        self.io_loop_thread.stop()
        # Bounded wait: avoids leaking a running thread into the next test
        # without hanging the suite if the loop fails to stop.
        self.io_loop_thread.join(timeout=5)

    def get_url(self, path):
        """Return the absolute URL for ``path`` on the local test server."""
        return 'http://localhost:{0}{1}'.format(self.port, path)

    def _scraper_config(self, **extra):
        """Return the scraper parameters shared by the tests, updated with ``extra``."""
        config = {
            'alphabet': 'abcdefghijklmnopqrstuvwxyz',
            'url_template': self.get_url('/{shortcode}'),
            'request_delay': 0.1,
            'redirect_codes': [301, 200],
            'no_redirect_codes': [303],
            'unavailable_codes': [],
            'banned_codes': [420],
            'body_regex': r'id="contlink" href="([^"]+)',
            'custom_code_required': False,
            'method': 'get',
            'name': 'blah',
        }
        config.update(extra)
        return config

    def test_scraper(self):
        """Only shortcodes 'a' and 'b' should produce results."""
        # NOTE(review): the second argument is presumably a list of item
        # numbers mapped onto the alphabet (0 -> 'a', 1 -> 'b', ...) --
        # confirm against Scraper.
        scraper = Scraper(
            self._scraper_config(location_anti_regex=r'^/404.php$'),
            [0, 1, 2, 4]
        )

        scraper.run()

        self.assertEqual(2, len(scraper.results))
        self.assertEqual('http://archive.land', scraper.results['a']['url'])
        self.assertEqual('http://yahoo.city', scraper.results['b']['url'])

    def test_scraper_banned(self):
        """A response with a banned status code should raise ScraperError."""
        scraper = Scraper(
            self._scraper_config(),
            [3],
            max_try_count=1
        )

        with self.assertRaises(ScraperError):
            scraper.run()