12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146 |
- from re import A
- import boto3
- import botocore
- import click
- import configparser
- from csv import DictWriter
- import io
- import itertools
- import json
- import mimetypes
- import os
- import re
- import sys
- import textwrap
- from . import policies
def bucket_exists(s3, bucket):
    "Return True if the named bucket exists and is visible to this client."
    try:
        s3.head_bucket(Bucket=bucket)
    except botocore.exceptions.ClientError:
        # head_bucket raises for both missing buckets and access-denied
        return False
    return True
def user_exists(iam, username):
    "Return True if an IAM user with this username already exists."
    try:
        iam.get_user(UserName=username)
    except iam.exceptions.NoSuchEntityException:
        return False
    return True
def common_boto3_options(fn):
    """Decorator adding the shared boto3 credential/endpoint options to a command."""
    options = (
        click.option("--access-key", help="AWS access key ID"),
        click.option("--secret-key", help="AWS secret access key"),
        click.option("--session-token", help="AWS session token"),
        click.option("--endpoint-url", help="Custom endpoint URL"),
        click.option(
            "-a",
            "--auth",
            type=click.File("r"),
            help="Path to JSON/INI file containing credentials",
        ),
    )
    # Apply in reverse so the options display in the declared order in --help
    for decorator in reversed(options):
        fn = decorator(fn)
    return fn
def common_output_options(fn):
    """Decorator adding the shared --nl/--csv/--tsv output format flags."""
    output_flags = (
        ("--nl", "Output newline-delimited JSON"),
        ("--csv", "Output CSV"),
        ("--tsv", "Output TSV"),
    )
    # Apply in reverse so the flags display in the declared order in --help
    for flag, help_text in reversed(output_flags):
        fn = click.option(flag, help=help_text, is_flag=True)(fn)
    return fn
@click.group()
@click.version_option()
def cli():
    # Root click group: every subcommand below (create, policy, list-*, etc)
    # is registered against this via @cli.command().
    "A tool for creating credentials for accessing S3 buckets"
class PolicyParam(click.ParamType):
    "Returns string of guaranteed well-formed JSON"
    name = "policy"

    def convert(self, policy, param, ctx):
        # A value starting with "{" is treated as literal JSON;
        # anything else is a file path (or "-" for stdin).
        if policy.strip().startswith("{"):
            try:
                json.loads(policy)
            except ValueError:
                self.fail("Invalid JSON string")
            return policy
        try:
            with click.open_file(policy) as f:
                contents = f.read()
        except FileNotFoundError:
            self.fail("File not found")
        try:
            json.loads(contents)
        except ValueError:
            source = "Input" if policy == "-" else "File"
            self.fail("{} contained invalid JSON".format(source))
        return contents
class DurationParam(click.ParamType):
    "Parses durations such as 3600s, 15m or 2h into an integer number of seconds"
    name = "duration"
    pattern = re.compile(r"^(\d+)(m|h|s)?$")

    def convert(self, value, param, ctx):
        match = self.pattern.match(value)
        if match is None:
            self.fail("Duration must be of form 3600s or 15m or 2h")
        number, unit = match.groups()
        # No suffix (or "s") means the number is already in seconds
        seconds = int(number) * {"m": 60, "h": 3600}.get(unit, 1)
        # sts.assume_role() only accepts 15 minutes to 12 hours
        if not (15 * 60 <= seconds <= 12 * 60 * 60):
            self.fail("Duration must be between 15 minutes and 12 hours")
        return seconds
class StatementParam(click.ParamType):
    "Ensures statement is valid JSON with required fields"
    name = "statement"

    # Every IAM policy statement must carry these keys
    REQUIRED_KEYS = {"Effect", "Action", "Resource"}

    def convert(self, statement, param, ctx):
        try:
            data = json.loads(statement)
        except ValueError:
            self.fail("Invalid JSON string")
        if not isinstance(data, dict):
            self.fail("JSON must be an object")
        missing = self.REQUIRED_KEYS - data.keys()
        if missing:
            self.fail(
                "Statement JSON missing required keys: {}".format(
                    ", ".join(sorted(missing))
                )
            )
        return data
@cli.command()
@click.argument(
    "buckets",
    nargs=-1,
    required=True,
)
@click.option("--read-only", help="Only allow reading from the bucket", is_flag=True)
@click.option("--write-only", help="Only allow writing to the bucket", is_flag=True)
@click.option(
    "--prefix", help="Restrict to keys starting with this prefix", default="*"
)
@click.option(
    "extra_statements",
    "--statement",
    multiple=True,
    type=StatementParam(),
    help="JSON statement to add to the policy",
)
@click.option(
    "--public-bucket",
    help="Bucket policy for allowing public access",
    is_flag=True,
)
def policy(buckets, read_only, write_only, prefix, extra_statements, public_bucket):
    """
    Output generated JSON policy for one or more buckets

    Takes the same options as s3-credentials create

    To output a read-only JSON policy for a bucket:

        s3-credentials policy my-bucket --read-only
    """
    # (Removed a stray no-op string statement that followed the docstring.)
    if public_bucket:
        # Bucket policies attach to a single bucket, not a user
        if len(buckets) != 1:
            raise click.ClickException(
                "--public-bucket policy can only be generated for a single bucket"
            )
        click.echo(
            json.dumps(policies.bucket_policy_allow_all_get(buckets[0]), indent=4)
        )
        return
    # --write-only wins if both flags are somehow supplied
    permission = "read-write"
    if read_only:
        permission = "read-only"
    if write_only:
        permission = "write-only"
    statements = []
    if permission == "read-write":
        for bucket in buckets:
            statements.extend(policies.read_write_statements(bucket, prefix))
    elif permission == "read-only":
        for bucket in buckets:
            statements.extend(policies.read_only_statements(bucket, prefix))
    elif permission == "write-only":
        for bucket in buckets:
            statements.extend(policies.write_only_statements(bucket, prefix))
    else:
        assert False, "Unknown permission: {}".format(permission)
    if extra_statements:
        statements.extend(extra_statements)
    bucket_access_policy = policies.wrap_policy(statements)
    click.echo(json.dumps(bucket_access_policy, indent=4))
@cli.command()
@click.argument(
    "buckets",
    nargs=-1,
    required=True,
)
@click.option(
    "format_",
    "-f",
    "--format",
    type=click.Choice(["ini", "json"]),
    default="json",
    help="Output format for credentials",
)
@click.option(
    "-d",
    "--duration",
    type=DurationParam(),
    help="How long should these credentials work for? Default is forever, use 3600 for 3600 seconds, 15m for 15 minutes, 1h for 1 hour",
)
@click.option("--username", help="Username to create or existing user to use")
@click.option(
    "-c",
    "--create-bucket",
    help="Create buckets if they do not already exist",
    is_flag=True,
)
@click.option(
    "--prefix", help="Restrict to keys starting with this prefix", default="*"
)
@click.option(
    "--public",
    help="Make the created bucket public: anyone will be able to download files if they know their name",
    is_flag=True,
)
@click.option("--read-only", help="Only allow reading from the bucket", is_flag=True)
@click.option("--write-only", help="Only allow writing to the bucket", is_flag=True)
@click.option(
    "--policy",
    type=PolicyParam(),
    help="Path to a policy.json file, or literal JSON string - $!BUCKET_NAME!$ will be replaced with the name of the bucket",
)
@click.option(
    "extra_statements",
    "--statement",
    multiple=True,
    type=StatementParam(),
    help="JSON statement to add to the policy",
)
@click.option("--bucket-region", help="Region in which to create buckets")
@click.option("--silent", help="Don't show performed steps", is_flag=True)
@click.option("--dry-run", help="Show steps without executing them", is_flag=True)
@click.option(
    "--user-permissions-boundary",
    help=(
        "Custom permissions boundary to use for created users, or 'none' to "
        "create without. Defaults to limiting to S3 based on "
        "--read-only and --write-only options."
    ),
)
@common_boto3_options
def create(
    buckets,
    format_,
    duration,
    username,
    create_bucket,
    prefix,
    public,
    read_only,
    write_only,
    policy,
    extra_statements,
    bucket_region,
    user_permissions_boundary,
    silent,
    dry_run,
    **boto_options
):
    """
    Create and return new AWS credentials for specified S3 buckets - optionally
    also creating the bucket if it does not yet exist.

    To create a new bucket and output read-write credentials:

        s3-credentials create my-new-bucket -c

    To create read-only credentials for an existing bucket:

        s3-credentials create my-existing-bucket --read-only

    To create write-only credentials that are only valid for 15 minutes:

        s3-credentials create my-existing-bucket --write-only -d 15m
    """
    if read_only and write_only:
        raise click.ClickException(
            "Cannot use --read-only and --write-only at the same time"
        )
    extra_statements = list(extra_statements)

    def log(message):
        # Progress goes to stderr so stdout stays clean for the credentials output
        if not silent:
            click.echo(message, err=True)

    # --write-only wins over --read-only is impossible here (checked above)
    permission = "read-write"
    if read_only:
        permission = "read-only"
    if write_only:
        permission = "write-only"
    s3 = None
    iam = None
    sts = None
    # In --dry-run mode no AWS clients are created and no calls are made
    if not dry_run:
        s3 = make_client("s3", **boto_options)
        iam = make_client("iam", **boto_options)
        sts = make_client("sts", **boto_options)
    # Verify buckets
    for bucket in buckets:
        # Create bucket if it doesn't exist
        if dry_run or (not bucket_exists(s3, bucket)):
            if (not dry_run) and (not create_bucket):
                raise click.ClickException(
                    "Bucket does not exist: {} - try --create-bucket to create it".format(
                        bucket
                    )
                )
            if dry_run or create_bucket:
                kwargs = {}
                if bucket_region:
                    kwargs = {
                        "CreateBucketConfiguration": {
                            "LocationConstraint": bucket_region
                        }
                    }
                bucket_policy = {}
                if public:
                    bucket_policy = policies.bucket_policy_allow_all_get(bucket)
                if dry_run:
                    click.echo(
                        "Would create bucket: '{}'{}".format(
                            bucket,
                            (
                                " with args {}".format(json.dumps(kwargs, indent=4))
                                if kwargs
                                else ""
                            ),
                        )
                    )
                    if bucket_policy:
                        click.echo("... then attach the following bucket policy to it:")
                        click.echo(json.dumps(bucket_policy, indent=4))
                else:
                    s3.create_bucket(Bucket=bucket, **kwargs)
                    info = "Created bucket: {}".format(bucket)
                    if bucket_region:
                        info += " in region: {}".format(bucket_region)
                    log(info)
                    if bucket_policy:
                        s3.put_bucket_policy(
                            Bucket=bucket, Policy=json.dumps(bucket_policy)
                        )
                        log("Attached bucket policy allowing public access")
    # At this point the buckets definitely exist - create the inline policy for assume_role()
    assume_role_policy = {}
    if policy:
        # NOTE(review): this reuses `bucket` from the loop above, so with a custom
        # --policy only the LAST bucket's name is substituted - confirm intended
        assume_role_policy = json.loads(policy.replace("$!BUCKET_NAME!$", bucket))
    else:
        statements = []
        if permission == "read-write":
            for bucket in buckets:
                statements.extend(policies.read_write_statements(bucket, prefix))
        elif permission == "read-only":
            for bucket in buckets:
                statements.extend(policies.read_only_statements(bucket, prefix))
        elif permission == "write-only":
            for bucket in buckets:
                statements.extend(policies.write_only_statements(bucket, prefix))
        else:
            assert False, "Unknown permission: {}".format(permission)
        statements.extend(extra_statements)
        assume_role_policy = policies.wrap_policy(statements)
    if duration:
        # We're going to use sts.assume_role() rather than creating a user
        if dry_run:
            click.echo("Would ensure role: 's3-credentials.AmazonS3FullAccess'")
            click.echo(
                "Would assume role using following policy for {} seconds:".format(
                    duration
                )
            )
            click.echo(json.dumps(assume_role_policy, indent=4))
        else:
            s3_role_arn = ensure_s3_role_exists(iam, sts)
            log("Assume role against {} for {}s".format(s3_role_arn, duration))
            credentials_response = sts.assume_role(
                RoleArn=s3_role_arn,
                RoleSessionName="s3.{permission}.{buckets}".format(
                    permission="custom" if (policy or extra_statements) else permission,
                    buckets=",".join(buckets),
                ),
                Policy=json.dumps(assume_role_policy),
                DurationSeconds=duration,
            )
            if format_ == "ini":
                click.echo(
                    (
                        "[default]\naws_access_key_id={}\n"
                        "aws_secret_access_key={}\naws_session_token={}"
                    ).format(
                        credentials_response["Credentials"]["AccessKeyId"],
                        credentials_response["Credentials"]["SecretAccessKey"],
                        credentials_response["Credentials"]["SessionToken"],
                    )
                )
            else:
                click.echo(
                    json.dumps(
                        credentials_response["Credentials"], indent=4, default=str
                    )
                )
        return
    # No duration, so we create a new user so we can issue non-expiring credentials
    if not username:
        # Default username is "s3.read-write.bucket1,bucket2"
        username = "s3.{permission}.{buckets}".format(
            permission="custom" if (policy or extra_statements) else permission,
            buckets=",".join(buckets),
        )
    if dry_run or (not user_exists(iam, username)):
        kwargs = {"UserName": username}
        if user_permissions_boundary != "none":
            # This is a user-account level limitation, it does not grant
            # permissions on its own but is a useful extra level of defense
            # https://github.com/simonw/s3-credentials/issues/1#issuecomment-958201717
            if not user_permissions_boundary:
                # Pick one based on --read-only/--write-only
                if read_only:
                    user_permissions_boundary = (
                        "arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess"
                    )
                else:
                    # Need full access in order to be able to write
                    user_permissions_boundary = (
                        "arn:aws:iam::aws:policy/AmazonS3FullAccess"
                    )
            kwargs["PermissionsBoundary"] = user_permissions_boundary
        info = " user: '{}'".format(username)
        if user_permissions_boundary != "none":
            info += " with permissions boundary: '{}'".format(user_permissions_boundary)
        if dry_run:
            click.echo("Would create{}".format(info))
        else:
            iam.create_user(**kwargs)
            log("Created {}".format(info))
    # Add inline policies to the user so they can access the buckets
    user_policy = {}
    for bucket in buckets:
        policy_name = "s3.{permission}.{bucket}".format(
            permission="custom" if (policy or extra_statements) else permission,
            bucket=bucket,
        )
        if policy:
            user_policy = json.loads(policy.replace("$!BUCKET_NAME!$", bucket))
        else:
            if permission == "read-write":
                user_policy = policies.read_write(bucket, prefix, extra_statements)
            elif permission == "read-only":
                user_policy = policies.read_only(bucket, prefix, extra_statements)
            elif permission == "write-only":
                user_policy = policies.write_only(bucket, prefix, extra_statements)
            else:
                assert False, "Unknown permission: {}".format(permission)
        if dry_run:
            click.echo(
                "Would attach policy called '{}' to user '{}', details:\n{}".format(
                    policy_name,
                    username,
                    json.dumps(user_policy, indent=4),
                )
            )
        else:
            iam.put_user_policy(
                PolicyDocument=json.dumps(user_policy),
                PolicyName=policy_name,
                UserName=username,
            )
            log("Attached policy {} to user {}".format(policy_name, username))
    # Retrieve and print out the credentials
    if dry_run:
        click.echo("Would call create access key for user '{}'".format(username))
    else:
        response = iam.create_access_key(
            UserName=username,
        )
        log("Created access key for user: {}".format(username))
        if format_ == "ini":
            click.echo(
                ("[default]\naws_access_key_id={}\n" "aws_secret_access_key={}").format(
                    response["AccessKey"]["AccessKeyId"],
                    response["AccessKey"]["SecretAccessKey"],
                )
            )
        elif format_ == "json":
            click.echo(json.dumps(response["AccessKey"], indent=4, default=str))
@cli.command()
@common_boto3_options
def whoami(**boto_options):
    "Identify currently authenticated user"
    sts = make_client("sts", **boto_options)
    caller = sts.get_caller_identity()
    # Drop the boto3 bookkeeping before displaying the identity
    del caller["ResponseMetadata"]
    click.echo(json.dumps(caller, indent=4, default=str))
@cli.command()
@common_output_options
@common_boto3_options
def list_users(nl, csv, tsv, **boto_options):
    """
    List all users for this account

        s3-credentials list-users

    Add --csv or --tsv for CSV or TSV format:

        s3-credentials list-users --csv
    """
    # Fixed help text: previously read "--csv or --csv"
    iam = make_client("iam", **boto_options)
    output(
        paginate(iam, "list_users", "Users"),
        (
            "UserName",
            "UserId",
            "Arn",
            "Path",
            "CreateDate",
            "PasswordLastUsed",
            "PermissionsBoundary",
            "Tags",
        ),
        nl,
        csv,
        tsv,
    )
@cli.command()
@click.argument("role_names", nargs=-1)
@click.option("--details", help="Include attached policies (slower)", is_flag=True)
@common_output_options
@common_boto3_options
def list_roles(role_names, details, nl, csv, tsv, **boto_options):
    """
    List roles

    To list all roles for this AWS account:

        s3-credentials list-roles

    Add --csv or --tsv for CSV or TSV format:

        s3-credentials list-roles --csv

    For extra details per role (much slower) add --details

        s3-credentials list-roles --details
    """
    # Fixed help text: previously read "--csv or --csv"
    iam = make_client("iam", **boto_options)
    headers = (
        "Path",
        "RoleName",
        "RoleId",
        "Arn",
        "CreateDate",
        "AssumeRolePolicyDocument",
        "Description",
        "MaxSessionDuration",
        "PermissionsBoundary",
        "Tags",
        "RoleLastUsed",
    )
    if details:
        headers += ("inline_policies", "attached_policies")

    def iterate():
        # Stream roles, optionally filtered to the names given on the command line
        for role in paginate(iam, "list_roles", "Roles"):
            if role_names and role["RoleName"] not in role_names:
                continue
            if details:
                role_name = role["RoleName"]
                role["inline_policies"] = []
                # Get inline policy names, then policy for each one
                for policy_name in paginate(
                    iam, "list_role_policies", "PolicyNames", RoleName=role_name
                ):
                    role_policy_response = iam.get_role_policy(
                        RoleName=role_name,
                        PolicyName=policy_name,
                    )
                    role_policy_response.pop("ResponseMetadata", None)
                    role["inline_policies"].append(role_policy_response)
                # Get attached managed policies
                role["attached_policies"] = []
                for attached in paginate(
                    iam,
                    "list_attached_role_policies",
                    "AttachedPolicies",
                    RoleName=role_name,
                ):
                    policy_arn = attached["PolicyArn"]
                    attached_policy_response = iam.get_policy(
                        PolicyArn=policy_arn,
                    )
                    policy_details = attached_policy_response["Policy"]
                    # Also need to fetch the policy JSON
                    version_id = policy_details["DefaultVersionId"]
                    policy_version_response = iam.get_policy_version(
                        PolicyArn=policy_arn,
                        VersionId=version_id,
                    )
                    policy_details["PolicyVersion"] = policy_version_response[
                        "PolicyVersion"
                    ]
                    role["attached_policies"].append(policy_details)
            yield role

    output(iterate(), headers, nl, csv, tsv)
@cli.command()
@click.argument("usernames", nargs=-1)
@common_boto3_options
def list_user_policies(usernames, **boto_options):
    """
    List inline policies for specified users

        s3-credentials list-user-policies username

    Returns policies for all users if no usernames are provided.
    """
    iam = make_client("iam", **boto_options)
    if not usernames:
        # No names given: fall back to every user in the account
        usernames = [
            user["UserName"] for user in paginate(iam, "list_users", "Users")
        ]
    for username in usernames:
        click.echo("User: {}".format(username))
        policy_names = paginate(
            iam, "list_user_policies", "PolicyNames", UserName=username
        )
        for policy_name in policy_names:
            click.echo("PolicyName: {}".format(policy_name))
            policy_response = iam.get_user_policy(
                UserName=username, PolicyName=policy_name
            )
            click.echo(
                json.dumps(policy_response["PolicyDocument"], indent=4, default=str)
            )
@cli.command()
@click.argument("buckets", nargs=-1)
@click.option("--details", help="Include extra bucket details (slower)", is_flag=True)
@common_output_options
@common_boto3_options
def list_buckets(buckets, details, nl, csv, tsv, **boto_options):
    """
    List buckets

    To list all buckets and their creation time as JSON:

        s3-credentials list-buckets

    Add --csv or --tsv for CSV or TSV format:

        s3-credentials list-buckets --csv

    For extra details per bucket (much slower) add --details

        s3-credentials list-buckets --details
    """
    # Fixed help text: previously read "--csv or --csv"
    s3 = make_client("s3", **boto_options)
    headers = ["Name", "CreationDate"]
    if details:
        headers += ["bucket_acl", "public_access_block", "bucket_website"]

    def iterator():
        # Stream buckets, optionally filtered to the names given on the command line
        for bucket in s3.list_buckets()["Buckets"]:
            if buckets and (bucket["Name"] not in buckets):
                continue
            if details:
                bucket_acl = dict(
                    (key, value)
                    for key, value in s3.get_bucket_acl(
                        Bucket=bucket["Name"],
                    ).items()
                    if key != "ResponseMetadata"
                )
                try:
                    pab = s3.get_public_access_block(
                        Bucket=bucket["Name"],
                    )["PublicAccessBlockConfiguration"]
                except s3.exceptions.ClientError:
                    # Bucket has no public access block configured
                    pab = None
                try:
                    bucket_website = dict(
                        (key, value)
                        for key, value in s3.get_bucket_website(
                            Bucket=bucket["Name"],
                        ).items()
                        if key != "ResponseMetadata"
                    )
                except s3.exceptions.ClientError:
                    # Bucket is not configured for static website hosting
                    bucket_website = None
                bucket["bucket_acl"] = bucket_acl
                bucket["public_access_block"] = pab
                bucket["bucket_website"] = bucket_website
            yield bucket

    output(iterator(), headers, nl, csv, tsv)
@cli.command()
@click.argument("usernames", nargs=-1, required=True)
@common_boto3_options
def delete_user(usernames, **boto_options):
    """
    Delete specified users, their access keys and their inline policies

        s3-credentials delete-user username1 username2
    """
    iam = make_client("iam", **boto_options)
    for username in usernames:
        click.echo("User: {}".format(username))
        # IAM refuses to delete a user that still has inline policies attached
        for policy_name in list(
            paginate(iam, "list_user_policies", "PolicyNames", UserName=username)
        ):
            iam.delete_user_policy(UserName=username, PolicyName=policy_name)
            click.echo(" Deleted policy: {}".format(policy_name))
        # ... or one that still has access keys
        for access_key in list(
            paginate(iam, "list_access_keys", "AccessKeyMetadata", UserName=username)
        ):
            iam.delete_access_key(
                UserName=username, AccessKeyId=access_key["AccessKeyId"]
            )
            click.echo(" Deleted access key: {}".format(access_key["AccessKeyId"]))
        iam.delete_user(UserName=username)
        click.echo(" Deleted user")
def make_client(service, access_key, secret_key, session_token, endpoint_url, auth):
    """
    Build a boto3 client for service, resolving credentials from either the
    individual key options or an --auth file (JSON or INI format).
    """
    if auth:
        if access_key or secret_key or session_token:
            raise click.ClickException(
                "--auth cannot be used with --access-key, --secret-key or --session-token"
            )
        auth_content = auth.read().strip()
        if auth_content.startswith("{"):
            # Treat as JSON - e.g. the output of a previous `create` call
            parsed = json.loads(auth_content)
            access_key = parsed.get("AccessKeyId")
            secret_key = parsed.get("SecretAccessKey")
            session_token = parsed.get("SessionToken")
        else:
            # Treat as INI - e.g. an AWS credentials file
            config = configparser.ConfigParser()
            config.read_string(auth_content)
            # Use the first section that has an aws_access_key_id
            for section in config.sections():
                if "aws_access_key_id" in config[section]:
                    access_key = config[section].get("aws_access_key_id")
                    secret_key = config[section].get("aws_secret_access_key")
                    session_token = config[section].get("aws_session_token")
                    break
    # Only pass the arguments that were actually provided
    client_kwargs = {
        name: value
        for name, value in (
            ("aws_access_key_id", access_key),
            ("aws_secret_access_key", secret_key),
            ("aws_session_token", session_token),
            ("endpoint_url", endpoint_url),
        )
        if value
    }
    return boto3.client(service, **client_kwargs)
def ensure_s3_role_exists(iam, sts):
    """
    Create the s3-credentials.AmazonS3FullAccess role if it does not already
    exist, and return its ARN.

    The role trusts the current account's root principal so sts.assume_role()
    can be called against it; each assume_role() call supplies an inline
    policy that restricts what the temporary credentials can do.
    """
    role_name = "s3-credentials.AmazonS3FullAccess"
    account_id = sts.get_caller_identity()["Account"]
    try:
        role = iam.get_role(RoleName=role_name)
        return role["Role"]["Arn"]
    except iam.exceptions.NoSuchEntityException:
        create_role_response = iam.create_role(
            Description=(
                "Role used by the s3-credentials tool to create time-limited "
                "credentials that are restricted to specific buckets"
            ),
            RoleName=role_name,
            AssumeRolePolicyDocument=json.dumps(
                {
                    "Version": "2012-10-17",
                    "Statement": [
                        {
                            "Effect": "Allow",
                            "Principal": {
                                "AWS": "arn:aws:iam::{}:root".format(account_id)
                            },
                            "Action": "sts:AssumeRole",
                        }
                    ],
                }
            ),
        )
        # Attach AmazonS3FullAccess to it - note that even though we use full access
        # on the role itself any time we call sts.assume_role() we attach an additional
        # policy to ensure reduced access for the temporary credentials
        iam.attach_role_policy(
            # Use role_name here rather than repeating the literal (consistency fix)
            RoleName=role_name,
            PolicyArn="arn:aws:iam::aws:policy/AmazonS3FullAccess",
        )
        return create_role_response["Role"]["Arn"]
@cli.command()
@click.argument("bucket")
@click.option("--prefix", help="List keys starting with this prefix")
@common_output_options
@common_boto3_options
def list_bucket(bucket, prefix, nl, csv, tsv, **boto_options):
    """
    List contents of bucket

    To list the contents of a bucket as JSON:

        s3-credentials list-bucket my-bucket

    Add --csv or --tsv for CSV or TSV format:

        s3-credentials list-bucket my-bucket --csv
    """
    # Fixed help text: previously read "--csv or --csv"
    s3 = make_client("s3", **boto_options)
    kwargs = {"Bucket": bucket}
    if prefix:
        kwargs["Prefix"] = prefix
    try:
        output(
            paginate(s3, "list_objects_v2", "Contents", **kwargs),
            ("Key", "LastModified", "ETag", "Size", "StorageClass", "Owner"),
            nl,
            csv,
            tsv,
        )
    except botocore.exceptions.ClientError as e:
        # Surface S3 errors (missing bucket, access denied) as a clean CLI error
        raise click.ClickException(e)
@cli.command()
@click.argument("bucket")
@click.argument("key")
@click.argument(
    "path",
    type=click.Path(
        exists=True, file_okay=True, dir_okay=False, readable=True, allow_dash=True
    ),
)
@click.option(
    "--content-type",
    help="Content-Type to use (default is auto-detected based on file extension)",
)
@click.option("silent", "-s", "--silent", is_flag=True, help="Don't show progress bar")
@common_boto3_options
def put_object(bucket, key, path, content_type, silent, **boto_options):
    """
    Upload an object to an S3 bucket

    To upload a file to /my-key.txt in the my-bucket bucket:

        s3-credentials put-object my-bucket my-key.txt /path/to/file.txt

    Use - to upload content from standard input:

        echo "Hello" | s3-credentials put-object my-bucket hello.txt -
    """
    s3 = make_client("s3", **boto_options)
    size = None
    extra_args = {}
    if path == "-":
        # Buffer stdin fully into memory - boto needs a seekable file object
        fp = io.BytesIO(sys.stdin.buffer.read())
        if not silent:
            size = fp.getbuffer().nbytes
    else:
        content_type = content_type or mimetypes.guess_type(path)[0]
        fp = click.open_file(path, "rb")
        if not silent:
            size = os.path.getsize(path)
    if content_type is not None:
        extra_args["ContentType"] = content_type
    if silent:
        s3.upload_fileobj(fp, bucket, key, ExtraArgs=extra_args)
    else:
        # Show progress bar
        with click.progressbar(length=size, label="Uploading") as bar:
            s3.upload_fileobj(
                fp, bucket, key, Callback=bar.update, ExtraArgs=extra_args
            )
@cli.command()
@click.argument("bucket")
@click.argument("key")
@click.option(
    "output",
    "-o",
    "--output",
    type=click.Path(file_okay=True, dir_okay=False, writable=True, allow_dash=False),
    help="Write to this file instead of stdout",
)
@common_boto3_options
def get_object(bucket, key, output, **boto_options):
    """
    Download an object from an S3 bucket

    To see the contents of the bucket on standard output:

        s3-credentials get-object my-bucket hello.txt

    To save to a file:

        s3-credentials get-object my-bucket hello.txt -o hello.txt
    """
    s3 = make_client("s3", **boto_options)
    # Default to binary stdout when no output path was given
    fp = click.open_file(output, "wb") if output else sys.stdout.buffer
    s3.download_fileobj(bucket, key, fp)
@cli.command()
@click.argument("bucket")
@click.option(
    "allowed_methods",
    "-m",
    "--allowed-method",
    multiple=True,
    help="Allowed method e.g. GET",
)
@click.option(
    "allowed_headers",
    "-h",
    "--allowed-header",
    multiple=True,
    help="Allowed header e.g. Authorization",
)
@click.option(
    "allowed_origins",
    "-o",
    "--allowed-origin",
    multiple=True,
    help="Allowed origin e.g. https://www.example.com/",
)
@click.option(
    "expose_headers",
    "-e",
    "--expose-header",
    multiple=True,
    help="Header to expose e.g. ETag",
)
@click.option(
    "max_age_seconds",
    "--max-age-seconds",
    type=int,
    help="How long to cache preflight requests",
)
@common_boto3_options
def set_cors_policy(
    bucket,
    allowed_methods,
    allowed_headers,
    allowed_origins,
    expose_headers,
    max_age_seconds,
    **boto_options
):
    """
    Set CORS policy for a bucket

    To allow GET requests from any origin:

        s3-credentials set-cors-policy my-bucket

    To allow GET and PUT from a specific origin and expose ETag headers:

    \b
        s3-credentials set-cors-policy my-bucket \\
          --allowed-method GET \\
          --allowed-method PUT \\
          --allowed-origin https://www.example.com/ \\
          --expose-header ETag
    """
    s3 = make_client("s3", **boto_options)
    if not bucket_exists(s3, bucket):
        # Fixed grammar in this user-facing error message
        raise click.ClickException("Bucket {} does not exist".format(bucket))
    # Defaults: any origin, GET only
    cors_rule = {
        "ID": "set-by-s3-credentials",
        "AllowedOrigins": allowed_origins or ["*"],
        "AllowedHeaders": allowed_headers,
        "AllowedMethods": allowed_methods or ["GET"],
        "ExposeHeaders": expose_headers,
    }
    if max_age_seconds:
        cors_rule["MaxAgeSeconds"] = max_age_seconds
    try:
        s3.put_bucket_cors(Bucket=bucket, CORSConfiguration={"CORSRules": [cors_rule]})
    except botocore.exceptions.ClientError as e:
        raise click.ClickException(e)
@cli.command()
@click.argument("bucket")
@common_boto3_options
def get_cors_policy(bucket, **boto_options):
    """
    Get CORS policy for a bucket

        s3-credentials get-cors-policy my-bucket

    Returns the CORS policy for this bucket, if set, as JSON
    """
    s3 = make_client("s3", **boto_options)
    try:
        response = s3.get_bucket_cors(Bucket=bucket)
    except botocore.exceptions.ClientError as error:
        # e.g. no CORS configuration, missing bucket or access denied
        raise click.ClickException(error)
    click.echo(json.dumps(response["CORSRules"], indent=4, default=str))
def output(iterator, headers, nl, csv, tsv):
    """Write rows as newline-delimited JSON, CSV/TSV, or an indented JSON array."""
    if nl:
        for row in iterator:
            click.echo(json.dumps(row, default=str))
        return
    if csv or tsv:
        dialect = "excel-tab" if tsv else "excel"
        writer = DictWriter(sys.stdout, headers, dialect=dialect)
        writer.writeheader()
        for row in iterator:
            writer.writerow(fix_json(row))
        return
    # Default: stream a pretty-printed JSON array
    for line in stream_indented_json(iterator):
        click.echo(line)
def stream_indented_json(iterator, indent=2):
    """
    Yield chunks of a pretty-printed JSON array, one element per chunk,
    without materializing the whole iterator in memory.
    """
    # Pair each item with its successor so we can tell when the last
    # item has been reached (and so whether to emit "," or the closing "]").
    items, successors = itertools.tee(iterator, 2)
    next(successors, None)
    emitted_any = False
    for item, successor in itertools.zip_longest(items, successors):
        at_end = successor is None
        serialized = textwrap.indent(
            json.dumps(item, indent=indent, default=str), " " * indent
        )
        prefix = "" if emitted_any else "[\n"
        suffix = "\n]" if at_end else ","
        yield prefix + serialized + suffix
        emitted_any = True
    if not emitted_any:
        # Empty iterator: emit an empty JSON list
        yield "[]"
def paginate(service, method, list_key, **kwargs):
    "Yield every item under list_key across all pages of the given API method."
    paginator = service.get_paginator(method)
    for page in paginator.paginate(**kwargs):
        for item in page[list_key]:
            yield item
def fix_json(row):
    """Return a copy of row with list/dict/tuple values JSON-encoded as strings,
    so they survive being written to a flat CSV/TSV cell."""
    fixed = {}
    for key, value in row.items():
        if isinstance(value, (dict, list, tuple)):
            value = json.dumps(value, indent=2, default=str)
        fixed[key] = value
    return fixed
|