def describe(self, node_id, group_name):
    """Describe a consumer group and compute its total lag.

    Sends an ``admin.DescribeGroupsRequest_v0`` for ``group_name`` to
    ``node_id`` through ``self.client``, waits for the response with
    ``self.client.poll`` and builds a summary of the group.

    When the group has members, the topics are taken from each member's
    decoded assignment. When it has no members, every topic known to
    ``self.client.cluster`` except ``__consumer_offsets`` is checked and
    the topics reported back by ``get_lag_by_topic_list`` are used.

    Args:
        node_id: Identifier of the node the request is sent to.
        group_name: Name of the consumer group to describe.

    Returns:
        dict: ``{'id', 'state', 'topics', 'lag', 'members'}`` where
        ``members`` is a list of dicts with the keys ``member_id``,
        ``client_id``, ``client_host`` and ``topic`` (the list of topics
        assigned to that member), and ``topics`` contains each topic once.

    Raises:
        SystemExit: With exit code 1 when the response carries a non-zero
            ``error_code`` (the details are printed first).
    """
    describe_groups_request = admin.DescribeGroupsRequest_v0(groups=[group_name])
    future = self.client.send(node_id, describe_groups_request)
    self.client.poll(timeout_ms=self.timeout, future=future)

    # NOTE(review): this assumes the future holds a response once poll()
    # returns; presumably future.value is unset on timeout or failure --
    # confirm whether that case needs explicit handling.
    (error_code, group_id, state, protocol_type, protocol, members) = future.value.groups[0]

    if error_code != 0:
        print(
            "Kafka API - RET admin.DescribeGroupsRequest, error_code={}, group_id={}, state={}, protocol_type={}, protocol={}, members_count={}".format(
                error_code, group_id, state, protocol_type, protocol, len(members)))
        # Raise SystemExit directly rather than calling the exit() helper,
        # which is injected by the site module for interactive sessions and
        # is not guaranteed to be defined.
        raise SystemExit(1)

    metadata_consumer_group = {
        'id': group_name,
        'state': state,
        'topics': [],
        'lag': 0,
        'members': [],
    }

    if members:
        # Several members may be assigned partitions of the same topic, so
        # collect every topic only once (in first-seen order). Otherwise the
        # topic would be listed repeatedly and handed to
        # get_lag_by_topic_list more than once.
        group_topics = []
        seen_topics = set()

        for member in members:
            (member_id, client_id, client_host, _member_metadata, member_assignment) = member
            member_topics_assignment = [
                topic
                for (topic, _partitions) in MemberAssignment.decode(member_assignment).assignment
            ]

            metadata_consumer_group['members'].append({
                'member_id': member_id,
                'client_id': client_id,
                'client_host': client_host,
                'topic': member_topics_assignment,
            })

            for topic in member_topics_assignment:
                if topic not in seen_topics:
                    seen_topics.add(topic)
                    group_topics.append(topic)

        metadata_consumer_group['topics'] = group_topics

        # Compute the lag exactly once, after all members have been seen,
        # so no topic's lag is added to the total more than once.
        (lag_total, _topics_found) = self.get_lag_by_topic_list(group_name, topics=group_topics)
        metadata_consumer_group['lag'] += lag_total
    else:
        # No members: fall back to checking every topic known to the
        # cluster metadata, skipping the internal offsets topic.
        all_topics = self.client.cluster.topics()
        while '__consumer_offsets' in all_topics:
            all_topics.remove('__consumer_offsets')

        (lag_total, topics_found) = self.get_lag_by_topic_list(group_name, topics=all_topics)
        metadata_consumer_group['lag'] += lag_total
        metadata_consumer_group['topics'] = topics_found

    return metadata_consumer_group