1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071 |
- import argparse
- import json
- import time
- from random import choice
- import datetime
- from kafka import KafkaProducer
- def _kafka_args():
- parser = argparse.ArgumentParser(add_help=False)
- args = parser.add_argument_group("kafka configurations")
- args.add_argument("-t", "--topic", help="Kafka topic.", default="kafka-default")
- args.add_argument(
- "-a", "--address", help="Kafka address.", default="localhost:9092"
- )
- args.add_argument("-u", "--username", help="Kafka server's username.")
- args.add_argument("-p", "--password", help="Kafka server's password.")
- return parser
- def main():
- def default(o):
- if isinstance(o, datetime.datetime):
- return o.isoformat()
- parser = argparse.ArgumentParser(
- parents=[_kafka_args()], formatter_class=argparse.ArgumentDefaultsHelpFormatter
- )
- args = parser.parse_args()
- if args.username and args.password:
- client = KafkaProducer(
- bootstrap_servers=args.address,
- sasl_plain_username=args.username,
- sasl_plain_password=args.password,
- sasl_mechanism="PLAIN",
- security_protocol="SASL_PLAINTEXT",
- acks=1,
- api_version=(1, 0),
- )
- else:
- client = KafkaProducer(bootstrap_servers=args.address)
- schools = ["harvard", "stanford", "oxford", "cambridge", "mit", "carnegie"]
- majors = [
- "computer science",
- "biology",
- "physics",
- "mechanical science",
- "materials science",
- ]
- names = ["lilei", "hanmeimei", "tom", "jerry", "leesin"]
- while True:
- data = {
- "school": choice(schools),
- "major": choice(majors),
- "name": choice(names),
- "timestamp": datetime.datetime.now(),
- "extra": "may be some other data",
- }
- msg = json.dumps(data, default=default)
- print("send kafka msg:", msg)
- client.send(args.topic, value=msg.encode("utf-8"))
- time.sleep(0.2)
- if __name__ == "__main__":
- main()
|