slow_listener.py 1.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354
  1. import asyncio
  2. import nats.aio.errors
  3. from nats.aio.client import Client as NATS
  4. async def example():
  5. # [begin slow_listener]
  6. nc = NATS()
  7. async def error_cb(e):
  8. if type(e) is nats.aio.errors.ErrSlowConsumer:
  9. print("Slow consumer error, unsubscribing from handling further messages...")
  10. await nc.unsubscribe(e.sid)
  11. await nc.connect(
  12. servers=["nats://demo.nats.io:4222"],
  13. error_cb=error_cb,
  14. )
  15. msgs = []
  16. future = asyncio.Future()
  17. async def cb(msg):
  18. nonlocal msgs
  19. nonlocal future
  20. print(msg)
  21. msgs.append(msg)
  22. if len(msgs) == 3:
  23. # Head of line blocking on other messages caused
  24. # by single message proccesing taking long...
  25. await asyncio.sleep(1)
  26. await nc.subscribe("updates", cb=cb, pending_msgs_limit=5)
  27. for i in range(0, 10):
  28. await nc.publish("updates", "msg #{}".format(i).encode())
  29. await asyncio.sleep(0)
  30. try:
  31. await asyncio.wait_for(future, 1)
  32. except asyncio.TimeoutError:
  33. pass
  34. for msg in msgs:
  35. print("[Received]", msg)
  36. await nc.close()
  37. # [end slow_listener]
  38. loop = asyncio.get_event_loop()
  39. loop.run_until_complete(example())
  40. loop.close()