simple.py 855 B

123456789101112131415161718192021222324252627282930313233343536373839404142
  1. from kafkaesk import Application
  2. from kafkaesk import run_app
  3. from pydantic import BaseModel
  4. import asyncio
  5. import logging
  6. logging.basicConfig(level=logging.INFO)
  7. app = Application()
  8. @app.schema("Foobar")
  9. class Foobar(BaseModel):
  10. foo: str
  11. bar: str
  12. @app.subscribe("content.*", group="example_content_group")
  13. async def messages(data: Foobar, record):
  14. await asyncio.sleep(0.1)
  15. print(f"{data.foo}: {data.bar}: {record}")
  16. async def generate_data(app):
  17. idx = 0
  18. while True:
  19. await app.publish("content.foo", Foobar(foo=str(idx), bar="yo"))
  20. idx += 1
  21. await asyncio.sleep(0.1)
  22. async def run():
  23. app.configure(kafka_servers=["localhost:9092"])
  24. task = asyncio.create_task(generate_data(app))
  25. await run_app(app)
  26. # await app.consume_forever()
  27. if __name__ == "__main__":
  28. asyncio.run(run())