# dataFrameAnonymizer.py
from typing import List

import pandas as pd
from pandas import DataFrame
from pandas.core.dtypes.common import is_numeric_dtype
from pandas.core.indexes.numeric import NumericIndex

from .mondrian_anonymizer import MondrianAnonymizer
  7. class DataFrameAnonymizer:
  8. AVG_OVERWRITE = True
  9. mondrian: MondrianAnonymizer # takes care of partitioning dataframe using mondrian algorithm
  10. def __init__(self, sensitive_attribute_columns: List[str], feature_columns=None, avg_columns=None, format_to_str=False):
  11. self.sensitive_attribute_columns = sensitive_attribute_columns
  12. self.feature_columns = feature_columns
  13. self.avg_columns = avg_columns
  14. self.format_to_str = format_to_str
  15. # Set feature colums from all other columns than sensitive columns
  16. def init_feature_colums(self, df):
  17. # Setup feature columns / Quasi identifiers
  18. fc = []
  19. if self.feature_columns is None:
  20. # Assume that all other columns are feature columns
  21. for col in df.columns:
  22. if col not in self.sensitive_attribute_columns:
  23. fc.append(col)
  24. self.feature_columns = fc
  25. def anonymize(self, df, k, l=0):
  26. # Check inputs
  27. if df is None or len(df) == 0:
  28. raise Exception("Dataframe is empty")
  29. if self.sensitive_attribute_columns is None or len(self.sensitive_attribute_columns) == 0:
  30. raise Exception("Provide at least one sensitive attribute column")
  31. if not self.feature_columns:
  32. self.init_feature_colums(df)
  33. if self.avg_columns:
  34. for c in self.avg_columns:
  35. if not is_numeric_dtype(df[c]):
  36. raise Exception("Column " + c + " is not numeric and average cannot be calculated.")
  37. mondrian = MondrianAnonymizer(df, self.feature_columns, self.sensitive_attribute_columns)
  38. partitions = mondrian.partition(k, l)
  39. dfa = self.build_anonymized_dataframe(df, partitions)
  40. return dfa
  41. def anonymize_k_anonymity(self, df, k) -> DataFrame:
  42. return self.anonymize(df, k)
  43. def anonymize_l_diversity(self, df, k, l) -> DataFrame:
  44. return self.anonymize(df, k, l=l)
  45. def anonymize_t_closeness(self, df, k) -> DataFrame:
  46. return self.anonymize(df, k)
  47. @staticmethod
  48. def __agg_column_str(series):
  49. if is_numeric_dtype(series):
  50. minimum = series.min()
  51. maximum = series.max()
  52. return "{min} - {max}".format(min=minimum, max=maximum)
  53. else:
  54. series.astype("category")
  55. l = [str(n) for n in set(series)]
  56. return ", ".join(l)
  57. @staticmethod
  58. def __agg_column_list(series):
  59. if is_numeric_dtype(series):
  60. minimum = series.min()
  61. maximum = series.max()
  62. return [minimum, maximum]
  63. else:
  64. series.astype("category")
  65. l = [str(n) for n in set(series)]
  66. return l
  67. def partition_dataframe(self, df, k, l=0) -> List[NumericIndex]:
  68. mondrian = MondrianAnonymizer(df, self.feature_columns, self.sensitive_attribute_columns)
  69. partitions = mondrian.partition(k, l)
  70. return partitions
  71. def build_anonymized_dataframe(self, df, partitions) -> DataFrame:
  72. aggregations = {}
  73. sensitive_columns = self.sensitive_attribute_columns
  74. feature_columns = self.feature_columns
  75. sa_len = len(sensitive_columns)
  76. for column in feature_columns:
  77. if self.format_to_str:
  78. aggregations[column] = self.__agg_column_str
  79. else:
  80. aggregations[column] = self.__agg_column_list
  81. rows = []
  82. for i, partition in enumerate(partitions):
  83. dfp = df.loc[partition]
  84. grouped_columns = dfp.agg(aggregations, squeeze=False)
  85. values = grouped_columns.to_dict()
  86. if self.avg_columns:
  87. # handle average columns and set average instead of interval
  88. # overwrite column with average
  89. for avg_col in self.avg_columns:
  90. col_name = avg_col + '_avg' if not self.AVG_OVERWRITE else avg_col
  91. if avg_col in feature_columns:
  92. avg_val = dfp[avg_col].mean()
  93. values.update({col_name: avg_val})
  94. grouped_sensitive_columns = dfp.groupby(sensitive_columns, as_index=False)
  95. for grouped_sensitive_value in grouped_sensitive_columns:
  96. for sensitive_column in sensitive_columns:
  97. if sa_len > 1:
  98. # Value is tuple
  99. sensitive_value = grouped_sensitive_value[0][sensitive_columns.index(sensitive_column)]
  100. else:
  101. sensitive_value = grouped_sensitive_value[0]
  102. count = len(grouped_sensitive_value[1])
  103. values.update(
  104. {
  105. sensitive_column: sensitive_value,
  106. sensitive_column + "_count": count,
  107. }
  108. )
  109. rows.append(values.copy())
  110. return pd.DataFrame(rows)