1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283 |
- from time import sleep
- import requests
- from bs4 import BeautifulSoup
- from kafka import KafkaProducer
- def publish_message(producer_instance, topic_name, key, value):
- try:
- key_bytes = bytes(key, encoding='utf-8')
- value_bytes = bytes(value, encoding='utf-8')
- producer_instance.send(topic_name, key=key_bytes, value=value_bytes)
- producer_instance.flush()
- print('Message published successfully.')
- except Exception as ex:
- print('Exception in publishing message')
- print(str(ex))
- def connect_kafka_producer():
- _producer = None
- try:
- _producer = KafkaProducer(bootstrap_servers=['localhost:9092'], api_version=(0, 10))
- except Exception as ex:
- print('Exception while connecting Kafka')
- print(str(ex))
- finally:
- return _producer
- def fetch_raw(recipe_url):
- html = None
- print('Processing..{}'.format(recipe_url))
- try:
- r = requests.get(recipe_url, headers=headers)
- if r.status_code == 200:
- html = r.text
- except Exception as ex:
- print('Exception while accessing raw html')
- print(str(ex))
- finally:
- return html.strip()
- def get_recipes():
- recipies = []
- salad_url = 'https://www.allrecipes.com/recipes/96/salad/'
- url = 'https://www.allrecipes.com/recipes/96/salad/'
- print('Accessing list')
- try:
- r = requests.get(url, headers=headers)
- if r.status_code == 200:
- html = r.text
- soup = BeautifulSoup(html, 'lxml')
- links = soup.select('.fixed-recipe-card__h3 a')
- idx = 0
- for link in links:
- sleep(2)
- recipe = fetch_raw(link['href'])
- recipies.append(recipe)
- idx += 1
- except Exception as ex:
- print('Exception in get_recipes')
- print(str(ex))
- finally:
- return recipies
- if __name__ == '__main__':
- headers = {
- 'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/66.0.3359.181 Safari/537.36',
- 'Pragma': 'no-cache'
- }
- all_recipes = get_recipes()
- if len(all_recipes) > 0:
- kafka_producer = connect_kafka_producer()
- for recipe in all_recipes:
- publish_message(kafka_producer, 'raw_recipes', 'raw', recipe.strip())
- if kafka_producer is not None:
- kafka_producer.close()
|