producer_consumer_parse_recipes_3.py 2.2 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071
  def _first_text(soup, selector):
      """Return the stripped text of the first element matching *selector*, or None if none match."""
      matches = soup.select(selector)
      if not matches:
          return None
      return matches[0].text.strip()


  def parse(markup):
      """Extract recipe fields from one recipe page's HTML and return them as a JSON string.

      Args:
          markup: HTML markup, as accepted by ``BeautifulSoup`` (parsed with 'lxml').

      Returns:
          A JSON string with keys 'title', 'submitter', 'description', 'calories'
          and 'ingredients' (a list of ``{'step': text}`` dicts). Text fields whose
          CSS selector matches nothing fall back to '-', and calories falls back to 0.
          If parsing raises an ``Exception``, the error is printed and ``'{}'`` is
          returned (best-effort: one bad page must not stop the consumer loop).
      """
      rec = {}
      try:
          soup = BeautifulSoup(markup, 'lxml')
          title = _first_text(soup, '.recipe-summary__h1')
          submit_by = _first_text(soup, '.submitter__name')
          description = _first_text(soup, '.submitter__description')
          calories = _first_text(soup, '.calorie-count')

          ingredients = []
          for ingredient in soup.select('.recipe-ingred_txt'):
              ingredient_text = ingredient.text.strip()
              # Skip blank entries and the page's "Add all ingredients to list" button text.
              if ingredient_text and 'Add all ingredients to list' not in ingredient_text:
                  ingredients.append({'step': ingredient_text})

          rec = {
              'title': '-' if title is None else title,
              'submitter': '-' if submit_by is None else submit_by,
              # Double quotes are removed from the description text.
              'description': '-' if description is None else description.replace('"', ''),
              # NOTE(review): calories is the int 0 when absent but a string such as
              # '250' when present; downstream consumers see mixed types.
              'calories': 0 if calories is None else calories.replace('cals', '').strip(),
              'ingredients': ingredients,
          }
      except Exception as ex:
          print('Exception while parsing')
          print(str(ex))
      # Returned outside ``finally``: a return inside ``finally`` would silently
      # swallow non-Exception errors such as KeyboardInterrupt/SystemExit.
      return json.dumps(rec)
  if __name__ == '__main__':
      # Script entry point: read every message from the 'raw_recipes' topic,
      # parse each one with parse(), then publish the resulting JSON strings
      # to the 'parsed_recipes' topic.
      print('Running Consumer..')
      parsed_records = []  # JSON strings returned by parse(), buffered in memory
      topic_name = 'raw_recipes'
      parsed_topic_name = 'parsed_recipes'
      # Per kafka-python semantics (confirm against the installed version):
      # auto_offset_reset='earliest' starts from the oldest available offset when
      # there is no committed offset, and consumer_timeout_ms=1000 ends the
      # `for msg in consumer` iteration after 1 s without new messages, so the
      # loop below terminates instead of blocking forever.
      consumer = KafkaConsumer(topic_name, auto_offset_reset='earliest',
                               bootstrap_servers=['localhost:9092'], api_version=(0, 10), consumer_timeout_ms=1000)
      for msg in consumer:
          # Presumably the raw page HTML (bytes, as no value_deserializer is set) — confirm.
          html = msg.value
          result = parse(html)
          parsed_records.append(result)
      consumer.close()
      # NOTE(review): fixed 5 s pause before publishing; its purpose is not
      # evident from this code — confirm whether it is still needed.
      sleep(5)
      if len(parsed_records) > 0:
          print('Publishing records..')
          # NOTE(review): connect_kafka_producer/publish_message are defined
          # outside this view; verify the producer is flushed/closed after the
          # loop, otherwise buffered sends may be lost when the script exits.
          producer = connect_kafka_producer()
          for rec in parsed_records:
              publish_message(producer, parsed_topic_name, 'parsed', rec)