anonymize.py 5.0 KB


  1. #!/usr/bin/python
  2. # FilterNHSNumbers.py
  3. #
# This script scans the given files for 10-digit numbers that are valid
# NHS numbers.
  6. #
  7. # It then attempts to replace each individual NHS number with a pseudonym,
  8. # which it reads from pseudonyms.csv (and writes back afterwards)
  9. import csv
  10. import os
  11. import random
  12. import sys
# Maps each real NHS number (int) to its pseudonym (int). Filled by
# load_pseudonyms() and get_pseudonym(), written out by save_pseudonyms().
LOOKUP = {}
# While this is not True, get_pseudonym() asks the user before generating a
# pseudonym for an NHS number that is missing from LOOKUP.
GENERATE_NEW = False
  15. def main():
  16. load_pseudonyms()
  17. if len(sys.argv) > 1:
  18. for fn in sys.argv[1:]:
  19. filter_file(fn)
  20. else:
  21. dirList = os.listdir(os.path.dirname(sys.argv[0]))
  22. for fname in dirList:
  23. split = os.path.splitext(fname)
  24. if (split[1].lower() == '.csv' and split[0].upper()[:5] != 'ANON_'
  25. and split[0] != 'pseudonyms'):
  26. filter_file(fname)
  27. save_pseudonyms()
  28. def filter_file(fn):
  29. print("Scanning %s..." % fn)
  30. locations = find_nhs_numbers(fn)
  31. if locations is None:
  32. print("Failed to open.")
  33. return False
  34. nhs_numbers = []
  35. try:
  36. f = open("DEP2.CSV", 'rb')
  37. except IOError:
  38. return
  39. for location in locations:
  40. f.seek(location)
  41. nhs_numbers.append(f.read(10))
  42. f.close()
  43. print("found %d valid NHS numbers" % len(locations))
  44. replace_nhs_numbers(fn, locations)
  45. def find_nhs_numbers(fn):
  46. try:
  47. f = open(fn, 'rb')
  48. except IOError:
  49. return None
  50. locations = []
  51. num = 0
  52. while True:
  53. c = f.read(1)
  54. if c == '':
  55. break
  56. ascii_ = ord(c)
  57. if ascii_ in (48, 49, 50, 51, 52, 53, 54, 55, 56, 57):
  58. num += 1
  59. else:
  60. if num == 10:
  61. startLocation = f.tell() - 11
  62. f.seek(startLocation)
  63. if validate_nhs_number(f.read(10)):
  64. locations.append(startLocation)
  65. num = 0
  66. return locations
  67. def validate_nhs_number(num):
  68. s = str(num)
  69. if len(s) != 10:
  70. return False
  71. tot = 0
  72. for i in range(9):
  73. dig = int(s[i])
  74. tot += dig * (10-i)
  75. tmp = tot % 11
  76. if tmp == 0:
  77. checksum = 0
  78. else:
  79. checksum = 11 - (tot % 11)
  80. if checksum == 10 or checksum != int(s[9]):
  81. return False
  82. else:
  83. return True
  84. def replace_nhs_numbers(fn, locations):
  85. outFn = os.path.join(os.path.dirname(fn), "ANON_" + os.path.basename(fn))
  86. with open(fn, 'rb') as f, open(outFn, 'wb') as out:
  87. for location in locations:
  88. buf = f.read(location - f.tell())
  89. out.write(buf)
  90. pseudo = str(get_pseudonym((int(f.read(10)))))
  91. out.write(pseudo)
  92. out.write(f.read())
  93. def get_pseudonym(nhs_number):
  94. global GENERATE_NEW
  95. pseudo = LOOKUP.get(nhs_number)
  96. if pseudo is not None:
  97. return pseudo
  98. if GENERATE_NEW is not True:
  99. print("I have encountered a new NHS number (%d) with no pseudonym.\n"
  100. "Should I generate new ones for any new NHS numbers I find "
  101. "from now on?" % nhs_number)
  102. response = raw_input("type y or n:")
  103. if response == 'y':
  104. GENERATE_NEW = True
  105. else:
  106. print("In that case, I will exit now.")
  107. exit()
  108. while True:
  109. digits = []
  110. s = ''
  111. tot = 0
  112. for i in range(9):
  113. if i == 0:
  114. digit = random.randint(1, 9)
  115. else:
  116. digit = random.randint(0, 9)
  117. digits.append(digit)
  118. s += str(digit)
  119. tot += digit * (10 - i) # (10 - i) is the weighting factor
  120. checksum = 11 - (tot % 11)
  121. if checksum == 11:
  122. checksum = 0
  123. if checksum != 10: # 10 is an invalid nhs number
  124. s += str(checksum)
  125. pseudo = int(s)
  126. LOOKUP[nhs_number] = pseudo
  127. return pseudo
  128. def load_pseudonyms():
  129. global GENERATE_NEW
  130. dirs = [os.path.abspath(os.path.dirname(fn)) for fn in sys.argv]
  131. dirs = list(set(dirs))
  132. for dir_ in dirs:
  133. fn = os.path.join(dir_, "pseudonyms.csv")
  134. try:
  135. reader = csv.reader(open(fn, 'r'))
  136. except IOError:
  137. continue
  138. print("Reading pseudonyms from %s" % fn)
  139. for line in reader:
  140. if len(line) > 0:
  141. try:
  142. LOOKUP[int(line[0])] = int(line[1])
  143. except ValueError:
  144. pass
  145. num_loaded = len(LOOKUP)
  146. if num_loaded == 0:
  147. print("I haven't loaded any pseudonyms, should I just generate a "
  148. "pseudonym for every NHS number I encounter?")
  149. if raw_input("Type y or n: ") == 'y':
  150. GENERATE_NEW = True
  151. else:
  152. print("Loaded %d pseudonyms NHS numbers." % num_loaded)
  153. GENERATE_NEW = False
  154. def save_pseudonyms():
  155. fn = os.path.abspath("pseudonyms.csv")
  156. print("Saving %d unique NHS Number pseudonyms to %s" % (len(LOOKUP), fn))
  157. f = open(fn, "w")
  158. f.write("Real_NHS_Number,Pseudonym_NHS_Number\n")
  159. for nhs_number in LOOKUP.keys():
  160. f.write("%d,%d" % (nhs_number, LOOKUP[nhs_number]) + "\n")
  161. main()