1234567891011121314151617 |
- def run(self):
- consumer = KafkaConsumer(bootstrap_servers='localhost:9092',
- auto_offset_reset='earliest',
- consumer_timeout_ms=1000, value_deserializer=lambda m: json.loads(m.decode('ascii')))
- consumer.subscribe(['credit-card-tx'])
- while True:
- for message in consumer:
- df = pd.DataFrame.from_records([message.value])
- df = df.drop(['Time'],axis=1)
- outcome = self.model.best_estimator_.predict(df)[0]
- message.value['P'] = outcome
- self.cwd.write_json_table(message.value)
- consumer.close()
-
-
|