import threading
import logging
import time
import json

from kafka import KafkaProducer


class Producer(threading.Thread):
    """Background thread that streams credit-card transactions to Kafka."""

    def __init__(self):
        threading.Thread.__init__(self)
        self.stop_event = threading.Event()

    def stop(self):
        self.stop_event.set()

    def run(self):
        # Serialize each record as JSON before sending it to the broker.
        producer = KafkaProducer(
            bootstrap_servers='localhost:9092',
            value_serializer=lambda m: json.dumps(m).encode('ascii'),
        )

        with open('credit_test.json') as json_file:
            data = json.load(json_file)

        # Send one transaction every 5 seconds until stop() is called
        # or the file is exhausted.
        for p in data:
            if self.stop_event.is_set():
                break
            print(p)
            producer.send('credit-card-tx', p)
            time.sleep(5)

        producer.close()


def main():
    tasks = [
        Producer(),
    ]

    for t in tasks:
        t.start()

    # Let the producer run for 10 seconds, then signal it to stop.
    time.sleep(10)

    for task in tasks:
        task.stop()

    for task in tasks:
        task.join()


if __name__ == "__main__":
    logging.basicConfig(
        format='%(asctime)s.%(msecs)s:%(name)s:%(thread)d:%(levelname)s:%(process)d:%(message)s',
        level=logging.INFO,
    )
    main()
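To sanity-check what the thread publishes, a minimal consumer sketch could look like the following. It assumes the same local broker and topic as above; the auto_offset_reset setting is illustrative and not part of the original script.

from kafka import KafkaConsumer
import json

# Read back the transactions published to 'credit-card-tx' (assumed setup,
# matching the producer above).
consumer = KafkaConsumer(
    'credit-card-tx',
    bootstrap_servers='localhost:9092',
    value_deserializer=lambda m: json.loads(m.decode('ascii')),
    auto_offset_reset='earliest',  # illustrative: start from the beginning of the topic
)

for message in consumer:
    print(message.value)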