file.py 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224
  1. #
  2. # Licensed to the Apache Software Foundation (ASF) under one
  3. # or more contributor license agreements. See the NOTICE file
  4. # distributed with this work for additional information
  5. # regarding copyright ownership. The ASF licenses this file
  6. # to you under the Apache License, Version 2.0 (the
  7. # "License"); you may not use this file except in compliance
  8. # with the License. You may obtain a copy of the License at
  9. #
  10. # http://www.apache.org/licenses/LICENSE-2.0
  11. #
  12. # Unless required by applicable law or agreed to in writing,
  13. # software distributed under the License is distributed on an
  14. # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  15. # KIND, either express or implied. See the License for the
  16. # specific language governing permissions and limitations
  17. # under the License.
  18. import logging
  19. import os
  20. import re
  21. import zipfile
  22. from pathlib import Path
  23. from typing import Dict, Generator, List, Optional, Pattern
  24. from airflow.configuration import conf
  25. log = logging.getLogger(__name__)
  26. def TemporaryDirectory(*args, **kwargs): # pylint: disable=invalid-name
  27. """This function is deprecated. Please use `tempfile.TemporaryDirectory`"""
  28. import warnings
  29. from tempfile import TemporaryDirectory as TmpDir
  30. warnings.warn(
  31. "This function is deprecated. Please use `tempfile.TemporaryDirectory`",
  32. DeprecationWarning,
  33. stacklevel=2,
  34. )
  35. return TmpDir(*args, **kwargs)
  36. def mkdirs(path, mode):
  37. """
  38. Creates the directory specified by path, creating intermediate directories
  39. as necessary. If directory already exists, this is a no-op.
  40. :param path: The directory to create
  41. :type path: str
  42. :param mode: The mode to give to the directory e.g. 0o755, ignores umask
  43. :type mode: int
  44. """
  45. import warnings
  46. warnings.warn(
  47. f"This function is deprecated. Please use `pathlib.Path({path}).mkdir`",
  48. DeprecationWarning,
  49. stacklevel=2,
  50. )
  51. Path(path).mkdir(mode=mode, parents=True, exist_ok=True)
  52. ZIP_REGEX = re.compile(r'((.*\.zip){})?(.*)'.format(re.escape(os.sep)))
  53. def correct_maybe_zipped(fileloc):
  54. """
  55. If the path contains a folder with a .zip suffix, then
  56. the folder is treated as a zip archive and path to zip is returned.
  57. """
  58. _, archive, _ = ZIP_REGEX.search(fileloc).groups()
  59. if archive and zipfile.is_zipfile(archive):
  60. return archive
  61. else:
  62. return fileloc
  63. def open_maybe_zipped(fileloc, mode='r'):
  64. """
  65. Opens the given file. If the path contains a folder with a .zip suffix, then
  66. the folder is treated as a zip archive, opening the file inside the archive.
  67. :return: a file object, as in `open`, or as in `ZipFile.open`.
  68. """
  69. _, archive, filename = ZIP_REGEX.search(fileloc).groups()
  70. if archive and zipfile.is_zipfile(archive):
  71. return zipfile.ZipFile(archive, mode=mode).open(filename)
  72. else:
  73. return open(fileloc, mode=mode)
  74. def find_path_from_directory(base_dir_path: str, ignore_file_name: str) -> Generator[str, None, None]:
  75. """
  76. Search the file and return the path of the file that should not be ignored.
  77. :param base_dir_path: the base path to be searched for.
  78. :param ignore_file_name: the file name in which specifies a regular expression pattern is written.
  79. :return : file path not to be ignored.
  80. """
  81. patterns_by_dir: Dict[str, List[Pattern[str]]] = {}
  82. for root, dirs, files in os.walk(str(base_dir_path), followlinks=True):
  83. patterns: List[Pattern[str]] = patterns_by_dir.get(root, [])
  84. ignore_file_path = os.path.join(root, ignore_file_name)
  85. if os.path.isfile(ignore_file_path):
  86. with open(ignore_file_path) as file:
  87. lines_no_comments = [re.sub(r"\s*#.*", "", line) for line in file.read().split("\n")]
  88. patterns += [re.compile(line) for line in lines_no_comments if line]
  89. patterns = list(set(patterns))
  90. dirs[:] = [
  91. subdir
  92. for subdir in dirs
  93. if not any(
  94. p.search(os.path.join(os.path.relpath(root, str(base_dir_path)), subdir)) for p in patterns
  95. )
  96. ]
  97. patterns_by_dir.update({os.path.join(root, sd): patterns.copy() for sd in dirs})
  98. for file in files: # type: ignore
  99. if file == ignore_file_name:
  100. continue
  101. abs_file_path = os.path.join(root, str(file))
  102. rel_file_path = os.path.join(os.path.relpath(root, str(base_dir_path)), str(file))
  103. if any(p.search(rel_file_path) for p in patterns):
  104. continue
  105. yield str(abs_file_path)
  106. def list_py_file_paths(
  107. directory: str,
  108. safe_mode: bool = conf.getboolean('core', 'DAG_DISCOVERY_SAFE_MODE', fallback=True),
  109. include_examples: Optional[bool] = None,
  110. include_smart_sensor: Optional[bool] = conf.getboolean('smart_sensor', 'use_smart_sensor'),
  111. ):
  112. """
  113. Traverse a directory and look for Python files.
  114. :param directory: the directory to traverse
  115. :type directory: unicode
  116. :param safe_mode: whether to use a heuristic to determine whether a file
  117. contains Airflow DAG definitions. If not provided, use the
  118. core.DAG_DISCOVERY_SAFE_MODE configuration setting. If not set, default
  119. to safe.
  120. :type safe_mode: bool
  121. :param include_examples: include example DAGs
  122. :type include_examples: bool
  123. :param include_smart_sensor: include smart sensor native control DAGs
  124. :type include_examples: bool
  125. :return: a list of paths to Python files in the specified directory
  126. :rtype: list[unicode]
  127. """
  128. if include_examples is None:
  129. include_examples = conf.getboolean('core', 'LOAD_EXAMPLES')
  130. file_paths: List[str] = []
  131. if directory is None:
  132. file_paths = []
  133. elif os.path.isfile(directory):
  134. file_paths = [directory]
  135. elif os.path.isdir(directory):
  136. find_dag_file_paths(directory, file_paths, safe_mode)
  137. if include_examples:
  138. from airflow import example_dags
  139. example_dag_folder = example_dags.__path__[0] # type: ignore
  140. file_paths.extend(list_py_file_paths(example_dag_folder, safe_mode, False, False))
  141. if include_smart_sensor:
  142. from airflow import smart_sensor_dags
  143. smart_sensor_dag_folder = smart_sensor_dags.__path__[0] # type: ignore
  144. file_paths.extend(list_py_file_paths(smart_sensor_dag_folder, safe_mode, False, False))
  145. return file_paths
  146. def find_dag_file_paths(directory: str, file_paths: list, safe_mode: bool):
  147. """Finds file paths of all DAG files."""
  148. for file_path in find_path_from_directory(directory, ".airflowignore"):
  149. try:
  150. if not os.path.isfile(file_path):
  151. continue
  152. _, file_ext = os.path.splitext(os.path.split(file_path)[-1])
  153. if file_ext != '.py' and not zipfile.is_zipfile(file_path):
  154. continue
  155. if not might_contain_dag(file_path, safe_mode):
  156. continue
  157. file_paths.append(file_path)
  158. except Exception: # noqa pylint: disable=broad-except
  159. log.exception("Error while examining %s", file_path)
# Matches a ``#`` comment through the end of the line, together with any
# whitespace immediately before the ``#``.
COMMENT_PATTERN = re.compile(r"\s*#.*")
  161. def might_contain_dag(file_path: str, safe_mode: bool, zip_file: Optional[zipfile.ZipFile] = None):
  162. """
  163. Heuristic that guesses whether a Python file contains an Airflow DAG definition.
  164. :param file_path: Path to the file to be checked.
  165. :param safe_mode: Is safe mode active?. If no, this function always returns True.
  166. :param zip_file: if passed, checks the archive. Otherwise, check local filesystem.
  167. :return: True, if file might contain DAGS.
  168. """
  169. if not safe_mode:
  170. return True
  171. if zip_file:
  172. with zip_file.open(file_path) as current_file:
  173. content = current_file.read()
  174. else:
  175. if zipfile.is_zipfile(file_path):
  176. return True
  177. with open(file_path, 'rb') as dag_file:
  178. content = dag_file.read()
  179. content = content.lower()
  180. return all(s in content for s in (b'dag', b'airflow'))
  181. def get_sha1hash(file_path: str) -> str:
  182. import hashlib
  183. with open(file_path, 'rb') as file:
  184. return hashlib.sha1(file.read()).hexdigest()