def list(self): list_groups_request = admin.ListGroupsRequest_v0( timeout=self.timeout ) kafka_broker_ids = [broker.nodeId for broker in self.client.cluster.brokers()] consumers_grp = {} for broker_id in kafka_broker_ids: current_of_tries = 0 max_of_tries = 5 data_from_node = False while not data_from_node and current_of_tries <= max_of_tries: future = self.client.send(broker_id, list_groups_request) self.client.poll(timeout_ms=self.timeout, future=future) if future.value is not None: result = future.value.groups for i in result: consumers_grp.update({i[0]: broker_id}) data_from_node = True else: current_of_tries += 1 time.sleep(0.5) return consumers_grp