# parallel.py — kafkaesk example: parallel consumption with timeout-based
# routing of slow messages and a dead-letter topic for failures.
  1. from kafkaesk import Application
  2. from kafkaesk import run_app
  3. from pydantic import BaseModel
  4. import asyncio
  5. import logging
  6. import random
  7. logging.basicConfig(level=logging.INFO)
  8. app = Application()
# Message schema registered for the main topic plus its slow/failed
# companion topics, so the same model can be republished to any of them.
@app.schema("Foobar", streams=["content.foo", "slow.content.foo", "failed.content.foo"])
class Foobar(BaseModel):
    # Seconds the consumer sleeps to simulate work for this message.
    timeout: int
async def consumer_logic(data: Foobar, record, subscriber):
    """Simulate per-message work and reroute problematic messages.

    Sleeps for ``data.timeout`` seconds to emulate processing. If the
    subscription's ``timeout_seconds`` elapses first, the consumer task is
    cancelled; the ``CancelledError`` handler republishes the record to the
    ``slow.``-prefixed companion topic (tagged with a ``slow`` header). Any
    other exception sends the record to the ``failed.``-prefixed topic.
    """
    try:
        print(f"{data} -- {record.headers}: waiting {data.timeout}s...")
        await asyncio.sleep(data.timeout)
        print(f"{data}: done...")
    except asyncio.CancelledError:
        # Slow topic
        # Cancellation is swallowed deliberately: it is the subscription's
        # timeout mechanism, and the message is rerouted instead of retried.
        # NOTE(review): the await here assumes the cancellation came from the
        # kafkaesk timeout, not loop shutdown — confirm against kafkaesk docs.
        print(f"{data} timeout message, sending to slow topic...")
        await subscriber.publish(f"slow.{record.topic}", record, headers=[("slow", b"true")])
    except Exception:
        # Best-effort dead-letter: forward the raw record to the failed topic.
        await subscriber.publish(f"failed.{record.topic}", record)
  23. async def generate_data(app):
  24. idx = 0
  25. while True:
  26. timeout = random.randint(0, 10)
  27. await app.publish("content.foo", Foobar(timeout=timeout))
  28. idx += 1
  29. await asyncio.sleep(0.1)
async def run():
    """Configure the app, start the data generator, and run the consumers."""
    app.configure(kafka_servers=["localhost:9092"])
    # Keep a reference to the producer task so it is not garbage-collected
    # while run_app blocks below.
    task = asyncio.create_task(generate_data(app))
    # Regular tasks should be consumed in less than 5s
    app.subscribe("content.*", group="example_content_group", concurrency=10, timeout_seconds=5)(
        consumer_logic
    )
    # Timeout tasks (slow) can be consumed independently, with different configuration and logic
    app.subscribe(
        "slow.content.*", group="timeout_example_content_group", concurrency=1, timeout_seconds=None
    )(consumer_logic)
    await run_app(app)
if __name__ == "__main__":
    # Script entry point: start the event loop and run the example app.
    asyncio.run(run())