def run(self):
    """Publish every record from ``credit_test.json`` to Kafka.

    Connects a ``KafkaProducer`` to ``localhost:9092``, loads
    ``credit_test.json`` from the current working directory, and sends
    each item obtained by iterating the parsed JSON to the
    ``credit-card-tx`` topic. Each item is printed before it is sent,
    and the method sleeps five seconds after every send.

    Values are serialized with ``json.dumps`` and encoded as ASCII bytes.

    Raises:
        OSError: if ``credit_test.json`` cannot be opened.
        json.JSONDecodeError: if the file is not valid JSON.
    """
    producer = KafkaProducer(
        bootstrap_servers='localhost:9092',
        value_serializer=lambda m: json.dumps(m).encode('ascii'),
    )
    try:
        # Read the whole file up front so the handle is released before
        # the (slow, sleep-paced) send loop starts.
        with open('credit_test.json') as json_file:
            data = json.load(json_file)

        # NOTE(review): presumably the file holds a JSON array of
        # transaction records; if it is an object, this iterates its keys.
        for p in data:
            print(p)
            producer.send('credit-card-tx', p)
            # Throttle the stream: one record every five seconds.
            time.sleep(5)
    finally:
        # Always release the producer, even when loading or sending fails.
        producer.close()