# test_parquet.py
  1. from io import BytesIO
  2. from mock import patch
  3. from decimal import Decimal
  4. import pyarrow as pa
  5. import pyarrow.json as pj
  6. import pyarrow.parquet as pq
  7. import pytest
  8. import pandas as pd
  9. import tempfile
  10. from backend.ecs_tasks.delete_files.parquet_handler import (
  11. delete_matches_from_parquet_file,
  12. delete_from_table,
  13. load_parquet,
  14. )
  15. pytestmark = [pytest.mark.unit, pytest.mark.ecs_tasks]
  16. @patch("backend.ecs_tasks.delete_files.parquet_handler.load_parquet")
  17. @patch("backend.ecs_tasks.delete_files.parquet_handler.delete_from_table")
  18. def test_it_generates_new_parquet_file_without_matches(mock_delete, mock_load_parquet):
  19. # Arrange
  20. column = {
  21. "Column": "customer_id",
  22. "MatchIds": ["12345", "23456"],
  23. "Type": "Simple",
  24. }
  25. data = [{"customer_id": "12345"}, {"customer_id": "34567"}]
  26. df = pd.DataFrame(data)
  27. buf = BytesIO()
  28. df.to_parquet(buf)
  29. br = pa.BufferReader(buf.getvalue())
  30. f = pq.ParquetFile(br, memory_map=False)
  31. mock_df = pd.DataFrame([{"customer_id": "12345"}])
  32. mock_delete.return_value = [pa.Table.from_pandas(mock_df), 1]
  33. mock_load_parquet.return_value = f
  34. # Act
  35. out, stats = delete_matches_from_parquet_file("input_file.parquet", column)
  36. assert isinstance(out, pa.BufferOutputStream)
  37. assert {"ProcessedRows": 2, "DeletedRows": 1} == stats
  38. res = pa.BufferReader(out.getvalue())
  39. newf = pq.ParquetFile(res, memory_map=False)
  40. assert 1 == newf.read().num_rows
  41. @patch("backend.ecs_tasks.delete_files.parquet_handler.load_parquet")
  42. def test_it_handles_files_with_multiple_row_groups_and_pandas_indexes(
  43. mock_load_parquet,
  44. ):
  45. # Arrange
  46. data = [
  47. {"customer_id": "12345"},
  48. {"customer_id": "34567"},
  49. ]
  50. columns = [{"Column": "customer_id", "MatchIds": ["12345"], "Type": "Simple"}]
  51. df = pd.DataFrame(data, list("ab"))
  52. table = pa.Table.from_pandas(df)
  53. buf = BytesIO()
  54. # Create parquet with multiple row groups
  55. with pq.ParquetWriter(buf, table.schema) as writer:
  56. for i in range(3):
  57. writer.write_table(table)
  58. br = pa.BufferReader(buf.getvalue())
  59. f = pq.ParquetFile(br, memory_map=False)
  60. mock_load_parquet.return_value = f
  61. # Act
  62. out, stats = delete_matches_from_parquet_file("input_file.parquet", columns)
  63. # Assert
  64. assert {"ProcessedRows": 6, "DeletedRows": 3} == stats
  65. res = pa.BufferReader(out.getvalue())
  66. newf = pq.ParquetFile(res, memory_map=False)
  67. assert 3 == newf.num_row_groups
  68. assert 3 == newf.read().num_rows
  69. def test_delete_correct_rows_from_table():
  70. data = [
  71. {"customer_id": "12345"},
  72. {"customer_id": "23456"},
  73. {"customer_id": "34567"},
  74. ]
  75. columns = [
  76. {"Column": "customer_id", "MatchIds": ["12345", "23456"], "Type": "Simple"}
  77. ]
  78. df = pd.DataFrame(data)
  79. table = pa.Table.from_pandas(df)
  80. table, deleted_rows = delete_from_table(table, columns)
  81. res = table.to_pandas()
  82. assert len(res) == 1
  83. assert deleted_rows == 2
  84. assert table.to_pydict() == {"customer_id": ["34567"]}
  85. def test_delete_handles_multiple_columns_with_no_rows_left():
  86. data = [
  87. {"customer_id": "12345", "other_customer_id": "23456"},
  88. ]
  89. columns = [
  90. {"Column": "customer_id", "MatchIds": ["12345"], "Type": "Simple"},
  91. {"Column": "other_customer_id", "MatchIds": ["23456"], "Type": "Simple"},
  92. ]
  93. df = pd.DataFrame(data)
  94. table = pa.Table.from_pandas(df)
  95. table, deleted_rows = delete_from_table(table, columns)
  96. res = table.to_pandas()
  97. assert len(res) == 0
  98. assert deleted_rows == 1
  99. def test_handles_lower_cased_column_names():
  100. data = [
  101. {"userData": {"customerId": "12345"}},
  102. {"userData": {"customerId": "23456"}},
  103. {"userData": {"customerId": "34567"}},
  104. ]
  105. columns = [
  106. {
  107. "Column": "userdata.customerid",
  108. "MatchIds": ["12345", "23456"],
  109. "Type": "Simple",
  110. }
  111. ]
  112. df = pd.DataFrame(data)
  113. table = pa.Table.from_pandas(df)
  114. table, deleted_rows = delete_from_table(table, columns)
  115. res = table.to_pandas()
  116. assert len(res) == 1
  117. assert deleted_rows == 2
  118. assert table.to_pydict() == {"userData": [{"customerId": "34567"}]}
  119. def test_it_handles_data_with_pandas_indexes():
  120. data = [
  121. {"customer_id": "12345"},
  122. {"customer_id": "23456"},
  123. {"customer_id": "34567"},
  124. ]
  125. columns = [
  126. {"Column": "customer_id", "MatchIds": ["12345", "23456"], "Type": "Simple"}
  127. ]
  128. df = pd.DataFrame(data, list("abc"))
  129. table = pa.Table.from_pandas(df)
  130. table, deleted_rows = delete_from_table(table, columns)
  131. res = table.to_pandas()
  132. assert len(res) == 1
  133. assert deleted_rows == 2
  134. assert table.to_pydict() == {"customer_id": ["34567"], "__index_level_0__": ["c"]}
  135. def test_delete_correct_rows_from_parquet_table_with_complex_types():
  136. data = {
  137. "customer_id": [12345, 23456, 34567],
  138. "user_info": [
  139. {"personal_information": {"name": "matteo", "email": "12345@test.com"}},
  140. {"personal_information": {"name": "nick", "email": "23456@test.com"}},
  141. {"personal_information": {"name": "chris", "email": "34567@test.com"}},
  142. ],
  143. }
  144. columns = [
  145. {
  146. "Column": "user_info.personal_information.name",
  147. "MatchIds": ["matteo", "chris"],
  148. "Type": "Simple",
  149. }
  150. ]
  151. df = pd.DataFrame(data)
  152. table = pa.Table.from_pandas(df)
  153. table, deleted_rows = delete_from_table(table, columns)
  154. res = table.to_pandas()
  155. assert len(res) == 1
  156. assert deleted_rows == 2
  157. assert res["customer_id"].values[0] == 23456
  158. # user_info is saved preserving original schema:
  159. assert res["user_info"].values[0] == {
  160. "personal_information": {"name": "nick", "email": "23456@test.com"}
  161. }
  162. def test_delete_correct_rows_from_parquet_table_with_composite_types_tuple_col():
  163. data = {
  164. "customer_id": [12345, 23456, 34567],
  165. "first_name": ["john", "jane", "matteo"],
  166. "last_name": ["doe", "doe", "hey"],
  167. }
  168. columns = [
  169. {
  170. "Columns": ["first_name", "last_name"],
  171. "MatchIds": [["john", "doe"], ["jane", "doe"], ["matteo", "doe"]],
  172. "Type": "Composite",
  173. }
  174. ]
  175. df = pd.DataFrame(data)
  176. table = pa.Table.from_pandas(df)
  177. table, deleted_rows = delete_from_table(table, columns)
  178. res = table.to_pandas()
  179. assert len(res) == 1
  180. assert deleted_rows == 2
  181. assert res["customer_id"].values[0] == 34567
  182. def test_delete_correct_rows_from_parquet_table_with_composite_types_single_col():
  183. data = {
  184. "customer_id": [12345, 23456, 34567],
  185. "first_name": ["john", "jane", "matteo"],
  186. "last_name": ["doe", "doe", "hey"],
  187. }
  188. columns = [{"Columns": ["last_name"], "MatchIds": [["doe"]], "Type": "Composite"}]
  189. df = pd.DataFrame(data)
  190. table = pa.Table.from_pandas(df)
  191. table, deleted_rows = delete_from_table(table, columns)
  192. res = table.to_pandas()
  193. assert len(res) == 1
  194. assert deleted_rows == 2
  195. assert res["customer_id"].values[0] == 34567
  196. def test_delete_correct_rows_from_parquet_table_with_composite_types_multiple_types():
  197. data = {
  198. "age": [11, 12, 12],
  199. "customer_id": [12345, 23456, 34567],
  200. "first_name": ["john", "jane", "matteo"],
  201. "last_name": ["doe", "doe", "hey"],
  202. }
  203. columns = [
  204. {
  205. "Columns": ["age", "last_name"],
  206. "MatchIds": [[12, "doe"]],
  207. "Type": "Composite",
  208. }
  209. ]
  210. df = pd.DataFrame(data)
  211. table = pa.Table.from_pandas(df)
  212. table, deleted_rows = delete_from_table(table, columns)
  213. res = table.to_pandas()
  214. assert len(res) == 2
  215. assert deleted_rows == 1
  216. assert res["customer_id"].values[0] == 12345
  217. assert res["customer_id"].values[1] == 34567
  218. def test_delete_correct_rows_from_parquet_table_with_complex_composite_types():
  219. data = {
  220. "customer_id": [12345, 23456, 34567],
  221. "details": [
  222. {"first_name": "John", "last_name": "Doe"},
  223. {"first_name": "Jane", "last_name": "Doe"},
  224. {"first_name": "Matteo", "last_name": "Hey"},
  225. ],
  226. }
  227. columns = [
  228. {
  229. "Columns": ["details.first_name", "details.last_name"],
  230. "MatchIds": [["John", "Doe"], ["Jane", "Doe"], ["Matteo", "Doe"]],
  231. "Type": "Composite",
  232. }
  233. ]
  234. df = pd.DataFrame(data)
  235. table = pa.Table.from_pandas(df)
  236. table, deleted_rows = delete_from_table(table, columns)
  237. res = table.to_pandas()
  238. assert len(res) == 1
  239. assert deleted_rows == 2
  240. assert res["customer_id"].values[0] == 34567
  241. def test_delete_correct_rows_from_parquet_table_with_both_simple_and_composite_types():
  242. data = {
  243. "customer_id": [12345, 23456, 34567],
  244. "first_name": ["john", "jane", "matteo"],
  245. "last_name": ["doe", "doe", "hey"],
  246. }
  247. columns = [
  248. {"Column": "customer_id", "MatchIds": [12345], "Type": "Simple"},
  249. {
  250. "Columns": ["first_name", "last_name"],
  251. "MatchIds": [["jane", "doe"]],
  252. "Type": "Composite",
  253. },
  254. ]
  255. df = pd.DataFrame(data)
  256. table = pa.Table.from_pandas(df)
  257. table, deleted_rows = delete_from_table(table, columns)
  258. res = table.to_pandas()
  259. assert len(res) == 1
  260. assert deleted_rows == 2
  261. assert res["customer_id"].values[0] == 34567
  262. def test_it_loads_parquet_files():
  263. data = [{"customer_id": "12345"}, {"customer_id": "23456"}]
  264. df = pd.DataFrame(data)
  265. buf = BytesIO()
  266. df.to_parquet(buf, compression="snappy")
  267. resp = load_parquet(
  268. pa.BufferReader(buf.getvalue())
  269. ) # BufferReader inherits from NativeFile
  270. assert 2 == resp.read().num_rows
  271. def test_delete_correct_rows_from_parquet_table_with_decimal_types():
  272. data = {
  273. "customer_id_decimal": [
  274. Decimal("123.450"),
  275. Decimal("234.560"),
  276. Decimal("345.670"),
  277. ]
  278. }
  279. columns = [
  280. {
  281. "Column": "customer_id_decimal",
  282. "MatchIds": ["123.450", "234.560"],
  283. "Type": "Simple",
  284. },
  285. ]
  286. df = pd.DataFrame(data)
  287. table = pa.Table.from_pandas(df)
  288. table, deleted_rows = delete_from_table(table, columns)
  289. res = table.to_pandas()
  290. assert len(res) == 1
  291. assert deleted_rows == 2
  292. assert res["customer_id_decimal"].values[0] == Decimal("345.670")
  293. def test_delete_correct_rows_from_parquet_table_with_decimal_complex_types():
  294. data = {
  295. "customer_id": [12345, 23456, 34567],
  296. "user_info": [
  297. {"personal_information": {"name": "matteo", "decimal": Decimal("12.34")}},
  298. {"personal_information": {"name": "nick", "decimal": Decimal("23.45")}},
  299. {"personal_information": {"name": "chris", "decimal": Decimal("34.56")}},
  300. ],
  301. }
  302. columns = [
  303. {
  304. "Column": "user_info.personal_information.decimal",
  305. "MatchIds": ["12.34", "34.56"],
  306. "Type": "Simple",
  307. }
  308. ]
  309. df = pd.DataFrame(data)
  310. table = pa.Table.from_pandas(df)
  311. table, deleted_rows = delete_from_table(table, columns)
  312. res = table.to_pandas()
  313. assert len(res) == 1
  314. assert deleted_rows == 2
  315. assert res["customer_id"].values[0] == 23456
  316. # user_info is saved preserving original schema:
  317. assert res["user_info"].values[0] == {
  318. "personal_information": {"name": "nick", "decimal": Decimal("23.45")}
  319. }
  320. def test_delete_correct_rows_from_parquet_table_with_decimal_complex_composite_types():
  321. data = {
  322. "customer_id": [12345, 23456, 34567],
  323. "user_info": [
  324. {"personal_information": {"name": "matteo", "decimal": Decimal("12.34")}},
  325. {"personal_information": {"name": "nick", "decimal": Decimal("23.45")}},
  326. {"personal_information": {"name": "chris", "decimal": Decimal("34.56")}},
  327. ],
  328. }
  329. columns = [
  330. {
  331. "Columns": [
  332. "user_info.personal_information.name",
  333. "user_info.personal_information.decimal",
  334. ],
  335. "MatchIds": [["matteo", "12.34"], ["chris", "34.56"], ["nick", "11.22"]],
  336. "Type": "Composite",
  337. }
  338. ]
  339. df = pd.DataFrame(data)
  340. table = pa.Table.from_pandas(df)
  341. table, deleted_rows = delete_from_table(table, columns)
  342. res = table.to_pandas()
  343. assert len(res) == 1
  344. assert deleted_rows == 2
  345. assert res["customer_id"].values[0] == 23456
  346. # user_info is saved preserving original schema:
  347. assert res["user_info"].values[0] == {
  348. "personal_information": {"name": "nick", "decimal": Decimal("23.45")}
  349. }
  350. def test_it_throws_for_invalid_schema_column_not_found():
  351. with pytest.raises(ValueError) as e:
  352. data = {"customer_id": [12345, 23456, 34567]}
  353. columns = [
  354. {
  355. "Column": "user_info.personal_information.name",
  356. "MatchIds": ["matteo"],
  357. "Type": "Simple",
  358. }
  359. ]
  360. df = pd.DataFrame(data)
  361. table = pa.Table.from_pandas(df)
  362. table, deleted_rows = delete_from_table(table, columns)
  363. assert e.value.args[0] == "Column user_info.personal_information.name not found."