zabbix-kafka-consumers-monitor_3.py 2.0 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243
  1. def describe(self, node_id, group_name):
  2. describe_groups_request = admin.DescribeGroupsRequest_v0(
  3. groups=[(group_name)]
  4. )
  5. future = self.client.send(node_id, describe_groups_request)
  6. self.client.poll(timeout_ms=self.timeout, future=future)
  7. (error_code, group_id, state, protocol_type, protocol, members) = future.value.groups[0]
  8. if error_code != 0:
  9. print(
  10. "Kafka API - RET admin.DescribeGroupsRequest, error_code={}, group_id={}, state={}, protocol_type={}, protocol={}, members_count={}".format(
  11. error_code, group_id, state, protocol_type, protocol, len(members)))
  12. exit(1)
  13. metadata_consumer_group = {'id': group_name, 'state': state, 'topics': [], 'lag': 0, 'members': []}
  14. if len(members) != 0:
  15. for member in members:
  16. (member_id, client_id, client_host, member_metadata, member_assignment) = member
  17. member_topics_assignment = []
  18. for (topic, partitions) in MemberAssignment.decode(member_assignment).assignment:
  19. member_topics_assignment.append(topic)
  20. metadata_consumer_group['members'].append(
  21. {'member_id': member_id, 'client_id': client_id, 'client_host': client_host,
  22. 'topic': member_topics_assignment})
  23. metadata_consumer_group['topics'] += member_topics_assignment
  24. (lag_total, topics_found) = self.get_lag_by_topic_list(group_name,
  25. topics=metadata_consumer_group['topics'])
  26. metadata_consumer_group['lag'] = metadata_consumer_group['lag'] + lag_total
  27. else:
  28. all_topics = self.client.cluster.topics()
  29. while '__consumer_offsets' in all_topics: all_topics.remove('__consumer_offsets')
  30. (lag_total, topics_found) = self.get_lag_by_topic_list(group_name, topics=all_topics)
  31. metadata_consumer_group['lag'] = metadata_consumer_group['lag'] + lag_total
  32. metadata_consumer_group['topics'] = topics_found
  33. return metadata_consumer_group