123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606 |
- import math
- import copy
- """
- Applied: (TO DOCUMENT)
- TODO:
- 1) Remove ever dict.keys() used and replace it with dict because dict.keys() creates a list of keys in memory.
- (More costly than looking through the dictionary itself! Further information below.)
- https://stackoverflow.com/questions/4730993/python-key-in-dict-keys-performance-for-large-dictionaries
- """
# Toy "play tennis" training set (the classic ID3 example): one parallel list of
# 14 observed values per input attribute; row i across all lists is one entry.
DATASET_BY_ATTRIB_DICT = {"outlook": ["Sunny", "Sunny", "Overcast", "Rain", "Rain", "Rain", "Overcast",
                                      "Sunny", "Sunny", "Rain", "Sunny", "Overcast", "Overcast", "Rain"],
                          "temperature": ["Hot", "Hot", "Hot", "Mild", "Cool", "Cool", "Cool",
                                          "Mild", "Cool", "Mild", "Mild", "Mild", "Hot", "Mild"],
                          "humidity": ["High", "High", "High", "High", "Normal", "Normal", "Normal",
                                       "High", "Normal", "Normal", "Normal", "High", "Normal", "High"],
                          "wind": ["Weak", "Strong", "Weak", "Weak", "Weak", "Strong", "Strong",
                                   "Weak", "Weak", "Weak", "Strong", "Strong", "Weak", "Strong"]}
# Target column, aligned row-for-row with the attribute lists above.
TARGET_ATTRIB_LIST = ["No", "No", "Yes", "Yes", "Yes", "No", "Yes", "No", "Yes", "Yes", "Yes", "Yes", "Yes", "No"]
# Human-readable name of the target attribute (not used by the algorithm itself).
TARGET_ATTRIB_NAME = "play tennis"
# Number of entries in the training set (14).
TRAIN_DATA_SIZE = len(TARGET_ATTRIB_LIST)
class Node:
    """A single tree node: an attribute, an outcome, or a target-answer leaf.

    ``derived_nodes`` holds the child nodes; ``None`` marks a leaf/answer node
    (``classify`` stops walking when it reaches ``derived_nodes is None``).
    """

    # Sentinel so an explicit ``derived_nodes=None`` (leaf marker) can be told
    # apart from "argument omitted".
    _NO_DERIVED = object()

    def __init__(self, node_name, derived_nodes=_NO_DERIVED):
        """
        :param node_name: label of this node (attribute, outcome, or answer).
        :param derived_nodes: child nodes, or None to mark an endpoint/leaf.
            When omitted, a FRESH empty list is created per instance — the
            original default ``[]`` was a single list shared by every Node,
            so appending to one node's children mutated all of them.
        """
        self.node_name = node_name
        self.derived_nodes = [] if derived_nodes is Node._NO_DERIVED else derived_nodes
class ID3DecisionTree:
    """ID3 decision-tree classifier.

    The tree is grown greedily: at each step the attribute with the highest
    information gain becomes a node, outcomes that already map to a single
    target answer get a leaf, and undecided outcomes are queued in
    "active_branch_nodes" to be expanded later on a reduced sub-dataset.
    """

    def __init__(self):
        # Top of the tree; stays None until the first node is linked.
        self.root_node = None

        # FIFO queue of outcome nodes whose branch has not yet reached a single
        # target outcome; the front entry is the branch expanded next.
        self.active_branch_nodes = []

        # NOTE(review): never written or read anywhere in this class — appears unused.
        self.linked_attributes = []

        """ Example structure: (where 'AN'-attribute name; 'ON'-outcome name; 'TON'-target outcome name)
        dataset_occurrence_dict = {"AN 1": {"ON 1": {"TON 1": 1, "TON 2": 2},
                                            "ON 2": {"TON 1": 0, "TON 2": 1},
                                            "ON 3": {"TON 1": 0, "TON 2": 1}
                                            },
                                   "AN 2": {"ON 1": {"TON 1": 4, "TON 2": 0},
                                            "ON 2": {"TON 1": 1, "TON 2": 0}
                                            }
                                   }

        The example above can be read, for attribute 1 - AN1, there are 3 outcomes - ON1, ON2, ON3.
        The target has 2 possible outcomes TON1 and TON2. Those values are being tracked/accounted for,
        for each possible outcome of each attribute. For AN1, ON1 there is 1 occurrence of TON1 and 2 occurrences of
        TON2. For AN1, ON2 there are 0 occurrences of TON1, and 1 occurrence of TON2 therefore the answer for this
        branch is TON2. Same for AN1, ON3 - answer TON2. If all the occurrences of TON1 and TON2 for attrib 1 (AN1)
        are summed, we get the number of entries in the given data set.
        """
        self.dataset_occurrence_dict = {}

    def generate_occurrences(self, dataset_by_attrib_dict, target_list):
        """ Construct dataset distribution/occurrence dictionary - "dataset_occurrence_dict".
        PARAMETERS
        :param (dict) dataset_by_attrib_dict
        :param (list) target_list """
        for attrib_name in dataset_by_attrib_dict.keys():
            # (Re)create the nested dict for this attribute.
            # NOTE(review): entries for attributes NOT present in
            # dataset_by_attrib_dict (e.g. dropped from a sub-dataset) are left
            # over from earlier calls and are still iterated by
            # build_tree_chunk — confirm this staleness is intended.
            self.dataset_occurrence_dict.update({attrib_name: {}})

            attribute_list = dataset_by_attrib_dict[attrib_name]
            unique_attrib_outcomes = list(set(attribute_list))
            unique_answers = list(set(target_list))

            # Zero-initialise every (attribute outcome, target outcome) counter.
            for attrib_outcome in unique_attrib_outcomes:
                self.dataset_occurrence_dict[attrib_name].update({attrib_outcome: {}})
                for outcome in unique_answers:
                    self.dataset_occurrence_dict[attrib_name][attrib_outcome].update({outcome: 0})

            # Count, per attribute outcome, how often each target outcome occurs.
            for itter in range(len(attribute_list)):
                curr_attrib_occ = attribute_list[itter]
                curr_target_occ = target_list[itter]
                self.dataset_occurrence_dict[attrib_name][curr_attrib_occ][curr_target_occ] += 1

    def get_next_branch_occurrences(self, dataset_by_attrib_dict, target_list):
        """ After a node is added to the tree the "dataset_occurrence_dict" dictionary should be updated.
        Builds the reduced sub-dataset for the oldest active branch: drops the
        attribute column that contains the branch's outcome value and keeps only
        the rows whose value equals that outcome.
        PARAMETERS
        :param (dict) dataset_by_attrib_dict - the raw attrib data from the dataset.
        :param (list) target_list - the raw target data from the dataset.
        :returns (tuple) (subdict, subtar) - filtered deep copies of the inputs. """
        subdict = copy.deepcopy(dataset_by_attrib_dict)
        subtar = copy.deepcopy(target_list)
        indices_to_remove = []
        attrib_to_remove = None

        for attrib_key in subdict:
            attrib_found = False

            for count in range(len(subdict[attrib_key])):
                # Does this row belong to the branch currently being expanded?
                if dataset_by_attrib_dict[attrib_key][count] == self.active_branch_nodes[0].node_name:
                    attrib_found = True
                    if attrib_key in subdict:
                        attrib_to_remove = attrib_key
                else:
                    # Row does not belong to this branch - drop it everywhere.
                    indices_to_remove.append(count)

            # NOTE(review): indices_to_remove is not reset between attributes,
            # so if an attribute scanned before the match contains no branch
            # value, its indices leak into the filter — confirm the branch value
            # is always found in the first attribute that contains it.
            if attrib_found:
                break

        # Remove the attribute the branch was split on...
        del subdict[attrib_to_remove]
        # ...and filter every remaining column down to the matching rows.
        for attrib in subdict:
            complete_list = subdict[attrib]
            sublist = [value for index, value in enumerate(complete_list) if index not in indices_to_remove]
            subdict[attrib] = sublist

        # Filter the target column with the same row mask.
        subtar = [value for index, value in enumerate(subtar) if index not in indices_to_remove]

        return subdict, subtar

    def track_target_outcomes(self, target_val_dist_for_attrib):
        """ Checks if a branch is complete, i.e. the target outcome was found.
        PARAMETERS
        :param (dict) target_val_dist_for_attrib
        :returns (list) comp_branches - contains all the target outcomes reached for the given attribute."""
        comp_branches = []

        for attrib_outcome_key in target_val_dist_for_attrib.keys():
            # How many distinct target outcomes actually occur on this branch.
            non_zero_outcome_count = 0
            # The single target outcome, once the branch turns out to be pure.
            branch_answer = None

            # Example: if the target is can_buy_computer (possible values/outcomes: Yes or No) and the
            # current attribute is age (possible values/outcomes: <=30, 31..40 and >40) this will return
            # how many of the entries where age is <=30 are no, then how many of the entries where age
            # is <=30 are yes, then how many of the entries where age is 31..40 are yes and so on,
            # until all cases are looked at.
            for target_outcome_key in target_val_dist_for_attrib[attrib_outcome_key].keys():
                outcome_occurrences = target_val_dist_for_attrib[attrib_outcome_key][target_outcome_key]

                if outcome_occurrences > 0:
                    non_zero_outcome_count += 1
                    if non_zero_outcome_count == 1:
                        branch_answer = target_outcome_key

            if non_zero_outcome_count == 0:
                print("INVALID RESULT!")
            elif non_zero_outcome_count == 1:
                # Pure branch - its answer is final.
                print("THE ANSWER FOR <<", attrib_outcome_key, ">> is <<", branch_answer, ">>")
                comp_branches.append({attrib_outcome_key: branch_answer})
            elif non_zero_outcome_count > 1:
                # Mixed branch - needs a further split.
                print("THE BRANCH <<", attrib_outcome_key, ">> IS STILL ACTIVE!")
        return comp_branches

    def count_value_occ(self, unique_values, attrib_data):
        """ Count how many times each unique value appears in attrib_data.
        PARAMETERS
        :param (list) unique_values
        :param (list) attrib_data
        :returns (dict) attrib_val_occ - maps value -> occurrence count. """
        attrib_val_occ = {}

        for value in unique_values:
            attrib_val_occ.update({value: 0})

        for u_value in unique_values:
            attrib_val_occ[u_value] = attrib_data.count(u_value)
        return attrib_val_occ

    def calc_entropy(self, attrib_uv_count, overall):
        """ Shannon entropy: -sum(p * log2(p)) over the value counts.
        PARAMETERS
        :param (dict) attrib_uv_count - maps value -> occurrence count.
        :param (int) overall - total sample count the fractions are taken over.
        :returns (float) entropy (non-negative). """
        entropy = 0

        for key in attrib_uv_count.keys():
            # Skip zero counts: the p*log2(p) term tends to 0, and log2(0) is undefined.
            if attrib_uv_count[key] != 0:
                fraction = attrib_uv_count[key] / overall
                target_attrib_calc = fraction * math.log2(fraction)
                entropy += target_attrib_calc
        # Each term is negative (log of a fraction); abs() flips the sign.
        return abs(entropy)

    def calc_attrib_entropy(self, attrib_occurrences):
        """ Entropy of the target distribution for each outcome of one attribute.
        PARAMETERS
        :param (dict) attrib_occurrences - maps outcome -> {target outcome: count}.
        :returns (dict) entropy_list - maps outcome -> entropy. """
        entropy_list = {}
        for attrib_val_key in attrib_occurrences.keys():
            attrib_val = attrib_occurrences[attrib_val_key]
            # Total number of samples that have this attribute outcome.
            overall = 0
            for target_values in attrib_val.values():
                overall += target_values
            print("CALC TARGET ENTROPY FOR EACH ATTRIB OUTCOME: ", attrib_val)
            attrib_entropy = self.calc_entropy(attrib_val, overall)
            entropy_list.update({attrib_val_key: attrib_entropy})
        print("Entropy list: ", entropy_list)
        return entropy_list

    def calc_entropy_weigh_avg(self, target_val_dist_attrib, overall, attrib_entropy):
        """ Weighted average of per-outcome entropies (weight = outcome frequency / overall).
        PARAMETERS
        :param (dict) target_val_dist_attrib - maps outcome -> {target outcome: count}.
        :param (int) overall - denominator for the weights.
        :param (dict) attrib_entropy - maps outcome -> entropy (from calc_attrib_entropy).
        :returns (float) weighted_entropy_avg """
        weighted_entropy_avg = 0
        for key in target_val_dist_attrib.keys():
            # Number of samples carrying this attribute outcome.
            curr_value = 0
            for value in target_val_dist_attrib[key].values():
                curr_value += value
            weighted_entropy_avg += curr_value / overall * attrib_entropy[key]

        return weighted_entropy_avg

    def calc_info_gain(self, target_entropy, target_dist_for_attrib):
        """ Information gain = target entropy - weighted average entropy of the attribute.
        PARAMETERS
        :param (float) target_entropy
        :param (dict) target_dist_for_attrib - maps outcome -> {target outcome: count}.
        :returns (float) attrib_info_gain """
        attrib_entropy = self.calc_attrib_entropy(target_dist_for_attrib)

        # NOTE(review): TRAIN_DATA_SIZE (the full 14-entry set) is used as the
        # weight denominator even for reduced sub-datasets — confirm intended.
        weighted_avg_e = self.calc_entropy_weigh_avg(target_dist_for_attrib, TRAIN_DATA_SIZE, attrib_entropy)

        attrib_info_gain = target_entropy - weighted_avg_e
        return attrib_info_gain

    def build_node(self, name, completed_branches):
        """ Build an attribute node with one outcome node per possible outcome.
        Completed outcomes get an answer leaf attached; undecided outcomes are
        queued in "active_branch_nodes" for later expansion.
        PARAMETERS
        :param (str) name - the attribute chosen for this node.
        :param (list) completed_branches - [{outcome name: answer}, ...] from track_target_outcomes.
        :returns (Node) attrib_node """
        attrib_node = Node(name)
        derived_nodes = []
        completed_outcomes = []
        for branch in completed_branches:
            completed_outcomes.append(list(branch.keys())[0])

        for outcome_name in self.dataset_occurrence_dict[name]:
            new_outcome_node = Node(outcome_name)

            for branch in completed_branches:
                if outcome_name in branch:
                    # Attach at most one answer leaf per outcome.
                    if len(new_outcome_node.derived_nodes) == 0:
                        # derived_nodes=None marks the answer node as a leaf.
                        endpoint_node = Node(branch[outcome_name], None)
                        new_outcome_node.derived_nodes.append(endpoint_node)

            # Deep copy so the stored/queued node stays independent of the
            # clear() below (Node's default derived_nodes list is shared
            # between instances).
            temp_outcome = copy.deepcopy(new_outcome_node)
            derived_nodes.append(temp_outcome)

            if outcome_name not in completed_outcomes:
                # Undecided branch - remember it so build_tree expands it later.
                self.active_branch_nodes.append(temp_outcome)
            new_outcome_node.derived_nodes.clear()

        attrib_node.derived_nodes = derived_nodes
        return attrib_node

    def link_node(self, new_node):
        """ Attach new_node to the tree: the first node ever linked becomes the
        root; every later node is appended under the oldest waiting outcome
        node (FIFO), which is then removed from the active list.
        PARAMETERS
        :param (Node) new_node - attribute node produced by build_node. """
        if self.root_node is None:
            self.root_node = new_node
        else:
            self.active_branch_nodes[0].derived_nodes.append(new_node)
            self.active_branch_nodes.pop(0)

    def build_tree_chunk(self, dataset_by_attrib_dict, target_attrib_list):
        """ Pick the attribute with the highest information gain for the given
        (sub)dataset, build its node and link it into the tree.
        PARAMETERS
        :param (dict) dataset_by_attrib_dict
        :param (list) target_attrib_list """
        self.generate_occurrences(dataset_by_attrib_dict, target_attrib_list)

        target_uv_data = list(set(target_attrib_list))
        target_uv_count = self.count_value_occ(target_uv_data, target_attrib_list)
        # NOTE(review): TRAIN_DATA_SIZE is the full training-set size even when
        # this chunk operates on a reduced sub-dataset — confirm intended.
        target_entropy = self.calc_entropy(target_uv_count, TRAIN_DATA_SIZE)

        # Best (highest information gain) attribute found so far.
        next_node_data = {"name": None, "info gain": 0, "completed": None}
        for attrib_name in self.dataset_occurrence_dict.keys():
            print("\n", "-" * 50)

            print("attrib_name: ", attrib_name)

            target_dist_for_attrib = self.dataset_occurrence_dict[attrib_name]

            completed_branches = self.track_target_outcomes(target_dist_for_attrib)
            print("COMPLETED BRANCHES: ", completed_branches)
            attrib_info_gain = self.calc_info_gain(target_entropy, target_dist_for_attrib)

            if next_node_data["info gain"] < attrib_info_gain:
                next_node_data["name"] = attrib_name
                next_node_data["info gain"] = attrib_info_gain
                next_node_data["completed"] = completed_branches
        print("------> The next new node is: ", next_node_data["name"], "\n\n")
        new_node = self.build_node(next_node_data["name"], next_node_data["completed"])
        self.link_node(new_node)

    def build_tree(self, dataset_by_attrib_dict, target_attrib_list):
        """ Build the complete tree: one chunk for the root, then one chunk per
        active (undecided) branch until none remain.
        PARAMETERS
        :param (dict) dataset_by_attrib_dict
        :param (list) target_attrib_list """
        self.build_tree_chunk(dataset_by_attrib_dict, target_attrib_list)
        print("\n\n")
        while len(self.active_branch_nodes) != 0:
            print(">>>>>>>>>>>>>>>>>>> Current active node: ", self.active_branch_nodes[0].node_name)

            sub_attrib_dict, sub_tar_list = self.get_next_branch_occurrences(dataset_by_attrib_dict, target_attrib_list)
            self.build_tree_chunk(sub_attrib_dict, sub_tar_list)
            print("\n\n>>>>>>>>>>>>>>>>>>> List of active nodes: ", self.active_branch_nodes)
        print("\n\n", "<"*5, "THE TREE IS COMPLETE!", ">"*5, "\n\n")

    def visualise_tree(self):
        """ Print the tree starting from the root.
        NOTE(review): current_node is never advanced, so once a tree exists this
        loops forever printing the root's name - looks unfinished. """
        current_node = self.root_node
        while current_node is not None:
            print(current_node.node_name)

    def classify(self, entry_index, dataset_by_attrib_dict):
        """ Walk the built tree for one dataset row and return the predicted target outcome.
        PARAMETERS
        :param (int) entry_index - row index into each attribute list.
        :param (dict) dataset_by_attrib_dict - maps attribute name -> list of values.
        :returns (str) answer - node_name of the leaf reached. """
        answer = None

        current_node = self.root_node
        # Leaf (answer) nodes are created with derived_nodes=None, ending the walk.
        while current_node.derived_nodes is not None:
            print("\n <<< TRAVERSING TREE >>> ")
            print("Current Attrib: ", current_node.node_name)

            # An attribute node's name doubles as the dataset column name.
            column_name = current_node.node_name

            current_outcome_name = dataset_by_attrib_dict[column_name][entry_index]
            print("\tCurrent outcome name: ", current_outcome_name)

            # Follow the outcome edge matching this row's value.
            # NOTE(review): if no outcome node matches, current_node never
            # changes and this loop will not terminate.
            for outcome_node in current_node.derived_nodes:
                if outcome_node.node_name == current_outcome_name:
                    current_node = outcome_node.derived_nodes[0]

        answer = current_node.node_name
        print(" <<< FOUND VALUE >>> ")
        print(" The answer is: ", answer)
        return answer
def test_run_algorithm():
    """Demo entry point: train the ID3 tree on the play-tennis training data,
    then classify the first training row as a smoke test."""
    pad = " " * 10
    print(pad, " << ID3 CLASSIFICATION ALGORITHM >> ", pad)

    id3_tree = ID3DecisionTree()
    id3_tree.build_tree(DATASET_BY_ATTRIB_DICT, TARGET_ATTRIB_LIST)

    # Classify the first entry of the training data.
    id3_tree.classify(0, DATASET_BY_ATTRIB_DICT)
test_run_algorithm()
- """
- # Remove the completed branches
- for branch in completed_branches:
- for key in branch.keys():
- target_val_dist_for_grade.pop(key)
- print("After removing completed branches: ", target_val_dist_for_grade)
- """
- """
- What is "Training Data"?
- Building the tree is done with training data which already has the answer to whatever question is being asked.
- the example given with the data on the slides that asks if someone can buy a laptop is training data
- because it already knows the answer.
- """
- """
- Apply information gain function to each attribute calculate_gain(attr_out)
- Should that be applied to the target as well? No
- Example:
- - G(train_data, O) = 0.246
- - G(train_data, H) = 0.151
- - G(train_data, W) = 0.048
- Once the root node is known, look at how many unique values are there.
- If there are 4 possible values and they are not numbers,
- for example "Sunny", "Rainy", etc. there should be 4 nodes.
- """
- """
- What is "Test Data"?
- Test data is when we get a new entry and we want to classify it.
- For example: In the bank they may use an already trained ID3 algorithm to check if you should get a credit card or not.
- They will have different attributes like - number of times you have gone bankrupt; what is your current net worth;
- are you a student; what is your credit score; etc.
- Then the target attribute will be EligibleForCreditCard(True or False)
- """
- """
- Steps:
- 1. Find which is the current attribute to look through (To start with ask the tree which attribute is the root node)
- 1.1 (When building the tree need to make sure the attributes have the exact same name as the Node data)
- 1.2 Search trough all possible attributes
- 1.3 Check if the attribute name == the node name
-
- 2. Find the attribute value for the current row
- 2.1 Ask the data set which value is given for this attribute
- 2.2 Find the which of the children nodes in the tree are equivalent to the given value
-
- Repeat these steps recursively until an answer is found.
- """
|