# producer_consumer_parse_recipes.py

  1. import json
  2. from time import sleep
  3. from bs4 import BeautifulSoup
  4. from kafka import KafkaConsumer, KafkaProducer
  5. def publish_message(producer_instance, topic_name, key, value):
  6. try:
  7. key_bytes = bytes(key, encoding='utf-8')
  8. value_bytes = bytes(value, encoding='utf-8')
  9. producer_instance.send(topic_name, key=key_bytes, value=value_bytes)
  10. producer_instance.flush()
  11. print('Message published successfully.')
  12. except Exception as ex:
  13. print('Exception in publishing message')
  14. print(str(ex))
  15. def connect_kafka_producer():
  16. _producer = None
  17. try:
  18. _producer = KafkaProducer(bootstrap_servers=['localhost:9092'], api_version=(0, 10))
  19. except Exception as ex:
  20. print('Exception while connecting Kafka')
  21. print(str(ex))
  22. finally:
  23. return _producer
  24. def parse(markup):
  25. title = '-'
  26. submit_by = '-'
  27. description = '-'
  28. calories = 0
  29. ingredients = []
  30. rec = {}
  31. try:
  32. soup = BeautifulSoup(markup, 'lxml')
  33. # title
  34. title_section = soup.select('.recipe-summary__h1')
  35. # submitter
  36. submitter_section = soup.select('.submitter__name')
  37. # description
  38. description_section = soup.select('.submitter__description')
  39. # ingredients
  40. ingredients_section = soup.select('.recipe-ingred_txt')
  41. # calories
  42. calories_section = soup.select('.calorie-count')
  43. if calories_section:
  44. calories = calories_section[0].text.replace('cals', '').strip()
  45. if ingredients_section:
  46. for ingredient in ingredients_section:
  47. ingredient_text = ingredient.text.strip()
  48. if 'Add all ingredients to list' not in ingredient_text and ingredient_text != '':
  49. ingredients.append({'step': ingredient.text.strip()})
  50. if description_section:
  51. description = description_section[0].text.strip().replace('"', '')
  52. if submitter_section:
  53. submit_by = submitter_section[0].text.strip()
  54. if title_section:
  55. title = title_section[0].text
  56. rec = {'title': title, 'submitter': submit_by, 'description': description, 'calories': calories,
  57. 'ingredients': ingredients}
  58. except Exception as ex:
  59. print('Exception while parsing')
  60. print(str(ex))
  61. finally:
  62. return json.dumps(rec)
  63. if __name__ == '__main__':
  64. print('Running Consumer..')
  65. parsed_records = []
  66. topic_name = 'raw_recipes'
  67. parsed_topic_name = 'parsed_recipes'
  68. consumer = KafkaConsumer(topic_name, auto_offset_reset='earliest',
  69. bootstrap_servers=['localhost:9092'], api_version=(0, 10), consumer_timeout_ms=1000)
  70. for msg in consumer:
  71. html = msg.value
  72. result = parse(html)
  73. parsed_records.append(result)
  74. consumer.close()
  75. sleep(5)
  76. if len(parsed_records) > 0:
  77. print('Publishing records..')
  78. producer = connect_kafka_producer()
  79. for rec in parsed_records:
  80. publish_message(producer, parsed_topic_name, 'parsed', rec)