dev.org 24 KB

Python Examples

COMMENT Notes

(setq org-babel-python-command "python3")

[35/35] [100%] Progress

DONE connect_default

import asyncio
from nats.aio.client import Client as NATS

async def example():
    """Open a connection using the client's default options, then close it."""
    # [begin connect_default]
    nc = NATS()
    await nc.connect()

    # Do something with the connection

    await nc.close()

    # [end connect_default]


# Run the example to completion on the event loop, then release the loop.
loop = asyncio.get_event_loop()
loop.run_until_complete(example())
loop.close()

DONE connect_url

import asyncio
from nats.aio.client import Client as NATS

async def example():
    """Connect to a server given by an explicit URL, then close."""
    # [begin connect_url]
    nc = NATS()
    await nc.connect(servers=["nats://demo.nats.io:4222"])

    # Do something with the connection

    await nc.close()

    # [end connect_url]


# Run the example to completion on the event loop, then release the loop.
loop = asyncio.get_event_loop()
loop.run_until_complete(example())
loop.close()

COMMENT connect_options

import asyncio
from nats.aio.client import Client as NATS

async def example():
    """Placeholder for the connect_options example.

    NOTE(review): the body and the begin/end markers are still those of the
    connect_url example; no extra connect options are shown yet.
    """
    # [begin connect_url]
    nc = NATS()
    await nc.connect(servers=["nats://demo.nats.io:4222"])

    # Do something with the connection

    await nc.close()

    # [end connect_url]


# Run the example to completion on the event loop, then release the loop.
loop = asyncio.get_event_loop()
loop.run_until_complete(example())
loop.close()

DONE connect_multiple

import asyncio
from nats.aio.client import Client as NATS

async def example():
    """Connect with a list of three local server URLs, then close."""
    # [begin connect_multiple]
    nc = NATS()
    await nc.connect(servers=[
        "nats://127.0.0.1:1222",
        "nats://127.0.0.1:1223",
        "nats://127.0.0.1:1224",
    ])

    # Do something with the connection

    await nc.close()

    # [end connect_multiple]


# Run the example to completion on the event loop, then release the loop.
loop = asyncio.get_event_loop()
loop.run_until_complete(example())
loop.close()

DONE reconnect_no_random

import asyncio
from nats.aio.client import Client as NATS

async def example():
    """Connect with a list of servers and `dont_randomize=True`, then close."""
    # [begin reconnect_no_random]
    nc = NATS()
    await nc.connect(
        servers=[
            "nats://127.0.0.1:1222",
            "nats://127.0.0.1:1223",
            "nats://127.0.0.1:1224",
        ],
        dont_randomize=True,
    )

    # Do something with the connection

    await nc.close()

    # [end reconnect_no_random]


# Run the example to completion on the event loop, then release the loop.
loop = asyncio.get_event_loop()
loop.run_until_complete(example())
loop.close()

DONE reconnect_none

import asyncio
from nats.aio.client import Client as NATS

async def example():
    """Connect with a list of servers and `allow_reconnect=False`, then close."""
    # [begin reconnect_none]
    nc = NATS()
    await nc.connect(
        servers=[
            "nats://127.0.0.1:1222",
            "nats://127.0.0.1:1223",
            "nats://127.0.0.1:1224",
        ],
        allow_reconnect=False,
    )

    # Do something with the connection

    await nc.close()

    # [end reconnect_none]


# Run the example to completion on the event loop, then release the loop.
loop = asyncio.get_event_loop()
loop.run_until_complete(example())
loop.close()

DONE reconnect_10x

import asyncio
from nats.aio.client import Client as NATS

async def example():
    """Connect with `max_reconnect_attempts=10`, then close."""
    # [begin reconnect_10x]
    nc = NATS()
    await nc.connect(
        servers=["nats://demo.nats.io:4222"],
        max_reconnect_attempts=10,
    )

    # Do something with the connection

    await nc.close()

    # [end reconnect_10x]


# Run the example to completion on the event loop, then release the loop.
loop = asyncio.get_event_loop()
loop.run_until_complete(example())
loop.close()

COMMENT reconnect_5mb

import asyncio
from nats.aio.client import Client as NATS

async def example():
    """Placeholder for the reconnect_5mb example.

    NOTE(review): the body is identical to the reconnect_10x example (it only
    sets `max_reconnect_attempts=10`); nothing size-related is configured yet.
    """
    # [begin reconnect_5mb]
    nc = NATS()
    await nc.connect(
        servers=["nats://demo.nats.io:4222"],
        max_reconnect_attempts=10,
    )

    # Do something with the connection

    await nc.close()

    # [end reconnect_5mb]


# Run the example to completion on the event loop, then release the loop.
loop = asyncio.get_event_loop()
loop.run_until_complete(example())
loop.close()

DONE reconnect_10s

import asyncio
from nats.aio.client import Client as NATS

async def example():
    """Connect with `reconnect_time_wait=10`, then close."""
    # [begin reconnect_10s]
    nc = NATS()
    await nc.connect(
        servers=["nats://demo.nats.io:4222"],
        reconnect_time_wait=10,
    )

    # Do something with the connection

    await nc.close()

    # [end reconnect_10s]


# Run the example to completion on the event loop, then release the loop.
loop = asyncio.get_event_loop()
loop.run_until_complete(example())
loop.close()

DONE reconnect_event

import asyncio
from nats.aio.client import Client as NATS

async def example():
    """Register disconnect/reconnect callbacks and wait until the client closes."""

    # [begin reconnect_event]
    nc = NATS()

    async def disconnected_cb():
        print("Got disconnected!")

    async def reconnected_cb():
        # See who we are connected to on reconnect.
        print("Got reconnected to {url}".format(url=nc.connected_url.netloc))

    await nc.connect(
        servers=["nats://127.0.0.1:4222"],
        reconnect_time_wait=10,
        reconnected_cb=reconnected_cb,
        disconnected_cb=disconnected_cb,
    )

    # Do something with the connection.

    # [end reconnect_event]

    # Poll once per second until the connection reports it is closed.
    while not nc.is_closed:
        await asyncio.sleep(1)

    await nc.close()


# Run the example to completion on the event loop, then release the loop.
loop = asyncio.get_event_loop()
loop.run_until_complete(example())
loop.close()

DONE ping_20s

import asyncio
from nats.aio.client import Client as NATS

async def example():
    """Connect with a 20 second ping interval and wait until the client closes."""

    # [begin ping_20s]
    nc = NATS()

    await nc.connect(
        servers=["nats://demo.nats.io:4222"],
        # Set Ping Interval to 20 seconds
        ping_interval=20,
    )

    # Do something with the connection.

    # [end ping_20s]

    # Poll once per second until the connection reports it is closed.
    while not nc.is_closed:
        await asyncio.sleep(1)

    await nc.close()


# Run the example to completion on the event loop, then release the loop.
loop = asyncio.get_event_loop()
loop.run_until_complete(example())
loop.close()

DONE ping_5

import asyncio
from nats.aio.client import Client as NATS

async def example():
    """Connect with a limit on outstanding PINGs and wait until the client closes."""

    # [begin ping_5]
    nc = NATS()

    await nc.connect(
        servers=["nats://127.0.0.1:4222"],
        # Set maximum number of PINGs out without getting a PONG back
        # before the connection will be disconnected as a stale connection.
        max_outstanding_pings=5,
        ping_interval=1,
    )

    # Do something with the connection.

    # [end ping_5]

    # Poll once per second until the connection reports it is closed.
    while not nc.is_closed:
        await asyncio.sleep(1)

    await nc.close()


# Run the example to completion on the event loop, then release the loop.
loop = asyncio.get_event_loop()
loop.run_until_complete(example())
loop.close()

DONE max_payload

import asyncio
from nats.aio.client import Client as NATS

async def example():
    """Connect and print the client's `max_payload` value."""

    # [begin max_payload]
    nc = NATS()

    await nc.connect(servers=["nats://demo.nats.io:4222"])

    payload_limit = nc.max_payload
    print("Maximum payload is %d bytes" % payload_limit)

    # Do something with the max payload.

    # [end max_payload]

    await nc.close()


# Run the example to completion on the event loop, then release the loop.
loop = asyncio.get_event_loop()
loop.run_until_complete(example())
loop.close()
Maximum payload is 1048576 bytes

COMMENT control_2k

COMMENT no_echo

DONE connect_pedantic

import asyncio
from nats.aio.client import Client as NATS

async def example():
    """Connect with `pedantic=True`, then close."""

    # [begin connect_pedantic]
    nc = NATS()

    await nc.connect(
        servers=["nats://127.0.0.1:4222"],
        pedantic=True,
    )

    # Do something with the connection.

    # [end connect_pedantic]

    await nc.close()


# Run the example to completion on the event loop, then release the loop.
loop = asyncio.get_event_loop()
loop.run_until_complete(example())
loop.close()

DONE connect_verbose

import asyncio
from nats.aio.client import Client as NATS

async def example():
    """Connect with `verbose=True`, then close."""

    # [begin connect_verbose]
    nc = NATS()

    await nc.connect(
        servers=["nats://127.0.0.1:4222"],
        verbose=True,
    )

    # Do something with the connection.

    # [end connect_verbose]

    await nc.close()


# Run the example to completion on the event loop, then release the loop.
loop = asyncio.get_event_loop()
loop.run_until_complete(example())
loop.close()

DONE connect_name

import asyncio
from nats.aio.client import Client as NATS

async def example():
    """Connect while passing a connection name, then close."""

    # [begin connect_name]
    nc = NATS()

    await nc.connect(
        servers=["nats://127.0.0.1:4222"],
        name="my-connection",
    )

    # Do something with the connection.

    # [end connect_name]

    await nc.close()


# Run the example to completion on the event loop, then release the loop.
loop = asyncio.get_event_loop()
loop.run_until_complete(example())
loop.close()

DONE connect_tls

import asyncio
import ssl
from nats.aio.client import Client as NATS

async def example():
    """Connect to a local server over TLS using a CA file and a client certificate.

    The SSL context verifies the server against 'ca.pem' and presents the
    client certificate/key pair, and is handed to the client through the
    `tls` option of a single `connect` call.
    """

    # [begin connect_tls]
    nc = NATS()

    ssl_ctx = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH)
    ssl_ctx.load_verify_locations('ca.pem')
    ssl_ctx.load_cert_chain(certfile='client-cert.pem',
                            keyfile='client-key.pem')

    # Connect exactly once; the earlier duplicate connect (without a server
    # list and depending on the module-level `loop`) has been removed.
    await nc.connect(servers=["nats://127.0.0.1:4222"], tls=ssl_ctx)

    # Do something with the connection.

    # [end connect_tls]

    await nc.close()


# Run the example to completion on the event loop, then release the loop.
loop = asyncio.get_event_loop()
loop.run_until_complete(example())
loop.close()

COMMENT connect_tls_url

DONE connect_userpass

import asyncio
from nats.aio.client import Client as NATS

async def example():
    """Connect with a user name and password carried in the server URL."""

    # [begin connect_userpass]
    nc = NATS()

    await nc.connect(
        servers=["nats://myname:password@127.0.0.1:4222"],
    )

    # Do something with the connection.

    # [end connect_userpass]

    await nc.close()


# Run the example to completion on the event loop, then release the loop.
loop = asyncio.get_event_loop()
loop.run_until_complete(example())
loop.close()

DONE connect_userpass_url

import asyncio
from nats.aio.client import Client as NATS

async def example():
    """Connect with `user:password@host` credentials embedded in the URL."""

    # [begin connect_userpass_url]
    nc = NATS()

    await nc.connect(
        servers=["nats://myname:password@127.0.0.1:4222"],
    )

    # Do something with the connection.

    # [end connect_userpass_url]

    await nc.close()


# Run the example to completion on the event loop, then release the loop.
loop = asyncio.get_event_loop()
loop.run_until_complete(example())
loop.close()

DONE connect_token

import asyncio
from nats.aio.client import Client as NATS

async def example():
    """Connect with a token carried in the server URL."""

    # [begin connect_token]
    nc = NATS()

    await nc.connect(
        servers=["nats://mytoken@127.0.0.1:4222"],
    )

    # Do something with the connection.

    # [end connect_token]

    await nc.close()


# Run the example to completion on the event loop, then release the loop.
loop = asyncio.get_event_loop()
loop.run_until_complete(example())
loop.close()

DONE connect_token_url

import asyncio
from nats.aio.client import Client as NATS

async def example():
    """Connect with a `token@host` credential embedded in the URL."""

    # [begin connect_token_url]
    nc = NATS()

    await nc.connect(
        servers=["nats://mytoken@127.0.0.1:4222"],
    )

    # Do something with the connection.

    # [end connect_token_url]

    await nc.close()


# Run the example to completion on the event loop, then release the loop.
loop = asyncio.get_event_loop()
loop.run_until_complete(example())
loop.close()

DONE publish_bytes

import asyncio
from nats.aio.client import Client as NATS

async def example():
    """Publish a bytes payload on the "updates" subject."""

    # [begin publish_bytes]
    nc = NATS()

    await nc.connect(servers=["nats://127.0.0.1:4222"])

    payload = b'All is Well'
    await nc.publish("updates", payload)

    # [end publish_bytes]

    await nc.close()


# Run the example to completion on the event loop, then release the loop.
loop = asyncio.get_event_loop()
loop.run_until_complete(example())
loop.close()

DONE publish_json

import asyncio
import json
from nats.aio.client import Client as NATS

async def example():
    """Publish a JSON-encoded dict on the "updates" subject."""

    # [begin publish_json]
    nc = NATS()

    await nc.connect(servers=["nats://127.0.0.1:4222"])

    # Serialize to JSON text, then to bytes, before publishing.
    payload = json.dumps({"symbol": "GOOG", "price": 1200}).encode()
    await nc.publish("updates", payload)

    # [end publish_json]

    await nc.close()


# Run the example to completion on the event loop, then release the loop.
loop = asyncio.get_event_loop()
loop.run_until_complete(example())
loop.close()

DONE publish_with_reply

import asyncio
import json
from nats.aio.client import Client as NATS
from nats.aio.utils import new_inbox

async def example():
    """Publish on "time" with a unique reply subject and print what arrives.

    The coroutine subscribes to "time" itself, so the message captured by the
    future is the one delivered to that subscription.
    """

    # [begin publish_with_reply]
    nc = NATS()

    future = asyncio.Future()

    async def sub(msg):
        # Hand the first delivered message over to the waiting coroutine.
        future.set_result(msg)

    await nc.connect(servers=["nats://127.0.0.1:4222"])
    await nc.subscribe("time", cb=sub)

    unique_reply_to = new_inbox()
    await nc.publish_request("time", unique_reply_to, b'')

    # Use the response
    msg = await asyncio.wait_for(future, 1)
    print("Reply:", msg)

    # [end publish_with_reply]

    await nc.close()


# Run the example to completion on the event loop, then release the loop.
loop = asyncio.get_event_loop()
loop.run_until_complete(example())
loop.close()
Reply: <Msg: subject='time' reply='_INBOX.a91d4fcb94225c12419632fb4' data='...'>

DONE request_reply

import asyncio
import json
from nats.aio.client import Client as NATS
from nats.aio.utils import new_inbox

async def example():
    """Answer requests on "time" and issue one request with a 1 second timeout."""

    # [begin request_reply]
    nc = NATS()

    async def sub(msg):
        # Respond on the reply subject carried by the request.
        await nc.publish(msg.reply, b'response')

    await nc.connect(servers=["nats://127.0.0.1:4222"])
    await nc.subscribe("time", cb=sub)

    # Send the request
    try:
        msg = await nc.request("time", b'', timeout=1)
    except asyncio.TimeoutError:
        print("Timed out waiting for response")
    else:
        # Use the response
        print("Reply:", msg)

    # [end request_reply]

    await nc.close()


# Run the example to completion on the event loop, then release the loop.
loop = asyncio.get_event_loop()
loop.run_until_complete(example())
loop.close()
Reply: <Msg: subject='_INBOX.fh3njTclrpUDMuSj6ntDwz.fh3njTclrpUDjySj6ntDwz' reply='' data='response...'>

DONE flush

import asyncio
from nats.aio.client import Client as NATS

async def example():
    """Publish a message and flush the connection with a 1 second timeout."""

    # [begin flush]
    nc = NATS()

    await nc.connect(servers=["nats://127.0.0.1:4222"])

    payload = b'All is Well'
    await nc.publish("updates", payload)

    # Sends a PING and wait for a PONG from the server, up to the given timeout.
    # This gives guarantee that the server has processed above message.
    await nc.flush(timeout=1)

    # [end flush]

    await nc.close()


# Run the example to completion on the event loop, then release the loop.
loop = asyncio.get_event_loop()
loop.run_until_complete(example())
loop.close()

COMMENT subscribe_sync

DONE subscribe_async

import asyncio
from nats.aio.client import Client as NATS

async def example():
    """Subscribe to "updates" with a callback and wait for one message."""

    # [begin subscribe_async]
    nc = NATS()

    await nc.connect(servers=["nats://127.0.0.1:4222"])

    future = asyncio.Future()

    async def cb(msg):
        # Hand the delivered message over to the waiting coroutine.
        future.set_result(msg)

    await nc.subscribe("updates", cb=cb)
    await nc.publish("updates", b'All is Well')
    await nc.flush()

    # Wait for message to come in
    msg = await asyncio.wait_for(future, 1)

    # [end subscribe_async]
    print(msg)

    await nc.close()


# Run the example to completion on the event loop, then release the loop.
loop = asyncio.get_event_loop()
loop.run_until_complete(example())
loop.close()

DONE subscribe_w_reply

import asyncio
from nats.aio.client import Client as NATS
from nats.aio.utils import new_inbox
from datetime import datetime

async def example():
    """Receive a request on "time" and publish the current time to its reply subject."""

    # [begin subscribe_w_reply]
    nc = NATS()

    await nc.connect(servers=["nats://127.0.0.1:4222"])

    future = asyncio.Future()

    async def cb(msg):
        # Hand the delivered message over to the waiting coroutine.
        future.set_result(msg)

    await nc.subscribe("time", cb=cb)

    reply_subject = new_inbox()
    await nc.publish_request("time", reply_subject, b'What is the time?')
    await nc.flush()

    # Read the message
    msg = await asyncio.wait_for(future, 1)

    # Send the time
    time_as_bytes = str(datetime.now()).encode()
    await nc.publish(msg.reply, time_as_bytes)

    # [end subscribe_w_reply]

    await nc.close()


# Run the example to completion on the event loop, then release the loop.
loop = asyncio.get_event_loop()
loop.run_until_complete(example())
loop.close()
2018-08-12 23:41:28.615782

DONE unsubscribe

import asyncio
from nats.aio.client import Client as NATS

async def example():
    """Subscribe to "updates", publish once, unsubscribe, then publish again."""

    # [begin unsubscribe]
    nc = NATS()

    await nc.connect(servers=["nats://127.0.0.1:4222"])

    future = asyncio.Future()

    async def cb(msg):
        # Record the delivered message on the future.
        future.set_result(msg)

    sid = await nc.subscribe("updates", cb=cb)
    await nc.publish("updates", b'All is Well')

    # Remove interest in subject
    await nc.unsubscribe(sid)

    # Won't be received...
    await nc.publish("updates", b'...')

    # [end unsubscribe]

    await nc.close()


# Run the example to completion on the event loop, then release the loop.
loop = asyncio.get_event_loop()
loop.run_until_complete(example())
loop.close()

DONE unsubscribe_auto

import asyncio
from nats.aio.client import Client as NATS

async def example():
    """Subscribe to "updates" and auto-unsubscribe with a limit of 1."""

    # [begin unsubscribe_auto]
    nc = NATS()

    await nc.connect(servers=["nats://127.0.0.1:4222"])

    async def cb(msg):
        print(msg)

    sid = await nc.subscribe("updates", cb=cb)
    await nc.auto_unsubscribe(sid, 1)
    await nc.publish("updates", b'All is Well')

    # Won't be received...
    await nc.publish("updates", b'...')

    # [end unsubscribe_auto]

    # Pause for a second before closing the connection.
    await asyncio.sleep(1)

    await nc.close()


# Run the example to completion on the event loop, then release the loop.
loop = asyncio.get_event_loop()
loop.run_until_complete(example())
loop.close()

COMMENT subscribe_json

import asyncio
import json
from nats.aio.client import Client as NATS
from nats.aio.utils import new_inbox

async def example():
    """Placeholder for the subscribe_json example.

    NOTE(review): no JSON is decoded here yet. The callback only prints the
    message, the subscription is on "updates" while the request goes to
    "time", so presumably the request times out unless another responder
    exists — confirm before enabling this example.
    """

    # [begin subscribe_json]
    nc = NATS()

    async def sub(msg):
        print(msg)
        # await nc.publish(msg.reply, b'response')

    await nc.connect(servers=["nats://127.0.0.1:4222"])
    await nc.subscribe("updates", cb=sub)

    # Send the request
    try:
        msg = await nc.request("time", b'', timeout=1)
    except asyncio.TimeoutError:
        print("Timed out waiting for response")
    else:
        # Use the response
        print("Reply:", msg)

    # [end subscribe_json]

    await nc.close()


# Run the example to completion on the event loop, then release the loop.
loop = asyncio.get_event_loop()
loop.run_until_complete(example())
loop.close()

DONE subscribe_star

import asyncio
from nats.aio.client import Client as NATS

async def example():
    """Subscribe with the `*` wildcard and collect two matching messages.

    Messages delivered to the "time.*.east" subscription are passed from the
    callback to the coroutine through an asyncio queue.
    """

    # [begin subscribe_star]
    nc = NATS()

    await nc.connect(servers=["nats://127.0.0.1:4222"])

    # Use queue to wait for 2 messages to arrive
    queue = asyncio.Queue()

    async def cb(msg):
        # `Queue.put` is the awaitable variant; `put_nowait` returns None and
        # must not be awaited.
        await queue.put(msg)

    await nc.subscribe("time.*.east", cb=cb)

    # Send 2 messages and wait for them to come in
    await nc.publish("time.A.east", b'A')
    await nc.publish("time.B.east", b'B')

    msg_A = await queue.get()
    msg_B = await queue.get()

    print("Msg A:", msg_A)
    print("Msg B:", msg_B)

    # [end subscribe_star]

    await nc.close()


# Run the example to completion on the event loop, then release the loop.
loop = asyncio.get_event_loop()
loop.run_until_complete(example())
loop.close()

DONE subscribe_arrow

import asyncio
from nats.aio.client import Client as NATS

async def example():
    """Subscribe with the `>` wildcard and print four matching messages."""

    # [begin subscribe_arrow]
    nc = NATS()

    await nc.connect(servers=["nats://127.0.0.1:4222"])

    # Use queue to wait for 4 messages to arrive
    queue = asyncio.Queue()

    async def cb(msg):
        await queue.put(msg)

    await nc.subscribe("time.>", cb=cb)

    # Send 4 messages and wait for them to come in
    await nc.publish("time.A.east", b'A')
    await nc.publish("time.B.east", b'B')
    await nc.publish("time.C.west", b'C')
    await nc.publish("time.D.west", b'D')

    for _ in range(4):
        msg = await queue.get()
        print("Msg:", msg)

    await nc.close()

    # [end subscribe_arrow]


# Run the example to completion on the event loop, then release the loop.
loop = asyncio.get_event_loop()
loop.run_until_complete(example())
loop.close()

DONE subscribe_queue

import asyncio
from nats.aio.client import Client as NATS

async def example():
    """Subscribe to "updates" in the "workers" queue group and wait for one message."""

    # [begin subscribe_queue]
    nc = NATS()

    await nc.connect(servers=["nats://127.0.0.1:4222"])

    future = asyncio.Future()

    async def cb(msg):
        # Hand the delivered message over to the waiting coroutine.
        future.set_result(msg)

    await nc.subscribe("updates", queue="workers", cb=cb)
    await nc.publish("updates", b'All is Well')

    msg = await asyncio.wait_for(future, 1)
    print("Msg", msg)

    # [end subscribe_queue]

    await nc.close()


# Run the example to completion on the event loop, then release the loop.
loop = asyncio.get_event_loop()
loop.run_until_complete(example())
loop.close()

DONE wildcard_tester

import asyncio
from nats.aio.client import Client as NATS

async def example():
    """Publish to four "time.*" subjects of differing depth, then close."""

    # [begin wildcard_tester]
    nc = NATS()

    await nc.connect(servers=["nats://127.0.0.1:4222"])

    payload = b'...'

    await nc.publish("time.us.east", payload)
    await nc.publish("time.us.east.atlanta", payload)

    await nc.publish("time.eu.east", payload)
    await nc.publish("time.eu.east.warsaw", payload)

    await nc.close()

    # [end wildcard_tester]


# Run the example to completion on the event loop, then release the loop.
loop = asyncio.get_event_loop()
loop.run_until_complete(example())
loop.close()

COMMENT connection_listener

COMMENT servers_added

DONE error_listener

import asyncio
from nats.aio.client import Client as NATS

async def example():
    """Register an error callback and wait until the client closes."""

    # [begin error_listener]
    nc = NATS()

    async def error_cb(e):
        print("Error: ", e)

    await nc.connect(
        servers=["nats://127.0.0.1:4222"],
        reconnect_time_wait=10,
        error_cb=error_cb,
    )

    # Do something with the connection.

    # [end error_listener]

    # Poll once per second until the connection reports it is closed.
    while not nc.is_closed:
        await asyncio.sleep(1)

    await nc.close()


# Run the example to completion on the event loop, then release the loop.
loop = asyncio.get_event_loop()
loop.run_until_complete(example())
loop.close()

DONE connect_status

import asyncio
from nats.aio.client import Client as NATS

async def example():
    """Print the connection's status flags before and after closing it."""

    # [begin connect_status]
    nc = NATS()

    await nc.connect(
        servers=["nats://127.0.0.1:4222"],
    )

    # Do something with the connection.

    print("The connection is connected?", nc.is_connected)

    # Poll once per second until the client reports it is reconnecting.
    while not nc.is_reconnecting:
        await asyncio.sleep(1)
    print("Reconnecting to NATS...")

    await nc.close()

    print("The connection is closed?", nc.is_closed)

    # [end connect_status]


# Run the example to completion on the event loop, then release the loop.
loop = asyncio.get_event_loop()
loop.run_until_complete(example())
loop.close()

COMMENT slow_pending_limits

import asyncio
from nats.aio.client import Client as NATS

async def example():
    """Placeholder for the slow_pending_limits example.

    NOTE(review): the body is identical to the connect_status example; no
    pending limits are configured here yet.
    """

    # [begin slow_pending_limits]
    nc = NATS()

    await nc.connect(
        servers=["nats://127.0.0.1:4222"],
    )

    # Do something with the connection.

    print("The connection is connected?", nc.is_connected)

    # Poll once per second until the client reports it is reconnecting.
    while not nc.is_reconnecting:
        await asyncio.sleep(1)
    print("Reconnecting to NATS...")

    await nc.close()

    print("The connection is closed?", nc.is_closed)

    # [end slow_pending_limits]


# Run the example to completion on the event loop, then release the loop.
loop = asyncio.get_event_loop()
loop.run_until_complete(example())
loop.close()

DONE slow_listener

import asyncio
import nats.aio.errors
from nats.aio.client import Client as NATS

async def example():
    """Show a subscriber that falls behind and the resulting error callback.

    Ten messages are published to a subscription created with
    `pending_msgs_limit=5`, whose callback sleeps for a second on its third
    message. The error callback unsubscribes when it is handed an
    `ErrSlowConsumer`. Whatever was received is printed before closing.
    """

    # [begin slow_listener]

    nc = NATS()

    async def error_cb(e):
        # isinstance also matches subclasses of the slow consumer error.
        if isinstance(e, nats.aio.errors.ErrSlowConsumer):
            print("Slow consumer error, unsubscribing from handling further messages...")
            await nc.unsubscribe(e.sid)

    await nc.connect(
        servers=["nats://127.0.0.1:4222"],
        error_cb=error_cb,
    )

    msgs = []

    async def cb(msg):
        print(msg)
        msgs.append(msg)

        if len(msgs) == 3:
            # Head of line blocking on other messages caused
            # by single message processing taking long...
            await asyncio.sleep(1)

    await nc.subscribe("updates", cb=cb, pending_msgs_limit=5)

    for i in range(0, 10):
        await nc.publish("updates", "msg #{}".format(i).encode())
        # Yield to the event loop between publishes.
        await asyncio.sleep(0)

    # Wait one second before reporting. This replaces waiting on a future
    # that nothing ever resolved, which amounted to the same fixed delay.
    await asyncio.sleep(1)

    for msg in msgs:
        print("[Received]", msg)

    await nc.close()

    # [end slow_listener]


# Run the example to completion on the event loop, then release the loop.
loop = asyncio.get_event_loop()
loop.run_until_complete(example())
loop.close()