test_utils.py 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293
  1. import sys
  2. import pytest
  3. from collections import Counter
  4. from io import BytesIO
  5. import os
  6. import tempfile
  7. import warcio.utils as utils
  8. from . import get_test_file
  9. try:
  10. from multidict import CIMultiDict, MultiDict
  11. except ImportError:
  12. pass
  13. class TestUtils(object):
  14. def test_headers_to_str_headers(self):
  15. result = [('foo', 'bar'), ('baz', 'barf')]
  16. header_dict = {'foo': b'bar', b'baz': 'barf'}
  17. ret = utils.headers_to_str_headers(header_dict)
  18. assert Counter(ret) == Counter(result)
  19. aiohttp_raw_headers = ((b'foo', b'bar'), (b'baz', b'barf'))
  20. assert Counter(utils.headers_to_str_headers(aiohttp_raw_headers)) == Counter(result)
  21. @pytest.mark.skipif('multidict' not in sys.modules, reason='requires multidict be installed')
  22. def test_multidict_headers_to_str_headers(self):
  23. result = [('foo', 'bar'), ('baz', 'barf')]
  24. aiohttp_headers = MultiDict(foo='bar', baz=b'barf')
  25. ret = utils.headers_to_str_headers(aiohttp_headers)
  26. assert Counter(ret) == Counter(result)
  27. # This case-insensitive thingie titlecases the key
  28. aiohttp_headers = CIMultiDict(Foo='bar', Baz=b'barf')
  29. titlecase_result = [('Foo', 'bar'), ('Baz', 'barf')]
  30. ret = utils.headers_to_str_headers(aiohttp_headers)
  31. assert Counter(ret) == Counter(titlecase_result)
  32. def test_open_or_default(self):
  33. default_fh = BytesIO(b'NOTWARC/1.0\r\n')
  34. with utils.open_or_default(get_test_file('example.warc'), 'rb', default_fh) as fh:
  35. assert fh.readline().decode('utf-8') == 'WARC/1.0\r\n'
  36. with utils.open_or_default(None, 'rb', default_fh) as fh:
  37. assert fh.readline().decode('utf-8') == 'NOTWARC/1.0\r\n'
  38. default_fh.seek(0)
  39. with utils.open_or_default(b'-', 'rb', default_fh) as fh:
  40. assert fh.readline().decode('utf-8') == 'NOTWARC/1.0\r\n'
  41. default_fh.seek(0)
  42. with utils.open_or_default(u'-', 'rb', default_fh) as fh:
  43. assert fh.readline().decode('utf-8') == 'NOTWARC/1.0\r\n'
  44. default_fh.seek(0)
  45. with utils.open_or_default(default_fh, 'rb', None) as fh:
  46. assert fh.readline().decode('utf-8') == 'NOTWARC/1.0\r\n'
  47. def test_to_native_str(self):
  48. # binary string
  49. assert utils.to_native_str(b'10') == '10'
  50. # unicode string
  51. assert utils.to_native_str(u'10') == '10'
  52. # default string
  53. assert utils.to_native_str('10') == '10'
  54. # not string, leave as is
  55. assert utils.to_native_str(10) == 10
  56. def test_open_exclusive(self):
  57. temp_dir = tempfile.mkdtemp('warctest')
  58. full_name = os.path.join(temp_dir, 'foo.txt')
  59. with utils.open(full_name, 'xb') as fh:
  60. fh.write(b'test\r\nfoo')
  61. with pytest.raises(OSError):
  62. with utils.open(full_name, 'xb') as fh:
  63. fh.write(b'test\r\nfoo')
  64. with utils.open(full_name, 'rb') as fh:
  65. assert fh.read() == b'test\r\nfoo'
  66. os.remove(full_name)
  67. os.rmdir(temp_dir)