# kafka-json-model-backup.py
# Kafka consumer: reads credit-card transactions, scores them with a
# pre-trained model, and persists the scored records to Cassandra.
  1. import threading, logging, time
  2. import multiprocessing
  3. import json
  4. from kafka import KafkaConsumer, KafkaProducer
  5. import joblib
  6. import pandas as pd
  7. from cassandra_rw import CassandraReadWriteDb
  8. from PredictTxInfo import PredictTxInfoModel
  9. class Consumer():
  10. def __init__(self):
  11. self.model = joblib.load('model3.pipeline')
  12. self.cwd = CassandraReadWriteDb(ip_addrs=['172.17.0.2'], keyspace="emp")
  13. self.cwd.sync_class_table(PredictTxInfoModel)
  14. def run(self):
  15. consumer = KafkaConsumer(bootstrap_servers='localhost:9092',
  16. auto_offset_reset='earliest',
  17. consumer_timeout_ms=1000, value_deserializer= json.loads(m.decode('ascii')))
  18. consumer.subscribe(['credit-card-tx'])
  19. while True:
  20. for message in consumer:
  21. df = pd.DataFrame.from_records([message.value])
  22. df = df.drop(['Time'],axis=1)
  23. outcome = self.model.best_estimator_.predict(df)[0]
  24. message.value['P'] = outcome
  25. self.cwd.write_json_table(message.value)
  26. consumer.close()
  27. def main():
  28. tasks = [
  29. Consumer()
  30. ]
  31. for t in tasks:
  32. t.run()
  33. if __name__ == "__main__":
  34. logging.basicConfig(
  35. format='%(asctime)s.%(msecs)s:%(name)s:%(thread)d:%(levelname)s:%(process)d:%(message)s',
  36. level=logging.INFO
  37. )
  38. main()