utils_28.py 1.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455
  1. def get_unzip_hdfs_file(hdfs_file_url, save_dir):
  2. # 判断保存路径是否存在,不存在的话创建此目录
  3. if os.path.isdir(save_dir):
  4. pass
  5. else:
  6. os.mkdir(save_dir)
  7. # hdfs文件名
  8. filename = hdfs_file_url.split("/").pop()
  9. # 保存到本地的文件名
  10. save_filename = ""
  11. # 判断是否为压缩文件
  12. if filename.endswith(".gz"):
  13. save_filename = time.strftime("%Y%m%d%H%M%S", time.localtime(time.time())) + ".gz"
  14. else:
  15. save_filename = time.strftime("%Y%m%d%H%M%S", time.localtime(time.time()))
  16. # 判断保存路径最后是否有/
  17. if save_dir.endswith("/"):
  18. save_file = save_dir + save_filename
  19. else:
  20. save_file = save_dir + "/" + save_filename
  21. # 生成下载hdfs文件的命令
  22. hadoop_get = 'hadoop fs -get %s %s' % (hdfs_file_url, save_file)
  23. logger.info("download hdfs file cammond: " + hadoop_get)
  24. # shell执行生成的hdfs命令
  25. try:
  26. os.system(hadoop_get)
  27. except Exception as e:
  28. logger.error(e)
  29. return False
  30. # 判断下载的hdfs文件是否为压缩文件
  31. if save_file.endswith(".gz"):
  32. # 对此压缩文件进行压缩
  33. try:
  34. # 解压后的文件名
  35. f_name = save_file.replace(".gz", "")
  36. # 解压缩
  37. g_file = gzip.GzipFile(save_file)
  38. # 写入文件
  39. open(f_name, "w+").write(g_file.read())
  40. # 关闭文件流
  41. g_file.close()
  42. return f_name
  43. except Exception as e:
  44. logger.error(e)
  45. return False
  46. else:
  47. return save_file