consumer-notification.py 741 B

1234567891011121314151617181920212223
  1. import json
  2. from time import sleep
  3. from kafka import KafkaConsumer
  4. if __name__ == '__main__':
  5. parsed_topic_name = 'parsed_recipes'
  6. # Notify if a recipe has more than 200 calories
  7. calories_threshold = 200
  8. consumer = KafkaConsumer(parsed_topic_name, auto_offset_reset='earliest',
  9. bootstrap_servers=['localhost:9092'], api_version=(0, 10), consumer_timeout_ms=1000)
  10. for msg in consumer:
  11. record = json.loads(msg.value)
  12. calories = int(record['calories'])
  13. title = record['title']
  14. if calories > calories_threshold:
  15. print('Alert: {} calories count is {}'.format(title, calories))
  16. sleep(3)
  17. if consumer is not None:
  18. consumer.close()