| Index: swarm_client/tests/zip_package_test.py | 
| =================================================================== | 
| --- swarm_client/tests/zip_package_test.py	(revision 235167) | 
| +++ swarm_client/tests/zip_package_test.py	(working copy) | 
| @@ -1,256 +0,0 @@ | 
| -#!/usr/bin/env python | 
| -# Copyright 2013 The Chromium Authors. All rights reserved. | 
| -# Use of this source code is governed by a BSD-style license that can be | 
| -# found in the LICENSE file. | 
| - | 
| -import cStringIO as StringIO | 
| -import logging | 
| -import os | 
| -import shutil | 
| -import subprocess | 
| -import sys | 
| -import tempfile | 
| -import unittest | 
| -import zipfile | 
| - | 
| -ROOT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) | 
| -sys.path.insert(0, ROOT_DIR) | 
| - | 
| -from utils import zip_package | 
| - | 
| - | 
| -class ZipPackageTest(unittest.TestCase): | 
| -  def setUp(self): | 
| -    super(ZipPackageTest, self).setUp() | 
| -    self.temp_dir = tempfile.mkdtemp(prefix='zip_package_test') | 
| - | 
| -  def tearDown(self): | 
| -    shutil.rmtree(self.temp_dir) | 
| -    super(ZipPackageTest, self).tearDown() | 
| - | 
| -  def stage_files(self, files): | 
| -    """Populates temp directory with given files specified as a list or dict.""" | 
| -    if not isinstance(files, dict): | 
| -      files = dict((p, '') for p in files) | 
| -    for path, content in files.iteritems(): | 
| -      abs_path = os.path.join(self.temp_dir, path.replace('/', os.sep)) | 
| -      dir_path = os.path.dirname(abs_path) | 
| -      if not os.path.exists(dir_path): | 
| -        os.makedirs(dir_path) | 
| -      with open(abs_path, 'wb') as f: | 
| -        f.write(content) | 
| - | 
| -  @staticmethod | 
| -  def read_zip(stream): | 
| -    """Given some stream with zip data, reads and decompresses it into dict.""" | 
| -    zip_file = zipfile.ZipFile(stream, 'r') | 
| -    try: | 
| -      return dict((i.filename, zip_file.read(i)) for i in zip_file.infolist()) | 
| -    finally: | 
| -      zip_file.close() | 
| - | 
| -  def test_require_absolute_root(self): | 
| -    # Absolute path is ok. | 
| -    zip_package.ZipPackage(self.temp_dir) | 
| -    # Relative path is not ok. | 
| -    with self.assertRaises(AssertionError): | 
| -      zip_package.ZipPackage('.') | 
| - | 
| -  def test_require_absolute_file_paths(self): | 
| -    # Add some files to temp_dir. | 
| -    self.stage_files([ | 
| -      'a.txt', | 
| -      'b.py', | 
| -      'c/c.txt', | 
| -    ]) | 
| - | 
| -    # Item to add -> method used to add it. | 
| -    cases = [ | 
| -      ('a.txt', zip_package.ZipPackage.add_file), | 
| -      ('b.py', zip_package.ZipPackage.add_python_file), | 
| -      ('c', zip_package.ZipPackage.add_directory), | 
| -    ] | 
| - | 
| -    for path, method in cases: | 
| -      pkg = zip_package.ZipPackage(self.temp_dir) | 
| -      # Absolute path is ok. | 
| -      method(pkg, os.path.join(self.temp_dir, path)) | 
| -      # Relative path is not ok. | 
| -      with self.assertRaises(AssertionError): | 
| -        method(pkg, path) | 
| - | 
| -  def test_added_files_are_under_root(self): | 
| -    # Add some files to temp_dir. | 
| -    self.stage_files([ | 
| -      'a.txt', | 
| -      'p.py', | 
| -      'pkg/1.txt', | 
| -      'some_dir/2.txt', | 
| -    ]) | 
| - | 
| -    # Adding using |archive_path| should work. | 
| -    pkg = zip_package.ZipPackage(os.path.join(self.temp_dir, 'pkg')) | 
| -    pkg.add_file(os.path.join(self.temp_dir, 'a.txt'), '_a.txt') | 
| -    pkg.add_python_file(os.path.join(self.temp_dir, 'p.py'), '_p.py') | 
| -    pkg.add_directory(os.path.join(self.temp_dir, 'pkg'), '_pkg') | 
| - | 
| -    # Adding without |archive_path| should fail. | 
| -    with self.assertRaises(zip_package.ZipPackageError): | 
| -      pkg.add_file(os.path.join(self.temp_dir, 'a.txt')) | 
| -    with self.assertRaises(zip_package.ZipPackageError): | 
| -      pkg.add_python_file(os.path.join(self.temp_dir, 'p.py')) | 
| -    with self.assertRaises(zip_package.ZipPackageError): | 
| -      pkg.add_directory(os.path.join(self.temp_dir, 'a.txt')) | 
| - | 
| -  def test_adding_missing_files(self): | 
| -    pkg = zip_package.ZipPackage(self.temp_dir) | 
| -    with self.assertRaises(zip_package.ZipPackageError): | 
| -      pkg.add_file(os.path.join(self.temp_dir, 'im_not_here')) | 
| -    with self.assertRaises(zip_package.ZipPackageError): | 
| -      pkg.add_python_file(os.path.join(self.temp_dir, 'im_not_here.py')) | 
| -    with self.assertRaises(zip_package.ZipPackageError): | 
| -      pkg.add_directory(os.path.join(self.temp_dir, 'im_not_here_dir')) | 
| - | 
| -  def test_adding_dir_as_file(self): | 
| -    # Create 'dir'. | 
| -    self.stage_files(['dir/keep']) | 
| -    # Try to add it as file, not a directory. | 
| -    pkg = zip_package.ZipPackage(self.temp_dir) | 
| -    with self.assertRaises(zip_package.ZipPackageError): | 
| -      pkg.add_file(os.path.join(self.temp_dir, 'dir')) | 
| -    # Adding as directory works. | 
| -    pkg.add_directory(os.path.join(self.temp_dir, 'dir')) | 
| - | 
| -  def test_adding_non_python_as_python(self): | 
| -    self.stage_files(['file.sh']) | 
| -    pkg = zip_package.ZipPackage(self.temp_dir) | 
| -    with self.assertRaises(zip_package.ZipPackageError): | 
| -      pkg.add_python_file(os.path.join(self.temp_dir, 'file.sh')) | 
| - | 
| -  def test_adding_py_instead_of_pyc(self): | 
| -    self.stage_files([ | 
| -      'file.py', | 
| -      'file.pyo', | 
| -      'file.pyc', | 
| -    ]) | 
| -    for alternative in ('file.pyc', 'file.pyo'): | 
| -      pkg = zip_package.ZipPackage(self.temp_dir) | 
| -      pkg.add_python_file(os.path.join(self.temp_dir, alternative)) | 
| -      self.assertIn('file.py', pkg.files) | 
| - | 
| -  def test_adding_same_file_twice(self): | 
| -    self.stage_files(['file']) | 
| -    pkg = zip_package.ZipPackage(self.temp_dir) | 
| -    pkg.add_file(os.path.join(self.temp_dir, 'file')) | 
| -    with self.assertRaises(zip_package.ZipPackageError): | 
| -      pkg.add_file(os.path.join(self.temp_dir, 'file')) | 
| - | 
| -  def test_add_directory(self): | 
| -    should_add = [ | 
| -      'script.py', | 
| -      'a/1.txt', | 
| -      'a/2.txt', | 
| -      'a/b/3.txt', | 
| -      'a/script.py', | 
| -    ] | 
| -    should_ignore = [ | 
| -      'script.pyc', | 
| -      'a/script.pyo', | 
| -      '.git/stuff', | 
| -      '.svn/stuff', | 
| -      'a/.svn/stuff', | 
| -      'a/b/.svn/stuff', | 
| -    ] | 
| -    # Add a whole set and verify only files from |should_add| were added. | 
| -    self.stage_files(should_add + should_ignore) | 
| -    pkg = zip_package.ZipPackage(self.temp_dir) | 
| -    pkg.add_directory(self.temp_dir) | 
| -    self.assertEqual(set(pkg.files), set(should_add)) | 
| - | 
| -  def test_archive_path_is_respected(self): | 
| -    self.stage_files(['a', 'b.py', 'dir/c']) | 
| -    pkg = zip_package.ZipPackage(self.temp_dir) | 
| -    pkg.add_file(os.path.join(self.temp_dir, 'a'), 'd1/a') | 
| -    pkg.add_python_file(os.path.join(self.temp_dir, 'b.py'), 'd2/b.py') | 
| -    pkg.add_directory(os.path.join(self.temp_dir, 'dir'), 'd3') | 
| -    self.assertEqual(set(pkg.files), set(['d1/a', 'd2/b.py', 'd3/c'])) | 
| - | 
| -  def test_add_buffer(self): | 
| -    pkg = zip_package.ZipPackage(self.temp_dir) | 
| -    pkg.add_buffer('buf1', '') | 
| -    self.assertEqual(pkg.files, ['buf1']) | 
| -    # No unicode. | 
| -    with self.assertRaises(AssertionError): | 
| -      pkg.add_buffer('buf2', u'unicode') | 
| - | 
| -  def test_zipping(self): | 
| -    data = {'a': '123', 'b/c': '456'} | 
| -    self.stage_files(data) | 
| -    pkg = zip_package.ZipPackage(self.temp_dir) | 
| -    pkg.add_directory(self.temp_dir) | 
| -    # Test zip_into_buffer produces readable zip with same content. | 
| -    for compress in (True, False): | 
| -      buf = pkg.zip_into_buffer(compress=compress) | 
| -      self.assertEqual(data, self.read_zip(StringIO.StringIO(buf))) | 
| -    # Test zip_into_file produces readable zip with same content. | 
| -    for compress in (True, False): | 
| -      path = os.path.join(self.temp_dir, 'pkg.zip') | 
| -      pkg.zip_into_file(path, compress=compress) | 
| -      with open(path, 'rb') as f: | 
| -        self.assertEqual(data, self.read_zip(f)) | 
| - | 
| -  def test_repeatable_content(self): | 
| -    content = [] | 
| -    for _ in range(2): | 
| -      # Build temp dir content from scratch. | 
| -      assert not os.listdir(self.temp_dir) | 
| -      self.stage_files({'a': '123', 'b': '456', 'c': '789'}) | 
| -      # Zip it. | 
| -      pkg = zip_package.ZipPackage(self.temp_dir) | 
| -      pkg.add_directory(self.temp_dir) | 
| -      content.append(pkg.zip_into_buffer()) | 
| -      # Clear everything. | 
| -      for name in os.listdir(self.temp_dir): | 
| -        os.remove(os.path.join(self.temp_dir, name)) | 
| -    # Contents of both runs should match exactly. | 
| -    self.assertEqual(content[0], content[1]) | 
| - | 
| -  def test_running_from_zip(self): | 
| -    # Test assumes that it runs from a normal checkout, not a zip. | 
| -    self.assertFalse(zip_package.is_zipped_module(sys.modules[__name__])) | 
| -    self.assertIsNone(zip_package.get_module_zip_archive(sys.modules[__name__])) | 
| -    self.assertTrue(os.path.abspath( | 
| -        zip_package.get_main_script_path()).startswith(ROOT_DIR)) | 
| - | 
| -    # Build executable zip that calls same functions. | 
| -    pkg = zip_package.ZipPackage(self.temp_dir) | 
| -    pkg.add_directory(os.path.join(ROOT_DIR, 'utils'), 'utils') | 
| -    pkg.add_buffer('__main__.py', '\n'.join([ | 
| -      'import sys', | 
| -      '', | 
| -      'from utils import zip_package', | 
| -      '', | 
| -      'print zip_package.is_zipped_module(sys.modules[__name__])', | 
| -      'print zip_package.get_module_zip_archive(sys.modules[__name__])', | 
| -      'print zip_package.get_main_script_path()', | 
| -    ])) | 
| -    zip_file = os.path.join(self.temp_dir, 'out.zip') | 
| -    pkg.zip_into_file(zip_file) | 
| - | 
| -    # Run the zip, validate results. | 
| -    proc = subprocess.Popen( | 
| -        [sys.executable, zip_file], | 
| -        stdout=subprocess.PIPE, | 
| -        stderr=subprocess.PIPE) | 
| -    out, err = proc.communicate() | 
| - | 
| -    actual = out.strip().splitlines() | 
| -    expected = ['True', zip_file, zip_file,] | 
| -    self.assertEqual(err, '') | 
| -    self.assertEqual(actual, expected) | 
| - | 
| - | 
| -if __name__ == '__main__': | 
| -  VERBOSE = '-v' in sys.argv | 
| -  logging.basicConfig(level=logging.DEBUG if VERBOSE else logging.ERROR) | 
| -  unittest.main() | 
|  |