1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859 |
- # -*- coding: utf-8 -*-
- from scrapy.utils.serialize import ScrapyJSONEncoder
- from kafka.client import KafkaClient
- from kafka.producer import SimpleProducer
- class KafkaPipeline(object):
- """
- Publishes a serialized item into a Kafka topic
- :param producer: The Kafka producer
- :type producer: kafka.producer.Producer
- :param topic: The Kafka topic being used
- :type topic: str or unicode
- """
- def __init__(self, producer, topic):
- """
- :type producer: kafka.producer.Producer
- :type topic: str or unicode
- """
- self.producer = producer
- self.topic = topic
- self.encoder = ScrapyJSONEncoder()
- def process_item(self, item, spider):
- """
- Overriden method to process the item
- :param item: Item being passed
- :type item: scrapy.item.Item
- :param spider: The current spider being used
- :type spider: scrapy.spider.Spider
- """
- # put spider name in item
- item = dict(item)
- item['spider'] = spider.name
- msg = self.encoder.encode(item)
- self.producer.send_messages(self.topic, msg)
- @classmethod
- def from_settings(cls, settings):
- """
- :param settings: the current Scrapy settings
- :type settings: scrapy.settings.Settings
- :rtype: A :class:`~KafkaPipeline` instance
- """
- k_hosts = settings.get('SCRAPY_KAFKA_HOSTS', ['localhost:9092'])
- topic = settings.get('SCRAPY_KAFKA_ITEM_PIPELINE_TOPIC', 'scrapy_kafka_item')
- kafka = KafkaClient(k_hosts)
- conn = SimpleProducer(kafka)
- return cls(conn, topic)
|