#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Requires Python 3: the zero-argument super() calls and time.monotonic()
# used below have no Python 2 equivalent.
import argparse
import json
import logging
import os
import queue
import signal
import sys
import time
from threading import Event, Lock, Thread

import requests
import tldextract
import yaml
from boto3.session import Session
from certstream.core import CertStreamClient
from requests.adapters import HTTPAdapter
from termcolor import cprint

ARGS = argparse.Namespace()
with open("config.yaml") as config_file:
    CONFIG = yaml.safe_load(config_file)
with open("keywords.txt") as keywords_file:
    KEYWORDS = [line.strip() for line in keywords_file]

# Candidate buckets are probed by sending requests to this S3 endpoint with
# the candidate name as the Host header (see BucketWorker.__check_http).
S3_URL = "http://s3-1-w.amazonaws.com"
BUCKET_HOST = "%s.s3.amazonaws.com"
QUEUE_SIZE = CONFIG["queue_size"]
UPDATE_INTERVAL = CONFIG["update_interval"]  # seconds
RATE_LIMIT_SLEEP = CONFIG["rate_limit_sleep"]  # seconds
THREADS = list()
THREAD_EVENT = Event()
FOUND_COUNT = 0
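# For reference, a minimal config.yaml satisfying every key read above and
# below might look like this sketch (the values are illustrative
# assumptions, not defaults shipped with the project):
#
#   aws_access_key: ""     # optional; enables the boto3 check path
#   aws_secret: ""         # optional; used together with aws_access_key
#   slack_webhook: ""      # optional; POST found buckets to Slack
#   queue_size: 100        # max pending bucket candidates
#   update_interval: 5     # seconds between status lines
#   rate_limit_sleep: 5    # seconds to back off after a 503 Slow Down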

class UpdateThread(Thread):
    """Periodically prints progress: buckets checked, check rate and finds."""

    def __init__(self, q, *args, **kwargs):
        self.q = q
        self.checked_buckets_since_last_update = 0
        super().__init__(*args, **kwargs)

    def run(self):
        while not THREAD_EVENT.is_set():
            checked_buckets = len(self.q.checked_buckets)
            if checked_buckets > 1:
                cprint("{0} buckets checked ({1:.0f}b/s), {2} buckets found".format(
                    checked_buckets,
                    (checked_buckets - self.checked_buckets_since_last_update) / UPDATE_INTERVAL,
                    FOUND_COUNT), "cyan")
            self.checked_buckets_since_last_update = checked_buckets
            THREAD_EVENT.wait(UPDATE_INTERVAL)
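# A status line from UpdateThread looks like this (numbers illustrative):
#   1200 buckets checked (24b/s), 3 buckets found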

class CertStreamThread(Thread):
    """Feeds the queue with bucket-name permutations derived from domains
    seen in certificate transparency log events."""

    def __init__(self, q, *args, **kwargs):
        self.q = q
        self.c = CertStreamClient(
            self.process, skip_heartbeats=True, on_open=None, on_error=None)
        super().__init__(*args, **kwargs)

    def run(self):
        while not THREAD_EVENT.is_set():
            cprint("Waiting for Certstream events - this could take a few minutes to queue up...",
                   "yellow", attrs=["bold"])
            # run_forever() returns if the connection drops; wait, then reconnect.
            self.c.run_forever()
            THREAD_EVENT.wait(10)

    def process(self, message, context):
        if message["message_type"] == "heartbeat":
            return
        if message["message_type"] == "certificate_update":
            all_domains = message["data"]["leaf_cert"]["all_domains"]
            if ARGS.skip_lets_encrypt and "Let's Encrypt" in message["data"]["chain"][0]["subject"]["aggregated"]:
                return
            for domain in set(all_domains):
                # Skip wildcards, Cloudflare SSL SANs, punycode and very
                # noisy multi-part names before generating permutations.
                if not domain.startswith("*.") \
                        and "cloudflaressl" not in domain \
                        and "xn--" not in domain \
                        and domain.count("-") < 4 \
                        and domain.count(".") < 4:
                    parts = tldextract.extract(domain)
                    for permutation in get_permutations(parts.domain, parts.subdomain):
                        self.q.put(BUCKET_HOST % permutation)
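# Worked example (illustrative): a certificate for "shop.example.com" gives
# parts.domain == "example" and parts.subdomain == "shop", so the queued
# candidates include example.s3.amazonaws.com, www-example.s3.amazonaws.com,
# example-www.s3.amazonaws.com, shop-example.s3.amazonaws.com and
# example-shop.s3.amazonaws.com, plus whatever patterns the permutations
# file expands to.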

class BucketQueue(queue.Queue):
    """Queue of candidate bucket URLs. Deduplicates entries and slows
    consumers down once AWS has signalled rate limiting."""

    def __init__(self, maxsize):
        self.lock = Lock()
        # A set gives O(1) membership checks; only len() and "in" are used.
        self.checked_buckets = set()
        self.rate_limited = False
        self.next_yield = 0
        super().__init__(maxsize)

    def put(self, bucket_url):
        if bucket_url not in self.checked_buckets:
            self.checked_buckets.add(bucket_url)
            super().put(bucket_url)

    def requeue(self, bucket_url):
        # Bypasses the dedup in put() so a rate-limited candidate is
        # actually retried instead of being silently dropped.
        super().put(bucket_url)

    def get(self):
        with self.lock:
            t = time.monotonic()
            if self.rate_limited and t < self.next_yield:
                cprint("You have hit the AWS rate limit - slowing down... "
                       "(tip: enter credentials in config.yaml)", "yellow")
                THREAD_EVENT.wait(self.next_yield - t)
                t = time.monotonic()
                self.rate_limited = False
            self.next_yield = t + RATE_LIMIT_SLEEP
        return super().get()

class BucketWorker(Thread):
    """Pulls candidate bucket URLs off the queue and checks them, via boto3
    when AWS credentials are configured and plain HTTP otherwise."""

    def __init__(self, q, *args, **kwargs):
        self.q = q
        self.use_aws = CONFIG["aws_access_key"] and CONFIG["aws_secret"]
        if self.use_aws:
            self.session = Session(
                aws_access_key_id=CONFIG["aws_access_key"],
                aws_secret_access_key=CONFIG["aws_secret"]).resource("s3")
        else:
            self.session = requests.Session()
            self.session.mount(
                "http://", HTTPAdapter(pool_connections=ARGS.threads,
                                       pool_maxsize=QUEUE_SIZE, max_retries=0))
        super().__init__(*args, **kwargs)

    def run(self):
        while not THREAD_EVENT.is_set():
            try:
                bucket_url = self.q.get()
                if self.use_aws:
                    self.__check_boto(bucket_url)
                else:
                    self.__check_http(bucket_url)
            except Exception as e:
                print(e)
            finally:
                self.q.task_done()
    def __check_http(self, bucket_url):
        # HEAD the S3 endpoint with a spoofed Host header: 503 "Slow Down"
        # means we are rate limited, 307 means the bucket exists and
        # redirects to its regional endpoint.
        check_response = self.session.head(
            S3_URL, timeout=3, headers={"Host": bucket_url})
        if not ARGS.ignore_rate_limiting \
                and check_response.status_code == 503 \
                and check_response.reason == "Slow Down":
            self.q.rate_limited = True
            # Add it back to the queue for re-processing.
            self.q.requeue(bucket_url)
        elif check_response.status_code == 307:  # valid bucket; let's check if it's public
            new_bucket_url = check_response.headers["Location"]
            bucket_response = requests.request(
                "GET" if ARGS.only_interesting else "HEAD", new_bucket_url, timeout=3)
            if bucket_response.status_code == 200 \
                    and (not ARGS.only_interesting
                         or any(keyword in bucket_response.text for keyword in KEYWORDS)):
                self.__output("Found bucket '{}'".format(new_bucket_url), "green")
                self.__log(new_bucket_url)
    def __check_boto(self, bucket_url):
        bucket_name = bucket_url.replace(".s3.amazonaws.com", "")
        try:
            # Check whether the bucket exists; raises if it does not.
            self.session.meta.client.head_bucket(Bucket=bucket_name)
            if not ARGS.only_interesting or self.__bucket_contains_any_keywords(bucket_name):
                owner = None
                acls = None
                try:
                    # TODO: also check the IAM policy, as it can override ACLs.
                    acl = self.session.meta.client.get_bucket_acl(Bucket=bucket_name)
                    owner = acl["Owner"]["DisplayName"]
                    acls = ". ACLs = {} | {}".format(
                        self.__get_group_acls(acl, "AllUsers"),
                        self.__get_group_acls(acl, "AuthenticatedUsers"))
                except Exception:
                    acls = ". ACLs = (could not read)"
                color = "green" if not owner else "magenta"
                self.__output("Found bucket '{}'. Owned by '{}'{}".format(
                    bucket_url, owner if owner else "(unknown)", acls), color)
                self.__log(bucket_url)
        except Exception:
            # head_bucket raises (e.g. 404 or 403) for buckets we cannot
            # see; treat those as not found and move on.
            pass
    def __get_group_acls(self, acl, group):
        group_uri = "http://acs.amazonaws.com/groups/global/%s" % group
        perms = [g["Permission"] for g in acl["Grants"]
                 if g["Grantee"]["Type"] == "Group" and g["Grantee"]["URI"] == group_uri]
        return "{}: {}".format(group, ", ".join(perms) if perms else "(none)")
    def __bucket_contains_any_keywords(self, bucket_name):
        try:
            objects = [o.key for o in self.session.Bucket(bucket_name).objects.all()]
            return any(keyword in ",".join(objects) for keyword in KEYWORDS)
        except Exception:
            # Listing can fail (e.g. access denied); treat that as no match.
            return False
    def __log(self, new_bucket_url):
        global FOUND_COUNT
        # Incrementing without a lock is racy, but the count only feeds the
        # status line printed by UpdateThread.
        FOUND_COUNT += 1
        if ARGS.log_to_file:
            with open("buckets.log", "a+") as log:
                log.write("%s%s" % (new_bucket_url, os.linesep))

    def __output(self, line, color=None):
        cprint(line, color, attrs=["bold"])
        if CONFIG["slack_webhook"]:
            resp = requests.post(CONFIG["slack_webhook"],
                                 data=json.dumps({"text": line}),
                                 headers={"Content-Type": "application/json"})
            if resp.status_code != 200:
                cprint("Could not send to your Slack webhook. Server returned: %s" % resp.status_code, "red")

def get_permutations(domain, subdomain=None):
    """Generates bucket-name candidates for a domain and optional subdomain."""
    perms = [
        "%s" % domain,
        "www-%s" % domain,
        "%s-www" % domain,
    ]
    # Each line of the permutations file is a "%s" pattern that is
    # formatted with the domain.
    with open(ARGS.permutations) as permutations_file:
        perms.extend([line.strip() % domain for line in permutations_file])
    if subdomain:
        perms.extend([
            "%s-%s" % (subdomain, domain),
            "%s-%s" % (domain, subdomain),
        ])
    return filter(None, perms)

def stop():
    cprint("Kill command received - quitting...", "yellow", attrs=["bold"])
    THREAD_EVENT.set()
    sys.exit(0)


def __signal_handler(sig, frame):
    stop()

def main():
    global THREADS
    signal.signal(signal.SIGINT, __signal_handler)
    parser = argparse.ArgumentParser(
        description="Find interesting Amazon S3 Buckets by watching certificate transparency logs.",
        usage="python bucket-stream.py",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument("--only-interesting", action="store_true", dest="only_interesting", default=False,
                        help="Only log 'interesting' buckets whose contents match anything within keywords.txt")
    parser.add_argument("--skip-lets-encrypt", action="store_true", dest="skip_lets_encrypt", default=False,
                        help="Skip certs (and thus listed domains) issued by the Let's Encrypt CA")
    parser.add_argument("-t", "--threads", metavar="", type=int, dest="threads", default=20,
                        help="Number of threads to spawn. More threads = more power. Limited to 5 threads if unauthenticated.")
    parser.add_argument("--ignore-rate-limiting", action="store_true", dest="ignore_rate_limiting", default=False,
                        help="If you ignore rate limits, not all buckets will be checked")
    parser.add_argument("-l", "--log", dest="log_to_file", default=False, action="store_true",
                        help="Log found buckets to a file buckets.log")
    parser.add_argument("-s", "--source", dest="source", default=None,
                        help="Data source to check for bucket permutations. Uses certificate transparency logs if not specified.")
    parser.add_argument("-p", "--permutations", dest="permutations", default="permutations/default.txt",
                        help="Path of file containing a list of permutations to try (see permutations/ dir).")
    parser.parse_args(namespace=ARGS)
    logging.disable(logging.WARNING)

    if not CONFIG["aws_access_key"] or not CONFIG["aws_secret"]:
        cprint("It is highly recommended to enter AWS keys in config.yaml otherwise you will be severely rate limited! "
               "You might want to run with --ignore-rate-limiting", "red")
        if ARGS.threads > 5:
            cprint("No AWS keys, reducing threads to 5 to help with rate limiting.", "red")
            ARGS.threads = 5

    THREADS = list()
    cprint("Starting bucket-stream with {0} threads. Loaded {1} permutations."
           .format(ARGS.threads, len(list(get_permutations("")))), "green")

    # One queue feeds the worker threads; a status thread reports progress,
    # and candidates come from Certstream or from a file given with -s.
    q = BucketQueue(maxsize=QUEUE_SIZE)
    THREADS.extend([BucketWorker(q) for _ in range(ARGS.threads)])
    THREADS.append(UpdateThread(q))
    if ARGS.source is None:
        THREADS.append(CertStreamThread(q))
    else:
        with open(ARGS.source) as source_file:
            for line in source_file:
                for permutation in get_permutations(line.strip()):
                    q.put(BUCKET_HOST % permutation)
    for t in THREADS:
        t.daemon = True
        t.start()

    while True:
        try:
            signal.pause()
        except AttributeError:
            # signal.pause() is not implemented on Windows.
            while not THREAD_EVENT.is_set():
                time.sleep(1)
            stop()

if __name__ == "__main__":
    main()
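
# Example invocations (the file name below is illustrative):
#   python bucket-stream.py                          # watch Certstream with defaults
#   python bucket-stream.py --only-interesting -l    # log keyword-matching buckets
#   python bucket-stream.py -s domains.txt -t 10     # permute a local domain list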