123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209 |
- #!/usr/bin/python
- # FilterNHSNumbers.py
- #
- # This script looks through any given files and looks for 10 digit numbers
- # which are a valid NHS number.
- #
- # It then attempts to replace each individual NHS number with a pseudonym,
- # which it reads from pseudonyms.csv (and writes back afterwards)
- import csv
- import os
- import random
- import sys
# Maps each real NHS number (int) to its pseudonym (int). Filled from
# pseudonyms.csv by load_pseudonyms() and extended by get_pseudonym().
LOOKUP = dict()
# When True, get_pseudonym() creates pseudonyms for unseen NHS numbers
# without asking the user for permission first.
GENERATE_NEW = False
def main():
    """Pseudonymise the requested files and persist the pseudonym table.

    Files named on the command line are processed as given.  With no
    arguments, every ``.csv`` file in the script's own directory is
    processed, except files whose name starts with ``ANON_`` (case
    insensitive) and the ``pseudonyms`` table itself.
    """
    load_pseudonyms()
    targets = list(sys.argv[1:])
    if not targets:
        # Resolve to an absolute path first: dirname() of a bare script name
        # is '', which os.listdir() rejects.
        script_dir = os.path.dirname(os.path.abspath(sys.argv[0]))
        for fname in os.listdir(script_dir):
            stem, ext = os.path.splitext(fname)
            if ext.lower() != '.csv':
                continue
            if stem.upper().startswith('ANON_') or stem == 'pseudonyms':
                continue
            # Keep the directory so the file opens regardless of the
            # current working directory.
            targets.append(os.path.join(script_dir, fname))
    for fn in targets:
        filter_file(fn)
    save_pseudonyms()
def filter_file(fn):
    """Scan one file for valid NHS numbers and write its pseudonymised copy.

    Args:
        fn: Path of the file to scan.

    Returns:
        True when the file was scanned and the replacement step ran,
        False when the file could not be opened.
    """
    print("Scanning %s..." % fn)
    locations = find_nhs_numbers(fn)
    if locations is None:
        print("Failed to open.")
        return False
    # The previous version re-opened a hard-coded "DEP2.CSV" here to collect
    # the matched digits into a list that was never used, and silently
    # skipped the replacement when that unrelated file did not exist.
    print("found %d valid NHS numbers" % len(locations))
    replace_nhs_numbers(fn, locations)
    return True
def find_nhs_numbers(fn):
    """Locate every valid NHS number in a file.

    A candidate is a run of exactly ten ASCII digits that is not adjacent
    to another digit; it is kept only if validate_nhs_number() accepts it.
    Runs that touch the start or the end of the file are considered too.

    Args:
        fn: Path of the file to scan (read in binary mode).

    Returns:
        A list of byte offsets, in ascending order, at which a valid NHS
        number starts, or None if the file could not be opened.
    """
    import re

    try:
        f = open(fn, 'rb')
    except IOError:
        return None
    with f:
        data = f.read()
    # Look-arounds reject runs longer than ten digits and, unlike a
    # byte-by-byte scan that waits for a trailing non-digit, still match a
    # number that is the very last thing in the file.
    candidate = re.compile(br'(?<![0-9])[0-9]{10}(?![0-9])')
    locations = []
    for match in candidate.finditer(data):
        # Decode so the validator sees text on both Python 2 and Python 3.
        if validate_nhs_number(match.group().decode('ascii')):
            locations.append(match.start())
    return locations
def validate_nhs_number(num):
    """Return True if *num* is a ten-digit number with a valid check digit.

    The first nine digits are weighted 10 down to 2 and summed; the check
    digit is 11 minus the sum modulo 11, with 11 mapped to 0.  A computed
    check digit of 10 means the number is invalid.

    Args:
        num: Candidate number as text, bytes or int.

    Returns:
        True when the candidate has exactly ten ASCII digits and its last
        digit matches the computed check digit, otherwise False.  Input
        containing non-digit characters yields False rather than raising.
    """
    # On Python 3, str(b'...') would produce "b'...'" and never validate.
    if isinstance(num, bytes) and not isinstance(num, str):
        num = num.decode('ascii', 'replace')
    s = str(num)
    if len(s) != 10 or any(ch not in '0123456789' for ch in s):
        return False
    tot = sum(int(ch) * (10 - i) for i, ch in enumerate(s[:9]))
    # The outer modulo folds the "11" result into a check digit of 0.
    checksum = (11 - tot % 11) % 11
    return checksum != 10 and checksum == int(s[9])
def replace_nhs_numbers(fn, locations):
    """Write a copy of *fn* with each located NHS number pseudonymised.

    The copy is created next to the original, with ``ANON_`` prepended to
    the file name.  Everything outside the located numbers is copied
    through unchanged.

    Args:
        fn: Path of the source file.
        locations: Byte offsets of the ten-digit numbers to replace; the
            sequential reads below rely on them being in ascending order.
    """
    out_fn = os.path.join(os.path.dirname(fn), "ANON_" + os.path.basename(fn))
    with open(fn, 'rb') as src, open(out_fn, 'wb') as out:
        for location in locations:
            # Copy the untouched bytes between the previous match and this one.
            out.write(src.read(location - src.tell()))
            pseudo = get_pseudonym(int(src.read(10)))
            # The output is opened in binary mode, so write bytes; a text
            # str here is a TypeError on Python 3.
            out.write(str(pseudo).encode('ascii'))
        out.write(src.read())
# Cache of the pseudonyms present in LOOKUP, so uniqueness checks do not
# rescan the whole table for every newly generated pseudonym.
_USED_PSEUDONYMS = {'synced_size': None, 'values': set()}


def _used_pseudonyms():
    """Return the set of pseudonyms in LOOKUP, rebuilt if LOOKUP changed size."""
    if _USED_PSEUDONYMS['synced_size'] != len(LOOKUP):
        _USED_PSEUDONYMS['values'] = set(LOOKUP.values())
        _USED_PSEUDONYMS['synced_size'] = len(LOOKUP)
    return _USED_PSEUDONYMS['values']


def _random_nhs_number():
    """Return a random ten-digit int with a valid check digit, or None."""
    # The leading digit is never 0 so the result always has ten digits.
    digits = [random.randint(1, 9)]
    digits.extend(random.randint(0, 9) for _ in range(8))
    # (10 - i) is the weighting factor of the digit at position i.
    tot = sum(digit * (10 - i) for i, digit in enumerate(digits))
    checksum = (11 - tot % 11) % 11
    if checksum == 10:  # 10 is an invalid nhs number
        return None
    return int(''.join(str(digit) for digit in digits) + str(checksum))


def get_pseudonym(nhs_number):
    """Return the pseudonym for *nhs_number*, generating one if permitted.

    A known number returns its stored pseudonym.  For an unknown number the
    user is asked once whether new pseudonyms may be generated; answering
    anything other than ``y`` terminates the program.  A new pseudonym is a
    random ten-digit number with a valid check digit that is not already in
    use as a pseudonym and is not itself a key of LOOKUP.

    Args:
        nhs_number: The real NHS number as an int.

    Returns:
        The pseudonym as an int.

    Raises:
        SystemExit: If the user declines to generate new pseudonyms.
    """
    global GENERATE_NEW
    pseudo = LOOKUP.get(nhs_number)
    if pseudo is not None:
        return pseudo
    if GENERATE_NEW is not True:
        print("I have encountered a new NHS number (%d) with no pseudonym.\n"
              "Should I generate new ones for any new NHS numbers I find "
              "from now on?" % nhs_number)
        try:
            ask = raw_input  # Python 2
        except NameError:
            ask = input  # Python 3
        response = ask("type y or n:")
        if response == 'y':
            GENERATE_NEW = True
        else:
            print("In that case, I will exit now.")
            # sys.exit() rather than exit(): the latter is only installed
            # by the site module and is meant for interactive sessions.
            sys.exit()
    used = _used_pseudonyms()
    while True:
        pseudo = _random_nhs_number()
        if pseudo is None:
            continue
        # Without this check two different patients could be given the
        # same pseudonym and become indistinguishable in the output.
        if pseudo in used or pseudo in LOOKUP:
            continue
        LOOKUP[nhs_number] = pseudo
        used.add(pseudo)
        _USED_PSEUDONYMS['synced_size'] = len(LOOKUP)
        return pseudo
def load_pseudonyms():
    """Populate LOOKUP from every ``pseudonyms.csv`` that can be found.

    The directory of each entry in ``sys.argv`` (the script itself and any
    file arguments) is searched.  Rows whose first two columns are not
    both integers, including the header row and short or empty rows, are
    skipped.  If nothing was loaded, the user is asked whether pseudonyms
    should be generated for every NHS number; otherwise GENERATE_NEW is
    reset to False.
    """
    global GENERATE_NEW
    dirs = set(os.path.abspath(os.path.dirname(arg)) for arg in sys.argv)
    for dir_ in dirs:
        fn = os.path.join(dir_, "pseudonyms.csv")
        try:
            f = open(fn, 'r')
        except IOError:
            continue
        print("Reading pseudonyms from %s" % fn)
        # 'with' closes the handle, which was previously left open.
        with f:
            for line in csv.reader(f):
                try:
                    LOOKUP[int(line[0])] = int(line[1])
                except (ValueError, IndexError):
                    # Header, blank or malformed row: skip it.
                    continue
    num_loaded = len(LOOKUP)
    if num_loaded == 0:
        print("I haven't loaded any pseudonyms, should I just generate a "
              "pseudonym for every NHS number I encounter?")
        try:
            ask = raw_input  # Python 2
        except NameError:
            ask = input  # Python 3
        if ask("Type y or n: ") == 'y':
            GENERATE_NEW = True
    else:
        print("Loaded %d pseudonyms NHS numbers." % num_loaded)
        GENERATE_NEW = False
def save_pseudonyms():
    """Write LOOKUP to ``pseudonyms.csv`` in the current working directory.

    The file is overwritten and consists of a header row followed by one
    ``real,pseudonym`` row per entry.
    """
    fn = os.path.abspath("pseudonyms.csv")
    print("Saving %d unique NHS Number pseudonyms to %s" % (len(LOOKUP), fn))
    # 'with' guarantees the table is flushed and closed even if a write fails.
    with open(fn, "w") as f:
        f.write("Real_NHS_Number,Pseudonym_NHS_Number\n")
        for nhs_number, pseudo in LOOKUP.items():
            f.write("%d,%d\n" % (nhs_number, pseudo))
# Run only when executed as a script, so the module can be imported
# (e.g. to reuse validate_nhs_number) without processing any files.
if __name__ == "__main__":
    main()
|