import datetime
import logging

from django.http import HttpResponse
from rest_framework.response import Response
from rest_framework.views import APIView

from TestLaboratory.settings import TASK_ROOT, CASE_TEMPLATE_ROOT, HTTP_HEAD
from apps.file.models import FileManager
from apps.log.models import get_log, gen_log
from apps.plan.models import TestPlan
from apps.task.models import TestTask, TestCase
from apps.user.middleware.rolecontrol import RoleControl
from apps.user.models import User
from utils.util_add_id import get_id
from utils.util_file.util_fileio import write_file

logger = logging.getLogger('django')


class TaskListView(APIView):
    """Create test tasks (POST) and list existing test tasks (GET)."""

    # Login/authentication check: no DRF authentication classes are set here;
    # both handlers are wrapped by RoleControl instead.
    authentication_classes = []

    # create task
    @staticmethod
    @RoleControl
    def post(request, *args, **kwargs):
        """Create one test task per executor for the given test plan.

        Form fields read from ``request.POST``:
            plan_id: id of the TestPlan the tasks belong to.
            title / description: copied onto every created task.
            executors: user ids joined with ``&&``.
        Uploaded files are read from ``request.FILES`` under the key ``files``.

        Returns:
            ``Response`` with a list holding one dict per created task, or an
            ``HttpResponse`` with status 400 (``executors`` missing),
            404 (plan or an executor not found) or 500 (file storage failed).
        """
        plan_id = request.POST.get('plan_id')
        title = request.POST.get('title')
        description = request.POST.get('description')
        executors_raw = request.POST.get('executors')
        statement_files = request.FILES.getlist('files')

        # A missing field used to crash with AttributeError on `.split`.
        if executors_raw is None:
            logger.error("执行者不能为空")
            return HttpResponse(status=400, content='执行者不能为空')
        executors_id = executors_raw.split('&&')

        # Step 1: make sure the owning test plan exists and is not deleted.
        if not TestPlan.objects.filter(id=plan_id, delete=False).exists():
            logger.error("测试计划已删除或不存在")
            return HttpResponse(status=404, content='测试计划已删除或不存在')

        # Step 2: validate every executor before anything is written, so a bad
        # id cannot leave stored files or a half-created batch of tasks behind.
        for executor_id in executors_id:
            if not User.objects.filter(id=executor_id, delete=False).exists():
                logger.error("执行者用户不存在")
                return HttpResponse(status=404, content='执行者用户不存在')

        # Step 3: store the uploaded files and create their database records.
        file_ids = []
        file_paths = []
        for upload in statement_files:
            create_time = update_time = datetime.datetime.now()
            try:
                write_file(upload, TASK_ROOT + "/" + str(create_time))
            except Exception:
                logger.exception("文件上传失败")
                return HttpResponse(status=500, content="文件上传失败")
            id_file = get_id(FileManager, "TestLaboratory_V1_File_1")
            try:
                file_new = FileManager.objects.create(
                    id=id_file,
                    category="plan_statement",
                    path=str(create_time) + "/" + upload.name,
                    create_time=create_time,
                    update_time=update_time,
                )
            except Exception:
                logger.exception("文件路径写入数据库失败")
                return HttpResponse(status=500, content="文件路径写入数据库失败")
            file_ids.append(id_file)
            # NOTE(review): the file is written under TASK_ROOT + "/" + ..., but
            # this path is built without the "/" separator - confirm intended.
            file_paths.append(TASK_ROOT + file_new.path)
        fids = '&&'.join(file_ids)

        # The file listing is identical for every task, so build it once.
        files_info = [
            {'file_id': id_file, 'file_name': upload.name, 'file_url': HTTP_HEAD + file_path}
            for (id_file, upload, file_path) in zip(file_ids, statement_files, file_paths)
        ]

        # Step 4: create one task per executor and write an operation log entry.
        create_time = update_time = datetime.datetime.now()
        return_value = []
        for executor_id in executors_id:
            task_id = get_id(TestTask, 'TestLaboratory_V1_Task_1')
            task = TestTask.objects.create(
                id=task_id,
                title=title,
                description=description,
                state=0,
                plan_id=plan_id,
                statement_file=fids,
                executor_id=executor_id,
                create_time=create_time,
                update_time=update_time,
            )
            log_user, action, method = get_log(request)
            gen_log(action, "测试任务", task.title, method, log_user)
            return_value.append({
                'software_id': task.plan.software.id,
                'software_name': task.plan.software.name,
                'version': task.plan.version,
                'id': task.id,
                'executor_id': executor_id,
                'title': title,
                'description': description,
                'state': 0,
                'files': files_info,
                'case_template': HTTP_HEAD + CASE_TEMPLATE_ROOT,
            })
        return Response(return_value)

    # view task_list
    @staticmethod
    @RoleControl
    def get(request, *args, **kwargs):
        """List non-deleted test tasks, optionally filtered and sorted.

        Optional query parameters read from ``request.GET``:
            title: substring match on the task title.
            software_id: id of the software the task's plan belongs to.
            state: exact task state.
            executor_id: id of the executing user.
            sort: ordering field; defaults to newest ``create_time`` first.

        Returns:
            ``Response`` with a list of task dicts (empty when nothing matches).
        """
        title = request.GET.get('title')
        software_id = request.GET.get('software_id')
        state = request.GET.get('state')
        executor_id = request.GET.get('executor_id')
        sort = request.GET.get('sort')

        task_all = TestTask.objects.order_by('-create_time')
        if title:
            task_all = task_all.filter(title__contains=title)
        if software_id:
            task_all = task_all.filter(plan__software__id=software_id)
        if state:
            task_all = task_all.filter(state=state)
        if executor_id:
            task_all = task_all.filter(executor_id=executor_id)
        # sort = update_time, -update_time, create_time, -create_time,
        #        title, -title, state
        if sort:
            # NOTE(review): `sort` is passed from the query string straight
            # into order_by(); consider restricting it to the values above.
            task_all = task_all.order_by(sort)
        # Fetch plan, software and executor with the tasks to avoid per-row queries.
        task_all = task_all.filter(delete=False).select_related('plan__software', 'executor')

        info = []
        for task in task_all:
            # Count failed cases (state=2) in the database rather than loading them.
            fail_num = TestCase.objects.filter(task=task, state=2, delete=False).count()
            info.append({
                'software_id': task.plan.software.id,
                'software_name': task.plan.software.name,
                'version': task.plan.version,
                'executor': {'id': task.executor.id, 'name': task.executor.username},
                'plan_name': task.plan.title,
                'id': task.id,
                'title': task.title,
                'description': task.description,
                'state': task.state,
                'fail_num': fail_num,
                'create_time': task.create_time,
                'update_time': task.update_time,
            })
        return Response(info)