# test_json.py
  1. from mock import patch
  2. import gzip
  3. import pyarrow as pa
  4. import pyarrow.parquet as pq
  5. import pytest
  6. import pandas as pd
  7. import tempfile
  8. from backend.ecs_tasks.delete_files.json_handler import delete_matches_from_json_file
  9. pytestmark = [pytest.mark.unit, pytest.mark.ecs_tasks]
  10. def test_it_generates_new_json_file_without_matches():
  11. # Arrange
  12. to_delete = [{"Column": "customer_id", "MatchIds": ["23456"], "Type": "Simple"}]
  13. data = (
  14. '{"customer_id": "12345", "x": 1.2, "d":"2001-01-01"}\n'
  15. '{"customer_id": "23456", "x": 2.3, "d":"2001-01-03"}\n'
  16. '{"customer_id": "34567", "x": 3.4, "d":"2001-01-05"}\n'
  17. )
  18. out_stream = to_json_file(data)
  19. # Act
  20. out, stats = delete_matches_from_json_file(out_stream, to_delete)
  21. assert isinstance(out, pa.BufferOutputStream)
  22. assert {"ProcessedRows": 3, "DeletedRows": 1} == stats
  23. assert to_json_string(out) == (
  24. '{"customer_id": "12345", "x": 1.2, "d":"2001-01-01"}\n'
  25. '{"customer_id": "34567", "x": 3.4, "d":"2001-01-05"}\n'
  26. )
  27. def test_it_handles_json_with_gzip_compression():
  28. # Arrange
  29. to_delete = [{"Column": "customer_id", "MatchIds": ["23456"], "Type": "Simple"}]
  30. data = (
  31. '{"customer_id": "12345", "x": 7, "d":"2001-01-01"}\n'
  32. '{"customer_id": "23456", "x": 8, "d":"2001-01-03"}\n'
  33. '{"customer_id": "34567", "x": 9, "d":"2001-01-05"}\n'
  34. )
  35. out_stream = to_compressed_json_file(data)
  36. # Act
  37. out, stats = delete_matches_from_json_file(out_stream, to_delete, True)
  38. assert isinstance(out, pa.BufferOutputStream)
  39. assert {"ProcessedRows": 3, "DeletedRows": 1} == stats
  40. assert to_decompressed_json_string(out) == (
  41. '{"customer_id": "12345", "x": 7, "d":"2001-01-01"}\n'
  42. '{"customer_id": "34567", "x": 9, "d":"2001-01-05"}\n'
  43. )
  44. def test_delete_correct_rows_when_missing_newline_at_the_end():
  45. # Arrange
  46. to_delete = [{"Column": "customer_id", "MatchIds": ["23456"], "Type": "Simple"}]
  47. data = (
  48. '{"customer_id": "12345", "x": 1.2, "d":"2001-01-01"}\n'
  49. '{"customer_id": "23456", "x": 2.3, "d":"2001-01-03"}\n'
  50. '{"customer_id": "34567", "x": 3.4, "d":"2001-01-05"}'
  51. )
  52. out_stream = to_json_file(data)
  53. # Act
  54. out, stats = delete_matches_from_json_file(out_stream, to_delete, [])
  55. assert isinstance(out, pa.BufferOutputStream)
  56. assert {"ProcessedRows": 3, "DeletedRows": 1} == stats
  57. assert to_json_string(out) == (
  58. '{"customer_id": "12345", "x": 1.2, "d":"2001-01-01"}\n'
  59. '{"customer_id": "34567", "x": 3.4, "d":"2001-01-05"}\n'
  60. )
  61. def test_delete_correct_rows_containing_newlines_as_content():
  62. # UNICODE_NEWLINE_SEP = '\u2028'
  63. # Arrange
  64. to_delete = [{"Column": "customer_id", "MatchIds": ["12345"], "Type": "Simple"}]
  65. data = (
  66. '{"customer_id": "12345", "d": "foo"}\n'
  67. '{"customer_id": "23456", "d": "foo\u2028\\nbar"}\n'
  68. '{"customer_id": "34567", "d": "bar"}\n'
  69. )
  70. out_stream = to_json_file(data)
  71. # Act
  72. out, stats = delete_matches_from_json_file(out_stream, to_delete, [])
  73. assert isinstance(out, pa.BufferOutputStream)
  74. assert {"ProcessedRows": 3, "DeletedRows": 1} == stats
  75. assert to_json_string(out) == (
  76. '{"customer_id": "23456", "d": "foo\u2028\\nbar"}\n'
  77. '{"customer_id": "34567", "d": "bar"}\n'
  78. )
  79. def test_delete_correct_rows_from_json_file_with_complex_types():
  80. # Arrange
  81. to_delete = [{"Column": "user.id", "MatchIds": ["23456"], "Type": "Simple"}]
  82. data = (
  83. '{"user": {"id": "12345", "name": "John"}, "d":["2001-01-01"]}\n'
  84. '{"user": {"id": "23456", "name": "Jane"}, "d":[]}\n'
  85. '{"user": {"id": "34567", "name": "Mary"}, "d":["2001-01-08"]}\n'
  86. )
  87. out_stream = to_json_file(data)
  88. # Act
  89. out, stats = delete_matches_from_json_file(out_stream, to_delete, [])
  90. assert isinstance(out, pa.BufferOutputStream)
  91. assert {"ProcessedRows": 3, "DeletedRows": 1} == stats
  92. assert to_json_string(out) == (
  93. '{"user": {"id": "12345", "name": "John"}, "d":["2001-01-01"]}\n'
  94. '{"user": {"id": "34567", "name": "Mary"}, "d":["2001-01-08"]}\n'
  95. )
  96. def test_delete_correct_rows_from_json_file_with_composite_types_tuple_col():
  97. # Arrange
  98. to_delete = [
  99. {
  100. "Columns": ["first_name", "last_name"],
  101. "MatchIds": [["John", "Doe"], ["Jane", "Doe"], ["Mary", "Doe"]],
  102. "Type": "Composite",
  103. }
  104. ]
  105. data = (
  106. '{"customer_id": 12345, "first_name": "John", "last_name": "Doe"}\n'
  107. '{"customer_id": 23456, "first_name": "Jane", "last_name": "Doe"}\n'
  108. '{"customer_id": 34567, "first_name": "Mary", "last_name": "Hey"}\n'
  109. )
  110. out_stream = to_json_file(data)
  111. # Act
  112. out, stats = delete_matches_from_json_file(out_stream, to_delete)
  113. assert isinstance(out, pa.BufferOutputStream)
  114. assert {"ProcessedRows": 3, "DeletedRows": 2} == stats
  115. assert to_json_string(out) == (
  116. '{"customer_id": 34567, "first_name": "Mary", "last_name": "Hey"}\n'
  117. )
  118. def test_delete_correct_rows_from_json_file_with_composite_types_single_col():
  119. # Arrange
  120. to_delete = [
  121. {
  122. "Columns": ["last_name"],
  123. "MatchIds": [["Doe"]],
  124. "Type": "Composite",
  125. }
  126. ]
  127. data = (
  128. '{"customer_id": 12345, "first_name": "John", "last_name": "Doe"}\n'
  129. '{"customer_id": 23456, "first_name": "Jane", "last_name": "Doe"}\n'
  130. '{"customer_id": 34567, "first_name": "Mary", "last_name": "Hey"}\n'
  131. )
  132. out_stream = to_json_file(data)
  133. # Act
  134. out, stats = delete_matches_from_json_file(out_stream, to_delete)
  135. assert isinstance(out, pa.BufferOutputStream)
  136. assert {"ProcessedRows": 3, "DeletedRows": 2} == stats
  137. assert to_json_string(out) == (
  138. '{"customer_id": 34567, "first_name": "Mary", "last_name": "Hey"}\n'
  139. )
  140. def test_delete_correct_rows_from_json_file_with_composite_types_with_nullable_or_undefined_identifiers():
  141. # Arrange
  142. to_delete = [
  143. {
  144. "Columns": ["user.name", "parents.mother"],
  145. "MatchIds": [["John", "23456"]],
  146. "Type": "Composite",
  147. }
  148. ]
  149. data = (
  150. '{"user": {"id": "12345", "name": "John"}, "parents": {"mother": "23456"}}\n'
  151. '{"user": {"id": "23456", "name": "John"}, "parents": {"mother": null}}\n'
  152. '{"user": {"id": "34567", "name": "John"}}\n'
  153. '{"user": {"id": "45678", "name": "John"}, "parents": {}}\n'
  154. '{"user": {"id": "45678", "name": "John"}, "parents": null}\n'
  155. )
  156. out_stream = to_json_file(data)
  157. # Act
  158. out, stats = delete_matches_from_json_file(out_stream, to_delete)
  159. assert isinstance(out, pa.BufferOutputStream)
  160. assert {"ProcessedRows": 5, "DeletedRows": 1} == stats
  161. assert to_json_string(out) == (
  162. '{"user": {"id": "23456", "name": "John"}, "parents": {"mother": null}}\n'
  163. '{"user": {"id": "34567", "name": "John"}}\n'
  164. '{"user": {"id": "45678", "name": "John"}, "parents": {}}\n'
  165. '{"user": {"id": "45678", "name": "John"}, "parents": null}\n'
  166. )
  167. def test_delete_correct_rows_from_json_file_with_composite_types_multiple_types():
  168. # Arrange
  169. to_delete = [
  170. {
  171. "Columns": ["age", "last_name"],
  172. "MatchIds": [[12, "Doe"]],
  173. "Type": "Composite",
  174. }
  175. ]
  176. data = (
  177. '{"customer_id": 12345, "first_name": "John", "last_name": "Doe", "age": 11}\n'
  178. '{"customer_id": 23456, "first_name": "Jane", "last_name": "Doe", "age": 12}\n'
  179. '{"customer_id": 34567, "first_name": "Mary", "last_name": "Hey", "age": 12}\n'
  180. )
  181. out_stream = to_json_file(data)
  182. # Act
  183. out, stats = delete_matches_from_json_file(out_stream, to_delete)
  184. assert isinstance(out, pa.BufferOutputStream)
  185. assert {"ProcessedRows": 3, "DeletedRows": 1} == stats
  186. assert to_json_string(out) == (
  187. '{"customer_id": 12345, "first_name": "John", "last_name": "Doe", "age": 11}\n'
  188. '{"customer_id": 34567, "first_name": "Mary", "last_name": "Hey", "age": 12}\n'
  189. )
  190. def test_delete_correct_rows_from_json_file_with_both_simple_and_composite_types():
  191. # Arrange
  192. to_delete = [
  193. {"Column": "customer_id", "MatchIds": [12345], "Type": "Simple"},
  194. {
  195. "Columns": ["first_name", "last_name"],
  196. "MatchIds": [["Jane", "Doe"]],
  197. "Type": "Composite",
  198. },
  199. ]
  200. data = (
  201. '{"customer_id": 12345, "first_name": "John", "last_name": "Doe"}\n'
  202. '{"customer_id": 23456, "first_name": "Jane", "last_name": "Doe"}\n'
  203. '{"customer_id": 34567, "first_name": "Mary", "last_name": "Hey"}\n'
  204. )
  205. out_stream = to_json_file(data)
  206. # Act
  207. out, stats = delete_matches_from_json_file(out_stream, to_delete)
  208. assert isinstance(out, pa.BufferOutputStream)
  209. assert {"ProcessedRows": 3, "DeletedRows": 2} == stats
  210. assert to_json_string(out) == (
  211. '{"customer_id": 34567, "first_name": "Mary", "last_name": "Hey"}\n'
  212. )
  213. def test_delete_correct_rows_from_json_file_with_nullable_or_undefined_identifiers():
  214. # Arrange
  215. to_delete = [{"Column": "parents.mother", "MatchIds": ["23456"], "Type": "Simple"}]
  216. data = (
  217. '{"user": {"id": "12345", "name": "John"}, "parents": {"mother": "23456"}}\n'
  218. '{"user": {"id": "23456", "name": "Jane"}, "parents": {"mother": null}}\n'
  219. '{"user": {"id": "34567", "name": "Mary"}}\n'
  220. '{"user": {"id": "45678", "name": "Mike"}, "parents": {}}\n'
  221. '{"user": {"id": "45678", "name": "Anna"}, "parents": null}\n'
  222. )
  223. out_stream = to_json_file(data)
  224. # Act
  225. out, stats = delete_matches_from_json_file(out_stream, to_delete)
  226. assert isinstance(out, pa.BufferOutputStream)
  227. assert {"ProcessedRows": 5, "DeletedRows": 1} == stats
  228. assert to_json_string(out) == (
  229. '{"user": {"id": "23456", "name": "Jane"}, "parents": {"mother": null}}\n'
  230. '{"user": {"id": "34567", "name": "Mary"}}\n'
  231. '{"user": {"id": "45678", "name": "Mike"}, "parents": {}}\n'
  232. '{"user": {"id": "45678", "name": "Anna"}, "parents": null}\n'
  233. )
  234. def test_delete_correct_rows_from_json_file_with_lower_cased_column_id():
  235. # Arrange
  236. to_delete = [{"Column": "userid", "MatchIds": ["23456"], "Type": "Simple"}]
  237. data = (
  238. '{"userId": "12345", "fullName": "JohnDoe"}\n'
  239. '{"userId": "23456", "fullName": "JaneDoe"}\n'
  240. '{"userId": "34567", "fullName": "MaryMary"}\n'
  241. )
  242. out_stream = to_json_file(data)
  243. # Act
  244. out, stats = delete_matches_from_json_file(out_stream, to_delete)
  245. assert isinstance(out, pa.BufferOutputStream)
  246. assert {"ProcessedRows": 3, "DeletedRows": 1} == stats
  247. assert to_json_string(out) == (
  248. '{"userId": "12345", "fullName": "JohnDoe"}\n'
  249. '{"userId": "34567", "fullName": "MaryMary"}\n'
  250. )
  251. def test_delete_correct_rows_from_json_file_with_multiple_identifiers():
  252. # Arrange
  253. to_delete = [
  254. {"Column": "user.id", "MatchIds": ["23456"], "Type": "Simple"},
  255. {"Column": "mother", "MatchIds": ["23456"], "Type": "Simple"},
  256. ]
  257. data = (
  258. '{"user": {"id": "12345", "name": "John"}, "mother": "23456"}\n'
  259. '{"user": {"id": "23456", "name": "Jane"}, "mother": null}\n'
  260. '{"user": {"id": "34567", "name": "Mary"}}\n'
  261. )
  262. out_stream = to_json_file(data)
  263. # Act
  264. out, stats = delete_matches_from_json_file(out_stream, to_delete)
  265. assert isinstance(out, pa.BufferOutputStream)
  266. assert {"ProcessedRows": 3, "DeletedRows": 2} == stats
  267. assert to_json_string(out) == '{"user": {"id": "34567", "name": "Mary"}}\n'
  268. def test_it_throws_meaningful_error_for_serialization_issues():
  269. # Arrange
  270. to_delete = [{"Column": "customer_id", "MatchIds": ["23456"], "Type": "Simple"}]
  271. data = (
  272. '{"customer_id": "12345", "x": 1.2, "d":"2001-01-01"}\n'
  273. '{"customer_id": "23456", "x": 2.3, "d":"invalid\n'
  274. '{"customer_id": "34567", "x": 3.4, "d":"2001-01-05"}\n'
  275. )
  276. out_stream = to_json_file(data)
  277. # Act
  278. with pytest.raises(ValueError) as e:
  279. out, stats = delete_matches_from_json_file(out_stream, to_delete)
  280. assert e.value.args[0] == (
  281. "Serialization error when parsing JSON lines: "
  282. "Unterminated string starting at: line 2 column 40 (char 39)"
  283. )
  284. def to_json_file(data, compressed=False):
  285. mode = "wb" if compressed else "w+t"
  286. tmp = tempfile.NamedTemporaryFile(mode=mode)
  287. tmp.write(data)
  288. tmp.flush()
  289. return open(tmp.name, "rb")
  290. def to_compressed_json_file(data):
  291. return to_json_file(gzip.compress(bytes(data, "utf-8")), True)
  292. def to_json_string(buf, compressed=False):
  293. tmp = tempfile.NamedTemporaryFile(mode="wb")
  294. tmp.write(buf.getvalue())
  295. tmp.flush()
  296. mode = "rb" if compressed else "r"
  297. result = open(tmp.name, mode)
  298. return result.read()
  299. def to_decompressed_json_string(buf):
  300. return gzip.decompress(to_json_string(buf, True)).decode("utf-8")