1234567891011121314151617181920212223242526272829303132333435363738 |
- def consume(self):
- while True:
- try:
- for message in self.consumer:
- try:
- LOGGER.debug(f"We have recieved {message}")
- ml = MessageLoader(self.topic)
- ml.load_kafka_message(message)
- ret_val = ml.return_message()
- LOGGER.info(f"Channel: {self.topic}, "
- f"Received: {ret_val._message_data.uuid} "
- f"at {ret_val._message_data.timestamp} "
- f"offset {ret_val._message_data.offset}, "
- f"partition {ret_val._message_data.partition}"
- )
- yield ret_val
- except Exception as e:
- LOGGER.critical(f"{self} failed to consume message {message} on channel {message.topic} failed with: {e}",
- exc_info=True)
- raise e
- except Exception as e:
- if self.consumer is not None:
- try:
- self.commit()
- self.close()
- except Exception as e:
- LOGGER.critical(
- "Couldn't close/commit consumer in exception due to {} trying to re-create".format(e))
- self.create_kafka_consumer()
- LOGGER.critical(
- "consuming message failed with: ", exc_info=True)
- else:
- raise MessagingExceptionCritical(
- "Kafka has gone away, exiting running code, tried creating new consumer and failed")
- finally:
- sleep(0.0001)
|