broker_4.py 875 B

1234567891011121314151617181920212223
  1. def create_kafka_consumer(self):
  2. """docstring for create_kafka_consumer"""
  3. try:
  4. if isinstance(self.topic, list):
  5. self.consumer = KafkaConsumer(
  6. group_id=self.group,
  7. bootstrap_servers=self.bootstrap_servers,
  8. enable_auto_commit=self.enable_auto_commit,
  9. auto_offset_reset=self.start_point
  10. )
  11. self.consumer.subscribe(self.topic)
  12. else:
  13. self.consumer = KafkaConsumer(
  14. self.topic,
  15. group_id=self.group,
  16. bootstrap_servers=self.bootstrap_servers,
  17. enable_auto_commit=self.enable_auto_commit,
  18. auto_offset_reset=self.start_point)
  19. except Exception as e:
  20. LOGGER.critical(
  21. "Can't create a consumer with {0}".format(e), exc_info=e)
  22. raise e