# zabbix-kafka-consumers-monitor_2.py (912 B)

  def get_members(self, node_id, group_name):
      """Return the members of a consumer group as reported by one broker.

      Sends an ``admin.DescribeGroupsRequest_v0`` for ``group_name`` to the
      broker ``node_id`` through ``self.client`` and polls for the reply,
      using ``self.timeout`` as the poll timeout in milliseconds.

      Args:
          node_id: Identifier of the broker the request is sent to.
          group_name: Name of the consumer group to describe.

      Returns:
          A list with one dict per group member, each holding the keys
          ``'member_id'``, ``'client_id'`` and ``'client_host'``.

      Raises:
          SystemExit: With exit status 1 when no usable response arrives
              or when the broker reports a non-zero ``error_code``. A
              diagnostic line is printed first.
      """
      request = admin.DescribeGroupsRequest_v0(groups=[group_name])
      future = self.client.send(node_id, request)
      self.client.poll(timeout_ms=self.timeout, future=future)

      # The future carries no value when the poll timed out or the request
      # failed; report that explicitly instead of crashing on ``None``.
      response = future.value
      if response is None or not response.groups:
          print(
              "Kafka API - RET admin.DescribeGroupsRequest, no response "
              "from node_id={} for group_id={}, exception={}".format(
                  node_id, group_name, getattr(future, 'exception', None)))
          raise SystemExit(1)

      # Only one group was requested, so only the first entry is relevant.
      (error_code, group_id, state,
       protocol_type, protocol, members) = response.groups[0]
      if error_code != 0:
          print(
              "Kafka API - RET admin.DescribeGroupsRequest, error_code={}, group_id={}, state={}, protocol_type={}, protocol={}, members_count={}".format(
                  error_code, group_id, state, protocol_type, protocol, len(members)))
          # ``exit()`` is an interactive helper installed by ``site`` and is
          # not always available; raising SystemExit is the portable form.
          raise SystemExit(1)

      # Each member is a 5-field record; metadata and assignment are not
      # part of the returned summary.
      return [
          {'member_id': member_id,
           'client_id': client_id,
           'client_host': client_host}
          for (member_id, client_id, client_host,
               _member_metadata, _member_assignment) in members
      ]