kafka_1.py 624 B

1234567891011121314151617
  1. def test__given__stream_already_started__when__call_start_again__then__raise_error():
  2. kafka_config.NUM_STREAM_THREADS = 0
  3. topology_builder = TopologyBuilder()
  4. topology_builder.source('my-source', ['my-input-topic-1'])
  5. topology_builder.processor('my-processor', MyTestProcessor, 'my-source')
  6. topology_builder.sink('my-sink', 'my-output-topic-1', 'my-processor')
  7. topology = topology_builder.build()
  8. kafka_streams = KafkaStreams(topology, kafka_config)
  9. kafka_streams.start()
  10. with pytest.raises(KafkaStreamsError, message='KafkaStreams already started.'):
  11. kafka_streams.start()