| Index: client/libs/arfile/arfile_test.py
|
| diff --git a/client/libs/arfile/arfile_test.py b/client/libs/arfile/arfile_test.py
|
| new file mode 100755
|
| index 0000000000000000000000000000000000000000..08290d784f19de07107dff8abce22996c531df30
|
| --- /dev/null
|
| +++ b/client/libs/arfile/arfile_test.py
|
| @@ -0,0 +1,603 @@
|
| +#!/usr/bin/env python
|
| +# Copyright 2016 The LUCI Authors. All rights reserved.
|
| +# Use of this source code is governed under the Apache License, Version 2.0
|
| +# that can be found in the LICENSE file.
|
| +
|
| +# pylint: disable=relative-import
|
| +
|
| +import doctest
|
| +import io
|
| +import os
|
| +import shutil
|
| +import subprocess
|
| +import sys
|
| +import tempfile
|
| +import unittest
|
| +
|
| +import arfile
|
| +import cli
|
| +
|
| +
|
| +ARFILE_DIR = os.path.dirname(os.path.abspath(__file__))
|
| +sys.path.insert(0, ARFILE_DIR)
|
| +
|
| +
|
| +if not hasattr(subprocess, 'DEVNULL'):
|
| + subprocess.DEVNULL = file(os.devnull, 'wb')
|
| +
|
| +
|
| +def filesystem_supports_unicode():
|
| + try:
|
| + u'\u2603'.encode(sys.getfilesystemencoding())
|
| + return True
|
| + except UnicodeEncodeError:
|
| + return False
|
| +
|
| +
|
| +class ClosesSaveIOBytes(io.BytesIO):
|
| +
|
| + def close(self):
|
| + _value = self.getvalue()
|
| + self.getvalue = lambda: _value
|
| + io.BytesIO.close(self)
|
| +
|
| +
|
| +AR_TEST_SIMPLE1 = (
|
| + # ar file header
|
| + '!<arch>\n'
|
| + # File 1
|
| + # ----------------------
|
| + # (16 bytes) simple file
|
| + 'filename1 '
|
| + # (12 bytes) modification time
|
| + '123 '
|
| + # (6 bytes) user id
|
| + '1000 '
|
| + # (6 bytes) group id
|
| + '1000 '
|
| + # (8 bytes) file mode
|
| + '100640 '
|
| + # (10 bytes) data size
|
| + '6 '
|
| + # (2 bytes) file magic
|
| + '\x60\n'
|
| + # File data
|
| + 'abc123'
|
| + # Finished
|
| + '')
|
| +
|
| +AR_TEST_SIMPLE_UTF = (
|
| + # ar file header
|
| + '!<arch>\n'
|
| + # File 1
|
| + # ----------------------
|
| + # (16 bytes) simple file
|
| + '\xe2\x98\x83 '
|
| + # (12 bytes) modification time
|
| + '123 '
|
| + # (6 bytes) user id
|
| + '1000 '
|
| + # (6 bytes) group id
|
| + '1000 '
|
| + # (8 bytes) file mode
|
| + '100640 '
|
| + # (10 bytes) data size
|
| + '4 '
|
| + # (2 bytes) file magic
|
| + '\x60\n'
|
| + # (4 bytes) File data
|
| + '\xf0\x9f\x92\xa9'
|
| + # Finished
|
| + '')
|
| +
|
| +AR_TEST_BSD1 = (
|
| + # ar file header
|
| + '!<arch>\n'
|
| + # File 1
|
| + # ----------------------
|
| + # (16 bytes) BSD style filename length
|
| + '#1/9 '
|
| + # (12 bytes) modification time
|
| + '1234 '
|
| + # (6 bytes) user id
|
| + '1001 '
|
| + # (6 bytes) group id
|
| + '1001 '
|
| + # (8 bytes) file mode
|
| + '100644 '
|
| + # (10 bytes) data size
|
| + '15 '
|
| + # (2 bytes) file magic
|
| + '\x60\n'
|
| + # BSD style filename
|
| + 'filename1'
|
| + # File data
|
| + 'abc123'
|
| + # Padding
|
| + '\n'
|
| + # Finished
|
| + '')
|
| +
|
| +AR_TEST_BSD2 = (
|
| + # ar file header
|
| + '!<arch>\n'
|
| +
|
| + # File 1
|
| + # ----------------------
|
| + # (16 bytes) filename len
|
| + '#1/5 '
|
| + # (12 bytes) mtime
|
| + '1447140471 '
|
| + # (6 bytes) owner id
|
| + '1000 '
|
| + # (6 bytes) group id
|
| + '1000 '
|
| + # (8 bytes) file mode
|
| + '100640 '
|
| + # (10 bytes) Data size
|
| + '13 '
|
| + # (2 bytes) File magic
|
| + '\x60\n'
|
| + # (9 bytes) File name
|
| + 'file1'
|
| + # (6 bytes) File data
|
| + 'contents'
|
| + # (1 byte) Padding
|
| + '\n'
|
| +
|
| + # File 2
|
| + # ----------------------
|
| + # (16 bytes) filename len
|
| + '#1/7 '
|
| + # (12 bytes) mtime
|
| + '1447140471 '
|
| + # (6 bytes) owner id
|
| + '1000 '
|
| + # (6 bytes) group id
|
| + '1000 '
|
| + # (8 bytes) file mode
|
| + '100640 '
|
| + # (10 bytes) Data size
|
| + '10 '
|
| + # (2 bytes) File magic
|
| + '\x60\n'
|
| + # (9 bytes) File name
|
| + 'fileabc'
|
| + # (6 bytes) File data
|
| + '123'
|
| + # (0 byte) No padding
|
| + ''
|
| +
|
| + # File 3
|
| + # ----------------------
|
| + # (16 bytes) filename len
|
| + '#1/10 '
|
| + # (12 bytes) mtime
|
| + '1447140471 '
|
| + # (6 bytes) owner id
|
| + '1000 '
|
| + # (6 bytes) group id
|
| + '1000 '
|
| + # (8 bytes) file mode
|
| + '100640 '
|
| + # (10 bytes) Data size
|
| + '16 '
|
| + # (2 bytes) File magic
|
| + '\x60\n'
|
| + # (9 bytes) File name
|
| + 'dir1/file1'
|
| + # (6 bytes) File data
|
| + '123abc'
|
| + # (0 byte) No padding
|
| + ''
|
| +
|
| + # Finished
|
| + '')
|
| +
|
| +AR_TEST_BSD_UTF = (
|
| + # ar file header
|
| + '!<arch>\n'
|
| + # File 1
|
| + # ----------------------
|
| + # (16 bytes) BSD style filename length
|
| + '#1/3 '
|
| + # (12 bytes) modification time
|
| + '1234 '
|
| + # (6 bytes) user id
|
| + '1001 '
|
| + # (6 bytes) group id
|
| + '1001 '
|
| + # (8 bytes) file mode
|
| + '100644 '
|
| + # (10 bytes) data size
|
| + '7 '
|
| + # (2 bytes) file magic
|
| + '\x60\n'
|
| + # (3 bytes) BSD style filename
|
| + '\xe2\x98\x83'
|
| + # (4 bytes) File data
|
| + '\xf0\x9f\x92\xa9'
|
| + # Padding
|
| + '\n'
|
| + # Finished
|
| + '')
|
| +
|
| +
|
| +class TestArFileReader(unittest.TestCase):
|
| +
|
| + def testSimple1(self):
|
| + fileobj = io.BytesIO(AR_TEST_SIMPLE1)
|
| +
|
| + afri = iter(arfile.ArFileReader(fileobj))
|
| + ai, af = afri.next()
|
| + self.assertIs(arfile.AR_FORMAT_SIMPLE, ai.format)
|
| + self.assertEqual('filename1', ai.name)
|
| + self.assertEqual(6, ai.size)
|
| + self.assertEqual(123, ai.mtime)
|
| + self.assertEqual(1000, ai.uid)
|
| + self.assertEqual(1000, ai.gid)
|
| + self.assertEqual('0100640', oct(ai.mode))
|
| + self.assertEqual('abc123', af.read(ai.size))
|
| +
|
| + def testSimpleUTF(self):
|
| + fileobj = io.BytesIO(AR_TEST_SIMPLE_UTF)
|
| +
|
| + afri = iter(arfile.ArFileReader(fileobj))
|
| + ai, af = afri.next()
|
| + self.assertIs(arfile.AR_FORMAT_SIMPLE, ai.format)
|
| + self.assertEqual(u'\u2603', ai.name)
|
| + self.assertEqual(4, ai.size)
|
| + self.assertEqual(123, ai.mtime)
|
| + self.assertEqual(1000, ai.uid)
|
| + self.assertEqual(1000, ai.gid)
|
| + self.assertEqual('0100640', oct(ai.mode))
|
| + self.assertEqual(u'\U0001f4a9', af.read(ai.size).decode('utf-8'))
|
| +
|
| + def testBSD1(self):
|
| + fileobj = io.BytesIO(AR_TEST_BSD1)
|
| +
|
| + afri = iter(arfile.ArFileReader(fileobj))
|
| + ai, af = afri.next()
|
| + self.assertIs(arfile.AR_FORMAT_BSD, ai.format)
|
| + self.assertEqual('filename1', ai.name)
|
| + self.assertEqual(6, ai.size)
|
| + self.assertEqual(1234, ai.mtime)
|
| + self.assertEqual(1001, ai.uid)
|
| + self.assertEqual(1001, ai.gid)
|
| + self.assertEqual('0100644', oct(ai.mode))
|
| + self.assertEqual('abc123', af.read(ai.size))
|
| +
|
| + def testBSD2(self):
|
| + fileobj = io.BytesIO(AR_TEST_BSD2)
|
| +
|
| + afri = iter(arfile.ArFileReader(fileobj))
|
| + ai, af = afri.next()
|
| + self.assertIs(arfile.AR_FORMAT_BSD, ai.format)
|
| + self.assertEqual('file1', ai.name)
|
| + self.assertEqual(8, ai.size)
|
| + self.assertEqual(1447140471, ai.mtime)
|
| + self.assertEqual(1000, ai.uid)
|
| + self.assertEqual(1000, ai.gid)
|
| + self.assertEqual('0100640', oct(ai.mode))
|
| + self.assertEqual('contents', af.read(ai.size))
|
| +
|
| + ai, af = afri.next()
|
| + self.assertIs(arfile.AR_FORMAT_BSD, ai.format)
|
| + self.assertEqual('fileabc', ai.name)
|
| + self.assertEqual(3, ai.size)
|
| + self.assertEqual(1447140471, ai.mtime)
|
| + self.assertEqual(1000, ai.uid)
|
| + self.assertEqual(1000, ai.gid)
|
| + self.assertEqual('0100640', oct(ai.mode))
|
| + self.assertEqual('123', af.read(ai.size))
|
| +
|
| + ai, af = afri.next()
|
| + self.assertIs(arfile.AR_FORMAT_BSD, ai.format)
|
| + self.assertEqual('dir1/file1', ai.name)
|
| + self.assertEqual(6, ai.size)
|
| + self.assertEqual(1447140471, ai.mtime)
|
| + self.assertEqual(1000, ai.uid)
|
| + self.assertEqual(1000, ai.gid)
|
| + self.assertEqual('0100640', oct(ai.mode))
|
| + self.assertEqual('123abc', af.read(ai.size))
|
| +
|
| + def testBSDUTF(self):
|
| + fileobj = io.BytesIO(AR_TEST_BSD_UTF)
|
| +
|
| + afri = iter(arfile.ArFileReader(fileobj))
|
| + ai, af = afri.next()
|
| + self.assertIs(arfile.AR_FORMAT_BSD, ai.format)
|
| + self.assertEqual(u'\u2603', ai.name)
|
| + self.assertEqual(4, ai.size)
|
| + self.assertEqual(1234, ai.mtime)
|
| + self.assertEqual(1001, ai.uid)
|
| + self.assertEqual(1001, ai.gid)
|
| + self.assertEqual('0100644', oct(ai.mode))
|
| + self.assertEqual(u'\U0001f4a9', af.read(ai.size).decode('utf-8'))
|
| +
|
| +
|
| +class TestArFileWriter(unittest.TestCase):
|
| +
|
| + def testSimple1(self):
|
| + fileobj = ClosesSaveIOBytes()
|
| +
|
| + afw = arfile.ArFileWriter(fileobj)
|
| + ai = arfile.ArInfo(
|
| + arfile.AR_FORMAT_SIMPLE, 'filename1', 6, 123, 1000, 1000, 0100640)
|
| + afw.addfile(ai, io.BytesIO('abc123'))
|
| + afw.close()
|
| +
|
| + self.assertMultiLineEqual(AR_TEST_SIMPLE1, fileobj.getvalue())
|
| +
|
| + def testSimpleUTF(self):
|
| + fileobj = ClosesSaveIOBytes()
|
| +
|
| + afw = arfile.ArFileWriter(fileobj)
|
| + ai = arfile.ArInfo(
|
| + arfile.AR_FORMAT_SIMPLE, u'\u2603', 4, 123, 1000, 1000, 0100640)
|
| + afw.addfile(ai, io.BytesIO(u'\U0001f4a9'.encode('utf-8')))
|
| + afw.close()
|
| +
|
| + self.assertMultiLineEqual(AR_TEST_SIMPLE_UTF, fileobj.getvalue())
|
| +
|
| + def testBSD1(self):
|
| + fileobj = ClosesSaveIOBytes()
|
| +
|
| + afw = arfile.ArFileWriter(fileobj)
|
| + ai = arfile.ArInfo(
|
| + arfile.AR_FORMAT_BSD, 'filename1', 6, 1234, 1001, 1001, 0100644)
|
| + afw.addfile(ai, io.BytesIO('abc123'))
|
| + afw.close()
|
| +
|
| + self.assertMultiLineEqual(AR_TEST_BSD1, fileobj.getvalue())
|
| +
|
| + def testBSD2(self):
|
| + fileobj = ClosesSaveIOBytes()
|
| +
|
| + afw = arfile.ArFileWriter(fileobj)
|
| + afw.addfile(
|
| + arfile.ArInfo.fromdefault(
|
| + 'file1', 8, arformat=arfile.AR_FORMAT_BSD),
|
| + io.BytesIO('contents'))
|
| + afw.addfile(
|
| + arfile.ArInfo.fromdefault(
|
| + 'fileabc', 3, arformat=arfile.AR_FORMAT_BSD),
|
| + io.BytesIO('123'))
|
| + afw.addfile(
|
| + arfile.ArInfo.fromdefault(
|
| + 'dir1/file1', 6, arformat=arfile.AR_FORMAT_BSD),
|
| + io.BytesIO('123abc'))
|
| + afw.close()
|
| +
|
| + self.assertMultiLineEqual(AR_TEST_BSD2, fileobj.getvalue())
|
| +
|
| + def testBSDUTF(self):
|
| + fileobj = ClosesSaveIOBytes()
|
| +
|
| + afw = arfile.ArFileWriter(fileobj)
|
| + ai = arfile.ArInfo(
|
| + arfile.AR_FORMAT_BSD, u'\u2603', 4, 1234, 1001, 1001, 0100644)
|
| + afw.addfile(ai, io.BytesIO(u'\U0001f4a9'.encode('utf-8')))
|
| + afw.close()
|
| +
|
| + self.assertMultiLineEqual(AR_TEST_BSD_UTF, fileobj.getvalue())
|
| +
|
| +
|
| +class BaseTestSuite(object):
|
| +
|
| + def testSimple1(self):
|
| + self.assertWorking(
|
| + (
|
| + arfile.ArInfo(
|
| + arfile.AR_FORMAT_SIMPLE, 'filename1',
|
| + 6, 123, 1000, 1000, 0100640),
|
| + 'abc123'))
|
| +
|
| + def testSimpleUTF(self):
|
| + self.assertWorking(
|
| + (
|
| + arfile.ArInfo(
|
| + arfile.AR_FORMAT_SIMPLE, u'\u2603',
|
| + 4, 123, 1000, 1000, 0100640),
|
| + u'\U0001f4a9'.encode('utf-8')))
|
| +
|
| + def testBSD1(self):
|
| + self.assertWorking(
|
| + (
|
| + arfile.ArInfo(
|
| + arfile.AR_FORMAT_BSD, 'filename1',
|
| + 6, 123, 1000, 1000, 0100640),
|
| + 'abc123'))
|
| +
|
| + def testBSD2(self):
|
| + self.assertWorking(
|
| + (
|
| + arfile.ArInfo.fromdefault(
|
| + 'file1', 8, arformat=arfile.AR_FORMAT_BSD),
|
| + 'contents'),
|
| + (
|
| + arfile.ArInfo.fromdefault(
|
| + 'fileabc', 3, arformat=arfile.AR_FORMAT_BSD),
|
| + '123'),
|
| + (
|
| + arfile.ArInfo.fromdefault(
|
| + 'dir1/file1', 6, arformat=arfile.AR_FORMAT_BSD),
|
| + '123abc'))
|
| +
|
| + def testBSDUTF(self):
|
| + self.assertWorking(
|
| + (
|
| + arfile.ArInfo(
|
| + arfile.AR_FORMAT_BSD, u'\u2603',
|
| + 4, 123, 1000, 1000, 0100640),
|
| + u'\U0001f4a9'.encode('utf-8')))
|
| +
|
| + def testMixed(self):
|
| + self.assertWorking(
|
| + (arfile.ArInfo.fromdefault('file1', 0), ''),
|
| + (arfile.ArInfo.fromdefault('f f', 1), 'a'),
|
| + (arfile.ArInfo.fromdefault('123456789abcedefa', 1), 'a'))
|
| +
|
| +
|
| +class TestArRoundTrip(BaseTestSuite, unittest.TestCase):
|
| +
|
| + def assertWorking(self, *initems):
|
| + outfile = ClosesSaveIOBytes()
|
| +
|
| + afw = arfile.ArFileWriter(outfile)
|
| + for ai, data in initems:
|
| + assert ai.size == len(data)
|
| + afw.addfile(ai, io.BytesIO(data))
|
| + afw.close()
|
| +
|
| + infile = io.BytesIO(outfile.getvalue())
|
| + afr = arfile.ArFileReader(infile)
|
| +
|
| + outitems = []
|
| + for ai, fd in afr:
|
| + data = fd.read(ai.size)
|
| + outitems.append((ai, data))
|
| +
|
| + self.assertSequenceEqual(initems, outitems)
|
| +
|
| +
|
| +def system_has_ar():
|
| + retcode = subprocess.call(
|
| + 'ar', stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
|
| + return retcode == 1
|
| +
|
| +
|
| +@unittest.skipIf(not system_has_ar(), 'no ar binary found.')
|
| +class TestArExternal(BaseTestSuite, unittest.TestCase):
|
| +
|
| + def assertWorking(self, *initems):
|
| + tf = tempfile.NamedTemporaryFile(mode='wb')
|
| + afw = arfile.ArFileWriter(tf)
|
| +
|
| + files = []
|
| + for ai, data in initems:
|
| + files.append(ai.name)
|
| + assert ai.size == len(data)
|
| + afw.addfile(ai, io.BytesIO(data))
|
| + afw.flush()
|
| +
|
| + output = subprocess.check_output(['ar', 't', tf.name])
|
| + self.assertMultiLineEqual('\n'.join(files), output.decode('utf-8').strip())
|
| + tf.close()
|
| +
|
| +
|
| +class TestCLI(unittest.TestCase):
|
| +
|
| + def runCLI(self, args):
|
| + orig_stdout = sys.stdout
|
| + orig_stderr = sys.stderr
|
| + try:
|
| + sys.stdout = io.StringIO()
|
| + sys.stderr = io.StringIO()
|
| + cli.main('artool', args)
|
| + return sys.stdout.getvalue(), sys.stderr.getvalue()
|
| + finally:
|
| + sys.stdout = orig_stdout
|
| + sys.stderr = orig_stderr
|
| +
|
| + def assertCLI(self, *initems, **kw):
|
| + extra_args = kw.get('extra_args', [])
|
| +
|
| + indir = None
|
| + ardir = None
|
| + outdir = None
|
| + try:
|
| + indir = tempfile.mkdtemp().decode(sys.getfilesystemencoding())
|
| + ardir = tempfile.mkdtemp().decode(sys.getfilesystemencoding())
|
| + outdir = tempfile.mkdtemp().decode(sys.getfilesystemencoding())
|
| +
|
| + arp = os.path.join(ardir, 'out.ar')
|
| + assert not os.path.exists(arp)
|
| +
|
| + # Write out a directory tree
|
| + files = []
|
| + for fp, contents in initems:
|
| + fn = os.path.join(indir, fp)
|
| + dn = os.path.dirname(fn)
|
| + if not os.path.exists(dn):
|
| + os.makedirs(dn)
|
| +
|
| + with file(fn, 'wb') as f:
|
| + f.write(contents)
|
| +
|
| + files.append(fp)
|
| +
|
| + files.sort()
|
| + fileslist = '\n'.join(files)
|
| +
|
| + # Create an archive from a directory
|
| + self.runCLI(['create', '--filename', arp, indir] + extra_args)
|
| + self.assertTrue(
|
| + os.path.exists(arp), '%s file should exists' % arp)
|
| +
|
| + # List the archive contents
|
| + output, _ = self.runCLI(['list', '--filename', arp])
|
| + filesoutput = '\n'.join(sorted(output[:-1].split('\n')))
|
| + self.assertMultiLineEqual(fileslist, filesoutput)
|
| +
|
| + # Extract the archive
|
| + os.chdir(outdir)
|
| + self.runCLI(['extract', '--filename', arp] + extra_args)
|
| +
|
| + # Walk the directory tree and collect the extracted output
|
| + outitems = []
|
| + for root, _, files in os.walk(outdir):
|
| + for fn in files:
|
| + fp = os.path.join(root, fn)
|
| + outitems.append([fp[len(outdir)+1:], file(fp, 'rb').read()])
|
| +
|
| + # Check the two are equal
|
| + self.assertSequenceEqual(sorted(initems), sorted(outitems))
|
| +
|
| + finally:
|
| + if indir:
|
| + shutil.rmtree(indir, ignore_errors=True)
|
| + if ardir:
|
| + shutil.rmtree(ardir, ignore_errors=True)
|
| + if outdir:
|
| + shutil.rmtree(outdir, ignore_errors=True)
|
| +
|
| + def testSimple1(self):
|
| + self.assertCLI(['file1', 'contents1'])
|
| +
|
| + def testFullStat(self):
|
| + self.assertCLI(
|
| + ['file1', 'contents1'],
|
| + extra_args=['--dont-use-defaults'])
|
| +
|
| + def testMultiple(self):
|
| + self.assertCLI(
|
| + ['file1', 'contents1'],
|
| + ['dir1/file2', 'contents2'],
|
| + ['dir2/dir3/file3', 'contents3'],
|
| + ['file4', 'contents4'],
|
| + )
|
| +
|
| + def testUnicodeContents(self):
|
| + self.assertCLI(['file1', u'\u2603'.encode('utf-8')])
|
| +
|
| + def testFilenameSpaces(self):
|
| + self.assertCLI(
|
| + ['f f1', 'contents1'],
|
| + ['d d1/file2', 'contents2'],
|
| + ['d d1/f f3', 'contents3'],
|
| + ['file4', 'contents4'],
|
| + )
|
| +
|
| + def testBigFile(self):
|
| + self.assertCLI(['bigfile', 'data'*1024*1024*10])
|
| +
|
| + @unittest.skipIf(
|
| + not filesystem_supports_unicode(), 'no unicode file support')
|
| + def testUnicode(self):
|
| + self.assertCLI([u'\u2603', u'\U0001f4a9'.encode('utf-8')])
|
| +
|
| +
|
| +if __name__ == '__main__':
|
| + doctest.testmod(arfile)
|
| + unittest.main()
|
|
|