# pipelines.py
  1. # -*- coding: utf-8 -*-
  2. from scrapy.utils.serialize import ScrapyJSONEncoder
  3. from kafka.client import KafkaClient
  4. from kafka.producer import SimpleProducer
  5. class KafkaPipeline(object):
  6. """
  7. Publishes a serialized item into a Kafka topic
  8. :param producer: The Kafka producer
  9. :type producer: kafka.producer.Producer
  10. :param topic: The Kafka topic being used
  11. :type topic: str or unicode
  12. """
  13. def __init__(self, producer, topic):
  14. """
  15. :type producer: kafka.producer.Producer
  16. :type topic: str or unicode
  17. """
  18. self.producer = producer
  19. self.topic = topic
  20. self.encoder = ScrapyJSONEncoder()
  21. def process_item(self, item, spider):
  22. """
  23. Overriden method to process the item
  24. :param item: Item being passed
  25. :type item: scrapy.item.Item
  26. :param spider: The current spider being used
  27. :type spider: scrapy.spider.Spider
  28. """
  29. # put spider name in item
  30. item = dict(item)
  31. item['spider'] = spider.name
  32. msg = self.encoder.encode(item)
  33. self.producer.send_messages(self.topic, msg)
  34. @classmethod
  35. def from_settings(cls, settings):
  36. """
  37. :param settings: the current Scrapy settings
  38. :type settings: scrapy.settings.Settings
  39. :rtype: A :class:`~KafkaPipeline` instance
  40. """
  41. k_hosts = settings.get('SCRAPY_KAFKA_HOSTS', ['localhost:9092'])
  42. topic = settings.get('SCRAPY_KAFKA_ITEM_PIPELINE_TOPIC', 'scrapy_kafka_item')
  43. kafka = KafkaClient(k_hosts)
  44. conn = SimpleProducer(kafka)
  45. return cls(conn, topic)