5-producer.py 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869
  1. #!/usr/bin/env python
  2. #
  3. # Copyright 2016 Confluent Inc.
  4. #
  5. # Licensed under the Apache License, Version 2.0 (the "License");
  6. # you may not use this file except in compliance with the License.
  7. # You may obtain a copy of the License at
  8. #
  9. # http://www.apache.org/licenses/LICENSE-2.0
  10. #
  11. # Unless required by applicable law or agreed to in writing, software
  12. # distributed under the License is distributed on an "AS IS" BASIS,
  13. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. # See the License for the specific language governing permissions and
  15. # limitations under the License.
  16. #
  17. #
  18. # Example Kafka Producer.
  19. # Reads lines from stdin and sends to Kafka.
  20. #
  21. from confluent_kafka import Producer
  22. import sys
  23. if __name__ == '__main__':
  24. if len(sys.argv) != 3:
  25. sys.stderr.write('Usage: %s <bootstrap-brokers> <topic>\n' % sys.argv[0])
  26. sys.exit(1)
  27. broker = sys.argv[1]
  28. topic = sys.argv[2]
  29. # Producer configuration
  30. # See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
  31. conf = {'bootstrap.servers': broker}
  32. # Create Producer instance
  33. p = Producer(**conf)
  34. # Optional per-message delivery callback (triggered by poll() or flush())
  35. # when a message has been successfully delivered or permanently
  36. # failed delivery (after retries).
  37. def delivery_callback(err, msg):
  38. if err:
  39. sys.stderr.write('%% Message failed delivery: %s\n' % err)
  40. else:
  41. sys.stderr.write('%% Message delivered to %s [%d] @ %d\n' %
  42. (msg.topic(), msg.partition(), msg.offset()))
  43. # Read lines from stdin, produce each line to Kafka
  44. for line in sys.stdin:
  45. try:
  46. # Produce line (without newline)
  47. p.produce(topic, line.rstrip(), callback=delivery_callback)
  48. except BufferError:
  49. sys.stderr.write('%% Local producer queue is full (%d messages awaiting delivery): try again\n' %
  50. len(p))
  51. # Serve delivery callback queue.
  52. # NOTE: Since produce() is an asynchronous API this poll() call
  53. # will most likely not serve the delivery callback for the
  54. # last produce()d message.
  55. p.poll(0)
  56. # Wait until all messages have been delivered
  57. sys.stderr.write('%% Waiting for %d deliveries\n' % len(p))
  58. p.flush()