123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259 |
- import datetime
- import os
- from django.db.models import Q
- from django.http import HttpResponse, HttpResponseBadRequest
- from rest_framework.response import Response
- from rest_framework.views import APIView
- from TestLaboratory.settings import CASE_FILE_ROOT, CASE_FILE_EXPORT, HTTP_HEAD
- from apps.file.models import FileManager
- from apps.log.models import get_log, gen_log
- from apps.task.models import TestTask, TestCase
- from apps.user.middleware.rolecontrol import RoleControl
- from util_xlsx import load_xlsx, write_to_xlsx
- from utils.util_add_id import get_id
- from utils.util_file.util_fileio import write_file
- import logging
- logger = logging.getLogger('django')
class CaseView(APIView):
    """Export the test cases of one test task as a spreadsheet download."""

    # Login/permission verification: no DRF authentication classes are applied.
    authentication_classes = []

    # NOTE(review): the @RoleControl decorator is disabled on this endpoint.
    @staticmethod
    def get(request, task_id, *args, **kwargs):
        """Build a workbook with the task's test cases and return it.

        The ``type`` query parameter selects which cases are exported:
        ``'0'`` every case, ``'1'`` every case whose state is not 0,
        ``'2'`` only cases whose state is 2.  Any other value selects
        nothing and therefore yields the 404 "no matching cases" response.

        Args:
            request: The incoming request; only ``request.GET['type']`` is read.
            task_id: Identifier of the ``TestTask``.  It is concatenated into
                the export path, so it is presumably a ``str`` -- confirm
                against the URL configuration.

        Returns:
            ``HttpResponse`` with status 404 when the task or the requested
            cases do not exist, otherwise an ``application/vnd.ms-excel``
            attachment containing the workbook bytes.
        """
        export_type = request.GET.get('type')

        # Soft-deleted tasks (delete=True) are treated as missing.
        task = TestTask.objects.filter(id=task_id, delete=False).first()
        if task is None:
            logger.error("测试任务已删除或不存在")
            return HttpResponse(status=404, content='测试任务已删除或不存在')

        cases = None
        file_name = None
        if export_type == '0':
            cases = TestCase.objects.filter(task=task, delete=False)
            file_name = task.title + '-全部测试用例.xls'
        elif export_type == '1':
            cases = TestCase.objects.filter(~Q(state=0), task=task,
                                            delete=False)
            file_name = task.title + '-已执行测试用例.xls'
        elif export_type == '2':
            cases = TestCase.objects.filter(state=2, task=task, delete=False)
            file_name = task.title + '-不合格测试用例.xls'

        # Covers both an unknown export type (cases is None) and an empty
        # result set.
        if not cases:
            logger.error("无符合要求的测试用例")
            return HttpResponse(status=404, content='无符合要求的测试用例')

        # Each export gets its own timestamped directory under the task.
        update_time = datetime.datetime.now()
        file_dir = CASE_FILE_EXPORT + task_id + "/" + str(update_time)
        # exist_ok avoids the check-then-create race between two requests.
        os.makedirs(file_dir, exist_ok=True)
        file_path = file_dir + "/" + file_name

        # Three metadata rows, then the column header row.
        cases_info = [
            ['被测软件ID:', task.plan.software.id],
            ['被测软件名:', task.plan.software.name],
            ['被测软件版本', task.plan.version],
            ['编号', '名称', '前置条件', '优先级', '测试环境', '测试类型',
             '测试步骤', '预期结果', '实际结果', '状态', '编写人', '评审员',
             '执行人', '备注'],
        ]

        # Any state other than 1 or 2 is exported as "not executed".
        state_labels = {1: '执行成功', 2: '执行失败'}
        for case in cases:
            cases_info.append([
                case.id_in_task, case.name, case.requisite, case.priority,
                case.environment, case.type, case.process,
                case.expected_result, case.actual_result,
                state_labels.get(case.state, '未执行'),
                case.writer, case.assessor, case.executor, case.remark,
            ])

        sio = write_to_xlsx(file_path, cases_info)
        sio.seek(0)
        # The workbook bytes are passed to the constructor only.  Calling
        # response.write() with the same bytes afterwards would append them
        # a second time and double the download body.
        response = HttpResponse(sio.getvalue(),
                                content_type='application/vnd.ms-excel')
        # NOTE(review): the second argument of encode() is the *errors*
        # handler name, not a charset; the header ends up carrying the raw
        # UTF-8 bytes of the file name.  An RFC 5987 ``filename*=UTF-8''...``
        # value would be more portable -- left unchanged to keep the current
        # client-visible behaviour.
        response['Content-Disposition'] = 'attachment; filename="{0}"'.format(file_name).encode('utf-8', 'ISO-8859-1')
        # NOTE(review): exports are currently not written to the operation
        # log (the get_log / gen_log calls were disabled here).
        return response
class CaseExecute(APIView):
    """Import an uploaded test-case spreadsheet into a test task."""

    # Login/permission verification: no DRF authentication classes are applied.
    authentication_classes = []

    @staticmethod
    @RoleControl
    def post(request, task_id, *args, **kwargs):
        """Store the uploaded case file and create/update the task's cases.

        Steps: look up the task, save the uploaded file under a timestamped
        directory, parse it with ``load_xlsx``, then

        * ``task.state == 0``: soft-delete every existing case of the task
          and create the uploaded ones from scratch;
        * ``task.state == 1``: update the existing case that has the same
          ``id_in_task``; a missing or duplicated case aborts the import.

        Finally the file is registered in ``FileManager``, the task's case
        counters are refreshed and the operation is logged.

        Args:
            request: The incoming request; the spreadsheet is read from
                ``request.FILES['file']``.
            task_id: Identifier of the ``TestTask``.  It is concatenated into
                the storage path, so it is presumably a ``str`` -- confirm
                against the URL configuration.

        Returns:
            ``HttpResponse`` 404 when the task does not exist,
            ``HttpResponseBadRequest`` when no file was uploaded, a
            ``Response`` with ``status``/``error`` keys on parse or
            validation errors, otherwise a ``Response`` holding a list of
            ``operationName``/``content`` entries describing the import.
        """
        executor, action, method = get_log(request)

        # Soft-deleted tasks (delete=True) are treated as missing.
        task = TestTask.objects.filter(id=task_id, delete=False).first()
        if task is None:
            logger.error("测试任务已删除或不存在")
            return HttpResponse(status=404, content='测试任务已删除或不存在')

        file = request.FILES.get('file')
        if file is None:
            # Without this guard a missing upload failed later with an
            # AttributeError on ``file.name``.
            logger.error("未上传测试用例文件")
            return HttpResponseBadRequest(content='未上传测试用例文件')

        create_time = update_time = datetime.datetime.now()
        # Path of this upload relative to CASE_FILE_ROOT.
        relative_dir = task_id + "/" + str(create_time)
        write_file(file, CASE_FILE_ROOT + relative_dir)
        return_info = load_xlsx(CASE_FILE_ROOT + relative_dir + '/' + file.name,
                                task)
        if return_info['code'] != 200:
            logger.error(str(return_info['info']))
            return Response({'status': return_info['code'],
                             'error': str(return_info['info'])})

        # Any status text other than these two is stored as state 0.
        state_by_label = {'执行成功': 1, '执行失败': 2}
        case_info_dict_list = []
        id_error = []
        for index, row in enumerate(return_info['info']):
            # NOTE(review): this filters on the case primary key being equal
            # to the task id (``id=task.id``); ``task=task`` looks like the
            # intended condition.  Left unchanged because correcting it would
            # start rejecting uploads that are accepted today -- confirm.
            cases_same_id = TestCase.objects.filter(id=task.id,
                                                    id_in_task=str(row[0]),
                                                    delete=False)
            if cases_same_id:
                previous = cases_same_id[0]
                if previous.name != row[1]:
                    # index + 3 is the row number reported to the user;
                    # presumably it accounts for rows above the data in the
                    # sheet -- verify against load_xlsx.
                    id_error.append('第{}行id为{}的测试用例名称与前版本不同,请检查错误'
                                    .format(index + 3, previous.id_in_task))
            case_info_dict_list.append({
                'id_in_task': str(row[0]),
                'name': row[1],
                'requisite': row[2],
                'priority': row[3],
                'environment': row[4],
                'type': row[5],
                'process': row[6],
                'expected_result': row[7],
                'actual_result': row[8],
                'state': state_by_label.get(row[9], 0),
                'writer': row[10],
                'assessor': row[11],
                'executor': row[12],
                'task': task,
                'remark': row[13],
                'software': task.plan.software,
                'version': task.plan.version,
            })
        if id_error:
            logger.error("id错误")
            return Response({'status': 50001, 'error': id_error})

        if task.state == 0:
            # Case-authoring stage: every upload replaces the whole set, so
            # the task's current cases are soft-deleted first.
            for case_before in task.testcase_set.filter(delete=False):
                case_before.delete = True
                case_before.save()
            for case_info in case_info_dict_list:
                case_info['id'] = get_id(TestCase, "TestLaboratory_V1_Case_1")
                case_info['create_time'] = create_time
                case_info['update_time'] = update_time
                case_info['executor_id_id'] = executor.id
                TestCase.objects.create(**case_info)
        elif task.state == 1:
            # NOTE(review): cases are updated one by one, so an error on a
            # later row leaves the earlier rows already updated.
            for case_info in case_info_dict_list:
                matches = TestCase.objects.filter(
                    delete=False, task=task,
                    id_in_task=case_info['id_in_task'])
                message = None
                if not matches:
                    message = 'id为{}的测试用例不存在'.format(
                        case_info['id_in_task'])
                elif len(matches) > 1:
                    message = 'id为{}的测试用例存在多个'.format(
                        case_info['id_in_task'])
                if message is not None:
                    logger.error(message)
                    return Response({'status': 50001, 'error': message})
                matches.update(**case_info)
                # save() is called after the bulk update, as before, so any
                # model-level save logic still runs for the case.
                matches[0].save()

        file_id = get_id(FileManager, "TestLaboratory_V1_File_1")
        try:
            file_new = FileManager.objects.create(
                id=file_id,
                category="case_file",
                path=relative_dir + "/" + file.name,
                create_time=create_time,
                update_time=update_time)
        except Exception:
            # logger.exception keeps the traceback that the bare ``except``
            # used to discard.
            logger.exception("文件路径写入数据库失败")
            return Response({'status': 50001, 'error': "文件路径写入数据库失败"})

        # Refresh the task's statistics from the database; count() lets the
        # database do the counting instead of loading every row.
        active_cases = task.testcase_set.filter(delete=False)
        task.case_file = file_id
        task.case_all = active_cases.count()
        task.case_not_execute = active_cases.filter(state=0).count()
        task.case_success = active_cases.filter(state=1).count()
        task.case_fail = active_cases.filter(state=2).count()
        logger.debug("case counters: all=%s not_execute=%s success=%s fail=%s",
                     task.case_all, task.case_not_execute, task.case_success,
                     task.case_fail)
        task.save()

        executionInfo = [
            {'operationName': 'filename', 'content': file.name},
            {'operationName': 'fileurl',
             'content': HTTP_HEAD + CASE_FILE_ROOT + file_new.path},
            {'operationName': 'state', 'content': task.state},
            {'operationName': 'updateTime', 'content': update_time},
            {'operationName': 'totalNum', 'content': task.case_all},
            {'operationName': 'notExecuteNum',
             'content': task.case_not_execute},
            {'operationName': 'successNum', 'content': task.case_success},
            {'operationName': 'failNum', 'content': task.case_fail},
        ]
        gen_log(action, "测试任务", task.title, method, executor)
        return Response(executionInfo)
|