123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344 |
- from asyncio.futures import Future
- from kafkaesk.app import Application
- from kafkaesk.app import published_callback
- from kafkaesk.app import run
- from kafkaesk.app import run_app
- from kafkaesk.app import SchemaRegistration
- from jaeger_client import Config, Tracer
- from opentracing.scope_managers.contextvars import ContextVarsScopeManager
- from tests.utils import record_factory
- from unittest.mock import ANY
- from unittest.mock import AsyncMock
- from unittest.mock import MagicMock
- from unittest.mock import Mock
- from unittest.mock import patch
- import asyncio
- import json
- import kafkaesk
- import kafkaesk.exceptions
- import opentracing
- import pydantic
- import pytest
- import time
- pytestmark = pytest.mark.asyncio
- class TestApplication:
- async def test_app_events(self):
- app = Application()
- async def on_finalize():
- pass
- app.on("finalize", on_finalize)
- assert len(app._event_handlers["finalize"]) == 1
- async def test_app_finalize_event(self):
- app = Application()
- class CallTracker:
- def __init__(self):
- self.called = False
- async def on_finalize(self):
- self.called = True
- tracker = CallTracker()
- app.on("finalize", tracker.on_finalize)
- await app.finalize()
- assert tracker.called is True
- def test_publish_callback(self, metrics):
- fut = Future()
- fut.set_result(record_factory())
- published_callback("topic", time.time() - 1, fut)
- metrics["PUBLISHED_MESSAGES"].labels.assert_called_with(
- stream_id="topic", partition=0, error="none"
- )
- metrics["PUBLISHED_MESSAGES"].labels().inc()
- metrics["PRODUCER_TOPIC_OFFSET"].labels.assert_called_with(stream_id="topic", partition=0)
- metrics["PRODUCER_TOPIC_OFFSET"].labels().set.assert_called_with(0)
- metrics["PUBLISHED_MESSAGES_TIME"].labels.assert_called_with(stream_id="topic")
- assert metrics["PUBLISHED_MESSAGES_TIME"].labels().observe.mock_calls[0].args[
- 0
- ] == pytest.approx(1, 0.1)
- def test_publish_callback_exc(self, metrics):
- fut = Future()
- fut.set_exception(Exception())
- published_callback("topic", time.time(), fut)
- metrics["PUBLISHED_MESSAGES"].labels.assert_called_with(
- stream_id="topic", partition=-1, error="Exception"
- )
- metrics["PUBLISHED_MESSAGES"].labels().inc()
- def test_mount_router(self):
- app = Application()
- router = kafkaesk.Router()
- @router.schema("Foo", streams=["foo.bar"])
- class Foo(pydantic.BaseModel):
- bar: str
- @router.subscribe("foo.bar", group="test_group")
- async def consume(data: Foo, schema, record):
- ...
- app.mount(router)
- assert app.subscriptions == router.subscriptions
- assert app.schemas == router.schemas
- assert app.event_handlers == router.event_handlers
- async def test_consumer_health_check(self):
- app = kafkaesk.Application()
- subscription_consumer = AsyncMock()
- app._subscription_consumers.append(subscription_consumer)
- subscription_consumer.consumer._client.ready.return_value = True
- await app.health_check()
- async def test_consumer_health_check_raises_exception(self):
- app = kafkaesk.Application()
- subscription = kafkaesk.Subscription(
- "test_consumer", lambda record: 1, "group", topics=["foo"]
- )
- subscription_consumer = kafkaesk.BatchConsumer(
- subscription=subscription,
- app=app,
- )
- app._subscription_consumers.append(subscription_consumer)
- subscription_consumer._consumer = AsyncMock()
- subscription_consumer._consumer._client.ready.return_value = False
- with pytest.raises(kafkaesk.exceptions.ConsumerUnhealthyException):
- await app.health_check()
- async def test_consumer_health_check_producer_healthy(self):
- app = kafkaesk.Application()
- app._producer = MagicMock()
- app._producer._sender.sender_task.done.return_value = False
- await app.health_check()
- async def test_consumer_health_check_producer_unhealthy(self):
- app = kafkaesk.Application()
- app._producer = MagicMock()
- app._producer._sender.sender_task.done.return_value = True
- with pytest.raises(kafkaesk.exceptions.ProducerUnhealthyException):
- await app.health_check()
- async def test_configure_kafka_producer(self):
- app = kafkaesk.Application(
- kafka_settings={
- "metadata_max_age_ms": 100,
- "max_batch_size": 100,
- # invalid for producer so should not be applied here
- "max_partition_fetch_bytes": 100,
- }
- )
- # verify it is created correctly
- app.producer_factory()
- # now, validate the wiring
- with patch("kafkaesk.app.aiokafka.AIOKafkaProducer") as mock:
- app.producer_factory()
- mock.assert_called_with(
- bootstrap_servers=None,
- loop=ANY,
- api_version="auto",
- metadata_max_age_ms=100,
- max_batch_size=100,
- )
- async def test_configure_kafka_consumer(self):
- app = kafkaesk.Application(
- kafka_settings={
- "max_partition_fetch_bytes": 100,
- "fetch_max_wait_ms": 100,
- "metadata_max_age_ms": 100,
- # invalid for consumer so should not be applied here
- "max_batch_size": 100,
- }
- )
- # verify it is created correctly
- app.consumer_factory(group_id="foobar")
- # now, validate the wiring
- with patch("kafkaesk.app.aiokafka.AIOKafkaConsumer") as mock:
- app.consumer_factory(group_id="foobar")
- mock.assert_called_with(
- bootstrap_servers=None,
- loop=ANY,
- group_id="foobar",
- api_version="auto",
- auto_offset_reset="earliest",
- enable_auto_commit=False,
- max_partition_fetch_bytes=100,
- fetch_max_wait_ms=100,
- metadata_max_age_ms=100,
- )
- def test_configure(self):
- app = kafkaesk.Application()
- app.configure(
- kafka_servers=["kafka_servers"],
- topic_prefix="topic_prefix",
- kafka_settings={"kafka_settings": "kafka_settings"},
- api_version="api_version",
- replication_factor="replication_factor",
- )
- assert app._kafka_servers == ["kafka_servers"]
- assert app._topic_prefix == "topic_prefix"
- assert app._kafka_settings == {"kafka_settings": "kafka_settings"}
- assert app._kafka_api_version == "api_version"
- assert app._replication_factor == "replication_factor"
- # now make sure none values do not overwrite
- app.configure(
- kafka_servers=None,
- topic_prefix=None,
- kafka_settings=None,
- api_version=None,
- replication_factor=None,
- )
- assert app._kafka_servers == ["kafka_servers"]
- assert app._topic_prefix == "topic_prefix"
- assert app._kafka_settings == {"kafka_settings": "kafka_settings"}
- assert app._kafka_api_version == "api_version"
- assert app._replication_factor == "replication_factor"
- async def test_initialize_with_unconfigured_app_raises_exception(self):
- app = kafkaesk.Application()
- with pytest.raises(kafkaesk.exceptions.AppNotConfiguredException):
- await app.initialize()
- async def test_publish_propagates_headers(self):
- app = kafkaesk.Application(kafka_servers=["foo"])
- class Foo(pydantic.BaseModel):
- bar: str
- producer = AsyncMock()
- producer.send.return_value = fut = asyncio.Future()
- fut.set_result("ok")
- app._get_producer = AsyncMock(return_value=producer)
- app._topic_mng = MagicMock()
- app._topic_mng.get_topic_id.return_value = "foobar"
- app._topic_mng.topic_exists = AsyncMock(return_value=True)
- future = await app.publish("foobar", Foo(bar="foo"), headers=[("foo", b"bar")])
- _ = await future
- producer.send.assert_called_with(
- "foobar",
- value=b'{"schema":"Foo:1","data":{"bar":"foo"}}',
- key=None,
- headers=[("foo", b"bar")],
- )
- async def test_publish_configured_retention_policy(self):
- app = kafkaesk.Application(kafka_servers=["foo"])
- @app.schema(retention=100)
- class Foo(pydantic.BaseModel):
- bar: str
- producer = AsyncMock()
- producer.send.return_value = fut = asyncio.Future()
- fut.set_result("ok")
- app._get_producer = AsyncMock(return_value=producer)
- app._topic_mng = MagicMock()
- app._topic_mng.get_topic_id.return_value = "foobar"
- app._topic_mng.topic_exists = AsyncMock(return_value=False)
- app._topic_mng.create_topic = AsyncMock()
- future = await app.publish("foobar", Foo(bar="foo"), headers=[("foo", b"bar")])
- await future
- app._topic_mng.create_topic.assert_called_with(
- "foobar", replication_factor=None, retention_ms=100 * 1000
- )
- async def test_publish_injects_tracing(self):
- app = kafkaesk.Application(kafka_servers=["foo"])
- producer = AsyncMock()
- producer.send.return_value = fut = asyncio.Future()
- fut.set_result("ok")
- app._get_producer = AsyncMock(return_value=producer)
- config = Config(
- config={"sampler": {"type": "const", "param": 1}, "logging": True, "propagation": "b3"},
- service_name="test_service",
- scope_manager=ContextVarsScopeManager(),
- )
- # this call also sets opentracing.tracer
- tracer = config.initialize_tracer()
- span = tracer.start_span(operation_name="dummy")
- tracer.scope_manager.activate(span, True)
- future = await app.raw_publish("foobar", b"foobar")
- await future
- headers = producer.mock_calls[0].kwargs["headers"]
- assert str(span).startswith(headers[0][1].decode())
- class TestSchemaRegistration:
- def test_schema_registration_repr(self):
- reg = SchemaRegistration(id="id", version=1, model=None)
- assert repr(reg) == "<SchemaRegistration id, version: 1 >"
- test_app = Application()
- def app_callable():
- return test_app
- class TestRun:
- def test_run(self):
- rapp = AsyncMock()
- with patch("kafkaesk.app.run_app", rapp), patch("kafkaesk.app.cli_parser") as cli_parser:
- args = Mock()
- args.app = "tests.unit.test_app:test_app"
- args.kafka_servers = "foo,bar"
- args.kafka_settings = json.dumps({"foo": "bar"})
- args.topic_prefix = "prefix"
- args.api_version = "api_version"
- cli_parser.parse_args.return_value = args
- run()
- rapp.assert_called_once()
- assert test_app._kafka_servers == ["foo", "bar"]
- assert test_app._kafka_settings == {"foo": "bar"}
- assert test_app._topic_prefix == "prefix"
- assert test_app._kafka_api_version == "api_version"
- def test_run_callable(self):
- rapp = AsyncMock()
- with patch("kafkaesk.app.run_app", rapp), patch("kafkaesk.app.cli_parser") as cli_parser:
- args = Mock()
- args.app = "tests.unit.test_app:app_callable"
- args.kafka_settings = None
- cli_parser.parse_args.return_value = args
- run()
- rapp.assert_called_once()
- async def test_run_app(self):
- app_mock = AsyncMock()
- app_mock.consume_forever.return_value = (set(), set())
- loop = MagicMock()
- with patch("kafkaesk.app.asyncio.get_event_loop", return_value=loop):
- await run_app(app_mock)
- app_mock.consume_forever.assert_called_once()
- assert len(loop.add_signal_handler.mock_calls) == 2
|