DLfile.py 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125
  1. from azure.datalake.store import core, lib
  2. import config
  3. import sys, io
  4. import schedule, threading, time
  5. from datetime import datetime
  6. from os import listdir
  7. from os.path import isfile, join
  8. import glob
  9. def run_once_threaded(job_func):
  10. job_thread = threading.Thread(target=job_func)
  11. job_thread.start()
  12. return schedule.CancelJob
  13. def run_threaded(job_func):
  14. job_thread = threading.Thread(target=job_func)
  15. job_thread.start()
  16. local_upload_folder_path = "LOCAL_FOLDER_PATH"
  17. adls_upload_folder_path = "ADLS_FOLDER_PATH"
  18. orginal_stdout = sys.stdout
  19. buf = io.StringIO()
  20. sys.stdout = buf
  21. adlCreds = -1
  22. uploaded_files = False
  23. def postToTeams():
  24. output = buf.getvalue()
  25. if output == "":
  26. return
  27. orginal_stdout.write(output)
  28. now = datetime.now()
  29. current_time = now.strftime("%H:%M:%S")
  30. config.sendToTeams("{}<br>{}".format(current_time, output))
  31. buf.truncate(0)
  32. buf.seek(0)
  33. def authenticate():
  34. global adlCreds
  35. adlCreds = lib.auth(config.azure_tenant_id)
  36. def authenticated():
  37. if adlCreds == -1:
  38. return
  39. # print("Authentication sucess!")
  40. run_once_threaded(upload_files)
  41. return schedule.CancelJob
  42. def upload_files():
  43. adl = core.AzureDLFileSystem(adlCreds, store_name=config.store_name)
  44. uploadedFolders = adl.ls(adls_upload_folder_path)
  45. uploadedFolders = set([folder.replace(adls_upload_folder_path[1:], "")+"/" for folder in uploadedFolders])
  46. local_folders = glob.glob(local_upload_folder_path+"*") # * means all if need specific format then *.csv
  47. local_folders = set([d.replace(local_upload_folder_path, "")+"/" for d in local_folders])
  48. to_upload_folders = local_folders.difference(uploadedFolders)
  49. folder_names = sorted([d.replace(local_upload_folder_path, "") for d in to_upload_folders])
  50. files = []
  51. for folder in folder_names:
  52. path = local_upload_folder_path+folder
  53. for f in listdir(path):
  54. if isfile(join(path, f)):
  55. files.append(folder+f)
  56. print("Uploading the following folders:<br>{}<br>Total number of files to upload:<br>{}".format(", ". join(folder_names), len(files)))
  57. for f in files:
  58. adl.put(local_upload_folder_path+f, adls_upload_folder_path+f)
  59. print("Upload finished.")
  60. time.sleep(2)
  61. global uploaded_files
  62. uploaded_files = True
  63. def exit_program():
  64. if uploaded_files == True:
  65. exit()
  66. schedule.every(2).seconds.do(run_threaded, postToTeams)
  67. schedule.every().seconds.do(run_once_threaded, authenticate)
  68. schedule.every().seconds.do(authenticated)
  69. schedule.every().seconds.do(exit_program)
  70. while 1:
  71. schedule.run_pending()
  72. time.sleep(1)