# utils.py
#
# Helpers for collecting source files, splitting them into per-function
# snippets with `ast`, and reading/writing JSON dataset files.

  1. import ast
  2. import json
  3. import logging
  4. import os
  5. import random
  6. logger = logging.getLogger(__name__)
  7. def walk_files(path, endpoint='.py'):
  8. file_list = []
  9. for root, dirs, files in os.walk(path):
  10. for file in files:
  11. file_path = os.path.join(root, file)
  12. if file_path.endswith(endpoint):
  13. file_list.append(file_path)
  14. return file_list
  15. def write_to_json(file_path, output_path):
  16. types = os.listdir(file_path)
  17. json_input = list()
  18. for directory in types:
  19. for file_name in walk_files(file_path + "/" + directory):
  20. try:
  21. with open(file_name, 'r', encoding='utf-8') as file:
  22. content = file.read()
  23. except Exception as e:
  24. logger.error("failed write: " + file_name)
  25. pass
  26. json_input.append({
  27. "func": content,
  28. "type": directory,
  29. "type_index": types.index(directory),
  30. "path": file_name
  31. })
  32. random.shuffle(json_input)
  33. length = len(json_input)
  34. with open(output_path + "train.jsonl", 'w+') as train_out:
  35. train_json = json.dumps(json_input[:int(7 * length / 10)])
  36. train_out.write(train_json)
  37. with open(output_path + "test.jsonl", 'w+') as test_out:
  38. test_json = json.dumps(json_input[int(7 * length / 10): int(9 * length / 10)])
  39. test_out.write(test_json)
  40. with open(output_path + "eval.jsonl", 'w+') as eval_out:
  41. eval_json = json.dumps(json_input[int(9 * length / 10):])
  42. eval_out.write(eval_json)
  43. def write_content_to_file(content, file_path):
  44. if not os.path.exists(file_path[:file_path.rindex("/")]):
  45. os.makedirs(file_path[:file_path.rindex("/")])
  46. try:
  47. with open(file_path, 'w+') as file_out:
  48. file_out.write(content)
  49. except:
  50. print(file_path)
  51. pass
  52. def split_file(file_dir, output_dir, endpoint=".py"):
  53. if not os.path.exists(output_dir):
  54. os.makedirs(output_dir)
  55. for file_path in walk_files(file_dir, endpoint):
  56. with open(file_path, 'r',encoding='utf8') as file:
  57. content = file.read()
  58. try:
  59. root = ast.parse(content)
  60. except Exception as e:
  61. print("错误明细:", e.__class__.__name__, e, file_path)
  62. continue
  63. file_id = 1
  64. for node in root.body:
  65. if isinstance(node, ast.FunctionDef):
  66. new_file_name = file_path.replace(file_dir, output_dir).replace(".py", "_" + str(file_id) + ".py")
  67. write_content_to_file(ast.get_source_segment(content, node), new_file_name)
  68. file_id += 1
  69. elif isinstance(node, ast.ClassDef):
  70. for son in node.body:
  71. if isinstance(son, ast.FunctionDef):
  72. new_file_name = file_path.replace(file_dir, output_dir).replace(".py",
  73. "_" + str(file_id) + ".py")
  74. write_content_to_file(ast.get_source_segment(content, son), new_file_name)
  75. file_id += 1
  76. def split_file_by_func(file_path):
  77. """
  78. :param file_path:
  79. :return: [func_name: (func_content, func_node of ast)]
  80. """
  81. with open(file_path, 'r',encoding='utf8') as file:
  82. content = file.read()
  83. root = None
  84. func_content = {}
  85. try:
  86. root = ast.parse(content)
  87. for node in root.body:
  88. if isinstance(node, ast.FunctionDef):
  89. func_content[node.name] = (ast.get_source_segment(content, node), node)
  90. elif isinstance(node, ast.ClassDef):
  91. class_name = node.name
  92. for son in node.body:
  93. if isinstance(son, ast.FunctionDef):
  94. func_name = class_name + '.' + node.name
  95. func_content[func_name] = (ast.get_source_segment(content, son), son)
  96. except Exception as e:
  97. logger.error(e.__class__.__name__ + " " + file_path)
  98. return func_content
  99. def load_json(json_file):
  100. with open(json_file, 'r') as load_f:
  101. load_dict = json.load(load_f)
  102. return load_dict
  103. def write_json(json_file, data):
  104. with open(json_file, 'w') as file:
  105. file.write(json.dumps(data))
  106. if __name__ == '__main__':
  107. write_to_json("dataset/origin", "dataset/")
  108. # for directory in os.listdir("dataset/origin"):
  109. # print(directory, len(walk_files("dataset/origin/" + directory)))