123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109 |
- from aiokafka.structs import OffsetAndMetadata
- from aiokafka.structs import TopicPartition
- from kafkaesk.app import Application
- from kafkaesk.consumer import BatchConsumer, Subscription
- from tests.utils import record_factory
- from unittest.mock import AsyncMock
- from unittest.mock import MagicMock
- from unittest.mock import patch
- import asyncio
- import pytest
- pytestmark = pytest.mark.asyncio
- async def test_record_metric_on_rebalance():
- async def coro(*arg, **kwargs):
- pass
- with patch("kafkaesk.consumer.CONSUMER_REBALANCED") as rebalance_metric:
- app_mock = AsyncMock()
- app_mock.topic_mng.list_consumer_group_offsets.return_value = {
- TopicPartition(topic="foobar", partition=0): OffsetAndMetadata(offset=0, metadata={})
- }
- subscription = Subscription(
- "test_consumer",
- coro,
- "group",
- topics=["stream.foo"],
- )
- rebalance_listener = BatchConsumer(
- subscription=subscription,
- app=app_mock,
- )
- rebalance_listener._consumer = AsyncMock()
- await rebalance_listener.on_partitions_assigned(
- [TopicPartition(topic="foobar", partition=0)]
- )
- rebalance_metric.labels.assert_called_with(
- partition=0,
- group_id="group",
- event="assigned",
- )
- rebalance_metric.labels().inc.assert_called_once()
- async def test_record_metric_on_publish():
- """
- this test is acting funny on github action...
- """
- with patch("kafkaesk.app.PUBLISHED_MESSAGES") as published_metric, patch(
- "kafkaesk.app.PUBLISHED_MESSAGES_TIME"
- ) as published_metric_time, patch("kafkaesk.metrics.PUBLISH_MESSAGES") as publish_metric, patch(
- "kafkaesk.metrics.PUBLISH_MESSAGES_TIME"
- ) as publish_metric_time:
- app = Application()
- async def _fake_publish(*args, **kwargs):
- async def _publish():
- return record_factory()
- return asyncio.create_task(_publish())
- producer = AsyncMock()
- producer.send.side_effect = _fake_publish
- app._get_producer = AsyncMock(return_value=producer)
- app._topic_mng = MagicMock()
- app._topic_mng.get_topic_id.return_value = "foobar"
- await (await app.raw_publish("foo", b"data"))
- published_metric.labels.assert_called_with(stream_id="foobar", partition=0, error="none")
- published_metric.labels(
- stream_id="foobar", partition=0, error="none"
- ).inc.assert_called_once()
- published_metric_time.labels.assert_called_with(stream_id="foobar")
- published_metric_time.labels(stream_id="foobar").observe.assert_called_once()
- publish_metric.labels.assert_called_with(stream_id="foobar", error="none")
- publish_metric.labels(stream_id="foobar", error="none").inc.assert_called_once()
- publish_metric_time.labels.assert_called_with(stream_id="foobar")
- publish_metric_time.labels(stream_id="foobar").observe.assert_called_once()
- async def test_record_metric_error():
- """
- this test is acting funny on github action...
- """
- with patch("kafkaesk.metrics.PUBLISH_MESSAGES") as publish_metric, patch(
- "kafkaesk.metrics.PUBLISH_MESSAGES_TIME"
- ) as publish_metric_time:
- app = Application()
- producer = AsyncMock()
- producer.send.side_effect = Exception
- app._get_producer = AsyncMock(return_value=producer)
- app._topic_mng = MagicMock()
- app._topic_mng.get_topic_id.return_value = "foobar"
- with pytest.raises(Exception):
- await app.raw_publish("foo", b"data")
- publish_metric.labels.assert_called_with(stream_id="foobar", error="exception")
- publish_metric.labels(stream_id="foobar", error="none").inc.assert_called_once()
- publish_metric_time.labels.assert_called_with(stream_id="foobar")
- publish_metric_time.labels(stream_id="foobar").observe.assert_called_once()
|