beidoutasklist.py 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220
  1. import datetime
  2. import functools
  3. from rest_framework.response import Response
  4. from rest_framework.views import APIView
  5. from django.http import HttpResponse
  6. from TestLaboratory.settings import TASK_ROOT, CASE_TEMPLATE_ROOT, HTTP_HEAD
  7. from apps.file.models import FileManager
  8. from apps.log.models import get_log, gen_log
  9. from apps.plan.models import TestPlan
  10. from apps.task.models import TestTask, TestCase
  11. from apps.user.middleware.rolecontrol import RoleControl
  12. from apps.user.models import User
  13. from utils.util_add_id import get_id
  14. from utils.util_file.util_fileio import write_file
  15. import logging
  16. logger = logging.getLogger('django')
  17. class BeidouTaskCreateView(APIView):
  18. # 登录权限验证
  19. authentication_classes = []
  20. # create task
  21. @staticmethod
  22. @RoleControl
  23. def post(request, *args, **kwargs):
  24. plan_id = request.POST.get('plan_id')
  25. title = request.POST.get('task_name')
  26. description = request.POST.get('description')
  27. executors_id = request.POST.get('executors').split('&&')
  28. statement_files = request.FILES.getlist('files')
  29. task_type = request.POST.get('type')
  30. # step1:判断所属软件及版本是否存在
  31. plan = TestPlan.objects.filter(id=plan_id, delete=False)
  32. if not plan:
  33. logger.error("测试计划已删除或不存在")
  34. return HttpResponse(status=404, content='测试计划已删除或不存在')
  35. task = TestTask.objects.filter(plan_id=plan_id, title=title)
  36. if task:
  37. logger.error("测试任务已存在,请重新命名")
  38. return HttpResponse(status=404, content='测试任务已存在,请重新命名')
  39. # step1:创建文件的数据库字段
  40. fids = ''
  41. file_paths = []
  42. file_ids = []
  43. for file in statement_files:
  44. create_time = update_time = datetime.datetime.now()
  45. try:
  46. write_file(file, TASK_ROOT + "/" + str(create_time))
  47. except:
  48. logger.error("文件上传失败")
  49. return HttpResponse(status=500, content="文件上传失败")
  50. id_file = get_id(FileManager, "TestLaboratory_V1_File_1")
  51. try:
  52. file_new = FileManager.objects.create(id=id_file, category="plan_statement",
  53. path=str(create_time) + "/" + file.name,
  54. create_time=create_time,
  55. update_time=update_time)
  56. except:
  57. logger.error("文件路径写入数据库失败")
  58. return HttpResponse(status=500, content="文件路径写入数据库失败")
  59. file_ids.append(id_file)
  60. file_paths.append(TASK_ROOT + file_new.path)
  61. fids += id_file + '&&'
  62. fids = fids[:-2]
  63. create_time = update_time = datetime.datetime.now()
  64. return_value = []
  65. for executor_id in executors_id:
  66. executor = User.objects.filter(id=executor_id, delete=False)
  67. if not executor:
  68. logger.error("执行者用户不存在")
  69. return HttpResponse(status=404, content='执行者用户不存在')
  70. task_id = get_id(TestTask, 'TestLaboratory_V1_Task_1')
  71. task = TestTask.objects.create(id=task_id, title=title, description=description, state=0, plan_id=plan_id,
  72. statement_file=fids, executor_id=executor_id, type=task_type,
  73. create_time=create_time, update_time=update_time)
  74. return_value.append({
  75. 'software_id': task.plan.software.id,
  76. 'software_name': task.plan.software.name,
  77. 'version': task.plan.version,
  78. 'id': task.id,
  79. 'executor_id': executor_id,
  80. 'title': title,
  81. 'description': description,
  82. 'type': task_type,
  83. 'state': task.state,
  84. 'files': [{'file_id': id_file, 'file_name': file.name, 'file_url': HTTP_HEAD + file_path} for
  85. (id_file, file, file_path)
  86. in zip(file_ids, statement_files, file_paths)],
  87. })
  88. executor, action, method = get_log(request)
  89. gen_log(action, "北斗测试任务", task.title, method, executor)
  90. return Response(return_value)
  91. class BeidouTaskListView(APIView):
  92. authentication_classes = []
  93. @staticmethod
  94. @RoleControl
  95. def get(request, *args, **kwargs):
  96. title = request.GET.get('task_name')
  97. plan_id = request.GET.get('plan_id')
  98. software_id = request.GET.get('software_id')
  99. sort = request.GET.get('sort')
  100. task_all = TestTask.objects.order_by('-create_time')
  101. if title:
  102. task_all = task_all.filter(title__contains=title)
  103. if software_id:
  104. task_all = task_all.filter(plan__software__id=software_id)
  105. if plan_id:
  106. task_all = task_all.filter(plan__id=plan_id)
  107. """
  108. sort=update_time, -update_time, create_time, -create_time, title, -title, state
  109. """
  110. if sort:
  111. task_all = task_all.order_by(sort)
  112. task_all = task_all.filter(delete=False)
  113. if not task_all:
  114. return Response([])
  115. info = list()
  116. for task in task_all.all():
  117. info.append({
  118. 'software_id': task.plan.software.id,
  119. 'software_name': task.plan.software.name,
  120. 'version': task.plan.version,
  121. 'plan_id': task.plan.id,
  122. 'plan_name': task.plan.title,
  123. 'title': task.title,
  124. 'description': task.description,
  125. 'type': task.type,
  126. })
  127. info = functools.reduce(lambda x, y: y in x and x or x + [y], info, [])
  128. for i in range(len(info)):
  129. info[i]['task_id'] = 'TestLaboratory_V1_Task_1'
  130. return Response(info)
  131. class BeidouTaskInfoView(APIView):
  132. authentication_classes = []
  133. @staticmethod
  134. @RoleControl
  135. def get(request, *args, **kwargs):
  136. title = request.GET.get('task_name')
  137. plan_id = request.GET.get('plan_id')
  138. software_id = request.GET.get('software_id')
  139. state = request.GET.get('state')
  140. executor_id = request.GET.get('executor_id')
  141. task_type = request.GET.get('type')
  142. sort = request.GET.get('sort')
  143. task_all = TestTask.objects.order_by('-create_time')
  144. if title:
  145. task_all = task_all.filter(title__contains=title)
  146. if software_id:
  147. task_all = task_all.filter(plan__software__id=software_id)
  148. if plan_id:
  149. task_all = task_all.filter(plan__id=plan_id)
  150. if state:
  151. task_all = task_all.filter(state=state)
  152. if executor_id:
  153. task_all = task_all.filter(executor_id=executor_id)
  154. if task_type:
  155. task_all = task_all.filter(type=task_type)
  156. """
  157. sort=update_time, -update_time, create_time, -create_time, title, -title, state
  158. """
  159. if sort:
  160. task_all = task_all.order_by(sort)
  161. task_all = task_all.filter(delete=False)
  162. if not task_all:
  163. return Response([])
  164. info = list()
  165. for task in task_all.all():
  166. task_sids = task.statement_file.split('&&')
  167. task_file_names = []
  168. task_file_urls = []
  169. if task_sids[0]:
  170. for task_statement_id in task_sids:
  171. task_statement = FileManager.objects.get(id=task_statement_id)
  172. task_file_urls.append(TASK_ROOT + task_statement.path)
  173. task_file_names.append(task_statement.path.split('/')[-1])
  174. info.append({
  175. 'software_id': task.plan.software.id,
  176. 'software_name': task.plan.software.name,
  177. 'version': task.plan.version,
  178. 'plan_id': task.plan.id,
  179. 'plan_name': task.plan.title,
  180. 'id': task.id,
  181. 'title': task.title,
  182. 'description': task.description,
  183. 'type': task.type,
  184. 'state': task.state,
  185. 'statements': [{'file_id': file_sid, 'file_name': file_name, 'file_url': HTTP_HEAD + file_url} for
  186. file_sid, file_name, file_url in zip(task_sids, task_file_names, task_file_urls)],
  187. 'executor_id': task.executor.id,
  188. 'executor_name': task.executor.username,
  189. 'create_time': task.create_time,
  190. 'update_time': task.update_time,
  191. })
  192. return Response(list(info))