etl_testing.py 9.1 KB

# -*- coding: utf-8 -*-
"""
Created on Tue May 12 00:00:00 2020
@author: Shaji
"""
from . import exceptions
from datetime import datetime
import os
import pandas as pd


def column_level_check(source_df, target_df, primary_keys):
    """
    Usage: [arg1]:[Pandas DataFrame - source], [arg2]:[Pandas DataFrame - target], [arg3]:[Primary keys (separated by comma)]
    Description: Performs column level testing between two DataFrames by joining using the primary keys.
    Returns: [Mismatch Count], [Test Log (list)], [Pandas DataFrame - mismatch (if any)]
    """
    global execution_status
    systime = datetime.now()
    start_time = systime.strftime("%Y-%m-%d %H:%M:%S")
    log_list = []
    execution_status = 'RUNNING'
    log_list.append('START TIME: ' + start_time)
    key_list = primary_keys.split(',')
    src = source_df
    tgt = target_df
    log_list.append(str(datetime.now()) + ': DIFFERENTIATING SOURCE AND TARGET COLUMNS')
    if execution_status != 'FAILED':
        try:
            # Lower-case the key columns and suffix every other column with
            # '_src'/'_tgt' so the two sides can be told apart after the join.
            src_k = []
            src_columns = []
            for i in src.columns:
                if str.lower(i) in [str.lower(key) for key in key_list]:
                    src_columns.append(str.lower(i))
                    src_k.append(str.lower(i))
                else:
                    src_columns.append(str(i) + '_src')
            src.columns = src_columns
            tgt_k = []
            tgt_columns = []
            for i in tgt.columns:
                if str.lower(i) in [str.lower(key) for key in key_list]:
                    tgt_columns.append(str.lower(i))
                    tgt_k.append(str.lower(i))
                else:
                    tgt_columns.append(str(i) + '_tgt')
            tgt.columns = tgt_columns
        except Exception as e:
            print('Failed while DIFFERENTIATING SOURCE AND TARGET COLUMNS: ' + str(e))
            log_list.append('Failed while DIFFERENTIATING SOURCE AND TARGET COLUMNS: ' + str(e))
            execution_status = 'FAILED'
    log_list.append(str(datetime.now()) + ': CHECKING IF THE GROUP BY MAKES THE RECORD LEVEL SAME AS ACTUAL')
    if execution_status != 'FAILED':
        try:
            # The primary keys must uniquely identify every record on both sides.
            index_unique_flag = []
            if src.groupby(src_k).count().shape[0] == src.shape[0]:
                index_unique_flag.append(True)
            else:
                index_unique_flag.append(False)
            if tgt.groupby(tgt_k).count().shape[0] == tgt.shape[0]:
                index_unique_flag.append(True)
            else:
                index_unique_flag.append(False)
        except Exception as e:
            print('Failed while CHECKING IF THE GROUP BY MAKES THE RECORD LEVEL SAME AS ACTUAL: ' + str(e))
            log_list.append('Failed while CHECKING IF THE GROUP BY MAKES THE RECORD LEVEL SAME AS ACTUAL: ' + str(e))
            execution_status = 'FAILED'
    if execution_status != 'FAILED':
        try:
            if all(index_unique_flag):
                log_list.append(str(datetime.now()) + ': JOINING THE TABLES')
                try:
                    df = tgt.set_index(tgt_k).join(src.set_index(src_k), how='left')
                except Exception as e:
                    print('Failed while JOINING THE TABLES: ' + str(e))
                    log_list.append('Failed while JOINING THE TABLES: ' + str(e))
                    execution_status = 'FAILED'
                log_list.append(str(datetime.now()) + ': FINDING THE TARGET COLUMN AND SOURCE COLUMN TO BE COMPARED')
                if execution_status != 'FAILED':
                    try:
                        # Pair every '*_tgt' column with the '*_src' column of the same base name.
                        ma_list = []
                        for i in range(len(df.columns)):
                            if df.columns[i][-3:] == 'tgt':
                                for j in range(len(df.columns)):
                                    if df.columns[j][-3:] == 'src':
                                        if str.lower(df.columns[i][:-4]) == str.lower(df.columns[j][:-4]):
                                            ma_list.append([j, i])
                        match_cols = ''
                        for i in range(len(ma_list)):
                            match_cols += str(i + 1) + ': ' + df.columns[ma_list[i][1]] + ' = ' + df.columns[ma_list[i][0]] + ' , '
                        log_list.append('Matching columns ' + match_cols)
                    except Exception as e:
                        print('Failed while FINDING THE TARGET COLUMN AND SOURCE COLUMN TO BE COMPARED: ' + str(e))
                        log_list.append('Failed while FINDING THE TARGET COLUMN AND SOURCE COLUMN TO BE COMPARED: ' + str(e))
                        execution_status = 'FAILED'
                log_list.append(str(datetime.now()) + ': COMPARISON STARTED')
                if execution_status != 'FAILED':
                    try:
                        # Compare each column pair as trimmed strings and record the
                        # row positions that do not match.
                        mis_cols = []
                        res = []
                        index = []
                        for i in range(len(ma_list)):
                            src_series = df[df.columns[ma_list[i][0]]].apply(lambda x: str(x).strip()).astype(str).fillna(str(0))
                            tgt_series = df[df.columns[ma_list[i][1]]].apply(lambda x: str(x).strip()).astype(str).fillna(str(0))
                            match_series = src_series == tgt_series
                            if all(match_series):
                                res.append(True)
                            else:
                                res.append(False)
                                mis_cols.append(df.columns[ma_list[i][0]])
                                mis_cols.append(df.columns[ma_list[i][1]])
                                for j in range(len(match_series)):
                                    if not match_series.iloc[j]:
                                        index.append(j)
                        un_df = df[mis_cols].iloc[list(set(index))]
                    except Exception as e:
                        print('Failed while COMPARING: ' + str(e))
                        log_list.append('Failed while COMPARING: ' + str(e))
                        execution_status = 'FAILED'
                log_list.append(str(datetime.now()) + ': TEST RESULT:')
                if execution_status != 'FAILED':
                    try:
                        if all(res):
                            mismatch_count = 0
                            print('COLUMN LEVEL CHECK PASSED')
                            execution_status = 'SUCCESS'
                            log_list.append('COLUMN LEVEL CHECK PASSED')
                        else:
                            log_list.append(str(len(set(index))) + ' records unmatched')
                            log_list.append('Column level check Failed')
                            mismatch_count = len(set(index))
                            execution_status = 'SUCCESS'
                    except Exception as e:
                        print('Failed while getting the TEST RESULT: ' + str(e))
                        log_list.append('Failed while getting the TEST RESULT: ' + str(e))
                        execution_status = 'FAILED'
            else:
                # Without unique keys the join would fan out, so the comparison cannot proceed.
                log_list.append('The records grouped at the level of key columns are not unique')
                execution_status = 'FAILED'
        except Exception as e:
            log_list.append('Failed while CHECKING IF THE GROUP BY MAKES THE RECORD LEVEL SAME AS ACTUAL: ' + str(e))
            execution_status = 'FAILED'
    if execution_status == 'FAILED':
        print('Check Logs for the error message')
        raise exceptions.ExecutionError
    return mismatch_count, log_list, un_df
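

# Example usage of column_level_check (an illustrative sketch only; the CSV file
# names, key column names, and the import path 'etl_testing' below are hypothetical
# and not defined by this module):
#
#     import pandas as pd
#     from etl_testing import column_level_check
#
#     src = pd.read_csv('orders_source.csv')    # hypothetical source extract
#     tgt = pd.read_csv('orders_target.csv')    # hypothetical target load
#     mismatch_count, test_log, mismatch_df = column_level_check(src, tgt, 'order_id,line_no')
#     print(mismatch_count)        # 0 when every mapped column matches
#     print('\n'.join(test_log))   # step-by-step execution log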


def sort_and_compare(source_df, target_df):
    """
    Usage: [arg1]:[Pandas DataFrame - source], [arg2]:[Pandas DataFrame - target]
    Description: Sorts and compares two datasets on their common columns.
    Returns: [Mismatch Count], [Test Log (list)], [Pandas DataFrame - mismatch (if any)]
    """
    log_list = []
    col1 = source_df.columns
    col2 = target_df.columns
    # Only the columns present in both DataFrames are compared.
    cols = list(set(col1.sort_values()).intersection(set(col2.sort_values())))
    log_list.append('Common column(s): ' + ', '.join(cols))
    source_df.sort_values(cols, axis=0, ascending=True, inplace=True)
    target_df.sort_values(cols, axis=0, ascending=True, inplace=True)
    data1 = source_df[cols].reset_index(drop=True)
    data2 = target_df[cols].reset_index(drop=True)
    result = data1 == data2
    bool_list = []
    mis_cols = []
    mis_index = []
    for i in cols:
        if all(result[i]):
            bool_list.append(True)
        else:
            bool_list.append(False)
            mis_cols.append(i)
            for j in range(len(result[i])):
                if not result[i][j]:
                    mis_index.append(j)
    un_df = pd.concat([data1.iloc[list(set(mis_index))], data2.iloc[list(set(mis_index))]], axis=1)
    mismatch_count = 0
    if all(bool_list):
        log_list.append('Records are matching')
    else:
        mismatch_count = len(set(mis_index))
        log_list.append(str(mismatch_count) + ' records unmatched')
        log_list.append('Column(s): ' + ', '.join(mis_cols))
    return mismatch_count, log_list, un_df[mis_cols]
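

# Example usage of sort_and_compare (an illustrative sketch only; the sample data
# below is hypothetical):
#
#     import pandas as pd
#
#     df_a = pd.DataFrame({'id': [1, 2, 3], 'amount': [10.0, 20.0, 30.0]})
#     df_b = pd.DataFrame({'id': [1, 2, 3], 'amount': [10.0, 25.0, 30.0]})
#     mismatch_count, test_log, mismatch_df = sort_and_compare(df_a, df_b)
#     # mismatch_count -> 1; mismatch_df holds the source and target 'amount'
#     # values for the row that differs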