| Index: swarm_client/tests/zip_package_test.py
|
| ===================================================================
|
| --- swarm_client/tests/zip_package_test.py (revision 235167)
|
| +++ swarm_client/tests/zip_package_test.py (working copy)
|
| @@ -1,256 +0,0 @@
|
| -#!/usr/bin/env python
|
| -# Copyright 2013 The Chromium Authors. All rights reserved.
|
| -# Use of this source code is governed by a BSD-style license that can be
|
| -# found in the LICENSE file.
|
| -
|
| -import cStringIO as StringIO
|
| -import logging
|
| -import os
|
| -import shutil
|
| -import subprocess
|
| -import sys
|
| -import tempfile
|
| -import unittest
|
| -import zipfile
|
| -
|
| -ROOT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
|
| -sys.path.insert(0, ROOT_DIR)
|
| -
|
| -from utils import zip_package
|
| -
|
| -
|
| -class ZipPackageTest(unittest.TestCase):
|
| - def setUp(self):
|
| - super(ZipPackageTest, self).setUp()
|
| - self.temp_dir = tempfile.mkdtemp(prefix='zip_package_test')
|
| -
|
| - def tearDown(self):
|
| - shutil.rmtree(self.temp_dir)
|
| - super(ZipPackageTest, self).tearDown()
|
| -
|
| - def stage_files(self, files):
|
| - """Populates temp directory with given files specified as a list or dict."""
|
| - if not isinstance(files, dict):
|
| - files = dict((p, '') for p in files)
|
| - for path, content in files.iteritems():
|
| - abs_path = os.path.join(self.temp_dir, path.replace('/', os.sep))
|
| - dir_path = os.path.dirname(abs_path)
|
| - if not os.path.exists(dir_path):
|
| - os.makedirs(dir_path)
|
| - with open(abs_path, 'wb') as f:
|
| - f.write(content)
|
| -
|
| - @staticmethod
|
| - def read_zip(stream):
|
| - """Given some stream with zip data, reads and decompresses it into dict."""
|
| - zip_file = zipfile.ZipFile(stream, 'r')
|
| - try:
|
| - return dict((i.filename, zip_file.read(i)) for i in zip_file.infolist())
|
| - finally:
|
| - zip_file.close()
|
| -
|
| - def test_require_absolute_root(self):
|
| - # Absolute path is ok.
|
| - zip_package.ZipPackage(self.temp_dir)
|
| - # Relative path is not ok.
|
| - with self.assertRaises(AssertionError):
|
| - zip_package.ZipPackage('.')
|
| -
|
| - def test_require_absolute_file_paths(self):
|
| - # Add some files to temp_dir.
|
| - self.stage_files([
|
| - 'a.txt',
|
| - 'b.py',
|
| - 'c/c.txt',
|
| - ])
|
| -
|
| - # Item to add -> method used to add it.
|
| - cases = [
|
| - ('a.txt', zip_package.ZipPackage.add_file),
|
| - ('b.py', zip_package.ZipPackage.add_python_file),
|
| - ('c', zip_package.ZipPackage.add_directory),
|
| - ]
|
| -
|
| - for path, method in cases:
|
| - pkg = zip_package.ZipPackage(self.temp_dir)
|
| - # Absolute path is ok.
|
| - method(pkg, os.path.join(self.temp_dir, path))
|
| - # Relative path is not ok.
|
| - with self.assertRaises(AssertionError):
|
| - method(pkg, path)
|
| -
|
| - def test_added_files_are_under_root(self):
|
| - # Add some files to temp_dir.
|
| - self.stage_files([
|
| - 'a.txt',
|
| - 'p.py',
|
| - 'pkg/1.txt',
|
| - 'some_dir/2.txt',
|
| - ])
|
| -
|
| - # Adding using |archive_path| should work.
|
| - pkg = zip_package.ZipPackage(os.path.join(self.temp_dir, 'pkg'))
|
| - pkg.add_file(os.path.join(self.temp_dir, 'a.txt'), '_a.txt')
|
| - pkg.add_python_file(os.path.join(self.temp_dir, 'p.py'), '_p.py')
|
| - pkg.add_directory(os.path.join(self.temp_dir, 'pkg'), '_pkg')
|
| -
|
| - # Adding without |archive_path| should fail.
|
| - with self.assertRaises(zip_package.ZipPackageError):
|
| - pkg.add_file(os.path.join(self.temp_dir, 'a.txt'))
|
| - with self.assertRaises(zip_package.ZipPackageError):
|
| - pkg.add_python_file(os.path.join(self.temp_dir, 'p.py'))
|
| - with self.assertRaises(zip_package.ZipPackageError):
|
| - pkg.add_directory(os.path.join(self.temp_dir, 'a.txt'))
|
| -
|
| - def test_adding_missing_files(self):
|
| - pkg = zip_package.ZipPackage(self.temp_dir)
|
| - with self.assertRaises(zip_package.ZipPackageError):
|
| - pkg.add_file(os.path.join(self.temp_dir, 'im_not_here'))
|
| - with self.assertRaises(zip_package.ZipPackageError):
|
| - pkg.add_python_file(os.path.join(self.temp_dir, 'im_not_here.py'))
|
| - with self.assertRaises(zip_package.ZipPackageError):
|
| - pkg.add_directory(os.path.join(self.temp_dir, 'im_not_here_dir'))
|
| -
|
| - def test_adding_dir_as_file(self):
|
| - # Create 'dir'.
|
| - self.stage_files(['dir/keep'])
|
| - # Try to add it as file, not a directory.
|
| - pkg = zip_package.ZipPackage(self.temp_dir)
|
| - with self.assertRaises(zip_package.ZipPackageError):
|
| - pkg.add_file(os.path.join(self.temp_dir, 'dir'))
|
| - # Adding as directory works.
|
| - pkg.add_directory(os.path.join(self.temp_dir, 'dir'))
|
| -
|
| - def test_adding_non_python_as_python(self):
|
| - self.stage_files(['file.sh'])
|
| - pkg = zip_package.ZipPackage(self.temp_dir)
|
| - with self.assertRaises(zip_package.ZipPackageError):
|
| - pkg.add_python_file(os.path.join(self.temp_dir, 'file.sh'))
|
| -
|
| - def test_adding_py_instead_of_pyc(self):
|
| - self.stage_files([
|
| - 'file.py',
|
| - 'file.pyo',
|
| - 'file.pyc',
|
| - ])
|
| - for alternative in ('file.pyc', 'file.pyo'):
|
| - pkg = zip_package.ZipPackage(self.temp_dir)
|
| - pkg.add_python_file(os.path.join(self.temp_dir, alternative))
|
| - self.assertIn('file.py', pkg.files)
|
| -
|
| - def test_adding_same_file_twice(self):
|
| - self.stage_files(['file'])
|
| - pkg = zip_package.ZipPackage(self.temp_dir)
|
| - pkg.add_file(os.path.join(self.temp_dir, 'file'))
|
| - with self.assertRaises(zip_package.ZipPackageError):
|
| - pkg.add_file(os.path.join(self.temp_dir, 'file'))
|
| -
|
| - def test_add_directory(self):
|
| - should_add = [
|
| - 'script.py',
|
| - 'a/1.txt',
|
| - 'a/2.txt',
|
| - 'a/b/3.txt',
|
| - 'a/script.py',
|
| - ]
|
| - should_ignore = [
|
| - 'script.pyc',
|
| - 'a/script.pyo',
|
| - '.git/stuff',
|
| - '.svn/stuff',
|
| - 'a/.svn/stuff',
|
| - 'a/b/.svn/stuff',
|
| - ]
|
| - # Add a whole set and verify only files from |should_add| were added.
|
| - self.stage_files(should_add + should_ignore)
|
| - pkg = zip_package.ZipPackage(self.temp_dir)
|
| - pkg.add_directory(self.temp_dir)
|
| - self.assertEqual(set(pkg.files), set(should_add))
|
| -
|
| - def test_archive_path_is_respected(self):
|
| - self.stage_files(['a', 'b.py', 'dir/c'])
|
| - pkg = zip_package.ZipPackage(self.temp_dir)
|
| - pkg.add_file(os.path.join(self.temp_dir, 'a'), 'd1/a')
|
| - pkg.add_python_file(os.path.join(self.temp_dir, 'b.py'), 'd2/b.py')
|
| - pkg.add_directory(os.path.join(self.temp_dir, 'dir'), 'd3')
|
| - self.assertEqual(set(pkg.files), set(['d1/a', 'd2/b.py', 'd3/c']))
|
| -
|
| - def test_add_buffer(self):
|
| - pkg = zip_package.ZipPackage(self.temp_dir)
|
| - pkg.add_buffer('buf1', '')
|
| - self.assertEqual(pkg.files, ['buf1'])
|
| - # No unicode.
|
| - with self.assertRaises(AssertionError):
|
| - pkg.add_buffer('buf2', u'unicode')
|
| -
|
| - def test_zipping(self):
|
| - data = {'a': '123', 'b/c': '456'}
|
| - self.stage_files(data)
|
| - pkg = zip_package.ZipPackage(self.temp_dir)
|
| - pkg.add_directory(self.temp_dir)
|
| - # Test zip_into_buffer produces readable zip with same content.
|
| - for compress in (True, False):
|
| - buf = pkg.zip_into_buffer(compress=compress)
|
| - self.assertEqual(data, self.read_zip(StringIO.StringIO(buf)))
|
| - # Test zip_into_file produces readable zip with same content.
|
| - for compress in (True, False):
|
| - path = os.path.join(self.temp_dir, 'pkg.zip')
|
| - pkg.zip_into_file(path, compress=compress)
|
| - with open(path, 'rb') as f:
|
| - self.assertEqual(data, self.read_zip(f))
|
| -
|
| - def test_repeatable_content(self):
|
| - content = []
|
| - for _ in range(2):
|
| - # Build temp dir content from scratch.
|
| - assert not os.listdir(self.temp_dir)
|
| - self.stage_files({'a': '123', 'b': '456', 'c': '789'})
|
| - # Zip it.
|
| - pkg = zip_package.ZipPackage(self.temp_dir)
|
| - pkg.add_directory(self.temp_dir)
|
| - content.append(pkg.zip_into_buffer())
|
| - # Clear everything.
|
| - for name in os.listdir(self.temp_dir):
|
| - os.remove(os.path.join(self.temp_dir, name))
|
| - # Contents of both runs should match exactly.
|
| - self.assertEqual(content[0], content[1])
|
| -
|
| - def test_running_from_zip(self):
|
| - # Test assumes that it runs from a normal checkout, not a zip.
|
| - self.assertFalse(zip_package.is_zipped_module(sys.modules[__name__]))
|
| - self.assertIsNone(zip_package.get_module_zip_archive(sys.modules[__name__]))
|
| - self.assertTrue(os.path.abspath(
|
| - zip_package.get_main_script_path()).startswith(ROOT_DIR))
|
| -
|
| - # Build executable zip that calls same functions.
|
| - pkg = zip_package.ZipPackage(self.temp_dir)
|
| - pkg.add_directory(os.path.join(ROOT_DIR, 'utils'), 'utils')
|
| - pkg.add_buffer('__main__.py', '\n'.join([
|
| - 'import sys',
|
| - '',
|
| - 'from utils import zip_package',
|
| - '',
|
| - 'print zip_package.is_zipped_module(sys.modules[__name__])',
|
| - 'print zip_package.get_module_zip_archive(sys.modules[__name__])',
|
| - 'print zip_package.get_main_script_path()',
|
| - ]))
|
| - zip_file = os.path.join(self.temp_dir, 'out.zip')
|
| - pkg.zip_into_file(zip_file)
|
| -
|
| - # Run the zip, validate results.
|
| - proc = subprocess.Popen(
|
| - [sys.executable, zip_file],
|
| - stdout=subprocess.PIPE,
|
| - stderr=subprocess.PIPE)
|
| - out, err = proc.communicate()
|
| -
|
| - actual = out.strip().splitlines()
|
| - expected = ['True', zip_file, zip_file,]
|
| - self.assertEqual(err, '')
|
| - self.assertEqual(actual, expected)
|
| -
|
| -
|
| -if __name__ == '__main__':
|
| - VERBOSE = '-v' in sys.argv
|
| - logging.basicConfig(level=logging.DEBUG if VERBOSE else logging.ERROR)
|
| - unittest.main()
|
|
|