123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147 |
- import datetime
- from rest_framework.response import Response
- from rest_framework.views import APIView
- from django.http import HttpResponse
- from TestLaboratory.settings import TASK_ROOT, CASE_TEMPLATE_ROOT, HTTP_HEAD
- from apps.file.models import FileManager
- from apps.log.models import get_log, gen_log
- from apps.plan.models import TestPlan
- from apps.task.models import TestTask, TestCase
- from apps.user.middleware.rolecontrol import RoleControl
- from apps.user.models import User
- from utils.util_add_id import get_id
- from utils.util_file.util_fileio import write_file
- import logging
- logger = logging.getLogger('django')
class TaskListView(APIView):
    """Create test tasks (POST) and list existing tasks with filters (GET)."""

    # Login/permission verification: DRF authentication is switched off here.
    # NOTE(review): access control is presumably enforced by the RoleControl
    # decorator on each handler -- confirm against apps.user.middleware.
    authentication_classes = []

    # Values accepted for the ``sort`` query parameter of ``get``. Anything
    # else is ignored so user input never reaches ``order_by`` unchecked.
    _SORT_FIELDS = frozenset({
        'update_time', '-update_time',
        'create_time', '-create_time',
        'title', '-title',
        'state', '-state',
    })

    # create task
    @staticmethod
    @RoleControl
    def post(request, *args, **kwargs):
        """Create one test task per executor for the given test plan.

        Form fields read from ``request.POST``:
            plan_id: id of the test plan the tasks belong to.
            title: task title.
            description: task description.
            executors: executor user ids joined with ``&&``.
        Uploaded files are read from ``request.FILES`` under the key ``files``
        and attached to every created task as statement files.

        Returns:
            ``Response`` with a list of dicts, one per created task, or an
            ``HttpResponse`` with status 400 (missing executors), 404 (plan or
            executor not found) or 500 (file could not be stored/recorded).
        """
        plan_id = request.POST.get('plan_id')
        title = request.POST.get('title')
        description = request.POST.get('description')
        executors_raw = request.POST.get('executors')
        statement_files = request.FILES.getlist('files')

        # A missing ``executors`` field used to crash on ``None.split``.
        if not executors_raw:
            logger.error("缺少执行者参数")
            return HttpResponse(status=400, content='缺少执行者参数')
        executor_ids = executors_raw.split('&&')

        # Step 1: make sure the owning test plan exists and is not deleted.
        plan = (TestPlan.objects
                .filter(id=plan_id, delete=False)
                .select_related('software')
                .first())
        if plan is None:
            logger.error("测试计划已删除或不存在")
            return HttpResponse(status=404, content='测试计划已删除或不存在')

        # Step 2: validate every executor before anything is persisted, so a
        # bad id cannot leave behind files or tasks for the earlier executors.
        for executor_id in executor_ids:
            if not User.objects.filter(id=executor_id, delete=False).exists():
                logger.error("执行者用户不存在")
                return HttpResponse(status=404, content='执行者用户不存在')

        # Step 3: store each uploaded file and record it in the database.
        file_ids = []
        file_paths = []
        for upload in statement_files:
            stamp = datetime.datetime.now()
            try:
                write_file(upload, TASK_ROOT + "/" + str(stamp))
            except Exception:
                logger.exception("文件上传失败")
                return HttpResponse(status=500, content="文件上传失败")
            file_id = get_id(FileManager, "TestLaboratory_V1_File_1")
            try:
                file_record = FileManager.objects.create(
                    id=file_id,
                    category="plan_statement",
                    path=str(stamp) + "/" + upload.name,
                    create_time=stamp,
                    update_time=stamp,
                )
            except Exception:
                logger.exception("文件路径写入数据库失败")
                return HttpResponse(status=500, content="文件路径写入数据库失败")
            file_ids.append(file_id)
            # NOTE(review): the file is written under TASK_ROOT + "/" + <stamp>
            # but this path is joined without a separator; the two only agree
            # if TASK_ROOT ends with "/" -- confirm against settings.
            file_paths.append(TASK_ROOT + file_record.path)

        # File ids are stored on the task as one "&&"-separated string.
        fids = '&&'.join(file_ids)

        # The file listing is identical for every task, so build it once.
        files_payload = [
            {'file_id': file_id, 'file_name': upload.name, 'file_url': HTTP_HEAD + file_path}
            for file_id, upload, file_path in zip(file_ids, statement_files, file_paths)
        ]

        # Step 4: create one task per executor; all share the same timestamps.
        create_time = update_time = datetime.datetime.now()
        return_value = []
        for executor_id in executor_ids:
            task_id = get_id(TestTask, 'TestLaboratory_V1_Task_1')
            task = TestTask.objects.create(
                id=task_id, title=title, description=description, state=0,
                plan_id=plan_id, statement_file=fids, executor_id=executor_id,
                create_time=create_time, update_time=update_time,
            )
            # Record the operation in the audit log.
            operator, action, method = get_log(request)
            gen_log(action, "测试任务", task.title, method, operator)
            return_value.append({
                'software_id': plan.software.id,
                'software_name': plan.software.name,
                'version': plan.version,
                'id': task.id,
                'executor_id': executor_id,
                'title': title,
                'description': description,
                'state': 0,
                'files': files_payload,
                'case_template': HTTP_HEAD + CASE_TEMPLATE_ROOT,
            })
        return Response(return_value)

    # view task_list
    @staticmethod
    @RoleControl
    def get(request, *args, **kwargs):
        """List non-deleted test tasks, optionally filtered and sorted.

        Query parameters read from ``request.GET`` (all optional):
            title: keep tasks whose title contains this text.
            software_id: keep tasks whose plan belongs to this software.
            state: keep tasks in this state.
            executor_id: keep tasks assigned to this executor.
            sort: one of update_time, -update_time, create_time, -create_time,
                title, -title, state, -state. Other values are ignored and the
                default ordering (newest first by create_time) is used.

        Returns:
            ``Response`` with a list of task dicts; empty list when no task
            matches.
        """
        title = request.GET.get('title')
        software_id = request.GET.get('software_id')
        state = request.GET.get('state')
        executor_id = request.GET.get('executor_id')
        sort = request.GET.get('sort')

        # Join the related rows up front so serialising each task does not
        # issue extra queries for its plan, software and executor.
        tasks = (TestTask.objects
                 .filter(delete=False)
                 .select_related('plan__software', 'executor')
                 .order_by('-create_time'))
        if title:
            tasks = tasks.filter(title__contains=title)
        if software_id:
            tasks = tasks.filter(plan__software__id=software_id)
        if state:
            tasks = tasks.filter(state=state)
        if executor_id:
            tasks = tasks.filter(executor_id=executor_id)

        if sort:
            if sort in TaskListView._SORT_FIELDS:
                tasks = tasks.order_by(sort)
            else:
                # Unknown fields would raise FieldError inside order_by.
                logger.warning("忽略不支持的排序字段: %s", sort)

        info = []
        for task in tasks:
            # state=2 is the value the original code counts as "failed" cases.
            fail_num = TestCase.objects.filter(task=task, state=2, delete=False).count()
            info.append({
                'software_id': task.plan.software.id,
                'software_name': task.plan.software.name,
                'version': task.plan.version,
                'executor': {'id': task.executor.id, 'name': task.executor.username},
                'plan_name': task.plan.title,
                'id': task.id,
                'title': task.title,
                'description': task.description,
                'state': task.state,
                'fail_num': fail_num,
                'create_time': task.create_time,
                'update_time': task.update_time,
            })
        return Response(info)
|