12345678910111213141516171819202122232425262728 |
- # [begin subscribe_json]
- import asyncio
- import json
- from nats.aio.client import Client as NATS
- from nats.aio.errors import ErrTimeout
- async def run(loop):
- nc = NATS()
- await nc.connect(servers=["nats://127.0.0.1:4222"], loop=loop)
- async def message_handler(msg):
- data = json.loads(msg.data.decode())
- print(data)
- sid = await nc.subscribe("updates", cb=message_handler)
- await nc.flush()
- await nc.auto_unsubscribe(sid, 2)
- await nc.publish("updates", json.dumps({"symbol": "GOOG", "price": 1200 }).encode())
- await asyncio.sleep(1, loop=loop)
- await nc.close()
- # [end subscribe_json]
- if __name__ == '__main__':
- loop = asyncio.get_event_loop()
- loop.run_until_complete(run(loop))
- loop.close()
|