def load_environment_vars():
    """Load the connection settings passed to the component via the environment.

    Reads the ``BOOTSTRAP_SERVERS``, ``BACKEND`` and ``CONTROL_TOPIC``
    environment variables. A variable that is not set is returned as ``None``.

    Returns:
        tuple: ``(bootstrap_servers, backend, control_topic)`` where
            bootstrap_servers (str): bootstrap servers for the Kafka connection
            backend (str): hostname of the backend
            control_topic (str): name of the control topic
    """
    bootstrap_servers = os.environ.get('BOOTSTRAP_SERVERS')
    backend = os.environ.get('BACKEND')
    control_topic = os.environ.get('CONTROL_TOPIC')

    return (bootstrap_servers, backend, control_topic)


if __name__ == '__main__':
    try:
        logging.basicConfig(
            stream=sys.stdout,
            level=logging.INFO,
            format='%(asctime)s.%(msecs)03d %(levelname)s %(module)s: %(message)s',
            datefmt='%Y-%m-%d %H:%M:%S')

        # Load the environment information.
        bootstrap_servers, backend, control_topic = load_environment_vars()
        logging.info(
            "Received environment information (bootstrap_servers, backend, control_topic) ([%s], [%s], [%s])",
            bootstrap_servers, backend, control_topic)

        # Start a Kafka consumer to receive the datasource information from
        # the control topic. Auto-commit is disabled: the offset is committed
        # by hand only after the datasource has reached the backend.
        consumer = KafkaConsumer(control_topic, enable_auto_commit=False,
                                 bootstrap_servers=bootstrap_servers,
                                 group_id='logger')

        url = 'http://' + backend + '/datasources/'
        logging.info("Created and connected Kafka consumer for control topic")

        # Each iteration handles one new message from the Kafka control topic.
        for msg in consumer:
            logging.info("Message received in control topic")
            logging.info(msg)

            try:
                # The message key is decoded as a big-endian integer and used
                # as the deployment ID of the datasource.
                deployment_id = int.from_bytes(msg.key, byteorder='big')

                # Data received from the Kafka control topic is a JSON object
                # with this format:
                #   {
                #     'topic': ..,
                #     'input_format': ..,
                #     'input_config': ..,
                #     'validation_rate': ..,
                #     'total_msg': ..,
                #     'description': ..,
                #   }
                data = json.loads(msg.value)

                data['deployment'] = str(deployment_id)
                # input_config is forwarded as a JSON-encoded string.
                data['input_config'] = json.dumps(data['input_config'])
                # msg.timestamp is divided by 1000 to get seconds. The value is
                # rendered as UTC without a zone suffix, which is exactly what
                # the previous naive utcfromtimestamp() + "%Z" produced ("%Z"
                # is empty for naive datetimes), minus the deprecated call.
                data['time'] = datetime.datetime.fromtimestamp(
                    msg.timestamp / 1000.0, tz=datetime.timezone.utc
                ).strftime("%Y-%m-%dT%H:%M:%S")

                logging.info("Sending datasource to backend: [%s]", data)

                # The payload does not change between attempts: encode it once.
                payload = json.dumps(data).encode()

                retry = 0
                ok = False
                while not ok and retry < RETRIES:
                    try:
                        request = Request(url, payload,
                                          headers={'Content-type': 'application/json'})
                        # The context manager closes the response on exit.
                        with urlopen(request) as resp:
                            resp.read()
                        ok = True
                        logging.info("Datasource sent to backend!!")
                        # Commit the offset to Kafka after sending the data to
                        # the backend.
                        consumer.commit()
                    except Exception as e:
                        retry += 1
                        # Log before sleeping so that "Try again in" is
                        # accurate at the moment the line is written.
                        logging.error("Error sending data to the backend [%s]. Try again in [%d]s.",
                                      str(e), SLEEP_BETWEEN_REQUESTS)
                        time.sleep(SLEEP_BETWEEN_REQUESTS)

                if not ok:
                    # Retries exhausted: make the failure visible instead of
                    # silently moving on. No commit was issued for this message.
                    logging.error("Datasource could not be sent to the backend after [%d] attempts.",
                                  RETRIES)

            except Exception as e:
                # A malformed message must not stop the consumer loop.
                logging.error("Error with the received datasource [%s]. Waiting for new data.", str(e))

    except Exception as e:
        traceback.print_exc()
        logging.error("Error in main [%s]. Component will be restarted.", str(e))