# test_schema.py

from kafkaesk import Application
from kafkaesk.exceptions import SchemaConflictException
import pydantic
import pytest

# Module-level pytest marker: applies ``pytest.mark.asyncio`` to every test
# defined in this module, which is why the tests below are ``async def``.
pytestmark = pytest.mark.asyncio
  6. async def test_not_allowed_to_register_same_schema_twice():
  7. app = Application()
  8. @app.schema("Foo", version=1)
  9. class Foo1(pydantic.BaseModel):
  10. bar: str
  11. with pytest.raises(SchemaConflictException):
  12. @app.schema("Foo", version=1)
  13. class Foo2(pydantic.BaseModel):
  14. foo: str
  15. async def test_do_not_require_schema_name():
  16. app = Application()
  17. @app.schema()
  18. class Foo(pydantic.BaseModel):
  19. bar: str
  20. assert "Foo:1" in app._schemas
  21. async def test_get_registered_schema():
  22. app = Application()
  23. @app.schema()
  24. class Foo(pydantic.BaseModel):
  25. bar: str
  26. assert app.get_schema_reg(Foo) is not None
  27. async def test_get_registered_schema_missing():
  28. app = Application()
  29. class Foo(pydantic.BaseModel):
  30. bar: str
  31. assert app.get_schema_reg(Foo) is None