# test_rebalance.py (3.6 KB)
  1. from .produce import Foo
  2. from .produce import producer
  3. from kafkaesk.consumer import BatchConsumer, Subscription
  4. import asyncio
  5. import kafkaesk
  6. import pytest
  7. pytestmark = pytest.mark.asyncio
  8. GROUP = TOPIC = "test-rebalance"
  9. async def test_cancel_getone(app):
  10. app.schema(streams=[TOPIC])(Foo)
  11. async def handler(*args, **kwargs):
  12. pass
  13. async with app:
  14. subscription = Subscription(
  15. "test_consumer",
  16. handler,
  17. GROUP,
  18. topics=[TOPIC],
  19. timeout_seconds=1,
  20. )
  21. consumer = BatchConsumer(
  22. subscription=subscription,
  23. app=app,
  24. )
  25. await consumer.initialize()
  26. raw_consumer = consumer._consumer
  27. with raw_consumer._subscription.fetch_context():
  28. try:
  29. await asyncio.wait_for(raw_consumer._fetcher.next_record([]), timeout=0.1)
  30. except asyncio.TimeoutError:
  31. assert len(raw_consumer._fetcher._fetch_waiters) == 0
  32. await raw_consumer.stop()
  33. async def test_many_consumers_rebalancing(kafka, topic_prefix):
  34. apps = []
  35. for idx in range(5):
  36. app = kafkaesk.Application(
  37. [f"{kafka[0]}:{kafka[1]}"],
  38. topic_prefix=topic_prefix,
  39. )
  40. app.schema(streams=[TOPIC])(Foo)
  41. app.id = idx
  42. @app.subscribe(TOPIC, group=GROUP)
  43. async def consumer(ob: Foo, record, app):
  44. ...
  45. await app.initialize()
  46. apps.append(app)
  47. produce = asyncio.create_task(producer(apps[0], TOPIC))
  48. consumer_tasks = []
  49. for app in apps:
  50. consumer_tasks.append(asyncio.create_task(app.consume_forever()))
  51. await asyncio.sleep(5)
  52. # cycle through each, destroying...
  53. for idx in range(5):
  54. await apps[idx].stop()
  55. await asyncio.sleep(1)
  56. assert consumer_tasks[idx].done()
  57. # start again
  58. consumer_tasks[idx] = asyncio.create_task(apps[idx].consume_forever())
  59. produce.cancel()
  60. for idx in range(5):
  61. await apps[idx].stop()
  62. async def test_consume_every_message_once_during_rebalance(kafka, topic_prefix):
  63. """
  64. No matter what, even without reassignment, some messages
  65. seem to be relayed. You can see if when a single consumer and no rebalance
  66. sometimes.
  67. """
  68. consumed = {}
  69. def record_msg(record):
  70. key = f"{record.partition}-{record.offset}"
  71. if key not in consumed:
  72. consumed[key] = 0
  73. consumed[key] += 1
  74. apps = []
  75. for idx in range(5):
  76. app = kafkaesk.Application(
  77. [f"{kafka[0]}:{kafka[1]}"],
  78. topic_prefix=topic_prefix,
  79. )
  80. app.schema(streams=[TOPIC])(Foo)
  81. app.id = idx
  82. @app.subscribe(TOPIC, group=GROUP)
  83. async def consumer(ob: Foo, record, app):
  84. record_msg(record)
  85. await app.initialize()
  86. apps.append(app)
  87. consumer_tasks = []
  88. for app in apps:
  89. consumer_tasks.append(asyncio.create_task(app.consume_forever()))
  90. await asyncio.sleep(1)
  91. produce = asyncio.create_task(producer(apps[0], TOPIC))
  92. await asyncio.sleep(5)
  93. # cycle through each, destroying...
  94. for idx in range(5):
  95. await apps[idx].stop()
  96. await asyncio.sleep(1)
  97. assert consumer_tasks[idx].done()
  98. # start again
  99. consumer_tasks[idx] = asyncio.create_task(apps[idx].consume_forever())
  100. produce.cancel()
  101. for idx in range(5):
  102. await apps[idx].stop()
  103. assert len(consumed) > 100
  104. # now check that we always consumed a message only once
  105. for v in consumed.values():
  106. assert v == 1