# -*- coding: utf-8 -*-
from scrapy import signals
from scrapy.exceptions import DontCloseSpider
from scrapy.spiders import Spider

from kafka.client import KafkaClient
from kafka.consumer import SimpleConsumer


class KafkaSpiderMixin(object):
    """
    Mixin class to implement reading urls from a kafka queue.

    :type kafka_topic: str
    """

    kafka_topic = None

    def process_kafka_message(self, message):
        """
        Tell this spider how to extract urls from a kafka message.

        :param message: A Kafka message object
        :type message: kafka.common.OffsetAndMessage

        :rtype: str or None
        """
        if not message:
            return None
        return message.message.value
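
    # A subclass whose kafka messages are JSON payloads could override the
    # hook above along these lines (a sketch, assuming ``import json`` at
    # module level; the ``{"url": ...}`` schema is an illustration, not
    # part of this module):
    #
    #     def process_kafka_message(self, message):
    #         if not message:
    #             return None
    #         payload = json.loads(message.message.value)
    #         return payload.get('url')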

    def setup_kafka(self, settings):
        """Set up the kafka consumer and the idle signal.

        This should be called after the spider has set its crawler object.

        :param settings: The current Scrapy settings being used
        :type settings: scrapy.settings.Settings
        """
        if not self.kafka_topic:
            self.kafka_topic = '%s-starturls' % self.name

        hosts = settings.get('SCRAPY_KAFKA_HOSTS', ['localhost:9092'])
        consumer_group = settings.get('SCRAPY_KAFKA_SPIDER_CONSUMER_GROUP',
                                      'scrapy-kafka')
        _kafka = KafkaClient(hosts)
        # wait at most 1 sec for more messages, otherwise continue
        self.consumer = SimpleConsumer(_kafka, consumer_group, self.kafka_topic,
                                       auto_commit=True, iter_timeout=1.0)

        # The idle signal is sent when the spider has no requests left;
        # that's when we schedule new requests from the kafka topic.
        self.crawler.signals.connect(self.spider_idle, signal=signals.spider_idle)
        self.crawler.signals.connect(self.item_scraped, signal=signals.item_scraped)
        self.log("Reading URLs from kafka topic '%s'" % self.kafka_topic)
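
    # The two settings read above would normally live in the Scrapy
    # project's settings.py; the values below are illustrative, not
    # defaults shipped with this module:
    #
    #     SCRAPY_KAFKA_HOSTS = ['kafka1:9092', 'kafka2:9092']
    #     SCRAPY_KAFKA_SPIDER_CONSUMER_GROUP = 'my-scrapy-group'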

    def next_request(self):
        """
        Returns a request to be scheduled.

        :rtype: scrapy.Request or None
        """
        message = self.consumer.get_message(True)
        url = self.process_kafka_message(message)
        if not url:
            return None
        return self.make_requests_from_url(url)

    def schedule_next_request(self):
        """Schedules a request if available."""
        req = self.next_request()
        if req:
            self.crawler.engine.crawl(req, spider=self)

    def spider_idle(self):
        """Schedules a request if available, otherwise waits."""
        self.schedule_next_request()
        raise DontCloseSpider

    def item_scraped(self, *args, **kwargs):
        """Avoids waiting for the spider to idle before scheduling the next request."""
        self.schedule_next_request()


class ListeningKafkaSpider(KafkaSpiderMixin, Spider):
    """
    Spider that reads urls from a kafka topic when idle.

    This spider will exit only if stopped; otherwise it keeps
    listening for messages on the given topic.

    Specify the topic to listen to by setting the spider's `kafka_topic`.

    Messages are assumed to be URLs, one per message. To do custom
    processing of kafka messages, override the spider's
    `process_kafka_message` method.
    """

    def _set_crawler(self, crawler):
        """
        :type crawler: scrapy.crawler.Crawler
        """
        super(ListeningKafkaSpider, self)._set_crawler(crawler)
        self.setup_kafka(crawler.settings)
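

# Usage sketch (illustrative; the spider name, topic, and parse logic are
# assumptions, not part of this module). A concrete spider only needs a
# name, an optional `kafka_topic`, and a `parse` callback:
#
#     class DmozSpider(ListeningKafkaSpider):
#         name = 'dmoz'
#         kafka_topic = 'dmoz-urls'
#
#         def parse(self, response):
#             self.log('Visited %s' % response.url)
#
# URLs can then be pushed onto the topic with kafka-python's legacy
# SimpleProducer (the same API generation as the SimpleConsumer used above):
#
#     from kafka.client import KafkaClient
#     from kafka.producer import SimpleProducer
#
#     client = KafkaClient(['localhost:9092'])
#     producer = SimpleProducer(client)
#     producer.send_messages('dmoz-urls', 'http://www.dmoz.org/')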