adls.py

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from typing import TYPE_CHECKING, Any, Sequence

from airflow.models import BaseOperator
from airflow.providers.microsoft.azure.hooks.data_lake import AzureDataLakeHook

if TYPE_CHECKING:
    from airflow.utils.context import Context


class ADLSDeleteOperator(BaseOperator):
    """
    Delete files in the specified path.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:ADLSDeleteOperator`

    :param path: A directory or file to remove
    :param recursive: Whether to loop into directories in the location and remove the files
    :param ignore_not_found: Whether to raise an error if the file to delete is not found
    :param azure_data_lake_conn_id: Reference to the :ref:`Azure Data Lake connection<howto/connection:adl>`.
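
    **Example**:
        A minimal, illustrative use of this operator (the task id and path below
        are placeholder values) that removes a single file using the default
        Azure Data Lake connection::

            remove_obsolete_data = ADLSDeleteOperator(
                task_id='remove_obsolete_data',
                path='folder/obsolete/data.parquet',
                azure_data_lake_conn_id='azure_data_lake_default',
            )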
  32. """
  33. template_fields: Sequence[str] = ('path',)
  34. ui_color = '#901dd2'
  35. def __init__(
  36. self,
  37. *,
  38. path: str,
  39. recursive: bool = False,
  40. ignore_not_found: bool = True,
  41. azure_data_lake_conn_id: str = 'azure_data_lake_default',
  42. **kwargs,
  43. ) -> None:
  44. super().__init__(**kwargs)
  45. self.path = path
  46. self.recursive = recursive
  47. self.ignore_not_found = ignore_not_found
  48. self.azure_data_lake_conn_id = azure_data_lake_conn_id
  49. def execute(self, context: "Context") -> Any:
  50. hook = AzureDataLakeHook(azure_data_lake_conn_id=self.azure_data_lake_conn_id)
  51. return hook.remove(path=self.path, recursive=self.recursive, ignore_not_found=self.ignore_not_found)


class ADLSListOperator(BaseOperator):
    """
    List all files from the specified path.

    This operator returns a Python list with the names of files which can be used by
    ``xcom`` in downstream tasks.

    :param path: The Azure Data Lake path to find the objects. Supports glob
        strings (templated)
    :param azure_data_lake_conn_id: Reference to the :ref:`Azure Data Lake connection<howto/connection:adl>`.

    **Example**:
        The following operator would list all the Parquet files from the ``folder/output/``
        folder in the specified ADLS account::

            adls_files = ADLSListOperator(
                task_id='adls_files',
                path='folder/output/*.parquet',
                azure_data_lake_conn_id='azure_data_lake_default'
            )
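
        A downstream task could then read the returned list from XCom; the
        ``BashOperator`` below is purely illustrative::

            print_adls_files = BashOperator(
                task_id='print_adls_files',
                bash_command="echo {{ ti.xcom_pull(task_ids='adls_files') }}",
            )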
  68. """
  69. template_fields: Sequence[str] = ('path',)
  70. ui_color = '#901dd2'
  71. def __init__(
  72. self, *, path: str, azure_data_lake_conn_id: str = 'azure_data_lake_default', **kwargs
  73. ) -> None:
  74. super().__init__(**kwargs)
  75. self.path = path
  76. self.azure_data_lake_conn_id = azure_data_lake_conn_id
  77. def execute(self, context: "Context") -> list:
  78. hook = AzureDataLakeHook(azure_data_lake_conn_id=self.azure_data_lake_conn_id)
  79. self.log.info('Getting list of ADLS files in path: %s', self.path)
  80. return hook.list(path=self.path)