import gzip
import logging
import os
import time

logger = logging.getLogger(__name__)


def get_unzip_hdfs_file(hdfs_file_url, save_dir):
    # Create the save directory if it does not already exist
    if not os.path.isdir(save_dir):
        os.mkdir(save_dir)
    # Name of the file on HDFS
    filename = hdfs_file_url.split("/").pop()
    # Local file name, based on the current timestamp
    timestamp = time.strftime("%Y%m%d%H%M%S", time.localtime(time.time()))
    # Keep the .gz suffix if the HDFS file is compressed
    if filename.endswith(".gz"):
        save_filename = timestamp + ".gz"
    else:
        save_filename = timestamp
    # Make sure the save path ends with exactly one "/"
    if save_dir.endswith("/"):
        save_file = save_dir + save_filename
    else:
        save_file = save_dir + "/" + save_filename
    # Build the command that downloads the file from HDFS
    hadoop_get = 'hadoop fs -get %s %s' % (hdfs_file_url, save_file)
    logger.info("download hdfs file command: " + hadoop_get)
    # Run the command in a shell; os.system does not raise on failure,
    # so check its return code instead
    if os.system(hadoop_get) != 0:
        logger.error("failed to download hdfs file: " + hdfs_file_url)
        return False
    # If the downloaded file is gzip-compressed, decompress it
    if save_file.endswith(".gz"):
        try:
            # Name of the decompressed file
            f_name = save_file.replace(".gz", "")
            # Decompress and write the result; gzip returns bytes,
            # so the output file must be opened in binary mode
            with gzip.GzipFile(save_file) as g_file:
                with open(f_name, "wb") as out_file:
                    out_file.write(g_file.read())
            return f_name
        except Exception as e:
            logger.error(e)
            return False
    else:
        return save_file
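
A minimal usage sketch, assuming the hadoop client is available on the PATH; the HDFS URL and local directory below are placeholder values, not paths from the original:

# Hypothetical call: download (and decompress, if gzipped) an HDFS file
# into a local directory; both arguments are illustrative placeholders.
local_path = get_unzip_hdfs_file(
    "hdfs://namenode:8020/logs/access_log.gz",
    "/tmp/hdfs_downloads",
)
if local_path:
    print("file saved to", local_path)
else:
    print("download or decompression failed")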