zabbix-kafka-consumers-monitor_5.py 955 B

123456789101112131415161718192021222324
  def get_lag_by_topic(self, group_name, topic):
      """Add the lag of ``group_name`` on ``topic`` to the running totals.

      A short-lived consumer is created for the group. For every partition
      of the topic the group's committed offset is compared with the
      partition's end offset, and the difference is added to
      ``self.lag_total``. ``topic`` is appended to ``self.lag_topics_found``
      once for each partition that contributed to the total.

      Args:
          group_name: Consumer group id whose committed offsets are read.
          topic: Name of the topic to inspect.

      Returns:
          None. Results are accumulated on ``self.lag_total`` and
          ``self.lag_topics_found``.
      """
      consumer = KafkaConsumer(
          bootstrap_servers=self.kafka_brokers,
          group_id=group_name,
          security_protocol=self.security_protocol,
          sasl_mechanism=self.sasl_mechanism,
          sasl_plain_username=self.sasl_plain_username,
          sasl_plain_password=self.sasl_plain_password,
          ssl_context=self.ssl_context,
      )
      try:
          # partitions_for_topic() may return None when no metadata is
          # available for the topic; treat that as "no partitions" instead
          # of failing with a TypeError in the loop below.
          partitions = consumer.partitions_for_topic(topic) or ()
          for partition in partitions:
              tp = TopicPartition(topic, partition)
              # Assign only this partition so seek/position act on it.
              consumer.assign([tp])
              committed = consumer.committed(tp)
              # Move to the end of the partition to read its latest offset.
              consumer.seek_to_end(tp)
              last_offset = consumer.position(tp)
              # Partitions with a missing or zero offset on either side are
              # skipped (kept as in the original check).
              if (
                  committed is not None
                  and int(committed)
                  and last_offset is not None
                  and int(last_offset)
              ):
                  self.lag_topics_found.append(topic)
                  self.lag_total += last_offset - committed
      finally:
          # Always release the broker connections, even if a call above
          # raised; autocommit=False so this probe never commits offsets.
          consumer.close(autocommit=False)