Index: tests/gstools_unittest.py |
diff --git a/tests/gstools_unittest.py b/tests/gstools_unittest.py |
new file mode 100755 |
index 0000000000000000000000000000000000000000..e35b0bff52e1fc3a81489b57331730d13063b5f7 |
--- /dev/null |
+++ b/tests/gstools_unittest.py |
@@ -0,0 +1,412 @@ |
+#!/usr/bin/env python |
+# Copyright (c) 2012 The Chromium Authors. All rights reserved. |
+# Use of this source code is governed by a BSD-style license that can be |
+# found in the LICENSE file. |
+ |
+"""Unit tests for download_to/upload_from_google_storage.py.""" |
+ |
+import os |
+import sys |
+import unittest |
+import threading |
+import StringIO |
+import Queue |
+import optparse |
+ |
+sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) |
+ |
+import upload_to_google_storage |
+import download_from_google_storage |
+ |
+# ../third_party/gsutil/gsutil |
+GSUTIL_DEFAULT_PATH = os.path.join( |
+ os.path.dirname(os.path.dirname(os.path.abspath(__file__))), |
+ 'third_party', 'gsutil', 'gsutil') |
+ |
+ |
+class GsutilMock(object): |
+ def __init__(self, path, boto_path=None, timeout=None): |
+ self.path = path |
+ self.timeout = timeout |
+ self.boto_path = boto_path |
+ self.expected = [] |
+ self.history = [] |
+ self.lock = threading.Lock() |
+ |
+ def add_expected(self, return_code, out, err): |
+ self.expected.append((return_code, out, err)) |
+ |
+ def append_history(self, method, args): |
+ with self.lock: |
+ self.history.append((method, args)) |
+ |
+ def call(self, *args): |
+ self.append_history('call', args) |
+ if self.expected: |
+ return self.expected.pop(0)[0] |
+ else: |
+ return 0 |
+ |
+ def check_call(self, *args): |
+ self.append_history('check_call', args) |
+ if self.expected: |
+ return self.expected.pop(0) |
+ else: |
+ return (0, '', '') |
+ |
+ def clone(self): |
+ return self |
+ |
+ |
+class GstoolsUnitTests(unittest.TestCase): |
+ def setUp(self): |
+ self.base_path = os.path.join( |
+ os.path.dirname(os.path.abspath(__file__)), 'gstools') |
+ |
+ def test_gsutil(self): |
+ gsutil = download_from_google_storage.Gsutil(GSUTIL_DEFAULT_PATH) |
+ self.assertEquals(gsutil.path, GSUTIL_DEFAULT_PATH) |
+ code, _, err = gsutil.check_call() |
+ self.assertEquals(code, 0) |
+ self.assertEquals(err, '') |
+ |
+ def test_gsutil_version(self): |
+ gsutil = download_from_google_storage.Gsutil(GSUTIL_DEFAULT_PATH) |
+ _, _, err = gsutil.check_call('version') |
+ err_lines = err.splitlines() |
+ self.assertEquals(err_lines[0], 'gsutil version 3.25') |
+ self.assertEquals( |
+ err_lines[1], |
+ 'checksum ce71ac982f1148315e7fa65cff2f83e8 (OK)') |
+ |
+ def test_get_sha1(self): |
+ lorem_ipsum = os.path.join(self.base_path, 'lorem_ipsum.txt') |
+ self.assertEquals( |
+ download_from_google_storage.get_sha1(lorem_ipsum), |
+ '7871c8e24da15bad8b0be2c36edc9dc77e37727f') |
+ |
+ def test_get_md5(self): |
+ lorem_ipsum = os.path.join(self.base_path, 'lorem_ipsum.txt') |
+ self.assertEquals( |
+ upload_to_google_storage.get_md5(lorem_ipsum), |
+ '634d7c1ed3545383837428f031840a1e') |
+ |
+ def test_get_md5_cached_read(self): |
+ lorem_ipsum = os.path.join(self.base_path, 'lorem_ipsum.txt') |
+ # Use a fake 'stale' MD5 sum. Expected behavior is to return stale sum. |
+ self.assertEquals( |
+ upload_to_google_storage.get_md5_cached(lorem_ipsum), |
+ '734d7c1ed3545383837428f031840a1e') |
+ |
+ def test_get_md5_cached_write(self): |
+ lorem_ipsum2 = os.path.join(self.base_path, 'lorem_ipsum2.txt') |
+ lorem_ipsum2_md5 = os.path.join(self.base_path, 'lorem_ipsum2.txt.md5') |
+ if os.path.exists(lorem_ipsum2_md5): |
+ os.remove(lorem_ipsum2_md5) |
+ # Use a fake 'stale' MD5 sum. Expected behavior is to return stale sum. |
+ self.assertEquals( |
+ upload_to_google_storage.get_md5_cached(lorem_ipsum2), |
+ '4c02d1eb455a0f22c575265d17b84b6d') |
+ self.assertTrue(os.path.exists(lorem_ipsum2_md5)) |
+ self.assertEquals( |
+ open(lorem_ipsum2_md5, 'rb').read(), |
+ '4c02d1eb455a0f22c575265d17b84b6d') |
+ os.remove(lorem_ipsum2_md5) # Clean up. |
+ self.assertFalse(os.path.exists(lorem_ipsum2_md5)) |
+ |
+ |
+class UploadTests(unittest.TestCase): |
+ def setUp(self): |
+ self.gsutil = GsutilMock(GSUTIL_DEFAULT_PATH) |
+ self.base_path = os.path.join( |
+ os.path.dirname(os.path.abspath(__file__)), 'gstools') |
+ self.base_url = 'gs://sometesturl' |
+ self.parser = optparse.OptionParser() |
+ self.ret_codes = Queue.Queue() |
+ self.stdout_queue = Queue.Queue() |
+ self.lorem_ipsum = os.path.join(self.base_path, 'lorem_ipsum.txt') |
+ self.lorem_ipsum_sha1 = '7871c8e24da15bad8b0be2c36edc9dc77e37727f' |
+ |
+ def test_upload_single_file(self): |
+ filenames = [self.lorem_ipsum] |
+ output_filename = '%s.sha1' % self.lorem_ipsum |
+ if os.path.exists(output_filename): |
+ os.remove(output_filename) |
+ upload_to_google_storage.upload_to_google_storage( |
+ filenames, self.base_url, self.gsutil, True, False, 1, False) |
+ self.assertEquals( |
+ self.gsutil.history, |
+ [('check_call', |
+ ('ls', '%s/%s' % (self.base_url, self.lorem_ipsum_sha1))), |
+ ('check_call', |
+ ('cp', '-q', filenames[0], '%s/%s' % (self.base_url, |
+ self.lorem_ipsum_sha1)))]) |
+ self.assertTrue(os.path.exists(output_filename)) |
+ self.assertEquals( |
+ open(output_filename, 'rb').read(), |
+ '7871c8e24da15bad8b0be2c36edc9dc77e37727f') |
+ os.remove(output_filename) |
+ |
+ def test_upload_single_file_remote_exists(self): |
+ filenames = [self.lorem_ipsum] |
+ output_filename = '%s.sha1' % self.lorem_ipsum |
+ etag_string = 'ETag: 634d7c1ed3545383837428f031840a1e' |
+ if os.path.exists(output_filename): |
+ os.remove(output_filename) |
+ self.gsutil.add_expected(0, '', '') |
+ self.gsutil.add_expected(0, etag_string, '') |
+ upload_to_google_storage.upload_to_google_storage( |
+ filenames, self.base_url, self.gsutil, False, False, 1, False) |
+ self.assertEquals( |
+ self.gsutil.history, |
+ [('check_call', |
+ ('ls', '%s/%s' % (self.base_url, self.lorem_ipsum_sha1))), |
+ ('check_call', |
+ ('ls', '-L', '%s/%s' % (self.base_url, self.lorem_ipsum_sha1)))]) |
+ self.assertTrue(os.path.exists(output_filename)) |
+ self.assertEquals( |
+ open(output_filename, 'rb').read(), |
+ '7871c8e24da15bad8b0be2c36edc9dc77e37727f') |
+ os.remove(output_filename) |
+ |
+ def test_upload_worker_errors(self): |
+ work_queue = Queue.Queue() |
+ work_queue.put((self.lorem_ipsum, self.lorem_ipsum_sha1)) |
+ work_queue.put((None, None)) |
+ self.gsutil.add_expected(1, '', '') # For the first ls call. |
+ self.gsutil.add_expected(20, '', 'Expected error message') |
+ # pylint: disable=W0212 |
+ upload_to_google_storage._upload_worker( |
+ 0, |
+ work_queue, |
+ self.base_url, |
+ self.gsutil, |
+ threading.Lock(), |
+ False, |
+ False, |
+ self.stdout_queue, |
+ self.ret_codes) |
+ expected_ret_codes = [ |
+ (20, |
+ 'Encountered error on uploading %s to %s/%s\nExpected error message' % |
+ (self.lorem_ipsum, self.base_url, self.lorem_ipsum_sha1))] |
+ self.assertEquals(list(self.ret_codes.queue), expected_ret_codes) |
+ |
+ |
+ def test_skip_hashing(self): |
+ filenames = [self.lorem_ipsum] |
+ output_filename = '%s.sha1' % self.lorem_ipsum |
+ fake_hash = '6871c8e24da15bad8b0be2c36edc9dc77e37727f' |
+ with open(output_filename, 'wb') as f: |
+ f.write(fake_hash) # Fake hash. |
+ upload_to_google_storage.upload_to_google_storage( |
+ filenames, self.base_url, self.gsutil, False, False, 1, True) |
+ self.assertEquals( |
+ self.gsutil.history, |
+ [('check_call', |
+ ('ls', '%s/%s' % (self.base_url, fake_hash))), |
+ ('check_call', |
+ ('ls', '-L', '%s/%s' % (self.base_url, fake_hash))), |
+ ('check_call', |
+ ('cp', '-q', filenames[0], '%s/%s' % (self.base_url, fake_hash)))]) |
+ self.assertEquals( |
+ open(output_filename, 'rb').read(), fake_hash) |
+ os.remove(output_filename) |
+ |
+ def test_get_targets_no_args(self): |
+ try: |
+ upload_to_google_storage.get_targets([], self.parser, False) |
+ except SystemExit, e: |
+ self.assertEquals(type(e), type(SystemExit())) |
+ self.assertEquals(e.code, 2) |
+ except Exception, e: |
+ self.fail('unexpected exception: %s' % e) |
+ else: |
+ self.fail('SystemExit exception expected') |
+ |
+ def test_get_targets_passthrough(self): |
+ result = upload_to_google_storage.get_targets( |
+ ['a', 'b', 'c', 'd', 'e'], |
+ self.parser, |
+ False) |
+ self.assertEquals(result, ['a', 'b', 'c', 'd', 'e']) |
+ |
+ def test_get_targets_multiple_stdin(self): |
+ inputs = ['a', 'b', 'c', 'd', 'e'] |
+ sys.stdin = StringIO.StringIO(os.linesep.join(inputs)) |
+ result = upload_to_google_storage.get_targets( |
+ ['-'], |
+ self.parser, |
+ False) |
+ self.assertEquals(result, inputs) |
+ |
+ def test_get_targets_multiple_stdin_null(self): |
+ inputs = ['a', 'b', 'c', 'd', 'e'] |
+ sys.stdin = StringIO.StringIO('\0'.join(inputs)) |
+ result = upload_to_google_storage.get_targets( |
+ ['-'], |
+ self.parser, |
+ True) |
+ self.assertEquals(result, inputs) |
+ |
+ |
+class DownloadTests(unittest.TestCase): |
+ def setUp(self): |
+ self.gsutil = GsutilMock(GSUTIL_DEFAULT_PATH) |
+ self.base_path = os.path.join( |
+ os.path.dirname(os.path.abspath(__file__)), |
+ 'gstools', |
+ 'download_test_data') |
+ self.base_url = 'gs://sometesturl' |
+ self.parser = optparse.OptionParser() |
+ self.queue = Queue.Queue() |
+ self.ret_codes = Queue.Queue() |
+ self.lorem_ipsum = os.path.join(self.base_path, 'lorem_ipsum.txt') |
+ self.lorem_ipsum_sha1 = '7871c8e24da15bad8b0be2c36edc9dc77e37727f' |
+ self.maxDiff = None |
+ |
+ def test_enumerate_files_non_recursive(self): |
+ queue_size = download_from_google_storage.enumerate_work_queue( |
+ self.base_path, self.queue, True, False, False, None, False) |
+ result = list(self.queue.queue) |
+ expected_queue = [ |
+ ('e6c4fbd4fe7607f3e6ebf68b2ea4ef694da7b4fe', |
+ os.path.join(self.base_path, 'rootfolder_text.txt')), |
+ ('7871c8e24da15bad8b0be2c36edc9dc77e37727f', |
+ os.path.join(self.base_path, 'uploaded_lorem_ipsum.txt'))] |
+ for item in result: |
+ self.assertTrue(item in expected_queue) |
+ self.assertEquals(queue_size, 2) |
+ |
+ def test_enumerate_files_recursive(self): |
+ queue_size = download_from_google_storage.enumerate_work_queue( |
+ self.base_path, self.queue, True, True, False, None, False) |
+ expected_queue = [ |
+ ('e6c4fbd4fe7607f3e6ebf68b2ea4ef694da7b4fe', |
+ os.path.join(self.base_path, 'rootfolder_text.txt')), |
+ ('7871c8e24da15bad8b0be2c36edc9dc77e37727f', |
+ os.path.join(self.base_path, 'uploaded_lorem_ipsum.txt')), |
+ ('b5415aa0b64006a95c0c409182e628881d6d6463', |
+ os.path.join(self.base_path, 'subfolder', 'subfolder_text.txt'))] |
+ result = list(self.queue.queue) |
+ for item in result: |
+ self.assertTrue(item in expected_queue) |
+ self.assertEquals(queue_size, 3) |
+ |
+ def test_download_worker_single_file(self): |
+ sha1_hash = '7871c8e24da15bad8b0be2c36edc9dc77e37727f' |
+ input_filename = '%s/%s' % (self.base_url, sha1_hash) |
+ output_filename = os.path.join(self.base_path, 'uploaded_lorem_ipsum.txt') |
+ self.queue.put((sha1_hash, output_filename)) |
+ self.queue.put((None, None)) |
+ stdout_queue = Queue.Queue() |
+ # pylint: disable=W0212 |
+ download_from_google_storage._downloader_worker_thread( |
+ 0, self.queue, False, self.base_url, self.gsutil, |
+ stdout_queue, self.ret_codes) |
+ expected_calls = [ |
+ ('check_call', |
+ ('ls', input_filename)), |
+ ('check_call', |
+ ('cp', '-q', input_filename, output_filename))] |
+ expected_output = [ |
+ '0> Downloading %s...' % output_filename] |
+ self.assertEquals(list(stdout_queue.queue), expected_output) |
+ self.assertEquals(self.gsutil.history, expected_calls) |
+ |
+ def test_download_worker_skips_file(self): |
+ sha1_hash = 'e6c4fbd4fe7607f3e6ebf68b2ea4ef694da7b4fe' |
+ output_filename = os.path.join(self.base_path, 'rootfolder_text.txt') |
+ self.queue.put((sha1_hash, output_filename)) |
+ self.queue.put((None, None)) |
+ stdout_queue = Queue.Queue() |
+ # pylint: disable=W0212 |
+ download_from_google_storage._downloader_worker_thread( |
+ 0, self.queue, False, self.base_url, self.gsutil, |
+ stdout_queue, self.ret_codes) |
+ expected_output = [ |
+ '0> File %s exists and SHA1 matches. Skipping.' % output_filename |
+ ] |
+ self.assertEquals(list(stdout_queue.queue), expected_output) |
+ self.assertEquals(self.gsutil.history, []) |
+ |
+ def test_download_worker_skips_not_found_file(self): |
+ sha1_hash = '7871c8e24da15bad8b0be2c36edc9dc77e37727f' |
+ input_filename = '%s/%s' % (self.base_url, sha1_hash) |
+ output_filename = os.path.join(self.base_path, 'uploaded_lorem_ipsum.txt') |
+ self.queue.put((sha1_hash, output_filename)) |
+ self.queue.put((None, None)) |
+ stdout_queue = Queue.Queue() |
+ self.gsutil.add_expected(1, '', '') # Return error when 'ls' is called. |
+ # pylint: disable=W0212 |
+ download_from_google_storage._downloader_worker_thread( |
+ 0, self.queue, False, self.base_url, self.gsutil, |
+ stdout_queue, self.ret_codes) |
+ expected_output = [ |
+ '0> File %s for %s does not exist, skipping.' % ( |
+ input_filename, output_filename), |
+ ] |
+ expected_calls = [ |
+ ('check_call', |
+ ('ls', input_filename)) |
+ ] |
+ expected_ret_codes = [ |
+ (1, 'File %s for %s does not exist.' % ( |
+ input_filename, output_filename)) |
+ ] |
+ self.assertEquals(list(stdout_queue.queue), expected_output) |
+ self.assertEquals(self.gsutil.history, expected_calls) |
+ self.assertEquals(list(self.ret_codes.queue), expected_ret_codes) |
+ |
+ def test_download_cp_fails(self): |
+ sha1_hash = '7871c8e24da15bad8b0be2c36edc9dc77e37727f' |
+ input_filename = '%s/%s' % (self.base_url, sha1_hash) |
+ output_filename = os.path.join(self.base_path, 'uploaded_lorem_ipsum.txt') |
+ self.gsutil.add_expected(0, '', '') |
+ self.gsutil.add_expected(101, '', 'Test error message.') |
+ # pylint: disable=W0212 |
+ code = download_from_google_storage.download_from_google_storage( |
+ input_filename=sha1_hash, |
+ base_url=self.base_url, |
+ gsutil=self.gsutil, |
+ num_threads=1, |
+ directory=False, |
+ recursive=False, |
+ force=True, |
+ output=output_filename, |
+ ignore_errors=False, |
+ sha1_file=False) |
+ expected_calls = [ |
+ ('check_call', |
+ ('ls', input_filename)), |
+ ('check_call', |
+ ('cp', '-q', input_filename, output_filename)) |
+ ] |
+ self.assertEquals(self.gsutil.history, expected_calls) |
+ self.assertEquals(code, 101) |
+ |
+ def test_download_directory_no_recursive_non_force(self): |
+ sha1_hash = '7871c8e24da15bad8b0be2c36edc9dc77e37727f' |
+ input_filename = '%s/%s' % (self.base_url, sha1_hash) |
+ output_filename = os.path.join(self.base_path, 'uploaded_lorem_ipsum.txt') |
+ download_from_google_storage.download_from_google_storage( |
M-A Ruel
2013/03/08 13:27:41
You don't check the return code here.
Split this
Ryan Tseng
2013/03/08 23:22:13
Done.
|
+ input_filename=self.base_path, |
+ base_url=self.base_url, |
+ gsutil=self.gsutil, |
+ num_threads=1, |
+ directory=True, |
+ recursive=False, |
+ force=False, |
+ output=None, |
+ ignore_errors=False, |
+ sha1_file=False) |
+ expected_calls = [ |
+ ('check_call', |
+ ('ls', input_filename)), |
+ ('check_call', |
+ ('cp', '-q', input_filename, output_filename))] |
+ self.assertEquals(self.gsutil.history, expected_calls) |
+ |
+ |
+if __name__ == '__main__': |
+ unittest.main() |