spiders.py

# -*- coding: utf-8 -*-
from scrapy import signals
from scrapy.exceptions import DontCloseSpider
from scrapy.spiders import Spider

from kafka.client import KafkaClient
from kafka.consumer import SimpleConsumer


class KafkaSpiderMixin(object):
    """
    Mixin class to implement reading urls from a kafka queue.

    :type kafka_topic: str
    """

    kafka_topic = None

    def process_kafka_message(self, message):
        """
        Tell this spider how to extract urls from a kafka message.

        :param message: A Kafka message object
        :type message: kafka.common.OffsetAndMessage

        :rtype: str or None
        """
        if not message:
            return None

        return message.message.value
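
    # Illustrative override (a sketch, not part of the original mixin): if
    # messages carried JSON payloads such as {"url": "http://..."} instead of
    # bare URLs, a subclass could decode the value and pull out the url field:
    #
    #     import json
    #
    #     def process_kafka_message(self, message):
    #         if not message:
    #             return None
    #         payload = json.loads(message.message.value)
    #         return payload.get('url')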

    def setup_kafka(self, settings):
        """Setup kafka connection and idle signal.

        This should be called after the spider has set its crawler object.

        :param settings: The current Scrapy settings being used
        :type settings: scrapy.settings.Settings
        """
        if not self.kafka_topic:
            self.kafka_topic = '%s-starturls' % self.name

        hosts = settings.get('SCRAPY_KAFKA_HOSTS', ['localhost:9092'])
        consumer_group = settings.get('SCRAPY_KAFKA_SPIDER_CONSUMER_GROUP', 'scrapy-kafka')
        _kafka = KafkaClient(hosts)
        # Wait at most 1 second for more messages, otherwise continue.
        self.consumer = SimpleConsumer(_kafka, consumer_group, self.kafka_topic,
                                       auto_commit=True, iter_timeout=1.0)

        # The spider_idle signal fires when the spider has no requests left;
        # that is when we schedule new requests from the kafka topic.
        self.crawler.signals.connect(self.spider_idle, signal=signals.spider_idle)
        self.crawler.signals.connect(self.item_scraped, signal=signals.item_scraped)
        self.log("Reading URLs from kafka topic '%s'" % self.kafka_topic)
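
    # The two settings read above would typically live in the Scrapy project's
    # settings.py; the broker address and group name here are hypothetical:
    #
    #     SCRAPY_KAFKA_HOSTS = ['localhost:9092']
    #     SCRAPY_KAFKA_SPIDER_CONSUMER_GROUP = 'scrapy-kafka'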

    def next_request(self):
        """
        Returns a request to be scheduled.

        :rtype: scrapy.Request or None
        """
        message = self.consumer.get_message(True)
        url = self.process_kafka_message(message)
        if not url:
            return None

        return self.make_requests_from_url(url)

    def schedule_next_request(self):
        """Schedules a request if available"""
        req = self.next_request()
        if req:
            self.crawler.engine.crawl(req, spider=self)

    def spider_idle(self):
        """Schedules a request if available, otherwise waits."""
        self.schedule_next_request()
        # Raising DontCloseSpider keeps the spider alive while the topic is
        # empty, so it can pick up new messages on the next idle cycle.
        raise DontCloseSpider

    def item_scraped(self, *args, **kwargs):
        """Avoids waiting for the spider to idle before scheduling the next request"""
        self.schedule_next_request()


class ListeningKafkaSpider(KafkaSpiderMixin, Spider):
    """
    Spider that reads urls from a kafka topic when idle.

    This spider will exit only if stopped, otherwise it keeps
    listening to messages on the given topic.

    Specify the topic to listen to by setting the spider's `kafka_topic`.

    Messages are assumed to be URLs, one per message. To do custom
    processing of kafka messages, override the spider's
    `process_kafka_message` method.
    """

    def _set_crawler(self, crawler):
        """
        :type crawler: scrapy.crawler.Crawler
        """
        super(ListeningKafkaSpider, self)._set_crawler(crawler)
        self.setup_kafka(crawler.settings)
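

# Usage sketch (illustrative; the spider name, topic, and parse logic below
# are hypothetical, not part of the original module):
class ExampleKafkaSpider(ListeningKafkaSpider):
    name = 'example'
    kafka_topic = 'example-starturls'

    def parse(self, response):
        # Each message on the topic is assumed to be a plain URL; emit a
        # minimal item per crawled page.
        yield {'url': response.url}


# Feeding the topic (also a sketch), assuming the same pre-1.0 kafka-python
# API this module imports:
#
#     from kafka.client import KafkaClient
#     from kafka.producer import SimpleProducer
#
#     client = KafkaClient(['localhost:9092'])
#     producer = SimpleProducer(client)
#     producer.send_messages(b'example-starturls', b'http://www.example.com/')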