caseview.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276
  1. import datetime
  2. import os
  3. from django.db.models import Q
  4. from django.http import HttpResponse, HttpResponseBadRequest
  5. from rest_framework.response import Response
  6. from rest_framework.views import APIView
  7. from TestLaboratory.settings import CASE_FILE_ROOT, CASE_FILE_EXPORT, HTTP_HEAD
  8. from apps.file.models import FileManager
  9. from apps.log.models import get_log, gen_log
  10. from apps.task.models import TestTask, TestCase, BeidouCase
  11. from apps.user.middleware.rolecontrol import RoleControl
  12. from apps.user.models import User
  13. from util_xlsx import load_xlsx, write_to_xlsx
  14. from utils.util_add_id import get_id
  15. from utils.util_file.util_fileio import write_file
  16. import logging
  17. logger = logging.getLogger('django')
  18. class CaseView(APIView):
  19. # 登录权限验证
  20. authentication_classes = []
  21. @staticmethod
  22. def convert_to_string(word_list):
  23. for i in range(len(word_list)):
  24. if word_list[i] is None:
  25. word_list[i] = ''
  26. elif not isinstance(word_list[i], str):
  27. word_list[i] = str(word_list[i])
  28. return word_list
  29. # get testcase
  30. @staticmethod
  31. # @RoleControl
  32. def get(request, task_id, *args, **kwargs):
  33. export_type = request.GET.get('type')
  34. task = TestTask.objects.filter(id=task_id, delete=False)
  35. if not task:
  36. logger.error("测试任务已删除或不存在")
  37. return HttpResponse(status=404, content='测试任务已删除或不存在')
  38. task = task[0]
  39. cases = None
  40. file_name = None
  41. if export_type == '0':
  42. cases = BeidouCase.objects.filter(task=task, delete=False)
  43. file_name = task.title+'-全部测试用例.xls'
  44. elif export_type == '1':
  45. cases = BeidouCase.objects.filter(~Q(state=0), task=task, delete=False)
  46. file_name = task.title+'-已执行测试用例.xls'
  47. elif export_type == '2':
  48. cases = BeidouCase.objects.filter(state=2, task=task, delete=False)
  49. file_name = task.title+'-不合格测试用例.xls'
  50. if not cases:
  51. logger.error("无符合要求的测试用例")
  52. return HttpResponse(status=404, content='无符合要求的测试用例')
  53. update_time = datetime.datetime.now()
  54. file_dir = CASE_FILE_EXPORT + task_id + "/" + str(update_time)
  55. if not os.path.exists(file_dir):
  56. os.makedirs(file_dir)
  57. file_path = file_dir + "/" + file_name
  58. cases_info = [
  59. ['测试项目', '版本', '测试内容', '定位精度', '定时精度', '伪距测量精度', '载波测量精度', '测速精度',
  60. '捕获灵敏度', '接收灵敏度', '跟踪灵敏度', '功率范围', '自主完好性', '系统完好性', '首次捕获时间',
  61. '失锁重捕时间', '定位测速更新率', '跟踪通道数', '最大速度', '最大加速度', '执行结果', '测试人员', '备注']]
  62. for case in cases:
  63. power = ''
  64. if case.power_range_low and case.power_range_high:
  65. power = str(case.power_range_low) + '-' + str(case.power_range_high)
  66. state = '成功'
  67. if case.state == 2:
  68. state = '失败'
  69. executor_name = User.objects.filter(id=case.executor_id)[0].username
  70. cases_info.append(CaseView.convert_to_string(
  71. [case.software.name, case.version, case.title, case.positioning_accuracy, case.timing_accuracy,
  72. case.pseudo_range_measurement_accuracy, case.carrier_measurement_accuracy,
  73. case.speed_measurement_accuracy, case.sensitivity_capture, case.receiving_sensitivity,
  74. case.sensitivity_track, power, case.self_integrity, case.system_integrity, case.capture_time,
  75. case.recapture_time, case.update_rate, case.trace_channels, case.max_velocity, case.max_acceleration,
  76. state, executor_name, case.remark]
  77. ))
  78. sio = write_to_xlsx(file_path, cases_info)
  79. sio.seek(0)
  80. response = HttpResponse(sio.getvalue(), content_type='application/vnd.ms-excel')
  81. # response['Content-Disposition'] = 'attachment; filename*=UTF-8''{}'.format(escape_uri_path(file_name))
  82. response['Content-Disposition'] = 'attachment; filename="{0}"'.format(file_name).encode('utf-8', 'ISO-8859-1')
  83. response.write(sio.getvalue())
  84. # executor, action, method = get_log(request)
  85. # gen_log(action, "测试任务", task.title, method, executor)
  86. return response
  87. class CaseExecute(APIView):
  88. # 登录权限验证
  89. authentication_classes = []
  90. # execute task
  91. @staticmethod
  92. @RoleControl
  93. def post(request, task_id, *args, **kwargs):
  94. executor, action, method = get_log(request)
  95. task = TestTask.objects.filter(id=task_id, delete=False)
  96. if not task:
  97. logger.error("测试任务已删除或不存在")
  98. return HttpResponseBadRequest(status=404, content='测试任务已删除或不存在')
  99. task = task[0]
  100. file = request.FILES.get('file')
  101. create_time = update_time = datetime.datetime.now()
  102. write_file(file, CASE_FILE_ROOT + task_id + "/" + str(create_time))
  103. return_info = load_xlsx(
  104. CASE_FILE_ROOT + task_id + "/" + str(create_time) + '/' + file.name,
  105. task)
  106. if return_info['code'] != 200:
  107. logger.error(str(return_info['info']))
  108. return Response({'status': return_info['code'],
  109. 'error': str(return_info['info'])})
  110. case_info_dict_list = []
  111. case_infos = return_info['info']
  112. total_num = len(case_infos)
  113. not_execute_num = success_num = fail_num = 0
  114. id_error = []
  115. for i in range(len(case_infos)):
  116. case_info_list = case_infos[i]
  117. cases_same_id = TestCase.objects.filter(id=task.id,
  118. id_in_task=str(
  119. case_info_list[0]),
  120. delete=False)
  121. if cases_same_id:
  122. case_name = cases_same_id[0].name
  123. if case_name != case_info_list[1]:
  124. id_error.append('第{}行id为{}的测试用例名称与前版本不同,请检查错误'
  125. .format(i + 3, cases_same_id[0].id_in_task))
  126. state = 0
  127. if case_info_list[9] == '未执行':
  128. not_execute_num += 1
  129. if case_info_list[9] == '执行成功':
  130. state = 1
  131. success_num += 1
  132. elif case_info_list[9] == '执行失败':
  133. state = 2
  134. fail_num += 1
  135. case_info = {
  136. 'id_in_task': str(case_info_list[0]),
  137. 'name': case_info_list[1],
  138. 'requisite': case_info_list[2],
  139. 'priority': case_info_list[3],
  140. 'environment': case_info_list[4],
  141. 'type': case_info_list[5],
  142. 'process': case_info_list[6],
  143. 'expected_result': case_info_list[7],
  144. 'actual_result': case_info_list[8],
  145. 'state': state,
  146. 'writer': case_info_list[10],
  147. 'assessor': case_info_list[11],
  148. 'executor': case_info_list[12],
  149. 'task': task,
  150. 'remark': case_info_list[13],
  151. 'software': task.plan.software,
  152. 'version': task.plan.version
  153. }
  154. case_info_dict_list.append(case_info)
  155. if id_error:
  156. logger.error("id错误")
  157. return Response({'status': 50001, 'error': id_error})
  158. # 在创建测试用例阶段,每次创建都为新建,新建之前删除task下的所有测试用例
  159. if task.state == 0:
  160. cases_before = task.testcase_set.filter(delete=False)
  161. for case_before in cases_before:
  162. case_before.delete = True
  163. case_before.save()
  164. for case_info in case_info_dict_list:
  165. case_info['id'] = get_id(TestCase, "TestLaboratory_V1_Case_1")
  166. case_info['create_time'] = create_time
  167. case_info['update_time'] = update_time
  168. case_info['executor_id_id'] = executor.id
  169. TestCase.objects.create(**case_info)
  170. elif task.state == 1:
  171. for i in range(len(case_info_dict_list)):
  172. case_info = case_info_dict_list[i]
  173. case = TestCase.objects.filter(delete=False, task=task,
  174. id_in_task=case_info[
  175. 'id_in_task'])
  176. if not case:
  177. logger.error(
  178. 'id为{}的测试用例不存在'.format(case_info['id_in_task']))
  179. return Response({'status': 50001,
  180. 'error': 'id为{}的测试用例不存在'.format(
  181. case_info['id_in_task'])})
  182. elif len(case) > 1:
  183. logger.error(
  184. 'id为{}的测试用例存在多个'.format(case_info['id_in_task']))
  185. return Response({'status': 50001,
  186. 'error': 'id为{}的测试用例存在多个'.format(
  187. case_info['id_in_task'])})
  188. case.update(**case_info)
  189. case[0].save()
  190. file_id = get_id(FileManager, "TestLaboratory_V1_File_1")
  191. try:
  192. file_new = FileManager.objects.create(id=file_id,
  193. category="case_file",
  194. path=task_id + "/" + str(
  195. create_time) + "/" + file.name,
  196. create_time=create_time,
  197. update_time=update_time)
  198. except:
  199. logger.error("文件路径写入数据库失败")
  200. return Response({'status': 50001, 'error': "文件路径写入数据库失败"})
  201. task.case_file = file_id
  202. task.case_all = len(task.testcase_set.filter(delete=False))
  203. task.case_not_execute = len(
  204. task.testcase_set.filter(delete=False, state=0))
  205. task.case_success = len(task.testcase_set.filter(delete=False, state=1))
  206. task.case_fail = len(task.testcase_set.filter(delete=False, state=2))
  207. print(task.case_all, task.case_not_execute, task.case_success,
  208. task.case_fail)
  209. task.save()
  210. executionInfo = [
  211. {
  212. 'operationName': 'filename',
  213. 'content': file.name
  214. },
  215. {
  216. 'operationName': 'fileurl',
  217. 'content': HTTP_HEAD + CASE_FILE_ROOT + file_new.path
  218. },
  219. {
  220. 'operationName': 'state',
  221. 'content': task.state
  222. },
  223. {
  224. 'operationName': 'updateTime',
  225. 'content': update_time
  226. },
  227. {
  228. 'operationName': 'totalNum',
  229. 'content': task.case_all
  230. },
  231. {
  232. 'operationName': 'notExecuteNum',
  233. 'content': task.case_not_execute
  234. },
  235. {
  236. 'operationName': 'successNum',
  237. 'content': task.case_success
  238. },
  239. {
  240. 'operationName': 'failNum',
  241. 'content': task.case_fail
  242. }
  243. ]
  244. gen_log(action, "测试任务", task.title, method, executor)
  245. return Response(executionInfo)