import asyncio import nats.aio.errors from nats.aio.client import Client as NATS async def example(): # [begin slow_listener] nc = NATS() async def error_cb(e): if type(e) is nats.aio.errors.ErrSlowConsumer: print("Slow consumer error, unsubscribing from handling further messages...") await nc.unsubscribe(e.sid) await nc.connect( servers=["nats://demo.nats.io:4222"], error_cb=error_cb, ) msgs = [] future = asyncio.Future() async def cb(msg): nonlocal msgs nonlocal future print(msg) msgs.append(msg) if len(msgs) == 3: # Head of line blocking on other messages caused # by single message proccesing taking long... await asyncio.sleep(1) await nc.subscribe("updates", cb=cb, pending_msgs_limit=5) for i in range(0, 10): await nc.publish("updates", "msg #{}".format(i).encode()) await asyncio.sleep(0) try: await asyncio.wait_for(future, 1) except asyncio.TimeoutError: pass for msg in msgs: print("[Received]", msg) await nc.close() # [end slow_listener] loop = asyncio.get_event_loop() loop.run_until_complete(example()) loop.close()