subscribe_json.py 754 B

12345678910111213141516171819202122232425262728
  1. # [begin subscribe_json]
  2. import asyncio
  3. import json
  4. from nats.aio.client import Client as NATS
  5. from nats.aio.errors import ErrTimeout
  6. async def run(loop):
  7. nc = NATS()
  8. await nc.connect(servers=["nats://127.0.0.1:4222"], loop=loop)
  9. async def message_handler(msg):
  10. data = json.loads(msg.data.decode())
  11. print(data)
  12. sid = await nc.subscribe("updates", cb=message_handler)
  13. await nc.flush()
  14. await nc.auto_unsubscribe(sid, 2)
  15. await nc.publish("updates", json.dumps({"symbol": "GOOG", "price": 1200 }).encode())
  16. await asyncio.sleep(1, loop=loop)
  17. await nc.close()
  18. # [end subscribe_json]
  19. if __name__ == '__main__':
  20. loop = asyncio.get_event_loop()
  21. loop.run_until_complete(run(loop))
  22. loop.close()