OLD | NEW |
(Empty) | |
| 1 #!/usr/bin/env python |
| 2 # Copyright (c) 2012 The Chromium Authors. All rights reserved. |
| 3 # Use of this source code is governed by a BSD-style license that can be |
| 4 # found in the LICENSE file. |
| 5 |
| 6 """Test gsutil.py.""" |
| 7 |
| 8 |
| 9 import __builtin__ |
| 10 import unittest |
| 11 import hashlib |
| 12 import zipfile |
| 13 import shutil |
| 14 import sys |
| 15 import base64 |
| 16 import tempfile |
| 17 import json |
| 18 import os |
| 19 import urllib |
| 20 |
| 21 |
| 22 # Add depot_tools to path |
| 23 THIS_DIR = os.path.dirname(os.path.abspath(__file__)) |
| 24 DEPOT_TOOLS_DIR = os.path.dirname(THIS_DIR) |
| 25 sys.path.append(DEPOT_TOOLS_DIR) |
| 26 |
| 27 import gsutil |
| 28 |
| 29 |
| 30 class TestError(Exception): |
| 31 pass |
| 32 |
| 33 |
| 34 class Buffer(object): |
| 35 def __init__(self, data=None): |
| 36 self.data = data or '' |
| 37 |
| 38 def write(self, buf): |
| 39 self.data += buf |
| 40 |
| 41 def read(self, amount=None): |
| 42 if not amount: |
| 43 amount = len(self.data) |
| 44 result = self.data[:amount] |
| 45 self.data = self.data[amount:] |
| 46 return result |
| 47 |
| 48 |
| 49 class FakeCall(object): |
| 50 def __init__(self): |
| 51 self.expectations = [] |
| 52 |
| 53 def add_expectation(self, *args, **kwargs): |
| 54 returns = kwargs.pop('_returns', None) |
| 55 self.expectations.append((args, kwargs, returns)) |
| 56 |
| 57 def __call__(self, *args, **kwargs): |
| 58 if not self.expectations: |
| 59 raise TestError('Got unexpected\n%s\n%s' % (args, kwargs)) |
| 60 exp_args, exp_kwargs, exp_returns = self.expectations.pop(0) |
| 61 if args != exp_args or kwargs != exp_kwargs: |
| 62 message = 'Expected:\n args: %s\n kwargs: %s\n' % (exp_args, exp_kwargs) |
| 63 message += 'Got:\n args: %s\n kwargs: %s\n' % (args, kwargs) |
| 64 raise TestError(message) |
| 65 if isinstance(exp_returns, Exception): |
| 66 raise exp_returns |
| 67 return exp_returns |
| 68 |
| 69 |
| 70 class GsutilUnitTests(unittest.TestCase): |
| 71 def setUp(self): |
| 72 self.fake = FakeCall() |
| 73 self.tempdir = tempfile.mkdtemp() |
| 74 self.old_urlopen = getattr(urllib, 'urlopen') |
| 75 self.old_call = getattr(gsutil, 'call') |
| 76 setattr(urllib, 'urlopen', self.fake) |
| 77 setattr(gsutil, 'call', self.fake) |
| 78 |
| 79 def tearDown(self): |
| 80 self.assertEqual(self.fake.expectations, []) |
| 81 shutil.rmtree(self.tempdir) |
| 82 setattr(urllib, 'urlopen', self.old_urlopen) |
| 83 setattr(gsutil, 'call', self.old_call) |
| 84 |
| 85 def test_download_gsutil(self): |
| 86 version = '4.2' |
| 87 filename = 'gsutil_%s.zip' % version |
| 88 full_filename = os.path.join(self.tempdir, filename) |
| 89 fake_file = 'This is gsutil.zip' |
| 90 fake_file2 = 'This is other gsutil.zip' |
| 91 url = '%s%s' % (gsutil.GSUTIL_URL, filename) |
| 92 self.fake.add_expectation(url, _returns=Buffer(fake_file)) |
| 93 |
| 94 self.assertEquals( |
| 95 gsutil.download_gsutil(version, self.tempdir), full_filename) |
| 96 with open(full_filename, 'r') as f: |
| 97 self.assertEquals(fake_file, f.read()) |
| 98 |
| 99 metadata_url = gsutil.API_URL + filename |
| 100 md5_calc = hashlib.md5() |
| 101 md5_calc.update(fake_file) |
| 102 b64_md5 = base64.b64encode(md5_calc.hexdigest()) |
| 103 self.fake.add_expectation(metadata_url, _returns=Buffer(json.dumps({ |
| 104 'md5Hash': b64_md5 |
| 105 }))) |
| 106 self.assertEquals( |
| 107 gsutil.download_gsutil(version, self.tempdir), full_filename) |
| 108 with open(full_filename, 'r') as f: |
| 109 self.assertEquals(fake_file, f.read()) |
| 110 self.assertEquals(self.fake.expectations, []) |
| 111 |
| 112 self.fake.add_expectation(metadata_url, _returns=Buffer(json.dumps({ |
| 113 'md5Hash': base64.b64encode('aaaaaaa') # Bad MD5 |
| 114 }))) |
| 115 self.fake.add_expectation(url, _returns=Buffer(fake_file2)) |
| 116 self.assertEquals( |
| 117 gsutil.download_gsutil(version, self.tempdir), full_filename) |
| 118 with open(full_filename, 'r') as f: |
| 119 self.assertEquals(fake_file2, f.read()) |
| 120 self.assertEquals(self.fake.expectations, []) |
| 121 |
| 122 def test_ensure_gsutil_full(self): |
| 123 version = '4.2' |
| 124 gsutil_dir = os.path.join(self.tempdir, 'gsutil_%s' % version, 'gsutil') |
| 125 gsutil_bin = os.path.join(gsutil_dir, 'gsutil') |
| 126 os.makedirs(gsutil_dir) |
| 127 |
| 128 self.fake.add_expectation( |
| 129 [sys.executable, gsutil_bin, 'version'], verbose=False, |
| 130 _returns=gsutil.SubprocessError()) |
| 131 |
| 132 with open(gsutil_bin, 'w') as f: |
| 133 f.write('Foobar') |
| 134 zip_filename = 'gsutil_%s.zip' % version |
| 135 url = '%s%s' % (gsutil.GSUTIL_URL, zip_filename) |
| 136 _, tempzip = tempfile.mkstemp() |
| 137 fake_gsutil = 'Fake gsutil' |
| 138 with zipfile.ZipFile(tempzip, 'w') as zf: |
| 139 zf.writestr('gsutil/gsutil', fake_gsutil) |
| 140 with open(tempzip, 'rb') as f: |
| 141 self.fake.add_expectation(url, _returns=Buffer(f.read())) |
| 142 self.fake.add_expectation( |
| 143 [sys.executable, gsutil_bin, 'version'], verbose=False, |
| 144 _returns=gsutil.SubprocessError()) |
| 145 |
| 146 # This should delete the old bin and rewrite it with 'Fake gsutil' |
| 147 self.assertRaises( |
| 148 gsutil.InvalidGsutilError, gsutil.ensure_gsutil, version, self.tempdir) |
| 149 self.assertTrue(os.path.isdir(os.path.join(self.tempdir, '.cache_dir'))) |
| 150 self.assertTrue(os.path.exists(gsutil_bin)) |
| 151 with open(gsutil_bin, 'r') as f: |
| 152 self.assertEquals(f.read(), fake_gsutil) |
| 153 self.assertEquals(self.fake.expectations, []) |
| 154 |
| 155 def test_ensure_gsutil_short(self): |
| 156 version = '4.2' |
| 157 gsutil_dir = os.path.join(self.tempdir, 'gsutil_%s' % version, 'gsutil') |
| 158 gsutil_bin = os.path.join(gsutil_dir, 'gsutil') |
| 159 os.makedirs(gsutil_dir) |
| 160 |
| 161 # Mock out call(). |
| 162 self.fake.add_expectation( |
| 163 [sys.executable, gsutil_bin, 'version'], verbose=False, _returns=True) |
| 164 |
| 165 with open(gsutil_bin, 'w') as f: |
| 166 f.write('Foobar') |
| 167 self.assertEquals( |
| 168 gsutil.ensure_gsutil(version, self.tempdir), gsutil_bin) |
| 169 |
| 170 if __name__ == '__main__': |
| 171 unittest.main() |
OLD | NEW |