Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(155)

Side by Side Diff: appengine/chrome_infra_packages/cipd/test/impl_test.py

Issue 816433004: cipd: registerPackage method implementation. (Closed) Base URL: https://chromium.googlesource.com/infra/infra.git@master
Patch Set: Created 5 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 # Copyright 2014 The Chromium Authors. All rights reserved.
2 # Use of this source code is governed by a BSD-style license that can be
3 # found in the LICENSE file.
4
5 import datetime
6 import unittest
7
8 from google.appengine.ext import ndb
9 from testing_utils import testing
10
11 from components import auth
12
13 from cipd import impl
14
15
16 class TestValidators(unittest.TestCase):
17 def test_is_valid_package_name(self):
18 self.assertTrue(impl.is_valid_package_name('a'))
19 self.assertTrue(impl.is_valid_package_name('a/b'))
20 self.assertTrue(impl.is_valid_package_name('a/b/c/1/2/3'))
21 self.assertTrue(impl.is_valid_package_name('infra/tools/cipd'))
22 self.assertTrue(impl.is_valid_package_name('./-/_'))
23 self.assertFalse(impl.is_valid_package_name(''))
24 self.assertFalse(impl.is_valid_package_name('/a'))
25 self.assertFalse(impl.is_valid_package_name('a/'))
26 self.assertFalse(impl.is_valid_package_name('A'))
27 self.assertFalse(impl.is_valid_package_name('a/B'))
28 self.assertFalse(impl.is_valid_package_name('a\\b'))
29
30 def test_is_valid_instance_id(self):
31 self.assertTrue(impl.is_valid_instance_id('a'*40))
32 self.assertFalse(impl.is_valid_instance_id(''))
33 self.assertFalse(impl.is_valid_instance_id('A'*40))
34
35
36 class TestRepoService(testing.AppengineTestCase):
37 def setUp(self):
38 super(TestRepoService, self).setUp()
39 self.mocked_cas_service = MockedCASService()
40 self.mock(impl.cas, 'get_cas_service', lambda: self.mocked_cas_service)
41 self.service = impl.get_repo_service()
42
43 @staticmethod
44 def fake_metadata(user='user'):
45 return impl.PackageInstanceMetadata(
46 date='Jan 1 2014',
47 hostname='hostname',
48 user=user,
49 registered_by=auth.Identity.from_bytes('user:abc@example.com'),
50 registered_ts=datetime.datetime(2014, 1, 1))
51
52 @staticmethod
53 def fake_signature(key):
54 return impl.PackageInstanceSignature(
55 hash_algo='SHA1',
56 digest='\x00\x01\x02\x03',
57 signature_algo='sig algo',
58 signature_key=key,
59 signature='signature \x00\x01\x02\x03',
60 added_by=auth.Identity.from_bytes('user:abc@example.com'),
61 added_ts=datetime.datetime(2014, 1, 1))
62
63 def test_register_new(self):
64 pkg, registered = self.service.register_instance(
65 package_name='a/b',
66 instance_id='a'*40,
67 metadata=self.fake_metadata(),
68 signatures=[self.fake_signature('key 1'), self.fake_signature('key 2')])
69 self.assertTrue(registered)
70 self.assertEqual(
71 ndb.Key('Package', 'a/b', 'PackageInstance', 'a'*40), pkg.key)
72 expected = {
73 'metadata': {
74 'date': 'Jan 1 2014',
75 'hostname': 'hostname',
76 'registered_by': auth.Identity(kind='user', name='abc@example.com'),
77 'registered_ts': datetime.datetime(2014, 1, 1, 0, 0),
78 'user': 'user',
79 },
80 'signature_ids': ['SHA1:sig algo:key 1', 'SHA1:sig algo:key 2'],
81 'signatures': [
82 {
83 'added_by': auth.Identity(kind='user', name='abc@example.com'),
84 'added_ts': datetime.datetime(2014, 1, 1, 0, 0),
85 'digest': '\x00\x01\x02\x03',
86 'hash_algo': 'SHA1',
87 'signature': 'signature \x00\x01\x02\x03',
88 'signature_algo': 'sig algo',
89 'signature_key': 'key 1',
90 },
91 {
92 'added_by': auth.Identity(kind='user', name='abc@example.com'),
93 'added_ts': datetime.datetime(2014, 1, 1, 0, 0),
94 'digest': '\x00\x01\x02\x03',
95 'hash_algo': 'SHA1',
96 'signature': 'signature \x00\x01\x02\x03',
97 'signature_algo': 'sig algo',
98 'signature_key': 'key 2',
99 },
100 ],
101 }
102 self.assertEqual(expected, pkg.to_dict())
103 self.assertEqual(
104 expected, self.service.get_instance('a/b', 'a'*40).to_dict())
105
106 def test_register_existing(self):
107 # First register a package.
108 _, registered = self.service.register_instance(
109 package_name='a/b',
110 instance_id='a'*40,
111 metadata=self.fake_metadata('user'),
112 signatures=[self.fake_signature('key1')])
113 self.assertTrue(registered)
114
115 # Try to register it again, with more signatures.
116 pkg, registered = self.service.register_instance(
117 package_name='a/b',
118 instance_id='a'*40,
119 metadata=self.fake_metadata('another user'),
120 signatures=[self.fake_signature('key0'), self.fake_signature('key1')])
121 self.assertFalse(registered)
122 # Original metadata is preserved. Signatures are merged.
123 expected = {
124 'metadata': {
125 'date': 'Jan 1 2014',
126 'hostname': 'hostname',
127 'registered_by': auth.Identity(kind='user', name='abc@example.com'),
128 'registered_ts': datetime.datetime(2014, 1, 1, 0, 0),
129 'user': 'user',
130 },
131 'signature_ids': ['SHA1:sig algo:key1', 'SHA1:sig algo:key0'],
132 'signatures': [
133 {
134 'added_by': auth.Identity(kind='user', name='abc@example.com'),
135 'added_ts': datetime.datetime(2014, 1, 1, 0, 0),
136 'digest': '\x00\x01\x02\x03',
137 'hash_algo': 'SHA1',
138 'signature': 'signature \x00\x01\x02\x03',
139 'signature_algo': 'sig algo',
140 'signature_key': 'key1',
141 },
142 {
143 'added_by': auth.Identity(kind='user', name='abc@example.com'),
144 'added_ts': datetime.datetime(2014, 1, 1, 0, 0),
145 'digest': '\x00\x01\x02\x03',
146 'hash_algo': 'SHA1',
147 'signature': 'signature \x00\x01\x02\x03',
148 'signature_algo': 'sig algo',
149 'signature_key': 'key0',
150 },
151 ],
152 }
153 self.assertEqual(expected, pkg.to_dict())
154
155 # Again, exact same call. No changes expected.
156 pkg, registered = self.service.register_instance(
157 package_name='a/b',
158 instance_id='a'*40,
159 metadata=self.fake_metadata('another user'),
160 signatures=[self.fake_signature('key0'), self.fake_signature('key1')])
161 self.assertFalse(registered)
162 self.assertEqual(expected, pkg.to_dict())
163
164 def test_add_signatures_missing(self):
165 with self.assertRaises(ValueError):
nodir 2014/12/30 22:54:01 ValueError is ambiguous. It could be anything, rea
Vadim Sh. 2014/12/31 01:27:36 Done.
166 self.service.add_signatures('a/b', 'a'*40, [self.fake_signature('key')])
167
168 def test_add_signatures(self):
169 _, registered = self.service.register_instance(
170 package_name='a/b',
171 instance_id='a'*40,
172 metadata=self.fake_metadata('another user'),
173 signatures=[])
174 self.assertTrue(registered)
175
176 expected = lambda key: {
nodir 2014/12/30 22:54:01 I always thought multi-line lambdas do not satisfy
Vadim Sh. 2014/12/31 01:27:36 Never heard of that :) Generally I avoid line wra
nodir 2015/01/02 19:13:04 https://google-styleguide.googlecode.com/svn/trunk
Vadim Sh. 2015/01/02 19:31:02 Arguably, lambda x: <dict> is one liner. I prefer
nodir 2015/01/02 20:23:23 Acknowledged.
177 'added_by': auth.Identity(kind='user', name='abc@example.com'),
178 'added_ts': datetime.datetime(2014, 1, 1, 0, 0),
179 'digest': '\x00\x01\x02\x03',
180 'hash_algo': 'SHA1',
181 'signature': 'signature \x00\x01\x02\x03',
182 'signature_algo': 'sig algo',
183 'signature_key': key,
184 }
185 expected_sig_id = lambda key: 'SHA1:sig algo:%s' % key
186
187 # Add one.
188 pkg = self.service.add_signatures(
189 'a/b', 'a'*40, [self.fake_signature('key0')])
190 self.assertEqual([expected('key0')], pkg.to_dict()['signatures'])
191 self.assertEqual([expected_sig_id('key0')], pkg.to_dict()['signature_ids'])
192
193 # Add exact same one -> no effect.
194 pkg = self.service.add_signatures(
195 'a/b', 'a'*40, [self.fake_signature('key0')])
196 self.assertEqual([expected('key0')], pkg.to_dict()['signatures'])
197 self.assertEqual([expected_sig_id('key0')], pkg.to_dict()['signature_ids'])
198
199 # Add another one.
200 pkg = self.service.add_signatures(
201 'a/b', 'a'*40, [self.fake_signature('key1')])
202 self.assertEqual(
203 [expected('key0'), expected('key1')], pkg.to_dict()['signatures'])
204 self.assertEqual(
205 [expected_sig_id('key0'), expected_sig_id('key1')],
206 pkg.to_dict()['signature_ids'])
207
208 def test_is_data_uploaded(self):
209 self.mocked_cas_service.uploaded.add(('SHA1', 'a'*40))
210 self.assertTrue(self.service.is_data_uploaded('a/b', 'a'*40))
211 self.assertFalse(self.service.is_data_uploaded('a/b', 'b'*40))
212
213 def test_create_upload_session(self):
214 upload_url, upload_session_id = self.service.create_upload_session(
215 'a/b', 'a'*40, auth.Identity.from_bytes('user:abc@example.com'))
216 self.assertEqual('http://upload_url', upload_url)
217 self.assertEqual('upload_session_id', upload_session_id)
218
219
220 class MockedCASService(object):
221 def __init__(self):
222 self.uploaded = set()
223
224 def is_object_present(self, algo, digest):
225 return (algo, digest) in self.uploaded
226
227 def create_upload_session(self, _algo, _digest, _caller):
228 class UploadSession(object):
229 upload_url = 'http://upload_url'
230 return UploadSession(), 'upload_session_id'
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698