12345678910111213141516171819202122232425262728293031323334353637383940414243 |
def archive_repo(self, repo: Repository.Repository, repo_path: str, operation: str) -> Optional[str]:
    """Run the requested git operation (clone or pull) for a single repo.

    Nothing is executed when the request does not match what is on disk:
    a clone whose target path already exists, or a pull whose target path
    does not exist yet.

    Clones use `repo.html_url` when `self.use_https` is set and
    `repo.ssh_url` otherwise. Git's stdin/stdout/stderr are discarded and
    the command is limited to `self.timeout`.

    Returns the `owner/name` path of the repo when the git command fails or
    times out, otherwise None (including when the operation was skipped).
    """
    logger = woodchips.get(LOGGER_NAME)
    # Built as a filesystem path (not a plain string) so failed dirs can be removed properly
    full_repo_name = os.path.join(repo.owner.login, repo.name)

    path_exists = os.path.exists(repo_path)
    clone_target_present = path_exists and operation == CLONE_OPERATION
    pull_target_missing = not path_exists and operation == PULL_OPERATION
    if clone_target_present or pull_target_missing:
        return None

    clone_url = repo.html_url if self.use_https else repo.ssh_url
    git_commands = {
        PULL_OPERATION: ['git', '-C', repo_path, 'pull', '--rebase'],
        CLONE_OPERATION: ['git', 'clone', clone_url, repo_path],
    }
    # An operation that is neither clone nor pull raises KeyError here
    selected_command = git_commands[operation]

    try:
        subprocess.run(
            selected_command,
            stdin=subprocess.DEVNULL,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            timeout=self.timeout,
            check=True,
        )
    except subprocess.TimeoutExpired:
        logger.error(f'Git operation timed out archiving {repo.name}.')
        return full_repo_name
    except subprocess.CalledProcessError as error:
        logger.error(f'Failed to {operation} {repo.name}\n{error}')
        return full_repo_name
    else:
        logger.info(f'Repo: {full_repo_name} {operation} success!')

    return None
|