  1. # -*- coding: utf-8 -*-
  2. # Copyright 2017 Vector Creations Ltd
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. from paramiko.client import AutoAddPolicy, SSHClient
  16. from datetime import date
  17. from collections import namedtuple
  18. import argparse
  19. import progressbar
  20. import gzip
  21. import re
  22. import os
  23. import os.path
  24. import yaml
  25. FIND_COMMAND_TEMPLATE = 'find %(dir)s -name "%(glob)s"'
  26. DATE_REGEX = re.compile("(20[0-9][0-9])-([0-9][0-9])-([0-9][0-9])")
  27. Service = namedtuple("Service", (
  28. "name", "host", "account", "directory", "pattern",
  29. "days_to_keep_on_remote", "retention_period_days",
  30. ))
  31. def filter_by_age(files, comparator):
  32. """Filter files based on the date in their name relative to today.
  33. Args:
  34. files (iterable(str)): filenames with a date in them
  35. comparator (func): A function that takes a date.timedelta and returns
  36. a bool that indicates whether to include file in the output list
  37. Returns:
  38. list(str): The filtered list of files
  39. """
  40. today = date.today()
  41. results = []
  42. for f in files:
  43. m = DATE_REGEX.search(f)
  44. if not m:
  45. continue
  46. f_date = date(int(m.group(1)), int(m.group(2)), int(m.group(3)))
  47. if comparator(today - f_date):
  48. results.append((f_date, f))
  49. results.sort()
  50. return [f for _, f in results]
  51. class Archiver(object):
  52. def __init__(self, base_dir, verbose, dry_run, remove, use_ssh_agent):
  53. """
  54. Args:
  55. base_dir(str): Local base path to log files to
  56. verbose(bool): Print what its doing
  57. dry_run(bool): Don't actually copy files, just print
  58. remove(bool): Actually remove remote files
  59. use_ssh_agent(bool): Allow SSH client to try keys in SSH agent
  60. """
  61. self.base_dir = base_dir
  62. self.verbose = verbose
  63. self.dry_run = dry_run
  64. self.remove = remove
  65. self.use_ssh_agent = use_ssh_agent
  66. def archive_service(self, service):
  67. """Actually do the archiving step for the given Service
  68. """
  69. # Create the base directory for this service, i.e. where we put logs.
  70. base_dir = os.path.join(self.base_dir, service.name, service.host)
  71. if not os.path.exists(base_dir):
  72. os.makedirs(base_dir)
  73. if "<DATE->" not in service.pattern:
  74. # We ignore services that don't have a <DATE-> in their pattern
  75. print ("Warning:", service.name, "does not include date. Ignoring.")
  76. # Connect to remote
  77. client = SSHClient()
  78. # TODO: Use something other than auto add policy?
  79. client.set_missing_host_key_policy(AutoAddPolicy())
  80. client.connect(
  81. service.host,
  82. username=service.account,
  83. compress=True,
  84. allow_agent=self.use_ssh_agent,
  85. )
  86. # Fetch list of files from the remote
  87. glob = service.pattern.replace("<DATE->", "????-??-??")
  88. cmd = FIND_COMMAND_TEMPLATE % {
  89. "dir": service.directory,
  90. "glob": glob,
  91. }
  92. _, stdout, _ = client.exec_command(cmd)
  93. files = stdout.readlines()
  94. files[:] = list(f.strip() for f in files)
  95. files.sort()
  96. # Filter the files to ones we want to archive
  97. files = filter_by_age(
  98. files,
  99. lambda d: d.days > service.days_to_keep_on_remote
  100. )
  101. # For each file download to a pending file name (optionally gzipping)
  102. # and only after it has succesfully been downloaded do we optionally
  103. # delete from the remote.
  104. sftp = client.open_sftp()
  105. for file_name in files:
  106. local_name = os.path.join(base_dir, os.path.basename(file_name))
  107. if not file_name.endswith(".gz"):
  108. local_name += ".gz"
  109. pending_name = local_name + ".download"
  110. if os.path.exists(pending_name):
  111. os.remove(pending_name)
  112. if os.path.exists(local_name):
  113. print ("Warning: ", local_name, "already exists")
  114. continue
  115. # Set up progress bar for downloads
  116. if self.verbose:
  117. widgets = [
  118. os.path.basename(file_name), " ",
  119. progressbar.Percentage(),
  120. ' ', progressbar.Bar(),
  121. ' ', progressbar.ETA(),
  122. ' ', progressbar.FileTransferSpeed(),
  123. ]
  124. pb = progressbar.ProgressBar(widgets=widgets)
  125. def progress_cb(bytes_downloaded, total_size):
  126. pb.max_value = total_size
  127. pb.update(bytes_downloaded)
  128. else:
  129. def progress_cb(bytes_downloaded, total_size):
  130. pass
  131. if self.verbose or self.dry_run:
  132. print ("Archiving: %s:%s to %s" % (
  133. service.host, file_name, local_name,
  134. )
  135. )
  136. if not self.dry_run:
  137. # If filename does not end with '.gz' then we compress while
  138. # we download
  139. # TODO: Should we be preserving last modified times?
  140. if not file_name.endswith(".gz"):
  141. with gzip.open(pending_name, 'wb', compresslevel=9) as f:
  142. sftp.getfo(file_name, f, callback=progress_cb)
  143. else:
  144. sftp.get(file_name, pending_name, callback=progress_cb)
  145. if self.verbose:
  146. pb.finish()
  147. os.rename(pending_name, local_name)
  148. if self.remove:
  149. if self.verbose:
  150. print ("Removing remote")
  151. sftp.remove(file_name)
  152. sftp.close()
  153. client.close()
  154. # We now go and delete any files that are older than the retention
  155. # period, if specified
  156. if service.retention_period_days:
  157. local_files = list(
  158. os.path.join(dirpath, filename)
  159. for dirpath, _, filenames in os.walk(base_dir)
  160. for filename in filenames
  161. )
  162. files_to_delete = filter_by_age(
  163. local_files,
  164. lambda d: d.days > service.retention_period_days
  165. )
  166. for file_name in files_to_delete:
  167. if self.verbose or self.dry_run:
  168. print ("Deleting file due to retention policy: %s" % (
  169. file_name,
  170. ))
  171. if not self.dry_run:
  172. os.remove(file_name)
  173. if __name__ == "__main__":
  174. parser = argparse.ArgumentParser()
  175. parser.add_argument("config")
  176. parser.add_argument("-v", "--verbose", help="increase output verbosity",
  177. action="store_true")
  178. parser.add_argument("-n", "--dry-run",
  179. help="print files that would be archived",
  180. action="store_true")
  181. parser.add_argument("--remove", help="remove files from remote",
  182. action="store_true")
  183. parser.add_argument("--use-ssh-agent",
  184. help="allow using keys from ssh agent",
  185. action="store_true")
  186. args = parser.parse_args()
  187. config_file = args.config
  188. config = yaml.load(open(config_file))
  189. base_dir = config["archive_dir"]
  190. services = [
  191. Service(
  192. name=name,
  193. host=host,
  194. account=serv_config["account"],
  195. directory=serv_config["directory"],
  196. pattern=serv_config["pattern"],
  197. days_to_keep_on_remote=serv_config["days_to_keep_on_remote"],
  198. retention_period_days=serv_config.get("retention_period_days"),
  199. )
  200. for name, serv_config in config["services"].iteritems()
  201. for host in serv_config["hosts"]
  202. ]
  203. archiver = Archiver(
  204. base_dir, args.verbose, args.dry_run, args.remove, args.use_ssh_agent
  205. )
  206. for service in services:
  207. if args.verbose:
  208. print ("Handling", service.name, service.host)
  209. try:
  210. archiver.archive_service(service)
  211. except Exception as e:
  212. print ("Error while processing", service.name, service.host, e)