def get_lag_by_topic(self, group_name, topic): consumer = KafkaConsumer( bootstrap_servers=self.kafka_brokers, group_id=group_name, security_protocol=self.security_protocol, sasl_mechanism=self.sasl_mechanism, sasl_plain_username=self.sasl_plain_username, sasl_plain_password=self.sasl_plain_password, ssl_context=self.ssl_context ) partitions_per_topic = consumer.partitions_for_topic(topic) for partition in partitions_per_topic: tp = TopicPartition(topic, partition) consumer.assign([tp]) committed = consumer.committed(tp) consumer.seek_to_end(tp) last_offset = consumer.position(tp) if committed is not None and int(committed) and last_offset is not None and int(last_offset): self.lag_topics_found.append(topic) self.lag_total += (last_offset - committed) consumer.close(autocommit=False) return