123456789101112131415161718192021222324252627282930313233343536373839404142434445464748 |
- def test_write_temp_file(self):
- task_id = "some_test_id"
- sql = "some_sql"
- sql_params = {':p_data': "2018-01-01"}
- oracle_conn_id = "oracle_conn_id"
- filename = "some_filename"
- azure_data_lake_conn_id = 'azure_data_lake_conn_id'
- azure_data_lake_path = 'azure_data_lake_path'
- delimiter = '|'
- encoding = 'utf-8'
- cursor_description = [
- ('id', "<class 'cx_Oracle.NUMBER'>", 39, None, 38, 0, 0),
- ('description', "<class 'cx_Oracle.STRING'>", 60, 240, None, None, 1),
- ]
- cursor_rows = [[1, 'description 1'], [2, 'description 2']]
- mock_cursor = MagicMock()
- mock_cursor.description = cursor_description
- mock_cursor.__iter__.return_value = cursor_rows
- op = OracleToAzureDataLakeOperator(
- task_id=task_id,
- filename=filename,
- oracle_conn_id=oracle_conn_id,
- sql=sql,
- sql_params=sql_params,
- azure_data_lake_conn_id=azure_data_lake_conn_id,
- azure_data_lake_path=azure_data_lake_path,
- delimiter=delimiter,
- encoding=encoding,
- )
- with TemporaryDirectory(prefix='airflow_oracle_to_azure_op_') as temp:
- op._write_temp_file(mock_cursor, os.path.join(temp, filename))
- assert os.path.exists(os.path.join(temp, filename)) == 1
- with open(os.path.join(temp, filename), 'rb') as csvfile:
- temp_file = csv.reader(csvfile, delimiter=delimiter, encoding=encoding)
- rownum = 0
- for row in temp_file:
- if rownum == 0:
- assert row[0] == 'id'
- assert row[1] == 'description'
- else:
- assert row[0] == str(cursor_rows[rownum - 1][0])
- assert row[1] == cursor_rows[rownum - 1][1]
- rownum = rownum + 1
|