# logger.py — consumes datasource messages from the Kafka control topic and forwards them to the backend.
  1. __author__ = 'Cristian Martin Fdz'
  2. from kafka import KafkaConsumer
  3. from urllib.parse import urlencode
  4. from urllib.request import Request, urlopen
  5. import traceback
  6. import logging
  7. import sys
  8. import os
  9. import json
  10. import time
  11. import datetime
  12. RETRIES = 10
  13. '''Number of retries for requests'''
  14. SLEEP_BETWEEN_REQUESTS = 5
  15. '''Number of seconds between failed requests'''
  16. def load_environment_vars():
  17. """Loads the environment information receivedfrom dockers
  18. boostrap_servers, backend, control_topic
  19. Returns:
  20. boostrap_servers (str): list of boostrap server for the Kafka connection
  21. backend (str): hostname of the backend
  22. control_topic (str): name of the control topic
  23. """
  24. bootstrap_servers = os.environ.get('BOOTSTRAP_SERVERS')
  25. backend = os.environ.get('BACKEND')
  26. control_topic = os.environ.get('CONTROL_TOPIC')
  27. return (bootstrap_servers, backend, control_topic)
  28. if __name__ == '__main__':
  29. try:
  30. logging.basicConfig(
  31. stream=sys.stdout,
  32. level=logging.INFO,
  33. format='%(asctime)s.%(msecs)03d %(levelname)s %(module)s: %(message)s',
  34. datefmt='%Y-%m-%d %H:%M:%S')
  35. bootstrap_servers, backend, control_topic = load_environment_vars()
  36. """Loads the environment information"""
  37. logging.info("Received environment information (bootstrap_servers, backend, control_topic, control_topic) ([%s], [%s], [%s])",
  38. bootstrap_servers, backend, control_topic)
  39. consumer = KafkaConsumer(control_topic, enable_auto_commit=False, bootstrap_servers=bootstrap_servers, group_id='logger')
  40. """Starts a Kafka consumer to receive the datasource information from the control topic"""
  41. url = 'http://'+backend+'/datasources/'
  42. logging.info("Created and connected Kafka consumer for control topic")
  43. for msg in consumer:
  44. """Gets a new message from Kafka control topic"""
  45. logging.info("Message received in control topic")
  46. logging.info(msg)
  47. try:
  48. deployment_id = int.from_bytes(msg.key, byteorder='big')
  49. """Whether the deployment ID received matches the received in this task, then it is a datasource for this task."""
  50. data = json.loads(msg.value)
  51. """ Data received from Kafka control topic. Data is a JSON with this format:
  52. dic={
  53. 'topic': ..,
  54. 'input_format': ..,
  55. 'input_config' : ..,
  56. 'validation_rate' : ..,
  57. 'total_msg': ..
  58. 'description': ..,
  59. }
  60. """
  61. retry = 0
  62. data['deployment'] = str(deployment_id)
  63. data['input_config'] = json.dumps(data['input_config'])
  64. data['time'] = datetime.datetime.utcfromtimestamp(msg.timestamp/1000.0).strftime("%Y-%m-%dT%H:%M:%S%Z")
  65. ok = False
  66. logging.info("Sending datasource to backend: [%s]", data)
  67. while not ok and retry < RETRIES:
  68. try:
  69. request = Request(url, json.dumps(data).encode(), headers={'Content-type': 'application/json'})
  70. with urlopen(request) as resp:
  71. res = resp.read()
  72. ok = True
  73. resp.close()
  74. logging.info("Datasource sent to backend!!")
  75. consumer.commit()
  76. """commit the offset to Kafka after sending the data to the backend"""
  77. except Exception as e:
  78. retry+=1
  79. time.sleep(SLEEP_BETWEEN_REQUESTS)
  80. logging.error("Error sending data to the backend [%s]. Try again in [%d]s.", str(e), SLEEP_BETWEEN_REQUESTS)
  81. except Exception as e:
  82. logging.error("Error with the received datasource [%s]. Waiting for new data.", str(e))
  83. except Exception as e:
  84. traceback.print_exc()
  85. logging.error("Error in main [%s]. Component will be restarted.", str(e))