# interface.py
import os
import time
from collections import Counter

from flask import jsonify

from accuracy.accuracytest import test_recall_accuracy, test_stamp
from accuracy.accuracytest import test_missed
from analyze.outanalyze import out_analyze
from lattices.buildtree import switch_dict
from parse.parse import parse_files, add_code_outside_func
from parse.parse2nd import parse_files_2nd
from models.funcnode import match_data_type
from utils.fileio import load_json, write_csv, write_to_excel
from utils.funclink import get_link, get_call_flow
from utils.source import get_file_list
from utils import log
from utils.ERRORLIST import error_list
# Module-wide logger object obtained from the project's `utils.log` helper.
# NOTE: the name is the same as the standard-library `logging` module, so that
# module must not be imported under its own name in this file.
logging = log.getlogger()
  17. def get_program_purpose(source, lattices, func_node_dict, node_list):
  18. program_name = source.replace("\\", '/').split("/")[-1]
  19. # 项目名称中有purpose 作为 项目的purpose
  20. purpose = match_data_type(program_name, lattices['purpose'])
  21. data_type = match_data_type(program_name, lattices['dataType'])
  22. if purpose[0][0] != "None":
  23. return purpose[0]
  24. # 项目名称中有datatype 找datatype对应的purpose作为项目的purpose
  25. elif data_type[0] != ("None", "none"):
  26. for private_info_pair in func_node_dict.values():
  27. for pair in private_info_pair:
  28. if pair[0] == data_type[0]:
  29. return data_type[1]
  30. else:
  31. for key, value in func_node_dict.items():
  32. if key.endswith("main") or key.endswith("__main__"):
  33. main_purpose = [item[1] for item in value]
  34. dict_num = {}
  35. for item in main_purpose:
  36. if item not in dict_num.keys():
  37. dict_num[item] = main_purpose.count(item)
  38. # print(dict_num)
  39. most_counter = sorted(dict_num.items(), key=lambda x: x[1], reverse=True)[0][0]
  40. return most_counter
  41. empty = []
  42. for l in node_list:
  43. empty.extend(l.purpose)
  44. dict_num = {}
  45. for item in empty:
  46. if item not in dict_num.keys():
  47. dict_num[item] = empty.count(item)
  48. # print(dict_num)
  49. if node_list:
  50. most_counter = sorted(dict_num.items(), key=lambda x: x[1], reverse=True)[0][0]
  51. else:
  52. most_counter = None
  53. return most_counter
  54. def test_projects(_path, _lattice):
  55. projects = os.listdir(_path)
  56. for project in projects:
  57. if project != ".idea" and project != ".DS_Store":
  58. project_path = _path + '/' + project
  59. stamp, func_node_dict = annotate(
  60. project_path,
  61. lattice, False)
  62. print(project)
  63. for s in stamp:
  64. print(s)
  65. save_path = 'analyze/output/' + project.replace('.py', '') + '.xls'
  66. write_to_excel(stamp, save_path)
  67. # print(os.listdir(_path))
  68. def annotate(source, lattices, entire=False):
  69. """
  70. Args:
  71. source: file_name which can be directory, file, zip
  72. lattices: _
  73. entire: 是否打印完整结果
  74. Returns:
  75. """
  76. logging.warning("Start getting file list...")
  77. lattices = switch_dict(lattices)
  78. source, file_list = get_file_list(source)
  79. # print(source, file_list)
  80. logging.warning("Start getting all operations for private info and methods call graph...")
  81. # 解析文件,获取隐私数据操作 和 函数调用图
  82. node_list, func_dict = parse_files(file_list, source, lattices)
  83. # print("func_dict", func_dict)
  84. if entire or not entire: # 当entire 为True时 要检测方法外代码行
  85. node_list = add_code_outside_func(file_list, lattices, node_list)
  86. # 递归获取所有方法可能的隐私数据和操作
  87. logging.warning("Start getting suspected data and operations in the first recursion...")
  88. func_node_dict = get_link(func_dict, source, file_list)
  89. # 第二遍递归
  90. logging.warning("Start second recursion...")
  91. node_list2nd = parse_files_2nd(file_list, source, func_node_dict,
  92. node_list)
  93. for node in node_list2nd:
  94. print(node)
  95. # try:
  96. # # 获取文件列表(文件名)
  97. #
  98. # except Exception as e:
  99. # # 因为有各种报错 包括编译错误SyntaxError 包循环依赖导致的KeyError 以及可能出现的其他error 具体信息都在e中 就直接返回e 而不返回具体文件名和行数
  100. #
  101. # logging.error(
  102. # "Error happened in " + e.__traceback__.tb_frame.f_globals["__file__"] + str(e.__traceback__.tb_lineno))
  103. # return {"correctness": False, "result": e}
  104. if error_list:
  105. return {"correctness": False, "result": error_list}
  106. # 将第二次递归对内容添加到列表
  107. node_list.extend(node_list2nd)
  108. # 去重
  109. node_list_no_repeated = []
  110. node_string = [node.__str__() for node in node_list]
  111. for node in node_list:
  112. if node_string.count(node.__str__()) == 1:
  113. node_list_no_repeated.append(node)
  114. else:
  115. node_string.remove(node.__str__())
  116. # for node in node_list_no_repeated:
  117. # print(node)
  118. # 计算准确率
  119. logging.warning("Start calculate the accuracy...")
  120. # 隐私扫描结果输出到json文件
  121. logging.warning("Output the result into file...")
  122. return_value = {"correctness": True, "result": {}}
  123. if not entire:
  124. out_analyze(node_list_no_repeated, source,
  125. "analyze/output2/" + source.replace('\\', '/').split("/")[-1] + ".xls", entire)
  126. call_flow = get_call_flow(source, file_list)
  127. anno = {}
  128. for key, value in func_node_dict.items():
  129. anno[key] = []
  130. for pair in value:
  131. item = {"dataType": {"value": pair[0], "confidence": 1}, "purpose": {"value": pair[1], "confidence": 1}}
  132. if item not in anno[key]:
  133. anno[key].append(item)
  134. return_value['result'].update(annotation=anno, call_flow=call_flow)
  135. else:
  136. # 当entire 为true
  137. purpose = get_program_purpose(source, lattices, func_node_dict, node_list_no_repeated)
  138. node_list_filtered = [item for item in node_list_no_repeated if
  139. item.purpose is not None and
  140. purpose in item.purpose]
  141. out_analyze(node_list_filtered, source, "analyze/output2/" + source.replace('\\', '/').split("/")[-1] + ".xls",
  142. entire)
  143. data_type_list = []
  144. for item in node_list_filtered:
  145. for data_type_each in item.private_word_list:
  146. if data_type_each != ("None", "none") and {"dataType": data_type_each[0],
  147. "confidence": 1} not in data_type_list:
  148. data_type_list.append({"dataType": data_type_each[0], "confidence": 1})
  149. return_value['result'] = {"dataType": data_type_list, "purpose": purpose}
  150. return return_value
  151. if __name__ == '__main__':
  152. data_type = load_json('lattices/datatype.json')
  153. purpose_dict = load_json('lattices/purpose.json')
  154. lattice = {'dataType': data_type, 'purpose': purpose_dict}
  155. # res = annotate("D:\\Download\\azure-storage-blob-master\\sdk\\storage\\azure-storage-file-share\\samples", lattice,
  156. # False)
  157. res = annotate("/Users/liufan/Documents/实验室/隐私扫描项目/SAP检测项目/cms/test", lattice, False)
  158. print('----------------annotation-------------------')
  159. for key, value in res['result']['annotation'].items():
  160. print(key, value)
  161. print('----------------call-flow-------------------')
  162. for key, value in res['result']['call_flow'].items():
  163. print(key, value)