# kafka_2.py (1.6 KB)

# Lines 1-37 (line-number gutter carried over from the original listing).
  def test__two__processes__with__two__topic__partitions():
      """Run two KafkaStreams instances, each fed one partition of a topic.

      For every partition number a fresh ``KafkaStreams`` is built from the
      same source -> processor -> sink topology while ``KafkaClientSupplier``
      is patched to hand back a mock supplier (mock consumer and producer).
      The instance's first stream thread is then given one mocked topic
      partition, followed by one mocked record carrying that partition number.

      NOTE(review): the test contains no assertions, so it only checks that
      none of these calls raise.
      """
      num_stream_processes = 2

      # Stand-ins for the Kafka clients and the user-supplied processor.
      consumer = mock.Mock()
      producer = mock.Mock()
      processor = mock.Mock(**{'process.return_value': None})
      kafka_client_supplier = mock.Mock(**{
          'consumer.return_value': consumer,
          'producer.return_value': producer,
      })

      # Scope the thread-count override to this test so the shared config
      # module is restored afterwards instead of leaking into later tests.
      # create=True keeps the tolerance of a plain assignment when the
      # attribute is not defined yet.
      with mock.patch.object(kafka_config, 'NUM_STREAM_THREADS', 1, create=True):
          topology_builder = TopologyBuilder()
          topology_builder.source('my-source', ['my-input-topic-1'])
          topology_builder.processor('my-processor', processor, 'my-source')
          topology_builder.sink('my-sink', 'my-output-topic-1', 'my-processor')

          with mock.patch(
                  'winton_kafka_streams.kafka_client_supplier.KafkaClientSupplier',
                  return_value=kafka_client_supplier):
              for partition in range(num_stream_processes):
                  kafka_stream_process = KafkaStreams(topology_builder, kafka_config)

                  # NOTE(review): the assigned partition's topic is 'testtopic'
                  # while the source and the record use 'my-input-topic-1' --
                  # confirm the mismatch is intentional.
                  topic_partition = mock.Mock(**{
                      'topic': 'testtopic',
                      'partition': partition,
                  })
                  kafka_stream_process.stream_threads[0].add_stream_tasks([topic_partition])

                  record = mock.Mock(**{
                      'topic.return_value': 'my-input-topic-1',
                      'offset.return_value': 1,
                      'partition.return_value': partition,
                  })
                  kafka_stream_process.stream_threads[0].add_records_to_tasks([record])