12345678910111213141516171819202122232425 |
def list(self):
    """Collect the consumer groups reported by every known broker.

    A ListGroups request is sent to each broker in the client's cluster
    metadata. If a broker produces no response value, the request is
    retried, up to ``max_attempts`` times in total, pausing briefly
    between attempts. Brokers that never answer are skipped silently
    (best effort), so the result may be incomplete.

    Returns:
        dict: first field of each returned group entry (presumably the
        group id -- confirm against the response schema) mapped to the
        id of the broker that reported it. If several brokers report the
        same key, the broker queried last wins.
    """
    max_attempts = 5
    retry_delay_s = 0.5

    # The same request object is reused for every broker and every retry.
    request = admin.ListGroupsRequest_v0(timeout=self.timeout)
    broker_ids = [broker.nodeId for broker in self.client.cluster.brokers()]

    groups_by_broker = {}
    for broker_id in broker_ids:
        for attempt in range(1, max_attempts + 1):
            future = self.client.send(broker_id, request)
            self.client.poll(timeout_ms=self.timeout, future=future)

            response = future.value
            if response is not None:
                for group in response.groups:
                    groups_by_broker[group[0]] = broker_id
                break

            # No value yet: back off before retrying, but do not waste
            # time sleeping once the last attempt has already failed.
            if attempt < max_attempts:
                time.sleep(retry_delay_s)
    return groups_by_broker
|