drain_sub.py 1.1 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647
  1. # [begin drain_sub]
  2. import asyncio
  3. from nats.aio.client import Client as NATS
  4. async def example(loop):
  5. nc = NATS()
  6. await nc.connect("nats://127.0.0.1:4222", loop=loop)
  7. async def handler(msg):
  8. print("[Received] ", msg)
  9. await nc.publish(msg.reply, b'I can help')
  10. # Can check whether client is in draining state
  11. if nc.is_draining:
  12. print("Connection is draining")
  13. sid = await nc.subscribe("help", "workers", cb=handler)
  14. await nc.flush()
  15. # Gracefully unsubscribe the subscription
  16. await nc.drain(sid)
  17. # [end drain_sub]
  18. requests = []
  19. for i in range(0, 100):
  20. request = nc.request("help", b'help!', timeout=1)
  21. requests.append(request)
  22. # Wait for all the responses
  23. try:
  24. responses = []
  25. responses = await asyncio.gather(*requests)
  26. except:
  27. pass
  28. print("Received {} responses".format(len(responses)))
  29. await nc.close()
  30. if __name__ == '__main__':
  31. loop = asyncio.get_event_loop()
  32. loop.run_until_complete(example(loop))
  33. loop.close()