zabbix-kafka-consumers-monitor_1.py 878 B

12345678910111213141516171819202122232425
  1. def list(self):
  2. list_groups_request = admin.ListGroupsRequest_v0(
  3. timeout=self.timeout
  4. )
  5. kafka_broker_ids = [broker.nodeId for broker in self.client.cluster.brokers()]
  6. consumers_grp = {}
  7. for broker_id in kafka_broker_ids:
  8. current_of_tries = 0
  9. max_of_tries = 5
  10. data_from_node = False
  11. while not data_from_node and current_of_tries <= max_of_tries:
  12. future = self.client.send(broker_id, list_groups_request)
  13. self.client.poll(timeout_ms=self.timeout, future=future)
  14. if future.value is not None:
  15. result = future.value.groups
  16. for i in result:
  17. consumers_grp.update({i[0]: broker_id})
  18. data_from_node = True
  19. else:
  20. current_of_tries += 1
  21. time.sleep(0.5)
  22. return consumers_grp