from unittest import mock

# Import paths below are inferred from the mock.patch target used in the test.
import winton_kafka_streams.kafka_config as kafka_config
from winton_kafka_streams.kafka_streams import KafkaStreams
from winton_kafka_streams.processor import TopologyBuilder


def test__two__processes__with__two__topic__partitions():
    # Two stream processes, each configured to run a single stream thread.
    NUM_STREAM_PROCESSES = 2
    kafka_config.NUM_STREAM_THREADS = 1

    # Mock the Kafka clients and the user processor so no broker is required.
    consumer = mock.Mock()
    producer = mock.Mock()
    processor = mock.Mock(**{'process.return_value': None})
    kafka_client_supplier = mock.Mock(**{'consumer.return_value': consumer,
                                         'producer.return_value': producer})

    # Build a simple source -> processor -> sink topology.
    topology_builder = TopologyBuilder()
    topology_builder.source('my-source', ['my-input-topic-1'])
    topology_builder.processor('my-processor', processor, 'my-source')
    topology_builder.sink('my-sink', 'my-output-topic-1', 'my-processor')

    with mock.patch('winton_kafka_streams.kafka_client_supplier.KafkaClientSupplier',
                    return_value=kafka_client_supplier):
        for partition in range(NUM_STREAM_PROCESSES):
            # Start one stream process per partition and assign it that partition.
            kafka_stream_process = KafkaStreams(topology_builder, kafka_config)
            topic_partition = mock.Mock(**{'topic': 'testtopic',
                                           'partition': partition})
            kafka_stream_process.stream_threads[0].add_stream_tasks([topic_partition])

            # Feed one record from the input topic into the thread's tasks.
            record = mock.Mock(**{'topic.return_value': 'my-input-topic-1',
                                  'offset.return_value': 1,
                                  'partition.return_value': partition})
            kafka_stream_process.stream_threads[0].add_records_to_tasks([record])