logger.py 1.2 KB

1234567891011121314151617181920212223242526272829303132333435363738394041
  1. from kafkaesk import Application
  2. from kafkaesk.ext.logging import PydanticKafkaeskHandler
  3. from kafkaesk.ext.logging import PydanticLogModel
  4. from kafkaesk.ext.logging import PydanticStreamHandler
  5. from pydantic import BaseModel
  6. from typing import Optional
  7. import asyncio
  8. import logging
  9. class UserLog(BaseModel):
  10. _is_log_model = True
  11. user: Optional[str]
  12. async def test_log() -> None:
  13. app = Application(kafka_servers=["localhost:9092"])
  14. logger = logging.getLogger("kafkaesk.ext.logging.kafka")
  15. handler = PydanticKafkaeskHandler(app, "logging.test")
  16. logger.addHandler(handler)
  17. logger.setLevel(logging.DEBUG)
  18. stream_logger = logging.getLogger("kafakesk.ext.logging.stream")
  19. stream_handler = PydanticStreamHandler()
  20. stream_logger.addHandler(stream_handler)
  21. stream_logger.setLevel(logging.DEBUG)
  22. @app.subscribe("logging.test", group="example.logging.consumer")
  23. async def consume(data: PydanticLogModel) -> None:
  24. stream_logger.info(data.json())
  25. async with app:
  26. logger.debug("Log Message", UserLog(user="kafkaesk"))
  27. await app.flush()
  28. await app.consume_for(1, seconds=5)
  29. if __name__ == "__main__":
  30. asyncio.run(test_log())