import datetime
import logging

from django.http import HttpResponse
from rest_framework.response import Response
from rest_framework.views import APIView

from TestLaboratory.settings import (
    TASK_ROOT,
    CASE_TEMPLATE_ROOT,
    HTTP_HEAD,
    PLAN_ROOT,
    CASE_FILE_ROOT,
    SOFTWARE_ROOT,
)
from apps.file.models import FileManager
from apps.log.models import get_log, gen_log
from apps.plan.models import TestPlan
from apps.task.models import TestTask, TestCase, BeidouCase
from apps.user.middleware.rolecontrol import RoleControl
from apps.user.models import User
from utils.util_add_id import get_id
from utils.util_file.util_fileio import write_file

logger = logging.getLogger('django')

# Separator used to pack several FileManager ids into one ``statement_file``
# text value.
_ID_SEPARATOR = '&&'


def model_to_dict(model_obj, ignore=()):
    """Collect the numeric field values of a model instance into a dict.

    Only fields whose current value is an ``int`` or ``float`` are kept
    (``bool`` values pass too, because ``bool`` subclasses ``int``); every
    other field is silently dropped. The ``delete`` field is never included.

    Args:
        model_obj: Model instance whose ``_meta.fields`` are inspected.
        ignore: Collection of attribute names to leave out.

    Returns:
        dict mapping field attribute name to its numeric value.
    """
    att_dict = {}
    for field in model_obj._meta.fields:
        name = field.attname
        # Skip excluded names before reading the attribute at all.
        if name == 'delete' or name in ignore:
            continue
        value = getattr(model_obj, name)
        if isinstance(value, (int, float)):
            att_dict[name] = value
    return att_dict


def _split_ids(joined):
    """Split a ``&&``-joined id string into a list, dropping empty parts."""
    return [part for part in (joined or '').split(_ID_SEPARATOR) if part]


def _soft_delete_statements(task):
    """Set ``delete = True`` on every file listed in ``task.statement_file``."""
    for statement_id in _split_ids(task.statement_file):
        statement = FileManager.objects.get(id=statement_id)
        statement.delete = True
        statement.save()


def _statement_entries(joined_ids, root):
    """Build the ``statements`` payload for a ``&&``-joined list of file ids.

    Each entry carries the file id, the last path component as the file name
    and a URL made of ``HTTP_HEAD + root + path``.
    """
    entries = []
    for file_id in _split_ids(joined_ids):
        statement = FileManager.objects.get(id=file_id)
        entries.append({
            'file_id': file_id,
            'file_name': statement.path.split('/')[-1],
            'file_url': HTTP_HEAD + root + statement.path,
        })
    return entries


class BeidouTaskEditView(APIView):
    """Edit or soft-delete Beidou test tasks addressed by plan id and title."""

    # No DRF authentication classes are configured for this view; access
    # control is presumably enforced by the RoleControl decorator — TODO confirm.
    authentication_classes = []

    @staticmethod
    @RoleControl
    def post(request, task_id=None, *args, **kwargs):
        """Update title, description and statement files of matching tasks.

        Form fields read: ``task_name_old``, ``task_name_new``,
        ``description``, ``plan_id`` and the uploaded ``files`` list.
        ``task_id`` is accepted but not used.

        Returns:
            404 when the plan or the task cannot be found, 500 when storing
            an uploaded file or its database record fails, otherwise a plain
            success response.
        """
        task_name_old = request.POST.get('task_name_old')
        task_name_new = request.POST.get('task_name_new')
        description = request.POST.get('description')
        statement_files = request.FILES.getlist('files')
        plan_id = request.POST.get('plan_id')

        # Step 1: the owning test plan must exist and not be soft-deleted.
        if not TestPlan.objects.filter(id=plan_id, delete=False).exists():
            logger.error("测试计划已删除或不存在")
            return HttpResponse(status=404, content='测试计划已删除或不存在')

        # NOTE(review): this lookup does not exclude soft-deleted tasks —
        # confirm whether editing a deleted task is intended.
        tasks = list(TestTask.objects.filter(plan_id=plan_id, title=task_name_old))
        if not tasks:
            logger.error("测试任务不存在")
            return HttpResponse(status=404, content='测试任务不存在')

        for task in tasks:
            if task_name_new:
                task.title = task_name_new
            if description:
                task.description = description

            new_ids = []
            for upload in statement_files:
                create_time = update_time = datetime.datetime.now()
                try:
                    write_file(upload, "{}/{}/{}".format(TASK_ROOT, task.id, create_time))
                except Exception:
                    logger.exception("文件上传失败")
                    return HttpResponse(status=500, content="文件上传失败")

                id_file = get_id(FileManager, "TestLaboratory_V1_File_1")
                try:
                    FileManager.objects.create(
                        id=id_file,
                        category="plan_statement",
                        path="{}/{}/{}".format(task.id, create_time, upload.name),
                        create_time=create_time,
                        update_time=update_time,
                    )
                except Exception:
                    logger.exception("文件路径写入数据库失败")
                    return HttpResponse(status=500, content="文件路径写入数据库失败")
                new_ids.append(id_file)

            # The previous attachments are always retired and replaced by this
            # request's uploads, so an edit without files clears the list.
            # NOTE(review): confirm that clearing is intended.
            _soft_delete_statements(task)
            task.statement_file = _ID_SEPARATOR.join(new_ids)
            task.save()

        # NOTE(review): one audit entry is written per request; the original
        # indentation was lost, so confirm the intended granularity.
        executor, action, method = get_log(request)
        # Fall back to the old title so the entry is never logged against None.
        gen_log(action, "北斗测试任务", task_name_new or task_name_old, method, executor)
        return HttpResponse("编辑已保存")

    @staticmethod
    @RoleControl
    def delete(request, *args, **kwargs):
        """Soft-delete matching tasks together with their statement files.

        Form fields read: ``task_name`` and ``plan_id``.

        Returns:
            404 when the plan or the task cannot be found, otherwise a plain
            success response.
        """
        task_name = request.POST.get('task_name')
        plan_id = request.POST.get('plan_id')

        # Step 1: the owning test plan must exist and not be soft-deleted.
        if not TestPlan.objects.filter(id=plan_id, delete=False).exists():
            logger.error("测试计划已删除或不存在")
            return HttpResponse(status=404, content='测试计划已删除或不存在')

        # NOTE(review): this lookup does not exclude already-deleted tasks —
        # confirm whether that matches the 404 message below.
        tasks = list(TestTask.objects.filter(plan_id=plan_id, title=task_name))
        if not tasks:
            logger.error("测试任务已删除或不存在")
            return HttpResponse(status=404, content='测试任务已删除或不存在')

        for task in tasks:
            _soft_delete_statements(task)
            task.delete = True
            task.save()

        executor, action, method = get_log(request)
        # Every matched task has this title, so log the requested name
        # rather than relying on the loop variable after the loop.
        gen_log(action, "北斗测试任务", task_name, method, executor)
        return HttpResponse("删除完成")


class BeidouTaskView(APIView):
    """Read-only detail view of a Beidou test task."""

    # No DRF authentication classes are configured for this view; access
    # control is presumably enforced by the RoleControl decorator — TODO confirm.
    authentication_classes = []

    @staticmethod
    @RoleControl
    def get(request, task_id=None, *args, **kwargs):
        """Return the task with its plan, software version, files and result.

        Returns:
            404 when the task, the matching software version or the version
            file record cannot be found, otherwise a ``Response`` carrying
            the assembled detail payload.
        """
        # ``first()`` orders an unordered queryset by primary key, so the
        # pick is deterministic if several rows match.
        task = TestTask.objects.filter(id=task_id, delete=False).first()
        if task is None:
            logger.error("测试任务已删除或不存在")
            return HttpResponse(status=404, content='测试任务已删除或不存在')

        plan = task.plan
        software = plan.software

        version = software.version_set.filter(number=plan.version, delete=False).first()
        if version is None:
            logger.error("测试任务对应软件版本不存在或已删除")
            return HttpResponse(status=404, content='测试任务对应软件版本不存在或已删除')

        version_file = FileManager.objects.filter(id=version.version_file).first()
        if version_file is None:
            logger.error("无法定位测试任务对应的软件实体")
            return HttpResponse(status=404, content='无法定位测试任务对应的软件实体')

        plan_statements = _statement_entries(plan.statement_file, PLAN_ROOT)
        task_statements = _statement_entries(task.statement_file, TASK_ROOT)

        beidou_case = BeidouCase.objects.filter(task_id=task_id).first()
        case_info = model_to_dict(beidou_case) if beidou_case is not None else {}

        return Response({
            'software_id': software.id,
            'software_name': software.name,
            'version': plan.version,
            'version_file': {
                'file_name': version_file.path.split('/')[-1],
                'file_url': HTTP_HEAD + SOFTWARE_ROOT + version_file.path,
            },
            'plan': {
                'id': plan.id,
                'title': plan.title,
                'description': plan.description,
                'state': plan.state,
                'statements': plan_statements,
                'creator': {
                    'executor_id': plan.creator.id,
                    'executor_name': plan.creator.username,
                },
            },
            'task': {
                'id': task.id,
                'title': task.title,
                'description': task.description,
                'state': task.state,
                'statements': task_statements,
                'executor': {'id': task.executor.id, 'name': task.executor.username},
                'result': case_info,
            },
            'create_time': task.create_time,
            'update_time': task.update_time,
        })