zabbix-kafka-consumers-monitor.py

from kafka import KafkaConsumer, TopicPartition
from kafka.client_async import KafkaClient
from kafka.protocol import admin
from kafka.protocol.group import MemberAssignment
import sys
import time
import threading
import ssl
# import logging
# logging.basicConfig(level=logging.DEBUG)
'''
This class retrieves all consumer groups, together with information about
their members and lag, using the Kafka protocol API.
Documentation for the Kafka protocol:
https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol
'''


class KafkaConsumerGroups:
    kafka_brokers = None
    client = None
    timeout = None
    security_protocol = None
    sasl_mechanism = None
    sasl_plain_username = None
    sasl_plain_password = None
    ssl_certfile = None
    ssl_keyfile = None
    ssl_context = None
    def __init__(self, kafka_brokers, security_protocol, sasl_mechanism,
                 sasl_plain_username, sasl_plain_password, ssl_context, timeout=5000):
        self.kafka_brokers = kafka_brokers
        self.security_protocol = security_protocol
        self.sasl_mechanism = sasl_mechanism
        self.sasl_plain_username = sasl_plain_username
        self.sasl_plain_password = sasl_plain_password
        self.ssl_context = ssl_context
        self.timeout = timeout
        # KafkaClient keeps only the config keys it knows about, so the timeout
        # must be passed as request_timeout_ms; a bare timeout= kwarg is
        # silently ignored.
        self.client = KafkaClient(bootstrap_servers=kafka_brokers, security_protocol=security_protocol,
                                  sasl_mechanism=sasl_mechanism, sasl_plain_username=sasl_plain_username,
                                  sasl_plain_password=sasl_plain_password, ssl_context=ssl_context,
                                  request_timeout_ms=timeout)
        self.lag_topics_found = []
        self.lag_total = 0
        # Protects the two accumulators above, which the worker threads in
        # get_lag_by_topic_list() update concurrently.
        self._lock = threading.Lock()

    def list(self):
        """Return a dict mapping each consumer group name to the id of the
        broker that reported it."""
        # ListGroupsRequest_v0 carries no request fields; the timeout is
        # applied when polling for the response below.
        list_groups_request = admin.ListGroupsRequest_v0()
        kafka_broker_ids = [broker.nodeId for broker in self.client.cluster.brokers()]
        consumers_grp = {}
        for broker_id in kafka_broker_ids:
            current_of_tries = 0
            max_of_tries = 5
            data_from_node = False
            # The first send can fail while the connection to the broker is
            # still being established, so retry a few times.
            while not data_from_node and current_of_tries <= max_of_tries:
                future = self.client.send(broker_id, list_groups_request)
                self.client.poll(timeout_ms=self.timeout, future=future)
                if future.value is not None:
                    for (group_name, protocol_type) in future.value.groups:
                        consumers_grp.update({group_name: broker_id})
                    data_from_node = True
                else:
                    current_of_tries += 1
                    time.sleep(0.5)
        return consumers_grp
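
    # Illustrative return value of list(); the group names and broker ids
    # below are hypothetical:
    #   {'billing-consumers': 1001, 'audit-consumers': 1002}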

    def get_members(self, node_id, group_name):
        """Return the members of a consumer group as a list of dicts."""
        describe_groups_request = admin.DescribeGroupsRequest_v0(groups=[group_name])
        future = self.client.send(node_id, describe_groups_request)
        self.client.poll(timeout_ms=self.timeout, future=future)
        (error_code, group_id, state, protocol_type, protocol, members) = future.value.groups[0]
        if error_code != 0:
            print(
                "Kafka API - RET admin.DescribeGroupsRequest, error_code={}, group_id={}, state={}, protocol_type={}, protocol={}, members_count={}".format(
                    error_code, group_id, state, protocol_type, protocol, len(members)))
            sys.exit(1)
        lmembers = []
        for member in members:
            (member_id, client_id, client_host, member_metadata, member_assignment) = member
            lmembers.append({'member_id': member_id, 'client_id': client_id, 'client_host': client_host})
        return lmembers
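
    # Illustrative return value of get_members(); all values are made up:
    #   [{'member_id': 'consumer-1-8a3f...', 'client_id': 'consumer-1',
    #     'client_host': '/10.0.0.5'}]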

    def describe(self, node_id, group_name):
        """Return a dict describing a consumer group: id, state, topics,
        members and total lag."""
        describe_groups_request = admin.DescribeGroupsRequest_v0(groups=[group_name])
        future = self.client.send(node_id, describe_groups_request)
        self.client.poll(timeout_ms=self.timeout, future=future)
        (error_code, group_id, state, protocol_type, protocol, members) = future.value.groups[0]
        if error_code != 0:
            print(
                "Kafka API - RET admin.DescribeGroupsRequest, error_code={}, group_id={}, state={}, protocol_type={}, protocol={}, members_count={}".format(
                    error_code, group_id, state, protocol_type, protocol, len(members)))
            sys.exit(1)
        metadata_consumer_group = {'id': group_name, 'state': state, 'topics': [], 'lag': 0, 'members': []}
        if len(members) != 0:
            # Active group: read each member's topic assignment out of its
            # serialized MemberAssignment blob.
            for member in members:
                (member_id, client_id, client_host, member_metadata, member_assignment) = member
                member_topics_assignment = []
                for (topic, partitions) in MemberAssignment.decode(member_assignment).assignment:
                    member_topics_assignment.append(topic)
                metadata_consumer_group['members'].append(
                    {'member_id': member_id, 'client_id': client_id, 'client_host': client_host,
                     'topic': member_topics_assignment})
                metadata_consumer_group['topics'] += member_topics_assignment
            (lag_total, topics_found) = self.get_lag_by_topic_list(group_name,
                                                                   topics=metadata_consumer_group['topics'])
            metadata_consumer_group['lag'] += lag_total
        else:
            # Empty group: there is no assignment to inspect, so look for
            # committed offsets on every topic in the cluster. Work on a copy
            # so the cluster metadata set is not mutated.
            all_topics = set(self.client.cluster.topics())
            all_topics.discard('__consumer_offsets')
            (lag_total, topics_found) = self.get_lag_by_topic_list(group_name, topics=all_topics)
            metadata_consumer_group['lag'] += lag_total
            metadata_consumer_group['topics'] = topics_found
        return metadata_consumer_group
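
    # Illustrative shape of the dict describe() returns; all values are
    # made up:
    #   {'id': 'billing-consumers', 'state': 'Stable',
    #    'topics': ['billing-events'], 'lag': 42,
    #    'members': [{'member_id': 'consumer-1-8a3f...',
    #                 'client_id': 'consumer-1', 'client_host': '/10.0.0.5',
    #                 'topic': ['billing-events']}]}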

    def get_lag_by_topic_list(self, group_name, topics):
        """Compute the group's total lag across the given topics, checking up
        to 16 topics concurrently."""
        self.lag_topics_found = []
        self.lag_total = 0
        topics = list(topics)
        no_threads = 16
        batches = [topics[i:i + no_threads] for i in range(0, len(topics), no_threads)]
        for batch_topics in batches:
            threads = []
            for topic in batch_topics:
                t = threading.Thread(target=self.get_lag_by_topic, args=(group_name, topic))
                threads.append(t)
                t.start()
            for t in threads:
                # Thread.isAlive() was removed in Python 3.9; is_alive() is
                # the portable spelling, and join() is safe even on a thread
                # that has already finished.
                if t.is_alive():
                    t.join()
        return self.lag_total, list(set(self.lag_topics_found))

    def get_lag_by_topic(self, group_name, topic):
        """Worker thread body: add one topic's lag for the group to the shared
        accumulators."""
        consumer = KafkaConsumer(
            bootstrap_servers=self.kafka_brokers,
            group_id=group_name,
            security_protocol=self.security_protocol,
            sasl_mechanism=self.sasl_mechanism,
            sasl_plain_username=self.sasl_plain_username,
            sasl_plain_password=self.sasl_plain_password,
            ssl_context=self.ssl_context
        )
        # partitions_for_topic() returns None for an unknown topic.
        partitions_per_topic = consumer.partitions_for_topic(topic) or []
        for partition in partitions_per_topic:
            tp = TopicPartition(topic, partition)
            consumer.assign([tp])
            committed = consumer.committed(tp)
            consumer.seek_to_end(tp)
            last_offset = consumer.position(tp)
            # A committed offset of 0 is valid, so compare against None rather
            # than relying on truthiness.
            if committed is not None and last_offset is not None:
                with self._lock:
                    self.lag_topics_found.append(topic)
                    self.lag_total += (last_offset - committed)
        consumer.close(autocommit=False)
        return
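
# Minimal usage sketch, kept as a comment so it does not interfere with the
# script's own entry point. The broker address and PLAINTEXT settings are
# assumptions; adjust them for your cluster:
#
#   groups_api = KafkaConsumerGroups(kafka_brokers='localhost:9092',
#                                    security_protocol='PLAINTEXT',
#                                    sasl_mechanism=None,
#                                    sasl_plain_username=None,
#                                    sasl_plain_password=None,
#                                    ssl_context=None)
#   for group_name, broker_id in groups_api.list().items():
#       print(groups_api.describe(broker_id, group_name))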