kafka-json-producer-backup.py 1.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051
  1. import threading, logging, time
  2. import multiprocessing
  3. import json
  4. from kafka import KafkaProducer
  5. class Producer(threading.Thread):
  6. def __init__(self):
  7. threading.Thread.__init__(self)
  8. self.stop_event = threading.Event()
  9. def stop(self):
  10. self.stop_event.set()
  11. def run(self):
  12. producer = KafkaProducer(bootstrap_servers='localhost:9092', value_serializer= json.dumps(m).encode('ascii'))
  13. with open('credit_test.json') as json_file:
  14. data = json.load(json_file)
  15. for p in data:
  16. print (p)
  17. producer.send('credit-card-tx', p)
  18. time.sleep(5)
  19. producer.close()
  20. def main():
  21. tasks = [
  22. Producer(),
  23. ]
  24. for t in tasks:
  25. t.start()
  26. time.sleep(10)
  27. for task in tasks:
  28. task.stop()
  29. for task in tasks:
  30. task.join()
  31. if __name__ == "__main__":
  32. logging.basicConfig(
  33. format='%(asctime)s.%(msecs)s:%(name)s:%(thread)d:%(levelname)s:%(process)d:%(message)s',
  34. level=logging.INFO
  35. )
  36. main()