# encoding=utf-8
import threading
import unittest

import tornado.httpserver
import tornado.ioloop
import tornado.testing
import tornado.web

from terroroftinytown.client.errors import ScraperError
from terroroftinytown.client.scraper import Scraper
  9. class ExampleApp(tornado.web.Application):
  10. def __init__(self):
  11. tornado.web.Application.__init__(self,
  12. [
  13. (r'/([a-zA-Z0-9]+)', ExampleHandler)
  14. ],
  15. debug=True)
  16. class ExampleHandler(tornado.web.RequestHandler):
  17. def get(self, shortcode):
  18. if shortcode == 'a':
  19. self.redirect('http://archive.land', status=301)
  20. elif shortcode == 'b':
  21. self.write(b'<html><body>Please watch this ad.')
  22. self.write(b'<img><a id="contlink" href="http://yahoo.city">.')
  23. self.write(b'continue</a></html><body>')
  24. elif shortcode == 'd':
  25. self.set_status(420, 'banned')
  26. elif shortcode == 'e':
  27. self.redirect('/404.php', status=301)
  28. else:
  29. self.redirect('http://example.com', status=303)
  30. def head(self, shortcode):
  31. self.get(shortcode)
  32. class IOLoopThread(threading.Thread):
  33. def __init__(self):
  34. threading.Thread.__init__(self)
  35. self.daemon = True
  36. self.io_loop = tornado.ioloop.IOLoop()
  37. def run(self):
  38. self.io_loop.start()
  39. def stop(self):
  40. self.io_loop.add_callback(self.io_loop.stop)
  41. class TestTracker(unittest.TestCase):
  42. def setUp(self):
  43. self.io_loop_thread = IOLoopThread()
  44. app = ExampleApp()
  45. socket_obj, self.port = tornado.testing.bind_unused_port()
  46. http_server = tornado.httpserver.HTTPServer(
  47. app, io_loop=self.io_loop_thread.io_loop
  48. )
  49. http_server.add_socket(socket_obj)
  50. self.io_loop_thread.start()
  51. def tearDown(self):
  52. self.io_loop_thread.stop()
  53. def get_url(self, path):
  54. return 'http://localhost:{0}{1}'.format(self.port, path)
  55. def test_scraper(self):
  56. scraper = Scraper(
  57. {
  58. 'alphabet': 'abcdefghijklmnopqrstuvwxyz',
  59. 'url_template': self.get_url('/{shortcode}'),
  60. 'request_delay': 0.1,
  61. 'redirect_codes': [301, 200],
  62. 'no_redirect_codes': [303],
  63. 'unavailable_codes': [],
  64. 'banned_codes': [420],
  65. 'body_regex': r'id="contlink" href="([^"]+)',
  66. 'location_anti_regex': r'^/404.php$',
  67. 'custom_code_required': False,
  68. 'method': 'get',
  69. 'name': 'blah',
  70. },
  71. [0, 1, 2, 4]
  72. )
  73. scraper.run()
  74. self.assertEqual(2, len(scraper.results))
  75. self.assertEqual('http://archive.land', scraper.results['a']['url'])
  76. self.assertEqual('http://yahoo.city', scraper.results['b']['url'])
  77. def test_scraper_banned(self):
  78. scraper = Scraper(
  79. {
  80. 'alphabet': 'abcdefghijklmnopqrstuvwxyz',
  81. 'url_template': self.get_url('/{shortcode}'),
  82. 'request_delay': 0.1,
  83. 'redirect_codes': [301, 200],
  84. 'no_redirect_codes': [303],
  85. 'unavailable_codes': [],
  86. 'banned_codes': [420],
  87. 'body_regex': r'id="contlink" href="([^"]+)',
  88. 'custom_code_required': False,
  89. 'method': 'get',
  90. 'name': 'blah',
  91. },
  92. [3],
  93. max_try_count=1
  94. )
  95. try:
  96. scraper.run()
  97. except ScraperError:
  98. pass
  99. else:
  100. self.fail()