123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354 |
- import asyncio
- import nats.aio.errors
- from nats.aio.client import Client as NATS
async def example():
    """Demonstrate handling of slow-consumer errors on a NATS subscription.

    Connects to the public demo server, subscribes to ``updates`` with
    ``pending_msgs_limit=5``, and publishes ten messages while the
    subscription callback deliberately stalls for one second on the third
    message it receives.  If the client reports an ``ErrSlowConsumer`` to the
    error callback, the affected subscription is unsubscribed.  Finally the
    messages that were actually handled are printed and the connection is
    closed.
    """
    # [begin slow_listener]
    nc = NATS()

    async def error_cb(e):
        # isinstance (rather than an exact type comparison) also matches
        # subclasses of ErrSlowConsumer.
        if isinstance(e, nats.aio.errors.ErrSlowConsumer):
            print("Slow consumer error, unsubscribing from handling further messages...")
            await nc.unsubscribe(e.sid)

    await nc.connect(
        servers=["nats://demo.nats.io:4222"],
        error_cb=error_cb,
    )

    try:
        msgs = []
        # Nothing in this example ever resolves this future; awaiting it with
        # a timeout below simply gives the subscription up to one second to
        # process messages before the results are printed.
        future = asyncio.Future()

        async def cb(msg):
            print(msg)
            msgs.append(msg)

            if len(msgs) == 3:
                # Head of line blocking on other messages caused
                # by single message processing taking long...
                await asyncio.sleep(1)

        await nc.subscribe("updates", cb=cb, pending_msgs_limit=5)

        for i in range(10):
            await nc.publish("updates", "msg #{}".format(i).encode())
            # Yield control to the event loop between publishes.
            await asyncio.sleep(0)

        try:
            await asyncio.wait_for(future, 1)
        except asyncio.TimeoutError:
            # Expected: the future is never completed (see above).
            pass

        for msg in msgs:
            print("[Received]", msg)
    finally:
        # Close the connection even if subscribing or publishing failed.
        await nc.close()
    # [end slow_listener]
# Create the event loop explicitly: calling asyncio.get_event_loop() when no
# loop is running is deprecated on newer Python versions.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
    loop.run_until_complete(example())
finally:
    # Release the loop's resources even if the example raised.
    loop.close()
|