123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051 |
- import threading, logging, time
- import multiprocessing
- import json
- from kafka import KafkaProducer
- class Producer(threading.Thread):
- def __init__(self):
- threading.Thread.__init__(self)
- self.stop_event = threading.Event()
-
- def stop(self):
- self.stop_event.set()
- def run(self):
- producer = KafkaProducer(bootstrap_servers='localhost:9092', value_serializer= json.dumps(m).encode('ascii'))
- with open('credit_test.json') as json_file:
- data = json.load(json_file)
- for p in data:
- print (p)
- producer.send('credit-card-tx', p)
- time.sleep(5)
- producer.close()
-
- def main():
- tasks = [
- Producer(),
- ]
- for t in tasks:
- t.start()
- time.sleep(10)
-
- for task in tasks:
- task.stop()
- for task in tasks:
- task.join()
-
-
- if __name__ == "__main__":
- logging.basicConfig(
- format='%(asctime)s.%(msecs)s:%(name)s:%(thread)d:%(levelname)s:%(process)d:%(message)s',
- level=logging.INFO
- )
- main()
|