8-test_client.py 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223
  1. import os
  2. import socket
  3. import threading
  4. import time
  5. import msgpack
  6. import pytest
  7. from pynats import NATSClient
  8. from pynats.exceptions import NATSInvalidSchemeError, NATSReadSocketError
  9. @pytest.fixture
  10. def nats_plain_url():
  11. return os.environ.get("NATS_PLAIN_URL", "nats://127.0.0.1:4222")
  12. @pytest.fixture
  13. def nats_tls_url():
  14. return os.environ.get("NATS_TLS_URL", "tls://127.0.0.1:4224")
  15. def test_connect_and_close(nats_plain_url):
  16. client = NATSClient(nats_plain_url, socket_timeout=2)
  17. client.connect()
  18. client.ping()
  19. client.close()
  20. def test_connect_and_close_using_context_manager(nats_plain_url):
  21. with NATSClient(nats_plain_url, socket_timeout=2) as client:
  22. client.ping()
  23. def test_connect_timeout():
  24. client = NATSClient("nats://127.0.0.1:4223", socket_timeout=2)
  25. with pytest.raises(socket.error):
  26. client.connect()
  27. def test_reconnect(nats_plain_url):
  28. client = NATSClient(nats_plain_url, socket_timeout=2)
  29. client.connect()
  30. client.ping()
  31. client.reconnect()
  32. client.ping()
  33. client.close()
  34. def test_tls_connect(nats_tls_url):
  35. client = NATSClient(nats_tls_url, socket_timeout=2)
  36. client.connect()
  37. client.ping()
  38. client.close()
  39. def test_invalid_scheme():
  40. client = NATSClient("http://127.0.0.1:4224")
  41. with pytest.raises(NATSInvalidSchemeError):
  42. client.connect()
  43. def test_subscribe_unsubscribe(nats_plain_url):
  44. with NATSClient(nats_plain_url, socket_timeout=2) as client:
  45. sub = client.subscribe(
  46. "test-subject", callback=lambda x: x, queue="test-queue", max_messages=2
  47. )
  48. client.unsubscribe(sub)
  49. def test_subscribe_timeout(nats_plain_url):
  50. with NATSClient(nats_plain_url, socket_timeout=2) as client:
  51. sub = client.subscribe(
  52. "test-subject", callback=lambda x: x, queue="test-queue", max_messages=1
  53. )
  54. with pytest.raises(socket.timeout):
  55. client.wait(count=1)
  56. client.unsubscribe(sub)
  57. def test_publish(nats_plain_url):
  58. received = []
  59. def worker():
  60. with NATSClient(nats_plain_url, socket_timeout=2) as client:
  61. def callback(message):
  62. received.append(message)
  63. client.subscribe(
  64. "test-subject", callback=callback, queue="test-queue", max_messages=2
  65. )
  66. client.wait(count=2)
  67. t = threading.Thread(target=worker)
  68. t.start()
  69. time.sleep(1)
  70. with NATSClient(nats_plain_url, socket_timeout=2) as client:
  71. # publish without payload
  72. client.publish("test-subject")
  73. # publish with payload
  74. client.publish("test-subject", payload=b"test-payload")
  75. t.join()
  76. assert len(received) == 2
  77. assert received[0].subject == "test-subject"
  78. assert received[0].reply == ""
  79. assert received[0].payload == b""
  80. assert received[1].subject == "test-subject"
  81. assert received[1].reply == ""
  82. assert received[1].payload == b"test-payload"
  83. def test_request(nats_plain_url):
  84. def worker():
  85. with NATSClient(nats_plain_url, socket_timeout=2) as client:
  86. def callback(message):
  87. client.publish(message.reply, payload=b"test-callback-payload")
  88. client.subscribe(
  89. "test-subject", callback=callback, queue="test-queue", max_messages=2
  90. )
  91. client.wait(count=2)
  92. t = threading.Thread(target=worker)
  93. t.start()
  94. time.sleep(1)
  95. with NATSClient(nats_plain_url, socket_timeout=2) as client:
  96. # request without payload
  97. resp = client.request("test-subject")
  98. assert resp.subject.startswith("_INBOX.")
  99. assert resp.reply == ""
  100. assert resp.payload == b"test-callback-payload"
  101. # request with payload
  102. resp = client.request("test-subject", payload=b"test-payload")
  103. assert resp.subject.startswith("_INBOX.")
  104. assert resp.reply == ""
  105. assert resp.payload == b"test-callback-payload"
  106. t.join()
  107. def test_request_msgpack(nats_plain_url):
  108. def worker():
  109. with NATSClient(nats_plain_url, socket_timeout=2) as client:
  110. def callback(message):
  111. client.publish(
  112. message.reply,
  113. payload=msgpack.packb(
  114. {b"v": 3338} if message.payload else {b"v": 32}
  115. ),
  116. )
  117. client.subscribe(
  118. "test-subject", callback=callback, queue="test-queue", max_messages=2
  119. )
  120. client.wait(count=2)
  121. t = threading.Thread(target=worker)
  122. t.start()
  123. time.sleep(1)
  124. with NATSClient(nats_plain_url, socket_timeout=2) as client:
  125. # request without payload
  126. resp = client.request("test-subject")
  127. assert resp.subject.startswith("_INBOX.")
  128. assert resp.reply == ""
  129. assert msgpack.unpackb(resp.payload) == {b"v": 32}
  130. # request with payload
  131. resp = client.request("test-subject", payload=msgpack.packb("test-payload"))
  132. assert resp.subject.startswith("_INBOX.")
  133. assert resp.reply == ""
  134. assert msgpack.unpackb(resp.payload) == {b"v": 3338}
  135. t.join()
  136. def test_request_timeout(nats_plain_url):
  137. with NATSClient(nats_plain_url, socket_timeout=2) as client:
  138. with pytest.raises(socket.timeout):
  139. client.request("test-subject")
  140. def test_graceful_shutdown(nats_plain_url):
  141. def worker(client, connected_event):
  142. client.connect()
  143. connected_event.set()
  144. try:
  145. client.wait()
  146. except NATSReadSocketError:
  147. assert True
  148. except Exception:
  149. raise AssertionError("unexpected Exception raised")
  150. client = NATSClient(nats_plain_url)
  151. connected_event = threading.Event()
  152. thread = threading.Thread(target=worker, args=[client, connected_event])
  153. thread.start()
  154. assert connected_event.wait(5), "unable to connect"
  155. client.close()
  156. thread.join(5)
  157. assert not thread.is_alive(), "thread did not finish"