4-confluent_cloud.py 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147
  1. #!/usr/bin/env python
  2. #
  3. # Copyright 2018 Confluent Inc.
  4. #
  5. # Licensed under the Apache License, Version 2.0 (the "License");
  6. # you may not use this file except in compliance with the License.
  7. # You may obtain a copy of the License at
  8. #
  9. # http://www.apache.org/licenses/LICENSE-2.0
  10. #
  11. # Unless required by applicable law or agreed to in writing, software
  12. # distributed under the License is distributed on an "AS IS" BASIS,
  13. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. # See the License for the specific language governing permissions and
  15. # limitations under the License.
  16. # This is a simple example demonstrating how to produce a message to
  17. # Confluent Cloud then read it back again.
  18. #
  19. # https://www.confluent.io/confluent-cloud/
  20. #
  21. # Auto-creation of topics is disabled in Confluent Cloud. You will need to
  22. # use the ccloud cli to create the python-test-topic topic before running this
  23. # example.
  24. #
  25. # $ ccloud topic create python-test-topic
  26. #
  27. # The <ccloud bootstrap servers>, <ccloud key> and <ccloud secret> parameters
  28. # are available via the Confluent Cloud web interface. For more information,
  29. # refer to the quick-start:
  30. #
  31. # https://docs.confluent.io/current/cloud-quickstart.html
  32. #
  33. # to execute using Python 2.7:
  34. # $ virtualenv ccloud_example
  35. # $ source ccloud_example/bin/activate
  36. # $ pip install confluent_kafka
  37. # $ python confluent_cloud.py
  38. # $ deactivate
  39. #
  40. # to execute using Python 3.x:
  41. # $ python -m venv ccloud_example
  42. # $ source ccloud_example/bin/activate
  43. # $ pip install confluent_kafka
  44. # $ python confluent_cloud.py
  45. # $ deactivate
  46. import uuid
  47. from confluent_kafka import Producer, Consumer, KafkaError, KafkaException
  48. def error_cb(err):
  49. """ The error callback is used for generic client errors. These
  50. errors are generally to be considered informational as the client will
  51. automatically try to recover from all errors, and no extra action
  52. is typically required by the application.
  53. For this example however, we terminate the application if the client
  54. is unable to connect to any broker (_ALL_BROKERS_DOWN) and on
  55. authentication errors (_AUTHENTICATION). """
  56. print("Client error: {}".format(err))
  57. if err.code() == KafkaError._ALL_BROKERS_DOWN or \
  58. err.code() == KafkaError._AUTHENTICATION:
  59. # Any exception raised from this callback will be re-raised from the
  60. # triggering flush() or poll() call.
  61. raise KafkaException(err)
  62. # Create producer
  63. p = Producer({
  64. 'bootstrap.servers': '<ccloud bootstrap servers>',
  65. 'sasl.mechanism': 'PLAIN',
  66. 'security.protocol': 'SASL_SSL',
  67. 'sasl.username': '<ccloud key>',
  68. 'sasl.password': '<ccloud secret>',
  69. 'error_cb': error_cb,
  70. })
  71. def acked(err, msg):
  72. """Delivery report callback called (from flush()) on successful or failed delivery of the message."""
  73. if err is not None:
  74. print('Failed to deliver message: {}'.format(err.str()))
  75. else:
  76. print('Produced to: {} [{}] @ {}'.format(msg.topic(), msg.partition(), msg.offset()))
  77. if __name__ == '__main__':
  78. for n in range(0, 10):
  79. # Produce message: this is an asynchronous operation.
  80. # Upon successful or permanently failed delivery to the broker the
  81. # callback will be called to propagate the produce result.
  82. # The delivery callback is triggered from poll() or flush().
  83. # For long running
  84. # produce loops it is recommended to call poll() to serve these
  85. # delivery report callbacks.
  86. p.produce('python-test-topic',
  87. value='python test value nr {}'.format(n),
  88. callback=acked)
  89. # Trigger delivery report callbacks from previous produce calls.
  90. p.poll(0)
  91. # flush() is typically called when the producer is done sending messages to wait
  92. # for outstanding messages to be transmitted to the broker and delivery report
  93. # callbacks to get called. For continous producing you should call p.poll(0)
  94. # after each produce() call to trigger delivery report callbacks.
  95. p.flush(10)
  96. # Create consumer
  97. c = Consumer({
  98. 'bootstrap.servers': '<ccloud bootstrap servers>',
  99. 'sasl.mechanism': 'PLAIN',
  100. 'security.protocol': 'SASL_SSL',
  101. 'sasl.username': '<ccloud key>',
  102. 'sasl.password': '<ccloud secret>',
  103. 'group.id': str(uuid.uuid1()),
  104. # this will create a new consumer group on each invocation.
  105. 'auto.offset.reset': 'earliest',
  106. 'error_cb': error_cb,
  107. })
  108. c.subscribe(['python-test-topic'])
  109. try:
  110. while True:
  111. msg = c.poll(0.1) # Wait for message or event/error
  112. if msg is None:
  113. # No message available within timeout.
  114. # Initial message consumption may take up to `session.timeout.ms` for
  115. # the group to rebalance and start consuming.
  116. continue
  117. if msg.error():
  118. # Errors are typically temporary, print error and continue.
  119. print('Consumer error: {}'.format(msg.error()))
  120. continue
  121. print('Consumed: {}'.format(msg.value()))
  122. except KeyboardInterrupt:
  123. pass
  124. finally:
  125. # Leave group and commit final offsets
  126. c.close()