kafka.py 2.6 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667
  1. import pytest
  2. import unittest.mock as mock
  3. from winton_kafka_streams import kafka_config
  4. from winton_kafka_streams.errors.kafka_streams_error import KafkaStreamsError
  5. from winton_kafka_streams.kafka_streams import KafkaStreams
  6. from winton_kafka_streams.processor.processor import BaseProcessor
  7. from winton_kafka_streams.processor.topology import TopologyBuilder
  8. class MyTestProcessor(BaseProcessor):
  9. pass
  10. def test__given__stream_already_started__when__call_start_again__then__raise_error():
  11. kafka_config.NUM_STREAM_THREADS = 0
  12. topology_builder = TopologyBuilder()
  13. topology_builder.source('my-source', ['my-input-topic-1'])
  14. topology_builder.processor('my-processor', MyTestProcessor, 'my-source')
  15. topology_builder.sink('my-sink', 'my-output-topic-1', 'my-processor')
  16. topology = topology_builder.build()
  17. kafka_streams = KafkaStreams(topology, kafka_config)
  18. kafka_streams.start()
  19. with pytest.raises(KafkaStreamsError, message='KafkaStreams already started.'):
  20. kafka_streams.start()
  21. def test__two__processes__with__two__topic__partitions():
  22. NUM_STREAM_PROCESSES = 2
  23. kafka_config.NUM_STREAM_THREADS = 1
  24. consumer = mock.Mock()
  25. producer = mock.Mock()
  26. processor_attrs = {'process.return_value': None}
  27. processor = mock.Mock(**processor_attrs)
  28. kafka_client_supplier_attrs = {'consumer.return_value': consumer,
  29. 'producer.return_value': producer}
  30. kafka_client_supplier = mock.Mock(**kafka_client_supplier_attrs)
  31. topology_builder = TopologyBuilder()
  32. topology_builder.source('my-source', ['my-input-topic-1'])
  33. topology_builder.processor('my-processor', processor, 'my-source')
  34. topology_builder.sink('my-sink', 'my-output-topic-1', 'my-processor')
  35. with mock.patch('winton_kafka_streams.kafka_client_supplier.KafkaClientSupplier', return_value=kafka_client_supplier):
  36. for partition in range(NUM_STREAM_PROCESSES):
  37. kafka_stream_process = KafkaStreams(topology_builder, kafka_config)
  38. topic_partition_attrs = {'topic': 'testtopic',
  39. 'partition': partition}
  40. topic_partition = mock.Mock(**topic_partition_attrs)
  41. kafka_stream_process.stream_threads[0].add_stream_tasks([topic_partition])
  42. record_attrs = {'topic.return_value': 'my-input-topic-1',
  43. 'offset.return_value': 1,
  44. 'partition.return_value': partition}
  45. record = mock.Mock(**record_attrs)
  46. kafka_stream_process.stream_threads[0].add_records_to_tasks([record])