12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849 |
- import threading, logging, time
- import multiprocessing
- import json
- from kafka import KafkaConsumer, KafkaProducer
- import joblib
- import pandas as pd
- from cassandra_rw import CassandraReadWriteDb
- from PredictTxInfo import PredictTxInfoModel
- class Consumer():
- def __init__(self):
- self.model = joblib.load('model3.pipeline')
- self.cwd = CassandraReadWriteDb(ip_addrs=['172.17.0.2'], keyspace="emp")
- self.cwd.sync_class_table(PredictTxInfoModel)
-
- def run(self):
- consumer = KafkaConsumer(bootstrap_servers='localhost:9092',
- auto_offset_reset='earliest',
- consumer_timeout_ms=1000, value_deserializer= json.loads(m.decode('ascii')))
- consumer.subscribe(['credit-card-tx'])
- while True:
- for message in consumer:
- df = pd.DataFrame.from_records([message.value])
- df = df.drop(['Time'],axis=1)
- outcome = self.model.best_estimator_.predict(df)[0]
- message.value['P'] = outcome
- self.cwd.write_json_table(message.value)
- consumer.close()
-
-
- def main():
- tasks = [
- Consumer()
- ]
- for t in tasks:
- t.run()
-
-
- if __name__ == "__main__":
- logging.basicConfig(
- format='%(asctime)s.%(msecs)s:%(name)s:%(thread)d:%(levelname)s:%(process)d:%(message)s',
- level=logging.INFO
- )
- main()
|