1234567891011121314151617181920212223 |
- import json
- from time import sleep
- from kafka import KafkaConsumer
- if __name__ == '__main__':
- parsed_topic_name = 'parsed_recipes'
- # Notify if a recipe has more than 200 calories
- calories_threshold = 200
- consumer = KafkaConsumer(parsed_topic_name, auto_offset_reset='earliest',
- bootstrap_servers=['localhost:9092'], api_version=(0, 10), consumer_timeout_ms=1000)
- for msg in consumer:
- record = json.loads(msg.value)
- calories = int(record['calories'])
- title = record['title']
- if calories > calories_threshold:
- print('Alert: {} calories count is {}'.format(title, calories))
- sleep(3)
- if consumer is not None:
- consumer.close()
|