123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447 |
import os
import shutil
import stat
import subprocess
from concurrent.futures import ALL_COMPLETED, ThreadPoolExecutor, wait
from datetime import datetime
from typing import List, Optional, Union

import woodchips
from github import Gist, Github, Repository

from github_archive.constants import (
    DEFAULT_BASE_URL,
    DEFAULT_LOCATION,
    DEFAULT_LOG_LEVEL,
    DEFAULT_NUM_THREADS,
    DEFAULT_TIMEOUT,
)
# Name under which the tool's logger is registered and looked up
LOGGER_NAME = 'github-archive'

# Git operations that can be run against a repo or gist
CLONE_OPERATION = 'clone'
PULL_OPERATION = 'pull'

# Contexts describing which kind of git asset list is being retrieved
GIST_CONTEXT = 'gist'
ORG_CONTEXT = 'org'
PERSONAL_CONTEXT = 'personal'
STAR_CONTEXT = 'star'
USER_CONTEXT = 'user'
class GithubArchive:
    """View, clone, and pull GitHub repos and gists into a local archive directory.

    Owners are supplied as comma-separated strings (`users`, `orgs`, `gists`, `stars`) and the
    operations to perform are toggled with the `view`, `clone`, and `pull` flags. Calling `run()`
    validates the configuration and then processes each requested list in turn.
    """

    def __init__(
        self,
        token=None,
        users=None,
        orgs=None,
        gists=None,
        stars=None,
        view=False,
        clone=False,
        pull=False,
        forks=False,
        location=DEFAULT_LOCATION,
        include=None,
        exclude=None,
        use_https=False,
        timeout=DEFAULT_TIMEOUT,
        threads=DEFAULT_NUM_THREADS,
        base_url=DEFAULT_BASE_URL,
        log_level=DEFAULT_LOG_LEVEL,
    ):
        """Store the run configuration and create the GitHub API client.

        Args:
            token: GitHub token; when provided the authenticated user is looked up immediately.
            users: Comma-separated usernames whose repos should be processed.
            orgs: Comma-separated organization names whose repos should be processed.
            gists: Comma-separated usernames whose gists should be processed.
            stars: Comma-separated usernames whose starred repos should be processed.
            view: Log the name of each asset.
            clone: Clone assets that are missing locally.
            pull: Pull changes for assets that already exist locally.
            forks: Include forked repos when True.
            location: Base directory of the archive (`repos`, `gists`, and `logs` live beneath it).
            include: Comma-separated repo names; only these repos are archived.
            exclude: Comma-separated repo names; these repos are skipped.
            use_https: Clone repos via their `html_url` instead of their `ssh_url`.
            timeout: Timeout passed to each git subprocess.
            threads: Size of the thread pool used to run git operations.
            base_url: Base URL passed to the GitHub API client.
            log_level: Level passed to the logger.
        """
        # Parameter variables
        self.token = token
        self.users = self._split_csv(users)
        self.orgs = self._split_csv(orgs)
        self.gists = self._split_csv(gists)
        self.stars = self._split_csv(stars)
        self.view = view
        self.clone = clone
        self.pull = pull
        self.forks = forks
        self.location = location
        self.include = self._split_csv(include)
        self.exclude = self._split_csv(exclude)
        self.use_https = use_https
        self.timeout = timeout
        self.threads = threads
        self.base_url = base_url
        self.log_level = log_level

        # Internal variables
        self.github_instance = Github(login_or_token=self.token, base_url=self.base_url)
        self.authenticated_user = self.github_instance.get_user() if self.token else None
        self.authenticated_username = self.authenticated_user.login.lower() if self.token else None

    @staticmethod
    def _split_csv(value: Optional[str]) -> List[str]:
        """Lowercase a comma-separated string and split it into stripped, non-empty names.

        Returns an empty list when `value` is empty or None so the attribute is always a list.
        """
        if not value:
            return []
        stripped_names = (name.strip() for name in value.lower().split(','))
        return [name for name in stripped_names if name]

    def run(self):
        """Run the tool based on the arguments passed via the CLI."""
        self.initialize_project()
        logger = woodchips.get(LOGGER_NAME)
        logger.info('# GitHub Archive started...\n')
        start_time = datetime.now()

        failed_repo_dirs = []

        # Personal (includes personal authenticated items)
        # NOTE: `authenticated_user_in_users` is a method and must be called; the bare attribute is always truthy.
        if self.token and self.users and self.authenticated_user_in_users():
            logger.info('# Making API call to GitHub for personal repos...\n')
            personal_repos = self.get_all_git_assets(PERSONAL_CONTEXT)
            failed_repo_dirs.extend(self._process_repos(personal_repos, 'user repos', 'personal repos'))

            # We remove the authenticated user from the list so that we don't double pull their
            # repos for the `users` logic.
            self.users.remove(self.authenticated_username)

        # Users (can include personal non-authenticated items, excludes personal authenticated calls)
        if self.users:
            logger.info('# Making API calls to GitHub for user repos...\n')
            user_repos = self.get_all_git_assets(USER_CONTEXT)
            failed_repo_dirs.extend(self._process_repos(user_repos, 'user repos', 'user repos'))

        # Orgs
        if self.orgs:
            logger.info('# Making API calls to GitHub for org repos...\n')
            org_repos = self.get_all_git_assets(ORG_CONTEXT)
            failed_repo_dirs.extend(self._process_repos(org_repos, 'org repos', 'org repos'))

        # Stars
        if self.stars:
            logger.info('# Making API call to GitHub for starred repos...\n')
            starred_repos = self.get_all_git_assets(STAR_CONTEXT)
            failed_repo_dirs.extend(self._process_repos(starred_repos, 'stars', 'starred repos'))

        if failed_repo_dirs:
            logger.info('Cleaning up repos...\n')
            self.remove_failed_dirs('repos', failed_repo_dirs)

        # Gists
        if self.gists:
            logger.info('# Making API call to GitHub for gists...\n')
            gists = self.get_all_git_assets(GIST_CONTEXT)
            failed_gist_dirs = []

            if self.view:
                logger.info('# Viewing gists...\n')
                self.view_gists(gists)
            if self.clone:
                logger.info('# Cloning missing gists...\n')
                failed_gist_dirs.extend(self.iterate_gists_to_archive(gists, CLONE_OPERATION))
            if self.pull:
                # Failed pulls are intentionally not collected: only failed clones get cleaned up
                logger.info('# Pulling changes to gists...\n')
                self.iterate_gists_to_archive(gists, PULL_OPERATION)

            if failed_gist_dirs:
                logger.info('Cleaning up gists...\n')
                self.remove_failed_dirs('gists', failed_gist_dirs)

        execution_time = f'Execution time: {datetime.now() - start_time}.'
        finish_message = f'GitHub Archive complete! {execution_time}\n'
        logger.info(finish_message)

    def _process_repos(
        self,
        repos: List[Repository.Repository],
        view_label: str,
        archive_label: str,
    ) -> List[str]:
        """Run the enabled view/clone/pull operations on `repos` and return the failed clone dirs.

        `view_label` and `archive_label` are only used to build the log messages.
        """
        logger = woodchips.get(LOGGER_NAME)
        failed_repos = []

        if self.view:
            logger.info(f'# Viewing {view_label}...\n')
            self.view_repos(repos)
        if self.clone:
            logger.info(f'# Cloning missing {archive_label}...\n')
            failed_repos.extend(self.iterate_repos_to_archive(repos, CLONE_OPERATION))
        if self.pull:
            # Failed pulls are intentionally not collected: only failed clones get cleaned up
            logger.info(f'# Pulling changes to {archive_label}...\n')
            self.iterate_repos_to_archive(repos, PULL_OPERATION)

        return failed_repos

    def setup_logger(self):
        """Sets up a logger to log to console and a file.

        - Logging can be called with the `logger` property
        - Files will automatically roll over
        """
        logger = woodchips.Logger(
            name=LOGGER_NAME,
            level=self.log_level,
        )
        logger.log_to_console()
        logger.log_to_file(location=os.path.join(self.location, 'logs'))

    def initialize_project(self):
        """Initialize the tool and ensure everything is in order before running any logic.

        This function also ensures the minimum set of requirements are passed in to run the tool:
        1. a git operation
        2. a list of assets to run operations on

        Raises:
            ValueError: If the combination of lists, operations, and filters is invalid.
        """
        self.setup_logger()
        logger = woodchips.get(LOGGER_NAME)

        # Create each archive directory on its own: the base location may already exist
        # (e.g. once the logger has been pointed at `<location>/logs`) without these subfolders.
        for directory in ('repos', 'gists'):
            os.makedirs(os.path.join(self.location, directory), exist_ok=True)

        has_asset_list = bool(self.users or self.orgs or self.gists or self.stars)
        has_operation = bool(self.view or self.clone or self.pull)

        message = None
        if has_asset_list and not has_operation:
            message = 'A git operation must be specified when a list of users or orgs is provided.'
        elif has_operation and not has_asset_list:
            message = 'A list must be provided when a git operation is specified.'
        elif not has_asset_list and not has_operation:
            message = 'At least one git operation and one list must be provided to run github-archive.'
        elif self.include and self.exclude:
            message = 'The include and exclude flags are mutually exclusive. Only one can be used on each run.'

        if message is not None:
            logger.critical(message)
            raise ValueError(message)

    def authenticated_user_in_users(self) -> bool:
        """Return True when the authenticated user's (lowercased) login is in the `users` list."""
        if self.authenticated_user is None:
            # Without a token there is no authenticated user to look for
            return False
        return self.authenticated_user.login.lower() in self.users

    def get_all_git_assets(self, context: str) -> List[Union[Repository.Repository, Gist.Gist]]:
        """Retrieve via the API the git assets (repos, gists) of the owner(s) for the given context.

        Forked repos are dropped unless `forks` is enabled; gists are always kept.
        Returns a flat list of git assets sorted by owner login.
        """
        logger = woodchips.get(LOGGER_NAME)

        # Personal repos always belong to the authenticated user, so that context has a single owner
        # regardless of how many users were requested (iterating `users` would duplicate the repos).
        personal_owners = [self.authenticated_username] if self.authenticated_username else []

        # context -> (owners to query, function retrieving an owner's assets, asset name for logging)
        context_manager = {
            GIST_CONTEXT: (
                self.gists,
                lambda owner: self.github_instance.get_user(owner).get_gists(),
                'gists',
            ),
            ORG_CONTEXT: (
                self.orgs,
                lambda owner: self.github_instance.get_organization(owner).get_repos(),
                'repos',
            ),
            PERSONAL_CONTEXT: (
                personal_owners,
                lambda owner: self.authenticated_user.get_repos(affiliation='owner'),
                'repos',
            ),
            STAR_CONTEXT: (
                self.stars,
                lambda owner: self.github_instance.get_user(owner).get_starred(),
                'starred repos',
            ),
            USER_CONTEXT: (
                self.users,
                lambda owner: self.github_instance.get_user(owner).get_repos(),
                'repos',
            ),
        }
        owner_list, retrieve_assets, git_asset_string = context_manager[context]

        all_git_assets = []
        for owner in owner_list:
            formatted_owner_name = owner.strip()
            git_assets = retrieve_assets(formatted_owner_name)
            logger.debug(f'{formatted_owner_name} {git_asset_string} retrieved!')

            # Gists are added automatically since we don't support forked gists
            all_git_assets.extend(
                item for item in git_assets if context == GIST_CONTEXT or self.forks or not item.fork
            )

        return sorted(all_git_assets, key=lambda item: item.owner.login)

    def iterate_repos_to_archive(self, repos: List[Repository.Repository], operation: str) -> List[Optional[str]]:
        """Iterate over each repository and start a thread if it can be archived.

        We ignore repos not in the include or in the exclude list if either are present.
        Returns the failed directories reported by `archive_repo`.
        """
        logger = woodchips.get(LOGGER_NAME)
        thread_list = []

        # The context manager shuts the pool down once every submitted operation has finished
        with ThreadPoolExecutor(self.threads) as pool:
            for repo in repos:
                # The include/exclude lists are lowercased, so compare against the lowercased name
                repo_name = repo.name.lower()
                is_selected = (
                    (not self.include and not self.exclude)
                    or (self.include and repo_name in self.include)
                    or (self.exclude and repo_name not in self.exclude)
                )
                if not is_selected:
                    logger.debug(f'{repo.name} skipped due to include/exclude filtering')
                    continue

                repo_owner_username = repo.owner.login.lower()
                repo_path = os.path.join(self.location, 'repos', repo_owner_username, repo.name)
                thread_list.append(
                    pool.submit(
                        self.archive_repo,
                        repo=repo,
                        repo_path=repo_path,
                        operation=operation,
                    )
                )

            wait(thread_list, return_when=ALL_COMPLETED)

        results = (thread.result() for thread in thread_list)
        return [failed_repo for failed_repo in results if failed_repo]

    def iterate_gists_to_archive(self, gists: List[Gist.Gist], operation: str) -> List[Optional[str]]:
        """Iterate over each gist and start a thread if it can be archived.

        Returns the failed directories reported by `archive_gist`.
        """
        thread_list = []

        # The context manager shuts the pool down once every submitted operation has finished
        with ThreadPoolExecutor(self.threads) as pool:
            for gist in gists:
                gist_path = os.path.join(self.location, 'gists', gist.id)
                thread_list.append(
                    pool.submit(
                        self.archive_gist,
                        gist=gist,
                        gist_path=gist_path,
                        operation=operation,
                    )
                )

            wait(thread_list, return_when=ALL_COMPLETED)

        results = (thread.result() for thread in thread_list)
        return [failed_gist for failed_gist in results if failed_gist]

    def view_repos(self, repos: List[Repository.Repository]):
        """View a list of repos that will be cloned/pulled."""
        logger = woodchips.get(LOGGER_NAME)

        for repo in repos:
            repo_name = f'{repo.owner.login}/{repo.name}'
            logger.info(repo_name)

    def view_gists(self, gists: List[Gist.Gist]):
        """View a list of gists that will be cloned/pulled."""
        logger = woodchips.get(LOGGER_NAME)

        for gist in gists:
            gist_id = f'{gist.owner.login}/{gist.id}'
            logger.info(gist_id)

    def _run_git_operation(
        self,
        git_command: List[str],
        operation: str,
        asset_type: str,
        full_name: str,
        short_name: str,
    ) -> bool:
        """Run a git command silently, log the outcome, and return whether it succeeded.

        `asset_type`, `full_name`, and `short_name` are only used to build the log messages.
        """
        logger = woodchips.get(LOGGER_NAME)

        try:
            subprocess.run(
                git_command,
                stdout=subprocess.DEVNULL,
                stdin=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
                check=True,
                timeout=self.timeout,
            )
        except subprocess.TimeoutExpired:
            logger.error(f'Git operation timed out archiving {short_name}.')
            return False
        except subprocess.CalledProcessError as error:
            logger.error(f'Failed to {operation} {short_name}\n{error}')
            return False

        logger.info(f'{asset_type}: {full_name} {operation} success!')
        return True

    def archive_repo(self, repo: Repository.Repository, repo_path: str, operation: str) -> Optional[str]:
        """Clone and pull repos based on the operation passed.

        Nothing is done when cloning a repo that already exists locally or pulling one that does not.
        We return the directory of the repo (relative to the `repos` folder) if its git operation
        fails, otherwise return None.
        """
        repo_exists = os.path.exists(repo_path)
        if (operation == CLONE_OPERATION and repo_exists) or (operation == PULL_OPERATION and not repo_exists):
            return None

        clone_url = repo.html_url if self.use_https else repo.ssh_url
        commands = {
            CLONE_OPERATION: ['git', 'clone', clone_url, repo_path],
            PULL_OPERATION: ['git', '-C', repo_path, 'pull', '--rebase'],
        }

        full_repo_name = os.path.join(repo.owner.login, repo.name)
        succeeded = self._run_git_operation(commands[operation], operation, 'Repo', full_repo_name, repo.name)
        if succeeded:
            return None

        # Repos are archived under the lowercased owner name, so the failed dir must use it too,
        # otherwise the directory cannot be found for removal on case-sensitive filesystems.
        return os.path.join(repo.owner.login.lower(), repo.name)

    def archive_gist(self, gist: Gist.Gist, gist_path: str, operation: str) -> Optional[str]:
        """Clone and pull gists based on the operation passed.

        Nothing is done when cloning a gist that already exists locally or pulling one that does not.
        We return the directory of the gist (relative to the `gists` folder) if its git operation
        fails, otherwise return None.
        """
        gist_exists = os.path.exists(gist_path)
        if (operation == CLONE_OPERATION and gist_exists) or (operation == PULL_OPERATION and not gist_exists):
            return None

        commands = {
            CLONE_OPERATION: ['git', 'clone', gist.html_url, gist_path],
            PULL_OPERATION: ['git', '-C', gist_path, 'pull', '--rebase'],
        }

        full_gist_id = os.path.join(gist.owner.login, gist.id)
        succeeded = self._run_git_operation(commands[operation], operation, 'Gist', full_gist_id, gist.id)
        if succeeded:
            return None

        # Gists are archived directly under `gists/<id>` (no owner folder), so that is the dir to remove
        return gist.id

    def remove_failed_dirs(self, dirs_location: str, failed_dirs: List[str]):
        """Removes a directory if it fails a git operation due to
        timing out or other errors so it can be retried on the next run.

        `failed_dirs` are paths relative to `<location>/<dirs_location>`; duplicates are ignored.
        """
        logger = woodchips.get(LOGGER_NAME)

        def make_dir_writable(function, path, exception):
            """The `.git` folder on Windows cannot be gracefully removed due to being read-only,
            so we make the directory writable on a failure and retry the original function.
            """
            os.chmod(path, stat.S_IWRITE)
            function(path)

        for directory in set(failed_dirs):
            path = os.path.join(self.location, dirs_location, directory)
            if os.path.exists(path):
                logger.debug(f'Removing {directory} due to a failed git operation...')
                shutil.rmtree(path, onerror=make_dir_writable)
|