mock_remote_server.py 8.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250
  1. """
  2. An HTTP server that listens on localhost and returns a variety of responses for
  3. mocking remote servers.
  4. """
  5. from builtins import str
  6. from builtins import range
  7. from builtins import object
  8. from contextlib import contextmanager
  9. from threading import Thread
  10. from time import sleep
  11. from wsgiref.simple_server import make_server
  12. from future.moves.urllib.request import urlopen
  13. import socket
  14. import os
  15. from functools import reduce
  16. class MockHTTPServer(object):
  17. """
  18. Mock HTTP server that can take the place of a remote server for testing
  19. fetching of remote resources.
  20. Uses contextmanager to allow easy setup and teardown of the WSGI server in
  21. a separate thread, eg::
  22. >>> with MockTestServer().serve() as server_address:
  23. ... urlopen(server_address)
  24. ...
  25. Subclass this and override __call__ to provide your own WSGI handler function.
  26. """
  27. def __call__(self, environ, start_response):
  28. raise NotImplementedError()
  29. @contextmanager
  30. def serve(self, host='localhost', port_range=(8000, 9000)):
  31. """
  32. Start an instance of wsgiref.simple_server set up to handle requests in
  33. a separate daemon thread.
  34. Return the address of the server eg ('http://localhost:8000').
  35. This uses context manager to make sure the server is stopped::
  36. >>> with MockTestServer().serve() as addr:
  37. ... print urlopen('%s/?content=hello+world').read()
  38. ...
  39. 'hello world'
  40. """
  41. for port in range(*port_range):
  42. try:
  43. server = make_server(host, port, self)
  44. except socket.error:
  45. continue
  46. break
  47. else:
  48. raise Exception("Could not bind to a port in range %r" % (port_range,))
  49. serving = True
  50. def _serve_until_stopped():
  51. while serving:
  52. server.handle_request()
  53. thread = Thread(target=_serve_until_stopped)
  54. thread.daemon = True
  55. thread.start()
  56. try:
  57. yield 'http://%s:%d' % (host, port)
  58. finally:
  59. serving = False
  60. # Call the server to make sure the waiting handle_request()
  61. # call completes. Set a very small timeout as we don't actually need to
  62. # wait for a response. We don't care about exceptions here either.
  63. try:
  64. urlopen("http://%s:%s/" % (host, port), timeout=0.01)
  65. except Exception:
  66. pass
  67. @classmethod
  68. def get_content(cls, varspec):
  69. """
  70. Return the value of the variable at varspec, which must be in the
  71. format 'package.module:variable'. If variable is callable, it will be
  72. called and its return value used.
  73. """
  74. modpath, var = varspec.split(':')
  75. mod = reduce(getattr, modpath.split('.')[1:], __import__(modpath))
  76. var = reduce(getattr, var.split('.'), mod)
  77. try:
  78. return var()
  79. except TypeError:
  80. return var
  81. class MockEchoTestServer(MockHTTPServer):
  82. """
  83. WSGI application that echos back the status, headers and
  84. content passed via the URL, eg:
  85. a 500 error response: 'http://localhost/?status=500'
  86. a 200 OK response, returning the function's docstring:
  87. 'http://localhost/?status=200;content-type=text/plain;content_var
  88. =ckan.tests.lib.test_package_search:test_wsgi_app.__doc__'
  89. To specify content, use:
  90. content=string
  91. content_var=package.module:variable
  92. """
  93. def __call__(self, environ, start_response):
  94. from http.client import responses
  95. from webob import Request
  96. request = Request(environ)
  97. status = int(request.str_params.get('status', '200'))
  98. # if 'redirect' in redirect.str_params:
  99. # params = dict([(key, value) for param in request.str_params \
  100. # if key != 'redirect'])
  101. # redirect_status = int(request.str_params['redirect'])
  102. # status = int(request.str_params.get('status', '200'))
  103. # resp = make_response(render_template('error.html'), redirect_status)
  104. # resp.headers['Location'] = url_for(request.path, params)
  105. # return resp
  106. if 'content_var' in request.str_params:
  107. content = request.str_params.get('content_var')
  108. content = self.get_content(content)
  109. elif 'content_long' in request.str_params:
  110. content = '*' * 1000001
  111. else:
  112. content = request.str_params.get('content', '')
  113. if 'method' in request.str_params \
  114. and request.method.lower() != request.str_params['method'].lower():
  115. content = ''
  116. status = 405
  117. if isinstance(content, str):
  118. raise TypeError("Expected raw byte string for content")
  119. headers = [
  120. item
  121. for item in list(request.str_params.items())
  122. if item[0] not in ('content', 'status')
  123. ]
  124. if 'length' in request.str_params:
  125. cl = request.str_params.get('length')
  126. headers += [('Content-Length', cl)]
  127. elif content and 'no-content-length' not in request.str_params:
  128. headers += [('Content-Length', bytes(len(content)))]
  129. start_response(
  130. '%d %s' % (status, responses[status]),
  131. headers
  132. )
  133. return [content]
  134. class MockTimeoutTestServer(MockHTTPServer):
  135. """
  136. Sleeps ``timeout`` seconds before responding. Make sure that your timeout value is
  137. less than this to check handling timeout conditions.
  138. """
  139. def __init__(self, timeout):
  140. super(MockTimeoutTestServer, self).__init__()
  141. self.timeout = timeout
  142. def __call__(self, environ, start_response):
  143. # Sleep until self.timeout or the parent thread finishes
  144. sleep(self.timeout)
  145. start_response('200 OK', [('Content-Type', 'text/plain')])
  146. return ['xyz']
  147. def get_file_content(data_filename):
  148. filepath = os.path.join(os.path.dirname(__file__), 'data', data_filename)
  149. assert os.path.exists(filepath), filepath
  150. with open(filepath, 'rb') as f:
  151. return f.read()
  152. class MockWmsServer(MockHTTPServer):
  153. """Acts like an OGC WMS server (well, one basic call)
  154. """
  155. def __init__(self, wms_version='1.3'):
  156. self.wms_version = wms_version
  157. super(MockWmsServer, self).__init__()
  158. def __call__(self, environ, start_response):
  159. from http.client import responses
  160. from webob import Request
  161. request = Request(environ)
  162. status = int(request.str_params.get('status', '200'))
  163. headers = {'Content-Type': 'text/plain'}
  164. # e.g. params ?service=WMS&request=GetCapabilities&version=1.1.1
  165. if request.str_params.get('service') != 'WMS':
  166. status = 200
  167. content = ERROR_WRONG_SERVICE
  168. elif request.str_params.get('request') != 'GetCapabilities':
  169. status = 405
  170. content = '"request" param wrong'
  171. elif 'version' in request.str_params and \
  172. request.str_params.get('version') != self.wms_version:
  173. status = 405
  174. content = '"version" not compatible - need to be %s' % self.wms_version
  175. elif self.wms_version == '1.1.1':
  176. status = 200
  177. content = get_file_content('wms_getcap_1.1.1.xml')
  178. elif self.wms_version == '1.3':
  179. status = 200
  180. content = get_file_content('wms_getcap_1.3.xml')
  181. start_response(
  182. '%d %s' % (status, responses[status]),
  183. list(headers.items())
  184. )
  185. return [content]
  186. class MockWfsServer(MockHTTPServer):
  187. """Acts like an OGC WFS server (well, one basic call)
  188. """
  189. def __init__(self):
  190. super(MockWfsServer, self).__init__()
  191. def __call__(self, environ, start_response):
  192. from http.client import responses
  193. from webob import Request
  194. request = Request(environ)
  195. status = int(request.str_params.get('status', '200'))
  196. headers = {'Content-Type': 'text/plain'}
  197. # e.g. params ?service=WFS&request=GetCapabilities
  198. if request.str_params.get('service') != 'WFS':
  199. status = 200
  200. content = ERROR_WRONG_SERVICE
  201. elif request.str_params.get('request') != 'GetCapabilities':
  202. status = 405
  203. content = '"request" param wrong'
  204. else:
  205. status = 200
  206. content = get_file_content('wfs_getcap.xml')
  207. start_response(
  208. '%d %s' % (status, responses[status]),
  209. list(headers.items())
  210. )
  211. return [content]
  212. ERROR_WRONG_SERVICE = "<ows:ExceptionReport version='1.1.0' language='en'" \
  213. " xmlns:ows='http://www.opengis.net/ows'><ows:Exception exceptionCode='NoApplicableCode'>" \
  214. "<ows:ExceptionText>Wrong service type.</ows:ExceptionText></ows:Exception></ows:ExceptionReport>"