"""Tests for kafkaesk.ext.logging.handler (test_handler.py)."""
import asyncio
import io
import logging
import time
import uuid
from typing import Optional
from unittest.mock import MagicMock
from unittest.mock import Mock
from unittest.mock import patch

import pydantic
import pytest

import kafkaesk
from kafkaesk.ext.logging.handler import KafkaeskQueue
from kafkaesk.ext.logging.handler import PydanticKafkaeskHandler
from kafkaesk.ext.logging.handler import PydanticLogModel
from kafkaesk.ext.logging.handler import PydanticStreamHandler
# Run every coroutine test in this module under pytest-asyncio.
pytestmark = pytest.mark.asyncio
  18. @pytest.fixture(scope="function")
  19. def logger():
  20. ll = logging.getLogger("test")
  21. ll.propagate = False
  22. ll.setLevel(logging.DEBUG)
  23. return ll
  24. @pytest.fixture(scope="function")
  25. def stream_handler(logger):
  26. stream = io.StringIO()
  27. handler = PydanticStreamHandler(stream=stream)
  28. logger.addHandler(handler)
  29. return stream
  30. @pytest.fixture(scope="function")
  31. def kafakesk_handler(app, logger):
  32. handler = PydanticKafkaeskHandler(app, "log.test")
  33. logger.addHandler(handler)
  34. return handler
  35. async def test_handler_initializes_applogger(kafka, logger):
  36. app = kafkaesk.Application(
  37. [f"{kafka[0]}:{kafka[1]}"],
  38. topic_prefix=uuid.uuid4().hex,
  39. kafka_settings={"metadata_max_age_ms": 500},
  40. )
  41. handler = PydanticKafkaeskHandler(app, "log.test")
  42. logger.addHandler(handler)
  43. logger.error("Hi!")
  44. await asyncio.sleep(0.1)
  45. assert app._initialized
  46. @pytest.fixture(scope="function")
  47. def log_consumer(app):
  48. consumed = []
  49. @app.subscribe("log.test", group="test_group")
  50. async def consume(data: PydanticLogModel):
  51. consumed.append(data)
  52. yield consumed
  53. class TestPydanticStreamHandler:
  54. async def test_stream_handler(self, stream_handler, logger):
  55. logger.info("Test Message %s", "extra")
  56. message = stream_handler.getvalue()
  57. assert "Test Message extra" in message
  58. async def test_stream_handler_with_log_model(self, stream_handler, logger):
  59. class LogModel(pydantic.BaseModel):
  60. _is_log_model = True
  61. foo: Optional[str]
  62. logger.info("Test Message %s", "extra", LogModel(foo="bar"))
  63. message = stream_handler.getvalue()
  64. assert "Test Message extra" in message
  65. assert "foo=bar" in message
  66. async def test_stream_handler_with_log_model_shortens_log_messae(self, stream_handler, logger):
  67. class LogModel(pydantic.BaseModel):
  68. _is_log_model = True
  69. foo: str
  70. bar: str
  71. logger.info("Test Message %s", "extra", LogModel(foo="X" * 256, bar="Y" * 256))
  72. message = stream_handler.getvalue()
  73. assert "Test Message extra" in message
  74. assert f"foo={'X' * 256}" in message
  75. assert f"bar={'Y' * 256}" not in message
  76. class TestPydanticKafkaeskHandler:
  77. async def test_kafka_handler(self, app, kafakesk_handler, logger, log_consumer):
  78. async with app:
  79. logger.info("Test Message %s", "extra")
  80. await app.flush()
  81. await app.consume_for(1, seconds=8)
  82. assert len(log_consumer) == 1
  83. assert log_consumer[0].message == "Test Message extra"
  84. async def test_kafka_handler_with_log_model(self, app, kafakesk_handler, logger, log_consumer):
  85. class LogModel(pydantic.BaseModel):
  86. _is_log_model = True
  87. foo: Optional[str]
  88. async with app:
  89. logger.info("Test Message %s", "extra", LogModel(foo="bar"))
  90. await app.flush()
  91. await app.consume_for(1, seconds=8)
  92. assert len(log_consumer) == 1
  93. assert log_consumer[0].message == "Test Message extra"
  94. assert log_consumer[0].foo == "bar"
  95. def test_emit_std_output_queue_full(self):
  96. queue = MagicMock()
  97. with patch("kafkaesk.ext.logging.handler.KafkaeskQueue", return_value=queue), patch(
  98. "kafkaesk.ext.logging.handler.sys.stderr.write"
  99. ) as std_write:
  100. queue.put_nowait.side_effect = asyncio.QueueFull
  101. handler = PydanticKafkaeskHandler(MagicMock(), "foo")
  102. record = Mock()
  103. record.pydantic_data = []
  104. handler.emit(record)
  105. std_write.assert_called_once()
  106. def test_emit_limits_std_output_queue_full(self):
  107. queue = MagicMock()
  108. with patch("kafkaesk.ext.logging.handler.KafkaeskQueue", return_value=queue), patch(
  109. "kafkaesk.ext.logging.handler.sys.stderr.write"
  110. ) as std_write:
  111. queue.put_nowait.side_effect = asyncio.QueueFull
  112. handler = PydanticKafkaeskHandler(MagicMock(), "foo")
  113. handler._last_warning_sent = time.time() + 1
  114. record = Mock()
  115. record.pydantic_data = []
  116. handler.emit(record)
  117. std_write.assert_not_called()
  118. def test_clone(self):
  119. handler = PydanticKafkaeskHandler(MagicMock(), "foo")
  120. handler2 = handler.clone()
  121. assert handler != handler2
  122. assert handler.app == handler2.app
  123. assert handler._queue == handler2._queue
  124. def test_emit_drops_message_on_runtime_error_start(self):
  125. queue = MagicMock()
  126. with patch("kafkaesk.ext.logging.handler.KafkaeskQueue", return_value=queue), patch(
  127. "kafkaesk.ext.logging.handler.sys.stderr.write"
  128. ) as std_write:
  129. queue.running = False
  130. queue.start.side_effect = RuntimeError
  131. handler = PydanticKafkaeskHandler(MagicMock(), "foo")
  132. record = Mock()
  133. record.pydantic_data = []
  134. handler.emit(record)
  135. std_write.assert_called_once()
  136. class TestKafkaeskQueue:
  137. @pytest.fixture(scope="function")
  138. async def queue(self, request, app):
  139. max_queue = 10000
  140. for marker in request.node.iter_markers("with_max_queue"):
  141. max_queue = marker.args[0]
  142. app.schema("PydanticLogModel")(PydanticLogModel)
  143. q = KafkaeskQueue(app, max_queue=max_queue)
  144. return q
  145. async def test_queue(self, app, queue):
  146. consumed = []
  147. @app.subscribe("log.test", group="test_group")
  148. async def consume(data: PydanticLogModel):
  149. consumed.append(data)
  150. async with app:
  151. queue.start()
  152. queue.put_nowait("log.test", PydanticLogModel(foo="bar"))
  153. await app.flush()
  154. await app.consume_for(1, seconds=8)
  155. queue.close()
  156. await queue._task
  157. assert len(consumed) == 1
  158. async def test_queue_flush(self, app, queue, log_consumer):
  159. async with app:
  160. queue.start()
  161. for i in range(10):
  162. queue.put_nowait("log.test", PydanticLogModel(count=i))
  163. await queue.flush()
  164. await app.flush()
  165. await app.consume_for(10, seconds=8)
  166. assert len(log_consumer) == 10
  167. async def test_queue_flush_on_close(self, app, queue, log_consumer):
  168. async with app:
  169. queue.start()
  170. await asyncio.sleep(0.1)
  171. queue.close()
  172. for i in range(10):
  173. queue.put_nowait("log.test", PydanticLogModel(count=i))
  174. await app.flush()
  175. await app.consume_for(10, seconds=8)
  176. assert len(log_consumer) == 10
  177. assert queue._task.done()
  178. @pytest.mark.with_max_queue(1)
  179. async def test_queue_max_size(self, app, queue):
  180. queue.start()
  181. queue.put_nowait("log.test", PydanticLogModel())
  182. with pytest.raises(asyncio.QueueFull):
  183. queue.put_nowait("log.test", PydanticLogModel())