# Source: test_oracle_to_azure_data_lake_1.py (1.9 KB)
  1. def test_write_temp_file(self):
  2. task_id = "some_test_id"
  3. sql = "some_sql"
  4. sql_params = {':p_data': "2018-01-01"}
  5. oracle_conn_id = "oracle_conn_id"
  6. filename = "some_filename"
  7. azure_data_lake_conn_id = 'azure_data_lake_conn_id'
  8. azure_data_lake_path = 'azure_data_lake_path'
  9. delimiter = '|'
  10. encoding = 'utf-8'
  11. cursor_description = [
  12. ('id', "<class 'cx_Oracle.NUMBER'>", 39, None, 38, 0, 0),
  13. ('description', "<class 'cx_Oracle.STRING'>", 60, 240, None, None, 1),
  14. ]
  15. cursor_rows = [[1, 'description 1'], [2, 'description 2']]
  16. mock_cursor = MagicMock()
  17. mock_cursor.description = cursor_description
  18. mock_cursor.__iter__.return_value = cursor_rows
  19. op = OracleToAzureDataLakeOperator(
  20. task_id=task_id,
  21. filename=filename,
  22. oracle_conn_id=oracle_conn_id,
  23. sql=sql,
  24. sql_params=sql_params,
  25. azure_data_lake_conn_id=azure_data_lake_conn_id,
  26. azure_data_lake_path=azure_data_lake_path,
  27. delimiter=delimiter,
  28. encoding=encoding,
  29. )
  30. with TemporaryDirectory(prefix='airflow_oracle_to_azure_op_') as temp:
  31. op._write_temp_file(mock_cursor, os.path.join(temp, filename))
  32. assert os.path.exists(os.path.join(temp, filename)) == 1
  33. with open(os.path.join(temp, filename), 'rb') as csvfile:
  34. temp_file = csv.reader(csvfile, delimiter=delimiter, encoding=encoding)
  35. rownum = 0
  36. for row in temp_file:
  37. if rownum == 0:
  38. assert row[0] == 'id'
  39. assert row[1] == 'description'
  40. else:
  41. assert row[0] == str(cursor_rows[rownum - 1][0])
  42. assert row[1] == cursor_rows[rownum - 1][1]
  43. rownum = rownum + 1