123456789101112131415161718192021222324252627282930313233343536373839404142 |
- from kafkaesk import Application
- from kafkaesk import run_app
- from pydantic import BaseModel
- import asyncio
- import logging
- logging.basicConfig(level=logging.INFO)
- app = Application()
- @app.schema("Foobar")
- class Foobar(BaseModel):
- foo: str
- bar: str
- @app.subscribe("content.*", group="example_content_group")
- async def messages(data: Foobar, record):
- await asyncio.sleep(0.1)
- print(f"{data.foo}: {data.bar}: {record}")
- async def generate_data(app):
- idx = 0
- while True:
- await app.publish("content.foo", Foobar(foo=str(idx), bar="yo"))
- idx += 1
- await asyncio.sleep(0.1)
- async def run():
- app.configure(kafka_servers=["localhost:9092"])
- task = asyncio.create_task(generate_data(app))
- await run_app(app)
- # await app.consume_forever()
- if __name__ == "__main__":
- asyncio.run(run())
|