# tasklistview.py
  1. import datetime
  2. from rest_framework.response import Response
  3. from rest_framework.views import APIView
  4. from django.http import HttpResponse
  5. from TestLaboratory.settings import TASK_ROOT, CASE_TEMPLATE_ROOT, HTTP_HEAD
  6. from apps.company.models import Company
  7. from apps.file.models import FileManager
  8. from apps.log.models import get_log, gen_log
  9. from apps.plan.models import TestPlan
  10. from apps.task.models import TestTask, TestCase
  11. from apps.user.middleware.rolecontrol import RoleControl
  12. from apps.user.models import User
  13. from utils.util_add_id import get_id
  14. from utils.util_file.util_fileio import write_file
  15. import logging
  16. logger = logging.getLogger('django')
  17. class TaskListView(APIView):
  18. # 登录权限验证
  19. authentication_classes = []
  20. # create task
  21. @staticmethod
  22. @RoleControl
  23. def post(request, *args, **kwargs):
  24. plan_id = request.POST.get('plan_id')
  25. title = request.POST.get('title')
  26. description = request.POST.get('description')
  27. executors_id = request.POST.get('executors').split('&&')
  28. statement_files = request.FILES.getlist('files')
  29. # step1:判断所属软件及版本是否存在
  30. plan = TestPlan.objects.filter(id=plan_id, delete=False)
  31. if not plan:
  32. logger.error("测试计划已删除或不存在")
  33. return HttpResponse(status=404, content='测试计划已删除或不存在')
  34. # step1:创建文件的数据库字段
  35. fids = ''
  36. file_paths = []
  37. file_ids = []
  38. for file in statement_files:
  39. create_time = update_time = datetime.datetime.now()
  40. try:
  41. write_file(file, TASK_ROOT + "/" + str(create_time))
  42. except:
  43. logger.error("文件上传失败")
  44. return HttpResponse(status=500, content="文件上传失败")
  45. id_file = get_id(FileManager, "TestLaboratory_V1_File_1")
  46. try:
  47. file_new = FileManager.objects.create(id=id_file, category="plan_statement",
  48. path=str(create_time) + "/" + file.name,
  49. create_time=create_time,
  50. update_time=update_time)
  51. except:
  52. logger.error("文件路径写入数据库失败")
  53. return HttpResponse(status=500, content="文件路径写入数据库失败")
  54. file_ids.append(id_file)
  55. file_paths.append(TASK_ROOT + file_new.path)
  56. fids += id_file + '&&'
  57. fids = fids[:-2]
  58. task_files = {'case-template': CASE_TEMPLATE_ROOT, 'statement-fids': fids}
  59. create_time = update_time = datetime.datetime.now()
  60. return_value = []
  61. for executor_id in executors_id:
  62. executor = User.objects.filter(id=executor_id, delete=False)
  63. if not executor:
  64. logger.error("执行者用户不存在")
  65. return HttpResponse(status=404, content='执行者用户不存在')
  66. task_id = get_id(TestTask, 'TestLaboratory_V1_Task_1')
  67. task = TestTask.objects.create(id=task_id, title=title, description=description, state=0, plan_id=plan_id,
  68. statement_file=fids, executor_id=executor_id,
  69. create_time=create_time, update_time=update_time)
  70. executor, action, method = get_log(request)
  71. gen_log(action, "测试任务", task.title, method, executor)
  72. return_value.append({
  73. 'software_id': task.plan.software.id,
  74. 'software_name': task.plan.software.name,
  75. 'version': task.plan.version,
  76. 'id': task.id,
  77. 'executor_id': executor_id,
  78. 'title': title,
  79. 'description': description,
  80. 'state': 0,
  81. 'files': [{'file_id': id_file, 'file_name': file.name, 'file_url': HTTP_HEAD + file_path} for
  82. (id_file, file, file_path)
  83. in zip(file_ids, statement_files, file_paths)],
  84. 'case_template': HTTP_HEAD + CASE_TEMPLATE_ROOT
  85. })
  86. return Response(return_value)
  87. # view task_list
  88. @staticmethod
  89. @RoleControl
  90. def get(request, *args, **kwargs):
  91. title = request.GET.get('title')
  92. software_id = request.GET.get('software_id')
  93. state = request.GET.get('state')
  94. executor_id = request.GET.get('executor_id')
  95. sort = request.GET.get('sort')
  96. task_all = TestTask.objects.order_by('-create_time')
  97. if title:
  98. task_all = task_all.filter(title__contains=title)
  99. if software_id:
  100. task_all = task_all.filter(plan__software__id=software_id)
  101. if state:
  102. task_all = task_all.filter(state=state)
  103. if executor_id:
  104. task_all = task_all.filter(executor_id=executor_id)
  105. """
  106. sort=update_time, -update_time, create_time, -create_time, title, -title, state
  107. """
  108. if sort:
  109. task_all = task_all.order_by(sort)
  110. task_all = task_all.filter(delete=False)
  111. if not task_all:
  112. return Response([])
  113. info = []
  114. for task in task_all.all():
  115. company_name = ''
  116. try:
  117. company = Company.objects.get(id=task.company_id)
  118. company_name = company.name
  119. except:
  120. pass
  121. fail_num = len(TestCase.objects.filter(task=task, state=2, delete=False))
  122. info.append({
  123. 'software_id': task.plan.software.id,
  124. 'software_name': task.plan.software.name,
  125. 'version': task.plan.version,
  126. # 'executor_id': task.executor_id,
  127. 'executor': {'id': task.executor.id, 'name': task.executor.username},
  128. 'plan_name': task.plan.title,
  129. 'id': task.id,
  130. 'title': task.title,
  131. 'description': task.description,
  132. 'state': task.state,
  133. 'fail_num': fail_num,
  134. 'create_time': task.create_time,
  135. 'update_time': task.update_time,
  136. 'company_name':company_name
  137. })
  138. return Response(info)