kafka-json-model_1.py 639 B

1234567891011121314151617
  1. def run(self):
  2. consumer = KafkaConsumer(bootstrap_servers='localhost:9092',
  3. auto_offset_reset='earliest',
  4. consumer_timeout_ms=1000, value_deserializer=lambda m: json.loads(m.decode('ascii')))
  5. consumer.subscribe(['credit-card-tx'])
  6. while True:
  7. for message in consumer:
  8. df = pd.DataFrame.from_records([message.value])
  9. df = df.drop(['Time'],axis=1)
  10. outcome = self.model.best_estimator_.predict(df)[0]
  11. message.value['P'] = outcome
  12. self.cwd.write_json_table(message.value)
  13. consumer.close()