from azure.datalake.store import core, lib import config import sys, io import schedule, threading, time from datetime import datetime from os import listdir from os.path import isfile, join import glob def run_once_threaded(job_func): job_thread = threading.Thread(target=job_func) job_thread.start() return schedule.CancelJob def run_threaded(job_func): job_thread = threading.Thread(target=job_func) job_thread.start() local_upload_folder_path = "LOCAL_FOLDER_PATH" adls_upload_folder_path = "ADLS_FOLDER_PATH" orginal_stdout = sys.stdout buf = io.StringIO() sys.stdout = buf adlCreds = -1 uploaded_files = False def postToTeams(): output = buf.getvalue() if output == "": return orginal_stdout.write(output) now = datetime.now() current_time = now.strftime("%H:%M:%S") config.sendToTeams("{}
{}".format(current_time, output)) buf.truncate(0) buf.seek(0) def authenticate(): global adlCreds adlCreds = lib.auth(config.azure_tenant_id) def authenticated(): if adlCreds == -1: return # print("Authentication sucess!") run_once_threaded(upload_files) return schedule.CancelJob def upload_files(): adl = core.AzureDLFileSystem(adlCreds, store_name=config.store_name) uploadedFolders = adl.ls(adls_upload_folder_path) uploadedFolders = set([folder.replace(adls_upload_folder_path[1:], "")+"/" for folder in uploadedFolders]) local_folders = glob.glob(local_upload_folder_path+"*") # * means all if need specific format then *.csv local_folders = set([d.replace(local_upload_folder_path, "")+"/" for d in local_folders]) to_upload_folders = local_folders.difference(uploadedFolders) folder_names = sorted([d.replace(local_upload_folder_path, "") for d in to_upload_folders]) files = [] for folder in folder_names: path = local_upload_folder_path+folder for f in listdir(path): if isfile(join(path, f)): files.append(folder+f) print("Uploading the following folders:
{}
Total number of files to upload:
{}".format(", ". join(folder_names), len(files))) for f in files: adl.put(local_upload_folder_path+f, adls_upload_folder_path+f) print("Upload finished.") time.sleep(2) global uploaded_files uploaded_files = True def exit_program(): if uploaded_files == True: exit() schedule.every(2).seconds.do(run_threaded, postToTeams) schedule.every().seconds.do(run_once_threaded, authenticate) schedule.every().seconds.do(authenticated) schedule.every().seconds.do(exit_program) while 1: schedule.run_pending() time.sleep(1)