conftest.py 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743
  1. import datetime
  2. import json
  3. import logging
  4. import tempfile
  5. import time
  6. from copy import deepcopy
  7. from os import getenv
  8. from pathlib import Path
  9. from types import SimpleNamespace
  10. from urllib.parse import urljoin
  11. from uuid import uuid4
  12. import boto3
  13. import pytest
  14. from boto3.dynamodb.conditions import Key
  15. from botocore.exceptions import ClientError
  16. from botocore.waiter import WaiterModel, create_waiter_with_client
  17. from botocore.auth import SigV4Auth
  18. from botocore.awsrequest import AWSRequest
  19. from requests import Session
  20. from . import empty_table
  21. from backend.ecs_tasks.delete_files.cse import encrypt
  22. logger = logging.getLogger()
  23. #########
  24. # HOOKS #
  25. #########
  26. def pytest_configure(config):
  27. """
  28. Initial test env setup
  29. """
  30. pass
  31. def pytest_unconfigure(config):
  32. """
  33. Teardown actions
  34. """
  35. pass
  36. ############
  37. # FIXTURES #
  38. ############
  39. @pytest.fixture(scope="session")
  40. def stack():
  41. cloudformation = boto3.resource("cloudformation")
  42. stack = cloudformation.Stack(getenv("StackName", "S3F2"))
  43. return {o["OutputKey"]: o["OutputValue"] for o in stack.outputs}
  44. @pytest.fixture
  45. def config(stack):
  46. ssm = boto3.client("ssm")
  47. return json.loads(
  48. ssm.get_parameter(Name=stack["ConfigParameter"], WithDecryption=True)[
  49. "Parameter"
  50. ]["Value"]
  51. )
  52. @pytest.fixture
  53. def config_mutator(config, ssm_client, stack):
  54. ssm = boto3.client("ssm")
  55. def mutator(**kwargs):
  56. tmp = {**config, **kwargs}
  57. ssm.put_parameter(
  58. Name=stack["ConfigParameter"],
  59. Value=json.dumps(tmp),
  60. Type="String",
  61. Overwrite=True,
  62. )
  63. yield mutator
  64. ssm.put_parameter(
  65. Name=stack["ConfigParameter"],
  66. Value=json.dumps(config),
  67. Type="String",
  68. Overwrite=True,
  69. )
  70. @pytest.fixture(scope="session")
  71. def ddb_resource():
  72. return boto3.resource("dynamodb")
  73. @pytest.fixture(scope="session")
  74. def ddb_client():
  75. return boto3.client("dynamodb")
  76. @pytest.fixture(scope="session")
  77. def s3_resource():
  78. return boto3.resource("s3")
  79. @pytest.fixture(scope="session")
  80. def arn_partition():
  81. return boto3.session.Session().get_partition_for_region(
  82. getenv("AWS_DEFAULT_REGION", "eu-west-1")
  83. )
  84. @pytest.fixture(scope="session")
  85. def sf_client():
  86. return boto3.client("stepfunctions")
  87. @pytest.fixture(scope="session")
  88. def glue_client():
  89. return boto3.client("glue")
  90. @pytest.fixture(scope="session")
  91. def kms_client():
  92. return boto3.client("kms")
  93. @pytest.fixture(scope="session")
  94. def ssm_client():
  95. return boto3.client("ssm")
  96. @pytest.fixture(scope="session")
  97. def iam_client():
  98. return boto3.client("iam")
  99. @pytest.fixture(scope="session")
  100. def iam_arn():
  101. return boto3.client("sts").get_caller_identity()["Arn"]
  102. @pytest.fixture(scope="session")
  103. def glue_columns():
  104. return [
  105. {"Name": "customer_id", "Type": "string"},
  106. {"Name": "customerId", "Type": "int"},
  107. {"Name": "customer_id_decimal", "Type": "decimal(6,3)"},
  108. {
  109. "Name": "user_info",
  110. "Type": "struct<personal_information:struct<email:string,first_name:string,last_name:string>>",
  111. },
  112. {"Name": "days_off", "Type": "array<string>"},
  113. ]
  114. @pytest.fixture(scope="module")
  115. def cognito_token(stack):
  116. # Generate User in Cognito
  117. user_pool_id = stack["CognitoUserPoolId"]
  118. client_id = stack["CognitoUserPoolClientId"]
  119. username = "aws-uk-sa-builders@amazon.com"
  120. pwd = "!Acceptance1Tests2password!"
  121. auth_data = {"USERNAME": username, "PASSWORD": pwd}
  122. provider_client = boto3.client("cognito-idp")
  123. # Create the User
  124. provider_client.admin_create_user(
  125. UserPoolId=user_pool_id,
  126. Username=username,
  127. TemporaryPassword=pwd,
  128. MessageAction="SUPPRESS",
  129. )
  130. provider_client.admin_set_user_password(
  131. UserPoolId=user_pool_id, Username=username, Password=pwd, Permanent=True
  132. )
  133. # Allow admin login
  134. provider_client.update_user_pool_client(
  135. UserPoolId=user_pool_id,
  136. ClientId=client_id,
  137. ExplicitAuthFlows=[
  138. "ADMIN_NO_SRP_AUTH",
  139. ],
  140. )
  141. # Get JWT token for the dummy user
  142. resp = provider_client.admin_initiate_auth(
  143. UserPoolId=user_pool_id,
  144. AuthFlow="ADMIN_NO_SRP_AUTH",
  145. AuthParameters=auth_data,
  146. ClientId=client_id,
  147. )
  148. yield resp["AuthenticationResult"]["IdToken"]
  149. provider_client.admin_delete_user(UserPoolId=user_pool_id, Username=username)
  150. @pytest.fixture(scope="module")
  151. def api_client_cognito(cognito_token, stack):
  152. class ApiGwSession(Session):
  153. def __init__(self, base_url=None, default_headers=None):
  154. if default_headers is None:
  155. default_headers = {}
  156. self.base_url = base_url
  157. self.default_headers = default_headers
  158. super(ApiGwSession, self).__init__()
  159. def request(
  160. self, method, url, data=None, params=None, headers=None, *args, **kwargs
  161. ):
  162. url = urljoin("{}/v1/".format(self.base_url), url)
  163. merged_headers = deepcopy(self.default_headers)
  164. if isinstance(headers, dict):
  165. merged_headers.update(headers)
  166. return super(ApiGwSession, self).request(
  167. method, url, data, params, headers=merged_headers, *args, **kwargs
  168. )
  169. hds = {"Content-Type": "application/json"}
  170. if cognito_token:
  171. hds.update({"Authorization": "Bearer {}".format(cognito_token)})
  172. return ApiGwSession(stack["ApiUrl"], hds)
  173. @pytest.fixture(scope="module")
  174. def api_client_iam(stack):
  175. class ApiGwSession(Session):
  176. def __init__(self, base_url=None, default_headers=None):
  177. if default_headers is None:
  178. default_headers = {}
  179. self.base_url = base_url
  180. self.default_headers = default_headers
  181. super(ApiGwSession, self).__init__()
  182. def request(
  183. self, method, url, data=None, params=None, headers=None, *args, **kwargs
  184. ):
  185. url = urljoin("{}/v1/".format(self.base_url), url)
  186. merged_headers = deepcopy(self.default_headers)
  187. if isinstance(headers, dict):
  188. merged_headers.update(headers)
  189. if headers is None or "Authorization" not in headers.keys():
  190. creds = boto3.Session().get_credentials().get_frozen_credentials()
  191. if "json" in kwargs:
  192. request = AWSRequest(
  193. method=method.upper(), url=url, data=json.dumps(kwargs["json"])
  194. )
  195. else:
  196. request = AWSRequest(method=method.upper(), url=url)
  197. SigV4Auth(
  198. creds, "execute-api", getenv("AWS_DEFAULT_REGION", "eu-west-1")
  199. ).add_auth(request)
  200. merged_headers.update(dict(request.headers))
  201. return super(ApiGwSession, self).request(
  202. method, url, data, params, headers=merged_headers, *args, **kwargs
  203. )
  204. hds = {"Content-Type": "application/json"}
  205. return ApiGwSession(stack["ApiUrl"], hds)
  206. @pytest.fixture(scope="module")
  207. def queue_base_endpoint():
  208. return "queue"
  209. @pytest.fixture(scope="module")
  210. def settings_base_endpoint():
  211. return "settings"
  212. @pytest.fixture(scope="module")
  213. def queue_table(ddb_resource, stack):
  214. return ddb_resource.Table(stack["DeletionQueueTable"])
  215. @pytest.fixture
  216. def del_queue_factory(queue_table):
  217. def factory(
  218. match_id="testId",
  219. deletion_queue_item_id="id123",
  220. created_at=round(datetime.datetime.now(datetime.timezone.utc).timestamp()),
  221. data_mappers=[],
  222. matchid_type="Simple",
  223. ):
  224. item = {
  225. "DeletionQueueItemId": deletion_queue_item_id,
  226. "MatchId": match_id,
  227. "CreatedAt": created_at,
  228. "DataMappers": data_mappers,
  229. }
  230. if matchid_type:
  231. item["Type"] = matchid_type
  232. queue_table.put_item(Item=item)
  233. return item
  234. yield factory
  235. empty_table(queue_table, "DeletionQueueItemId")
  236. @pytest.fixture(scope="module")
  237. def data_mapper_base_endpoint():
  238. return "data_mappers"
  239. @pytest.fixture(scope="module")
  240. def data_mapper_table(ddb_resource, stack):
  241. return ddb_resource.Table(stack["DataMapperTable"])
  242. @pytest.fixture(scope="function")
  243. def empty_data_mappers(data_mapper_table):
  244. empty_table(data_mapper_table, "DataMapperId")
  245. yield
  246. empty_table(data_mapper_table, "DataMapperId")
  247. @pytest.fixture
  248. def glue_table_factory(dummy_lake, glue_client, glue_columns):
  249. items = []
  250. bucket_name = dummy_lake["bucket_name"]
  251. def factory(
  252. columns=glue_columns,
  253. fmt="parquet",
  254. database="acceptancetests",
  255. table="acceptancetests",
  256. prefix="prefix",
  257. partition_keys=[],
  258. partitions=[],
  259. partition_key_types="string",
  260. encrypted=False,
  261. ):
  262. glue_client.create_database(DatabaseInput={"Name": database})
  263. input_format = (
  264. "org.apache.hadoop.mapred.TextInputFormat"
  265. if fmt == "json"
  266. else "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"
  267. )
  268. output_format = (
  269. "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat"
  270. if fmt == "json"
  271. else "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"
  272. )
  273. ser_library = (
  274. "org.openx.data.jsonserde.JsonSerDe"
  275. if fmt == "json"
  276. else "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"
  277. )
  278. glue_client.create_table(
  279. DatabaseName=database,
  280. TableInput={
  281. "Name": table,
  282. "StorageDescriptor": {
  283. "Columns": columns,
  284. "Location": "s3://{bucket}/{prefix}/".format(
  285. bucket=bucket_name, prefix=prefix
  286. ),
  287. "InputFormat": input_format,
  288. "OutputFormat": output_format,
  289. "Compressed": False,
  290. "SerdeInfo": {
  291. "SerializationLibrary": ser_library,
  292. "Parameters": {"serialization.format": "1"},
  293. },
  294. "StoredAsSubDirectories": False,
  295. },
  296. "PartitionKeys": [
  297. {"Name": pk, "Type": partition_key_types} for pk in partition_keys
  298. ],
  299. "Parameters": {
  300. "EXTERNAL": "TRUE",
  301. "has_encrypted_data": str(encrypted).lower(),
  302. },
  303. },
  304. )
  305. for p in partitions:
  306. glue_client.create_partition(
  307. DatabaseName=database,
  308. TableName=table,
  309. PartitionInput={
  310. "Values": p,
  311. "StorageDescriptor": {
  312. "Columns": columns,
  313. "Location": "s3://{bucket}/{prefix}/{parts}/".format(
  314. bucket=dummy_lake["bucket_name"],
  315. prefix=prefix,
  316. parts="/".join(p),
  317. ),
  318. "InputFormat": input_format,
  319. "OutputFormat": output_format,
  320. "SerdeInfo": {
  321. "SerializationLibrary": ser_library,
  322. "Parameters": {"serialization.format": "1"},
  323. },
  324. },
  325. },
  326. )
  327. item = {"Database": database, "Table": table}
  328. items.append(item)
  329. return item
  330. yield factory
  331. for i in items:
  332. db_name = i["Database"]
  333. table_name = i["Table"]
  334. glue_client.delete_table(DatabaseName=db_name, Name=table_name)
  335. glue_client.delete_database(Name=db_name)
  336. @pytest.fixture
  337. def glue_data_mapper_factory(
  338. glue_client, data_mapper_table, glue_table_factory, glue_columns
  339. ):
  340. """
  341. Factory for registering a data mapper in DDB and createing a corresponding glue table
  342. """
  343. items = []
  344. def factory(
  345. data_mapper_id="test",
  346. columns=glue_columns,
  347. fmt="parquet",
  348. database="acceptancetests",
  349. table="acceptancetests",
  350. partition_keys=[],
  351. partitions=[],
  352. role_arn=None,
  353. delete_old_versions=False,
  354. ignore_object_not_found_exceptions=False,
  355. column_identifiers=["customer_id"],
  356. partition_key_types="string",
  357. encrypted=False,
  358. ):
  359. item = {
  360. "DataMapperId": data_mapper_id,
  361. "Columns": column_identifiers,
  362. "QueryExecutor": "athena",
  363. "QueryExecutorParameters": {
  364. "DataCatalogProvider": "glue",
  365. "Database": database,
  366. "Table": table,
  367. },
  368. "Format": fmt,
  369. "DeleteOldVersions": delete_old_versions,
  370. "IgnoreObjectNotFoundExceptions": ignore_object_not_found_exceptions,
  371. }
  372. if role_arn:
  373. item["RoleArn"] = role_arn
  374. data_mapper_table.put_item(Item=item)
  375. glue_table_factory(
  376. prefix=data_mapper_id,
  377. columns=columns,
  378. fmt=fmt,
  379. database=database,
  380. table=table,
  381. partition_keys=partition_keys,
  382. partitions=partitions,
  383. partition_key_types=partition_key_types,
  384. encrypted=encrypted,
  385. )
  386. items.append(item)
  387. return item
  388. yield factory
  389. empty_table(data_mapper_table, "DataMapperId")
  390. @pytest.fixture(scope="module")
  391. def jobs_endpoint():
  392. return "jobs"
  393. @pytest.fixture(scope="module")
  394. def job_table(ddb_resource, stack):
  395. return ddb_resource.Table(stack["JobTable"])
  396. @pytest.fixture(scope="module")
  397. def empty_jobs(job_table):
  398. empty_table(job_table, "Id", "Sk")
  399. yield
  400. empty_table(job_table, "Id", "Sk")
  401. @pytest.fixture
  402. def job_factory(job_table, sf_client, stack):
  403. items = []
  404. def factory(
  405. job_id=str(uuid4()),
  406. status="QUEUED",
  407. gsib="0",
  408. created_at=round(datetime.datetime.now().timestamp()),
  409. del_queue_items=[],
  410. **kwargs
  411. ):
  412. item = {
  413. "Id": job_id,
  414. "Sk": job_id,
  415. "Type": "Job",
  416. "JobStatus": status,
  417. "CreatedAt": created_at,
  418. "GSIBucket": gsib,
  419. "AthenaConcurrencyLimit": 15,
  420. "AthenaQueryMaxRetries": 0,
  421. "DeletionTasksMaxNumber": 1,
  422. "QueryExecutionWaitSeconds": 1,
  423. "QueryQueueWaitSeconds": 1,
  424. "ForgetQueueWaitSeconds": 5,
  425. **kwargs,
  426. }
  427. job_table.put_item(Item=item)
  428. items.append(
  429. "{}:{}".format(
  430. stack["StateMachineArn"].replace("stateMachine", "execution"), job_id
  431. )
  432. )
  433. return item
  434. yield factory
  435. empty_table(job_table, "Id", "Sk")
  436. for arn in items:
  437. try:
  438. sf_client.stop_execution(executionArn=arn)
  439. except Exception as e:
  440. logger.warning("Unable to stop execution: {}".format(str(e)))
  441. def get_waiter_model(config_file):
  442. waiter_dir = Path(__file__).parent.parent.joinpath("waiters")
  443. with open(waiter_dir.joinpath(config_file)) as f:
  444. config = json.load(f)
  445. return WaiterModel(config)
  446. @pytest.fixture(scope="session")
  447. def execution_waiter(sf_client):
  448. waiter_model = get_waiter_model("stepfunctions.json")
  449. return create_waiter_with_client("ExecutionComplete", waiter_model, sf_client)
  450. @pytest.fixture(scope="session")
  451. def execution_exists_waiter(sf_client):
  452. waiter_model = get_waiter_model("stepfunctions.json")
  453. return create_waiter_with_client("ExecutionExists", waiter_model, sf_client)
  454. @pytest.fixture(scope="session")
  455. def job_complete_waiter(ddb_client):
  456. waiter_model = get_waiter_model("jobs.json")
  457. return create_waiter_with_client("JobComplete", waiter_model, ddb_client)
  458. @pytest.fixture(scope="session")
  459. def job_finished_waiter(ddb_client):
  460. waiter_model = get_waiter_model("jobs.json")
  461. return create_waiter_with_client("JobFinished", waiter_model, ddb_client)
  462. @pytest.fixture(scope="session")
  463. def job_exists_waiter(ddb_client):
  464. waiter_model = get_waiter_model("jobs.json")
  465. return create_waiter_with_client("JobExists", waiter_model, ddb_client)
  466. @pytest.fixture
  467. def find_phase_ended_waiter(job_table):
  468. def wait(job_id):
  469. for _ in range(100):
  470. items = job_table.query(KeyConditionExpression=Key("Id").eq(job_id))
  471. for item in items["Items"]:
  472. if item.get("EventName") == "FindPhaseEnded":
  473. return
  474. time.sleep(3)
  475. assert False, "Timed out while waiting for find phase to end"
  476. ns = SimpleNamespace()
  477. ns.wait = wait
  478. return ns
  479. @pytest.fixture(scope="module")
  480. def empty_lake(dummy_lake):
  481. dummy_lake["bucket"].objects.delete()
  482. @pytest.fixture(scope="session")
  483. def dummy_lake(s3_resource, stack, data_access_role, arn_partition):
  484. # Lake Config
  485. bucket_name = "test-" + str(uuid4())
  486. # Create the bucket and Glue table
  487. bucket = s3_resource.Bucket(bucket_name)
  488. policy = s3_resource.BucketPolicy(bucket_name)
  489. bucket.create(
  490. CreateBucketConfiguration={
  491. "LocationConstraint": getenv("AWS_DEFAULT_REGION", "eu-west-1")
  492. },
  493. )
  494. bucket.wait_until_exists()
  495. s3_resource.BucketVersioning(bucket_name).enable()
  496. roles = [stack["AthenaExecutionRoleArn"], stack["DeleteTaskRoleArn"]]
  497. if data_access_role:
  498. roles.append(data_access_role["Arn"])
  499. policy.put(
  500. Policy=json.dumps(
  501. {
  502. "Version": "2012-10-17",
  503. "Statement": [
  504. {
  505. "Effect": "Allow",
  506. "Principal": {"AWS": roles},
  507. "Action": "s3:*",
  508. "Resource": [
  509. "arn:{}:s3:::{}".format(arn_partition, bucket_name),
  510. "arn:{}:s3:::{}/*".format(arn_partition, bucket_name),
  511. ],
  512. }
  513. ],
  514. }
  515. )
  516. )
  517. yield {"bucket_name": bucket_name, "bucket": bucket, "policy": policy}
  518. # Cleanup
  519. bucket.objects.delete()
  520. bucket.object_versions.delete()
  521. bucket.delete()
  522. @pytest.fixture
  523. def policy_changer(dummy_lake):
  524. bucket = dummy_lake["bucket"]
  525. policy = bucket.Policy()
  526. original = policy.policy
  527. def update_policy(temp_policy):
  528. policy.put(Policy=json.dumps(temp_policy))
  529. yield update_policy
  530. # reset policy back
  531. policy.put(Policy=original)
  532. @pytest.fixture
  533. def kms_factory(stack):
  534. key_id_arn = stack["KMSKeyArns"]
  535. return key_id_arn.split(",")[0].split("/")[1]
  536. @pytest.fixture
  537. def data_loader(dummy_lake):
  538. loaded_data = []
  539. bucket = dummy_lake["bucket"]
  540. def load_data(filename, object_key, **kwargs):
  541. file_path = str(Path(__file__).parent.joinpath("data").joinpath(filename))
  542. bucket.upload_file(file_path, object_key, ExtraArgs=kwargs)
  543. loaded_data.append(object_key)
  544. yield load_data
  545. for d in loaded_data:
  546. bucket.objects.filter(Prefix=d).delete()
  547. bucket.object_versions.filter(Prefix=d).delete()
  548. @pytest.fixture
  549. def encrypted_data_loader(dummy_lake, kms_client, data_loader):
  550. def load_data(filename, object_key, encryption_key, encryption_algorithm, **kwargs):
  551. file_path = str(Path(__file__).parent.joinpath("data").joinpath(filename))
  552. with open(file_path, "rb") as f:
  553. encrypted, metadata = encrypt(
  554. f,
  555. {
  556. "x-amz-matdesc": json.dumps({"kms_cmk_id": encryption_key}),
  557. "x-amz-cek-alg": encryption_algorithm,
  558. },
  559. kms_client,
  560. )
  561. tmp = tempfile.NamedTemporaryFile()
  562. with open(tmp.name, "wb") as f:
  563. f.write(encrypted.read())
  564. return data_loader(tmp.name, object_key, Metadata=metadata, **kwargs)
  565. yield load_data
  566. def fetch_total_messages(q):
  567. return int(q.attributes["ApproximateNumberOfMessages"]) + int(
  568. q.attributes["ApproximateNumberOfMessagesNotVisible"]
  569. )
  570. @pytest.fixture(scope="session")
  571. def query_queue(stack):
  572. queue = boto3.resource("sqs").Queue(stack["QueryQueueUrl"])
  573. if fetch_total_messages(queue) > 0:
  574. queue.purge()
  575. return queue
  576. @pytest.fixture(scope="session")
  577. def fargate_queue(stack):
  578. queue = boto3.resource("sqs").Queue(stack["DeletionQueueUrl"])
  579. if fetch_total_messages(queue) > 0:
  580. queue.purge()
  581. return queue
  582. @pytest.fixture
  583. def queue_reader(sf_client):
  584. def read(queue, msgs_to_read=10):
  585. messages = queue.receive_messages(
  586. WaitTimeSeconds=5, MaxNumberOfMessages=msgs_to_read
  587. )
  588. for message in messages:
  589. message.delete()
  590. body = json.loads(message.body)
  591. if body.get("TaskToken"):
  592. sf_client.send_task_success(
  593. taskToken=body["TaskToken"], output=json.dumps({})
  594. )
  595. return messages
  596. return read
  597. @pytest.fixture(scope="session")
  598. def data_access_role(iam_client):
  599. try:
  600. return iam_client.get_role(RoleName="S3F2DataAccessRole")["Role"]
  601. except ClientError as e:
  602. logger.warning(str(e))
  603. pytest.exit("Abandoning test run due to missing data access role", 1)