from kafka import KafkaConsumer, TopicPartition
from kafka.client_async import KafkaClient
from kafka.protocol import admin
from kafka.protocol.group import MemberAssignment
import sys
import threading
import time

# Uncomment for protocol-level debug output:
# import logging
# logging.basicConfig(level=logging.DEBUG)
'''
This class retrieves all consumer groups, along with information about their
members and lag, using the Kafka protocol API.
Kafka protocol documentation:
https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol
'''
class KafkaConsumerGroups:

    def __init__(self, kafka_brokers, security_protocol, sasl_mechanism,
                 sasl_plain_username, sasl_plain_password, ssl_context,
                 timeout=5000):
        self.kafka_brokers = kafka_brokers
        self.security_protocol = security_protocol
        self.sasl_mechanism = sasl_mechanism
        self.sasl_plain_username = sasl_plain_username
        self.sasl_plain_password = sasl_plain_password
        self.ssl_context = ssl_context
        self.timeout = timeout
        # KafkaClient expects request_timeout_ms; a bare 'timeout' kwarg is
        # not a recognized config and would be ignored.
        self.client = KafkaClient(bootstrap_servers=kafka_brokers,
                                  security_protocol=security_protocol,
                                  sasl_mechanism=sasl_mechanism,
                                  sasl_plain_username=sasl_plain_username,
                                  sasl_plain_password=sasl_plain_password,
                                  ssl_context=ssl_context,
                                  request_timeout_ms=timeout)
        # Shared across the worker threads started in get_lag_by_topic_list;
        # guarded by _lag_lock.
        self.lag_topics_found = []
        self.lag_total = 0
        self._lag_lock = threading.Lock()

    def list(self):
        # Consumer groups are tracked by their coordinating broker, so every
        # broker must be asked for the groups it manages.
        # ListGroups v0 has an empty request body, so no arguments are passed.
        list_groups_request = admin.ListGroupsRequest_v0()
        kafka_broker_ids = [broker.nodeId for broker in self.client.cluster.brokers()]
        consumers_grp = {}
        for broker_id in kafka_broker_ids:
            attempts = 0
            max_attempts = 5
            data_from_node = False
            while not data_from_node and attempts <= max_attempts:
                future = self.client.send(broker_id, list_groups_request)
                self.client.poll(timeout_ms=self.timeout, future=future)
                if future.value is not None:
                    # Each entry is a (group_id, protocol_type) tuple; map the
                    # group name to the broker that coordinates it.
                    for group in future.value.groups:
                        consumers_grp[group[0]] = broker_id
                    data_from_node = True
                else:
                    attempts += 1
                    time.sleep(0.5)
        return consumers_grp

    def get_members(self, node_id, group_name):
        describe_groups_request = admin.DescribeGroupsRequest_v0(groups=[group_name])
        future = self.client.send(node_id, describe_groups_request)
        self.client.poll(timeout_ms=self.timeout, future=future)
        (error_code, group_id, state, protocol_type, protocol, members) = future.value.groups[0]
        if error_code != 0:
            print(
                "Kafka API - RET admin.DescribeGroupsRequest, error_code={}, group_id={}, state={}, protocol_type={}, protocol={}, members_count={}".format(
                    error_code, group_id, state, protocol_type, protocol, len(members)))
            sys.exit(1)
        lmembers = []
        for member in members:
            (member_id, client_id, client_host, member_metadata, member_assignment) = member
            lmembers.append({'member_id': member_id, 'client_id': client_id, 'client_host': client_host})
        return lmembers

    def describe(self, node_id, group_name):
        describe_groups_request = admin.DescribeGroupsRequest_v0(groups=[group_name])
        future = self.client.send(node_id, describe_groups_request)
        self.client.poll(timeout_ms=self.timeout, future=future)
        (error_code, group_id, state, protocol_type, protocol, members) = future.value.groups[0]
        if error_code != 0:
            print(
                "Kafka API - RET admin.DescribeGroupsRequest, error_code={}, group_id={}, state={}, protocol_type={}, protocol={}, members_count={}".format(
                    error_code, group_id, state, protocol_type, protocol, len(members)))
            sys.exit(1)
        metadata_consumer_group = {'id': group_name, 'state': state, 'topics': [], 'lag': 0, 'members': []}
        if len(members) != 0:
            # Active group: decode each member's assignment to learn which
            # topics the group consumes, then compute lag for those topics.
            for member in members:
                (member_id, client_id, client_host, member_metadata, member_assignment) = member
                member_topics_assignment = []
                for (topic, partitions) in MemberAssignment.decode(member_assignment).assignment:
                    member_topics_assignment.append(topic)
                metadata_consumer_group['members'].append(
                    {'member_id': member_id, 'client_id': client_id, 'client_host': client_host,
                     'topic': member_topics_assignment})
                metadata_consumer_group['topics'] += member_topics_assignment
            (lag_total, topics_found) = self.get_lag_by_topic_list(group_name,
                                                                   topics=metadata_consumer_group['topics'])
            metadata_consumer_group['lag'] += lag_total
        else:
            # Empty group: no assignments to inspect, so check every topic in
            # the cluster (minus the internal offsets topic) for committed offsets.
            all_topics = self.client.cluster.topics()
            all_topics.discard('__consumer_offsets')
            (lag_total, topics_found) = self.get_lag_by_topic_list(group_name, topics=all_topics)
            metadata_consumer_group['lag'] += lag_total
            metadata_consumer_group['topics'] = topics_found
        return metadata_consumer_group

    def get_lag_by_topic_list(self, group_name, topics):
        self.lag_topics_found = []
        self.lag_total = 0
        topics = list(topics)
        # Query topics in batches of 16 threads to bound the number of
        # simultaneous consumer connections.
        no_threads = 16
        batches = [topics[i:i + no_threads] for i in range(0, len(topics), no_threads)]
        for batch_topics in batches:
            threads = []
            for topic in batch_topics:
                t = threading.Thread(target=self.get_lag_by_topic, args=(group_name, topic))
                threads.append(t)
                t.start()
            for t in threads:
                # join() on an already-finished thread is a no-op, so no
                # aliveness check is needed (Thread.isAlive was removed in
                # Python 3.9 in favor of is_alive).
                t.join()
        return self.lag_total, list(set(self.lag_topics_found))

    def get_lag_by_topic(self, group_name, topic):
        # Lag per partition is the distance between the group's committed
        # offset and the current end offset of the partition.
        consumer = KafkaConsumer(
            bootstrap_servers=self.kafka_brokers,
            group_id=group_name,
            security_protocol=self.security_protocol,
            sasl_mechanism=self.sasl_mechanism,
            sasl_plain_username=self.sasl_plain_username,
            sasl_plain_password=self.sasl_plain_password,
            ssl_context=self.ssl_context
        )
        partitions_per_topic = consumer.partitions_for_topic(topic)
        if partitions_per_topic is None:
            # Topic metadata is unavailable (e.g. the topic no longer exists).
            consumer.close(autocommit=False)
            return
        for partition in partitions_per_topic:
            tp = TopicPartition(topic, partition)
            consumer.assign([tp])
            committed = consumer.committed(tp)
            consumer.seek_to_end(tp)
            last_offset = consumer.position(tp)
            if committed is not None and int(committed) and last_offset is not None and int(last_offset):
                # The totals are shared across worker threads, so serialize
                # the updates.
                with self._lag_lock:
                    self.lag_topics_found.append(topic)
                    self.lag_total += (last_offset - committed)
        consumer.close(autocommit=False)
        return
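

# A minimal usage sketch, not part of the original class: the broker address
# and connection settings below are placeholders for a plaintext local cluster.
# It lists every consumer group, then prints each group's state, member count,
# and total lag as computed by describe() above.
if __name__ == '__main__':
    groups_client = KafkaConsumerGroups(
        kafka_brokers=['localhost:9092'],  # placeholder broker list
        security_protocol='PLAINTEXT',     # or SASL_SSL with the SASL/SSL fields set
        sasl_mechanism=None,
        sasl_plain_username=None,
        sasl_plain_password=None,
        ssl_context=None,
    )
    # list() maps each group name to the broker id that coordinates it,
    # which describe() then queries directly.
    for group_name, coordinator_id in groups_client.list().items():
        info = groups_client.describe(coordinator_id, group_name)
        print('group={} state={} members={} lag={} topics={}'.format(
            info['id'], info['state'], len(info['members']),
            info['lag'], ','.join(info['topics'])))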