123456789101112131415161718192021222324 |
def get_lag_by_topic(self, group_name, topic):
    """Accumulate the consumer-group lag of *topic* into ``self.lag_total``.

    A short-lived ``KafkaConsumer`` is created with this object's broker
    and security settings. For every partition of *topic* the group's
    committed offset is compared with the partition's end position, and
    the difference is added to ``self.lag_total``. *topic* is appended to
    ``self.lag_topics_found`` once per partition that contributes.

    Args:
        group_name: Consumer group whose committed offsets are inspected.
        topic: Name of the topic to measure.

    Returns:
        None. Results are recorded on ``self.lag_total`` and
        ``self.lag_topics_found``.
    """
    consumer = KafkaConsumer(
        bootstrap_servers=self.kafka_brokers,
        group_id=group_name,
        security_protocol=self.security_protocol,
        sasl_mechanism=self.sasl_mechanism,
        sasl_plain_username=self.sasl_plain_username,
        sasl_plain_password=self.sasl_plain_password,
        ssl_context=self.ssl_context,
    )
    try:
        partitions = consumer.partitions_for_topic(topic)
        if not partitions:
            # Nothing to iterate (kafka-python may return None for a topic
            # it has no metadata for); avoid a TypeError on the loop below.
            return

        for partition in partitions:
            tp = TopicPartition(topic, partition)
            consumer.assign([tp])
            committed = consumer.committed(tp)
            # Move to the log end so position() reports the latest offset.
            consumer.seek_to_end(tp)
            last_offset = consumer.position(tp)

            # Skip partitions the group never committed to (None) and
            # partitions with no end offset (None or 0). A committed offset
            # of 0 is a real commit and must still be counted, otherwise
            # the partition's whole backlog would be missing from the lag.
            if committed is None or not last_offset:
                continue

            self.lag_topics_found.append(topic)
            self.lag_total += last_offset - committed
    finally:
        # Always release the connection, even if a broker call failed;
        # autocommit=False so this probe never alters the group's offsets.
        consumer.close(autocommit=False)
|