caseview.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259
  1. import datetime
  2. import os
  3. from django.db.models import Q
  4. from django.http import HttpResponse, HttpResponseBadRequest
  5. from rest_framework.response import Response
  6. from rest_framework.views import APIView
  7. from TestLaboratory.settings import CASE_FILE_ROOT, CASE_FILE_EXPORT, HTTP_HEAD
  8. from apps.file.models import FileManager
  9. from apps.log.models import get_log, gen_log
  10. from apps.task.models import TestTask, TestCase
  11. from apps.user.middleware.rolecontrol import RoleControl
  12. from util_xlsx import load_xlsx, write_to_xlsx
  13. from utils.util_add_id import get_id
  14. from utils.util_file.util_fileio import write_file
  15. import logging
  16. logger = logging.getLogger('django')
  17. class CaseView(APIView):
  18. # 登录权限验证
  19. authentication_classes = []
  20. # get testcase
  21. @staticmethod
  22. # @RoleControl
  23. def get(request, task_id, *args, **kwargs):
  24. export_type = request.GET.get('type')
  25. task = TestTask.objects.filter(id=task_id, delete=False)
  26. if not task:
  27. logger.error("测试任务已删除或不存在")
  28. return HttpResponse(status=404, content='测试任务已删除或不存在')
  29. task = task[0]
  30. cases = None
  31. file_name = None
  32. if export_type == '0':
  33. cases = TestCase.objects.filter(task=task, delete=False)
  34. file_name = task.title+'-全部测试用例.xls'
  35. elif export_type == '1':
  36. cases = TestCase.objects.filter(~Q(state=0), task=task, delete=False)
  37. file_name = task.title+'-已执行测试用例.xls'
  38. elif export_type == '2':
  39. cases = TestCase.objects.filter(state=2, task=task, delete=False)
  40. file_name = task.title+'-不合格测试用例.xls'
  41. if not cases:
  42. logger.error("无符合要求的测试用例")
  43. return HttpResponse(status=404, content='无符合要求的测试用例')
  44. update_time = datetime.datetime.now()
  45. file_dir = CASE_FILE_EXPORT + task_id + "/" + str(update_time)
  46. if not os.path.exists(file_dir):
  47. os.makedirs(file_dir)
  48. file_path = file_dir + "/" + file_name
  49. cases_info = [['被测软件ID:', task.plan.software.id],
  50. ['被测软件名:', task.plan.software.name],
  51. ['被测软件版本', task.plan.version],
  52. ['编号', '名称', '前置条件', '优先级', '测试环境', '测试类型', '测试步骤', '预期结果', '实际结果', '状态',
  53. '编写人', '评审员', '执行人', '备注']]
  54. for case in cases:
  55. state = '未执行'
  56. if case.state == 1:
  57. state = '执行成功'
  58. elif case.state == 2:
  59. state = '执行失败'
  60. cases_info.append([case.id_in_task, case.name, case.requisite, case.priority, case.environment,
  61. case.type, case.process, case.expected_result, case.actual_result, state,
  62. case.writer, case.assessor, case.executor, case.remark])
  63. sio = write_to_xlsx(file_path, cases_info)
  64. sio.seek(0)
  65. response = HttpResponse(sio.getvalue(), content_type='application/vnd.ms-excel')
  66. # response['Content-Disposition'] = 'attachment; filename*=UTF-8''{}'.format(escape_uri_path(file_name))
  67. response['Content-Disposition'] = 'attachment; filename="{0}"'.format(file_name).encode('utf-8', 'ISO-8859-1')
  68. response.write(sio.getvalue())
  69. # executor, action, method = get_log(request)
  70. # gen_log(action, "测试任务", task.title, method, executor)
  71. return response
  72. class CaseExecute(APIView):
  73. # 登录权限验证
  74. authentication_classes = []
  75. # execute task
  76. @staticmethod
  77. @RoleControl
  78. def post(request, task_id, *args, **kwargs):
  79. executor, action, method = get_log(request)
  80. task = TestTask.objects.filter(id=task_id, delete=False)
  81. if not task:
  82. logger.error("测试任务已删除或不存在")
  83. return HttpResponseBadRequest(status=404, content='测试任务已删除或不存在')
  84. task = task[0]
  85. file = request.FILES.get('file')
  86. create_time = update_time = datetime.datetime.now()
  87. write_file(file, CASE_FILE_ROOT + task_id + "/" + str(create_time))
  88. return_info = load_xlsx(
  89. CASE_FILE_ROOT + task_id + "/" + str(create_time) + '/' + file.name,
  90. task)
  91. if return_info['code'] != 200:
  92. logger.error(str(return_info['info']))
  93. return Response({'status': return_info['code'],
  94. 'error': str(return_info['info'])})
  95. case_info_dict_list = []
  96. case_infos = return_info['info']
  97. total_num = len(case_infos)
  98. not_execute_num = success_num = fail_num = 0
  99. id_error = []
  100. for i in range(len(case_infos)):
  101. case_info_list = case_infos[i]
  102. cases_same_id = TestCase.objects.filter(id=task.id,
  103. id_in_task=str(
  104. case_info_list[0]),
  105. delete=False)
  106. if cases_same_id:
  107. case_name = cases_same_id[0].name
  108. if case_name != case_info_list[1]:
  109. id_error.append('第{}行id为{}的测试用例名称与前版本不同,请检查错误'
  110. .format(i + 3, cases_same_id[0].id_in_task))
  111. state = 0
  112. if case_info_list[9] == '未执行':
  113. not_execute_num += 1
  114. if case_info_list[9] == '执行成功':
  115. state = 1
  116. success_num += 1
  117. elif case_info_list[9] == '执行失败':
  118. state = 2
  119. fail_num += 1
  120. case_info = {
  121. 'id_in_task': str(case_info_list[0]),
  122. 'name': case_info_list[1],
  123. 'requisite': case_info_list[2],
  124. 'priority': case_info_list[3],
  125. 'environment': case_info_list[4],
  126. 'type': case_info_list[5],
  127. 'process': case_info_list[6],
  128. 'expected_result': case_info_list[7],
  129. 'actual_result': case_info_list[8],
  130. 'state': state,
  131. 'writer': case_info_list[10],
  132. 'assessor': case_info_list[11],
  133. 'executor': case_info_list[12],
  134. 'task': task,
  135. 'remark': case_info_list[13],
  136. 'software': task.plan.software,
  137. 'version': task.plan.version
  138. }
  139. case_info_dict_list.append(case_info)
  140. if id_error:
  141. logger.error("id错误")
  142. return Response({'status': 50001, 'error': id_error})
  143. # 在创建测试用例阶段,每次创建都为新建,新建之前删除task下的所有测试用例
  144. if task.state == 0:
  145. cases_before = task.testcase_set.filter(delete=False)
  146. for case_before in cases_before:
  147. case_before.delete = True
  148. case_before.save()
  149. for case_info in case_info_dict_list:
  150. case_info['id'] = get_id(TestCase, "TestLaboratory_V1_Case_1")
  151. case_info['create_time'] = create_time
  152. case_info['update_time'] = update_time
  153. case_info['executor_id_id'] = executor.id
  154. TestCase.objects.create(**case_info)
  155. elif task.state == 1:
  156. for i in range(len(case_info_dict_list)):
  157. case_info = case_info_dict_list[i]
  158. case = TestCase.objects.filter(delete=False, task=task,
  159. id_in_task=case_info[
  160. 'id_in_task'])
  161. if not case:
  162. logger.error(
  163. 'id为{}的测试用例不存在'.format(case_info['id_in_task']))
  164. return Response({'status': 50001,
  165. 'error': 'id为{}的测试用例不存在'.format(
  166. case_info['id_in_task'])})
  167. elif len(case) > 1:
  168. logger.error(
  169. 'id为{}的测试用例存在多个'.format(case_info['id_in_task']))
  170. return Response({'status': 50001,
  171. 'error': 'id为{}的测试用例存在多个'.format(
  172. case_info['id_in_task'])})
  173. case.update(**case_info)
  174. case[0].save()
  175. file_id = get_id(FileManager, "TestLaboratory_V1_File_1")
  176. try:
  177. file_new = FileManager.objects.create(id=file_id,
  178. category="case_file",
  179. path=task_id + "/" + str(
  180. create_time) + "/" + file.name,
  181. create_time=create_time,
  182. update_time=update_time)
  183. except:
  184. logger.error("文件路径写入数据库失败")
  185. return Response({'status': 50001, 'error': "文件路径写入数据库失败"})
  186. task.case_file = file_id
  187. task.case_all = len(task.testcase_set.filter(delete=False))
  188. task.case_not_execute = len(
  189. task.testcase_set.filter(delete=False, state=0))
  190. task.case_success = len(task.testcase_set.filter(delete=False, state=1))
  191. task.case_fail = len(task.testcase_set.filter(delete=False, state=2))
  192. print(task.case_all, task.case_not_execute, task.case_success,
  193. task.case_fail)
  194. task.save()
  195. executionInfo = [
  196. {
  197. 'operationName': 'filename',
  198. 'content': file.name
  199. },
  200. {
  201. 'operationName': 'fileurl',
  202. 'content': HTTP_HEAD + CASE_FILE_ROOT + file_new.path
  203. },
  204. {
  205. 'operationName': 'state',
  206. 'content': task.state
  207. },
  208. {
  209. 'operationName': 'updateTime',
  210. 'content': update_time
  211. },
  212. {
  213. 'operationName': 'totalNum',
  214. 'content': task.case_all
  215. },
  216. {
  217. 'operationName': 'notExecuteNum',
  218. 'content': task.case_not_execute
  219. },
  220. {
  221. 'operationName': 'successNum',
  222. 'content': task.case_success
  223. },
  224. {
  225. 'operationName': 'failNum',
  226. 'content': task.case_fail
  227. }
  228. ]
  229. gen_log(action, "测试任务", task.title, method, executor)
  230. return Response(executionInfo)