1234567891011121314151617181920212223 |
- def create_kafka_consumer(self):
- """docstring for create_kafka_consumer"""
- try:
- if isinstance(self.topic, list):
- self.consumer = KafkaConsumer(
- group_id=self.group,
- bootstrap_servers=self.bootstrap_servers,
- enable_auto_commit=self.enable_auto_commit,
- auto_offset_reset=self.start_point
- )
- self.consumer.subscribe(self.topic)
- else:
- self.consumer = KafkaConsumer(
- self.topic,
- group_id=self.group,
- bootstrap_servers=self.bootstrap_servers,
- enable_auto_commit=self.enable_auto_commit,
- auto_offset_reset=self.start_point)
- except Exception as e:
- LOGGER.critical(
- "Can't create a consumer with {0}".format(e), exc_info=e)
- raise e
|