12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667 |
- import pytest
- import unittest.mock as mock
- from winton_kafka_streams import kafka_config
- from winton_kafka_streams.errors.kafka_streams_error import KafkaStreamsError
- from winton_kafka_streams.kafka_streams import KafkaStreams
- from winton_kafka_streams.processor.processor import BaseProcessor
- from winton_kafka_streams.processor.topology import TopologyBuilder
- class MyTestProcessor(BaseProcessor):
- pass
- def test__given__stream_already_started__when__call_start_again__then__raise_error():
- kafka_config.NUM_STREAM_THREADS = 0
- topology_builder = TopologyBuilder()
- topology_builder.source('my-source', ['my-input-topic-1'])
- topology_builder.processor('my-processor', MyTestProcessor, 'my-source')
- topology_builder.sink('my-sink', 'my-output-topic-1', 'my-processor')
- topology = topology_builder.build()
- kafka_streams = KafkaStreams(topology, kafka_config)
- kafka_streams.start()
- with pytest.raises(KafkaStreamsError, message='KafkaStreams already started.'):
- kafka_streams.start()
- def test__two__processes__with__two__topic__partitions():
- NUM_STREAM_PROCESSES = 2
- kafka_config.NUM_STREAM_THREADS = 1
- consumer = mock.Mock()
- producer = mock.Mock()
- processor_attrs = {'process.return_value': None}
- processor = mock.Mock(**processor_attrs)
- kafka_client_supplier_attrs = {'consumer.return_value': consumer,
- 'producer.return_value': producer}
- kafka_client_supplier = mock.Mock(**kafka_client_supplier_attrs)
- topology_builder = TopologyBuilder()
- topology_builder.source('my-source', ['my-input-topic-1'])
- topology_builder.processor('my-processor', processor, 'my-source')
- topology_builder.sink('my-sink', 'my-output-topic-1', 'my-processor')
- with mock.patch('winton_kafka_streams.kafka_client_supplier.KafkaClientSupplier', return_value=kafka_client_supplier):
- for partition in range(NUM_STREAM_PROCESSES):
- kafka_stream_process = KafkaStreams(topology_builder, kafka_config)
- topic_partition_attrs = {'topic': 'testtopic',
- 'partition': partition}
- topic_partition = mock.Mock(**topic_partition_attrs)
- kafka_stream_process.stream_threads[0].add_stream_tasks([topic_partition])
- record_attrs = {'topic.return_value': 'my-input-topic-1',
- 'offset.return_value': 1,
- 'partition.return_value': partition}
- record = mock.Mock(**record_attrs)
- kafka_stream_process.stream_threads[0].add_records_to_tasks([record])
|