1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071 |
def parse(markup):
    """Extract recipe fields from a recipe page and return them as a JSON string.

    The page is parsed with BeautifulSoup (``lxml`` parser) and the fields are
    located through CSS class selectors.

    Args:
        markup: The HTML of the recipe page, handed unchanged to BeautifulSoup.

    Returns:
        A JSON object string with the keys ``title``, ``submitter``,
        ``description``, ``calories`` and ``ingredients``. Text fields that are
        not found default to ``'-'``, ``calories`` defaults to ``0`` and
        ``ingredients`` to an empty list. If parsing raises an ``Exception``
        the error is printed and ``'{}'`` is returned instead.
    """
    title = '-'
    submit_by = '-'
    description = '-'
    # NOTE: stays the int 0 when the page has no calorie count, but becomes a
    # string (e.g. '330') when it does; kept as-is so consumers of the JSON
    # see the same payload as before.
    calories = 0
    ingredients = []

    try:
        soup = BeautifulSoup(markup, 'lxml')

        calories_section = soup.select('.calorie-count')
        if calories_section:
            calories = calories_section[0].text.replace('cals', '').strip()

        for ingredient in soup.select('.recipe-ingred_txt'):
            ingredient_text = ingredient.text.strip()
            # Skip blank entries and the "Add all ingredients to list" control
            # that shares the ingredient CSS class.
            if ingredient_text and 'Add all ingredients to list' not in ingredient_text:
                ingredients.append({'step': ingredient_text})

        description_section = soup.select('.submitter__description')
        if description_section:
            description = description_section[0].text.strip().replace('"', '')

        submitter_section = soup.select('.submitter__name')
        if submitter_section:
            submit_by = submitter_section[0].text.strip()

        title_section = soup.select('.recipe-summary__h1')
        if title_section:
            title = title_section[0].text

        rec = {'title': title, 'submitter': submit_by, 'description': description,
               'calories': calories, 'ingredients': ingredients}
    except Exception as ex:
        print('Exception while parsing')
        print(str(ex))
        # Best-effort contract: a page that cannot be parsed yields an empty
        # JSON object rather than an error.
        return json.dumps({})

    # Returned here rather than from a `finally` clause: a `return` inside
    # `finally` would also discard KeyboardInterrupt/SystemExit.
    return json.dumps(rec)
if __name__ == '__main__':
    # Script entry point: read every raw recipe page from the 'raw_recipes'
    # topic, parse each one, then publish the parsed JSON records to the
    # 'parsed_recipes' topic.
    print('Running Consumer..')
    parsed_records = []
    topic_name = 'raw_recipes'
    parsed_topic_name = 'parsed_recipes'

    # consumer_timeout_ms makes the iteration below stop once no message has
    # arrived for one second, so the loop ends when the topic is drained.
    consumer = KafkaConsumer(topic_name, auto_offset_reset='earliest',
                             bootstrap_servers=['localhost:9092'], api_version=(0, 10),
                             consumer_timeout_ms=1000)
    try:
        for msg in consumer:
            parsed_records.append(parse(msg.value))
    finally:
        # Release the broker connection even if consuming or parsing fails.
        consumer.close()
    # NOTE(review): fixed pause between consuming and publishing; the reason
    # is not evident from this file -- confirm it is still needed.
    sleep(5)

    if parsed_records:
        print('Publishing records..')
        producer = connect_kafka_producer()
        for rec in parsed_records:
            publish_message(producer, parsed_topic_name, 'parsed', rec)
|