12345678910111213141516171819202122232425262728293031323334353637383940414243 |
- def describe(self, node_id, group_name):
- describe_groups_request = admin.DescribeGroupsRequest_v0(
- groups=[(group_name)]
- )
- future = self.client.send(node_id, describe_groups_request)
- self.client.poll(timeout_ms=self.timeout, future=future)
- (error_code, group_id, state, protocol_type, protocol, members) = future.value.groups[0]
- if error_code != 0:
- print(
- "Kafka API - RET admin.DescribeGroupsRequest, error_code={}, group_id={}, state={}, protocol_type={}, protocol={}, members_count={}".format(
- error_code, group_id, state, protocol_type, protocol, len(members)))
- exit(1)
- metadata_consumer_group = {'id': group_name, 'state': state, 'topics': [], 'lag': 0, 'members': []}
- if len(members) != 0:
- for member in members:
- (member_id, client_id, client_host, member_metadata, member_assignment) = member
- member_topics_assignment = []
- for (topic, partitions) in MemberAssignment.decode(member_assignment).assignment:
- member_topics_assignment.append(topic)
- metadata_consumer_group['members'].append(
- {'member_id': member_id, 'client_id': client_id, 'client_host': client_host,
- 'topic': member_topics_assignment})
- metadata_consumer_group['topics'] += member_topics_assignment
- (lag_total, topics_found) = self.get_lag_by_topic_list(group_name,
- topics=metadata_consumer_group['topics'])
- metadata_consumer_group['lag'] = metadata_consumer_group['lag'] + lag_total
- else:
- all_topics = self.client.cluster.topics()
- while '__consumer_offsets' in all_topics: all_topics.remove('__consumer_offsets')
- (lag_total, topics_found) = self.get_lag_by_topic_list(group_name, topics=all_topics)
- metadata_consumer_group['lag'] = metadata_consumer_group['lag'] + lag_total
- metadata_consumer_group['topics'] = topics_found
- return metadata_consumer_group
|