import datetime import os from django.db.models import Q from django.http import HttpResponse, HttpResponseBadRequest from rest_framework.response import Response from rest_framework.views import APIView from TestLaboratory.settings import CASE_FILE_ROOT, CASE_FILE_EXPORT, HTTP_HEAD from apps.file.models import FileManager from apps.log.models import get_log, gen_log from apps.task.models import TestTask, TestCase from apps.user.middleware.rolecontrol import RoleControl from util_xlsx import load_xlsx, write_to_xlsx from utils.util_add_id import get_id from utils.util_file.util_fileio import write_file import logging logger = logging.getLogger('django') class CaseView(APIView): # 登录权限验证 authentication_classes = [] # get testcase @staticmethod # @RoleControl def get(request, task_id, *args, **kwargs): export_type = request.GET.get('type') task = TestTask.objects.filter(id=task_id, delete=False) if not task: logger.error("测试任务已删除或不存在") return HttpResponse(status=404, content='测试任务已删除或不存在') task = task[0] cases = None file_name = None if export_type == '0': cases = TestCase.objects.filter(task=task, delete=False) file_name = task.title+'-全部测试用例.xls' elif export_type == '1': cases = TestCase.objects.filter(~Q(state=0), task=task, delete=False) file_name = task.title+'-已执行测试用例.xls' elif export_type == '2': cases = TestCase.objects.filter(state=2, task=task, delete=False) file_name = task.title+'-不合格测试用例.xls' if not cases: logger.error("无符合要求的测试用例") return HttpResponse(status=404, content='无符合要求的测试用例') update_time = datetime.datetime.now() file_dir = CASE_FILE_EXPORT + task_id + "/" + str(update_time) if not os.path.exists(file_dir): os.makedirs(file_dir) file_path = file_dir + "/" + file_name cases_info = [['被测软件ID:', task.plan.software.id], ['被测软件名:', task.plan.software.name], ['被测软件版本', task.plan.version], ['编号', '名称', '前置条件', '优先级', '测试环境', '测试类型', '测试步骤', '预期结果', '实际结果', '状态', '编写人', '评审员', '执行人', '备注']] for case in cases: state = '未执行' if case.state == 1: state = '执行成功' elif case.state == 2: state = '执行失败' cases_info.append([case.id_in_task, case.name, case.requisite, case.priority, case.environment, case.type, case.process, case.expected_result, case.actual_result, state, case.writer, case.assessor, case.executor, case.remark]) sio = write_to_xlsx(file_path, cases_info) sio.seek(0) response = HttpResponse(sio.getvalue(), content_type='application/vnd.ms-excel') # response['Content-Disposition'] = 'attachment; filename*=UTF-8''{}'.format(escape_uri_path(file_name)) response['Content-Disposition'] = 'attachment; filename="{0}"'.format(file_name).encode('utf-8', 'ISO-8859-1') response.write(sio.getvalue()) # executor, action, method = get_log(request) # gen_log(action, "测试任务", task.title, method, executor) return response class CaseExecute(APIView): # 登录权限验证 authentication_classes = [] # execute task @staticmethod @RoleControl def post(request, task_id, *args, **kwargs): executor, action, method = get_log(request) task = TestTask.objects.filter(id=task_id, delete=False) if not task: logger.error("测试任务已删除或不存在") return HttpResponseBadRequest(status=404, content='测试任务已删除或不存在') task = task[0] file = request.FILES.get('file') create_time = update_time = datetime.datetime.now() write_file(file, CASE_FILE_ROOT + task_id + "/" + str(create_time)) return_info = load_xlsx( CASE_FILE_ROOT + task_id + "/" + str(create_time) + '/' + file.name, task) if return_info['code'] != 200: logger.error(str(return_info['info'])) return Response({'status': return_info['code'], 'error': str(return_info['info'])}) case_info_dict_list = [] case_infos = return_info['info'] total_num = len(case_infos) not_execute_num = success_num = fail_num = 0 id_error = [] for i in range(len(case_infos)): case_info_list = case_infos[i] cases_same_id = TestCase.objects.filter(id=task.id, id_in_task=str( case_info_list[0]), delete=False) if cases_same_id: case_name = cases_same_id[0].name if case_name != case_info_list[1]: id_error.append('第{}行id为{}的测试用例名称与前版本不同,请检查错误' .format(i + 3, cases_same_id[0].id_in_task)) state = 0 if case_info_list[9] == '未执行': not_execute_num += 1 if case_info_list[9] == '执行成功': state = 1 success_num += 1 elif case_info_list[9] == '执行失败': state = 2 fail_num += 1 case_info = { 'id_in_task': str(case_info_list[0]), 'name': case_info_list[1], 'requisite': case_info_list[2], 'priority': case_info_list[3], 'environment': case_info_list[4], 'type': case_info_list[5], 'process': case_info_list[6], 'expected_result': case_info_list[7], 'actual_result': case_info_list[8], 'state': state, 'writer': case_info_list[10], 'assessor': case_info_list[11], 'executor': case_info_list[12], 'task': task, 'remark': case_info_list[13], 'software': task.plan.software, 'version': task.plan.version } case_info_dict_list.append(case_info) if id_error: logger.error("id错误") return Response({'status': 50001, 'error': id_error}) # 在创建测试用例阶段,每次创建都为新建,新建之前删除task下的所有测试用例 if task.state == 0: cases_before = task.testcase_set.filter(delete=False) for case_before in cases_before: case_before.delete = True case_before.save() for case_info in case_info_dict_list: case_info['id'] = get_id(TestCase, "TestLaboratory_V1_Case_1") case_info['create_time'] = create_time case_info['update_time'] = update_time case_info['executor_id_id'] = executor.id TestCase.objects.create(**case_info) elif task.state == 1: for i in range(len(case_info_dict_list)): case_info = case_info_dict_list[i] case = TestCase.objects.filter(delete=False, task=task, id_in_task=case_info[ 'id_in_task']) if not case: logger.error( 'id为{}的测试用例不存在'.format(case_info['id_in_task'])) return Response({'status': 50001, 'error': 'id为{}的测试用例不存在'.format( case_info['id_in_task'])}) elif len(case) > 1: logger.error( 'id为{}的测试用例存在多个'.format(case_info['id_in_task'])) return Response({'status': 50001, 'error': 'id为{}的测试用例存在多个'.format( case_info['id_in_task'])}) case.update(**case_info) case[0].save() file_id = get_id(FileManager, "TestLaboratory_V1_File_1") try: file_new = FileManager.objects.create(id=file_id, category="case_file", path=task_id + "/" + str( create_time) + "/" + file.name, create_time=create_time, update_time=update_time) except: logger.error("文件路径写入数据库失败") return Response({'status': 50001, 'error': "文件路径写入数据库失败"}) task.case_file = file_id task.case_all = len(task.testcase_set.filter(delete=False)) task.case_not_execute = len( task.testcase_set.filter(delete=False, state=0)) task.case_success = len(task.testcase_set.filter(delete=False, state=1)) task.case_fail = len(task.testcase_set.filter(delete=False, state=2)) print(task.case_all, task.case_not_execute, task.case_success, task.case_fail) task.save() executionInfo = [ { 'operationName': 'filename', 'content': file.name }, { 'operationName': 'fileurl', 'content': HTTP_HEAD + CASE_FILE_ROOT + file_new.path }, { 'operationName': 'state', 'content': task.state }, { 'operationName': 'updateTime', 'content': update_time }, { 'operationName': 'totalNum', 'content': task.case_all }, { 'operationName': 'notExecuteNum', 'content': task.case_not_execute }, { 'operationName': 'successNum', 'content': task.case_success }, { 'operationName': 'failNum', 'content': task.case_fail } ] gen_log(action, "测试任务", task.title, method, executor) return Response(executionInfo)