123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139 |
- from .produce import Foo
- from .produce import producer
- from kafkaesk.consumer import BatchConsumer, Subscription
- import asyncio
- import kafkaesk
- import pytest
- pytestmark = pytest.mark.asyncio
- GROUP = TOPIC = "test-rebalance"
- async def test_cancel_getone(app):
- app.schema(streams=[TOPIC])(Foo)
- async def handler(*args, **kwargs):
- pass
- async with app:
- subscription = Subscription(
- "test_consumer",
- handler,
- GROUP,
- topics=[TOPIC],
- timeout_seconds=1,
- )
- consumer = BatchConsumer(
- subscription=subscription,
- app=app,
- )
- await consumer.initialize()
- raw_consumer = consumer._consumer
- with raw_consumer._subscription.fetch_context():
- try:
- await asyncio.wait_for(raw_consumer._fetcher.next_record([]), timeout=0.1)
- except asyncio.TimeoutError:
- assert len(raw_consumer._fetcher._fetch_waiters) == 0
- await raw_consumer.stop()
- async def test_many_consumers_rebalancing(kafka, topic_prefix):
- apps = []
- for idx in range(5):
- app = kafkaesk.Application(
- [f"{kafka[0]}:{kafka[1]}"],
- topic_prefix=topic_prefix,
- )
- app.schema(streams=[TOPIC])(Foo)
- app.id = idx
- @app.subscribe(TOPIC, group=GROUP)
- async def consumer(ob: Foo, record, app):
- ...
- await app.initialize()
- apps.append(app)
- produce = asyncio.create_task(producer(apps[0], TOPIC))
- consumer_tasks = []
- for app in apps:
- consumer_tasks.append(asyncio.create_task(app.consume_forever()))
- await asyncio.sleep(5)
- # cycle through each, destroying...
- for idx in range(5):
- await apps[idx].stop()
- await asyncio.sleep(1)
- assert consumer_tasks[idx].done()
- # start again
- consumer_tasks[idx] = asyncio.create_task(apps[idx].consume_forever())
- produce.cancel()
- for idx in range(5):
- await apps[idx].stop()
- async def test_consume_every_message_once_during_rebalance(kafka, topic_prefix):
- """
- No matter what, even without reassignment, some messages
- seem to be relayed. You can see if when a single consumer and no rebalance
- sometimes.
- """
- consumed = {}
- def record_msg(record):
- key = f"{record.partition}-{record.offset}"
- if key not in consumed:
- consumed[key] = 0
- consumed[key] += 1
- apps = []
- for idx in range(5):
- app = kafkaesk.Application(
- [f"{kafka[0]}:{kafka[1]}"],
- topic_prefix=topic_prefix,
- )
- app.schema(streams=[TOPIC])(Foo)
- app.id = idx
- @app.subscribe(TOPIC, group=GROUP)
- async def consumer(ob: Foo, record, app):
- record_msg(record)
- await app.initialize()
- apps.append(app)
- consumer_tasks = []
- for app in apps:
- consumer_tasks.append(asyncio.create_task(app.consume_forever()))
- await asyncio.sleep(1)
- produce = asyncio.create_task(producer(apps[0], TOPIC))
- await asyncio.sleep(5)
- # cycle through each, destroying...
- for idx in range(5):
- await apps[idx].stop()
- await asyncio.sleep(1)
- assert consumer_tasks[idx].done()
- # start again
- consumer_tasks[idx] = asyncio.create_task(apps[idx].consume_forever())
- produce.cancel()
- for idx in range(5):
- await apps[idx].stop()
- assert len(consumed) > 100
- # now check that we always consumed a message only once
- for v in consumed.values():
- assert v == 1
|