Index: swarm_client/tests/zip_package_test.py |
=================================================================== |
--- swarm_client/tests/zip_package_test.py (revision 235167) |
+++ swarm_client/tests/zip_package_test.py (working copy) |
@@ -1,256 +0,0 @@ |
-#!/usr/bin/env python |
-# Copyright 2013 The Chromium Authors. All rights reserved. |
-# Use of this source code is governed by a BSD-style license that can be |
-# found in the LICENSE file. |
- |
-import cStringIO as StringIO |
-import logging |
-import os |
-import shutil |
-import subprocess |
-import sys |
-import tempfile |
-import unittest |
-import zipfile |
- |
-ROOT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) |
-sys.path.insert(0, ROOT_DIR) |
- |
-from utils import zip_package |
- |
- |
-class ZipPackageTest(unittest.TestCase): |
- def setUp(self): |
- super(ZipPackageTest, self).setUp() |
- self.temp_dir = tempfile.mkdtemp(prefix='zip_package_test') |
- |
- def tearDown(self): |
- shutil.rmtree(self.temp_dir) |
- super(ZipPackageTest, self).tearDown() |
- |
- def stage_files(self, files): |
- """Populates temp directory with given files specified as a list or dict.""" |
- if not isinstance(files, dict): |
- files = dict((p, '') for p in files) |
- for path, content in files.iteritems(): |
- abs_path = os.path.join(self.temp_dir, path.replace('/', os.sep)) |
- dir_path = os.path.dirname(abs_path) |
- if not os.path.exists(dir_path): |
- os.makedirs(dir_path) |
- with open(abs_path, 'wb') as f: |
- f.write(content) |
- |
- @staticmethod |
- def read_zip(stream): |
- """Given some stream with zip data, reads and decompresses it into dict.""" |
- zip_file = zipfile.ZipFile(stream, 'r') |
- try: |
- return dict((i.filename, zip_file.read(i)) for i in zip_file.infolist()) |
- finally: |
- zip_file.close() |
- |
- def test_require_absolute_root(self): |
- # Absolute path is ok. |
- zip_package.ZipPackage(self.temp_dir) |
- # Relative path is not ok. |
- with self.assertRaises(AssertionError): |
- zip_package.ZipPackage('.') |
- |
- def test_require_absolute_file_paths(self): |
- # Add some files to temp_dir. |
- self.stage_files([ |
- 'a.txt', |
- 'b.py', |
- 'c/c.txt', |
- ]) |
- |
- # Item to add -> method used to add it. |
- cases = [ |
- ('a.txt', zip_package.ZipPackage.add_file), |
- ('b.py', zip_package.ZipPackage.add_python_file), |
- ('c', zip_package.ZipPackage.add_directory), |
- ] |
- |
- for path, method in cases: |
- pkg = zip_package.ZipPackage(self.temp_dir) |
- # Absolute path is ok. |
- method(pkg, os.path.join(self.temp_dir, path)) |
- # Relative path is not ok. |
- with self.assertRaises(AssertionError): |
- method(pkg, path) |
- |
- def test_added_files_are_under_root(self): |
- # Add some files to temp_dir. |
- self.stage_files([ |
- 'a.txt', |
- 'p.py', |
- 'pkg/1.txt', |
- 'some_dir/2.txt', |
- ]) |
- |
- # Adding using |archive_path| should work. |
- pkg = zip_package.ZipPackage(os.path.join(self.temp_dir, 'pkg')) |
- pkg.add_file(os.path.join(self.temp_dir, 'a.txt'), '_a.txt') |
- pkg.add_python_file(os.path.join(self.temp_dir, 'p.py'), '_p.py') |
- pkg.add_directory(os.path.join(self.temp_dir, 'pkg'), '_pkg') |
- |
- # Adding without |archive_path| should fail. |
- with self.assertRaises(zip_package.ZipPackageError): |
- pkg.add_file(os.path.join(self.temp_dir, 'a.txt')) |
- with self.assertRaises(zip_package.ZipPackageError): |
- pkg.add_python_file(os.path.join(self.temp_dir, 'p.py')) |
- with self.assertRaises(zip_package.ZipPackageError): |
- pkg.add_directory(os.path.join(self.temp_dir, 'a.txt')) |
- |
- def test_adding_missing_files(self): |
- pkg = zip_package.ZipPackage(self.temp_dir) |
- with self.assertRaises(zip_package.ZipPackageError): |
- pkg.add_file(os.path.join(self.temp_dir, 'im_not_here')) |
- with self.assertRaises(zip_package.ZipPackageError): |
- pkg.add_python_file(os.path.join(self.temp_dir, 'im_not_here.py')) |
- with self.assertRaises(zip_package.ZipPackageError): |
- pkg.add_directory(os.path.join(self.temp_dir, 'im_not_here_dir')) |
- |
- def test_adding_dir_as_file(self): |
- # Create 'dir'. |
- self.stage_files(['dir/keep']) |
- # Try to add it as file, not a directory. |
- pkg = zip_package.ZipPackage(self.temp_dir) |
- with self.assertRaises(zip_package.ZipPackageError): |
- pkg.add_file(os.path.join(self.temp_dir, 'dir')) |
- # Adding as directory works. |
- pkg.add_directory(os.path.join(self.temp_dir, 'dir')) |
- |
- def test_adding_non_python_as_python(self): |
- self.stage_files(['file.sh']) |
- pkg = zip_package.ZipPackage(self.temp_dir) |
- with self.assertRaises(zip_package.ZipPackageError): |
- pkg.add_python_file(os.path.join(self.temp_dir, 'file.sh')) |
- |
- def test_adding_py_instead_of_pyc(self): |
- self.stage_files([ |
- 'file.py', |
- 'file.pyo', |
- 'file.pyc', |
- ]) |
- for alternative in ('file.pyc', 'file.pyo'): |
- pkg = zip_package.ZipPackage(self.temp_dir) |
- pkg.add_python_file(os.path.join(self.temp_dir, alternative)) |
- self.assertIn('file.py', pkg.files) |
- |
- def test_adding_same_file_twice(self): |
- self.stage_files(['file']) |
- pkg = zip_package.ZipPackage(self.temp_dir) |
- pkg.add_file(os.path.join(self.temp_dir, 'file')) |
- with self.assertRaises(zip_package.ZipPackageError): |
- pkg.add_file(os.path.join(self.temp_dir, 'file')) |
- |
- def test_add_directory(self): |
- should_add = [ |
- 'script.py', |
- 'a/1.txt', |
- 'a/2.txt', |
- 'a/b/3.txt', |
- 'a/script.py', |
- ] |
- should_ignore = [ |
- 'script.pyc', |
- 'a/script.pyo', |
- '.git/stuff', |
- '.svn/stuff', |
- 'a/.svn/stuff', |
- 'a/b/.svn/stuff', |
- ] |
- # Add a whole set and verify only files from |should_add| were added. |
- self.stage_files(should_add + should_ignore) |
- pkg = zip_package.ZipPackage(self.temp_dir) |
- pkg.add_directory(self.temp_dir) |
- self.assertEqual(set(pkg.files), set(should_add)) |
- |
- def test_archive_path_is_respected(self): |
- self.stage_files(['a', 'b.py', 'dir/c']) |
- pkg = zip_package.ZipPackage(self.temp_dir) |
- pkg.add_file(os.path.join(self.temp_dir, 'a'), 'd1/a') |
- pkg.add_python_file(os.path.join(self.temp_dir, 'b.py'), 'd2/b.py') |
- pkg.add_directory(os.path.join(self.temp_dir, 'dir'), 'd3') |
- self.assertEqual(set(pkg.files), set(['d1/a', 'd2/b.py', 'd3/c'])) |
- |
- def test_add_buffer(self): |
- pkg = zip_package.ZipPackage(self.temp_dir) |
- pkg.add_buffer('buf1', '') |
- self.assertEqual(pkg.files, ['buf1']) |
- # No unicode. |
- with self.assertRaises(AssertionError): |
- pkg.add_buffer('buf2', u'unicode') |
- |
- def test_zipping(self): |
- data = {'a': '123', 'b/c': '456'} |
- self.stage_files(data) |
- pkg = zip_package.ZipPackage(self.temp_dir) |
- pkg.add_directory(self.temp_dir) |
- # Test zip_into_buffer produces readable zip with same content. |
- for compress in (True, False): |
- buf = pkg.zip_into_buffer(compress=compress) |
- self.assertEqual(data, self.read_zip(StringIO.StringIO(buf))) |
- # Test zip_into_file produces readable zip with same content. |
- for compress in (True, False): |
- path = os.path.join(self.temp_dir, 'pkg.zip') |
- pkg.zip_into_file(path, compress=compress) |
- with open(path, 'rb') as f: |
- self.assertEqual(data, self.read_zip(f)) |
- |
- def test_repeatable_content(self): |
- content = [] |
- for _ in range(2): |
- # Build temp dir content from scratch. |
- assert not os.listdir(self.temp_dir) |
- self.stage_files({'a': '123', 'b': '456', 'c': '789'}) |
- # Zip it. |
- pkg = zip_package.ZipPackage(self.temp_dir) |
- pkg.add_directory(self.temp_dir) |
- content.append(pkg.zip_into_buffer()) |
- # Clear everything. |
- for name in os.listdir(self.temp_dir): |
- os.remove(os.path.join(self.temp_dir, name)) |
- # Contents of both runs should match exactly. |
- self.assertEqual(content[0], content[1]) |
- |
- def test_running_from_zip(self): |
- # Test assumes that it runs from a normal checkout, not a zip. |
- self.assertFalse(zip_package.is_zipped_module(sys.modules[__name__])) |
- self.assertIsNone(zip_package.get_module_zip_archive(sys.modules[__name__])) |
- self.assertTrue(os.path.abspath( |
- zip_package.get_main_script_path()).startswith(ROOT_DIR)) |
- |
- # Build executable zip that calls same functions. |
- pkg = zip_package.ZipPackage(self.temp_dir) |
- pkg.add_directory(os.path.join(ROOT_DIR, 'utils'), 'utils') |
- pkg.add_buffer('__main__.py', '\n'.join([ |
- 'import sys', |
- '', |
- 'from utils import zip_package', |
- '', |
- 'print zip_package.is_zipped_module(sys.modules[__name__])', |
- 'print zip_package.get_module_zip_archive(sys.modules[__name__])', |
- 'print zip_package.get_main_script_path()', |
- ])) |
- zip_file = os.path.join(self.temp_dir, 'out.zip') |
- pkg.zip_into_file(zip_file) |
- |
- # Run the zip, validate results. |
- proc = subprocess.Popen( |
- [sys.executable, zip_file], |
- stdout=subprocess.PIPE, |
- stderr=subprocess.PIPE) |
- out, err = proc.communicate() |
- |
- actual = out.strip().splitlines() |
- expected = ['True', zip_file, zip_file,] |
- self.assertEqual(err, '') |
- self.assertEqual(actual, expected) |
- |
- |
-if __name__ == '__main__': |
- VERBOSE = '-v' in sys.argv |
- logging.basicConfig(level=logging.DEBUG if VERBOSE else logging.ERROR) |
- unittest.main() |