Chromium Code Reviews| Index: appengine/chrome_infra_packages/cipd/test/impl_test.py |
| diff --git a/appengine/chrome_infra_packages/cipd/test/impl_test.py b/appengine/chrome_infra_packages/cipd/test/impl_test.py |
| new file mode 100644 |
| index 0000000000000000000000000000000000000000..e84dc34b51278900cd73d3cc41851accf0f111de |
| --- /dev/null |
| +++ b/appengine/chrome_infra_packages/cipd/test/impl_test.py |
| @@ -0,0 +1,230 @@ |
| +# Copyright 2014 The Chromium Authors. All rights reserved. |
| +# Use of this source code is governed by a BSD-style license that can be |
| +# found in the LICENSE file. |
| + |
| +import datetime |
| +import unittest |
| + |
| +from google.appengine.ext import ndb |
| +from testing_utils import testing |
| + |
| +from components import auth |
| + |
| +from cipd import impl |
| + |
| + |
| +class TestValidators(unittest.TestCase): |
| + def test_is_valid_package_name(self): |
| + self.assertTrue(impl.is_valid_package_name('a')) |
| + self.assertTrue(impl.is_valid_package_name('a/b')) |
| + self.assertTrue(impl.is_valid_package_name('a/b/c/1/2/3')) |
| + self.assertTrue(impl.is_valid_package_name('infra/tools/cipd')) |
| + self.assertTrue(impl.is_valid_package_name('./-/_')) |
| + self.assertFalse(impl.is_valid_package_name('')) |
| + self.assertFalse(impl.is_valid_package_name('/a')) |
| + self.assertFalse(impl.is_valid_package_name('a/')) |
| + self.assertFalse(impl.is_valid_package_name('A')) |
| + self.assertFalse(impl.is_valid_package_name('a/B')) |
| + self.assertFalse(impl.is_valid_package_name('a\\b')) |
| + |
| + def test_is_valid_instance_id(self): |
| + self.assertTrue(impl.is_valid_instance_id('a'*40)) |
| + self.assertFalse(impl.is_valid_instance_id('')) |
| + self.assertFalse(impl.is_valid_instance_id('A'*40)) |
| + |
| + |
| +class TestRepoService(testing.AppengineTestCase): |
| + def setUp(self): |
| + super(TestRepoService, self).setUp() |
| + self.mocked_cas_service = MockedCASService() |
| + self.mock(impl.cas, 'get_cas_service', lambda: self.mocked_cas_service) |
| + self.service = impl.get_repo_service() |
| + |
| + @staticmethod |
| + def fake_metadata(user='user'): |
| + return impl.PackageInstanceMetadata( |
| + date='Jan 1 2014', |
| + hostname='hostname', |
| + user=user, |
| + registered_by=auth.Identity.from_bytes('user:abc@example.com'), |
| + registered_ts=datetime.datetime(2014, 1, 1)) |
| + |
| + @staticmethod |
| + def fake_signature(key): |
| + return impl.PackageInstanceSignature( |
| + hash_algo='SHA1', |
| + digest='\x00\x01\x02\x03', |
| + signature_algo='sig algo', |
| + signature_key=key, |
| + signature='signature \x00\x01\x02\x03', |
| + added_by=auth.Identity.from_bytes('user:abc@example.com'), |
| + added_ts=datetime.datetime(2014, 1, 1)) |
| + |
| + def test_register_new(self): |
| + pkg, registered = self.service.register_instance( |
| + package_name='a/b', |
| + instance_id='a'*40, |
| + metadata=self.fake_metadata(), |
| + signatures=[self.fake_signature('key 1'), self.fake_signature('key 2')]) |
| + self.assertTrue(registered) |
| + self.assertEqual( |
| + ndb.Key('Package', 'a/b', 'PackageInstance', 'a'*40), pkg.key) |
| + expected = { |
| + 'metadata': { |
| + 'date': 'Jan 1 2014', |
| + 'hostname': 'hostname', |
| + 'registered_by': auth.Identity(kind='user', name='abc@example.com'), |
| + 'registered_ts': datetime.datetime(2014, 1, 1, 0, 0), |
| + 'user': 'user', |
| + }, |
| + 'signature_ids': ['SHA1:sig algo:key 1', 'SHA1:sig algo:key 2'], |
| + 'signatures': [ |
| + { |
| + 'added_by': auth.Identity(kind='user', name='abc@example.com'), |
| + 'added_ts': datetime.datetime(2014, 1, 1, 0, 0), |
| + 'digest': '\x00\x01\x02\x03', |
| + 'hash_algo': 'SHA1', |
| + 'signature': 'signature \x00\x01\x02\x03', |
| + 'signature_algo': 'sig algo', |
| + 'signature_key': 'key 1', |
| + }, |
| + { |
| + 'added_by': auth.Identity(kind='user', name='abc@example.com'), |
| + 'added_ts': datetime.datetime(2014, 1, 1, 0, 0), |
| + 'digest': '\x00\x01\x02\x03', |
| + 'hash_algo': 'SHA1', |
| + 'signature': 'signature \x00\x01\x02\x03', |
| + 'signature_algo': 'sig algo', |
| + 'signature_key': 'key 2', |
| + }, |
| + ], |
| + } |
| + self.assertEqual(expected, pkg.to_dict()) |
| + self.assertEqual( |
| + expected, self.service.get_instance('a/b', 'a'*40).to_dict()) |
| + |
| + def test_register_existing(self): |
| + # First register a package. |
| + _, registered = self.service.register_instance( |
| + package_name='a/b', |
| + instance_id='a'*40, |
| + metadata=self.fake_metadata('user'), |
| + signatures=[self.fake_signature('key1')]) |
| + self.assertTrue(registered) |
| + |
| + # Try to register it again, with more signatures. |
| + pkg, registered = self.service.register_instance( |
| + package_name='a/b', |
| + instance_id='a'*40, |
| + metadata=self.fake_metadata('another user'), |
| + signatures=[self.fake_signature('key0'), self.fake_signature('key1')]) |
| + self.assertFalse(registered) |
| + # Original metadata is preserved. Signatures are merged. |
| + expected = { |
| + 'metadata': { |
| + 'date': 'Jan 1 2014', |
| + 'hostname': 'hostname', |
| + 'registered_by': auth.Identity(kind='user', name='abc@example.com'), |
| + 'registered_ts': datetime.datetime(2014, 1, 1, 0, 0), |
| + 'user': 'user', |
| + }, |
| + 'signature_ids': ['SHA1:sig algo:key1', 'SHA1:sig algo:key0'], |
| + 'signatures': [ |
| + { |
| + 'added_by': auth.Identity(kind='user', name='abc@example.com'), |
| + 'added_ts': datetime.datetime(2014, 1, 1, 0, 0), |
| + 'digest': '\x00\x01\x02\x03', |
| + 'hash_algo': 'SHA1', |
| + 'signature': 'signature \x00\x01\x02\x03', |
| + 'signature_algo': 'sig algo', |
| + 'signature_key': 'key1', |
| + }, |
| + { |
| + 'added_by': auth.Identity(kind='user', name='abc@example.com'), |
| + 'added_ts': datetime.datetime(2014, 1, 1, 0, 0), |
| + 'digest': '\x00\x01\x02\x03', |
| + 'hash_algo': 'SHA1', |
| + 'signature': 'signature \x00\x01\x02\x03', |
| + 'signature_algo': 'sig algo', |
| + 'signature_key': 'key0', |
| + }, |
| + ], |
| + } |
| + self.assertEqual(expected, pkg.to_dict()) |
| + |
| + # Again, exact same call. No changes expected. |
| + pkg, registered = self.service.register_instance( |
| + package_name='a/b', |
| + instance_id='a'*40, |
| + metadata=self.fake_metadata('another user'), |
| + signatures=[self.fake_signature('key0'), self.fake_signature('key1')]) |
| + self.assertFalse(registered) |
| + self.assertEqual(expected, pkg.to_dict()) |
| + |
| + def test_add_signatures_missing(self): |
| + with self.assertRaises(ValueError): |
|
nodir
2014/12/30 22:54:01
ValueError is ambiguous. It could be anything, rea
Vadim Sh.
2014/12/31 01:27:36
Done.
|
| + self.service.add_signatures('a/b', 'a'*40, [self.fake_signature('key')]) |
| + |
| + def test_add_signatures(self): |
| + _, registered = self.service.register_instance( |
| + package_name='a/b', |
| + instance_id='a'*40, |
| + metadata=self.fake_metadata('another user'), |
| + signatures=[]) |
| + self.assertTrue(registered) |
| + |
| + expected = lambda key: { |
|
nodir
2014/12/30 22:54:01
I always thought multi-line lambdas do not satisfy
Vadim Sh.
2014/12/31 01:27:36
Never heard of that :)
Generally I avoid line wra
nodir
2015/01/02 19:13:04
https://google-styleguide.googlecode.com/svn/trunk
Vadim Sh.
2015/01/02 19:31:02
Arguably, lambda x: <dict> is one liner. I prefer
nodir
2015/01/02 20:23:23
Acknowledged.
|
| + 'added_by': auth.Identity(kind='user', name='abc@example.com'), |
| + 'added_ts': datetime.datetime(2014, 1, 1, 0, 0), |
| + 'digest': '\x00\x01\x02\x03', |
| + 'hash_algo': 'SHA1', |
| + 'signature': 'signature \x00\x01\x02\x03', |
| + 'signature_algo': 'sig algo', |
| + 'signature_key': key, |
| + } |
| + expected_sig_id = lambda key: 'SHA1:sig algo:%s' % key |
| + |
| + # Add one. |
| + pkg = self.service.add_signatures( |
| + 'a/b', 'a'*40, [self.fake_signature('key0')]) |
| + self.assertEqual([expected('key0')], pkg.to_dict()['signatures']) |
| + self.assertEqual([expected_sig_id('key0')], pkg.to_dict()['signature_ids']) |
| + |
| + # Add exact same one -> no effect. |
| + pkg = self.service.add_signatures( |
| + 'a/b', 'a'*40, [self.fake_signature('key0')]) |
| + self.assertEqual([expected('key0')], pkg.to_dict()['signatures']) |
| + self.assertEqual([expected_sig_id('key0')], pkg.to_dict()['signature_ids']) |
| + |
| + # Add another one. |
| + pkg = self.service.add_signatures( |
| + 'a/b', 'a'*40, [self.fake_signature('key1')]) |
| + self.assertEqual( |
| + [expected('key0'), expected('key1')], pkg.to_dict()['signatures']) |
| + self.assertEqual( |
| + [expected_sig_id('key0'), expected_sig_id('key1')], |
| + pkg.to_dict()['signature_ids']) |
| + |
| + def test_is_data_uploaded(self): |
| + self.mocked_cas_service.uploaded.add(('SHA1', 'a'*40)) |
| + self.assertTrue(self.service.is_data_uploaded('a/b', 'a'*40)) |
| + self.assertFalse(self.service.is_data_uploaded('a/b', 'b'*40)) |
| + |
| + def test_create_upload_session(self): |
| + upload_url, upload_session_id = self.service.create_upload_session( |
| + 'a/b', 'a'*40, auth.Identity.from_bytes('user:abc@example.com')) |
| + self.assertEqual('http://upload_url', upload_url) |
| + self.assertEqual('upload_session_id', upload_session_id) |
| + |
| + |
| +class MockedCASService(object): |
| + def __init__(self): |
| + self.uploaded = set() |
| + |
| + def is_object_present(self, algo, digest): |
| + return (algo, digest) in self.uploaded |
| + |
| + def create_upload_session(self, _algo, _digest, _caller): |
| + class UploadSession(object): |
| + upload_url = 'http://upload_url' |
| + return UploadSession(), 'upload_session_id' |