logger_1.py 3.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182
  def load_environment_vars():
      """Read the Kafka/backend settings that the container environment provides.

      Each setting is looked up with os.environ.get, so a variable that is not
      set comes back as None instead of raising.

      Returns:
          tuple: (bootstrap_servers, backend, control_topic), read from the
              BOOTSTRAP_SERVERS, BACKEND and CONTROL_TOPIC environment
              variables, in that order.
      """
      variable_names = ('BOOTSTRAP_SERVERS', 'BACKEND', 'CONTROL_TOPIC')
      return tuple(os.environ.get(name) for name in variable_names)
  def _build_datasource(msg):
      """Convert a control-topic Kafka record into the payload posted to the backend.

      The record value is expected to be a JSON object of the form:
          {
              'topic': ..,
              'input_format': ..,
              'input_config': ..,
              'validation_rate': ..,
              'total_msg': ..,
              'description': ..
          }

      Args:
          msg: Kafka record; its key, value and timestamp attributes are read.

      Returns:
          dict: the decoded JSON value with 'deployment' and 'time' added and
              'input_config' re-serialised as a JSON string.

      Raises:
          Exception: whatever decoding raises (e.g. missing key, invalid JSON,
              missing 'input_config'); the caller logs it and skips the record.
      """
      # The record key carries the deployment ID as a big-endian integer.
      deployment_id = int.from_bytes(msg.key, byteorder='big')
      data = json.loads(msg.value)
      data['deployment'] = str(deployment_id)
      data['input_config'] = json.dumps(data['input_config'])
      # msg.timestamp is divided by 1000 to obtain seconds. An aware UTC datetime
      # replaces the deprecated utcfromtimestamp(); the former "%Z" suffix always
      # rendered as an empty string on a naive datetime, so the emitted text is
      # unchanged.
      received_at = datetime.datetime.fromtimestamp(msg.timestamp / 1000.0, tz=datetime.timezone.utc)
      data['time'] = received_at.strftime("%Y-%m-%dT%H:%M:%S")
      return data


  def _send_to_backend(url, data, consumer):
      """POST data as JSON to url, retrying on failure, and commit the offset on success.

      Args:
          url (str): backend endpoint that receives the datasource.
          data (dict): JSON-serialisable datasource payload.
          consumer: Kafka consumer whose offset is committed after a successful POST.

      Returns:
          bool: True when the backend accepted the request, False when all
              RETRIES attempts failed.
      """
      payload = json.dumps(data).encode()
      for attempt in range(1, RETRIES + 1):
          try:
              request = Request(url, payload, headers={'Content-type': 'application/json'})
              # The context manager closes the response; the body is read and discarded.
              with urlopen(request) as resp:
                  resp.read()
          except Exception as e:
              if attempt >= RETRIES:
                  logging.error("Error sending data to the backend [%s]. Giving up after [%d] attempts.",
                                str(e), RETRIES)
                  return False
              logging.error("Error sending data to the backend [%s]. Try again in [%d]s.",
                            str(e), SLEEP_BETWEEN_REQUESTS)
              time.sleep(SLEEP_BETWEEN_REQUESTS)
              continue
          logging.info("Datasource sent to backend!!")
          try:
              # Commit the offset to Kafka only after the backend has the data.
              consumer.commit()
          except Exception as e:
              # The POST already succeeded, so it must not be repeated; only report the commit failure.
              logging.error("Datasource sent, but committing the Kafka offset failed [%s].", str(e))
          return True
      return False


  if __name__ == '__main__':
      # Entry point: forward every datasource announced on the Kafka control topic
      # to the backend's /datasources/ endpoint.
      try:
          logging.basicConfig(
              stream=sys.stdout,
              level=logging.INFO,
              format='%(asctime)s.%(msecs)03d %(levelname)s %(module)s: %(message)s',
              datefmt='%Y-%m-%d %H:%M:%S')

          # Load the environment information.
          bootstrap_servers, backend, control_topic = load_environment_vars()
          logging.info("Received environment information (bootstrap_servers, backend, control_topic) ([%s], [%s], [%s])",
                       bootstrap_servers, backend, control_topic)

          # Start a Kafka consumer to receive the datasource information from the
          # control topic. Auto-commit is disabled: offsets are committed manually
          # once a datasource has reached the backend.
          consumer = KafkaConsumer(control_topic, enable_auto_commit=False,
                                   bootstrap_servers=bootstrap_servers, group_id='logger')
          url = 'http://' + backend + '/datasources/'
          logging.info("Created and connected Kafka consumer for control topic")

          for msg in consumer:
              # A new message arrived from the Kafka control topic.
              logging.info("Message received in control topic")
              logging.info(msg)
              try:
                  data = _build_datasource(msg)
                  logging.info("Sending datasource to backend: [%s]", data)
                  _send_to_backend(url, data, consumer)
              except Exception as e:
                  # A malformed record must not stop the consumer loop.
                  logging.error("Error with the received datasource [%s]. Waiting for new data.", str(e))
      except Exception as e:
          traceback.print_exc()
          logging.error("Error in main [%s]. Component will be restarted.", str(e))