123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206 |
- #!/usr/bin/env python3
- # coding=utf-8
- import datetime
- import logging
- import platform
- import re
- import subprocess
- from pathlib import Path
- from typing import Optional
- from praw.models import Comment, Submission
- from bdfr.exceptions import BulkDownloaderException
- from bdfr.resource import Resource
- logger = logging.getLogger(__name__)
- class FileNameFormatter:
- key_terms = (
- 'date',
- 'flair',
- 'postid',
- 'redditor',
- 'subreddit',
- 'title',
- 'upvotes',
- )
- def __init__(self, file_format_string: str, directory_format_string: str, time_format_string: str):
- if not self.validate_string(file_format_string):
- raise BulkDownloaderException(f'"{file_format_string}" is not a valid format string')
- self.file_format_string = file_format_string
- self.directory_format_string: list[str] = directory_format_string.split('/')
- self.time_format_string = time_format_string
- def _format_name(self, submission: (Comment, Submission), format_string: str) -> str:
- if isinstance(submission, Submission):
- attributes = self._generate_name_dict_from_submission(submission)
- elif isinstance(submission, Comment):
- attributes = self._generate_name_dict_from_comment(submission)
- else:
- raise BulkDownloaderException(f'Cannot name object {type(submission).__name__}')
- result = format_string
- for key in attributes.keys():
- if re.search(fr'(?i).*{{{key}}}.*', result):
- key_value = str(attributes.get(key, 'unknown'))
- key_value = FileNameFormatter._convert_unicode_escapes(key_value)
- key_value = key_value.replace('\\', '\\\\')
- result = re.sub(fr'(?i){{{key}}}', key_value, result)
- result = result.replace('/', '')
- if platform.system() == 'Windows':
- result = FileNameFormatter._format_for_windows(result)
- return result
- @staticmethod
- def _convert_unicode_escapes(in_string: str) -> str:
- pattern = re.compile(r'(\\u\d{4})')
- matches = re.search(pattern, in_string)
- if matches:
- for match in matches.groups():
- converted_match = bytes(match, 'utf-8').decode('unicode-escape')
- in_string = in_string.replace(match, converted_match)
- return in_string
- def _generate_name_dict_from_submission(self, submission: Submission) -> dict:
- submission_attributes = {
- 'title': submission.title,
- 'subreddit': submission.subreddit.display_name,
- 'redditor': submission.author.name if submission.author else 'DELETED',
- 'postid': submission.id,
- 'upvotes': submission.score,
- 'flair': submission.link_flair_text,
- 'date': self._convert_timestamp(submission.created_utc),
- }
- return submission_attributes
- def _convert_timestamp(self, timestamp: float) -> str:
- input_time = datetime.datetime.fromtimestamp(timestamp)
- if self.time_format_string.upper().strip() == 'ISO':
- return input_time.isoformat()
- else:
- return input_time.strftime(self.time_format_string)
- def _generate_name_dict_from_comment(self, comment: Comment) -> dict:
- comment_attributes = {
- 'title': comment.submission.title,
- 'subreddit': comment.subreddit.display_name,
- 'redditor': comment.author.name if comment.author else 'DELETED',
- 'postid': comment.id,
- 'upvotes': comment.score,
- 'flair': '',
- 'date': self._convert_timestamp(comment.created_utc),
- }
- return comment_attributes
- def format_path(
- self,
- resource: Resource,
- destination_directory: Path,
- index: Optional[int] = None,
- ) -> Path:
- subfolder = Path(
- destination_directory,
- *[self._format_name(resource.source_submission, part) for part in self.directory_format_string],
- )
- index = f'_{str(index)}' if index else ''
- if not resource.extension:
- raise BulkDownloaderException(f'Resource from {resource.url} has no extension')
- file_name = str(self._format_name(resource.source_submission, self.file_format_string))
- file_name = re.sub(r'\n', ' ', file_name)
- if not re.match(r'.*\.$', file_name) and not re.match(r'^\..*', resource.extension):
- ending = index + '.' + resource.extension
- else:
- ending = index + resource.extension
- try:
- file_path = self.limit_file_name_length(file_name, ending, subfolder)
- except TypeError:
- raise BulkDownloaderException(f'Could not determine path name: {subfolder}, {index}, {resource.extension}')
- return file_path
- @staticmethod
- def limit_file_name_length(filename: str, ending: str, root: Path) -> Path:
- root = root.resolve().expanduser()
- possible_id = re.search(r'((?:_\w{6})?$)', filename)
- if possible_id:
- ending = possible_id.group(1) + ending
- filename = filename[:possible_id.start()]
- max_path = FileNameFormatter.find_max_path_length()
- max_file_part_length_chars = 255 - len(ending)
- max_file_part_length_bytes = 255 - len(ending.encode('utf-8'))
- max_path_length = max_path - len(ending) - len(str(root)) - 1
- out = Path(root, filename + ending)
- while any([len(filename) > max_file_part_length_chars,
- len(filename.encode('utf-8')) > max_file_part_length_bytes,
- len(str(out)) > max_path_length,
- ]):
- filename = filename[:-1]
- out = Path(root, filename + ending)
- return out
- @staticmethod
- def find_max_path_length() -> int:
- try:
- return int(subprocess.check_output(['getconf', 'PATH_MAX', '/']))
- except (ValueError, subprocess.CalledProcessError, OSError):
- if platform.system() == 'Windows':
- return 260
- else:
- return 4096
- def format_resource_paths(
- self,
- resources: list[Resource],
- destination_directory: Path,
- ) -> list[tuple[Path, Resource]]:
- out = []
- if len(resources) == 1:
- try:
- out.append((self.format_path(resources[0], destination_directory, None), resources[0]))
- except BulkDownloaderException as e:
- logger.error(f'Could not generate file path for resource {resources[0].url}: {e}')
- logger.exception('Could not generate file path')
- else:
- for i, res in enumerate(resources, start=1):
- logger.log(9, f'Formatting filename with index {i}')
- try:
- out.append((self.format_path(res, destination_directory, i), res))
- except BulkDownloaderException as e:
- logger.error(f'Could not generate file path for resource {res.url}: {e}')
- logger.exception('Could not generate file path')
- return out
- @staticmethod
- def validate_string(test_string: str) -> bool:
- if not test_string:
- return False
- result = any([f'{{{key}}}' in test_string.lower() for key in FileNameFormatter.key_terms])
- if result:
- if 'POSTID' not in test_string:
- logger.warning('Some files might not be downloaded due to name conflicts as filenames are'
- ' not guaranteed to be be unique without {POSTID}')
- return True
- else:
- return False
- @staticmethod
- def _format_for_windows(input_string: str) -> str:
- invalid_characters = r'<>:"\/|?*'
- for char in invalid_characters:
- input_string = input_string.replace(char, '')
- input_string = FileNameFormatter._strip_emojis(input_string)
- return input_string
- @staticmethod
- def _strip_emojis(input_string: str) -> str:
- result = input_string.encode('ascii', errors='ignore').decode('utf-8')
- return result
|