123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102 |
- __author__ = 'Cristian Martin Fdz'
- from kafka import KafkaConsumer
- from urllib.parse import urlencode
- from urllib.request import Request, urlopen
- import traceback
- import logging
- import sys
- import os
- import json
- import time
- import datetime
- RETRIES = 10
- '''Number of retries for requests'''
- SLEEP_BETWEEN_REQUESTS = 5
- '''Number of seconds between failed requests'''
- def load_environment_vars():
- """Loads the environment information receivedfrom dockers
- boostrap_servers, backend, control_topic
- Returns:
- boostrap_servers (str): list of boostrap server for the Kafka connection
- backend (str): hostname of the backend
- control_topic (str): name of the control topic
- """
- bootstrap_servers = os.environ.get('BOOTSTRAP_SERVERS')
- backend = os.environ.get('BACKEND')
- control_topic = os.environ.get('CONTROL_TOPIC')
- return (bootstrap_servers, backend, control_topic)
- if __name__ == '__main__':
- try:
- logging.basicConfig(
- stream=sys.stdout,
- level=logging.INFO,
- format='%(asctime)s.%(msecs)03d %(levelname)s %(module)s: %(message)s',
- datefmt='%Y-%m-%d %H:%M:%S')
- bootstrap_servers, backend, control_topic = load_environment_vars()
- """Loads the environment information"""
- logging.info("Received environment information (bootstrap_servers, backend, control_topic, control_topic) ([%s], [%s], [%s])",
- bootstrap_servers, backend, control_topic)
-
- consumer = KafkaConsumer(control_topic, enable_auto_commit=False, bootstrap_servers=bootstrap_servers, group_id='logger')
- """Starts a Kafka consumer to receive the datasource information from the control topic"""
-
- url = 'http://'+backend+'/datasources/'
- logging.info("Created and connected Kafka consumer for control topic")
- for msg in consumer:
- """Gets a new message from Kafka control topic"""
- logging.info("Message received in control topic")
- logging.info(msg)
- try:
- deployment_id = int.from_bytes(msg.key, byteorder='big')
- """Whether the deployment ID received matches the received in this task, then it is a datasource for this task."""
-
- data = json.loads(msg.value)
- """ Data received from Kafka control topic. Data is a JSON with this format:
- dic={
- 'topic': ..,
- 'input_format': ..,
- 'input_config' : ..,
- 'validation_rate' : ..,
- 'total_msg': ..
- 'description': ..,
- }
- """
-
- retry = 0
- data['deployment'] = str(deployment_id)
- data['input_config'] = json.dumps(data['input_config'])
- data['time'] = datetime.datetime.utcfromtimestamp(msg.timestamp/1000.0).strftime("%Y-%m-%dT%H:%M:%S%Z")
- ok = False
- logging.info("Sending datasource to backend: [%s]", data)
- while not ok and retry < RETRIES:
- try:
- request = Request(url, json.dumps(data).encode(), headers={'Content-type': 'application/json'})
- with urlopen(request) as resp:
- res = resp.read()
- ok = True
- resp.close()
- logging.info("Datasource sent to backend!!")
- consumer.commit()
- """commit the offset to Kafka after sending the data to the backend"""
-
- except Exception as e:
- retry+=1
- time.sleep(SLEEP_BETWEEN_REQUESTS)
- logging.error("Error sending data to the backend [%s]. Try again in [%d]s.", str(e), SLEEP_BETWEEN_REQUESTS)
- except Exception as e:
- logging.error("Error with the received datasource [%s]. Waiting for new data.", str(e))
- except Exception as e:
- traceback.print_exc()
- logging.error("Error in main [%s]. Component will be restarted.", str(e))
|