def test_write_temp_file(self):
    """Check that ``_write_temp_file`` dumps a cursor to a delimited file.

    A mocked cursor exposes two columns and two rows. After calling
    ``op._write_temp_file`` the file must exist and contain exactly one
    header row (the column names) followed by one line per cursor row,
    in the same order.
    """
    task_id = "some_test_id"
    sql = "some_sql"
    sql_params = {':p_data': "2018-01-01"}
    oracle_conn_id = "oracle_conn_id"
    filename = "some_filename"
    azure_data_lake_conn_id = 'azure_data_lake_conn_id'
    azure_data_lake_path = 'azure_data_lake_path'
    delimiter = '|'
    encoding = 'utf-8'

    # Only the first element of each description tuple (the column name)
    # is checked below; the remaining fields are filler for this test.
    cursor_description = [
        ('id', "", 39, None, 38, 0, 0),
        ('description', "", 60, 240, None, None, 1),
    ]
    cursor_rows = [[1, 'description 1'], [2, 'description 2']]

    mock_cursor = MagicMock()
    mock_cursor.description = cursor_description
    mock_cursor.__iter__.return_value = cursor_rows

    op = OracleToAzureDataLakeOperator(
        task_id=task_id,
        filename=filename,
        oracle_conn_id=oracle_conn_id,
        sql=sql,
        sql_params=sql_params,
        azure_data_lake_conn_id=azure_data_lake_conn_id,
        azure_data_lake_path=azure_data_lake_path,
        delimiter=delimiter,
        encoding=encoding,
    )

    with TemporaryDirectory(prefix='airflow_oracle_to_azure_op_') as temp:
        temp_path = os.path.join(temp, filename)
        op._write_temp_file(mock_cursor, temp_path)

        assert os.path.exists(temp_path)

        # NOTE(review): binary mode plus the ``encoding`` keyword implies
        # ``csv`` is a unicodecsv-style module rather than the stdlib one;
        # confirm against the file's imports.
        with open(temp_path, 'rb') as csvfile:
            written_rows = list(csv.reader(csvfile, delimiter=delimiter, encoding=encoding))

    # Guard against a vacuous pass: without this, an empty or truncated
    # file would skip every per-row assertion below.
    assert len(written_rows) == len(cursor_rows) + 1

    header = written_rows[0]
    assert header[0] == 'id'
    assert header[1] == 'description'

    for written, expected in zip(written_rows[1:], cursor_rows):
        # Values come back from the file as text, so the numeric id is
        # compared against its string form.
        assert written[0] == str(expected[0])
        assert written[1] == expected[1]