OLD | NEW |
---|---|
(Empty) | |
1 #!/usr/bin/env python | |
2 # Copyright (c) 2012 The Chromium Authors. All rights reserved. | |
3 # Use of this source code is governed by a BSD-style license that can be | |
4 # found in the LICENSE file. | |
5 | |
6 """Test gsutil.py.""" | |
7 | |
8 | |
9 import __builtin__ | |
10 import unittest | |
11 import hashlib | |
12 import zipfile | |
13 import shutil | |
14 import sys | |
15 import base64 | |
16 import tempfile | |
17 import json | |
18 import os | |
19 import urllib | |
20 | |
21 | |
22 # Add depot_tools to path | |
23 THIS_DIR = os.path.dirname(os.path.abspath(__file__)) | |
24 DEPOT_TOOLS_DIR = os.path.dirname(THIS_DIR) | |
25 sys.path.append(DEPOT_TOOLS_DIR) | |
26 | |
27 import gsutil | |
28 | |
29 | |
30 class TestError(Exception): | |
31 pass | |
32 | |
33 | |
34 class Buffer(object): | |
35 def __init__(self, data=None): | |
36 data = data or '' | |
37 self.data = data | |
dnj
2014/11/20 23:03:02
Might as well do "self.data = data or ''"
Ryan Tseng
2014/11/21 02:14:55
Done.
| |
38 | |
39 def write(self, buf): | |
40 self.data += buf | |
41 | |
42 def read(self, amount=None): | |
43 if not amount: | |
44 amount = len(self.data) | |
45 result = self.data[:amount] | |
46 self.data = self.data[amount:] | |
47 return result | |
48 | |
49 | |
50 class FakeCall(object): | |
51 def __init__(self): | |
52 self.expectations = [] | |
53 | |
54 def add_expectation(self, *args, **kwargs): | |
55 returns = kwargs.pop('_returns', None) | |
56 self.expectations.append((args, kwargs, returns)) | |
57 | |
58 def __call__(self, *args, **kwargs): | |
59 if not self.expectations: | |
60 raise TestError('Got unexpected\n%s\n%s' % (args, kwargs)) | |
61 exp_args, exp_kwargs, exp_returns = self.expectations.pop(0) | |
62 if args != exp_args or kwargs != exp_kwargs: | |
63 message = 'Expected:\n args: %s\n kwargs: %s\n' % (exp_args, exp_kwargs) | |
64 message += 'Got:\n args: %s\n kwargs: %s\n' % (args, kwargs) | |
65 raise TestError(message) | |
66 if isinstance(exp_returns, Exception): | |
67 raise exp_returns | |
68 return exp_returns | |
69 | |
70 | |
71 class GsutilUnitTests(unittest.TestCase): | |
72 def setUp(self): | |
73 self.fake = FakeCall() | |
74 self.tempdir = tempfile.mkdtemp() | |
75 self.old_urlopen = getattr(urllib, 'urlopen') | |
76 self.old_call = getattr(gsutil, 'call') | |
77 setattr(urllib, 'urlopen', self.fake) | |
78 setattr(gsutil, 'call', self.fake) | |
79 | |
80 def tearDown(self): | |
81 self.assertEqual(self.fake.expectations, []) | |
82 shutil.rmtree(self.tempdir) | |
83 setattr(urllib, 'urlopen', self.old_urlopen) | |
84 setattr(gsutil, 'call', self.old_call) | |
85 | |
86 def test_download_gsutil(self): | |
87 version = '4.2' | |
88 filename = 'gsutil_%s.zip' % version | |
89 full_filename = os.path.join(self.tempdir, filename) | |
90 fake_file = 'This is gsutil.zip' | |
91 fake_file2 = 'This is other gsutil.zip' | |
92 url = '%s%s' % (gsutil.GSUTIL_URL, filename) | |
93 self.fake.add_expectation(url, _returns=Buffer(fake_file)) | |
94 | |
95 self.assertEquals( | |
96 gsutil.download_gsutil(version, self.tempdir), full_filename) | |
97 with open(full_filename, 'r') as f: | |
98 self.assertEquals(fake_file, f.read()) | |
99 | |
100 metadata_url = gsutil.API_URL + filename | |
101 md5_calc = hashlib.md5() | |
102 md5_calc.update(fake_file) | |
103 b64_md5 = base64.b64encode(md5_calc.hexdigest()) | |
104 self.fake.add_expectation(metadata_url, _returns=Buffer(json.dumps({ | |
105 'md5Hash': b64_md5 | |
106 }))) | |
107 self.assertEquals( | |
108 gsutil.download_gsutil(version, self.tempdir), full_filename) | |
109 with open(full_filename, 'r') as f: | |
110 self.assertEquals(fake_file, f.read()) | |
111 self.assertEquals(self.fake.expectations, []) | |
112 | |
113 self.fake.add_expectation(metadata_url, _returns=Buffer(json.dumps({ | |
114 'md5Hash': base64.b64encode('aaaaaaa') # Bad MD5 | |
115 }))) | |
116 self.fake.add_expectation(url, _returns=Buffer(fake_file2)) | |
117 self.assertEquals( | |
118 gsutil.download_gsutil(version, self.tempdir), full_filename) | |
119 with open(full_filename, 'r') as f: | |
120 self.assertEquals(fake_file2, f.read()) | |
121 self.assertEquals(self.fake.expectations, []) | |
122 | |
123 def test_ensure_gsutil_full(self): | |
124 version = '4.2' | |
125 gsutil_dir = os.path.join(self.tempdir, 'gsutil_%s' % version, 'gsutil') | |
126 gsutil_bin = os.path.join(gsutil_dir, 'gsutil') | |
127 os.makedirs(gsutil_dir) | |
128 | |
129 self.fake.add_expectation( | |
130 [gsutil_bin, 'version'], verbose=False, | |
131 _returns=gsutil.SubprocessError()) | |
132 | |
133 with open(gsutil_bin, 'w') as f: | |
134 f.write('Foobar') | |
135 zip_filename = 'gsutil_%s.zip' % version | |
136 url = '%s%s' % (gsutil.GSUTIL_URL, zip_filename) | |
137 _, tempzip = tempfile.mkstemp() | |
138 fake_gsutil = 'Fake gsutil' | |
139 with zipfile.ZipFile(tempzip, 'w') as zf: | |
140 zf.writestr('gsutil/gsutil', fake_gsutil) | |
141 with open(tempzip, 'rb') as f: | |
142 self.fake.add_expectation(url, _returns=Buffer(f.read())) | |
143 self.fake.add_expectation( | |
144 [gsutil_bin, 'version'], verbose=False, | |
145 _returns=gsutil.SubprocessError()) | |
146 | |
147 # This should delete the old bin and rewrite it with 'Fake gsutil' | |
148 self.assertRaises( | |
149 gsutil.InvalidGsutilError, gsutil.ensure_gsutil, version, self.tempdir) | |
150 self.assertTrue(os.path.isdir(os.path.join(self.tempdir, '.cache_dir'))) | |
151 self.assertTrue(os.path.exists(gsutil_bin)) | |
152 with open(gsutil_bin, 'r') as f: | |
153 self.assertEquals(f.read(), fake_gsutil) | |
154 self.assertEquals(self.fake.expectations, []) | |
155 | |
156 def test_ensure_gsutil_short(self): | |
157 version = '4.2' | |
158 gsutil_dir = os.path.join(self.tempdir, 'gsutil_%s' % version, 'gsutil') | |
159 gsutil_bin = os.path.join(gsutil_dir, 'gsutil') | |
160 os.makedirs(gsutil_dir) | |
161 | |
162 # Mock out call(). | |
163 self.fake.add_expectation( | |
164 [gsutil_bin, 'version'], verbose=False, _returns=True) | |
165 | |
166 with open(gsutil_bin, 'w') as f: | |
167 f.write('Foobar') | |
168 self.assertEquals( | |
169 gsutil.ensure_gsutil(version, self.tempdir), gsutil_bin) | |
170 | |
171 if __name__ == '__main__': | |
172 unittest.main() | |
OLD | NEW |