Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1999)

Unified Diff: appengine/chrome_infra_packages/cipd/test/impl_test.py

Issue 816433004: cipd: registerPackage method implementation. (Closed) Base URL: https://chromium.googlesource.com/infra/infra.git@master
Patch Set: remove metadata Created 5 years, 12 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: appengine/chrome_infra_packages/cipd/test/impl_test.py
diff --git a/appengine/chrome_infra_packages/cipd/test/impl_test.py b/appengine/chrome_infra_packages/cipd/test/impl_test.py
new file mode 100644
index 0000000000000000000000000000000000000000..3f9a542b6f7c9e705c1f6570f725832c766044d1
--- /dev/null
+++ b/appengine/chrome_infra_packages/cipd/test/impl_test.py
@@ -0,0 +1,170 @@
+# Copyright 2014 The Chromium Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+import datetime
+import unittest
+
+from google.appengine.ext import ndb
+from testing_utils import testing
+
+from components import auth
+
+from cipd import impl
+
+
+class TestValidators(unittest.TestCase):
+ def test_is_valid_package_name(self):
+ self.assertTrue(impl.is_valid_package_name('a'))
+ self.assertTrue(impl.is_valid_package_name('a/b'))
+ self.assertTrue(impl.is_valid_package_name('a/b/c/1/2/3'))
+ self.assertTrue(impl.is_valid_package_name('infra/tools/cipd'))
+ self.assertTrue(impl.is_valid_package_name('./-/_'))
+ self.assertFalse(impl.is_valid_package_name(''))
+ self.assertFalse(impl.is_valid_package_name('/a'))
+ self.assertFalse(impl.is_valid_package_name('a/'))
+ self.assertFalse(impl.is_valid_package_name('A'))
+ self.assertFalse(impl.is_valid_package_name('a/B'))
+ self.assertFalse(impl.is_valid_package_name('a\\b'))
+
+ def test_is_valid_instance_id(self):
+ self.assertTrue(impl.is_valid_instance_id('a'*40))
+ self.assertFalse(impl.is_valid_instance_id(''))
+ self.assertFalse(impl.is_valid_instance_id('A'*40))
+
+
+class TestRepoService(testing.AppengineTestCase):
+ def setUp(self):
+ super(TestRepoService, self).setUp()
+ self.mocked_cas_service = MockedCASService()
+ self.mock(impl.cas, 'get_cas_service', lambda: self.mocked_cas_service)
+ self.service = impl.get_repo_service()
+
+ @staticmethod
+ def fake_signature(key):
+ return impl.PackageInstanceSignature(
+ hash_algo='SHA1',
+ digest='\x00\x01\x02\x03',
+ signature_algo='sig algo',
+ signature_key=key,
+ signature='signature \x00\x01\x02\x03',
+ added_by=auth.Identity.from_bytes('user:abc@example.com'),
+ added_ts=datetime.datetime(2014, 1, 1))
+
+ def test_register_new(self):
+ pkg = self.service.register_instance(
+ package_name='a/b',
+ instance_id='a'*40,
+ signatures=[self.fake_signature('key 1'), self.fake_signature('key 2')],
+ caller=auth.Identity.from_bytes('user:abc@example.com'),
+ now=datetime.datetime(2014, 1, 1, 0, 0))
+ self.assertEqual(
+ ndb.Key('Package', 'a/b', 'PackageInstance', 'a'*40), pkg.key)
+ expected = {
+ 'registered_by': auth.Identity(kind='user', name='abc@example.com'),
+ 'registered_ts': datetime.datetime(2014, 1, 1, 0, 0),
+ 'signature_keys': ['key 1', 'key 2'],
+ 'signatures': [
+ {
+ 'added_by': auth.Identity(kind='user', name='abc@example.com'),
+ 'added_ts': datetime.datetime(2014, 1, 1, 0, 0),
+ 'digest': '\x00\x01\x02\x03',
+ 'hash_algo': 'SHA1',
+ 'signature': 'signature \x00\x01\x02\x03',
+ 'signature_algo': 'sig algo',
+ 'signature_key': 'key 1',
+ },
+ {
+ 'added_by': auth.Identity(kind='user', name='abc@example.com'),
+ 'added_ts': datetime.datetime(2014, 1, 1, 0, 0),
+ 'digest': '\x00\x01\x02\x03',
+ 'hash_algo': 'SHA1',
+ 'signature': 'signature \x00\x01\x02\x03',
+ 'signature_algo': 'sig algo',
+ 'signature_key': 'key 2',
+ },
+ ],
+ }
+ self.assertEqual(expected, pkg.to_dict())
+ self.assertEqual(
+ expected, self.service.get_instance('a/b', 'a'*40).to_dict())
+
+ def test_register_existing(self):
+ # First register a package.
+ self.service.register_instance(
+ package_name='a/b',
+ instance_id='a'*40,
+ signatures=[self.fake_signature('key1')],
+ caller=auth.Identity.from_bytes('user:abc@example.com'))
+ # Try to register it again.
+ with self.assertRaises(impl.PackageInstanceExistsError):
+ self.service.register_instance(
+ package_name='a/b',
+ instance_id='a'*40,
+ signatures=[],
+ caller=auth.Identity.from_bytes('user:abc@example.com'))
+
+ def test_add_signatures_missing(self):
+ with self.assertRaises(impl.PackageInstanceNotFoundError):
+ self.service.add_signatures('a/b', 'a'*40, [self.fake_signature('key')])
+
+ def test_add_signatures(self):
+ self.service.register_instance(
+ package_name='a/b',
+ instance_id='a'*40,
+ signatures=[],
+ caller=auth.Identity.from_bytes('user:abc@example.com'))
+
+ expected = lambda key: {
+ 'added_by': auth.Identity(kind='user', name='abc@example.com'),
+ 'added_ts': datetime.datetime(2014, 1, 1, 0, 0),
+ 'digest': '\x00\x01\x02\x03',
+ 'hash_algo': 'SHA1',
+ 'signature': 'signature \x00\x01\x02\x03',
+ 'signature_algo': 'sig algo',
+ 'signature_key': key,
+ }
+
+ # Add one.
+ pkg = self.service.add_signatures(
+ 'a/b', 'a'*40, [self.fake_signature('key0')])
+ self.assertEqual([expected('key0')], pkg.to_dict()['signatures'])
+ self.assertEqual(['key0'], pkg.to_dict()['signature_keys'])
+
+ # Add exact same one -> no effect.
+ pkg = self.service.add_signatures(
+ 'a/b', 'a'*40, [self.fake_signature('key0')])
+ self.assertEqual([expected('key0')], pkg.to_dict()['signatures'])
+ self.assertEqual(['key0'], pkg.to_dict()['signature_keys'])
+
+ # Add another one.
+ pkg = self.service.add_signatures(
+ 'a/b', 'a'*40, [self.fake_signature('key1')])
+ self.assertEqual(
+ [expected('key0'), expected('key1')], pkg.to_dict()['signatures'])
+ self.assertEqual(
+ ['key0', 'key1'], pkg.to_dict()['signature_keys'])
+
+ def test_is_instance_file_uploaded(self):
+ self.mocked_cas_service.uploaded.add(('SHA1', 'a'*40))
+ self.assertTrue(self.service.is_instance_file_uploaded('a/b', 'a'*40))
+ self.assertFalse(self.service.is_instance_file_uploaded('a/b', 'b'*40))
+
+ def test_create_upload_session(self):
+ upload_url, upload_session_id = self.service.create_upload_session(
+ 'a/b', 'a'*40, auth.Identity.from_bytes('user:abc@example.com'))
+ self.assertEqual('http://upload_url', upload_url)
+ self.assertEqual('upload_session_id', upload_session_id)
+
+
+class MockedCASService(object):
+ def __init__(self):
+ self.uploaded = set()
+
+ def is_object_present(self, algo, digest):
+ return (algo, digest) in self.uploaded
+
+ def create_upload_session(self, _algo, _digest, _caller):
+ class UploadSession(object):
+ upload_url = 'http://upload_url'
+ return UploadSession(), 'upload_session_id'

Powered by Google App Engine
This is Rietveld 408576698