#+TITLE: Python Examples #+startup: showeverything #+property: header-args :results output ** COMMENT Notes #+BEGIN_SRC emacs-lisp (setq org-babel-python-command "python3") #+END_SRC ** [35/35] [100%] Progress *** DONE connect_default #+BEGIN_SRC python :tangle connect_default.py import asyncio from nats.aio.client import Client as NATS async def example(): # [begin connect_default] nc = NATS() await nc.connect() # Do something with the connection await nc.close() # [end connect_default] loop = asyncio.get_event_loop() loop.run_until_complete(example()) loop.close() #+END_SRC #+RESULTS: *** DONE connect_url #+BEGIN_SRC python :tangle connect_url.py import asyncio from nats.aio.client import Client as NATS async def example(): # [begin connect_url] nc = NATS() await nc.connect(servers=["nats://demo.nats.io:4222"]) # Do something with the connection await nc.close() # [end connect_url] loop = asyncio.get_event_loop() loop.run_until_complete(example()) loop.close() #+END_SRC #+RESULTS: *** COMMENT connect_options #+BEGIN_SRC python :tangle connect_url.py import asyncio from nats.aio.client import Client as NATS async def example(): # [begin connect_url] nc = NATS() await nc.connect(servers=["nats://demo.nats.io:4222"]) # Do something with the connection await nc.close() # [end connect_url] loop = asyncio.get_event_loop() loop.run_until_complete(example()) loop.close() #+END_SRC *** DONE connect_multiple #+BEGIN_SRC python :tangle connect_multiple.py import asyncio from nats.aio.client import Client as NATS async def example(): # [begin connect_multiple] nc = NATS() await nc.connect(servers=[ "nats://127.0.0.1:1222", "nats://127.0.0.1:1223", "nats://127.0.0.1:1224" ]) # Do something with the connection await nc.close() # [end connect_multiple] loop = asyncio.get_event_loop() loop.run_until_complete(example()) loop.close() #+END_SRC #+RESULTS: *** DONE reconnect_no_random #+BEGIN_SRC python :tangle reconnect_no_random.py import asyncio from nats.aio.client import Client as NATS async def example(): # [begin reconnect_no_random] nc = NATS() await nc.connect( servers=[ "nats://127.0.0.1:1222", "nats://127.0.0.1:1223", "nats://127.0.0.1:1224" ], dont_randomize=True, ) # Do something with the connection await nc.close() # [end reconnect_no_random] loop = asyncio.get_event_loop() loop.run_until_complete(example()) loop.close() #+END_SRC #+RESULTS: *** DONE reconnect_none #+BEGIN_SRC python :tangle reconnect_none.py import asyncio from nats.aio.client import Client as NATS async def example(): # [begin reconnect_none] nc = NATS() await nc.connect( servers=[ "nats://127.0.0.1:1222", "nats://127.0.0.1:1223", "nats://127.0.0.1:1224" ], allow_reconnect=False, ) # Do something with the connection await nc.close() # [end reconnect_none] loop = asyncio.get_event_loop() loop.run_until_complete(example()) loop.close() #+END_SRC #+RESULTS: *** DONE reconnect_10x #+BEGIN_SRC python :tangle reconnect_10x.py import asyncio from nats.aio.client import Client as NATS async def example(): # [begin reconnect_10x] nc = NATS() await nc.connect( servers=["nats://demo.nats.io:4222"], max_reconnect_attempts=10, ) # Do something with the connection await nc.close() # [end reconnect_10x] loop = asyncio.get_event_loop() loop.run_until_complete(example()) loop.close() #+END_SRC #+RESULTS: *** COMMENT reconnect_5mb #+BEGIN_SRC python :tangle reconnect_5mb.py import asyncio from nats.aio.client import Client as NATS async def example(): # [begin reconnect_5mb] nc = NATS() await nc.connect( servers=["nats://demo.nats.io:4222"], max_reconnect_attempts=10, ) # Do something with the connection await nc.close() # [end reconnect_5mb] loop = asyncio.get_event_loop() loop.run_until_complete(example()) loop.close() #+END_SRC *** DONE reconnect_10s #+BEGIN_SRC python :tangle reconnect_10s.py import asyncio from nats.aio.client import Client as NATS async def example(): # [begin reconnect_10s] nc = NATS() await nc.connect( servers=["nats://demo.nats.io:4222"], reconnect_time_wait=10, ) # Do something with the connection await nc.close() # [end reconnect_10s] loop = asyncio.get_event_loop() loop.run_until_complete(example()) loop.close() #+END_SRC #+RESULTS: *** DONE reconnect_event #+BEGIN_SRC python :tangle reconnect_event.py import asyncio from nats.aio.client import Client as NATS async def example(): # [begin reconnect_event] nc = NATS() async def disconnected_cb(): print("Got disconnected!") async def reconnected_cb(): # See who we are connected to on reconnect. print("Got reconnected to {url}".format(url=nc.connected_url.netloc)) await nc.connect( servers=["nats://127.0.0.1:4222"], reconnect_time_wait=10, reconnected_cb=reconnected_cb, disconnected_cb=disconnected_cb, ) # Do something with the connection. # [end reconnect_event] while True: if nc.is_closed: break await asyncio.sleep(1) await nc.close() loop = asyncio.get_event_loop() loop.run_until_complete(example()) loop.close() #+END_SRC *** DONE ping_20s #+BEGIN_SRC python :tangle ping_20s.py import asyncio from nats.aio.client import Client as NATS async def example(): # [begin ping_20s] nc = NATS() await nc.connect( servers=["nats://demo.nats.io:4222"], # Set Ping Interval to 20 seconds ping_interval=20, ) # Do something with the connection. # [end ping_20s] while True: if nc.is_closed: break await asyncio.sleep(1) await nc.close() loop = asyncio.get_event_loop() loop.run_until_complete(example()) loop.close() #+END_SRC *** DONE ping_5 #+BEGIN_SRC python :tangle ping_5.py import asyncio from nats.aio.client import Client as NATS async def example(): # [begin ping_5] nc = NATS() await nc.connect( servers=["nats://127.0.0.1:4222"], # Set maximum number of PINGs out without getting a PONG back # before the connection will be disconnected as a stale connection. max_outstanding_pings=5, ping_interval=1, ) # Do something with the connection. # [end ping_5] while True: if nc.is_closed: break await asyncio.sleep(1) await nc.close() loop = asyncio.get_event_loop() loop.run_until_complete(example()) loop.close() #+END_SRC *** DONE max_payload #+BEGIN_SRC python :tangle max_payload.py import asyncio from nats.aio.client import Client as NATS async def example(): # [begin max_payload] nc = NATS() await nc.connect(servers=["nats://demo.nats.io:4222"]) print("Maximum payload is %d bytes" % nc.max_payload) # Do something with the max payload. # [end max_payload] await nc.close() loop = asyncio.get_event_loop() loop.run_until_complete(example()) loop.close() #+END_SRC #+RESULTS: : Maximum payload is 1048576 bytes *** COMMENT control_2k *** COMMENT no_echo *** DONE connect_pedantic #+BEGIN_SRC python :tangle connect_pedantic.py import asyncio from nats.aio.client import Client as NATS async def example(): # [begin connect_pedantic] nc = NATS() await nc.connect(servers=["nats://127.0.0.1:4222"], pedantic=True) # Do something with the connection. # [end connect_pedantic] await nc.close() loop = asyncio.get_event_loop() loop.run_until_complete(example()) loop.close() #+END_SRC #+RESULTS: *** DONE connect_verbose #+BEGIN_SRC python :tangle connect_verbose.py import asyncio from nats.aio.client import Client as NATS async def example(): # [begin connect_verbose] nc = NATS() await nc.connect(servers=["nats://127.0.0.1:4222"], verbose=True) # Do something with the connection. # [end connect_verbose] await nc.close() loop = asyncio.get_event_loop() loop.run_until_complete(example()) loop.close() #+END_SRC #+RESULTS: *** DONE connect_name #+BEGIN_SRC python :tangle connect_name.py import asyncio from nats.aio.client import Client as NATS async def example(): # [begin connect_name] nc = NATS() await nc.connect(servers=["nats://127.0.0.1:4222"], name="my-connection") # Do something with the connection. # [end connect_name] await nc.close() loop = asyncio.get_event_loop() loop.run_until_complete(example()) loop.close() #+END_SRC #+RESULTS: *** DONE connect_tls #+BEGIN_SRC python :tangle connect_tls.py import asyncio import ssl from nats.aio.client import Client as NATS async def example(): # [begin connect_tls] nc = NATS() ssl_ctx = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH) ssl_ctx.load_verify_locations('ca.pem') ssl_ctx.load_cert_chain(certfile='client-cert.pem', keyfile='client-key.pem') await nc.connect(io_loop=loop, tls=ssl_ctx) await nc.connect(servers=["nats://127.0.0.1:4222"], tls=ssl_ctx) # Do something with the connection. # [end connect_tls] await nc.close() loop = asyncio.get_event_loop() loop.run_until_complete(example()) loop.close() #+END_SRC *** COMMENT connect_tls_url *** DONE connect_userpass #+BEGIN_SRC python :tangle connect_userpass.py import asyncio from nats.aio.client import Client as NATS async def example(): # [begin connect_userpass] nc = NATS() await nc.connect(servers=["nats://myname:password@127.0.0.1:4222"]) # Do something with the connection. # [end connect_userpass] await nc.close() loop = asyncio.get_event_loop() loop.run_until_complete(example()) loop.close() #+END_SRC *** DONE connect_userpass_url #+BEGIN_SRC python :tangle connect_userpass_url.py import asyncio from nats.aio.client import Client as NATS async def example(): # [begin connect_userpass_url] nc = NATS() await nc.connect(servers=["nats://myname:password@127.0.0.1:4222"]) # Do something with the connection. # [end connect_userpass_url] await nc.close() loop = asyncio.get_event_loop() loop.run_until_complete(example()) loop.close() #+END_SRC #+RESULTS: *** DONE connect_token #+BEGIN_SRC python :tangle connect_token.py import asyncio from nats.aio.client import Client as NATS async def example(): # [begin connect_token] nc = NATS() await nc.connect(servers=["nats://mytoken@127.0.0.1:4222"]) # Do something with the connection. # [end connect_token] await nc.close() loop = asyncio.get_event_loop() loop.run_until_complete(example()) loop.close() #+END_SRC #+RESULTS: *** DONE connect_token_url #+BEGIN_SRC python :tangle connect_token_url.py import asyncio from nats.aio.client import Client as NATS async def example(): # [begin connect_token_url] nc = NATS() await nc.connect(servers=["nats://mytoken@127.0.0.1:4222"]) # Do something with the connection. # [end connect_token_url] await nc.close() loop = asyncio.get_event_loop() loop.run_until_complete(example()) loop.close() #+END_SRC *** DONE publish_bytes #+BEGIN_SRC python :tangle publish_bytes.py import asyncio from nats.aio.client import Client as NATS async def example(): # [begin publish_bytes] nc = NATS() await nc.connect(servers=["nats://127.0.0.1:4222"]) await nc.publish("updates", b'All is Well') # [end publish_bytes] await nc.close() loop = asyncio.get_event_loop() loop.run_until_complete(example()) loop.close() #+END_SRC #+RESULTS: *** DONE publish_json #+BEGIN_SRC python :tangle publish_json.py import asyncio import json from nats.aio.client import Client as NATS async def example(): # [begin publish_json] nc = NATS() await nc.connect(servers=["nats://127.0.0.1:4222"]) await nc.publish("updates", json.dumps({"symbol": "GOOG", "price": 1200 }).encode()) # [end publish_json] await nc.close() loop = asyncio.get_event_loop() loop.run_until_complete(example()) loop.close() #+END_SRC #+RESULTS: *** DONE publish_with_reply #+BEGIN_SRC python :tangle publish_with_reply.py import asyncio import json from nats.aio.client import Client as NATS from nats.aio.utils import new_inbox async def example(): # [begin publish_with_reply] nc = NATS() future = asyncio.Future() async def sub(msg): nonlocal future future.set_result(msg) await nc.connect(servers=["nats://127.0.0.1:4222"]) await nc.subscribe("time", cb=sub) unique_reply_to = new_inbox() await nc.publish_request("time", unique_reply_to, b'') # Use the response msg = await asyncio.wait_for(future, 1) print("Reply:", msg) # [end publish_with_reply] await nc.close() loop = asyncio.get_event_loop() loop.run_until_complete(example()) loop.close() #+END_SRC #+RESULTS: : Reply: *** DONE request_reply #+BEGIN_SRC python :tangle request_reply.py import asyncio import json from nats.aio.client import Client as NATS from nats.aio.utils import new_inbox async def example(): # [begin request_reply] nc = NATS() async def sub(msg): await nc.publish(msg.reply, b'response') await nc.connect(servers=["nats://127.0.0.1:4222"]) await nc.subscribe("time", cb=sub) # Send the request try: msg = await nc.request("time", b'', timeout=1) # Use the response print("Reply:", msg) except asyncio.TimeoutError: print("Timed out waiting for response") # [end request_reply] await nc.close() loop = asyncio.get_event_loop() loop.run_until_complete(example()) loop.close() #+END_SRC #+RESULTS: : Reply: *** DONE flush #+BEGIN_SRC python :tangle flush.py import asyncio from nats.aio.client import Client as NATS async def example(): # [begin flush] nc = NATS() await nc.connect(servers=["nats://127.0.0.1:4222"]) await nc.publish("updates", b'All is Well') # Sends a PING and wait for a PONG from the server, up to the given timeout. # This gives guarantee that the server has processed above message. await nc.flush(timeout=1) # [end flush] await nc.close() loop = asyncio.get_event_loop() loop.run_until_complete(example()) loop.close() #+END_SRC *** COMMENT subscribe_sync *** DONE subscribe_async #+BEGIN_SRC python :tangle subscribe_async.py import asyncio from nats.aio.client import Client as NATS async def example(): # [begin subscribe_async] nc = NATS() await nc.connect(servers=["nats://127.0.0.1:4222"]) future = asyncio.Future() async def cb(msg): nonlocal future future.set_result(msg) await nc.subscribe("updates", cb=cb) await nc.publish("updates", b'All is Well') await nc.flush() # Wait for message to come in msg = await asyncio.wait_for(future, 1) # [end subscribe_async] print(msg) await nc.close() loop = asyncio.get_event_loop() loop.run_until_complete(example()) loop.close() #+END_SRC #+RESULTS: *** DONE subscribe_w_reply #+BEGIN_SRC python :tangle subscribe_w_reply.py import asyncio from nats.aio.client import Client as NATS from nats.aio.utils import new_inbox from datetime import datetime async def example(): # [begin subscribe_w_reply] nc = NATS() await nc.connect(servers=["nats://127.0.0.1:4222"]) future = asyncio.Future() async def cb(msg): nonlocal future future.set_result(msg) await nc.subscribe("time", cb=cb) await nc.publish_request("time", new_inbox(), b'What is the time?') await nc.flush() # Read the message msg = await asyncio.wait_for(future, 1) # Send the time time_as_bytes = "{}".format(datetime.now()).encode() await nc.publish(msg.reply, time_as_bytes) # [end subscribe_w_reply] await nc.close() loop = asyncio.get_event_loop() loop.run_until_complete(example()) loop.close() #+END_SRC #+RESULTS: : 2018-08-12 23:41:28.615782 *** DONE unsubscribe #+BEGIN_SRC python :tangle unsubscribe.py import asyncio from nats.aio.client import Client as NATS async def example(): # [begin unsubscribe] nc = NATS() await nc.connect(servers=["nats://127.0.0.1:4222"]) future = asyncio.Future() async def cb(msg): nonlocal future future.set_result(msg) sid = await nc.subscribe("updates", cb=cb) await nc.publish("updates", b'All is Well') # Remove interest in subject await nc.unsubscribe(sid) # Won't be received... await nc.publish("updates", b'...') # [end unsubscribe] await nc.close() loop = asyncio.get_event_loop() loop.run_until_complete(example()) loop.close() #+END_SRC *** DONE unsubscribe_auto #+BEGIN_SRC python :tangle unsubscribe_auto.py import asyncio from nats.aio.client import Client as NATS async def example(): # [begin unsubscribe_auto] nc = NATS() await nc.connect(servers=["nats://127.0.0.1:4222"]) async def cb(msg): print(msg) sid = await nc.subscribe("updates", cb=cb) await nc.auto_unsubscribe(sid, 1) await nc.publish("updates", b'All is Well') # Won't be received... await nc.publish("updates", b'...') # [end unsubscribe_auto] await asyncio.sleep(1) await nc.close() loop = asyncio.get_event_loop() loop.run_until_complete(example()) loop.close() #+END_SRC *** COMMENT subscribe_json #+BEGIN_SRC python :tangle subscribe_json.py import asyncio import json from nats.aio.client import Client as NATS from nats.aio.utils import new_inbox async def example(): # [begin subscribe_json] nc = NATS() async def sub(msg): print(msg) # await nc.publish(msg.reply, b'response') await nc.connect(servers=["nats://127.0.0.1:4222"]) await nc.subscribe("updates", cb=sub) # Send the request try: msg = await nc.request("time", b'', timeout=1) # Use the response print("Reply:", msg) except asyncio.TimeoutError: print("Timed out waiting for response") # [end subscribe_json] await nc.close() loop = asyncio.get_event_loop() loop.run_until_complete(example()) loop.close() #+END_SRC *** DONE subscribe_star #+BEGIN_SRC python :tangle subscribe_star.py import asyncio from nats.aio.client import Client as NATS async def example(): # [begin subscribe_star] nc = NATS() await nc.connect(servers=["nats://127.0.0.1:4222"]) # Use queue to wait for 2 messages to arrive queue = asyncio.Queue() async def cb(msg): await queue.put_nowait(msg) await nc.subscribe("time.*.east", cb=cb) # Send 2 messages and wait for them to come in await nc.publish("time.A.east", b'A') await nc.publish("time.B.east", b'B') msg_A = await queue.get() msg_B = await queue.get() print("Msg A:", msg_A) print("Msg B:", msg_B) # [end subscribe_star] await nc.close() loop = asyncio.get_event_loop() loop.run_until_complete(example()) loop.close() #+END_SRC *** DONE subscribe_arrow #+BEGIN_SRC python :tangle subscribe_arrow.py import asyncio from nats.aio.client import Client as NATS async def example(): # [begin subscribe_arrow] nc = NATS() await nc.connect(servers=["nats://127.0.0.1:4222"]) # Use queue to wait for 4 messages to arrive queue = asyncio.Queue() async def cb(msg): await queue.put(msg) await nc.subscribe("time.>", cb=cb) # Send 2 messages and wait for them to come in await nc.publish("time.A.east", b'A') await nc.publish("time.B.east", b'B') await nc.publish("time.C.west", b'C') await nc.publish("time.D.west", b'D') for i in range(0, 4): msg = await queue.get() print("Msg:", msg) await nc.close() # [end subscribe_arrow] loop = asyncio.get_event_loop() loop.run_until_complete(example()) loop.close() #+END_SRC *** DONE subscribe_queue #+BEGIN_SRC python :tangle subscribe_queue.py import asyncio from nats.aio.client import Client as NATS async def example(): # [begin subscribe_queue] nc = NATS() await nc.connect(servers=["nats://127.0.0.1:4222"]) future = asyncio.Future() async def cb(msg): nonlocal future future.set_result(msg) await nc.subscribe("updates", queue="workers", cb=cb) await nc.publish("updates", b'All is Well') msg = await asyncio.wait_for(future, 1) print("Msg", msg) # [end subscribe_queue] await nc.close() loop = asyncio.get_event_loop() loop.run_until_complete(example()) loop.close() #+END_SRC *** DONE wildcard_tester #+BEGIN_SRC python :tangle wildcard_tester.py import asyncio from nats.aio.client import Client as NATS async def example(): # [begin wildcard_tester] nc = NATS() await nc.connect(servers=["nats://127.0.0.1:4222"]) await nc.publish("time.us.east", b'...') await nc.publish("time.us.east.atlanta", b'...') await nc.publish("time.eu.east", b'...') await nc.publish("time.eu.east.warsaw", b'...') await nc.close() # [end wildcard_tester] loop = asyncio.get_event_loop() loop.run_until_complete(example()) loop.close() #+END_SRC *** COMMENT connection_listener *** COMMENT servers_added *** DONE error_listener #+BEGIN_SRC python :tangle error_listener.py import asyncio from nats.aio.client import Client as NATS async def example(): # [begin error_listener] nc = NATS() async def error_cb(e): print("Error: ", e) await nc.connect( servers=["nats://127.0.0.1:4222"], reconnect_time_wait=10, error_cb=error_cb, ) # Do something with the connection. # [end error_listener] while True: if nc.is_closed: break await asyncio.sleep(1) await nc.close() loop = asyncio.get_event_loop() loop.run_until_complete(example()) loop.close() #+END_SRC *** DONE connect_status #+BEGIN_SRC python :tangle connect_status.py import asyncio from nats.aio.client import Client as NATS async def example(): # [begin connect_status] nc = NATS() await nc.connect( servers=["nats://127.0.0.1:4222"], ) # Do something with the connection. print("The connection is connected?", nc.is_connected) while True: if nc.is_reconnecting: print("Reconnecting to NATS...") break await asyncio.sleep(1) await nc.close() print("The connection is closed?", nc.is_closed) # [end connect_status] loop = asyncio.get_event_loop() loop.run_until_complete(example()) loop.close() #+END_SRC *** COMMENT slow_pending_limits #+BEGIN_SRC python :tangle slow_pending_limits.py import asyncio from nats.aio.client import Client as NATS async def example(): # [begin slow_pending_limits] nc = NATS() await nc.connect( servers=["nats://127.0.0.1:4222"], ) # Do something with the connection. print("The connection is connected?", nc.is_connected) while True: if nc.is_reconnecting: print("Reconnecting to NATS...") break await asyncio.sleep(1) await nc.close() print("The connection is closed?", nc.is_closed) # [end slow_pending_limits] loop = asyncio.get_event_loop() loop.run_until_complete(example()) loop.close() #+END_SRC *** DONE slow_listener #+BEGIN_SRC python :tangle slow_listener.py import asyncio import nats.aio.errors from nats.aio.client import Client as NATS async def example(): # [begin slow_listener] nc = NATS() async def error_cb(e): if type(e) is nats.aio.errors.ErrSlowConsumer: print("Slow consumer error, unsubscribing from handling further messages...") await nc.unsubscribe(e.sid) await nc.connect( servers=["nats://127.0.0.1:4222"], error_cb=error_cb, ) msgs = [] future = asyncio.Future() async def cb(msg): nonlocal msgs nonlocal future print(msg) msgs.append(msg) if len(msgs) == 3: # Head of line blocking on other messages caused # by single message proccesing taking long... await asyncio.sleep(1) await nc.subscribe("updates", cb=cb, pending_msgs_limit=5) for i in range(0, 10): await nc.publish("updates", "msg #{}".format(i).encode()) await asyncio.sleep(0) try: await asyncio.wait_for(future, 1) except asyncio.TimeoutError: pass for msg in msgs: print("[Received]", msg) await nc.close() # [end slow_listener] loop = asyncio.get_event_loop() loop.run_until_complete(example()) loop.close() #+END_SRC