3-example.py 1.9 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182
  1. #!/usr/bin/env python
  2. import threading, time
  3. from kafka import KafkaAdminClient, KafkaConsumer, KafkaProducer
  4. from kafka.admin import NewTopic
  5. class Producer(threading.Thread):
  6. def __init__(self):
  7. threading.Thread.__init__(self)
  8. self.stop_event = threading.Event()
  9. def stop(self):
  10. self.stop_event.set()
  11. def run(self):
  12. producer = KafkaProducer(bootstrap_servers='localhost:9092')
  13. while not self.stop_event.is_set():
  14. producer.send('my-topic', b"test")
  15. producer.send('my-topic', b"\xc2Hola, mundo!")
  16. time.sleep(1)
  17. producer.close()
  18. class Consumer(threading.Thread):
  19. def __init__(self):
  20. threading.Thread.__init__(self)
  21. self.stop_event = threading.Event()
  22. def stop(self):
  23. self.stop_event.set()
  24. def run(self):
  25. consumer = KafkaConsumer(bootstrap_servers='localhost:9092',
  26. auto_offset_reset='earliest',
  27. consumer_timeout_ms=1000)
  28. consumer.subscribe(['my-topic'])
  29. while not self.stop_event.is_set():
  30. for message in consumer:
  31. print(message)
  32. if self.stop_event.is_set():
  33. break
  34. consumer.close()
  35. def main():
  36. # Create 'my-topic' Kafka topic
  37. try:
  38. admin = KafkaAdminClient(bootstrap_servers='localhost:9092')
  39. topic = NewTopic(name='my-topic',
  40. num_partitions=1,
  41. replication_factor=1)
  42. admin.create_topics([topic])
  43. except Exception:
  44. pass
  45. tasks = [
  46. Producer(),
  47. Consumer()
  48. ]
  49. # Start threads of a publisher/producer and a subscriber/consumer to 'my-topic' Kafka topic
  50. for t in tasks:
  51. t.start()
  52. time.sleep(10)
  53. # Stop threads
  54. for task in tasks:
  55. task.stop()
  56. for task in tasks:
  57. task.join()
  58. if __name__ == "__main__":
  59. main()