test_healthcheck.py 1.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354
  1. from aiokafka import ConsumerRecord
  2. from kafkaesk import Application
  3. from kafkaesk.exceptions import ConsumerUnhealthyException
  4. from .produce import producer
  5. from kafkaesk.exceptions import ProducerUnhealthyException
  6. from kafkaesk.kafka import KafkaTopicManager
  7. from unittest.mock import call
  8. from unittest.mock import MagicMock
  9. from unittest.mock import Mock
  10. from unittest.mock import patch
  11. import aiokafka.structs
  12. import asyncio
  13. import pydantic
  14. import pytest
  15. import uuid
  16. try:
  17. from unittest.mock import AsyncMock
  18. except: # noqa
  19. AsyncMock = None # type: ignore
  20. pytestmark = pytest.mark.asyncio
  21. TOPIC = "test-hc"
  22. async def test_health_check_should_fail_with_unhandled(app: Application):
  23. @app.subscribe(TOPIC, group=TOPIC)
  24. async def consume(data):
  25. raise Exception("failure!")
  26. async with app:
  27. produce = asyncio.create_task(producer(app, TOPIC))
  28. fut = asyncio.create_task(app.consume_forever())
  29. await fut
  30. with pytest.raises(ConsumerUnhealthyException):
  31. await app.health_check()
  32. produce.cancel()
  33. async def test_health_check_should_succeed(app):
  34. @app.subscribe(TOPIC, group=TOPIC)
  35. async def consume(data):
  36. ...
  37. async with app:
  38. produce = asyncio.create_task(producer(app, TOPIC))
  39. asyncio.create_task(app.consume_forever())
  40. await asyncio.sleep(1) # wait for some to produce and then be consumed to cause failure
  41. await app.health_check()
  42. produce.cancel()