123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101 |
- import json
- from time import sleep
- from bs4 import BeautifulSoup
- from kafka import KafkaConsumer, KafkaProducer
def _to_bytes(data):
    """Return *data* as bytes: bytes/bytearray pass through, anything else is UTF-8 encoded as a str."""
    if isinstance(data, (bytes, bytearray)):
        return bytes(data)
    # Non-str, non-bytes input raises TypeError here, as the original did.
    return bytes(data, encoding='utf-8')


def publish_message(producer_instance, topic_name, key, value):
    """Send one key/value message to a Kafka topic and flush it immediately.

    Failures are reported on stdout instead of being raised, so one bad
    message does not abort the caller's publishing loop.

    Args:
        producer_instance: object with ``send(topic, key=..., value=...)`` and
            ``flush()`` (e.g. the result of ``connect_kafka_producer``). May be
            ``None`` when connecting failed; nothing is sent in that case.
        topic_name: destination topic name.
        key: message key, as ``str`` (UTF-8 encoded) or ``bytes``.
        value: message payload, as ``str`` (UTF-8 encoded) or ``bytes``.

    Returns:
        True if the message was sent and flushed, False otherwise.
    """
    if producer_instance is None:
        print('Exception in publishing message')
        print('No Kafka producer available')
        return False
    try:
        key_bytes = _to_bytes(key)
        value_bytes = _to_bytes(value)
        producer_instance.send(topic_name, key=key_bytes, value=value_bytes)
        # Flushing per message blocks until this record has been sent.
        producer_instance.flush()
    except Exception as ex:
        print('Exception in publishing message')
        print(str(ex))
        return False
    print('Message published successfully.')
    return True
def connect_kafka_producer(bootstrap_servers=None, api_version=(0, 10)):
    """Create a KafkaProducer, or return None if construction fails.

    Args:
        bootstrap_servers: list of ``host:port`` broker addresses; defaults to
            ``['localhost:9092']``.
        api_version: broker API version tuple passed to KafkaProducer;
            defaults to ``(0, 10)``.

    Returns:
        The KafkaProducer instance, or None if creating it raised an
        ``Exception`` (the error is printed). KeyboardInterrupt/SystemExit are
        no longer swallowed: the old ``return`` inside ``finally`` discarded them.
    """
    if bootstrap_servers is None:
        bootstrap_servers = ['localhost:9092']
    try:
        return KafkaProducer(bootstrap_servers=bootstrap_servers, api_version=api_version)
    except Exception as ex:
        print('Exception while connecting Kafka')
        print(str(ex))
        return None
def _first_text(soup, selector):
    """Return the raw text of the first element matching CSS *selector*, or None if nothing matches."""
    element = soup.select_one(selector)
    return None if element is None else element.text


def _parse_calories(raw):
    """Drop the 'cals' unit from *raw*; return an int for a whole number, else the cleaned string."""
    cleaned = raw.replace('cals', '').strip()
    try:
        return int(cleaned)
    except ValueError:
        # Keep non-numeric text (e.g. '1,200' or '') as-is rather than losing it.
        return cleaned


def parse(markup):
    """Extract recipe fields from an HTML page and return them as a JSON string.

    Args:
        markup: HTML document (str or bytes) handed to BeautifulSoup with the
            'lxml' parser.

    Returns:
        JSON object string with keys ``title``, ``submitter``, ``description``,
        ``calories`` and ``ingredients`` (a list of ``{'step': text}`` dicts).
        Missing text fields default to ``'-'`` and missing calories to ``0``.
        If parsing raises, the error is printed and ``'{}'`` is returned.
    """
    try:
        soup = BeautifulSoup(markup, 'lxml')

        title = _first_text(soup, '.recipe-summary__h1')
        # Stripped like every other field (the original left surrounding whitespace in).
        title = '-' if title is None else title.strip()

        submit_by = _first_text(soup, '.submitter__name')
        submit_by = '-' if submit_by is None else submit_by.strip()

        description = _first_text(soup, '.submitter__description')
        # Double quotes are removed from the description text.
        description = '-' if description is None else description.strip().replace('"', '')

        calories_text = _first_text(soup, '.calorie-count')
        calories = 0 if calories_text is None else _parse_calories(calories_text)

        ingredients = []
        for element in soup.select('.recipe-ingred_txt'):
            text = element.text.strip()
            # Skip blank entries and entries containing 'Add all ingredients to list'.
            if text and 'Add all ingredients to list' not in text:
                ingredients.append({'step': text})

        rec = {'title': title, 'submitter': submit_by, 'description': description,
               'calories': calories, 'ingredients': ingredients}
    except Exception as ex:
        print('Exception while parsing')
        print(str(ex))
        rec = {}
    return json.dumps(rec)
if __name__ == '__main__':
    # Drain 'raw_recipes', parse each HTML payload, then republish the JSON
    # results to 'parsed_recipes'.
    print('Running Consumer..')
    topic_name = 'raw_recipes'
    parsed_topic_name = 'parsed_recipes'
    consumer = KafkaConsumer(topic_name, auto_offset_reset='earliest',
                             bootstrap_servers=['localhost:9092'], api_version=(0, 10),
                             consumer_timeout_ms=1000)
    try:
        # consumer_timeout_ms ends iteration after 1s without a new message,
        # so this drains the current backlog and then stops.
        parsed_records = [parse(msg.value) for msg in consumer]
    finally:
        consumer.close()

    # NOTE(review): purpose of this fixed 5s pause is not evident from this file; kept as-is.
    sleep(5)

    if parsed_records:
        print('Publishing records..')
        producer = connect_kafka_producer()
        if producer is None:
            print('No Kafka producer available; {} parsed record(s) not published.'.format(
                len(parsed_records)))
        else:
            try:
                for rec in parsed_records:
                    publish_message(producer, parsed_topic_name, 'parsed', rec)
            finally:
                producer.close()
|