Index: tests/gsutil_test.py |
diff --git a/tests/gsutil_test.py b/tests/gsutil_test.py |
new file mode 100755 |
index 0000000000000000000000000000000000000000..0b26be8beaa9ffed0c485833f43bf2c4faaf4da3 |
--- /dev/null |
+++ b/tests/gsutil_test.py |
@@ -0,0 +1,172 @@ |
+#!/usr/bin/env python |
+# Copyright (c) 2012 The Chromium Authors. All rights reserved. |
+# Use of this source code is governed by a BSD-style license that can be |
+# found in the LICENSE file. |
+ |
+"""Test gsutil.py.""" |
+ |
+ |
+import __builtin__ |
+import unittest |
+import hashlib |
+import zipfile |
+import shutil |
+import sys |
+import base64 |
+import tempfile |
+import json |
+import os |
+import urllib |
+ |
+ |
+# Add depot_tools to path |
+THIS_DIR = os.path.dirname(os.path.abspath(__file__)) |
+DEPOT_TOOLS_DIR = os.path.dirname(THIS_DIR) |
+sys.path.append(DEPOT_TOOLS_DIR) |
+ |
+import gsutil |
+ |
+ |
+class TestError(Exception): |
+ pass |
+ |
+ |
+class Buffer(object): |
+ def __init__(self): |
+ self.data = '' |
+ |
+ def write(self, buf): |
+ self.data += buf |
+ |
+ def read(self, amount=None): |
+ if not amount: |
+ amount = len(self.data) |
+ result = self.data[:amount] |
+ self.data = self.data[amount:] |
+ return result |
+ |
+ |
+class FakeOpen(object): |
+ def __init__(self): |
+ self.expectations = [] |
+ |
+ def add_expectation(self, filename, data): |
+ self.expectations.append((filename, data)) |
+ |
+ def urlopen(self, url): |
+ exp_url, exp_data = self.expectations.pop(0) |
+ if url != exp_url: |
+ raise TestError('%s incorrect, expected %s' % ( |
+ url, exp_url)) |
+ |
+ buf = Buffer() |
+ buf.write(exp_data) |
+ return buf |
+ |
+ |
+class GsutilUnitTests(unittest.TestCase): |
+ def setUp(self): |
+ self.fake_open = FakeOpen() |
+ self.tempdir = tempfile.mkdtemp() |
+ self.old_urlopen = getattr(urllib, 'urlopen') |
+ setattr(urllib, 'urlopen', self.fake_open.urlopen) |
+ |
+ def tearDown(self): |
+ shutil.rmtree(self.tempdir) |
+ setattr(urllib, 'urlopen', self.old_urlopen) |
+ |
+ def test_download_gsutil(self): |
+ version = '4.2' |
+ filename = 'gsutil_%s.zip' % version |
+ full_filename = os.path.join(self.tempdir, filename) |
+ fake_file = 'This is gsutil.zip' |
+ fake_file2 = 'This is other gsutil.zip' |
+ url = '%s%s' % (gsutil.GSUTIL_URL, filename) |
+ self.fake_open.add_expectation(url, fake_file) |
+ |
+ self.assertEquals( |
+ gsutil.download_gsutil(version, self.tempdir), full_filename) |
+ with open(full_filename, 'r') as f: |
+ self.assertEquals(fake_file, f.read()) |
+ |
+ metadata_url = gsutil.API_URL + filename |
+ md5_calc = hashlib.md5() |
+ md5_calc.update(fake_file) |
+ b64_md5 = base64.b64encode(md5_calc.hexdigest()) |
+ self.fake_open.add_expectation(metadata_url, json.dumps({ |
+ 'md5Hash': b64_md5 |
+ })) |
+ self.assertEquals( |
+ gsutil.download_gsutil(version, self.tempdir), full_filename) |
+ with open(full_filename, 'r') as f: |
+ self.assertEquals(fake_file, f.read()) |
+ self.assertEquals(self.fake_open.expectations, []) |
+ |
+ self.fake_open.add_expectation(metadata_url, json.dumps({ |
+ 'md5Hash': base64.b64encode('aaaaaaa') # Bad MD5 |
+ })) |
+ self.fake_open.add_expectation(url, fake_file2) |
+ self.assertEquals( |
+ gsutil.download_gsutil(version, self.tempdir), full_filename) |
+ with open(full_filename, 'r') as f: |
+ self.assertEquals(fake_file2, f.read()) |
+ self.assertEquals(self.fake_open.expectations, []) |
+ |
+ def test_ensure_gsutil_full(self): |
+ version = '4.2' |
+ gsutil_dir = os.path.join(self.tempdir, 'gsutil_%s' % version, 'gsutil') |
+ gsutil_bin = os.path.join(gsutil_dir, 'gsutil') |
+ os.makedirs(gsutil_dir) |
+ |
+ # Mock out call(). |
+ old_call = getattr(gsutil, 'call') |
dnj
2014/11/20 19:58:27
If this test raises an exception, 'call' will stay
hinoka
2014/11/20 21:20:15
My (and Sergey's) experience with mock libraries a
|
+ def fake_call(args, **kwargs): |
+ self.assertEquals(args, [gsutil_bin, 'version']) |
+ self.assertEquals(kwargs, {'verbose': False}) |
+ raise gsutil.SubprocessError() |
+ setattr(gsutil, 'call', fake_call) |
+ |
+ with open(gsutil_bin, 'w') as f: |
+ f.write('Foobar') |
+ zip_filename = 'gsutil_%s.zip' % version |
+ url = '%s%s' % (gsutil.GSUTIL_URL, zip_filename) |
+ _, tempzip = tempfile.mkstemp() |
+ fake_gsutil = 'Fake gsutil' |
+ with zipfile.ZipFile(tempzip, 'w') as zf: |
+ zf.writestr('gsutil/gsutil', fake_gsutil) |
+ with open(tempzip, 'rb') as f: |
+ self.fake_open.add_expectation(url, f.read()) |
+ |
+ # This should delete the old bin and rewrite it with 'Fake gsutil' |
+ self.assertRaises( |
+ gsutil.InvalidGsutilError, gsutil.ensure_gsutil, version, self.tempdir) |
+ self.assertTrue(os.path.isdir(os.path.join(self.tempdir, '.cache_dir'))) |
+ self.assertTrue(os.path.exists(gsutil_bin)) |
+ with open(gsutil_bin, 'r') as f: |
+ self.assertEquals(f.read(), fake_gsutil) |
+ |
+ setattr(gsutil, 'call', old_call) |
+ |
+ def test_ensure_gsutil_short(self): |
+ version = '4.2' |
+ gsutil_dir = os.path.join(self.tempdir, 'gsutil_%s' % version, 'gsutil') |
+ gsutil_bin = os.path.join(gsutil_dir, 'gsutil') |
+ os.makedirs(gsutil_dir) |
+ |
+ # Mock out call(). |
+ old_call = getattr(gsutil, 'call') |
+ def fake_call(args, **kwargs): |
+ self.assertEquals(args, [gsutil_bin, 'version']) |
+ self.assertEquals(kwargs, {'verbose': False}) |
+ return True |
+ setattr(gsutil, 'call', fake_call) |
+ |
+ with open(gsutil_bin, 'w') as f: |
+ f.write('Foobar') |
+ self.assertEquals( |
+ gsutil.ensure_gsutil(version, self.tempdir), gsutil_bin) |
+ |
+ setattr(gsutil, 'call', old_call) |
+ |
+if __name__ == '__main__': |
+ unittest.main() |