def run(self): consumer = KafkaConsumer(bootstrap_servers='localhost:9092', auto_offset_reset='earliest', consumer_timeout_ms=1000, value_deserializer=lambda m: json.loads(m.decode('ascii'))) consumer.subscribe(['credit-card-tx']) while True: for message in consumer: df = pd.DataFrame.from_records([message.value]) df = df.drop(['Time'],axis=1) outcome = self.model.best_estimator_.predict(df)[0] message.value['P'] = outcome self.cwd.write_json_table(message.value) consumer.close()