123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252 |
- from kafkaesk.ext.logging.handler import KafkaeskQueue
- from kafkaesk.ext.logging.handler import PydanticKafkaeskHandler
- from kafkaesk.ext.logging.handler import PydanticLogModel
- from kafkaesk.ext.logging.handler import PydanticStreamHandler
- from typing import Optional
- from unittest.mock import MagicMock
- from unittest.mock import Mock
- from unittest.mock import patch
- import asyncio
- import io
- import kafkaesk
- import logging
- import pydantic
- import pytest
- import time
- import uuid
# Run every coroutine test in this module under pytest-asyncio.
pytestmark = pytest.mark.asyncio
@pytest.fixture(scope="function")
def logger():
    """Provide a non-propagating DEBUG-level logger isolated per test."""
    test_logger = logging.getLogger("test")
    test_logger.propagate = False
    test_logger.setLevel(logging.DEBUG)
    return test_logger
@pytest.fixture(scope="function")
def stream_handler(logger):
    """Attach a PydanticStreamHandler writing into an in-memory buffer.

    Yields the underlying StringIO so tests can inspect what was emitted.
    """
    buffer = io.StringIO()
    logger.addHandler(PydanticStreamHandler(stream=buffer))
    return buffer
@pytest.fixture(scope="function")
def kafakesk_handler(app, logger):
    """Attach a PydanticKafkaeskHandler publishing to the "log.test" topic.

    NOTE(review): the fixture name looks like a typo of ``kafkaesk_handler``,
    but tests request it by this exact name, so it is kept unchanged.
    """
    kafka_handler = PydanticKafkaeskHandler(app, "log.test")
    logger.addHandler(kafka_handler)
    return kafka_handler
async def test_handler_initializes_applogger(kafka, logger):
    """Emitting a record through the handler lazily initializes the app."""
    app = kafkaesk.Application(
        [f"{kafka[0]}:{kafka[1]}"],
        topic_prefix=uuid.uuid4().hex,
        kafka_settings={"metadata_max_age_ms": 500},
    )
    logger.addHandler(PydanticKafkaeskHandler(app, "log.test"))

    logger.error("Hi!")
    # Give the handler's background machinery a moment to start the app.
    await asyncio.sleep(0.1)

    assert app._initialized
@pytest.fixture(scope="function")
def log_consumer(app):
    """Subscribe to "log.test" and collect every PydanticLogModel received."""
    received = []

    @app.subscribe("log.test", group="test_group")
    async def consume(data: PydanticLogModel):
        received.append(data)

    yield received
class TestPydanticStreamHandler:
    """Tests for PydanticStreamHandler rendering records into a text stream."""

    async def test_stream_handler(self, stream_handler, logger):
        """%-style arguments are interpolated into the emitted message."""
        logger.info("Test Message %s", "extra")

        message = stream_handler.getvalue()
        assert "Test Message extra" in message

    async def test_stream_handler_with_log_model(self, stream_handler, logger):
        """Fields of an attached log model are rendered as key=value pairs."""

        class LogModel(pydantic.BaseModel):
            _is_log_model = True
            foo: Optional[str]

        logger.info("Test Message %s", "extra", LogModel(foo="bar"))

        message = stream_handler.getvalue()
        assert "Test Message extra" in message
        assert "foo=bar" in message

    # Typo fix: was "...shortens_log_messae"; renaming a test function is safe
    # because pytest discovers it by pattern rather than by reference.
    async def test_stream_handler_with_log_model_shortens_log_message(
        self, stream_handler, logger
    ):
        """Rendered model output is length-limited: the first long field fits,
        the overflowing second field is dropped/truncated."""

        class LogModel(pydantic.BaseModel):
            _is_log_model = True
            foo: str
            bar: str

        logger.info("Test Message %s", "extra", LogModel(foo="X" * 256, bar="Y" * 256))

        message = stream_handler.getvalue()
        assert "Test Message extra" in message
        assert f"foo={'X' * 256}" in message
        assert f"bar={'Y' * 256}" not in message
class TestPydanticKafkaeskHandler:
    """Integration and unit tests for PydanticKafkaeskHandler."""

    async def test_kafka_handler(self, app, kafakesk_handler, logger, log_consumer):
        """A plain log record round-trips through Kafka to the consumer."""
        async with app:
            logger.info("Test Message %s", "extra")
            await app.flush()
            await app.consume_for(1, seconds=8)

        assert len(log_consumer) == 1
        assert log_consumer[0].message == "Test Message extra"

    async def test_kafka_handler_with_log_model(
        self, app, kafakesk_handler, logger, log_consumer
    ):
        """Model fields attached to the record survive the Kafka round-trip."""

        class LogModel(pydantic.BaseModel):
            _is_log_model = True
            foo: Optional[str]

        async with app:
            logger.info("Test Message %s", "extra", LogModel(foo="bar"))
            await app.flush()
            await app.consume_for(1, seconds=8)

        assert len(log_consumer) == 1
        assert log_consumer[0].message == "Test Message extra"
        assert log_consumer[0].foo == "bar"

    def test_emit_std_output_queue_full(self):
        """When the queue is full, emit falls back to a stderr warning."""
        fake_queue = MagicMock()
        fake_queue.put_nowait.side_effect = asyncio.QueueFull
        with patch("kafkaesk.ext.logging.handler.KafkaeskQueue", return_value=fake_queue), patch(
            "kafkaesk.ext.logging.handler.sys.stderr.write"
        ) as stderr_write:
            handler = PydanticKafkaeskHandler(MagicMock(), "foo")

            record = Mock()
            record.pydantic_data = []
            handler.emit(record)

            stderr_write.assert_called_once()

    def test_emit_limits_std_output_queue_full(self):
        """Queue-full warnings are rate limited via _last_warning_sent."""
        fake_queue = MagicMock()
        fake_queue.put_nowait.side_effect = asyncio.QueueFull
        with patch("kafkaesk.ext.logging.handler.KafkaeskQueue", return_value=fake_queue), patch(
            "kafkaesk.ext.logging.handler.sys.stderr.write"
        ) as stderr_write:
            handler = PydanticKafkaeskHandler(MagicMock(), "foo")
            # A warning timestamp in the future suppresses further warnings.
            handler._last_warning_sent = time.time() + 1

            record = Mock()
            record.pydantic_data = []
            handler.emit(record)

            stderr_write.assert_not_called()

    def test_clone(self):
        """clone() returns a distinct handler sharing the app and queue."""
        handler = PydanticKafkaeskHandler(MagicMock(), "foo")
        handler2 = handler.clone()

        assert handler != handler2
        assert handler.app == handler2.app
        assert handler._queue == handler2._queue

    def test_emit_drops_message_on_runtime_error_start(self):
        """If starting the queue raises RuntimeError, the record is dropped
        and a warning is written to stderr instead of propagating."""
        fake_queue = MagicMock()
        fake_queue.running = False
        fake_queue.start.side_effect = RuntimeError
        with patch("kafkaesk.ext.logging.handler.KafkaeskQueue", return_value=fake_queue), patch(
            "kafkaesk.ext.logging.handler.sys.stderr.write"
        ) as stderr_write:
            handler = PydanticKafkaeskHandler(MagicMock(), "foo")

            record = Mock()
            record.pydantic_data = []
            handler.emit(record)

            stderr_write.assert_called_once()
class TestKafkaeskQueue:
    """Tests for the KafkaeskQueue background publisher."""

    @pytest.fixture(scope="function")
    async def queue(self, request, app):
        """Build a KafkaeskQueue; capacity overridable via @pytest.mark.with_max_queue(n)."""
        max_queue = 10000
        for marker in request.node.iter_markers("with_max_queue"):
            max_queue = marker.args[0]

        app.schema("PydanticLogModel")(PydanticLogModel)
        return KafkaeskQueue(app, max_queue=max_queue)

    async def test_queue(self, app, queue):
        """A queued message is published and consumed; close drains the task."""
        consumed = []

        @app.subscribe("log.test", group="test_group")
        async def consume(data: PydanticLogModel):
            consumed.append(data)

        async with app:
            queue.start()
            queue.put_nowait("log.test", PydanticLogModel(foo="bar"))

            await app.flush()
            await app.consume_for(1, seconds=8)

            queue.close()
            await queue._task

        assert len(consumed) == 1

    async def test_queue_flush(self, app, queue, log_consumer):
        """flush() pushes all pending messages through to consumers."""
        async with app:
            queue.start()
            for i in range(10):
                queue.put_nowait("log.test", PydanticLogModel(count=i))
            await queue.flush()

            await app.flush()
            await app.consume_for(10, seconds=8)

        assert len(log_consumer) == 10

    async def test_queue_flush_on_close(self, app, queue, log_consumer):
        """Messages enqueued after close() are still flushed before shutdown."""
        async with app:
            queue.start()
            await asyncio.sleep(0.1)
            queue.close()

            for i in range(10):
                queue.put_nowait("log.test", PydanticLogModel(count=i))

            await app.flush()
            await app.consume_for(10, seconds=8)

        assert len(log_consumer) == 10
        assert queue._task.done()

    @pytest.mark.with_max_queue(1)
    async def test_queue_max_size(self, app, queue):
        """Exceeding max_queue raises asyncio.QueueFull."""
        queue.start()
        queue.put_nowait("log.test", PydanticLogModel())

        with pytest.raises(asyncio.QueueFull):
            queue.put_nowait("log.test", PydanticLogModel())
|