OLD | NEW |
---|---|
(Empty) | |
1 #!/usr/bin/env python | |
2 # Copyright (c) 2012 The Chromium Authors. All rights reserved. | |
3 # Use of this source code is governed by a BSD-style license that can be | |
4 # found in the LICENSE file. | |
5 | |
6 """Unit tests for download_from_google_storage.py.""" | |
7 | |
8 import os | |
9 import sys | |
10 import unittest | |
11 import threading | |
12 import Queue | |
13 import optparse | |
14 | |
15 sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) | |
16 | |
17 import upload_to_google_storage | |
18 import download_from_google_storage | |
19 | |
20 # ../third_party/gsutil/gsutil | |
21 GSUTIL_DEFAULT_PATH = os.path.join( | |
22 os.path.dirname(os.path.dirname(os.path.abspath(__file__))), | |
23 'third_party', 'gsutil', 'gsutil') | |
24 | |
25 | |
26 class GsutilMock(object): | |
27 def __init__(self, path, boto_path=None, timeout=None): | |
28 self.path = path | |
29 self.timeout = timeout | |
30 self.boto_path = boto_path | |
31 self.expected = [] | |
32 self.history = [] | |
33 self.lock = threading.Lock() | |
34 | |
35 def add_expected(self, return_code, out, err): | |
36 self.expected.append((return_code, out, err)) | |
37 | |
38 def append_history(self, method, args): | |
39 with self.lock: | |
40 self.history.append((method, args)) | |
41 | |
42 def call(self, *args): | |
43 self.append_history('call', args) | |
44 if self.expected: | |
45 return self.expected.pop(0)[0] | |
M-A Ruel
2013/03/09 12:41:13
You need a lock for all the function; you could ge
Ryan Tseng
2013/03/11 17:35:14
Done, moved lock to call() function
| |
46 else: | |
47 return 0 | |
48 | |
49 def check_call(self, *args): | |
50 self.append_history('check_call', args) | |
51 if self.expected: | |
52 return self.expected.pop(0) | |
53 else: | |
54 return (0, '', '') | |
55 | |
56 def clone(self): | |
57 return self | |
58 | |
59 | |
60 class GstoolsUnitTests(unittest.TestCase): | |
61 def setUp(self): | |
62 self.base_path = os.path.join( | |
63 os.path.dirname(os.path.abspath(__file__)), 'gstools') | |
64 | |
65 def test_gsutil(self): | |
66 gsutil = download_from_google_storage.Gsutil(GSUTIL_DEFAULT_PATH) | |
67 self.assertEquals(gsutil.path, GSUTIL_DEFAULT_PATH) | |
68 code, _, err = gsutil.check_call() | |
69 self.assertEquals(code, 0) | |
70 self.assertEquals(err, '') | |
71 | |
72 def test_gsutil_version(self): | |
73 gsutil = download_from_google_storage.Gsutil(GSUTIL_DEFAULT_PATH) | |
74 _, _, err = gsutil.check_call('version') | |
75 err_lines = err.splitlines() | |
76 self.assertEquals(err_lines[0], 'gsutil version 3.25') | |
77 self.assertEquals( | |
78 err_lines[1], | |
79 'checksum ce71ac982f1148315e7fa65cff2f83e8 (OK)') | |
80 | |
81 def test_get_sha1(self): | |
82 lorem_ipsum = os.path.join(self.base_path, 'lorem_ipsum.txt') | |
83 self.assertEquals( | |
84 download_from_google_storage.get_sha1(lorem_ipsum), | |
85 '7871c8e24da15bad8b0be2c36edc9dc77e37727f') | |
86 | |
87 def test_get_md5(self): | |
88 lorem_ipsum = os.path.join(self.base_path, 'lorem_ipsum.txt') | |
89 self.assertEquals( | |
90 upload_to_google_storage.get_md5(lorem_ipsum), | |
91 '634d7c1ed3545383837428f031840a1e') | |
92 | |
93 def test_get_md5_cached_read(self): | |
94 lorem_ipsum = os.path.join(self.base_path, 'lorem_ipsum.txt') | |
95 # Use a fake 'stale' MD5 sum. Expected behavior is to return stale sum. | |
96 self.assertEquals( | |
97 upload_to_google_storage.get_md5_cached(lorem_ipsum), | |
98 '734d7c1ed3545383837428f031840a1e') | |
99 | |
100 def test_get_md5_cached_write(self): | |
101 lorem_ipsum2 = os.path.join(self.base_path, 'lorem_ipsum2.txt') | |
102 lorem_ipsum2_md5 = os.path.join(self.base_path, 'lorem_ipsum2.txt.md5') | |
103 if os.path.exists(lorem_ipsum2_md5): | |
104 os.remove(lorem_ipsum2_md5) | |
105 # Use a fake 'stale' MD5 sum. Expected behavior is to return stale sum. | |
106 self.assertEquals( | |
107 upload_to_google_storage.get_md5_cached(lorem_ipsum2), | |
108 '4c02d1eb455a0f22c575265d17b84b6d') | |
109 self.assertTrue(os.path.exists(lorem_ipsum2_md5)) | |
110 self.assertEquals( | |
111 open(lorem_ipsum2_md5, 'rb').read(), | |
112 '4c02d1eb455a0f22c575265d17b84b6d') | |
113 os.remove(lorem_ipsum2_md5) # Clean up. | |
114 self.assertFalse(os.path.exists(lorem_ipsum2_md5)) | |
115 | |
116 | |
117 class DownloadTests(unittest.TestCase): | |
118 def setUp(self): | |
119 self.gsutil = GsutilMock(GSUTIL_DEFAULT_PATH) | |
120 self.base_path = os.path.join( | |
121 os.path.dirname(os.path.abspath(__file__)), | |
122 'gstools', | |
123 'download_test_data') | |
124 self.base_url = 'gs://sometesturl' | |
125 self.parser = optparse.OptionParser() | |
126 self.queue = Queue.Queue() | |
127 self.ret_codes = Queue.Queue() | |
128 self.lorem_ipsum = os.path.join(self.base_path, 'lorem_ipsum.txt') | |
129 self.lorem_ipsum_sha1 = '7871c8e24da15bad8b0be2c36edc9dc77e37727f' | |
130 self.maxDiff = None | |
131 | |
132 def test_enumerate_files_non_recursive(self): | |
133 queue_size = download_from_google_storage.enumerate_work_queue( | |
134 self.base_path, self.queue, True, False, False, None, False) | |
135 result = list(self.queue.queue) | |
136 expected_queue = [ | |
137 ('e6c4fbd4fe7607f3e6ebf68b2ea4ef694da7b4fe', | |
138 os.path.join(self.base_path, 'rootfolder_text.txt')), | |
139 ('7871c8e24da15bad8b0be2c36edc9dc77e37727f', | |
140 os.path.join(self.base_path, 'uploaded_lorem_ipsum.txt'))] | |
141 for item in result: | |
142 self.assertTrue(item in expected_queue) | |
143 self.assertEquals(queue_size, 2) | |
144 | |
145 def test_enumerate_files_recursive(self): | |
146 queue_size = download_from_google_storage.enumerate_work_queue( | |
147 self.base_path, self.queue, True, True, False, None, False) | |
148 expected_queue = [ | |
149 ('e6c4fbd4fe7607f3e6ebf68b2ea4ef694da7b4fe', | |
150 os.path.join(self.base_path, 'rootfolder_text.txt')), | |
151 ('7871c8e24da15bad8b0be2c36edc9dc77e37727f', | |
152 os.path.join(self.base_path, 'uploaded_lorem_ipsum.txt')), | |
153 ('b5415aa0b64006a95c0c409182e628881d6d6463', | |
154 os.path.join(self.base_path, 'subfolder', 'subfolder_text.txt'))] | |
155 result = list(self.queue.queue) | |
156 for item in result: | |
157 self.assertTrue(item in expected_queue) | |
158 self.assertEquals(queue_size, 3) | |
159 | |
160 def test_download_worker_single_file(self): | |
161 sha1_hash = '7871c8e24da15bad8b0be2c36edc9dc77e37727f' | |
162 input_filename = '%s/%s' % (self.base_url, sha1_hash) | |
163 output_filename = os.path.join(self.base_path, 'uploaded_lorem_ipsum.txt') | |
164 self.queue.put((sha1_hash, output_filename)) | |
165 self.queue.put((None, None)) | |
166 stdout_queue = Queue.Queue() | |
167 # pylint: disable=W0212 | |
168 download_from_google_storage._downloader_worker_thread( | |
169 0, self.queue, False, self.base_url, self.gsutil, | |
170 stdout_queue, self.ret_codes) | |
171 expected_calls = [ | |
172 ('check_call', | |
173 ('ls', input_filename)), | |
174 ('check_call', | |
175 ('cp', '-q', input_filename, output_filename))] | |
176 expected_output = [ | |
177 '0> Downloading %s...' % output_filename] | |
178 self.assertEquals(list(stdout_queue.queue), expected_output) | |
179 self.assertEquals(self.gsutil.history, expected_calls) | |
180 | |
181 def test_download_worker_skips_file(self): | |
182 sha1_hash = 'e6c4fbd4fe7607f3e6ebf68b2ea4ef694da7b4fe' | |
183 output_filename = os.path.join(self.base_path, 'rootfolder_text.txt') | |
184 self.queue.put((sha1_hash, output_filename)) | |
185 self.queue.put((None, None)) | |
186 stdout_queue = Queue.Queue() | |
187 # pylint: disable=W0212 | |
188 download_from_google_storage._downloader_worker_thread( | |
189 0, self.queue, False, self.base_url, self.gsutil, | |
190 stdout_queue, self.ret_codes) | |
191 expected_output = [ | |
192 '0> File %s exists and SHA1 matches. Skipping.' % output_filename | |
193 ] | |
194 self.assertEquals(list(stdout_queue.queue), expected_output) | |
195 self.assertEquals(self.gsutil.history, []) | |
196 | |
197 def test_download_worker_skips_not_found_file(self): | |
198 sha1_hash = '7871c8e24da15bad8b0be2c36edc9dc77e37727f' | |
199 input_filename = '%s/%s' % (self.base_url, sha1_hash) | |
200 output_filename = os.path.join(self.base_path, 'uploaded_lorem_ipsum.txt') | |
201 self.queue.put((sha1_hash, output_filename)) | |
202 self.queue.put((None, None)) | |
203 stdout_queue = Queue.Queue() | |
204 self.gsutil.add_expected(1, '', '') # Return error when 'ls' is called. | |
205 # pylint: disable=W0212 | |
206 download_from_google_storage._downloader_worker_thread( | |
207 0, self.queue, False, self.base_url, self.gsutil, | |
208 stdout_queue, self.ret_codes) | |
209 expected_output = [ | |
210 '0> File %s for %s does not exist, skipping.' % ( | |
211 input_filename, output_filename), | |
212 ] | |
213 expected_calls = [ | |
214 ('check_call', | |
215 ('ls', input_filename)) | |
216 ] | |
217 expected_ret_codes = [ | |
218 (1, 'File %s for %s does not exist.' % ( | |
219 input_filename, output_filename)) | |
220 ] | |
221 self.assertEquals(list(stdout_queue.queue), expected_output) | |
222 self.assertEquals(self.gsutil.history, expected_calls) | |
223 self.assertEquals(list(self.ret_codes.queue), expected_ret_codes) | |
224 | |
225 def test_download_cp_fails(self): | |
226 sha1_hash = '7871c8e24da15bad8b0be2c36edc9dc77e37727f' | |
227 input_filename = '%s/%s' % (self.base_url, sha1_hash) | |
228 output_filename = os.path.join(self.base_path, 'uploaded_lorem_ipsum.txt') | |
229 self.gsutil.add_expected(0, '', '') | |
230 self.gsutil.add_expected(101, '', 'Test error message.') | |
231 # pylint: disable=W0212 | |
232 code = download_from_google_storage.download_from_google_storage( | |
233 input_filename=sha1_hash, | |
234 base_url=self.base_url, | |
235 gsutil=self.gsutil, | |
236 num_threads=1, | |
237 directory=False, | |
238 recursive=False, | |
239 force=True, | |
240 output=output_filename, | |
241 ignore_errors=False, | |
242 sha1_file=False) | |
243 expected_calls = [ | |
244 ('check_call', | |
245 ('ls', input_filename)), | |
246 ('check_call', | |
247 ('cp', '-q', input_filename, output_filename)) | |
248 ] | |
249 self.assertEquals(self.gsutil.history, expected_calls) | |
250 self.assertEquals(code, 101) | |
251 | |
252 def test_download_directory_no_recursive_non_force(self): | |
253 sha1_hash = '7871c8e24da15bad8b0be2c36edc9dc77e37727f' | |
254 input_filename = '%s/%s' % (self.base_url, sha1_hash) | |
255 output_filename = os.path.join(self.base_path, 'uploaded_lorem_ipsum.txt') | |
256 code = download_from_google_storage.download_from_google_storage( | |
257 input_filename=self.base_path, | |
258 base_url=self.base_url, | |
259 gsutil=self.gsutil, | |
260 num_threads=1, | |
261 directory=True, | |
262 recursive=False, | |
263 force=False, | |
264 output=None, | |
265 ignore_errors=False, | |
266 sha1_file=False) | |
267 expected_calls = [ | |
268 ('check_call', | |
269 ('ls', input_filename)), | |
270 ('check_call', | |
271 ('cp', '-q', input_filename, output_filename))] | |
272 self.assertEquals(self.gsutil.history, expected_calls) | |
273 self.assertEquals(code, 0) | |
274 | |
275 if __name__ == '__main__': | |
276 unittest.main() | |
OLD | NEW |