123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354 |
- from aiokafka import ConsumerRecord
- from kafkaesk import Application
- from kafkaesk.exceptions import ConsumerUnhealthyException
- from .produce import producer
- from kafkaesk.exceptions import ProducerUnhealthyException
- from kafkaesk.kafka import KafkaTopicManager
- from unittest.mock import call
- from unittest.mock import MagicMock
- from unittest.mock import Mock
- from unittest.mock import patch
- import aiokafka.structs
- import asyncio
- import pydantic
- import pytest
- import uuid
- try:
- from unittest.mock import AsyncMock
- except: # noqa
- AsyncMock = None # type: ignore
- pytestmark = pytest.mark.asyncio
- TOPIC = "test-hc"
- async def test_health_check_should_fail_with_unhandled(app: Application):
- @app.subscribe(TOPIC, group=TOPIC)
- async def consume(data):
- raise Exception("failure!")
- async with app:
- produce = asyncio.create_task(producer(app, TOPIC))
- fut = asyncio.create_task(app.consume_forever())
- await fut
- with pytest.raises(ConsumerUnhealthyException):
- await app.health_check()
- produce.cancel()
- async def test_health_check_should_succeed(app):
- @app.subscribe(TOPIC, group=TOPIC)
- async def consume(data):
- ...
- async with app:
- produce = asyncio.create_task(producer(app, TOPIC))
- asyncio.create_task(app.consume_forever())
- await asyncio.sleep(1) # wait for some to produce and then be consumed to cause failure
- await app.health_check()
- produce.cancel()
|