123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197 |
- # -*- coding: utf-8 -*-
- """
- Created on Tue May 12 00:00:00 2020
- @author: Shaji
- """
- from . import exceptions
- from datetime import datetime
- import os
- import pandas as pd
- def column_level_check(source_df,target_df,primary_keys):
- """
- Usage: [arg1]:[Pandas DataFrame - source], [arg2]:[Pandas DataFrame - target], [arg3]:[Primary keys (separated by comma)]
- Description: Performs column level testing between two DataFrames by joining using the primary keys.
- Returns: [Mismatch Count], [Test Log (list)], [Pandas dataframe - mismatch (if any)]
- """
- global execution_status
- systime=datetime.now()
- start_time=systime.strftime("%Y")+'-'+systime.strftime("%m")+'-'+systime.strftime("%d")+' '+systime.strftime("%H")+':'+systime.strftime("%M")+':'+systime.strftime("%S")
- log_list=[]
- execution_status='RUNNING'
- log_list.append('START TIME: '+start_time)
- key_list=primary_keys.split(',')
- src=source_df
- tgt=target_df
- log_list.append(str(datetime.now())+': DIFFERENTIATING SOURCE AND TARGET COLUMNS')
- if execution_status!='FAILED':
- try:
- src_k=[]
- src_columns=[]
- for i in src.columns:
- if str.lower(i) in [str.lower(key) for key in key_list]:
- src_columns.append(str.lower(i))
- src_k.append(str.lower(i))
- else:
- src_columns.append(str(i) + '_src')
- src.columns = src_columns
- tgt_k=[]
- tgt_columns=[]
- for i in tgt.columns:
- if str.lower(i) in [str.lower(key) for key in key_list]:
- tgt_columns.append(str.lower(i))
- tgt_k.append(str.lower(i))
- else:
- tgt_columns.append(str(i) + '_tgt')
- tgt.columns = tgt_columns
- except Exception as e:
- print('Failed while DIFFERENTIATING SOURCE AND TARGET COLUMNS: '+str(e))
- log_list.append('Failed while DIFFERENTIATING SOURCE AND TARGET COLUMNS: '+str(e))
- execution_status='FAILED'
- log_list.append(str(datetime.now())+': CHECKING IF THE GROUP BY MAKES THE RECORD LEVEL SAME AS ACTUAL')
- if execution_status!='FAILED':
- try:
- index_unique_flag=[]
- if src.groupby(src_k).count().shape[0]==src.shape[0]:
- index_unique_flag.append(True)
- else:
- index_unique_flag.append(False)
- if tgt.groupby(tgt_k).count().shape[0]==tgt.shape[0]:
- index_unique_flag.append(True)
- else:
- index_unique_flag.append(False)
- except Exception as e:
- print('Failed while CHECKING IF THE GROUP BY MAKES THE RECORD LEVEL SAME AS ACTUAL: '+str(e))
- log_list.append('Failed while CHECKING IF THE GROUP BY MAKES THE RECORD LEVEL SAME AS ACTUAL: '+str(e))
- execution_status='FAILED'
- if execution_status!='FAILED':
- try:
- if all(index_unique_flag)==True:
- log_list.append(str(datetime.now())+': JOINING THE TABLES')
- try:
- df=tgt.set_index(tgt_k).join(src.set_index(src_k),how='left')
- except Exception as e:
- print('Failed while JOINING THE TABLES: '+str(e))
- log_list.append('Failed while JOINING THE TABLES: '+str(e))
- execution_status='FAILED'
- log_list.append(str(datetime.now())+': FINDING THE TARGET COLUMN AND SOURCE COLUMN TO BE COMPARED')
- if execution_status!='FAILED':
- try:
- ma_list=[]
- for i in range(len(df.columns)):
- if df.columns[i][-3:]=='tgt':
- for j in range(len(df.columns)):
- if df.columns[j][-3:]=='src':
- if str.lower(df.columns[i][:-4])==str.lower(df.columns[j][:-4]):
- ma_list.append([j,i])
- match_cols=''
- for i in range(len(ma_list)):
- match_cols+=str(i+1)+': '+df.columns[ma_list[i][1]]+' = '+df.columns[ma_list[i][0]]+' , '
- log_list.append('Matching columns '+match_cols)
- except Exception as e:
- print('Failed while FINDING THE TARGET COLUMN AND SOURCE COLUMN TO BE COMPARED: '+str(e))
- log_list.append('Failed while FINDING THE TARGET COLUMN AND SOURCE COLUMN TO BE COMPARED: '+str(e))
- execution_status='FAILED'
- log_list.append(str(datetime.now())+': COMPARISION STARTED')
- if execution_status!='FAILED':
- try:
- mis_cols=[]
- res=[]
- index=[]
- for i in range(len(ma_list)):
- if all(df[df.columns[ma_list[i][0]]].apply(lambda x:str(x).strip()).astype(str).fillna(str(0))==df[df.columns[ma_list[i][1]]].apply(lambda x:str(x).strip()).astype(str).fillna(str(0)))==True:
- res.append(True)
- else:
- res.append(False)
- mis_cols.append(df.columns[ma_list[i][0]])
- mis_cols.append(df.columns[ma_list[i][1]])
- for j in range(len(df[df.columns[ma_list[i][0]]].apply(lambda x:str(x).strip()).astype(str).fillna(str(0))==df[df.columns[ma_list[i][1]]].apply(lambda x:str(x).strip()).astype(str).fillna(str(0)))):
- if list(df[df.columns[ma_list[i][0]]].apply(lambda x:str(x).strip()).astype(str).fillna(str(0))==df[df.columns[ma_list[i][1]]].apply(lambda x:str(x).strip()).astype(str).fillna(str(0)))[j]==False:
- index.append(j)
- un_df=df[mis_cols].iloc[list(set(index))]
- except Exception as e:
- print('Failed while COMPARING: '+str(e))
- log_list.append('Failed while COMPARING: '+str(e))
- execution_status='FAILED'
- log_list.append(str(datetime.now())+': TEST RESULT:')
- if execution_status!='FAILED':
- try:
- if all(res)==True:
- mismatch_count=0
- print('COLUMN LEVEL CHECK PASSED')
- execution_status='SUCCESS'
- log_list.append('COLUMN LEVEL CHECK PASSED')
- else:
- log_list.append((str(len(set(index)))+' records unmatched'))
- log_list.append('Column level check Failed')
- mismatch_count=str(len(set(index)))
- execution_status='SUCCESS'
- except Exception as e:
- print('Failed while getting the TEST RESULT: '+str(e))
- log_list.append('Failed while getting the TEST RESULT: '+str(e))
- execution_status='FAILED'
- else:
- log_list.append('The records grouped at the level of key columns are not unique')
- except Exception as e:
- log_list.append('Failed while CHECKING IF THE GROUP BY MAKES THE RECORD LEVEL SAME AS ACTUAL: '+str(e))
- execution_status='FAILED'
- if execution_status=='FAILED':
- print('Check Logs for the error message')
- raise exceptionsExecutionError
- return mismatch_count,log_list,un_df
- def sort_and_compare(source_df,target_df):
- """
- Usage: [arg1]:[Pandas DataFrame - source], [arg2]:[Pandas DataFrame - target]
- Description: Sort and Compare two datasets.
- Returns: [Mismatch Count], [Test Log (list)], [Pandas dataframe - mismatch (if any)]
- """
- log_list=[]
- col1=source_df.columns
- col2=target_df.columns
- cols=list(set(col1.sort_values()).intersection(set(col2.sort_values())))
- log_list.append('Common column(s): '+', '.join(cols))
- source_df.sort_values(cols, axis=0, ascending=True, inplace=True)
- target_df.sort_values(cols, axis=0, ascending=True, inplace=True)
- data1=source_df[cols].reset_index(drop=True)
- data2=target_df[cols].reset_index(drop=True)
- data1.head()
- data2.head()
- result=data1==data2
- bool_list=[]
- mis_cols=[]
- mis_index=[]
- for i in cols:
- if all(result[i])==True:
- bool_list.append(True)
- else:
- bool_list.append(False)
- mis_cols.append(i)
- for j in range(len(result[i])):
- if result[i][j]==False:
- mis_index.append(j)
- un_df=pd.concat([data1.iloc[list(set(mis_index))],data2.iloc[list(set(mis_index))]],axis=1)
- mismatch_count=0
- if all(bool_list)==True:
- log_list.append('Records are matching')
- else:
- mismatch_count=len(set(mis_index))
- log_list.append(str(mismatch_count)+' records unmatched')
- log_list.append('Column(s): '+', '.join(mis_cols))
- return mismatch_count,log_list,un_df[mis_cols]
|