123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164 |
import json
import os
import random
import string
from datetime import datetime, timezone
from pprint import pprint
from uuid import uuid4

from config import (
    folder_use,
    nats_subject,
    string_range,
)
def check_folder(folder: str = folder_use) -> None:
    """
    Function which makes sure that the given folder exists
    Input:  folder = path of the folder to check; defaults to `folder_use`
    Output: None; the folder (and any missing parent folder) is created if absent
    """
    # `exist_ok=True` removes the check-then-create race of the former
    # `os.path.exists(folder) or os.mkdir(folder)`, and `makedirs` also
    # copes with nested paths whose parents do not exist yet.
    os.makedirs(folder, exist_ok=True)
def get_date_current() -> str:
    """
    Function which returns the current UTC time, used inside the file names
    Input:  None
    Output: string formatted as `YYYY-MM-DD-HH-MM` (minute resolution)
    """
    # `datetime.utcnow()` is deprecated since Python 3.12. The format below
    # has no offset directive, so an aware UTC "now" renders the same text.
    return datetime.now(timezone.utc).strftime("%Y-%m-%d-%H-%M")
def get_file_name_json(value_input:bool=True) -> str:
    """
    Function which builds the name of one of the json files of the subject
    Input:  value_input = selector of the file kind:
                True  -> per-minute file of received messages
                False -> merged file
                '1'   -> statistics file
    Output: string with the file name; any other selector falls through
            every branch and yields None
    """
    # Equality (not identity) is used on purpose: it keeps the behaviour for
    # values that merely compare equal to the booleans, such as 1 and 0.
    if value_input == True:  # noqa: E712
        stamp = get_date_current()
        return f"received_{nats_subject}_{stamp}.json"
    if value_input == False:  # noqa: E712
        return f"merged_{nats_subject}.json"
    if value_input == '1':
        return f"statistics_{nats_subject}.json"
    return None
def develop_file_writes(callback_received:bytes) -> None:
    """
    Function which stores one received message in the current json file
    Input:  callback_received = message to store; it is handed to `json.dump`,
            so it has to be JSON serialisable (`develop_callback` passes a dict)
    Output: None; the file named by `get_file_name_json()` inside `folder_use`
            is created with a one element list, or re-written with the
            message appended to the list it already holds
    """
    check_folder(folder_use)
    target_path = os.path.join(folder_use, get_file_name_json())

    if not os.path.exists(target_path):
        # First message for this file name: start a fresh list.
        records = [callback_received]
    else:
        # The file holds a json list; load it completely and extend it.
        with open(target_path, 'r') as source:
            records = json.load(source)
        records.append(callback_received)

    # The whole list is written back on every call.
    with open(target_path, 'w') as sink:
        json.dump(records, sink, indent=4)
def develop_random_data(index: int = 0) -> str:
    """
    Function which builds one random message to send
    Input:  index = sequence number which is stored in the message
    Output: JSON string (not a dictionary) of an object with the keys
            `index`, `uuid`, `name_person` and `date_created`
    """
    # `string_range` random lowercase letters.
    name_person = ''.join(
        random.choice(string.ascii_lowercase) for _ in range(string_range)
    )
    # `datetime.utcnow()` is deprecated since Python 3.12. The format has no
    # offset directive, so the produced text is still naive-looking UTC.
    date_created = datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S.%f')
    return json.dumps(
        {
            "index": index,
            "uuid": str(uuid4()),
            "name_person": name_person,
            "date_created": date_created,
        }
    )
def _record_received_message(raw_payload, received: datetime) -> None:
    """
    Function which holds the logic shared by both callbacks
    Input:  raw_payload = JSON text (str or bytes) of the received message
            received = aware UTC moment at which the callback was entered
    Output: None; the message is extended with the timing fields, stored with
            `develop_file_writes`, and its index is printed
    Raises: KeyError if the message has no `index`; TypeError if it has no
            `date_created`; ValueError if `date_created` has another format
    """
    date_format = '%Y-%m-%d %H:%M:%S.%f'
    message_payload = json.loads(raw_payload)

    # `date_created` carries no offset; it is presumably the naive UTC text
    # written by `develop_random_data`, so it is tagged as UTC to allow
    # arithmetic with the aware timestamps taken here.
    send = datetime.strptime(
        message_payload.get('date_created'), date_format
    ).replace(tzinfo=timezone.utc)

    # Taken right after parsing: `delta_proccessed` therefore covers only the
    # work done so far, not the file write that follows.
    proccessed = datetime.now(timezone.utc)

    # The index is only printed; it is removed so it is not stored.
    index = message_payload.pop('index')
    message_payload.update(
        {
            'date_received': received.strftime(date_format),
            'date_processed': proccessed.strftime(date_format),
            # total_seconds() replaces the hand-written
            # microseconds/seconds/days sum.
            'delta_send': (received - send).total_seconds(),
            'delta_full': (proccessed - send).total_seconds(),
            'delta_proccessed': (proccessed - received).total_seconds(),
        }
    )
    develop_file_writes(message_payload)
    print('Received Index:', index)
    print('>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>')


def develop_callback(message: object) -> None:
    """
    Function which handles a received message (synchronous client)
    Input:  message = received message; its `payload` attribute holds the JSON text
    Output: None; the message is stored together with its timing fields
    """
    # Timestamp first, so the parsing time counts as processing time.
    received = datetime.now(timezone.utc)
    _record_received_message(message.payload, received)


async def develop_callback_async(message: object) -> None:
    """
    Function which handles a received message (asynchronous client)
    Input:  message = received message; its `data` attribute holds the JSON bytes
    Output: None; the message is stored together with its timing fields
    """
    # Timestamp first, so decoding and parsing count as processing time.
    received = datetime.now(timezone.utc)
    # NOTE(review): the file write inside is blocking and runs directly in
    # this coroutine, as it did before.
    _record_received_message(message.data.decode(), received)
def _is_received_file(file_name: str) -> bool:
    """
    Function which tells if a name is exactly `received_<subject>_<date>.json`
    Input:  file_name = bare file name (no folder)
    Output: True only for names of the shape produced by `get_file_name_json(True)`
    """
    prefix, suffix = f"received_{nats_subject}_", ".json"
    if not (file_name.startswith(prefix) and file_name.endswith(suffix)):
        return False
    # The middle part has to be a date; this rejects files of another subject
    # whose name merely starts with this subject.
    try:
        datetime.strptime(file_name[len(prefix):-len(suffix)], "%Y-%m-%d-%H-%M")
    except ValueError:
        return False
    return True


def merge_callback_result() -> None:
    """
    Function which merges every received file of the subject into one file
    Input:  None
    Output: None; the merged file inside `folder_use` is re-written with its
            previous content followed by the content of all received files in
            chronological order, and the received files are removed afterwards
    """
    # Without this, `os.listdir` raises when nothing was ever received.
    check_folder(folder_use)

    merged_name = get_file_name_json(False)
    merged_path = os.path.join(folder_use, merged_name)

    # The date in the name is zero padded, so name order is time order.
    received_names = sorted(
        name for name in os.listdir(folder_use) if _is_received_file(name)
    )

    # Previous merge result first (if any), then the new files.
    sources = [merged_name] if os.path.isfile(merged_path) else []
    sources.extend(received_names)

    value_use = []
    for name in sources:
        with open(os.path.join(folder_use, name), 'r') as read:
            value_use.extend(json.load(read))

    with open(merged_path, 'w') as file_write:
        json.dump(
            value_use,
            file_write,
            indent=4
        )

    # Remove the inputs only after the merged file has been written.
    for name in received_names:
        os.remove(os.path.join(folder_use, name))
|