| Index: appengine/chrome_infra_packages/cipd/test/impl_test.py
|
| diff --git a/appengine/chrome_infra_packages/cipd/test/impl_test.py b/appengine/chrome_infra_packages/cipd/test/impl_test.py
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..3f9a542b6f7c9e705c1f6570f725832c766044d1
|
| --- /dev/null
|
| +++ b/appengine/chrome_infra_packages/cipd/test/impl_test.py
|
| @@ -0,0 +1,170 @@
|
| +# Copyright 2014 The Chromium Authors. All rights reserved.
|
| +# Use of this source code is governed by a BSD-style license that can be
|
| +# found in the LICENSE file.
|
| +
|
| +import datetime
|
| +import unittest
|
| +
|
| +from google.appengine.ext import ndb
|
| +from testing_utils import testing
|
| +
|
| +from components import auth
|
| +
|
| +from cipd import impl
|
| +
|
| +
|
| +class TestValidators(unittest.TestCase):
|
| + def test_is_valid_package_name(self):
|
| + self.assertTrue(impl.is_valid_package_name('a'))
|
| + self.assertTrue(impl.is_valid_package_name('a/b'))
|
| + self.assertTrue(impl.is_valid_package_name('a/b/c/1/2/3'))
|
| + self.assertTrue(impl.is_valid_package_name('infra/tools/cipd'))
|
| + self.assertTrue(impl.is_valid_package_name('./-/_'))
|
| + self.assertFalse(impl.is_valid_package_name(''))
|
| + self.assertFalse(impl.is_valid_package_name('/a'))
|
| + self.assertFalse(impl.is_valid_package_name('a/'))
|
| + self.assertFalse(impl.is_valid_package_name('A'))
|
| + self.assertFalse(impl.is_valid_package_name('a/B'))
|
| + self.assertFalse(impl.is_valid_package_name('a\\b'))
|
| +
|
| + def test_is_valid_instance_id(self):
|
| + self.assertTrue(impl.is_valid_instance_id('a'*40))
|
| + self.assertFalse(impl.is_valid_instance_id(''))
|
| + self.assertFalse(impl.is_valid_instance_id('A'*40))
|
| +
|
| +
|
| +class TestRepoService(testing.AppengineTestCase):
|
| + def setUp(self):
|
| + super(TestRepoService, self).setUp()
|
| + self.mocked_cas_service = MockedCASService()
|
| + self.mock(impl.cas, 'get_cas_service', lambda: self.mocked_cas_service)
|
| + self.service = impl.get_repo_service()
|
| +
|
| + @staticmethod
|
| + def fake_signature(key):
|
| + return impl.PackageInstanceSignature(
|
| + hash_algo='SHA1',
|
| + digest='\x00\x01\x02\x03',
|
| + signature_algo='sig algo',
|
| + signature_key=key,
|
| + signature='signature \x00\x01\x02\x03',
|
| + added_by=auth.Identity.from_bytes('user:abc@example.com'),
|
| + added_ts=datetime.datetime(2014, 1, 1))
|
| +
|
| + def test_register_new(self):
|
| + pkg = self.service.register_instance(
|
| + package_name='a/b',
|
| + instance_id='a'*40,
|
| + signatures=[self.fake_signature('key 1'), self.fake_signature('key 2')],
|
| + caller=auth.Identity.from_bytes('user:abc@example.com'),
|
| + now=datetime.datetime(2014, 1, 1, 0, 0))
|
| + self.assertEqual(
|
| + ndb.Key('Package', 'a/b', 'PackageInstance', 'a'*40), pkg.key)
|
| + expected = {
|
| + 'registered_by': auth.Identity(kind='user', name='abc@example.com'),
|
| + 'registered_ts': datetime.datetime(2014, 1, 1, 0, 0),
|
| + 'signature_keys': ['key 1', 'key 2'],
|
| + 'signatures': [
|
| + {
|
| + 'added_by': auth.Identity(kind='user', name='abc@example.com'),
|
| + 'added_ts': datetime.datetime(2014, 1, 1, 0, 0),
|
| + 'digest': '\x00\x01\x02\x03',
|
| + 'hash_algo': 'SHA1',
|
| + 'signature': 'signature \x00\x01\x02\x03',
|
| + 'signature_algo': 'sig algo',
|
| + 'signature_key': 'key 1',
|
| + },
|
| + {
|
| + 'added_by': auth.Identity(kind='user', name='abc@example.com'),
|
| + 'added_ts': datetime.datetime(2014, 1, 1, 0, 0),
|
| + 'digest': '\x00\x01\x02\x03',
|
| + 'hash_algo': 'SHA1',
|
| + 'signature': 'signature \x00\x01\x02\x03',
|
| + 'signature_algo': 'sig algo',
|
| + 'signature_key': 'key 2',
|
| + },
|
| + ],
|
| + }
|
| + self.assertEqual(expected, pkg.to_dict())
|
| + self.assertEqual(
|
| + expected, self.service.get_instance('a/b', 'a'*40).to_dict())
|
| +
|
| + def test_register_existing(self):
|
| + # First register a package.
|
| + self.service.register_instance(
|
| + package_name='a/b',
|
| + instance_id='a'*40,
|
| + signatures=[self.fake_signature('key1')],
|
| + caller=auth.Identity.from_bytes('user:abc@example.com'))
|
| + # Try to register it again.
|
| + with self.assertRaises(impl.PackageInstanceExistsError):
|
| + self.service.register_instance(
|
| + package_name='a/b',
|
| + instance_id='a'*40,
|
| + signatures=[],
|
| + caller=auth.Identity.from_bytes('user:abc@example.com'))
|
| +
|
| + def test_add_signatures_missing(self):
|
| + with self.assertRaises(impl.PackageInstanceNotFoundError):
|
| + self.service.add_signatures('a/b', 'a'*40, [self.fake_signature('key')])
|
| +
|
| + def test_add_signatures(self):
|
| + self.service.register_instance(
|
| + package_name='a/b',
|
| + instance_id='a'*40,
|
| + signatures=[],
|
| + caller=auth.Identity.from_bytes('user:abc@example.com'))
|
| +
|
| + expected = lambda key: {
|
| + 'added_by': auth.Identity(kind='user', name='abc@example.com'),
|
| + 'added_ts': datetime.datetime(2014, 1, 1, 0, 0),
|
| + 'digest': '\x00\x01\x02\x03',
|
| + 'hash_algo': 'SHA1',
|
| + 'signature': 'signature \x00\x01\x02\x03',
|
| + 'signature_algo': 'sig algo',
|
| + 'signature_key': key,
|
| + }
|
| +
|
| + # Add one.
|
| + pkg = self.service.add_signatures(
|
| + 'a/b', 'a'*40, [self.fake_signature('key0')])
|
| + self.assertEqual([expected('key0')], pkg.to_dict()['signatures'])
|
| + self.assertEqual(['key0'], pkg.to_dict()['signature_keys'])
|
| +
|
| + # Add exact same one -> no effect.
|
| + pkg = self.service.add_signatures(
|
| + 'a/b', 'a'*40, [self.fake_signature('key0')])
|
| + self.assertEqual([expected('key0')], pkg.to_dict()['signatures'])
|
| + self.assertEqual(['key0'], pkg.to_dict()['signature_keys'])
|
| +
|
| + # Add another one.
|
| + pkg = self.service.add_signatures(
|
| + 'a/b', 'a'*40, [self.fake_signature('key1')])
|
| + self.assertEqual(
|
| + [expected('key0'), expected('key1')], pkg.to_dict()['signatures'])
|
| + self.assertEqual(
|
| + ['key0', 'key1'], pkg.to_dict()['signature_keys'])
|
| +
|
| + def test_is_instance_file_uploaded(self):
|
| + self.mocked_cas_service.uploaded.add(('SHA1', 'a'*40))
|
| + self.assertTrue(self.service.is_instance_file_uploaded('a/b', 'a'*40))
|
| + self.assertFalse(self.service.is_instance_file_uploaded('a/b', 'b'*40))
|
| +
|
| + def test_create_upload_session(self):
|
| + upload_url, upload_session_id = self.service.create_upload_session(
|
| + 'a/b', 'a'*40, auth.Identity.from_bytes('user:abc@example.com'))
|
| + self.assertEqual('http://upload_url', upload_url)
|
| + self.assertEqual('upload_session_id', upload_session_id)
|
| +
|
| +
|
| +class MockedCASService(object):
|
| + def __init__(self):
|
| + self.uploaded = set()
|
| +
|
| + def is_object_present(self, algo, digest):
|
| + return (algo, digest) in self.uploaded
|
| +
|
| + def create_upload_session(self, _algo, _digest, _caller):
|
| + class UploadSession(object):
|
| + upload_url = 'http://upload_url'
|
| + return UploadSession(), 'upload_session_id'
|
|
|