# kafka_producer.py (2.0 KB)

import argparse
import datetime
import json
import time
from random import choice

from kafka import KafkaProducer
  7. def _kafka_args():
  8. parser = argparse.ArgumentParser(add_help=False)
  9. args = parser.add_argument_group("kafka configurations")
  10. args.add_argument("-t", "--topic", help="Kafka topic.", default="kafka-default")
  11. args.add_argument(
  12. "-a", "--address", help="Kafka address.", default="localhost:9092"
  13. )
  14. args.add_argument("-u", "--username", help="Kafka server's username.")
  15. args.add_argument("-p", "--password", help="Kafka server's password.")
  16. return parser
  17. def main():
  18. def default(o):
  19. if isinstance(o, datetime.datetime):
  20. return o.isoformat()
  21. parser = argparse.ArgumentParser(
  22. parents=[_kafka_args()], formatter_class=argparse.ArgumentDefaultsHelpFormatter
  23. )
  24. args = parser.parse_args()
  25. if args.username and args.password:
  26. client = KafkaProducer(
  27. bootstrap_servers=args.address,
  28. sasl_plain_username=args.username,
  29. sasl_plain_password=args.password,
  30. sasl_mechanism="PLAIN",
  31. security_protocol="SASL_PLAINTEXT",
  32. acks=1,
  33. api_version=(1, 0),
  34. )
  35. else:
  36. client = KafkaProducer(bootstrap_servers=args.address)
  37. schools = ["harvard", "stanford", "oxford", "cambridge", "mit", "carnegie"]
  38. majors = [
  39. "computer science",
  40. "biology",
  41. "physics",
  42. "mechanical science",
  43. "materials science",
  44. ]
  45. names = ["lilei", "hanmeimei", "tom", "jerry", "leesin"]
  46. while True:
  47. data = {
  48. "school": choice(schools),
  49. "major": choice(majors),
  50. "name": choice(names),
  51. "timestamp": datetime.datetime.now(),
  52. "extra": "may be some other data",
  53. }
  54. msg = json.dumps(data, default=default)
  55. print("send kafka msg:", msg)
  56. client.send(args.topic, value=msg.encode("utf-8"))
  57. time.sleep(0.2)
  58. if __name__ == "__main__":
  59. main()