123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237 |
- """ Swift tests """
- import sys
- import os
- import copy
- import logging
- from sys import exc_info
- from contextlib import contextmanager
- from tempfile import NamedTemporaryFile
- from eventlet.green import socket
- from tempfile import mkdtemp
- from shutil import rmtree
- from tests import get_config
- from ConfigParser import MissingSectionHeaderError
- from StringIO import StringIO
- from swift.common.utils import readconf, TRUE_VALUES
- from logging import Handler
- import logging.handlers
- def readuntil2crlfs(fd):
- rv = ''
- lc = ''
- crlfs = 0
- while crlfs < 2:
- c = fd.read(1)
- rv = rv + c
- if c == '\r' and lc != '\n':
- crlfs = 0
- if lc == '\r' and c == '\n':
- crlfs += 1
- lc = c
- return rv
- def connect_tcp(hostport):
- rv = socket.socket()
- rv.connect(hostport)
- return rv
- @contextmanager
- def tmpfile(content):
- with NamedTemporaryFile('w', delete=False) as f:
- file_name = f.name
- f.write(str(content))
- try:
- yield file_name
- finally:
- os.unlink(file_name)
- xattr_data = {}
- def _get_inode(fd):
- if not isinstance(fd, int):
- try:
- fd = fd.fileno()
- except AttributeError:
- return os.stat(fd).st_ino
- return os.fstat(fd).st_ino
- def _setxattr(fd, k, v):
- inode = _get_inode(fd)
- data = xattr_data.get(inode, {})
- data[k] = v
- xattr_data[inode] = data
- def _getxattr(fd, k):
- inode = _get_inode(fd)
- data = xattr_data.get(inode, {}).get(k)
- if not data:
- raise IOError
- return data
- import xattr
- xattr.setxattr = _setxattr
- xattr.getxattr = _getxattr
- @contextmanager
- def temptree(files, contents=''):
- # generate enough contents to fill the files
- c = len(files)
- contents = (list(contents) + [''] * c)[:c]
- tempdir = mkdtemp()
- for path, content in zip(files, contents):
- if os.path.isabs(path):
- path = '.' + path
- new_path = os.path.join(tempdir, path)
- subdir = os.path.dirname(new_path)
- if not os.path.exists(subdir):
- os.makedirs(subdir)
- with open(new_path, 'w') as f:
- f.write(str(content))
- try:
- yield tempdir
- finally:
- rmtree(tempdir)
- class NullLoggingHandler(logging.Handler):
- def emit(self, record):
- pass
- class FakeLogger(object):
- # a thread safe logger
- def __init__(self, *args, **kwargs):
- self._clear()
- self.level = logging.NOTSET
- if 'facility' in kwargs:
- self.facility = kwargs['facility']
- def _clear(self):
- self.log_dict = dict(
- error=[], info=[], warning=[], debug=[], exception=[])
- def error(self, *args, **kwargs):
- self.log_dict['error'].append((args, kwargs))
- def info(self, *args, **kwargs):
- self.log_dict['info'].append((args, kwargs))
- def warning(self, *args, **kwargs):
- self.log_dict['warning'].append((args, kwargs))
- def debug(self, *args, **kwargs):
- self.log_dict['debug'].append((args, kwargs))
- def exception(self, *args, **kwargs):
- self.log_dict['exception'].append((args, kwargs, str(exc_info()[1])))
- # mock out the StatsD logging methods:
- def set_statsd_prefix(self, *a, **kw):
- pass
- increment = decrement = timing = timing_since = update_stats = \
- set_statsd_prefix
- def setFormatter(self, obj):
- self.formatter = obj
- def close(self):
- self._clear()
- def set_name(self, name):
- # don't touch _handlers
- self._name = name
- def acquire(self):
- pass
- def release(self):
- pass
- def createLock(self):
- pass
- def emit(self, record):
- pass
- def handle(self, record):
- pass
- def flush(self):
- pass
- def handleError(self, record):
- pass
- original_syslog_handler = logging.handlers.SysLogHandler
- def fake_syslog_handler():
- for attr in dir(original_syslog_handler):
- if attr.startswith('LOG'):
- setattr(FakeLogger, attr,
- copy.copy(getattr(logging.handlers.SysLogHandler, attr)))
- FakeLogger.priority_map = \
- copy.deepcopy(logging.handlers.SysLogHandler.priority_map)
- logging.handlers.SysLogHandler = FakeLogger
- if get_config('unit_test').get('fake_syslog', 'False').lower() in TRUE_VALUES:
- fake_syslog_handler()
- class MockTrue(object):
- """
- Instances of MockTrue evaluate like True
- Any attr accessed on an instance of MockTrue will return a MockTrue
- instance. Any method called on an instance of MockTrue will return
- a MockTrue instance.
- >>> thing = MockTrue()
- >>> thing
- True
- >>> thing == True # True == True
- True
- >>> thing == False # True == False
- False
- >>> thing != True # True != True
- False
- >>> thing != False # True != False
- True
- >>> thing.attribute
- True
- >>> thing.method()
- True
- >>> thing.attribute.method()
- True
- >>> thing.method().attribute
- True
- """
- def __getattribute__(self, *args, **kwargs):
- return self
- def __call__(self, *args, **kwargs):
- return self
- def __repr__(*args, **kwargs):
- return repr(True)
- def __eq__(self, other):
- return other is True
- def __ne__(self, other):
- return other is not True