8-test_client.py 5.9 KB


  1. import os
  2. import socket
  3. import threading
  4. import time
  5. import msgpack
  6. import pytest
  7. from pynats import NATSClient
  8. from pynats.exceptions import NATSInvalidSchemeError, NATSReadSocketError
  9. @pytest.fixture
  10. def nats_plain_url():
  11. return os.environ.get("NATS_PLAIN_URL", "nats://127.0.0.1:4222")
  12. @pytest.fixture
  13. def nats_tls_url():
  14. return os.environ.get("NATS_TLS_URL", "tls://127.0.0.1:4224")
  15. def test_connect_and_close(nats_plain_url):
  16. client = NATSClient(nats_plain_url, socket_timeout=2)
  17. client.connect()
  18. client.ping()
  19. client.close()
  20. def test_connect_and_close_using_context_manager(nats_plain_url):
  21. with NATSClient(nats_plain_url, socket_timeout=2) as client:
  22. client.ping()
  23. def test_connect_timeout():
  24. client = NATSClient("nats://127.0.0.1:4223", socket_timeout=2)
  25. with pytest.raises(socket.error):
  26. client.connect()
  27. def test_reconnect(nats_plain_url):
  28. client = NATSClient(nats_plain_url, socket_timeout=2)
  29. client.connect()
  30. client.ping()
  31. client.reconnect()
  32. client.ping()
  33. client.close()
  34. def test_tls_connect(nats_tls_url):
  35. client = NATSClient(nats_tls_url, socket_timeout=2)
  36. client.connect()
  37. client.ping()
  38. client.close()
  39. def test_invalid_scheme():
  40. client = NATSClient("http://127.0.0.1:4224")
  41. with pytest.raises(NATSInvalidSchemeError):
  42. client.connect()
  43. def test_subscribe_unsubscribe(nats_plain_url):
  44. with NATSClient(nats_plain_url, socket_timeout=2) as client:
  45. sub = client.subscribe(
  46. "test-subject", callback=lambda x: x, queue="test-queue", max_messages=2
  47. )
  48. client.unsubscribe(sub)
  49. def test_subscribe_timeout(nats_plain_url):
  50. with NATSClient(nats_plain_url, socket_timeout=2) as client:
  51. sub = client.subscribe(
  52. "test-subject", callback=lambda x: x, queue="test-queue", max_messages=1
  53. )
  54. with pytest.raises(socket.timeout):
  55. client.wait(count=1)
  56. client.unsubscribe(sub)
  57. def test_publish(nats_plain_url):
  58. received = []
  59. def worker():
  60. with NATSClient(nats_plain_url, socket_timeout=2) as client:
  61. def callback(message):
  62. received.append(message)
  63. client.subscribe(
  64. "test-subject", callback=callback, queue="test-queue", max_messages=2
  65. )
  66. client.wait(count=2)
  67. t = threading.Thread(target=worker)
  68. t.start()
  69. time.sleep(1)
  70. with NATSClient(nats_plain_url, socket_timeout=2) as client:
  71. # publish without payload
  72. client.publish("test-subject")
  73. # publish with payload
  74. client.publish("test-subject", payload=b"test-payload")
  75. t.join()
  76. assert len(received) == 2
  77. assert received[0].subject == "test-subject"
  78. assert received[0].reply == ""
  79. assert received[0].payload == b""
  80. assert received[1].subject == "test-subject"
  81. assert received[1].reply == ""
  82. assert received[1].payload == b"test-payload"
  83. def test_request(nats_plain_url):
  84. def worker():
  85. with NATSClient(nats_plain_url, socket_timeout=2) as client:
  86. def callback(message):
  87. client.publish(message.reply, payload=b"test-callback-payload")
  88. client.subscribe(
  89. "test-subject", callback=callback, queue="test-queue", max_messages=2
  90. )
  91. client.wait(count=2)
  92. t = threading.Thread(target=worker)
  93. t.start()
  94. time.sleep(1)
  95. with NATSClient(nats_plain_url, socket_timeout=2) as client:
  96. # request without payload
  97. resp = client.request("test-subject")
  98. assert resp.subject.startswith("_INBOX.")
  99. assert resp.reply == ""
  100. assert resp.payload == b"test-callback-payload"
  101. # request with payload
  102. resp = client.request("test-subject", payload=b"test-payload")
  103. assert resp.subject.startswith("_INBOX.")
  104. assert resp.reply == ""
  105. assert resp.payload == b"test-callback-payload"
  106. t.join()
  107. def test_request_msgpack(nats_plain_url):
  108. def worker():
  109. with NATSClient(nats_plain_url, socket_timeout=2) as client:
  110. def callback(message):
  111. client.publish(
  112. message.reply,
  113. payload=msgpack.packb(
  114. {b"v": 3338} if message.payload else {b"v": 32}
  115. ),
  116. )
  117. client.subscribe(
  118. "test-subject", callback=callback, queue="test-queue", max_messages=2
  119. )
  120. client.wait(count=2)
  121. t = threading.Thread(target=worker)
  122. t.start()
  123. time.sleep(1)
  124. with NATSClient(nats_plain_url, socket_timeout=2) as client:
  125. # request without payload
  126. resp = client.request("test-subject")
  127. assert resp.subject.startswith("_INBOX.")
  128. assert resp.reply == ""
  129. assert msgpack.unpackb(resp.payload) == {b"v": 32}
  130. # request with payload
  131. resp = client.request("test-subject", payload=msgpack.packb("test-payload"))
  132. assert resp.subject.startswith("_INBOX.")
  133. assert resp.reply == ""
  134. assert msgpack.unpackb(resp.payload) == {b"v": 3338}
  135. t.join()
  136. def test_request_timeout(nats_plain_url):
  137. with NATSClient(nats_plain_url, socket_timeout=2) as client:
  138. with pytest.raises(socket.timeout):
  139. client.request("test-subject")
  140. def test_graceful_shutdown(nats_plain_url):
  141. def worker(client, connected_event):
  142. client.connect()
  143. connected_event.set()
  144. try:
  145. client.wait()
  146. except NATSReadSocketError:
  147. assert True
  148. except Exception:
  149. raise AssertionError("unexpected Exception raised")
  150. client = NATSClient(nats_plain_url)
  151. connected_event = threading.Event()
  152. thread = threading.Thread(target=worker, args=[client, connected_event])
  153. thread.start()
  154. assert connected_event.wait(5), "unable to connect"
  155. client.close()
  156. thread.join(5)
  157. assert not thread.is_alive(), "thread did not finish"