123456789101112131415161718192021 |
def get_lag_by_topic_list(self, group_name, topics):
    """Run ``get_lag_by_topic`` for every topic, using batches of threads.

    Resets ``self.lag_total`` to 0 and ``self.lag_topics_found`` to an
    empty list, then calls ``self.get_lag_by_topic(group_name, topic)``
    once per topic, each call on its own thread.  Topics are processed in
    batches of at most 16; a batch is fully joined before the next one is
    started.

    Args:
        group_name: Passed unchanged as the first argument of
            ``self.get_lag_by_topic``.
        topics: Iterable of topics; it is materialized into a list, so
            one-shot iterables (generators, dict views) are accepted.

    Returns:
        A ``(lag_total, topics_found)`` tuple read back from ``self``
        after all threads have finished.  ``topics_found`` is
        de-duplicated through a ``set``, so its order is unspecified.

    NOTE(review): presumably ``get_lag_by_topic`` accumulates into
    ``self.lag_total`` / ``self.lag_topics_found`` from the worker
    threads -- confirm that those updates are thread-safe.
    """
    self.lag_topics_found = []
    self.lag_total = 0

    topics = list(topics)
    max_threads = 16  # upper bound on threads alive at the same time

    for start in range(0, len(topics), max_threads):
        threads = []
        for topic in topics[start:start + max_threads]:
            worker = threading.Thread(
                target=self.get_lag_by_topic,
                args=(group_name, topic),
            )
            threads.append(worker)
            worker.start()

        # Thread.isAlive() was removed in Python 3.9.  No liveness check is
        # needed: every thread here was started, and join() on a thread
        # that has already finished returns immediately.
        for worker in threads:
            worker.join()

    return self.lag_total, list(set(self.lag_topics_found))
|