connect_tls.py 642 B

1234567891011121314151617181920212223242526
  1. import asyncio
  2. import ssl
  3. from nats.aio.client import Client as NATS
  4. async def example():
  5. # [begin connect_tls]
  6. nc = NATS()
  7. ssl_ctx = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH)
  8. ssl_ctx.load_verify_locations('ca.pem')
  9. ssl_ctx.load_cert_chain(certfile='client-cert.pem',
  10. keyfile='client-key.pem')
  11. await nc.connect(io_loop=loop, tls=ssl_ctx)
  12. await nc.connect(servers=["nats://demo.nats.io:4222"], tls=ssl_ctx)
  13. # Do something with the connection.
  14. # [end connect_tls]
  15. await nc.close()
  16. loop = asyncio.get_event_loop()
  17. loop.run_until_complete(example())
  18. loop.close()