7-example.py 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172
  1. import asyncio
  2. from nats.aio.client import Client as NATS
  3. from nats.errors import ConnectionClosedError, TimeoutError
  4. async def main():
  5. nc = NATS()
  6. try:
  7. # It is very likely that the demo server will see traffic from clients other than yours.
  8. # To avoid this, start your own locally and modify the example to use it.
  9. # await nc.connect(servers=["nats://127.0.0.1:4222"])
  10. await nc.connect(servers=["nats://demo.nats.io:4222"])
  11. except:
  12. pass
  13. async def message_handler(msg):
  14. print(f"[Received on '{msg.subject}']: {msg.data.decode()}")
  15. try:
  16. # Interested in receiving 2 messages from the 'discover' subject.
  17. sub = await nc.subscribe("discover", "", message_handler)
  18. await sub.unsubscribe(2)
  19. await nc.publish("discover", b'hello')
  20. await nc.publish("discover", b'world')
  21. # Following 2 messages won't be received.
  22. await nc.publish("discover", b'again')
  23. await nc.publish("discover", b'!!!!!')
  24. except ConnectionClosedError:
  25. print("Connection closed prematurely")
  26. async def request_handler(msg):
  27. print("[Request on '{} {}']: {}".format(msg.subject, msg.reply,
  28. msg.data.decode()))
  29. await nc.publish(msg.reply, b'OK')
  30. if nc.is_connected:
  31. # Subscription using a 'workers' queue so that only a single subscriber
  32. # gets a request at a time.
  33. await nc.subscribe("help", "workers", cb=request_handler)
  34. try:
  35. # Make a request expecting a single response within 500 ms,
  36. # otherwise raising a timeout error.
  37. msg = await nc.request("help", b'help please', 0.500)
  38. print(f"[Response]: {msg.data}")
  39. # Make a roundtrip to the server to ensure messages
  40. # that sent messages have been processed already.
  41. await nc.flush(0.500)
  42. except ErrTimeout:
  43. print("[Error] Timeout!")
  44. # Wait a bit for message to be dispatched...
  45. await asyncio.sleep(1)
  46. # Detach from the server.
  47. await nc.close()
  48. if nc.last_error is not None:
  49. print(f"Last Error: {nc.last_error}")
  50. if nc.is_closed:
  51. print("Disconnected.")
  52. if __name__ == '__main__':
  53. asyncio.run(main())