funclink.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324
  1. import ast
  2. import copy
  3. import logging
  4. import os
  5. import re
  6. import shutil
  7. from _ast import AST
  8. from utils.ERRORLIST import error_list
  9. import numpy as np
  10. import graphviz
  11. import pyan
  12. from utils.fileio import verify_file_list
  13. class ProjectAnalyzer:
  14. def __init__(self, project, file_list):
  15. tmpfile = "./tmp.gv"
  16. self._clazzs = find_all_class(file_list, project=project)
  17. file_list = verify_file_list(file_list)
  18. init_file = os.path.join(project, "__init__.py")
  19. if os.path.isfile(init_file):
  20. tmp_dir = os.path.join(project, "private_info_scanning_tempt")
  21. os.mkdir(tmp_dir)
  22. shutil.move(init_file, tmp_dir)
  23. graghviz(tmpfile, file_list)
  24. shutil.move(os.path.join(tmp_dir, "__init__.py"), init_file)
  25. shutil.rmtree(tmp_dir)
  26. else:
  27. graghviz(tmpfile, file_list)
  28. # graghviz(tmpfile, file_list)
  29. # _methods:函数名
  30. # _method_matrix:直接调用矩阵
  31. # 行:被调用者
  32. # 列:调用者
  33. self._methods, self._method_matrix = analyze_gv(tmpfile, project=project, endpoint=".py",
  34. class_exclude=self._clazzs)
  35. # _matrix:可达性矩阵
  36. # _mediate:中间节点矩阵
  37. self._matrix = copy.deepcopy(self._method_matrix)
  38. self._mediate = algorithm(self._matrix)
  39. def get_methods(self):
  40. return self._methods
  41. def get_class(self):
  42. return self._clazzs
  43. def find_direct_callee_func(self, target_func=None):
  44. dimension = len(self._methods)
  45. if target_func is None:
  46. # 找出所有函数的直接callee
  47. result = {}
  48. for i in range(dimension):
  49. result[self._methods[i]] = []
  50. for i in range(dimension):
  51. for j in range(dimension):
  52. if self._method_matrix[j][i] > 0:
  53. result[self._methods[i]].append(self._methods[j])
  54. return result
  55. else:
  56. # 找出特定函数的直接callee 通过find
  57. # index = -1
  58. # for i in self._methods:
  59. # if i.endswith(target_func):
  60. # index = self._methods.index(i)
  61. # break
  62. index = self._methods.index(target_func)
  63. if index < 0:
  64. raise Exception("no such method")
  65. result = []
  66. for i in range(dimension):
  67. if self._method_matrix[i][index] > 0:
  68. result.append(self._methods[i])
  69. return result
  70. def find_direct_call_func(self, target_func=None):
  71. dimension = len(self._methods)
  72. if target_func is None:
  73. # 找出所有函数的直接caller
  74. result = {}
  75. for i in range(dimension):
  76. result[self._methods[i]] = []
  77. for i in range(dimension):
  78. for j in range(dimension):
  79. if self._method_matrix[i][j] > 0:
  80. result[self._methods[i]].append(self._methods[j])
  81. return result
  82. else:
  83. # 找出特定函数的直接caller
  84. index = self._methods.index(target_func)
  85. if index < 0:
  86. raise Exception("no such method")
  87. result = []
  88. for i in range(dimension):
  89. if self._method_matrix[index][i] > 0:
  90. result.append(self._methods[i])
  91. return result
  92. def find_all_call_func(self, target_func):
  93. dimension = len(self._methods)
  94. index = self._methods.index(target_func)
  95. if index < 0:
  96. raise Exception("no such method")
  97. result = []
  98. for i in range(dimension):
  99. if self._matrix[index][i] > 0:
  100. callpath = algorithm2(self._matrix, self._mediate, index, i)
  101. result.append((self._methods[i], list(reversed([self._methods[x] for x in callpath]))))
  102. return result
  103. def find_all_class(file_list: list, project="", endpoint=".py"):
  104. result = []
  105. for f in file_list:
  106. with open(f, 'r', encoding='utf8') as file:
  107. lines = file.readlines()
  108. try:
  109. tree = ast.parse(''.join(lines))
  110. except SyntaxError as e:
  111. e.filename = f
  112. error_list.append(e)
  113. pass
  114. for node in tree.body:
  115. part_result = find_class(node)
  116. for i in range(len(part_result)):
  117. pa = part_result[i]
  118. pa = (f[len(project) + 1:len(f) - len(endpoint)] + os.path.sep + pa).replace(os.path.sep, ".")
  119. part_result[i] = pa
  120. result.extend(part_result)
  121. return result
  122. def find_class(node: AST):
  123. if not isinstance(node, ast.ClassDef):
  124. return []
  125. else:
  126. result = [node.name]
  127. for son in node.body:
  128. result.extend(find_class(son))
  129. return result
  130. def analyze_gv(gv, project="", endpoint=".py", class_exclude=None):
  131. method_adjacency = []
  132. methods = []
  133. clazzs = [] if class_exclude is None else class_exclude
  134. with open(gv, 'r') as gv_file:
  135. # 遍历找到所有的函数依赖关系
  136. gv_file.seek(0, 0)
  137. for line in gv_file.readlines():
  138. is_dependency = re.search(r'style="solid"', line)
  139. # is_import_file = re.search(r'__')
  140. if is_dependency is None:
  141. # 函数定义,不是函数依赖
  142. continue
  143. match_group = re.search(r'([a-zA-Z0-9_]+)\s*->\s*([a-zA-Z0-9_]+)', line)
  144. if match_group is not None:
  145. origin = match_group.group(1).replace("__", ".")
  146. target = match_group.group(2).replace("__", ".")
  147. # 去除私有方法
  148. flag1 = match_group.group(1).find("____") >= 0
  149. flag2 = match_group.group(2).find("____") >= 0
  150. # 去除类
  151. flag3 = origin in clazzs
  152. flag4 = target in clazzs
  153. # 去除依赖文件
  154. flag5 = os.path.isfile(project + "/" + match_group.group(1).replace("__", "/") + endpoint)
  155. flag6 = os.path.isfile(project + "/" + match_group.group(2).replace("__", "/") + endpoint)
  156. if not flag1 and not flag3:
  157. if flag5:
  158. origin += ".__main__"
  159. if origin not in methods:
  160. methods.append(origin)
  161. if not flag2 and not flag4:
  162. if flag6:
  163. target += ".__main__"
  164. if target not in methods:
  165. methods.append(target)
  166. if flag6 or flag1 or flag2 or flag3 or flag4:
  167. continue
  168. method_adjacency.append((methods.index(target),
  169. methods.index(origin)))
  170. method_num = len(methods)
  171. method_matrix = [[0] * method_num for _ in range(method_num)]
  172. for adjacency in method_adjacency:
  173. method_matrix[adjacency[0]][adjacency[1]] = 1
  174. return methods, method_matrix
  175. """
  176. matrix1:原始矩阵(直接调用关系)
  177. return:中间节点矩阵
  178. """
  179. def algorithm(matrix):
  180. # 可达性矩阵
  181. dimension = len(matrix)
  182. # 中间节点矩阵
  183. mediate_matrix = [[0] * dimension for _ in range(dimension)]
  184. # n三次方次迭代
  185. for i in range(dimension):
  186. for j in range(dimension):
  187. if matrix[j][i] > 0:
  188. for k in range(dimension):
  189. if matrix[j][k] == 0 and matrix[i][k] > 0:
  190. matrix[j][k] = 1
  191. mediate_matrix[j][k] = i
  192. return mediate_matrix
  193. """
  194. matrix1:可达性矩阵
  195. matrix2:中间节点矩阵
  196. return:可达路径(包括起点终点)
  197. """
  198. def algorithm2(matrix1, matrix2, start, end):
  199. if matrix1[start][end] == 0:
  200. # 不可达
  201. return ()
  202. else:
  203. mediate = matrix2[start][end]
  204. if mediate == 0:
  205. # 可达,无中间节点
  206. return start, end
  207. # 可达,有中间节点
  208. left = list(algorithm2(matrix1, matrix2, start, mediate))
  209. right = list(algorithm2(matrix1, matrix2, mediate, end))
  210. left.pop()
  211. left.extend(right)
  212. return left
  213. def test_algorithm():
  214. matrix = [[0, 1, 0, 0], [0, 0, 1, 0], [0, 0, 0, 1], [0, 0, 0, 0]]
  215. algorithm(matrix)
  216. print(matrix)
  217. def graghviz(output, args: list):
  218. try:
  219. res = pyan.create_callgraph(args, format="dot")
  220. with open(output, 'w') as f:
  221. f.write(res)
  222. except Exception as e:
  223. logging.error(str(e))
  224. error_list.append(e)
  225. pass
  226. def walk_files_path(path, endpoint='.py'):
  227. file_list = []
  228. for root, dirs, files in os.walk(path):
  229. for file in files:
  230. file_path = os.path.join(root, file)
  231. if file_path.endswith(endpoint):
  232. file_list.append(file_path)
  233. return file_list
def get_link(func_node_dict, source_dir, file_list):
    """Spread each function's info pairs to the functions linked to it.

    For every key of ``func_node_dict`` that the project call graph knows,
    the entries of that function are merged into the entry of each function
    reported by ``ProjectAnalyzer.find_all_call_func``.

    Args:
        func_node_dict: dict mapping a function name to a list of 2-item
            entries; the string ``"None"`` is used as an "absent" marker in
            either position. Presumably (private_info, usage) pairs --
            TODO confirm against the caller.
        source_dir: project root passed to ``ProjectAnalyzer``.
        file_list: source files passed to ``ProjectAnalyzer``.

    Returns:
        A new dict with the merged entries. Lists are shared with
        ``func_node_dict``, so the ``append`` below also changes the input.

    NOTE(review): the nesting below was reconstructed from a listing that had
    lost its indentation -- in particular the position of the assignment
    after the inner ``for pair`` loop and the ``if`` that the first ``else``
    belongs to. Confirm against the original file.
    """
    func_node_dict_all = {}
    # Shallow copy: the value lists are the same objects as in the input.
    for key in func_node_dict.keys():
        func_node_dict_all[key] = func_node_dict[key]
    pa = ProjectAnalyzer(source_dir, file_list)
    for method in func_node_dict.keys():
        if method in pa.get_methods():
            # method_link[0] is the name of a function linked to ``method``.
            for method_link in (pa.find_all_call_func(method)):
                if func_node_dict_all[method][0][0] == "None" and method_link[0] in func_node_dict.keys():
                    # First entry of ``method`` has "None" in position 0 and
                    # the linked function has its own entries: keep the
                    # linked function's entries whose second item is set ...
                    private_info_without_usage = [info for info in func_node_dict_all[method_link[0]] if
                                                  info[1] != "None"]
                    for pair in func_node_dict_all[method]:
                        # ... and complete its entries whose second item is
                        # "None" with each set second item of ``method``.
                        private_info_each = [(private[0], pair[1]) for private in func_node_dict_all[method_link[0]]
                                             if
                                             private[1] == "None" and pair[1] != "None"]
                        private_info_without_usage.extend(private_info_each)
                    func_node_dict_all[method_link[0]] = private_info_without_usage
                else:
                    for pair in func_node_dict_all[method]:
                        if method_link[0] in func_node_dict.keys():
                            func_node_dict_all[method_link[0]].append(pair)
                        else:
                            # print(method, method_link[0])
                            # Assigned on every iteration, so only the last
                            # pair remains for a function absent from the input.
                            func_node_dict_all[method_link[0]] = [pair]
    return func_node_dict_all
  260. def get_call_flow(source_dir, file_list):
  261. func_flow = {}
  262. pa = ProjectAnalyzer(source_dir, file_list)
  263. for method in pa.get_methods():
  264. func_call = pa.find_direct_callee_func(method)
  265. if func_call:
  266. func_flow[method] = func_call
  267. return func_flow
  268. if __name__ == '__main__':
  269. project = "/Users/liufan/Documents/实验室/隐私扫描项目/SAP检测项目/mini"
  270. file_list = walk_files_path(project)
  271. p = ProjectAnalyzer(project, file_list)
  272. for method in p.get_methods():
  273. print(method, p.find_all_call_func(method))
  274. print(p.get_methods())
  275. # graghviz("program.gv", ["D:\\study\\python\\test\\main.py","D:\\study\\python\\test\\live.py"])