# github_archive.py
  1. import os
  2. import shutil
  3. import stat
  4. import subprocess
  5. from concurrent.futures import ALL_COMPLETED, ThreadPoolExecutor, wait
  6. from datetime import datetime
  7. from typing import List, Optional, Union
  8. import woodchips
  9. from github import Gist, Github, Repository
  10. from github_archive.constants import (
  11. DEFAULT_BASE_URL,
  12. DEFAULT_LOCATION,
  13. DEFAULT_LOG_LEVEL,
  14. DEFAULT_NUM_THREADS,
  15. DEFAULT_TIMEOUT,
  16. )
  # Git operations that can be run against an asset
  CLONE_OPERATION = 'clone'
  PULL_OPERATION = 'pull'

  # Contexts selecting which owner list and API call are used when retrieving assets
  GIST_CONTEXT = 'gist'
  ORG_CONTEXT = 'org'
  PERSONAL_CONTEXT = 'personal'
  STAR_CONTEXT = 'star'
  USER_CONTEXT = 'user'

  # Name under which the shared woodchips logger is registered and looked up
  LOGGER_NAME = 'github-archive'
  25. class GithubArchive:
  def __init__(
      self,
      token: Optional[str] = None,
      users: Optional[str] = None,
      orgs: Optional[str] = None,
      gists: Optional[str] = None,
      stars: Optional[str] = None,
      view: bool = False,
      clone: bool = False,
      pull: bool = False,
      forks: bool = False,
      location=DEFAULT_LOCATION,
      include: Optional[str] = None,
      exclude: Optional[str] = None,
      use_https: bool = False,
      timeout=DEFAULT_TIMEOUT,
      threads=DEFAULT_NUM_THREADS,
      base_url=DEFAULT_BASE_URL,
      log_level=DEFAULT_LOG_LEVEL,
  ):
      """Store the run configuration and create the GitHub API client.

      Args:
          token: GitHub token handed to the API client; when set, the authenticated
              user is looked up immediately.
          users: Comma-separated user names whose repos should be processed.
          orgs: Comma-separated organization names whose repos should be processed.
          gists: Comma-separated user names whose gists should be processed.
          stars: Comma-separated user names whose starred repos should be processed.
          view: Log the assets that were found.
          clone: Clone assets that are not present locally.
          pull: Pull changes for assets that are already present locally.
          forks: Include forked repos in the results.
          location: Root directory under which `repos`, `gists` and `logs` live.
          include: Comma-separated repo names; only these repos are archived.
          exclude: Comma-separated repo names; these repos are skipped.
          use_https: Clone repos via their HTML (HTTPS) URL instead of the SSH URL.
          timeout: Timeout passed to each git subprocess.
          threads: Number of worker threads used for git operations.
          base_url: Base URL handed to the GitHub API client.
          log_level: Level handed to the logger.
      """

      def split_names(value):
          # Comma-separated CLI values become a lowercase list. Entries are trimmed so
          # "a, b" works, and blanks are dropped. An absent value is an empty list,
          # which keeps the attribute type consistent (it is mutated as a list later).
          if not value:
              return []
          trimmed = (entry.strip() for entry in value.lower().split(','))
          return [entry for entry in trimmed if entry]

      # Parameter variables
      self.token = token
      self.users = split_names(users)
      self.orgs = split_names(orgs)
      self.gists = split_names(gists)
      self.stars = split_names(stars)
      self.view = view
      self.clone = clone
      self.pull = pull
      self.forks = forks
      self.location = location
      self.include = split_names(include)
      self.exclude = split_names(exclude)
      self.use_https = use_https
      self.timeout = timeout
      self.threads = threads
      self.base_url = base_url
      self.log_level = log_level

      # Internal variables
      self.github_instance = Github(login_or_token=self.token, base_url=self.base_url)
      # Without a token there is no authenticated user to resolve
      self.authenticated_user = self.github_instance.get_user() if self.token else None
      self.authenticated_username = self.authenticated_user.login.lower() if self.token else None
  def run(self):
      """Run the tool based on the arguments passed via the CLI.

      Processes, in order: the authenticated user's personal repos, other users'
      repos, org repos, starred repos, and finally gists. Directories of repos
      (and gists) whose clone failed are removed so that they can be retried on
      the next run.
      """
      self.initialize_project()
      logger = woodchips.get(LOGGER_NAME)
      logger.info('# GitHub Archive started...\n')
      start_time = datetime.now()
      failed_repo_dirs = []

      # Personal (includes personal authenticated items)
      # `authenticated_user_in_users` is a method and must be *called*: the bare
      # attribute is always truthy, which would run this branch for any token and
      # make the user removal below raise ValueError for users that are not listed.
      if self.token and self.users and self.authenticated_user_in_users():
          logger.info('# Making API call to GitHub for personal repos...\n')
          personal_repos = self.get_all_git_assets(PERSONAL_CONTEXT)
          failed_repo_dirs.extend(
              self._archive_repo_group(personal_repos, 'personal repos', view_label='user repos')
          )
          # We remove the authenticated user from the list so that we don't double pull their
          # repos for the `users` logic.
          self.users = [user for user in self.users if user != self.authenticated_username]

      # Users (can include personal non-authenticated items, excludes personal authenticated calls)
      if self.users:
          logger.info('# Making API calls to GitHub for user repos...\n')
          user_repos = self.get_all_git_assets(USER_CONTEXT)
          failed_repo_dirs.extend(self._archive_repo_group(user_repos, 'user repos'))

      # Orgs
      if self.orgs:
          logger.info('# Making API calls to GitHub for org repos...\n')
          org_repos = self.get_all_git_assets(ORG_CONTEXT)
          failed_repo_dirs.extend(self._archive_repo_group(org_repos, 'org repos'))

      # Stars
      if self.stars:
          logger.info('# Making API call to GitHub for starred repos...\n')
          starred_repos = self.get_all_git_assets(STAR_CONTEXT)
          failed_repo_dirs.extend(
              self._archive_repo_group(starred_repos, 'starred repos', view_label='stars')
          )

      if failed_repo_dirs:
          logger.info('Cleaning up repos...\n')
          self.remove_failed_dirs('repos', failed_repo_dirs)

      # Gists
      if self.gists:
          logger.info('# Making API call to GitHub for gists...\n')
          gists = self.get_all_git_assets(GIST_CONTEXT)
          failed_gist_dirs = []

          if self.view:
              logger.info('# Viewing gists...\n')
              self.view_gists(gists)
          if self.clone:
              logger.info('# Cloning missing gists...\n')
              failed_gist_dirs.extend(self.iterate_gists_to_archive(gists, CLONE_OPERATION))
          if self.pull:
              # Pull failures are intentionally not collected for cleanup
              logger.info('# Pulling changes to gists...\n')
              self.iterate_gists_to_archive(gists, PULL_OPERATION)

          if failed_gist_dirs:
              logger.info('Cleaning up gists...\n')
              self.remove_failed_dirs('gists', failed_gist_dirs)

      execution_time = f'Execution time: {datetime.now() - start_time}.'
      finish_message = f'GitHub Archive complete! {execution_time}\n'
      logger.info(finish_message)

  def _archive_repo_group(self, repos, label, view_label=None):
      """View, clone and/or pull one group of repos, depending on the enabled operations.

      `label` is used in the clone/pull log lines; `view_label` overrides it for the
      view log line. Returns the failures reported by the clone pass (an empty list
      when cloning is disabled); pull failures are not collected.
      """
      logger = woodchips.get(LOGGER_NAME)
      failed_clones = []

      if self.view:
          logger.info(f'# Viewing {view_label or label}...\n')
          self.view_repos(repos)
      if self.clone:
          logger.info(f'# Cloning missing {label}...\n')
          failed_clones = self.iterate_repos_to_archive(repos, CLONE_OPERATION)
      if self.pull:
          logger.info(f'# Pulling changes to {label}...\n')
          self.iterate_repos_to_archive(repos, PULL_OPERATION)

      return failed_clones
  def setup_logger(self):
      """Create the shared logger and attach its console and file outputs.

      The logger is registered under `LOGGER_NAME` at the configured log level;
      log files are written to the `logs` directory inside `self.location`.
      """
      archive_logger = woodchips.Logger(name=LOGGER_NAME, level=self.log_level)
      archive_logger.log_to_console()

      log_directory = os.path.join(self.location, 'logs')
      archive_logger.log_to_file(location=log_directory)
  def initialize_project(self):
      """Initialize the tool and ensure everything is in order before running any logic.

      Sets up logging, makes sure the `repos` and `gists` directories exist under
      `self.location`, and validates that the minimum set of arguments was passed:

      1. a git operation (view, clone or pull)
      2. a list of assets to run operations on (users, orgs, gists or stars)

      Raises:
          ValueError: if an operation is given without a list, a list without an
              operation, neither is given, or both `include` and `exclude` are set.
      """
      self.setup_logger()
      logger = woodchips.get(LOGGER_NAME)

      # Create each asset directory on its own and idempotently, so that an existing
      # location which lacks one of the sub-directories is still completed.
      for subdirectory in ('repos', 'gists'):
          os.makedirs(os.path.join(self.location, subdirectory), exist_ok=True)

      has_asset_list = bool(self.users or self.orgs or self.gists or self.stars)
      has_operation = bool(self.view or self.clone or self.pull)

      message = None
      if has_asset_list and not has_operation:
          message = 'A git operation must be specified when a list of users or orgs is provided.'
      elif has_operation and not has_asset_list:
          message = 'A list must be provided when a git operation is specified.'
      elif not has_asset_list and not has_operation:
          message = 'At least one git operation and one list must be provided to run github-archive.'
      elif self.include and self.exclude:
          message = 'The include and exclude flags are mutually exclusive. Only one can be used on each run.'

      if message is not None:
          logger.critical(message)
          raise ValueError(message)
  201. def authenticated_user_in_users(self) -> bool:
  202. return self.authenticated_user.login.lower() in self.users
  def get_all_git_assets(self, context: str) -> List[Union[Repository.Repository, Gist.Gist]]:
      """Retrieve the git assets (repos or gists) of every owner configured for `context`.

      The per-owner API results are flattened into one list. Gists are always
      kept; repos that are forks are kept only when `self.forks` is enabled.

      Args:
          context: One of the `*_CONTEXT` constants; selects the owner list and API call.

      Returns:
          A flat list of assets sorted by owner login.

      Raises:
          KeyError: if `context` is not a known context.
      """
      logger = woodchips.get(LOGGER_NAME)

      # Personal repos always belong to the authenticated user. Querying once per
      # entry in `self.users` would return the same repos once for every listed user.
      personal_owners = [self.authenticated_username] if self.authenticated_username else []

      # context -> (owners to query, API call for a single owner, label used in the log)
      context_manager = {
          GIST_CONTEXT: (
              self.gists,
              lambda owner: self.github_instance.get_user(owner).get_gists(),
              'gists',
          ),
          ORG_CONTEXT: (
              self.orgs,
              lambda owner: self.github_instance.get_organization(owner).get_repos(),
              'repos',
          ),
          PERSONAL_CONTEXT: (
              personal_owners,
              lambda owner: self.authenticated_user.get_repos(affiliation='owner'),
              'repos',
          ),
          STAR_CONTEXT: (
              self.stars,
              lambda owner: self.github_instance.get_user(owner).get_starred(),
              'starred repos',
          ),
          USER_CONTEXT: (
              self.users,
              lambda owner: self.github_instance.get_user(owner).get_repos(),
              'repos',
          ),
      }
      owner_list, fetch_assets, git_asset_string = context_manager[context]

      all_git_assets = []
      for owner in owner_list:
          formatted_owner_name = owner.strip()
          if not formatted_owner_name:
              continue

          # Query with the cleaned-up name, not the raw (possibly padded) entry
          git_assets = fetch_assets(formatted_owner_name)
          logger.debug(f'{formatted_owner_name} {git_asset_string} retrieved!')

          for item in git_assets:
              # Gists are added unconditionally since forked gists are not supported;
              # forked repos are only included when forks were requested.
              if context == GIST_CONTEXT or self.forks or not item.fork:
                  all_git_assets.append(item)

      return sorted(all_git_assets, key=lambda item: item.owner.login)
  def iterate_repos_to_archive(self, repos: List[Repository.Repository], operation: str) -> List[Optional[str]]:
      """Run `operation` for every eligible repo on a thread pool.

      Repos that are not in the include list, or that are in the exclude list,
      are skipped when either list is present. Each eligible repo is archived to
      `<location>/repos/<lowercased owner login>/<repo name>`.

      Returns:
          The non-empty results of `archive_repo`, i.e. the repos whose git
          operation failed.
      """
      logger = woodchips.get(LOGGER_NAME)
      thread_list = []

      # The context manager shuts the pool down once every submitted job has finished
      with ThreadPoolExecutor(self.threads) as pool:
          for repo in repos:
              # The include/exclude lists are lowercased when parsed, so compare
              # against the lowercased repo name.
              repo_name = repo.name.lower()
              if (
                  (not self.include and not self.exclude)
                  or (self.include and repo_name in self.include)
                  or (self.exclude and repo_name not in self.exclude)
              ):
                  repo_owner_username = repo.owner.login.lower()
                  repo_path = os.path.join(self.location, 'repos', repo_owner_username, repo.name)
                  thread_list.append(
                      pool.submit(
                          self.archive_repo,
                          repo=repo,
                          repo_path=repo_path,
                          operation=operation,
                      )
                  )
              else:
                  logger.debug(f'{repo.name} skipped due to include/exclude filtering')

      results = (job.result() for job in thread_list)
      failed_repos = [result for result in results if result]

      return failed_repos
  def iterate_gists_to_archive(self, gists: List[Gist.Gist], operation: str) -> List[Optional[str]]:
      """Run `operation` for every gist on a thread pool.

      Each gist is archived to `<location>/gists/<gist id>`.

      Returns:
          The non-empty results of `archive_gist`, i.e. the gists whose git
          operation failed.
      """
      # The context manager shuts the pool down once every submitted job has finished
      with ThreadPoolExecutor(self.threads) as pool:
          thread_list = [
              pool.submit(
                  self.archive_gist,
                  gist=gist,
                  gist_path=os.path.join(self.location, 'gists', gist.id),
                  operation=operation,
              )
              for gist in gists
          ]

      results = (job.result() for job in thread_list)
      failed_gists = [result for result in results if result]

      return failed_gists
  def view_repos(self, repos: List[Repository.Repository]):
      """Log every repo that would be cloned/pulled as `<owner login>/<repo name>`."""
      log = woodchips.get(LOGGER_NAME)

      for repository in repos:
          owner_login = repository.owner.login
          log.info(f'{owner_login}/{repository.name}')
  def view_gists(self, gists: List[Gist.Gist]):
      """Log every gist that would be cloned/pulled as `<owner login>/<gist id>`."""
      log = woodchips.get(LOGGER_NAME)

      for entry in gists:
          owner_login = entry.owner.login
          log.info(f'{owner_login}/{entry.id}')
  def archive_repo(self, repo: Repository.Repository, repo_path: str, operation: str) -> Optional[str]:
      """Clone or pull a single repo, depending on `operation`.

      Nothing is done when a clone is requested for a path that already exists,
      or a pull is requested for a path that does not exist yet.

      Args:
          repo: The repo to archive.
          repo_path: Local directory of the repo.
          operation: `CLONE_OPERATION` or `PULL_OPERATION`.

      Returns:
          None on success or when skipped. When the git command fails or times out,
          the repo's directory relative to the `repos` folder
          (`<lowercased owner login>/<repo name>`), so that it can be removed.
      """
      logger = woodchips.get(LOGGER_NAME)
      owner_login = repo.owner.login
      full_repo_name = os.path.join(owner_login, repo.name)
      # Repos are stored under the *lowercased* owner login, so the path reported for
      # cleanup must be lowercased the same way or the directory is never found on
      # case-sensitive filesystems.
      failed_repo_dir = os.path.join(owner_login.lower(), repo.name)

      path_exists = os.path.exists(repo_path)
      already_cloned = operation == CLONE_OPERATION and path_exists
      nothing_to_pull = operation == PULL_OPERATION and not path_exists
      if already_cloned or nothing_to_pull:
          return None

      clone_url = repo.html_url if self.use_https else repo.ssh_url
      commands = {
          CLONE_OPERATION: ['git', 'clone', clone_url, repo_path],
          PULL_OPERATION: ['git', '-C', repo_path, 'pull', '--rebase'],
      }
      git_command = commands[operation]

      try:
          subprocess.run(
              git_command,
              stdout=subprocess.DEVNULL,
              stdin=subprocess.DEVNULL,
              stderr=subprocess.DEVNULL,
              check=True,
              timeout=self.timeout,
          )
      except subprocess.TimeoutExpired:
          logger.error(f'Git operation timed out archiving {repo.name}.')
          return failed_repo_dir
      except subprocess.CalledProcessError as error:
          logger.error(f'Failed to {operation} {repo.name}\n{error}')
          return failed_repo_dir

      logger.info(f'Repo: {full_repo_name} {operation} success!')
      return None
  def archive_gist(self, gist: Gist.Gist, gist_path: str, operation: str) -> Optional[str]:
      """Clone or pull a single gist, depending on `operation`.

      Nothing is done when a clone is requested for a path that already exists,
      or a pull is requested for a path that does not exist yet.

      Args:
          gist: The gist to archive.
          gist_path: Local directory of the gist.
          operation: `CLONE_OPERATION` or `PULL_OPERATION`.

      Returns:
          None on success or when skipped. When the git command fails or times out,
          the gist's directory relative to the `gists` folder (the gist id), so
          that it can be removed.
      """
      logger = woodchips.get(LOGGER_NAME)
      full_gist_id = os.path.join(gist.owner.login, gist.id)
      # Gists are stored directly under `gists/<gist id>` (no owner folder), so the
      # path reported for cleanup is just the id; an owner-prefixed path would never
      # match the directory that was actually created.
      failed_gist_dir = gist.id

      path_exists = os.path.exists(gist_path)
      already_cloned = operation == CLONE_OPERATION and path_exists
      nothing_to_pull = operation == PULL_OPERATION and not path_exists
      if already_cloned or nothing_to_pull:
          return None

      commands = {
          CLONE_OPERATION: ['git', 'clone', gist.html_url, gist_path],
          PULL_OPERATION: ['git', '-C', gist_path, 'pull', '--rebase'],
      }
      git_command = commands[operation]

      try:
          subprocess.run(
              git_command,
              stdout=subprocess.DEVNULL,
              stdin=subprocess.DEVNULL,
              stderr=subprocess.DEVNULL,
              check=True,
              timeout=self.timeout,
          )
      except subprocess.TimeoutExpired:
          logger.error(f'Git operation timed out archiving {gist.id}.')
          return failed_gist_dir
      except subprocess.CalledProcessError as error:
          logger.error(f'Failed to {operation} {gist.id}\n{error}')
          return failed_gist_dir

      logger.info(f'Gist: {full_gist_id} {operation} success!')
      return None
  def remove_failed_dirs(self, dirs_location: str, failed_dirs: List[str]):
      """Delete the directories of assets whose git operation failed.

      Each distinct entry of `failed_dirs` is resolved against
      `<location>/<dirs_location>` and removed if it exists, so that the asset
      can be retried on the next run.
      """
      log = woodchips.get(LOGGER_NAME)

      def make_dir_writable(function, path, exception):
          """Error hook for `shutil.rmtree`: read-only entries (such as the `.git` folder
          on Windows) cannot be removed directly, so make the path writable and retry
          the function that failed.
          """
          os.chmod(path, stat.S_IWRITE)
          function(path)

      for failed_dir in set(failed_dirs):
          target = os.path.join(self.location, dirs_location, failed_dir)
          if not os.path.exists(target):
              continue

          log.debug(f'Removing {failed_dir} due to a failed git operation...')
          shutil.rmtree(target, onerror=make_dir_writable)