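"""Anonymize pandas DataFrames with Mondrian-based partitioning.

DataFrameAnonymizer partitions a dataframe using MondrianAnonymizer and then
aggregates each partition: quasi-identifier (feature) columns are generalized
to "min - max" strings or [min, max] lists, selected numeric columns can be
replaced by their partition average, and each sensitive value is reported
together with its count per partition.
"""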
from typing import List

import pandas as pd
from pandas import DataFrame
from pandas.api.types import is_numeric_dtype

from .mondrian_anonymizer import MondrianAnonymizer


class DataFrameAnonymizer:
    # When True, the partition average overwrites the original column;
    # otherwise it is written to a separate "<column>_avg" column.
    AVG_OVERWRITE = True

    mondrian: MondrianAnonymizer  # partitions the dataframe using the Mondrian algorithm

    def __init__(self, sensitive_attribute_columns: List[str], feature_columns=None,
                 avg_columns=None, format_to_str=False):
        self.sensitive_attribute_columns = sensitive_attribute_columns
        self.feature_columns = feature_columns
        self.avg_columns = avg_columns
        self.format_to_str = format_to_str

    # Set feature columns from all columns other than the sensitive columns
    def init_feature_columns(self, df):
        # Set up feature columns / quasi-identifiers
        fc = []
        if self.feature_columns is None:
            # Assume that all other columns are feature columns
            for col in df.columns:
                if col not in self.sensitive_attribute_columns:
                    fc.append(col)
            self.feature_columns = fc

    def anonymize(self, df, k, l=0):
        # Check inputs
        if df is None or len(df) == 0:
            raise ValueError("Dataframe is None or empty")
        if self.sensitive_attribute_columns is None or len(self.sensitive_attribute_columns) == 0:
            raise ValueError("Provide at least one sensitive attribute column")
        if not self.feature_columns:
            self.init_feature_columns(df)
        if self.avg_columns:
            for c in self.avg_columns:
                if not is_numeric_dtype(df[c]):
                    raise ValueError(f"Column {c} is not numeric and average cannot be calculated.")

        # Partition the dataframe with Mondrian, then aggregate each partition
        mondrian = MondrianAnonymizer(df, self.feature_columns, self.sensitive_attribute_columns)
        partitions = mondrian.partition(k, l)
        dfa = self.build_anonymized_dataframe(df, partitions)
        return dfa

    def anonymize_k_anonymity(self, df, k) -> DataFrame:
        return self.anonymize(df, k)

    def anonymize_l_diversity(self, df, k, l) -> DataFrame:
        return self.anonymize(df, k, l=l)

    def anonymize_t_closeness(self, df, k) -> DataFrame:
        return self.anonymize(df, k)

    @staticmethod
    def __agg_column_str(series):
        # Aggregate a partition column to a string: "min - max" for numeric
        # columns, a comma-separated list of distinct values otherwise
        if is_numeric_dtype(series):
            minimum = series.min()
            maximum = series.max()
            return f"{minimum} - {maximum}"
        values = [str(v) for v in set(series)]
        return ", ".join(values)

    @staticmethod
    def __agg_column_list(series):
        # Aggregate a partition column to a list: [min, max] for numeric
        # columns, the distinct values otherwise
        if is_numeric_dtype(series):
            minimum = series.min()
            maximum = series.max()
            return [minimum, maximum]
        return [str(v) for v in set(series)]

    def partition_dataframe(self, df, k, l=0) -> List[pd.Index]:
        mondrian = MondrianAnonymizer(df, self.feature_columns, self.sensitive_attribute_columns)
        partitions = mondrian.partition(k, l)
        return partitions

    def build_anonymized_dataframe(self, df, partitions) -> DataFrame:
        aggregations = {}
        sensitive_columns = self.sensitive_attribute_columns
        feature_columns = self.feature_columns
        sa_len = len(sensitive_columns)
        for column in feature_columns:
            if self.format_to_str:
                aggregations[column] = self.__agg_column_str
            else:
                aggregations[column] = self.__agg_column_list

        rows = []
        for partition in partitions:
            dfp = df.loc[partition]
            grouped_columns = dfp.agg(aggregations)
            values = grouped_columns.to_dict()

            if self.avg_columns:
                # Replace the aggregated interval with the partition average
                # (or add it as "<column>_avg" when AVG_OVERWRITE is False)
                for avg_col in self.avg_columns:
                    col_name = avg_col if self.AVG_OVERWRITE else avg_col + "_avg"
                    if avg_col in feature_columns:
                        avg_val = dfp[avg_col].mean()
                        values.update({col_name: avg_val})

            # One output row per distinct sensitive value (combination) in the partition
            grouped_sensitive_columns = dfp.groupby(sensitive_columns, as_index=False)
            for grouped_sensitive_value in grouped_sensitive_columns:
                for sensitive_column in sensitive_columns:
                    if sa_len > 1:
                        # Group key is a tuple when grouping by several sensitive columns
                        sensitive_value = grouped_sensitive_value[0][sensitive_columns.index(sensitive_column)]
                    else:
                        sensitive_value = grouped_sensitive_value[0]
                    count = len(grouped_sensitive_value[1])
                    values.update(
                        {
                            sensitive_column: sensitive_value,
                            sensitive_column + "_count": count,
                        }
                    )
                rows.append(values.copy())
        return pd.DataFrame(rows)
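

# ---------------------------------------------------------------------------
# Usage sketch (illustrative only): a minimal example of how the class above
# might be called. The dataframe, the column names ("age", "zip_code",
# "disease") and the k value are assumptions made for demonstration; the
# example also assumes the accompanying mondrian_anonymizer module is
# importable, e.g. by running this file as part of its package with
# `python -m <package>.<module>`.
if __name__ == "__main__":
    example_df = pd.DataFrame(
        {
            "age": [25, 26, 47, 48, 33, 34],
            "zip_code": ["11111", "11112", "22222", "22223", "33333", "33334"],
            "disease": ["flu", "flu", "cancer", "flu", "cancer", "cancer"],
        }
    )

    anonymizer = DataFrameAnonymizer(
        sensitive_attribute_columns=["disease"],
        avg_columns=["age"],   # report the partition average for "age" instead of an interval
        format_to_str=True,    # aggregate quasi-identifiers as "min - max" strings
    )

    # k-anonymity with k=2; anonymize_l_diversity(example_df, 2, 2) would also
    # enforce l-diversity on the sensitive column.
    print(anonymizer.anonymize_k_anonymity(example_df, k=2))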