import datetime
import logging
import os

from django.http import HttpResponse
from rest_framework.response import Response
from rest_framework.views import APIView

from TestLaboratory.settings import PLAN_ROOT
from apps.file.models import FileManager
from apps.log.models import get_log, gen_log
from apps.plan.models import TestPlan
from apps.software.models import Software
from apps.user.middleware.rolecontrol import RoleControl
from apps.user.models import User
from utils.util_add_id import gen_next_id, get_id

logger = logging.getLogger('django')

# Values accepted by the ``sort`` query parameter of the plan list endpoint.
# Anything else is rejected so that callers cannot order by arbitrary columns
# or relation traversals (and cannot trigger an unhandled FieldError).
_ALLOWED_SORT_FIELDS = frozenset({
    'update_time', '-update_time',
    'create_time', '-create_time',
    'title', '-title',
    'state', '-state',
})


class PlanListView(APIView):
    """Collection endpoint for test plans: POST creates one, GET lists them."""

    # Login authentication: no DRF authentication classes are applied here.
    authentication_classes = []

    # create plan
    @staticmethod
    @RoleControl
    def post(request, *args, **kwargs):
        """Create a test plan together with its uploaded statement files.

        Form fields read from ``request.POST``: ``software_id``,
        ``version_number``, ``title``, ``type``, ``description`` and
        ``creator``. Uploaded files are read from ``request.FILES`` under the
        key ``files``.

        Returns:
            A ``Response`` describing the created plan and its files, or an
            ``HttpResponse`` with status 404 when the software, version or
            creator cannot be found, or status 500 when writing a file or a
            database record fails.
        """
        software_id = request.POST.get('software_id')
        version_number = request.POST.get('version_number')
        title = request.POST.get('title')
        plan_type = request.POST.get('type')
        description = request.POST.get('description')
        files = request.FILES.getlist('files')
        creator_id = request.POST.get('creator')

        # Step 1: the owning software must exist and not be soft-deleted.
        software = Software.objects.filter(id=software_id, delete=False).first()
        if software is None:
            logger.error("软件已删除或不存在")
            return HttpResponse(status=404, content='软件已删除或不存在')

        # The requested version must exist for that software.
        software_version = software.version_set.filter(
            number=version_number, delete=False).first()
        if software_version is None:
            logger.error("软件版本不存在或已删除,请重新选择")
            return HttpResponse(status=404, content='软件版本不存在或已删除,请重新选择')

        # The creator must be an existing, non-deleted user.
        creator = User.objects.filter(id=creator_id, delete=False).first()
        if creator is None:
            logger.error("用户不存在,创建失败")
            return HttpResponse(status=404, content='用户不存在,创建失败')

        # Step 5: creation and modification timestamps share one value.
        create_time = update_time = datetime.datetime.now()

        # Step 6: allocate the id of the new test plan.
        id_plan = get_id(TestPlan, "TestLaboratory_V1_Plan_1")

        # Step 7: store every upload on disk and register it in FileManager.
        relative_dir = id_plan + "/" + str(create_time)
        file_dir = PLAN_ROOT + "/" + relative_dir
        file_ids = []
        file_paths = []
        id_file = None
        for file in files:
            file_path = file_dir + "/" + file.name
            try:
                # exist_ok avoids the check-then-create race of an explicit
                # os.path.exists() test.
                os.makedirs(file_dir, exist_ok=True)
                with open(file_path, 'wb') as f:
                    # Stream the upload in chunks instead of loading the whole
                    # file into memory at once.
                    for chunk in file.chunks():
                        f.write(chunk)
            except Exception:
                logger.exception("文件上传失败")
                return HttpResponse(status=500, content="文件上传失败")

            if id_file is not None:
                # Every file of this request shares the same create_time, so
                # "latest row by create_time" is ambiguous after the first
                # insert; derive the id from the one just used instead.
                id_file = gen_next_id(id_file)
            else:
                last_file = FileManager.objects.order_by('create_time').last()
                if last_file is None:
                    id_file = "TestLaboratory_V1_File_1"
                else:
                    id_file = gen_next_id(last_file.id)

            try:
                file_new = FileManager.objects.create(
                    id=id_file,
                    category="plan_statement",
                    path=relative_dir + "/" + file.name,
                    create_time=create_time,
                    update_time=update_time)
            except Exception:
                logger.exception("文件路径写入数据库失败")
                return HttpResponse(status=500, content="文件路径写入数据库失败")

            file_ids.append(id_file)
            # Join with exactly one separator so the reported location matches
            # where the file was written, whether or not PLAN_ROOT ends in "/".
            file_paths.append(PLAN_ROOT.rstrip("/") + "/" + file_new.path)

        # The plan stores its file ids as one "&&"-separated string.
        fids = "&&".join(file_ids)

        try:
            plan = TestPlan.objects.create(
                id=id_plan,
                title=title,
                description=description,
                state='0',
                type=plan_type,
                creator=creator,
                software=software,
                version=software_version.number,
                statement_file=fids,
                create_time=create_time,
                update_time=update_time)
        except Exception:
            logger.exception("测试计划写入数据库失败")
            return HttpResponse(status=500, content='测试计划写入数据库失败')

        # Pass the executor/action/method derived from the request to gen_log.
        executor, action, method = get_log(request)
        gen_log(action, "测试计划", plan.title, method, executor)

        return Response({
            'software_id': software.id,
            'software_name': software.name,
            'version': version_number,
            'creator_id': creator.id,
            'creator': creator.username,
            'id': plan.id,
            'title': title,
            'type': plan_type,
            'description': description,
            'state': '执行中',
            'files': [
                {'file_id': file_id, 'file_name': upload.name, 'file_url': file_url}
                for file_id, upload, file_url in zip(file_ids, files, file_paths)
            ],
        })

    # view plan_list
    @staticmethod
    @RoleControl
    def get(request, *args, **kwargs):
        """List non-deleted test plans, optionally filtered and sorted.

        Query parameters read from ``request.GET``: ``title`` (substring
        match), ``software_id``, ``state``, ``creator_id`` and ``sort``.
        ``sort`` must be one of ``_ALLOWED_SORT_FIELDS``; without it plans are
        ordered by ``-create_time``.

        Returns:
            A ``Response`` holding a list of plan dictionaries (empty when
            nothing matches), or an ``HttpResponse`` with status 400 when
            ``sort`` is not a supported ordering.
        """
        title = request.GET.get('title')
        software_id = request.GET.get('software_id')
        state = request.GET.get('state')
        creator_id = request.GET.get('creator_id')
        sort = request.GET.get('sort')

        # select_related loads software and creator in the same query instead
        # of issuing two extra queries per plan while serialising.
        plan_all = (TestPlan.objects
                    .select_related('software', 'creator')
                    .filter(delete=False)
                    .order_by('-create_time'))
        if title:
            plan_all = plan_all.filter(title__contains=title)
        if software_id:
            plan_all = plan_all.filter(software__id=software_id)
        if state:
            plan_all = plan_all.filter(state=state)
        if creator_id:
            # NOTE(review): substring match on the creator id, unlike the exact
            # match used for software_id -- confirm this is intentional.
            plan_all = plan_all.filter(creator__id__contains=creator_id)

        # sort = update_time, -update_time, create_time, -create_time,
        #        title, -title, state, -state
        if sort:
            if sort not in _ALLOWED_SORT_FIELDS:
                return HttpResponse(status=400, content='不支持的排序字段')
            plan_all = plan_all.order_by(sort)

        return Response([
            {
                'software_id': plan.software.id,
                'software_name': plan.software.name,
                'version': plan.version,
                'creator_id': plan.creator.id,
                'creator': plan.creator.username,
                'id': plan.id,
                'title': plan.title,
                'type': plan.type,
                'description': plan.description,
                'state': plan.state,
                'create_time': plan.create_time,
                'update_time': plan.update_time,
            }
            for plan in plan_all
        ])