test_kafka.py 2.0 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152
  1. from kafkaesk.kafka import KafkaTopicManager
  2. from unittest.mock import patch
  3. import kafka.errors
  4. import pytest
  # Module-level pytest mark: applies `pytest.mark.asyncio` to the tests in this
  # file, which is what lets the `async def` tests below be collected and awaited
  # (presumably via the pytest-asyncio plugin -- confirm against project config).
  5. pytestmark = pytest.mark.asyncio
  6. async def test_create_topic_uses_replication_factor_from_servers():
  7. mng = KafkaTopicManager(["foo", "bar"])
  8. with patch("kafka.admin.client.KafkaAdminClient"):
  9. await mng.create_topic("Foobar")
  10. client = await mng.get_admin_client()
  11. assert client.create_topics.called
  12. assert client.create_topics.call_args[0][0][0].replication_factor == 2
  13. async def test_create_topic_uses_replication_factor_from_servers_min_3():
  14. mng = KafkaTopicManager(["foo", "bar", "foo2", "foo3", "foo4"])
  15. with patch("kafka.admin.client.KafkaAdminClient"):
  16. await mng.create_topic("Foobar")
  17. client = await mng.get_admin_client()
  18. assert client.create_topics.called
  19. assert client.create_topics.call_args[0][0][0].replication_factor == 3
  20. async def test_create_topic_uses_replication_factor():
  21. mng = KafkaTopicManager(["foo", "bar"], replication_factor=1)
  22. with patch("kafka.admin.client.KafkaAdminClient"):
  23. await mng.create_topic("Foobar", retention_ms=100)
  24. client = await mng.get_admin_client()
  25. assert client.create_topics.called
  26. assert client.create_topics.call_args[0][0][0].replication_factor == 1
  27. assert client.create_topics.call_args[0][0][0].topic_configs["retention.ms"] == 100
  28. async def test_create_topic_already_exists():
  29. mng = KafkaTopicManager(["foo", "bar"], replication_factor=1)
  30. with patch("kafka.admin.client.KafkaAdminClient"):
  31. client = await mng.get_admin_client()
  32. client.create_topics.side_effect = kafka.errors.TopicAlreadyExistsError
  33. await mng.create_topic("Foobar")
  34. client.create_topics.assert_called_once()
  35. def test_constructor_translates_api_version():
  36. mng = KafkaTopicManager(["foobar"], kafka_api_version="auto")
  37. assert mng.kafka_api_version is None
  38. mng = KafkaTopicManager(["foobar"], kafka_api_version="2.4.0")
  39. assert mng.kafka_api_version == (2, 4, 0)