data_develop.py 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164
  1. import os
  2. import json
  3. import string
  4. import random
  5. from uuid import uuid4
  6. from pprint import pprint
  7. from datetime import datetime
  8. from config import (
  9. string_range,
  10. nats_subject,
  11. folder_use
  12. )
  13. def check_folder(folder:str=folder_use) -> None:
  14. """
  15. Function which is about checking folders
  16. Input: folder = string values of it
  17. Output: we created folder if we have to
  18. """
  19. os.path.exists(folder) or os.mkdir(folder)
  20. def get_date_current() -> str:
  21. """
  22. Function which is about getting current datetime for the files
  23. Input: None
  24. Output: we created string values of the current situations
  25. """
  26. return datetime.utcnow().strftime("%Y-%m-%d-%H-%M")
  27. def get_file_name_json(value_input:bool=True) -> str:
  28. """
  29. Function which is dedicated to get file name of the selected json
  30. Input: value_input = check values of the json
  31. Output: string with the selected to get name
  32. """
  33. if value_input == True:
  34. return f"received_{nats_subject}_{get_date_current()}.json"
  35. elif value_input == False:
  36. return f"merged_{nats_subject}.json"
  37. elif value_input == '1':
  38. return f"statistics_{nats_subject}.json"
  39. def develop_file_writes(callback_received:bytes) -> None:
  40. """
  41. Function which is dedicated to writed the file presence
  42. Input: callback_received = bytes which were previously sent
  43. Output: we created or appended values to the file
  44. """
  45. check_folder(folder_use)
  46. value_file = os.path.join(folder_use, get_file_name_json())
  47. if os.path.exists(value_file):
  48. with open(value_file, 'r') as file_write:
  49. value_use = json.load(file_write)
  50. value_use.append(callback_received)
  51. else:
  52. value_use = [callback_received]
  53. with open(value_file, 'w') as file_write:
  54. json.dump(
  55. value_use,
  56. file_write,
  57. indent=4
  58. )
  59. def develop_random_data(index:int=0) -> dict:
  60. """
  61. Function which is about the usage of the random data
  62. Input: None
  63. Output: dictionary with the selected values of the random data
  64. """
  65. return json.dumps(
  66. {
  67. "index": index,
  68. "uuid": str(uuid4()),
  69. "name_person": ''.join(random.choice(string.ascii_lowercase) for _ in range(string_range)),
  70. "date_created": datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S.%f')
  71. }
  72. )
  73. def develop_callback(message:object) -> None:
  74. """
  75. Function which is about to add the values to the callback
  76. Input: message = received values of it
  77. Output: we dealed with the previous
  78. """
  79. received = datetime.utcnow()
  80. date_used = received.strftime('%Y-%m-%d %H:%M:%S.%f')
  81. message_payload = json.loads(message.payload)
  82. send = datetime.strptime(message_payload.get('date_created'), '%Y-%m-%d %H:%M:%S.%f')
  83. proccessed = datetime.utcnow()
  84. delta_full = proccessed - send
  85. delta_proccessed = proccessed - received
  86. delta_send = received - send
  87. index = message_payload.pop('index')
  88. message_payload.update(
  89. {
  90. 'date_received': date_used,
  91. 'date_processed': proccessed.strftime('%Y-%m-%d %H:%M:%S.%f'),
  92. 'delta_send': delta_send.microseconds/1000000 + delta_send.seconds + delta_send.days*60*60*24,
  93. 'delta_full': delta_full.microseconds/1000000 + delta_full.seconds + delta_full.days*60*60*24,
  94. 'delta_proccessed': delta_proccessed.microseconds/1000000 + delta_proccessed.seconds + delta_proccessed.days*60*60*24,
  95. }
  96. )
  97. develop_file_writes(message_payload)
  98. print('Received Index:', index)
  99. print('>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>')
  100. async def develop_callback_async(message:object) -> None:
  101. """
  102. Function which is about to add the values to the callback
  103. Input: message = received values of it
  104. Output: we dealed with the previous
  105. """
  106. received = datetime.utcnow()
  107. date_used = received.strftime('%Y-%m-%d %H:%M:%S.%f')
  108. message_payload = json.loads(message.data.decode())
  109. send = datetime.strptime(message_payload.get('date_created'), '%Y-%m-%d %H:%M:%S.%f')
  110. proccessed = datetime.utcnow()
  111. delta_full = proccessed - send
  112. delta_proccessed = proccessed - received
  113. delta_send = received - send
  114. index = message_payload.pop('index')
  115. message_payload.update(
  116. {
  117. 'date_received': date_used,
  118. 'date_processed': proccessed.strftime('%Y-%m-%d %H:%M:%S.%f'),
  119. 'delta_send': delta_send.microseconds/1000000 + delta_send.seconds + delta_send.days*60*60*24,
  120. 'delta_full': delta_full.microseconds/1000000 + delta_full.seconds + delta_full.days*60*60*24,
  121. 'delta_proccessed': delta_proccessed.microseconds/1000000 + delta_proccessed.seconds + delta_proccessed.days*60*60*24,
  122. }
  123. )
  124. develop_file_writes(message_payload)
  125. print('Received Index:', index)
  126. print('>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>')
  127. def merge_callback_result() -> None:
  128. """
  129. Function which is dedicated to use value
  130. Input: None
  131. Output: we created the values of the merged dataframe
  132. """
  133. list_merged, value_use = [], []
  134. for file in os.listdir(folder_use):
  135. if file == get_file_name_json(False):
  136. list_merged.append(file)
  137. elif f"received_{nats_subject}" in file:
  138. list_merged.append(file)
  139. for f in sorted(list_merged):
  140. with open(os.path.join(folder_use, f), 'r') as read:
  141. value_use.extend(json.load(read))
  142. with open(os.path.join(folder_use, get_file_name_json(False)), 'w') as file_write:
  143. json.dump(
  144. value_use,
  145. file_write,
  146. indent=4
  147. )
  148. for f in list_merged:
  149. if f != get_file_name_json(False):
  150. os.remove(
  151. os.path.join(
  152. folder_use,
  153. f
  154. )
  155. )