broker_6.py 1.7 KB

1234567891011121314151617181920212223242526272829303132333435363738
  def consume(self):
      """Yield loaded messages from ``self.consumer`` in an endless loop.

      Each raw message is passed through ``MessageLoader(self.topic)`` and
      the object returned by ``return_message()`` is yielded to the caller.

      Failure handling:
        * An error while handling a single message is logged at CRITICAL
          level (with traceback) and re-raised into the recovery handler.
        * The recovery handler makes a best-effort ``commit()``/``close()``,
          calls ``create_kafka_consumer()`` and lets the loop resume.
        * If ``self.consumer`` is ``None`` when a failure is being handled,
          ``MessagingExceptionCritical`` is raised and iteration ends.

      Yields:
          The value returned by ``MessageLoader.return_message()``.

      Raises:
          MessagingExceptionCritical: a failure occurred while
              ``self.consumer`` was ``None``.
      """
      while True:
          try:
              for message in self.consumer:
                  try:
                      # Lazy %-args: the message is only rendered when the
                      # log level is actually enabled.
                      LOGGER.debug("We have recieved %s", message)
                      loader = MessageLoader(self.topic)
                      loader.load_kafka_message(message)
                      ret_val = loader.return_message()
                      data = ret_val._message_data
                      LOGGER.info(
                          "Channel: %s, Received: %s at %s offset %s, partition %s",
                          self.topic,
                          data.uuid,
                          data.timestamp,
                          data.offset,
                          data.partition,
                      )
                      yield ret_val
                  except Exception as message_err:
                      LOGGER.critical(
                          "%s failed to consume message %s on channel %s failed with: %s",
                          self,
                          message,
                          message.topic,
                          message_err,
                          exc_info=True,
                      )
                      # Bare raise hands the same exception to the recovery
                      # handler below without re-binding it.
                      raise
          except Exception as consume_err:
              if self.consumer is None:
                  raise MessagingExceptionCritical(
                      "Kafka has gone away, exiting running code, "
                      "tried creating new consumer and failed"
                  ) from consume_err

              try:
                  self.commit()
                  self.close()
              except Exception as close_err:
                  # Best effort only: a failed commit/close must not stop
                  # the consumer from being re-created.
                  LOGGER.critical(
                      "Couldn't close/commit consumer in exception due to %s trying to re-create",
                      close_err,
                  )

              # NOTE(review): nesting was reconstructed from a flattened
              # paste; confirm the re-create and the log below belong after
              # the close attempt rather than inside its except branch.
              self.create_kafka_consumer()
              # exc_info=True here reports the exception that broke the
              # consume loop (the close failure has already been handled).
              LOGGER.critical("consuming message failed with: ", exc_info=True)
          finally:
              # Brief pause on every pass of the outer loop, including
              # when an exception is propagating out.
              sleep(0.0001)