# drain_conn.py (998 B)

  1. # [begin drain_conn]
  2. import asyncio
  3. from nats.aio.client import Client as NATS
  4. async def example(loop):
  5. nc = NATS()
  6. await nc.connect("nats://127.0.0.1:4222", loop=loop)
  7. async def handler(msg):
  8. print("[Received] ", msg)
  9. await nc.publish(msg.reply, b'I can help')
  10. # Can check whether client is in draining state
  11. if nc.is_draining:
  12. print("Connection is draining")
  13. await nc.subscribe("help", "workers", cb=handler)
  14. await nc.flush()
  15. requests = []
  16. for i in range(0, 10):
  17. request = nc.request("help", b'help!', timeout=1)
  18. requests.append(request)
  19. # Wait for all the responses
  20. responses = []
  21. responses = await asyncio.gather(*requests)
  22. # Gracefully close the connection.
  23. await nc.drain()
  24. print("Received {} responses".format(len(responses)))
  25. # [end drain_conn]
  26. if __name__ == '__main__':
  27. loop = asyncio.get_event_loop()
  28. loop.run_until_complete(example(loop))
  29. loop.close()