OLD | NEW |
---|---|
(Empty) | |
1 #!/usr/bin/env python | |
2 # Copyright (c) 2012 The Chromium Authors. All rights reserved. | |
3 # Use of this source code is governed by a BSD-style license that can be | |
4 # found in the LICENSE file. | |
5 | |
6 """Test gsutil.py.""" | |
7 | |
8 | |
9 import __builtin__ | |
10 import unittest | |
11 import hashlib | |
12 import zipfile | |
13 import shutil | |
14 import sys | |
15 import base64 | |
16 import tempfile | |
17 import json | |
18 import os | |
19 import urllib | |
20 | |
21 | |
22 # Add depot_tools to path | |
23 THIS_DIR = os.path.dirname(os.path.abspath(__file__)) | |
24 DEPOT_TOOLS_DIR = os.path.dirname(THIS_DIR) | |
25 sys.path.append(DEPOT_TOOLS_DIR) | |
26 | |
27 import gsutil | |
28 | |
29 | |
30 class TestError(Exception): | |
31 pass | |
32 | |
33 | |
34 class Buffer(object): | |
35 def __init__(self): | |
36 self.data = '' | |
37 | |
38 def write(self, buf): | |
39 self.data += buf | |
40 | |
41 def read(self, amount=None): | |
42 if not amount: | |
43 amount = len(self.data) | |
44 result = self.data[:amount] | |
45 self.data = self.data[amount:] | |
46 return result | |
47 | |
48 | |
49 class FakeOpen(object): | |
50 def __init__(self): | |
51 self.expectations = [] | |
52 | |
53 def add_expectation(self, filename, data): | |
54 self.expectations.append((filename, data)) | |
55 | |
56 def urlopen(self, url): | |
57 exp_url, exp_data = self.expectations.pop(0) | |
58 if url != exp_url: | |
59 raise TestError('%s incorrect, expected %s' % ( | |
60 url, exp_url)) | |
61 | |
62 buf = Buffer() | |
63 buf.write(exp_data) | |
64 return buf | |
65 | |
66 | |
67 class GsutilUnitTests(unittest.TestCase): | |
68 def setUp(self): | |
69 self.fake_open = FakeOpen() | |
70 self.tempdir = tempfile.mkdtemp() | |
71 self.old_urlopen = getattr(urllib, 'urlopen') | |
72 setattr(urllib, 'urlopen', self.fake_open.urlopen) | |
73 | |
74 def tearDown(self): | |
75 shutil.rmtree(self.tempdir) | |
76 setattr(urllib, 'urlopen', self.old_urlopen) | |
77 | |
78 def test_download_gsutil(self): | |
79 version = '4.2' | |
80 filename = 'gsutil_%s.zip' % version | |
81 full_filename = os.path.join(self.tempdir, filename) | |
82 fake_file = 'This is gsutil.zip' | |
83 fake_file2 = 'This is other gsutil.zip' | |
84 url = '%s%s' % (gsutil.GSUTIL_URL, filename) | |
85 self.fake_open.add_expectation(url, fake_file) | |
86 | |
87 self.assertEquals( | |
88 gsutil.download_gsutil(version, self.tempdir), full_filename) | |
89 with open(full_filename, 'r') as f: | |
90 self.assertEquals(fake_file, f.read()) | |
91 | |
92 metadata_url = gsutil.API_URL + filename | |
93 md5_calc = hashlib.md5() | |
94 md5_calc.update(fake_file) | |
95 b64_md5 = base64.b64encode(md5_calc.hexdigest()) | |
96 self.fake_open.add_expectation(metadata_url, json.dumps({ | |
97 'md5Hash': b64_md5 | |
98 })) | |
99 self.assertEquals( | |
100 gsutil.download_gsutil(version, self.tempdir), full_filename) | |
101 with open(full_filename, 'r') as f: | |
102 self.assertEquals(fake_file, f.read()) | |
103 self.assertEquals(self.fake_open.expectations, []) | |
104 | |
105 self.fake_open.add_expectation(metadata_url, json.dumps({ | |
106 'md5Hash': base64.b64encode('aaaaaaa') # Bad MD5 | |
107 })) | |
108 self.fake_open.add_expectation(url, fake_file2) | |
109 self.assertEquals( | |
110 gsutil.download_gsutil(version, self.tempdir), full_filename) | |
111 with open(full_filename, 'r') as f: | |
112 self.assertEquals(fake_file2, f.read()) | |
113 self.assertEquals(self.fake_open.expectations, []) | |
114 | |
115 def test_ensure_gsutil_full(self): | |
116 version = '4.2' | |
117 gsutil_dir = os.path.join(self.tempdir, 'gsutil_%s' % version, 'gsutil') | |
118 gsutil_bin = os.path.join(gsutil_dir, 'gsutil') | |
119 os.makedirs(gsutil_dir) | |
120 | |
121 # Mock out call(). | |
122 old_call = getattr(gsutil, 'call') | |
dnj
2014/11/20 19:58:27
If this test raises an exception, 'call' will stay
hinoka
2014/11/20 21:20:15
My (and Sergey's) experience with mock libraries a
| |
123 def fake_call(args, **kwargs): | |
124 self.assertEquals(args, [gsutil_bin, 'version']) | |
125 self.assertEquals(kwargs, {'verbose': False}) | |
126 raise gsutil.SubprocessError() | |
127 setattr(gsutil, 'call', fake_call) | |
128 | |
129 with open(gsutil_bin, 'w') as f: | |
130 f.write('Foobar') | |
131 zip_filename = 'gsutil_%s.zip' % version | |
132 url = '%s%s' % (gsutil.GSUTIL_URL, zip_filename) | |
133 _, tempzip = tempfile.mkstemp() | |
134 fake_gsutil = 'Fake gsutil' | |
135 with zipfile.ZipFile(tempzip, 'w') as zf: | |
136 zf.writestr('gsutil/gsutil', fake_gsutil) | |
137 with open(tempzip, 'rb') as f: | |
138 self.fake_open.add_expectation(url, f.read()) | |
139 | |
140 # This should delete the old bin and rewrite it with 'Fake gsutil' | |
141 self.assertRaises( | |
142 gsutil.InvalidGsutilError, gsutil.ensure_gsutil, version, self.tempdir) | |
143 self.assertTrue(os.path.isdir(os.path.join(self.tempdir, '.cache_dir'))) | |
144 self.assertTrue(os.path.exists(gsutil_bin)) | |
145 with open(gsutil_bin, 'r') as f: | |
146 self.assertEquals(f.read(), fake_gsutil) | |
147 | |
148 setattr(gsutil, 'call', old_call) | |
149 | |
150 def test_ensure_gsutil_short(self): | |
151 version = '4.2' | |
152 gsutil_dir = os.path.join(self.tempdir, 'gsutil_%s' % version, 'gsutil') | |
153 gsutil_bin = os.path.join(gsutil_dir, 'gsutil') | |
154 os.makedirs(gsutil_dir) | |
155 | |
156 # Mock out call(). | |
157 old_call = getattr(gsutil, 'call') | |
158 def fake_call(args, **kwargs): | |
159 self.assertEquals(args, [gsutil_bin, 'version']) | |
160 self.assertEquals(kwargs, {'verbose': False}) | |
161 return True | |
162 setattr(gsutil, 'call', fake_call) | |
163 | |
164 with open(gsutil_bin, 'w') as f: | |
165 f.write('Foobar') | |
166 self.assertEquals( | |
167 gsutil.ensure_gsutil(version, self.tempdir), gsutil_bin) | |
168 | |
169 setattr(gsutil, 'call', old_call) | |
170 | |
171 if __name__ == '__main__': | |
172 unittest.main() | |
OLD | NEW |