# bucket-stream.py
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. import sys
  4. PY2 = sys.version_info[0] == 2
  5. PY3 = (sys.version_info[0] >= 3)
  6. #import queue
  7. if PY2:
  8. import Queue as queue
  9. else: # PY3
  10. import queue
  11. import argparse
  12. import logging
  13. import os
  14. import signal
  15. import time
  16. import json
  17. from threading import Lock
  18. from threading import Event
  19. from threading import Thread
  20. import requests
  21. import tldextract
  22. import yaml
  23. from boto3.session import Session
  24. from certstream.core import CertStreamClient
  25. from requests.adapters import HTTPAdapter
  26. from termcolor import cprint
  27. ARGS = argparse.Namespace()
  28. CONFIG = yaml.safe_load(open("config.yaml"))
  29. KEYWORDS = [line.strip() for line in open("keywords.txt")]
  30. S3_URL = "http://s3-1-w.amazonaws.com"
  31. BUCKET_HOST = "%s.s3.amazonaws.com"
  32. QUEUE_SIZE = CONFIG['queue_size']
  33. UPDATE_INTERVAL = CONFIG['update_interval'] # seconds
  34. RATE_LIMIT_SLEEP = CONFIG['rate_limit_sleep'] # seconds
  35. THREADS = list()
  36. THREAD_EVENT = Event()
  37. FOUND_COUNT = 0
  38. class UpdateThread(Thread):
  39. def __init__(self, q, *args, **kwargs):
  40. self.q = q
  41. self.checked_buckets_since_last_update = 0
  42. super().__init__(*args, **kwargs)
  43. def run(self):
  44. global THREAD_EVENT
  45. while not THREAD_EVENT.is_set():
  46. checked_buckets = len(self.q.checked_buckets)
  47. if checked_buckets > 1:
  48. cprint("{0} buckets checked ({1:.0f}b/s), {2} buckets found".format(
  49. checked_buckets,
  50. (checked_buckets - self.checked_buckets_since_last_update) / UPDATE_INTERVAL,
  51. FOUND_COUNT), "cyan")
  52. self.checked_buckets_since_last_update = checked_buckets
  53. THREAD_EVENT.wait(UPDATE_INTERVAL)
  54. class CertStreamThread(Thread):
  55. def __init__(self, q, *args, **kwargs):
  56. self.q = q
  57. self.c = CertStreamClient(
  58. self.process, skip_heartbeats=True, on_open=None, on_error=None)
  59. super().__init__(*args, **kwargs)
  60. def run(self):
  61. global THREAD_EVENT
  62. while not THREAD_EVENT.is_set():
  63. cprint("Waiting for Certstream events - this could take a few minutes to queue up...",
  64. "yellow", attrs=["bold"])
  65. self.c.run_forever()
  66. THREAD_EVENT.wait(10)
  67. def process(self, message, context):
  68. if message["message_type"] == "heartbeat":
  69. return
  70. if message["message_type"] == "certificate_update":
  71. all_domains = message["data"]["leaf_cert"]["all_domains"]
  72. if ARGS.skip_lets_encrypt and "Let's Encrypt" in message["data"]["chain"][0]["subject"]["aggregated"]:
  73. return
  74. for domain in set(all_domains):
  75. # cut the crap
  76. if not domain.startswith("*.")\
  77. and "cloudflaressl" not in domain\
  78. and "xn--" not in domain\
  79. and domain.count("-") < 4\
  80. and domain.count(".") < 4:
  81. parts = tldextract.extract(domain)
  82. for permutation in get_permutations(parts.domain, parts.subdomain):
  83. self.q.put(BUCKET_HOST % permutation)
  84. class BucketQueue(queue.Queue):
  85. def __init__(self, maxsize):
  86. self.lock = Lock()
  87. self.checked_buckets = list()
  88. self.rate_limited = False
  89. self.next_yield = 0
  90. super().__init__(maxsize)
  91. def put(self, bucket_url):
  92. if bucket_url not in self.checked_buckets:
  93. self.checked_buckets.append(bucket_url)
  94. super().put(bucket_url)
  95. def get(self):
  96. global THREAD_EVENT
  97. with self.lock:
  98. t = time.monotonic()
  99. if self.rate_limited and t < self.next_yield:
  100. cprint("You have hit the AWS rate limit - slowing down... (tip: enter credentials in config.yaml)", "yellow")
  101. THREAD_EVENT.wait(self.next_yield - t)
  102. t = time.monotonic()
  103. self.rate_limited = False
  104. self.next_yield = t + RATE_LIMIT_SLEEP
  105. return super().get()
  106. class BucketWorker(Thread):
  107. def __init__(self, q, *args, **kwargs):
  108. self.q = q
  109. self.use_aws = CONFIG["aws_access_key"] and CONFIG["aws_secret"]
  110. if self.use_aws:
  111. self.session = Session(
  112. aws_access_key_id=CONFIG["aws_access_key"], aws_secret_access_key=CONFIG["aws_secret"]).resource("s3")
  113. else:
  114. self.session = requests.Session()
  115. self.session.mount(
  116. "http://", HTTPAdapter(pool_connections=ARGS.threads, pool_maxsize=QUEUE_SIZE, max_retries=0))
  117. super().__init__(*args, **kwargs)
  118. def run(self):
  119. global THREAD_EVENT
  120. while not THREAD_EVENT.is_set():
  121. try:
  122. bucket_url = self.q.get()
  123. self.__check_boto(
  124. bucket_url) if self.use_aws else self.__check_http(bucket_url)
  125. except Exception as e:
  126. print(e)
  127. pass
  128. finally:
  129. self.q.task_done()
  130. def __check_http(self, bucket_url):
  131. check_response = self.session.head(
  132. S3_URL, timeout=3, headers={"Host": bucket_url})
  133. if not ARGS.ignore_rate_limiting\
  134. and (check_response.status_code == 503 and check_response.reason == "Slow Down"):
  135. self.q.rate_limited = True
  136. # add it back to the queue for re-processing
  137. self.q.put(bucket_url)
  138. elif check_response.status_code == 307: # valid bucket, lets check if its public
  139. new_bucket_url = check_response.headers["Location"]
  140. bucket_response = requests.request(
  141. "GET" if ARGS.only_interesting else "HEAD", new_bucket_url, timeout=3)
  142. if bucket_response.status_code == 200\
  143. and (not ARGS.only_interesting or
  144. (ARGS.only_interesting and any(keyword in bucket_response.text for keyword in KEYWORDS))):
  145. self.__output("Found bucket '{}'".format(new_bucket_url), "green")
  146. self.__log(new_bucket_url)
  147. def __check_boto(self, bucket_url):
  148. bucket_name = bucket_url.replace(".s3.amazonaws.com", "")
  149. try:
  150. # just to check if the bucket exists. Throws NoSuchBucket exception if not
  151. self.session.meta.client.head_bucket(Bucket=bucket_name)
  152. if not ARGS.only_interesting or\
  153. (ARGS.only_interesting and self.__bucket_contains_any_keywords(bucket_name)):
  154. owner = None
  155. acls = None
  156. try:
  157. # todo: also check IAM policy as it can override ACLs
  158. acl = self.session.meta.client.get_bucket_acl(Bucket=bucket_name)
  159. owner = acl["Owner"]["DisplayName"]
  160. acls = ". ACLs = {} | {}".format(self.__get_group_acls(acl, "AllUsers"),
  161. self.__get_group_acls(acl, "AuthenticatedUsers"))
  162. except:
  163. acls = ". ACLS = (could not read)"
  164. color = "green" if not owner else "magenta"
  165. self.__output("Found bucket '{}'. Owned by '{}'{}".format(
  166. bucket_url, owner if owner else "(unknown)", acls), color)
  167. self.__log(bucket_url)
  168. except Exception as e:
  169. pass
  170. def __get_group_acls(self, acl, group):
  171. group_uri = "http://acs.amazonaws.com/groups/global/%s" % group
  172. perms = [g["Permission"] for g in acl["Grants"]
  173. if g["Grantee"]["Type"] == "Group" and g["Grantee"]["URI"] == group_uri]
  174. return "{}: {}".format(group, ", ".join(perms) if perms else "(none)")
  175. def __bucket_contains_any_keywords(self, bucket_name):
  176. try:
  177. objects = [o.key for o in self.session.Bucket(bucket_name).objects.all()]
  178. return any(keyword in ",".join(objects) for keyword in KEYWORDS)
  179. except:
  180. return False
  181. def __log(self, new_bucket_url):
  182. global FOUND_COUNT
  183. FOUND_COUNT += 1
  184. if ARGS.log_to_file:
  185. with open("buckets.log", "a+") as log:
  186. log.write("%s%s" % (new_bucket_url, os.linesep))
  187. def __output(self, line, color=None):
  188. cprint(line, color, attrs=["bold"])
  189. if CONFIG["slack_webhook"]:
  190. resp = requests.post(CONFIG['slack_webhook'], data=json.dumps({'text': line}), headers={'Content-Type': 'application/json'})
  191. if resp.status_code != 200:
  192. cprint("Could not send to your Slack Webhook. Server returned: %s" % resp.status_code, "red")
  193. def get_permutations(domain, subdomain=None):
  194. perms = [
  195. "%s" % domain,
  196. "www-%s" % domain,
  197. "%s-www" % domain,
  198. ]
  199. perms.extend([line.strip() % domain for line in open(ARGS.permutations)])
  200. if subdomain is not None:
  201. perms.extend([
  202. "%s-%s" % (subdomain, domain) if subdomain else "",
  203. "%s-%s" % (domain, subdomain) if subdomain else ""
  204. ])
  205. return filter(None, perms)
  206. def stop():
  207. global THREAD_EVENT
  208. cprint("Kill commanded received - Quitting...", "yellow", attrs=["bold"])
  209. THREAD_EVENT.set()
  210. sys.exit(0)
  211. def __signal_handler(signal, frame):
  212. stop()
  213. def main():
  214. global THREADS
  215. signal.signal(signal.SIGINT, __signal_handler)
  216. parser = argparse.ArgumentParser(description="Find interesting Amazon S3 Buckets by watching certificate transparency logs.",
  217. usage="python bucket-stream.py",
  218. formatter_class=argparse.ArgumentDefaultsHelpFormatter)
  219. parser.add_argument("--only-interesting", action="store_true", dest="only_interesting", default=False,
  220. help="Only log 'interesting' buckets whose contents match anything within keywords.txt")
  221. parser.add_argument("--skip-lets-encrypt", action="store_true", dest="skip_lets_encrypt", default=False,
  222. help="Skip certs (and thus listed domains) issued by Let's Encrypt CA")
  223. parser.add_argument("-t", "--threads", metavar="", type=int, dest="threads", default=20,
  224. help="Number of threads to spawn. More threads = more power. Limited to 5 threads if unauthenticated.")
  225. parser.add_argument("--ignore-rate-limiting", action="store_true", dest="ignore_rate_limiting", default=False,
  226. help="If you ignore rate limits not all buckets will be checked")
  227. parser.add_argument("-l", "--log", dest="log_to_file", default=False, action="store_true",
  228. help="Log found buckets to a file buckets.log")
  229. parser.add_argument("-s", "--source", dest="source", default=None,
  230. help="Data source to check for bucket permutations. Uses certificate transparency logs if not specified.")
  231. parser.add_argument("-p", "--permutations", dest="permutations", default="permutations/default.txt",
  232. help="Path of file containing a list of permutations to try (see permutations/ dir).")
  233. parser.parse_args(namespace=ARGS)
  234. logging.disable(logging.WARNING)
  235. if not CONFIG["aws_access_key"] or not CONFIG["aws_secret"]:
  236. cprint("It is highly recommended to enter AWS keys in config.yaml otherwise you will be severely rate limited!"\
  237. "You might want to run with --ignore-rate-limiting", "red")
  238. if ARGS.threads > 5:
  239. cprint("No AWS keys, reducing threads to 5 to help with rate limiting.", "red")
  240. ARGS.threads = 5
  241. THREADS = list()
  242. cprint("Starting bucket-stream with {0} threads. Loaded {1} permutations."\
  243. .format(ARGS.threads, len([x for x in get_permutations("")])), "green")
  244. q = BucketQueue(maxsize=QUEUE_SIZE)
  245. THREADS.extend([BucketWorker(q) for _ in range(0, ARGS.threads)])
  246. THREADS.extend([UpdateThread(q)])
  247. if ARGS.source is None:
  248. THREADS.extend([CertStreamThread(q)])
  249. else:
  250. for line in open(ARGS.source):
  251. for permutation in get_permutations(line.strip()):
  252. q.put(BUCKET_HOST % permutation)
  253. for t in THREADS:
  254. t.daemon = True
  255. t.start()
  256. while True:
  257. try:
  258. signal.pause()
  259. except AttributeError:
  260. # signal.pause() not implemented on windows
  261. while not THREAD_EVENT.is_set():
  262. time.sleep(1)
  263. stop()
# Run only when executed as a script (not on import).
if __name__ == "__main__":
    main()