caseview.py 9.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224
  1. import datetime
  2. import os
  3. from django.db.models import Q
  4. from django.http import HttpResponse, HttpResponseBadRequest
  5. from rest_framework.response import Response
  6. from rest_framework.views import APIView
  7. from TestLaboratory.settings import CASE_FILE_ROOT, CASE_FILE_EXPORT, HTTP_HEAD
  8. from apps.file.models import FileManager
  9. from apps.log.models import get_log, gen_log
  10. from apps.task.models import TestTask, TestCase
  11. from apps.user.middleware.rolecontrol import RoleControl
  12. from util_xlsx import load_xlsx, write_to_xlsx
  13. from utils.util_add_id import get_id
  14. from utils.util_file.util_fileio import write_file
  class CaseView(APIView):
      """Export and import the test cases that belong to one test task.

      ``get`` renders a selection of the task's cases as a spreadsheet
      download; ``put`` loads an uploaded spreadsheet and creates or updates
      the task's cases from it.

      Case ``state`` values as used in this class: 0 is rendered as '未执行',
      1 as '执行成功' and 2 as '执行失败'.
      """

      # Login/permission verification: no DRF authentication classes are set.
      authentication_classes = []

      # get testcase
      # NOTE(review): unlike ``put``, this handler is not wrapped in
      # ``RoleControl`` and writes no operation log -- confirm this is intended.
      @staticmethod
      def get(request, task_id, *args, **kwargs):
          """Return the selected test cases of a task as a spreadsheet download.

          The ``type`` query parameter selects the cases: ``'0'`` exports every
          case, ``'1'`` every case whose state is not 0, ``'2'`` every case
          whose state is 2.  Any other value selects nothing and produces the
          "no matching cases" 404 response.

          Args:
              request: the incoming request; only ``request.GET['type']`` is read.
              task_id: id of the ``TestTask``; it is concatenated into the export
                  path, so it must be a string.

          Returns:
              An ``HttpResponse`` carrying the workbook bytes as an attachment,
              or a 404 ``HttpResponse`` when the task is missing/deleted or no
              case matches the selection.
          """
          export_type = request.GET.get('type')
          task = TestTask.objects.filter(id=task_id, delete=False).first()
          if task is None:
              return HttpResponse(status=404, content='测试任务已删除或不存在')

          cases = None
          file_name = None
          if export_type == '0':
              cases = TestCase.objects.filter(task=task, delete=False)
              file_name = task.title + '-全部测试用例.xls'
          elif export_type == '1':
              cases = TestCase.objects.filter(~Q(state=0), task=task, delete=False)
              file_name = task.title + '-已执行测试用例.xls'
          elif export_type == '2':
              cases = TestCase.objects.filter(state=2, task=task, delete=False)
              file_name = task.title + '-不合格测试用例.xls'
          if not cases:
              return HttpResponse(status=404, content='无符合要求的测试用例')

          # Each export gets its own directory named after the export timestamp.
          export_time = datetime.datetime.now()
          file_dir = CASE_FILE_EXPORT + task_id + "/" + str(export_time)
          # exist_ok avoids the check-then-create race of exists() + makedirs().
          os.makedirs(file_dir, exist_ok=True)
          file_path = file_dir + "/" + file_name

          # Three rows describing the software under test, then the column
          # header row, then one row per exported case.
          cases_info = [
              ['被测软件ID:', task.plan.software.id],
              ['被测软件名:', task.plan.software.name],
              ['被测软件版本', task.plan.version],
              ['编号', '名称', '前置条件', '优先级', '测试环境', '测试类型', '测试步骤',
               '预期结果', '实际结果', '状态', '编写人', '评审员', '执行人', '备注'],
          ]
          for case in cases:
              if case.state == 1:
                  state = '执行成功'
              elif case.state == 2:
                  state = '执行失败'
              else:
                  state = '未执行'
              cases_info.append([
                  case.id_in_task, case.name, case.requisite, case.priority,
                  case.environment, case.type, case.process,
                  case.expected_result, case.actual_result, state,
                  case.writer, case.assessor, case.executor, case.remark,
              ])

          # write_to_xlsx returns a buffer-like object; presumably it also
          # stores the workbook at file_path -- TODO confirm.
          sio = write_to_xlsx(file_path, cases_info)
          sio.seek(0)
          # The content is passed to the constructor only.  Calling
          # response.write() as well would append the same bytes a second time.
          response = HttpResponse(sio.getvalue(), content_type='application/vnd.ms-excel')
          # NOTE(review): the header is set from UTF-8 *bytes* on purpose; Django
          # decodes bytes header values as Latin-1, which puts the raw UTF-8 file
          # name on the wire.  The second positional argument of str.encode() is
          # the error handler, not a charset, so it is not passed here.
          response['Content-Disposition'] = 'attachment; filename="{0}"'.format(file_name).encode('utf-8')
          return response

      # execute task
      @staticmethod
      @RoleControl
      def put(request, task_id, *args, **kwargs):
          """Import test cases for a task from an uploaded spreadsheet.

          The uploaded ``file`` is stored under ``CASE_FILE_ROOT``, parsed with
          ``load_xlsx`` and turned into ``TestCase`` rows.  When ``task.state``
          is 0 the task's existing cases are soft-deleted and re-created from
          the sheet; when it is 1 the existing cases are updated in place,
          matched by ``id_in_task``.  Afterwards the file is registered in
          ``FileManager`` and the task's case counters are refreshed.

          Args:
              request: the incoming request; ``request.FILES['file']`` is read.
              task_id: id of the ``TestTask``; it is concatenated into the
                  storage path, so it must be a string.

          Returns:
              A ``Response`` listing ``operationName``/``content`` pairs on
              success; a ``Response`` with ``status`` and ``error`` keys when
              parsing or validation fails; a 404 ``HttpResponse`` when the task
              is missing/deleted; a 400 response when no file was uploaded.
          """
          task = TestTask.objects.filter(id=task_id, delete=False).first()
          if task is None:
              return HttpResponse(status=404, content='测试任务已删除或不存在')

          upload = request.FILES.get('file')
          if upload is None:
              # Without this guard the handler fails with AttributeError below.
              return HttpResponseBadRequest(content='未上传测试用例文件')

          create_time = update_time = datetime.datetime.now()
          relative_dir = task_id + "/" + str(create_time)
          write_file(upload, CASE_FILE_ROOT + relative_dir)
          return_info = load_xlsx(CASE_FILE_ROOT + relative_dir + '/' + upload.name, task)
          if return_info['code'] != 200:
              return Response({'status': return_info['code'], 'error': str(return_info['info'])})

          software = task.plan.software
          version = task.plan.version
          case_info_dict_list = []
          id_error = []
          for row_index, row in enumerate(return_info['info']):
              id_in_task = str(row[0])
              # A case of the same software with this id must keep its name.
              cases_same_id = TestCase.objects.filter(software=software, id_in_task=id_in_task,
                                                      delete=False)
              if cases_same_id and cases_same_id[0].name != row[1]:
                  # The +3 offset presumably maps the list index to the sheet
                  # row number -- TODO confirm against load_xlsx.
                  id_error.append('第{}行id为{}的测试用例名称与前版本不同,请检查错误'
                                  .format(row_index + 3, cases_same_id[0].id_in_task))

              if row[9] == '执行成功':
                  state = 1
              elif row[9] == '执行失败':
                  state = 2
              else:
                  # '未执行' and any unrecognised label both map to state 0.
                  state = 0

              case_info_dict_list.append({
                  'id_in_task': id_in_task,
                  'name': row[1],
                  'requisite': row[2],
                  'priority': row[3],
                  'environment': row[4],
                  'type': row[5],
                  'process': row[6],
                  'expected_result': row[7],
                  'actual_result': row[8],
                  'state': state,
                  'writer': row[10],
                  'assessor': row[11],
                  'executor': row[12],
                  'task': task,
                  'remark': row[13],
                  'software': software,
                  'version': version,
              })
          if id_error:
              return Response({'status': 50001, 'error': id_error})

          if task.state == 0:
              # In the case-creation stage every upload is a fresh creation:
              # soft-delete all cases of the task before creating the new ones.
              for case_before in task.testcase_set.filter(delete=False):
                  case_before.delete = True
                  case_before.save()
              for case_info in case_info_dict_list:
                  case_info['id'] = get_id(TestCase, "TestLaboratory_V1_Case_1")
                  case_info['create_time'] = create_time
                  case_info['update_time'] = update_time
                  TestCase.objects.create(**case_info)
          elif task.state == 1:
              # Validate every row before writing any of them, so an error
              # response never leaves the task with only some rows updated.
              matched = []
              for case_info in case_info_dict_list:
                  case = TestCase.objects.filter(delete=False, task=task,
                                                 id_in_task=case_info['id_in_task'])
                  found = len(case)
                  if found == 0:
                      return Response({'status': 50001,
                                       'error': 'id为{}的测试用例不存在'.format(case_info['id_in_task'])})
                  if found > 1:
                      return Response({'status': 50001,
                                       'error': 'id为{}的测试用例存在多个'.format(case_info['id_in_task'])})
                  matched.append((case, case_info))
              for case, case_info in matched:
                  case.update(**case_info)
                  # Re-save the refreshed row, as the original code did
                  # (presumably to run model save() logic -- TODO confirm).
                  case[0].save()

          file_id = get_id(FileManager, "TestLaboratory_V1_File_1")
          try:
              file_new = FileManager.objects.create(id=file_id, category="case_file",
                                                    path=relative_dir + "/" + upload.name,
                                                    create_time=create_time,
                                                    update_time=update_time)
          except Exception:
              # Narrower than a bare except: SystemExit/KeyboardInterrupt pass through.
              return Response({'status': 50001, 'error': "文件路径写入数据库失败"})

          # Refresh the task's counters with COUNT queries instead of loading rows.
          live_cases = task.testcase_set.filter(delete=False)
          task.case_file = file_id
          task.case_all = live_cases.count()
          task.case_not_execute = live_cases.filter(state=0).count()
          task.case_success = live_cases.filter(state=1).count()
          task.case_fail = live_cases.filter(state=2).count()
          task.save()

          execution_info = [
              {'operationName': name, 'content': content}
              for name, content in (
                  ('filename', upload.name),
                  ('fileurl', HTTP_HEAD + CASE_FILE_ROOT + file_new.path),
                  ('state', task.state),
                  ('updateTime', update_time),
                  ('totalNum', task.case_all),
                  ('notExecuteNum', task.case_not_execute),
                  ('successNum', task.case_success),
                  ('failNum', task.case_fail),
              )
          ]
          executor, action, method = get_log(request)
          gen_log(action, "测试任务", task.title, method, executor)
          return Response(execution_info)
  199. return Response(executionInfo)