import os
import json
import pickle
import shutil

import numpy as np
import torch


def save_checkpoint(state, is_best, file_path, file_name='checkpoint.pth.tar'):
    """
    Saves the current state of the model and copies the file
    if the model performed better than previously.

    Parameters:
        state (dict): Includes optimizer and model state dictionaries.
        is_best (bool): True if the model is the best-performing model so far.
        file_path (str): Path to save the file.
        file_name (str): File name with extension (default: checkpoint.pth.tar).
    """
    save_path = os.path.join(file_path, file_name)
    torch.save(state, save_path)
    if is_best:
        shutil.copyfile(save_path, os.path.join(file_path, 'model_best.pth.tar'))


def save_task_checkpoint(file_path, task_num):
    """
    Saves the current state of the model for a given task by copying the
    existing checkpoint created by the save_checkpoint function.

    Parameters:
        file_path (str): Path to save the file.
        task_num (int): Number of the task increment.
    """
    save_path = os.path.join(file_path, 'checkpoint_task_' + str(task_num) + '.pth.tar')
    shutil.copyfile(os.path.join(file_path, 'checkpoint.pth.tar'), save_path)


def pickle_dump(item, out_file):
    with open(out_file, "wb") as opened_file:
        pickle.dump(item, opened_file)


def write_to_clf(clf_data, save_file):
    """Save a text-classification dataset to file.

    clf_data: List[List[str]]  [[text1, label1], [text2, label2], ...]
    file format: tsv, row: text + tab + label
    """
    with open(save_file, 'w', encoding='utf-8') as f:
        f.writelines("\n".join(["\t".join(str(r) for r in row) for row in clf_data]))


def write_to_seq2seq(seq_data, save_file):
    """Save a sequence-to-sequence dataset to file.

    seq_data: List[List[str]]  [[src1, tgt1], [src2, tgt2], ...]
    file format: tsv, row: src + tab + tgt
    """
    with open(save_file, 'w', encoding='utf-8') as f:
        f.writelines("\n".join(["\t".join([str(r) for r in row]) for row in seq_data]))


def write_to_ner(cls, ner_data, save_file):
    """Save a NER dataset to file.

    :param cls: class namespace (unused here).
    :param ner_data: List[List[str]] of rows to write.
    :param save_file: output tsv path.
    :return: None
    """
    with open(save_file, 'w', encoding='utf-8') as f:
        f.writelines("\n".join(["\t".join(str(r) for r in row) for row in ner_data]))


def quick_save(self, model, save_name, optimizer=None):
    save_path = os.path.join(self.save_dir, save_name + '_weights.pth')
    if optimizer:
        opt_weights = optimizer.get_weights()
        np.save(os.path.join(self.save_dir, save_name + '_opt_weights'), opt_weights)
    model.save_weights(save_path, save_format='h5')


def save(self, model, iter_nb, train_metrics_values, test_metrics_values,
         tasks_weights=[], optimizer=None):
    self.logs_dict['train'][str(iter_nb)] = {}
    self.logs_dict['val'][str(iter_nb)] = {}
    for k in range(len(self.metrics)):
        self.logs_dict['train'][str(iter_nb)][self.metrics[k]] = float(train_metrics_values[k])
        self.logs_dict['val'][str(iter_nb)][self.metrics[k]] = float(test_metrics_values[k])

    if len(tasks_weights) > 0:
        for k in range(len(tasks_weights)):
            self.logs_dict['val'][str(iter_nb)]['weight_' + str(k)] = tasks_weights[k]

    with open(self.logs_file, 'w') as f:
        json.dump(self.logs_dict, f)

    ckpt = {
        'model_state_dict': model.state_dict(),
        'iter_nb': iter_nb,
    }
    if optimizer:
        ckpt['optimizer_state_dict'] = optimizer.state_dict()

    # Save the best mIoU checkpoint if the score improved
    if 'MEAN_IOU' in self.metrics:
        miou = float(test_metrics_values[self.metrics.index('MEAN_IOU')])
        if miou > self.best_miou and iter_nb > 0:
            print('Best miou. Saving it.')
            torch.save(ckpt, self.best_miou_weights_file)
            self.best_miou = miou
            self.config_dict['best_miou'] = self.best_miou

    # Save the best relative-error checkpoint if the score improved
    if 'REL_ERR' in self.metrics:
        rel_error = float(test_metrics_values[self.metrics.index('REL_ERR')])
        if rel_error < self.best_rel_error and iter_nb > 0:
            print('Best rel error. Saving it.')
            torch.save(ckpt, self.best_rel_error_weights_file)
            self.best_rel_error = rel_error
            self.config_dict['best_rel_error'] = self.best_rel_error

    # Save the last checkpoint
    torch.save(ckpt, self.last_checkpoint_weights_file)
    self.iter_nb = iter_nb
    self.config_dict['iter'] = self.iter_nb
    with open(self.config_file, 'w') as f:
        json.dump(self.config_dict, f)


def extract_spec(dataset='train'):
    f = open(data_path + dataset + '_list.txt', 'r')

    i = 0
    for file_name in f:
        i = i + 1
        if not (i % 10):
            print(i)

        # load audio file
        file_name = file_name.rstrip('\n')
        file_path = data_path + file_name
        y0, sr = librosa.load(file_path, sr=22050)

        # use only the first quarter of the signal
        half = len(y0) / 4
        y = y0[:round(half)]

        # mfcc
        mfcc = librosa.feature.mfcc(y=y, sr=sr, n_mfcc=MFCC_DIM)

        # delta mfcc and double delta
        delta_mfcc = librosa.feature.delta(mfcc)
        ddelta_mfcc = librosa.feature.delta(mfcc, order=2)

        # STFT
        D = np.abs(librosa.core.stft(y, hop_length=512, n_fft=1024, win_length=1024))
        D_dB = librosa.amplitude_to_db(D, ref=np.max)

        # mel spectrogram (log-compressed)
        mel_S = librosa.feature.melspectrogram(S=D, sr=sr, n_mels=128)
        S_dB = librosa.power_to_db(mel_S, ref=np.max)

        # spectral centroid
        spec_centroid = librosa.feature.spectral_centroid(S=D)

        # concatenate all features
        features = np.concatenate([mfcc, delta_mfcc, ddelta_mfcc, spec_centroid], axis=0)

        # save the features as a .npy file
        file_name = file_name.replace('.wav', '.npy')
        save_file = spec_path + file_name
        if not os.path.exists(os.path.dirname(save_file)):
            os.makedirs(os.path.dirname(save_file))
        np.save(save_file, features)

    f.close()


def extract_codebook(dataset='train'):
    f = open(data_path + dataset + '_list.txt', 'r')

    i = 0
    for file_name in f:
        i = i + 1
        if not (i % 10):
            print(i)

        # load audio file
        file_name = file_name.rstrip('\n')
        file_path = data_path + file_name
        y0, sr = librosa.load(file_path, sr=22050)

        # use only the first quarter of the signal
        half = len(y0) / 4
        y = y0[:round(half)]

        # STFT
        S_full, phase = librosa.magphase(librosa.stft(y, n_fft=1024, window='hann',
                                                      hop_length=256, win_length=1024))
        n = len(y)

        # rows of the matrix must correspond to the example index
        X = S_full.T

        # codebook via k-means clustering
        K = 20
        kmeans = KMeans(n_clusters=K, random_state=0).fit(X)
        features_kmeans = np.zeros(X.shape[0])
        # for each frame, record its cluster assignment
        codebook = np.zeros(K)
        for sample in range(X.shape[0]):
            features_kmeans[sample] = kmeans.labels_[sample]

        # codebook histogram
        unique, counts = np.unique(features_kmeans, return_counts=True)
        for idx, u in enumerate(unique):
            # counts is parallel to unique, so index it by position
            codebook[int(u)] = counts[idx]

        # save the codebook histogram as a .npy file
        file_name = file_name.replace('.wav', '.npy')
        save_file = codebook_path + file_name
        if not os.path.exists(os.path.dirname(save_file)):
            os.makedirs(os.path.dirname(save_file))
        np.save(save_file, codebook)

    f.close()


def run(self):
    file = QtCore.QFile(self.filePath)
    if not file.open(QtCore.QIODevice.WriteOnly):
        self.saveFileFinished.emit(SAVE_FILE_ERROR, self.urlStr, self.filePath)
    file.write(self.fileData)
    file.close()
    self.saveFileFinished.emit(0, self.urlStr, self.filePath)


def saveFile(self, fileName, data):
    file = QtCore.QFile(fileName)
    if not file.open(QtCore.QIODevice.WriteOnly):
        return False
    file.write(data.readAll())
    file.close()
    return True


def serialize(self):
    """Callback to serialize the array."""
    string_file = io.BytesIO()
    try:
        numpy.save(string_file, self.array, allow_pickle=False)
        serialized = string_file.getvalue()
    finally:
        string_file.close()
    return serialized


def train(self, save=False, save_dir=None):
    train_img_list = glob.glob(self.path_train + "/*")
    print(train_img_list)

    train_features = []
    for img_file in train_img_list:
        img = io.imread(img_file)
        img = color.rgb2lab(img)
        img_features = self.extract_texton_feature(img, self.fb, self.nb_features)
        train_features.extend(img_features)
    train_features = np.array(train_features)
    print(train_features.shape)

    kmeans_cluster = MiniBatchKMeans(n_clusters=self.nb_clusters, verbose=1, max_iter=300)
    kmeans_cluster.fit(train_features)
    print(kmeans_cluster.cluster_centers_)
    print(kmeans_cluster.cluster_centers_.shape)

    self.cluster = kmeans_cluster

    # save the k-means result
    if save is True:
        with open(save_dir, 'wb') as f:
            pickle.dump(self.cluster, f)


def save(self, event):
    if not self.filename:
        self.save_as(event)
    else:
        if self.writefile(self.filename):
            self.set_saved(True)
            try:
                self.editwin.store_file_breaks()
            except AttributeError:  # may be a PyShell
                pass
    self.text.focus_set()
    return "break"


def writefile(self, filename):
    self.fixlastline()
    chars = self.encode(self.text.get("1.0", "end-1c"))
    if self.eol_convention != "\n":
        chars = chars.replace("\n", self.eol_convention)
    try:
        f = open(filename, "wb")
        f.write(chars)
        f.flush()
        f.close()
        return True
    except IOError as msg:
        tkMessageBox.showerror("I/O Error", str(msg), master=self.text)
        return False


def save_response_content(response, destination, file_size=None, chunk_size=32768):
    if file_size is not None:
        pbar = tqdm(total=math.ceil(file_size / chunk_size), unit='chunk')
        readable_file_size = sizeof_fmt(file_size)
    else:
        pbar = None

    with open(destination, 'wb') as f:
        downloaded_size = 0
        for chunk in response.iter_content(chunk_size):
            downloaded_size += chunk_size
            if pbar is not None:
                pbar.update(1)
                pbar.set_description(f'Download {sizeof_fmt(downloaded_size)} '
                                     f'/ {readable_file_size}')
            if chunk:  # filter out keep-alive new chunks
                f.write(chunk)
        if pbar is not None:
            pbar.close()


def generateHuman(cloth_list, person_id, sex):
    haveAcc = 0
    # load accessories (hair, shoes, skin)
    hair = open('modeleTxt/hair.txt', 'r').readlines()
    shoe = open('modeleTxt/shoe.txt', 'r').readlines()
    pifu = open('modeleTxt/skin.txt', 'r').readlines()

    if not os.path.exists(person_save_Folder):
        os.makedirs(person_save_Folder)

    if sex > 0:
        Gender1 = 1000000
    else:
        Gender1 = 0

    # settings
    Gender = '%.6f' % (Gender1 / 1000000)
    Muscle = '%.6f' % (random.randint(0, 1000000) / 1000000)
    African_1 = random.randint(0, 1000000)
    African = '%.6f' % (African_1 / 1000000)
    Asian_1 = random.randint(0, 1000000 - African_1)
    Asian = '%.6f' % (Asian_1 / 1000000)
    Caucasian = '%.6f' % ((1000000 - Asian_1 - African_1) / 1000000)
    if Gender1 > 1000000 / 2:
        m_height = random.gauss(170, 5.7) / 200
        while m_height > 1:
            m_height = random.gauss(170, 5.7) / 200
        Height = '%.6f' % m_height
    else:
        m_height = random.gauss(160, 5.2) / 200
        while m_height > 1:
            m_height = random.gauss(160, 5.2) / 200
        Height = '%.6f' % m_height
    BreastSize = '%.6f' % (random.randint(0, 70) / 100)
    Age = '%.6f' % (random.randint(20, 90) / 100)
    BreastFirmness = '%.6f' % (random.randint(30, 100) / 100)
    Weight = '%.6f' % (random.randint(0, 1000000) / 1000000)

    file_name = 'B' + str(person_id)

    # create the person file
    f = open(person_save_Folder + file_name + ".mhm", 'a')
    f.write('# Written by MakeHuman 1.1.1\n')
    f.write('version v1.1.1\n')
    f.write('tags ' + file_name + '\n')
    f.write('camera 0.0 0.0 0.0 0.0 0.0 1.0\n')
    f.write('modifier macrodetails-universal/Muscle ' + Muscle + '\n')
    f.write('modifier macrodetails/African ' + African + '\n')
    f.write('modifier macrodetails-proportions/BodyProportions 0.500000\n')
    f.write('modifier macrodetails/Gender ' + Gender + '\n')
    f.write('modifier macrodetails-height/Height ' + Height + '\n')
    f.write('modifier breast/BreastSize ' + BreastSize + '\n')
    f.write('modifier macrodetails/Age ' + Age + '\n')
    f.write('modifier breast/BreastFirmness ' + BreastFirmness + '\n')
    f.write('modifier macrodetails/Asian ' + Asian + '\n')
    f.write('modifier macrodetails/Caucasian ' + Caucasian + '\n')
    f.write('modifier macrodetails-universal/Weight ' + Weight + '\n')
    f.write('skeleton cmu_mb.mhskel\n')
    f.write('eyes HighPolyEyes 2c12f43b-1303-432c-b7ce-d78346baf2e6\n')

    # add clothes
    if Gender1 > 1000000 / 2:
        f.write(hair[random.randint(0, len(hair) - 1)])
    else:
        f.write(hair[random.randint(0, len(hair) - 1)])
    f.write(shoe[random.randint(0, len(shoe) - 1)])
    for i in range(0, len(cloth_list)):
        f.write(cloth_list[i] + '\n')
    f.write('clothesHideFaces True\n')
    f.write(pifu[random.randint(0, len(pifu) - 1)])
    f.write('material Braid01 eead6f99-d6c6-4f6b-b6c2-210459d7a62e braid01.mhmat\n')
    f.write('material HighPolyEyes 2c12f43b-1303-432c-b7ce-d78346baf2e6 eyes/materials/brown.mhmat\n')
    f.write('subdivide False\n')


def notice_write(request):
    if request.method == 'POST':
        form = ContentForm(request.POST)
        form_file = FileForm(request.POST, request.FILES)
        if form.is_valid():
            question = form.save(commit=False)
            question.author = request.user
            question.create_date = timezone.now()
            question.boardname_id = 7
            question.save()
            if form_file.is_valid():
                form_file = FileForm(request.POST, request.FILES)
                file_save = form_file.save(commit=False)
                file_save.author = request.user
                file_save.postcontent = question
                file_save.boardname_id = 7
                file_save.file = request.FILES.get("file")
                file_save.save()
            return redirect('notice_view')
    return render(request, 'notice_write.html')


def test_write(request):
    if request.method == 'POST':
        form = ContentForm(request.POST)
        form_file = FileForm(request.POST, request.FILES)
        if form.is_valid():
            question = form.save(commit=False)
            question.author = request.user
            question.create_date = timezone.now()
            question.boardname_id = 14
            question.save()
            if form_file.is_valid():
                form_file = FileForm(request.POST, request.FILES)
                file_save = form_file.save(commit=False)
                file_save.author = request.user
                file_save.postcontent = question
                file_save.boardname_id = 14
                file_save.file = request.FILES.get("file")
                file_save.save()
            return redirect('test_list')
    return render(request, 'test_write.html')
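# notice_write and test_write above differ only in the board id, the redirect
# target, and the template. A minimal sketch of a shared helper, assuming the
# same ContentForm / FileForm classes and URL names used above; the name
# `_write_post` is hypothetical and not part of the original code.
def _write_post(request, boardname_id, redirect_name, template_name):
    if request.method == 'POST':
        form = ContentForm(request.POST)
        form_file = FileForm(request.POST, request.FILES)
        if form.is_valid():
            question = form.save(commit=False)
            question.author = request.user
            question.create_date = timezone.now()
            question.boardname_id = boardname_id
            question.save()
            if form_file.is_valid():
                file_save = form_file.save(commit=False)
                file_save.author = request.user
                file_save.postcontent = question
                file_save.boardname_id = boardname_id
                file_save.file = request.FILES.get("file")
                file_save.save()
            return redirect(redirect_name)
    return render(request, template_name)


# Hypothetical usage: the existing views could then delegate to the helper.
# def notice_write(request):
#     return _write_post(request, 7, 'notice_view', 'notice_write.html')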
print("开始下载:" + name + ".mp3") headers = {'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8', "Accept-Encoding": "gzip, deflate, br", "Accept-Language": "zh-CN,zh;q=0.9", "Upgrade-Insecure-Requests": "1", 'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.132 Safari/537.36'} count = 0 while count < 3: try: r = requests.get(url, headers=headers, stream=True, timeout=60) # print(r.status_code) if (r.status_code == 200): with open(path, "wb+") as f: for chunk in r.iter_content(1024): f.write(chunk) print("完成下载:" + name + ".mp3") break except Exception as e: print(e) print("下载出错:" + name + ".mp3,3秒后重试") if os.path.exists(path): os.remove(path) time.sleep(3) count += 1 pass def save_as(): global file_name content = content_text.get(1.0, 'end') with open(file_name, 'w') as save: save.write(content) def export_save(data_player, data_kick, guild_id, save_name=""): if save_name: save_name = "_" + save_name print(" - Partie enregistrée -") with open(f"saves/save{save_name}.json", "w") as file: file.write(json.dumps( { "players": [data_player[player_id].export() for player_id in data_player], "kicks": data_kick, "guild_id": guild_id }, indent=4)) def conv(heic_path, save_dir, filetype, quality): # 保存先のディレクトリとファイル名 extension = "." + filetype save_path = save_dir / filetype / pathlib.Path(*heic_path.parts[1:]).with_suffix(extension) # フォルダ作成 save_path.parent.mkdir(parents=True, exist_ok=True) # HEICファイルpyheifで読み込み heif_file = pyheif.read(heic_path) # 読み込んだファイルの中身をdata変数へ data = Image.frombytes( heif_file.mode, heif_file.size, heif_file.data, "raw", heif_file.mode, heif_file.stride, ) # JPEGで保存 data.save(save_path, quality=quality) print("保存:", save_path) def parsing_sravni_ru(soup): names = soup.find_all('span', class_='_106rrj0') # scraping names # scraping age childrens age_divs = soup.find_all('div', {'style': 'grid-area:firstCell-1', 'class': '_pjql8'}) ages = [] for i in age_divs: age_span = i.find('span') ages.append(age_span) # scraping course duration duration_divs = soup.find_all('div', {'style': 'grid-area:secondCell-1', 'class': '_pjql8'}) durations = [] for i in duration_divs: duration_span = i.find('span') durations.append(duration_span) # scraping price prices = soup.find_all('span', class_='_e9qrci _k8dl2y') items = [] for (n, l, i, p) in zip(names, ages, durations, prices): name = n.text.strip() age = l.text.strip() duration = i.text.strip() price = p.text.strip().replace('\xa0', '') items.append( { 'name': name, 'age': age, 'duration': duration, 'price': price, } ) # save json file with open("./data/items.json", "w", encoding="utf-8") as f: json.dump(items, f, indent=4, ensure_ascii=False) with open("./data/items.csv", 'a', encoding="utf-8") as file: for i in items: writer = csv.writer(file) writer.writerow( ( i['name'], i['age'], i['duration'], i['price'] ) ) def save_to_file(self, path): with open(path, "w") as f: f.write(self.cert_pem()) f.write(self.key_pem()) def save_cert_to_file(self, path): with open(path, "w") as f: f.write(self.cert_pem()) def _save_large_file(self, os_path, content, format): """Save content of a generic file.""" if format not in {'text', 'base64'}: raise web.HTTPError( 400, "Must specify format of file contents as 'text' or 'base64'", ) try: if format == 'text': bcontent = content.encode('utf8') else: b64_bytes = content.encode('ascii') bcontent = base64.b64decode(b64_bytes) except Exception as e: raise web.HTTPError( 400, u'Encoding error saving %s: %s' % 
    with self.perm_to_403(os_path):
        if os.path.islink(os_path):
            os_path = os.path.join(os.path.dirname(os_path), os.readlink(os_path))
        with io.open(os_path, 'ab') as f:
            f.write(bcontent)


def get_unzip_hdfs_file(hdfs_file_url, save_dir):
    # create the save directory if it does not exist
    if os.path.isdir(save_dir):
        pass
    else:
        os.mkdir(save_dir)

    # HDFS file name
    filename = hdfs_file_url.split("/").pop()

    # local file name to save to
    save_filename = ""

    # check whether the file is compressed
    if filename.endswith(".gz"):
        save_filename = time.strftime("%Y%m%d%H%M%S", time.localtime(time.time())) + ".gz"
    else:
        save_filename = time.strftime("%Y%m%d%H%M%S", time.localtime(time.time()))

    # make sure the save path ends with /
    if save_dir.endswith("/"):
        save_file = save_dir + save_filename
    else:
        save_file = save_dir + "/" + save_filename

    # build the command that downloads the HDFS file
    hadoop_get = 'hadoop fs -get %s %s' % (hdfs_file_url, save_file)
    logger.info("download hdfs file command: " + hadoop_get)

    # run the generated command in a shell
    try:
        os.system(hadoop_get)
    except Exception as e:
        logger.error(e)
        return False

    # check whether the downloaded file is compressed
    if save_file.endswith(".gz"):
        # decompress it
        try:
            # name of the decompressed file
            f_name = save_file.replace(".gz", "")
            # decompress
            g_file = gzip.GzipFile(save_file)
            # write out the decompressed content
            open(f_name, "wb+").write(g_file.read())
            # close the file stream
            g_file.close()
            return f_name
        except Exception as e:
            logger.error(e)
            return False
    else:
        return save_file


def get_unzip_hdfs_file_from_dir(hdfs_dir, save_dir):
    """Download every file under an HDFS directory.

    Parameters:
        hdfs_dir: HDFS directory.
        save_dir: local directory to save into.
    Returns:
        The list of saved files on success, False on failure.
    """
    # command: list the files under the HDFS directory
    hadoop_ls = "hadoop fs -ls %s | grep -i '^-'" % hdfs_dir

    # list of decompressed files
    save_file_list = []

    # run the shell command
    hdfs_result = exec_sh(hadoop_ls, None)

    # read the command output
    hdfs_stdout = hdfs_result["stdout"]

    # HDFS files to download
    hdfs_list = []

    # check whether there is any output
    if hdfs_stdout:
        # split by line; each line describes one file
        hdfs_lines = hdfs_stdout.split("\n")
        for line in hdfs_lines:
            # split on whitespace to get the HDFS file name
            line_list = re.split(r"\s+", line)
            # -rw-r--r-- 2 caoweidong supergroup 42815 2017-01-23 14:20 /user/000000_0.gz
            if len(line_list) == 8:
                # add the HDFS file to the download list
                hdfs_list.append(line_list[7])
            else:
                pass
        # download the files
        for file in hdfs_list:
            save_filename = get_unzip_hdfs_file(file, save_dir)
            save_file_list.append(save_filename)
        return save_file_list
    else:
        return False


def save_game(self):
    save_file = open("saves/main_save.xml", "w+")

    level = self.save_level()
    self.tree.append(level)

    team = self.save_team()
    self.tree.append(team)

    # store the XML tree in the file
    save_file.write(etree.tostring(self.tree, pretty_print=True, encoding="unicode"))
    save_file.close()


def save_upload_file(
    self,
    file: UploadFile,
    save_dir_path: pathlib.Path,
    job_id: str,
    dt_string: str,
) -> pathlib.Path:
    """Save `file` under `save_dir_path`.

    Args:
        file (UploadFile): The file to save.
        save_dir_path (pathlib.Path): Directory where the file will be saved.
        job_id (str): Job id, used as part of the filename.
        dt_string (str): Datetime info, used as part of the filename.

    Returns:
        pathlib.Path: The path where the file was saved.
    """
""" if not save_dir_path.exists(): save_dir_path.mkdir(parents=True, exist_ok=True) save_path: Final = save_dir_path / f"{dt_string}_{job_id}_{file.filename}" try: with save_path.open("wb") as f: shutil.copyfileobj(file.file, f) finally: file.file.close() return save_path def save_output(output, list_to_save): if not output: with open(output, "w") as f: for item in list_to_save: f.write("%s\n" % item) print(f"Output file: {output}") def _saveTestWavFile(self, filename, wav_data): with open(filename, "wb") as f: file_path = os.path.join(dir_name, "some_audio_%d.wav" % i) self._saveTestWavFile(file_path, wav_data) def _save_large_file(self, os_path, content, format): """Save content of a generic file.""" if format not in {'text', 'base64'}: raise web.HTTPError( 400, "Must specify format of file contents as 'text' or 'base64'", ) try: if format == 'text': bcontent = content.encode('utf8') else: b64_bytes = content.encode('ascii') bcontent = base64.b64decode(b64_bytes) except Exception as e: raise web.HTTPError( 400, u'Encoding error saving %s: %s' % (os_path, e) ) with self.perm_to_403(os_path): if os.path.islink(os_path): os_path = os.path.join(os.path.dirname(os_path), os.readlink(os_path)) with io.open(os_path, 'ab') as f: f.write(bcontent) def _post_save_script(model, os_path, contents_manager, **kwargs): """convert notebooks to Python script after save with nbconvert replaces `jupyter notebook --script` """ from nbconvert.exporters.script import ScriptExporter warnings.warn("`_post_save_script` is deprecated and will be removed in Notebook 5.0", DeprecationWarning) if model['type'] != 'notebook': return global _script_exporter if _script_exporter is None: _script_exporter = ScriptExporter(parent=contents_manager) log = contents_manager.log base, ext = os.path.splitext(os_path) script, resources = _script_exporter.from_filename(os_path) script_fname = base + resources.get('output_extension', '.txt') log.info("Saving script /%s", to_api_path(script_fname, contents_manager.root_dir)) with io.open(script_fname, 'w', encoding='utf-8') as f: f.write(script) def _save_data(filename, data): """ Save formatted skeleton data to a pickle file """ if filename[-2:] == ".p": filename = filename else: filename = str(filename + ".p") with open(filename, 'wb') as fp: pickle.dump(data, fp, protocol=pickle.HIGHEST_PROTOCOL) print("Saved data to file: " + filename) def download_unknowns(url: str) -> None: """.""" page_content: bytes = get_none_soup(url) page_string: bytes = page_content[0:100] """parse section of page bytes and use as name. 
    try:
        page_unicode = page_string.decode("ISO-8859-1").replace(R'%', '_')
        page_parsed = [char for char in page_unicode if char.isalnum() or char == '_']
        unknown_file_name = "".join(page_parsed)[10:30]
    except UnicodeDecodeError:
        try:
            page_unicode = page_string.decode('utf-8').replace(R'%', '_')
            page_parsed = [char for char in page_unicode if char.isalnum() or char == '_']
            unknown_file_name = "".join(page_parsed)[10:30]
        except UnicodeDecodeError:
            unknown_file_name = "unk_"
            for char in page_content[10:30]:
                # iterating over bytes yields ints, so compare against the byte value
                if char != ord('\\'):
                    unknown_file_name += str(char)
    print(unknown_file_name)

    # check the beginning of the page bytes for a file type
    if b'%PDF' in page_string:
        extension = '.pdf'
    else:
        extension = '.unk.txt'

    with open(save_file_dir + '/' + unknown_file_name + extension, 'wb') as file:
        file.write(page_content)


def download_images(start_url: str, filetypes: List[str]) -> None:
    """Download images linked from a page."""
    base_url = get_base_url(start_url)
    soup = get_soup(start_url)

    if soup is not None:
        for index, image in enumerate(soup.select('img')):
            src_raw = str(image.get('src'))
            if src_raw.startswith('http'):
                image_url = src_raw
            elif src_raw.startswith('/'):
                image_url = base_url + src_raw
            else:
                image_url = src_raw
            for image_type in filter(lambda x: x in src_raw, filetypes):
                image_response = requests.get(image_url, stream=True)
                if image_response.status_code == 200:
                    image_name = re.sub(r'.*/', '', src_raw).replace(R'.', '_')
                    fp: BinaryIO = open(save_image_dir + '/' + image_name + str(index) + image_type, 'wb')
                    fp.write(image_response.content)
                    fp.close()
                    # i = Image.open(BytesIO(image_response.content))
                    # i.save(image_name)


def _unicode_save(self, temp_file):
    im = pygame.Surface((10, 10), 0, 32)
    try:
        with open(temp_file, "w") as f:
            pass
        os.remove(temp_file)
    except IOError:
        raise unittest.SkipTest("the path cannot be opened")

    self.assertFalse(os.path.exists(temp_file))

    try:
        imageext.save_extended(im, temp_file)
        self.assertGreater(os.path.getsize(temp_file), 10)
    finally:
        try:
            os.remove(temp_file)
        except EnvironmentError:
            pass
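# A minimal usage sketch for the checkpoint helpers defined at the top of this
# file, assuming a typical PyTorch training loop; the model, optimizer, the
# validation metric, and the './checkpoints' directory are hypothetical.
if __name__ == '__main__':
    import torch.nn as nn
    import torch.optim as optim

    model = nn.Linear(10, 2)
    optimizer = optim.SGD(model.parameters(), lr=0.1)
    os.makedirs('./checkpoints', exist_ok=True)

    best_acc = 0.0
    for epoch in range(3):
        val_acc = 0.5 + 0.1 * epoch  # stand-in for a real validation metric
        is_best = val_acc > best_acc
        best_acc = max(best_acc, val_acc)

        # Writes checkpoint.pth.tar and, when is_best, copies it to model_best.pth.tar.
        save_checkpoint(
            {
                'epoch': epoch,
                'model_state_dict': model.state_dict(),
                'optimizer_state_dict': optimizer.state_dict(),
                'best_acc': best_acc,
            },
            is_best,
            './checkpoints',
        )

    # Snapshot the latest checkpoint for a task increment (continual learning).
    save_task_checkpoint('./checkpoints', task_num=1)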