test_metrics.py 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109
  1. from aiokafka.structs import OffsetAndMetadata
  2. from aiokafka.structs import TopicPartition
  3. from kafkaesk.app import Application
  4. from kafkaesk.consumer import BatchConsumer, Subscription
  5. from tests.utils import record_factory
  6. from unittest.mock import AsyncMock
  7. from unittest.mock import MagicMock
  8. from unittest.mock import patch
  9. import asyncio
  10. import pytest
  11. pytestmark = pytest.mark.asyncio
  12. async def test_record_metric_on_rebalance():
  13. async def coro(*arg, **kwargs):
  14. pass
  15. with patch("kafkaesk.consumer.CONSUMER_REBALANCED") as rebalance_metric:
  16. app_mock = AsyncMock()
  17. app_mock.topic_mng.list_consumer_group_offsets.return_value = {
  18. TopicPartition(topic="foobar", partition=0): OffsetAndMetadata(offset=0, metadata={})
  19. }
  20. subscription = Subscription(
  21. "test_consumer",
  22. coro,
  23. "group",
  24. topics=["stream.foo"],
  25. )
  26. rebalance_listener = BatchConsumer(
  27. subscription=subscription,
  28. app=app_mock,
  29. )
  30. rebalance_listener._consumer = AsyncMock()
  31. await rebalance_listener.on_partitions_assigned(
  32. [TopicPartition(topic="foobar", partition=0)]
  33. )
  34. rebalance_metric.labels.assert_called_with(
  35. partition=0,
  36. group_id="group",
  37. event="assigned",
  38. )
  39. rebalance_metric.labels().inc.assert_called_once()
  40. async def test_record_metric_on_publish():
  41. """
  42. this test is acting funny on github action...
  43. """
  44. with patch("kafkaesk.app.PUBLISHED_MESSAGES") as published_metric, patch(
  45. "kafkaesk.app.PUBLISHED_MESSAGES_TIME"
  46. ) as published_metric_time, patch("kafkaesk.metrics.PUBLISH_MESSAGES") as publish_metric, patch(
  47. "kafkaesk.metrics.PUBLISH_MESSAGES_TIME"
  48. ) as publish_metric_time:
  49. app = Application()
  50. async def _fake_publish(*args, **kwargs):
  51. async def _publish():
  52. return record_factory()
  53. return asyncio.create_task(_publish())
  54. producer = AsyncMock()
  55. producer.send.side_effect = _fake_publish
  56. app._get_producer = AsyncMock(return_value=producer)
  57. app._topic_mng = MagicMock()
  58. app._topic_mng.get_topic_id.return_value = "foobar"
  59. await (await app.raw_publish("foo", b"data"))
  60. published_metric.labels.assert_called_with(stream_id="foobar", partition=0, error="none")
  61. published_metric.labels(
  62. stream_id="foobar", partition=0, error="none"
  63. ).inc.assert_called_once()
  64. published_metric_time.labels.assert_called_with(stream_id="foobar")
  65. published_metric_time.labels(stream_id="foobar").observe.assert_called_once()
  66. publish_metric.labels.assert_called_with(stream_id="foobar", error="none")
  67. publish_metric.labels(stream_id="foobar", error="none").inc.assert_called_once()
  68. publish_metric_time.labels.assert_called_with(stream_id="foobar")
  69. publish_metric_time.labels(stream_id="foobar").observe.assert_called_once()
  70. async def test_record_metric_error():
  71. """
  72. this test is acting funny on github action...
  73. """
  74. with patch("kafkaesk.metrics.PUBLISH_MESSAGES") as publish_metric, patch(
  75. "kafkaesk.metrics.PUBLISH_MESSAGES_TIME"
  76. ) as publish_metric_time:
  77. app = Application()
  78. producer = AsyncMock()
  79. producer.send.side_effect = Exception
  80. app._get_producer = AsyncMock(return_value=producer)
  81. app._topic_mng = MagicMock()
  82. app._topic_mng.get_topic_id.return_value = "foobar"
  83. with pytest.raises(Exception):
  84. await app.raw_publish("foo", b"data")
  85. publish_metric.labels.assert_called_with(stream_id="foobar", error="exception")
  86. publish_metric.labels(stream_id="foobar", error="none").inc.assert_called_once()
  87. publish_metric_time.labels.assert_called_with(stream_id="foobar")
  88. publish_metric_time.labels(stream_id="foobar").observe.assert_called_once()