def get_unzip_hdfs_file(hdfs_file_url, save_dir): # 判断保存路径是否存在,不存在的话创建此目录 if os.path.isdir(save_dir): pass else: os.mkdir(save_dir) # hdfs文件名 filename = hdfs_file_url.split("/").pop() # 保存到本地的文件名 save_filename = "" # 判断是否为压缩文件 if filename.endswith(".gz"): save_filename = time.strftime("%Y%m%d%H%M%S", time.localtime(time.time())) + ".gz" else: save_filename = time.strftime("%Y%m%d%H%M%S", time.localtime(time.time())) # 判断保存路径最后是否有/ if save_dir.endswith("/"): save_file = save_dir + save_filename else: save_file = save_dir + "/" + save_filename # 生成下载hdfs文件的命令 hadoop_get = 'hadoop fs -get %s %s' % (hdfs_file_url, save_file) logger.info("download hdfs file cammond: " + hadoop_get) # shell执行生成的hdfs命令 try: os.system(hadoop_get) except Exception as e: logger.error(e) return False # 判断下载的hdfs文件是否为压缩文件 if save_file.endswith(".gz"): # 对此压缩文件进行压缩 try: # 解压后的文件名 f_name = save_file.replace(".gz", "") # 解压缩 g_file = gzip.GzipFile(save_file) # 写入文件 open(f_name, "w+").write(g_file.read()) # 关闭文件流 g_file.close() return f_name except Exception as e: logger.error(e) return False else: return save_file