123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168 |
- import datetime
- import json
- import os
- from rest_framework.response import Response
- from rest_framework.views import APIView
- from django.http import HttpResponse
- from apps.file.models import FileManager
- from apps.log.models import get_log, gen_log
- from apps.plan.models import TestPlan
- from apps.software.models import Software
- from apps.user.middleware.rolecontrol import RoleControl
- from apps.user.models import User
- from utils.util_add_id import gen_next_id, get_id
- from TestLaboratory.settings import PLAN_ROOT
- import logging
- logger = logging.getLogger('django')  # module-level logger obtained under the name 'django'; used by the view handlers in this file
class PlanListView(APIView):
    """Create test plans (POST) and list the non-deleted ones (GET)."""

    # Login/permission verification: no DRF authentication classes are
    # configured for this view; both handlers are decorated with RoleControl.
    authentication_classes = []

    # Model fields the ``sort`` query parameter of GET may name; a leading
    # "-" on the parameter requests descending order.
    _SORTABLE_FIELDS = frozenset(('update_time', 'create_time', 'title', 'state'))

    # create plan
    @staticmethod
    @RoleControl
    def post(request, *args, **kwargs):
        """Create a test plan and store its uploaded statement files.

        Form fields read from ``request.POST``: ``software_id``,
        ``version_number``, ``title``, ``description`` and ``creator``.
        Uploads are read from ``request.FILES`` under the key ``files``.

        Returns:
            A ``Response`` describing the new plan and its files on success.
            An ``HttpResponse`` with status 404 when the software, the
            version or the creator does not exist or is flagged as deleted.
            An ``HttpResponse`` with status 500 when writing a file, a file
            record or the plan record fails.
        """
        software_id = request.POST.get('software_id')
        version_number = request.POST.get('version_number')
        title = request.POST.get('title')
        description = request.POST.get('description')
        files = request.FILES.getlist('files')
        creator_id = request.POST.get('creator')

        # The owning software must exist and must not be flagged as deleted.
        software = Software.objects.filter(id=software_id, delete=False).first()
        if software is None:
            logger.error("软件已删除或不存在")
            return HttpResponse(status=404, content='软件已删除或不存在')

        # The requested version must belong to that software.
        software_version = software.version_set.filter(
            number=version_number, delete=False).first()
        if software_version is None:
            logger.error("软件版本不存在或已删除,请重新选择")
            return HttpResponse(status=404, content='软件版本不存在或已删除,请重新选择')

        # The creator must be an existing, non-deleted user.
        creator = User.objects.filter(id=creator_id, delete=False).first()
        if creator is None:
            logger.error("用户不存在,创建失败")
            return HttpResponse(status=404, content='用户不存在,创建失败')

        # One timestamp is shared by the plan and by every file record.
        create_time = update_time = datetime.datetime.now()
        id_plan = get_id(TestPlan, "TestLaboratory_V1_Plan_1")

        # Every upload of this request lands in <PLAN_ROOT>/<plan id>/<timestamp>/.
        relative_dir = id_plan + "/" + str(create_time)
        file_dir = PLAN_ROOT + "/" + relative_dir

        # NOTE(review): when a later step fails, files and rows written by
        # earlier iterations are left in place; nothing here rolls them back.
        file_ids = []
        file_paths = []
        for upload in files:
            try:
                os.makedirs(file_dir, exist_ok=True)
                with open(file_dir + "/" + upload.name, 'wb') as target:
                    # Stream in chunks so a large upload is never held in
                    # memory as one bytes object.
                    for chunk in upload.chunks():
                        target.write(chunk)
            except Exception:
                logger.exception("文件上传失败")
                return HttpResponse(status=500, content="文件上传失败")

            if file_ids:
                # All rows of this request share one create_time, so asking
                # the database for "the latest row by create_time" cannot
                # tell them apart reliably. Continue from the id generated
                # for the previous file instead.
                id_file = gen_next_id(file_ids[-1])
            else:
                latest = FileManager.objects.order_by('create_time').last()
                if latest is None:
                    id_file = "TestLaboratory_V1_File_1"
                else:
                    id_file = gen_next_id(latest.id)

            try:
                file_new = FileManager.objects.create(
                    id=id_file,
                    category="plan_statement",
                    path=relative_dir + "/" + upload.name,
                    create_time=create_time,
                    update_time=update_time,
                )
            except Exception:
                logger.exception("文件路径写入数据库失败")
                return HttpResponse(status=500, content="文件路径写入数据库失败")

            file_ids.append(id_file)
            # Report the location with exactly one separator after PLAN_ROOT,
            # so it names the file written above whether or not PLAN_ROOT
            # ends with "/".
            file_paths.append(PLAN_ROOT.rstrip("/") + "/" + file_new.path)

        try:
            plan = TestPlan.objects.create(
                id=id_plan,
                title=title,
                description=description,
                state='0',
                creator=creator,
                software=software,
                version=software_version.number,
                # File ids are stored as a single "&&"-separated string.
                statement_file="&&".join(file_ids),
                create_time=create_time,
                update_time=update_time,
            )
        except Exception:
            logger.exception("测试计划写入数据库失败")
            return HttpResponse(status=500, content='测试计划写入数据库失败')

        executor, action, method = get_log(request)
        gen_log(action, "测试计划", plan.title, method, executor)

        return Response({
            'software_id': software.id,
            'software_name': software.name,
            'version': version_number,
            'creator_id': creator.id,
            'creator': creator.username,
            'id': plan.id,
            'title': title,
            'description': description,
            'state': '执行中',
            'files': [
                {'file_id': file_id, 'file_name': upload.name, 'file_url': file_path}
                for file_id, upload, file_path in zip(file_ids, files, file_paths)
            ],
        })

    # view plan_list
    @staticmethod
    @RoleControl
    def get(request, *args, **kwargs):
        """List non-deleted test plans, optionally filtered and sorted.

        Query parameters (all optional): ``title`` (substring match),
        ``software_id`` and ``creator_id`` (substring match on the related
        id), ``state`` (exact match) and ``sort``.

        ``sort`` may be one of ``update_time``, ``create_time``, ``title``
        or ``state``, optionally prefixed with "-" for descending order.
        Any other value is logged and ignored, leaving the default
        newest-first ordering in place.

        Returns:
            A ``Response`` holding a list of plan dictionaries; the list is
            empty when nothing matches.
        """
        title = request.GET.get('title')
        software_id = request.GET.get('software_id')
        state = request.GET.get('state')
        creator_id = request.GET.get('creator_id')
        sort = request.GET.get('sort')

        # select_related loads software and creator in the same query, so
        # building the response does not hit the database once per plan.
        plans = (TestPlan.objects
                 .filter(delete=False)
                 .select_related('software', 'creator')
                 .order_by('-create_time'))
        if title:
            plans = plans.filter(title__contains=title)
        if software_id:
            plans = plans.filter(software__id__contains=software_id)
        if state:
            plans = plans.filter(state=state)
        if creator_id:
            plans = plans.filter(creator__id__contains=creator_id)

        # sort=update_time, -update_time, create_time, -create_time, title, -title, state
        if sort:
            field = sort[1:] if sort.startswith('-') else sort
            if field in PlanListView._SORTABLE_FIELDS:
                plans = plans.order_by(sort)
            else:
                # Never hand an arbitrary client string to order_by(): an
                # unknown field fails the request and a relation-spanning
                # one would sort by data this endpoint does not expose.
                logger.warning("Ignoring unsupported sort field: %s", sort)

        return Response([
            {
                'software_id': plan.software.id,
                'software_name': plan.software.name,
                'version': plan.version,
                'creator_id': plan.creator.id,
                'creator': plan.creator.username,
                'id': plan.id,
                'title': plan.title,
                'description': plan.description,
                'state': plan.state,
                'create_time': plan.create_time,
                'update_time': plan.update_time,
            }
            for plan in plans
        ])
|