zabbix-kafka-consumers-monitor_4.py 612 B

123456789101112131415161718192021
  def get_lag_by_topic_list(self, group_name, topics):
      """Run ``self.get_lag_by_topic`` for every topic, in threaded batches.

      ``self.lag_total`` and ``self.lag_topics_found`` are reset, then one
      thread per topic is started in batches of at most 16; each batch is
      fully joined before the next one starts.

      NOTE(review): ``get_lag_by_topic`` is not visible here; it is
      presumably what accumulates into ``self.lag_total`` and
      ``self.lag_topics_found`` -- confirm it does so in a thread-safe way.

      Args:
          group_name: Passed through unchanged as the first argument of
              ``self.get_lag_by_topic``.
          topics: Any iterable of topics; it is materialised into a list.

      Returns:
          A tuple ``(self.lag_total, topics_found)`` where ``topics_found``
          is ``self.lag_topics_found`` with duplicates removed (order is
          not guaranteed because it goes through a ``set``).
      """
      self.lag_topics_found = []
      self.lag_total = 0

      topics = list(topics)
      batch_size = 16  # upper bound on concurrently running worker threads
      batches = [
          topics[start:start + batch_size]
          for start in range(0, len(topics), batch_size)
      ]

      for batch_topics in batches:
          threads = []
          for topic in batch_topics:
              worker = threading.Thread(
                  target=self.get_lag_by_topic,
                  args=(group_name, topic),
              )
              threads.append(worker)
              worker.start()

          # join() returns immediately for a thread that has already
          # finished, so no liveness check is needed.  (Thread.isAlive()
          # was removed in Python 3.9 and raised AttributeError here.)
          for worker in threads:
              worker.join()

      return self.lag_total, list(set(self.lag_topics_found))