test_run.py 1.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455
  1. from .produce import Foo
  2. from .produce import producer
  3. from kafkaesk import Application
  4. import asyncio
  5. import pytest
  6. import signal
  7. TOPIC = "test-run"
  8. GROUP = "test-run2"
  9. pytestmark = pytest.mark.asyncio
  10. test_app = Application()
  11. test_app.schema(streams=[TOPIC])(Foo)
  12. @test_app.subscribe(TOPIC, group=GROUP)
  13. async def _consumer(ob: Foo, record, app):
  14. ...
  15. async def test_run_exits_cleanly_while_consuming(kafka, topic_prefix):
  16. kserver = f"{kafka[0]}:{kafka[1]}"
  17. app = Application([kserver], topic_prefix=topic_prefix)
  18. async with app:
  19. pro = asyncio.create_task(producer(app, TOPIC))
  20. proc = await asyncio.create_subprocess_exec(
  21. "kafkaesk",
  22. "tests.acceptance.test_run:test_app",
  23. "--kafka-servers",
  24. kserver,
  25. "--topic-prefix",
  26. topic_prefix,
  27. # cwd=_test_dir,
  28. )
  29. await asyncio.sleep(5)
  30. pro.cancel()
  31. proc.send_signal(signal.SIGINT)
  32. await proc.wait()
  33. assert proc.returncode == 0
  34. results = await app.topic_mng.list_consumer_group_offsets(GROUP)
  35. topic_id = app.topic_mng.get_topic_id(TOPIC)
  36. count = 0
  37. for tp, pos in results.items():
  38. if tp.topic != topic_id:
  39. continue
  40. count += pos.offset
  41. assert count > 0