刘凡 3282579ec1 first commit | 2 年 前 | |
---|---|---|
.. | ||
.github | 2 年 前 | |
examples | 2 年 前 | |
kafkaesk | 2 年 前 | |
stubs | 2 年 前 | |
tests | 2 年 前 | |
.bandit | 2 年 前 | |
.flake8 | 2 年 前 | |
.gitignore | 2 年 前 | |
.isort.cfg | 2 年 前 | |
.pre-commit-config.yaml | 2 年 前 | |
LICENSE | 2 年 前 | |
README.md | 2 年 前 | |
docker-compose.yml | 2 年 前 | |
mypy.ini | 2 年 前 | |
poetry.lock | 2 年 前 | |
pyproject.toml | 2 年 前 | |
pytest.ini | 2 年 前 |
This project is meant to help facilitate effortless publishing and subscribing to events with Python and Kafka.
Consider this Python project as syntactic sugar around these ideas.
Using pydantic but can be done with pure JSON.
import kafkaesk
from pydantic import BaseModel
app = kafkaesk.Application()
@app.schema("Content", version=1, retention=24 * 60 * 60)
class ContentMessage(BaseModel):
foo: str
async def foobar():
# ...
# doing something in an async func
await app.publish("content.edited.Resource", data=ContentMessage(foo="bar"))
A convenience method is available in the subscriber
dependency instance, this allow to header
propagation from the consumed message.
import kafkaesk
from pydantic import BaseModel
app = kafkaesk.Application()
@app.schema("Content", version=1, retention=24 * 60 * 60)
class ContentMessage(BaseModel):
foo: str
@app.subscribe("content.*", "group_id")
async def get_messages(data: ContentMessage, subscriber):
print(f"{data.foo}")
# This will propagate `data` record headers
await subscriber.publish("content.edited.Resource", data=ContentMessage(foo="bar"))
import kafkaesk
from pydantic import BaseModel
app = kafkaesk.Application()
@app.schema("Content", version=1, retention=24 * 60 * 60)
class ContentMessage(BaseModel):
foo: str
@app.subscribe("content.*", "group_id")
async def get_messages(data: ContentMessage):
print(f"{data.foo}")
If you do not want to have global application configuration, you can lazily configure the application and register schemas/subscribers separately.
import kafkaesk
from pydantic import BaseModel
router = kafkaesk.Router()
@router.schema("Content", version=1, retention=24 * 60 * 60)
class ContentMessage(BaseModel):
foo: str
@router.subscribe("content.*", "group_id")
async def get_messages(data: ContentMessage):
print(f"{data.foo}")
if __name__ == "__main__":
app = kafkaesk.Application()
app.mount(router)
kafkaesk.run(app)
Optional consumer injected parameters:
Depending on the type annotation for the first parameter, you will get different data injected:
async def get_messages(data: ContentMessage)
: parses pydantic schemaasync def get_messages(data: bytes)
: give raw byte dataasync def get_messages(record: aiokafka.structs.ConsumerRecord)
: give kafka record objectasync def get_messages(data)
: raw json data in messageTo accomplish a manual commit strategy yourself:
app = kafkaesk.Application(auto_commit=False)
@app.subscribe("content.*", "group_id")
async def get_messages(data: ContentMessage, subscriber):
print(f"{data.foo}")
await subscriber.consumer.commit()
Add these values to your kafak_settings
:
`ssl_context` - this should be a placeholder as the SSL Context is generally created within the application
`security_protocol` - one of SSL or PLAINTEXT
`sasl_mechanism` - one of PLAIN, GSSAPI, SCRAM-SHA-256, SCRAM-SHA-512, OAUTHBEARER
`sasl_plain_username`
`sasl_plain_password`
This is a library around using kafka. Kafka itself does not enforce these concepts.
{
"schema": "schema_name:1",
"data": { ... }
}
kafkaesk mymodule:app --kafka-servers=localhost:9092
Options:
auto
True
5000
poetry install
Run tests:
docker-compose up
KAFKA=localhost:9092 poetry run pytest tests
This extension includes classes to extend Python's logging framework to publish structured log messages to a Kafka topic.
This extension is made up of three main components: an extended logging.LogRecord
and some custom logging.Handler
s.
See logger.py
in examples directory.
kafkaesk.ext.logging.record.factory
is a function that will return kafkaesk.ext.logging.record.PydanticLogRecord
objects.
The factory()
function scans through any args
passed to a logger and checks each item to determine if it is a subclass of pydantid.BaseModel
.
If it is a base model instance and model._is_log_model
evaluates to True
the model will be removed from args
and added to record._pydantic_data
.
After that factory()
will use logging's existing logic to finish creating the log record.
This extensions ships with two handlers capable of handling kafkaesk.ext.logging.handler.PydanticLogModel
classes: kafakesk.ext.logging.handler.PydanticStreamHandler
and kafkaesk.ext.logging.handler.PydanticKafkaeskHandler
.
The stream handler is a very small wrapper around logging.StreamHandler
, the signature is the same, the only difference is that the handler will attempt to convert any pydantic models it receives to a human readable log message.
The kafkaesk handler has a few more bits going on in the background.
The handler has two required inputs, a kafkaesk.app.Application
instance and a stream name.
Once initialized any logs emitted by the handler will be saved into an internal queue. There is a worker task that handles pulling logs from the queue and writing those logs to the specified topic.
It's hard and "kafka" is already a fun name. Hopefully this library isn't literally "kafkaesque" for you.