OLD | NEW |
---|---|
(Empty) | |
1 # Copyright 2014 The Chromium Authors. All rights reserved. | |
2 # Use of this source code is governed by a BSD-style license that can be | |
3 # found in the LICENSE file. | |
4 | |
5 import datetime | |
6 import unittest | |
7 | |
8 from google.appengine.ext import ndb | |
9 from testing_utils import testing | |
10 | |
11 from components import auth | |
12 | |
13 from cipd import impl | |
14 | |
15 | |
16 class TestValidators(unittest.TestCase): | |
17 def test_is_valid_package_name(self): | |
18 self.assertTrue(impl.is_valid_package_name('a')) | |
19 self.assertTrue(impl.is_valid_package_name('a/b')) | |
20 self.assertTrue(impl.is_valid_package_name('a/b/c/1/2/3')) | |
21 self.assertTrue(impl.is_valid_package_name('infra/tools/cipd')) | |
22 self.assertTrue(impl.is_valid_package_name('./-/_')) | |
23 self.assertFalse(impl.is_valid_package_name('')) | |
24 self.assertFalse(impl.is_valid_package_name('/a')) | |
25 self.assertFalse(impl.is_valid_package_name('a/')) | |
26 self.assertFalse(impl.is_valid_package_name('A')) | |
27 self.assertFalse(impl.is_valid_package_name('a/B')) | |
28 self.assertFalse(impl.is_valid_package_name('a\\b')) | |
29 | |
30 def test_is_valid_instance_id(self): | |
31 self.assertTrue(impl.is_valid_instance_id('a'*40)) | |
32 self.assertFalse(impl.is_valid_instance_id('')) | |
33 self.assertFalse(impl.is_valid_instance_id('A'*40)) | |
34 | |
35 | |
36 class TestRepoService(testing.AppengineTestCase): | |
37 def setUp(self): | |
38 super(TestRepoService, self).setUp() | |
39 self.mocked_cas_service = MockedCASService() | |
40 self.mock(impl.cas, 'get_cas_service', lambda: self.mocked_cas_service) | |
41 self.service = impl.get_repo_service() | |
42 | |
43 @staticmethod | |
44 def fake_metadata(user='user'): | |
45 return impl.PackageInstanceMetadata( | |
46 date='Jan 1 2014', | |
47 hostname='hostname', | |
48 user=user, | |
49 registered_by=auth.Identity.from_bytes('user:abc@example.com'), | |
50 registered_ts=datetime.datetime(2014, 1, 1)) | |
51 | |
52 @staticmethod | |
53 def fake_signature(key): | |
54 return impl.PackageInstanceSignature( | |
55 hash_algo='SHA1', | |
56 digest='\x00\x01\x02\x03', | |
57 signature_algo='sig algo', | |
58 signature_key=key, | |
59 signature='signature \x00\x01\x02\x03', | |
60 added_by=auth.Identity.from_bytes('user:abc@example.com'), | |
61 added_ts=datetime.datetime(2014, 1, 1)) | |
62 | |
63 def test_register_new(self): | |
64 pkg, registered = self.service.register_instance( | |
65 package_name='a/b', | |
66 instance_id='a'*40, | |
67 metadata=self.fake_metadata(), | |
68 signatures=[self.fake_signature('key 1'), self.fake_signature('key 2')]) | |
69 self.assertTrue(registered) | |
70 self.assertEqual( | |
71 ndb.Key('Package', 'a/b', 'PackageInstance', 'a'*40), pkg.key) | |
72 expected = { | |
73 'metadata': { | |
74 'date': 'Jan 1 2014', | |
75 'hostname': 'hostname', | |
76 'registered_by': auth.Identity(kind='user', name='abc@example.com'), | |
77 'registered_ts': datetime.datetime(2014, 1, 1, 0, 0), | |
78 'user': 'user', | |
79 }, | |
80 'signature_ids': ['SHA1:sig algo:key 1', 'SHA1:sig algo:key 2'], | |
81 'signatures': [ | |
82 { | |
83 'added_by': auth.Identity(kind='user', name='abc@example.com'), | |
84 'added_ts': datetime.datetime(2014, 1, 1, 0, 0), | |
85 'digest': '\x00\x01\x02\x03', | |
86 'hash_algo': 'SHA1', | |
87 'signature': 'signature \x00\x01\x02\x03', | |
88 'signature_algo': 'sig algo', | |
89 'signature_key': 'key 1', | |
90 }, | |
91 { | |
92 'added_by': auth.Identity(kind='user', name='abc@example.com'), | |
93 'added_ts': datetime.datetime(2014, 1, 1, 0, 0), | |
94 'digest': '\x00\x01\x02\x03', | |
95 'hash_algo': 'SHA1', | |
96 'signature': 'signature \x00\x01\x02\x03', | |
97 'signature_algo': 'sig algo', | |
98 'signature_key': 'key 2', | |
99 }, | |
100 ], | |
101 } | |
102 self.assertEqual(expected, pkg.to_dict()) | |
103 self.assertEqual( | |
104 expected, self.service.get_instance('a/b', 'a'*40).to_dict()) | |
105 | |
106 def test_register_existing(self): | |
107 # First register a package. | |
108 _, registered = self.service.register_instance( | |
109 package_name='a/b', | |
110 instance_id='a'*40, | |
111 metadata=self.fake_metadata('user'), | |
112 signatures=[self.fake_signature('key1')]) | |
113 self.assertTrue(registered) | |
114 | |
115 # Try to register it again, with more signatures. | |
116 pkg, registered = self.service.register_instance( | |
117 package_name='a/b', | |
118 instance_id='a'*40, | |
119 metadata=self.fake_metadata('another user'), | |
120 signatures=[self.fake_signature('key0'), self.fake_signature('key1')]) | |
121 self.assertFalse(registered) | |
122 # Original metadata is preserved. Signatures are merged. | |
123 expected = { | |
124 'metadata': { | |
125 'date': 'Jan 1 2014', | |
126 'hostname': 'hostname', | |
127 'registered_by': auth.Identity(kind='user', name='abc@example.com'), | |
128 'registered_ts': datetime.datetime(2014, 1, 1, 0, 0), | |
129 'user': 'user', | |
130 }, | |
131 'signature_ids': ['SHA1:sig algo:key1', 'SHA1:sig algo:key0'], | |
132 'signatures': [ | |
133 { | |
134 'added_by': auth.Identity(kind='user', name='abc@example.com'), | |
135 'added_ts': datetime.datetime(2014, 1, 1, 0, 0), | |
136 'digest': '\x00\x01\x02\x03', | |
137 'hash_algo': 'SHA1', | |
138 'signature': 'signature \x00\x01\x02\x03', | |
139 'signature_algo': 'sig algo', | |
140 'signature_key': 'key1', | |
141 }, | |
142 { | |
143 'added_by': auth.Identity(kind='user', name='abc@example.com'), | |
144 'added_ts': datetime.datetime(2014, 1, 1, 0, 0), | |
145 'digest': '\x00\x01\x02\x03', | |
146 'hash_algo': 'SHA1', | |
147 'signature': 'signature \x00\x01\x02\x03', | |
148 'signature_algo': 'sig algo', | |
149 'signature_key': 'key0', | |
150 }, | |
151 ], | |
152 } | |
153 self.assertEqual(expected, pkg.to_dict()) | |
154 | |
155 # Again, exact same call. No changes expected. | |
156 pkg, registered = self.service.register_instance( | |
157 package_name='a/b', | |
158 instance_id='a'*40, | |
159 metadata=self.fake_metadata('another user'), | |
160 signatures=[self.fake_signature('key0'), self.fake_signature('key1')]) | |
161 self.assertFalse(registered) | |
162 self.assertEqual(expected, pkg.to_dict()) | |
163 | |
164 def test_add_signatures_missing(self): | |
165 with self.assertRaises(ValueError): | |
nodir
2014/12/30 22:54:01
ValueError is ambiguous. It could be anything, rea
Vadim Sh.
2014/12/31 01:27:36
Done.
| |
166 self.service.add_signatures('a/b', 'a'*40, [self.fake_signature('key')]) | |
167 | |
168 def test_add_signatures(self): | |
169 _, registered = self.service.register_instance( | |
170 package_name='a/b', | |
171 instance_id='a'*40, | |
172 metadata=self.fake_metadata('another user'), | |
173 signatures=[]) | |
174 self.assertTrue(registered) | |
175 | |
176 expected = lambda key: { | |
nodir
2014/12/30 22:54:01
I always thought multi-line lambdas do not satisfy
Vadim Sh.
2014/12/31 01:27:36
Never heard of that :)
Generally I avoid line wra
nodir
2015/01/02 19:13:04
https://google-styleguide.googlecode.com/svn/trunk
Vadim Sh.
2015/01/02 19:31:02
Arguably, lambda x: <dict> is one liner. I prefer
nodir
2015/01/02 20:23:23
Acknowledged.
| |
177 'added_by': auth.Identity(kind='user', name='abc@example.com'), | |
178 'added_ts': datetime.datetime(2014, 1, 1, 0, 0), | |
179 'digest': '\x00\x01\x02\x03', | |
180 'hash_algo': 'SHA1', | |
181 'signature': 'signature \x00\x01\x02\x03', | |
182 'signature_algo': 'sig algo', | |
183 'signature_key': key, | |
184 } | |
185 expected_sig_id = lambda key: 'SHA1:sig algo:%s' % key | |
186 | |
187 # Add one. | |
188 pkg = self.service.add_signatures( | |
189 'a/b', 'a'*40, [self.fake_signature('key0')]) | |
190 self.assertEqual([expected('key0')], pkg.to_dict()['signatures']) | |
191 self.assertEqual([expected_sig_id('key0')], pkg.to_dict()['signature_ids']) | |
192 | |
193 # Add exact same one -> no effect. | |
194 pkg = self.service.add_signatures( | |
195 'a/b', 'a'*40, [self.fake_signature('key0')]) | |
196 self.assertEqual([expected('key0')], pkg.to_dict()['signatures']) | |
197 self.assertEqual([expected_sig_id('key0')], pkg.to_dict()['signature_ids']) | |
198 | |
199 # Add another one. | |
200 pkg = self.service.add_signatures( | |
201 'a/b', 'a'*40, [self.fake_signature('key1')]) | |
202 self.assertEqual( | |
203 [expected('key0'), expected('key1')], pkg.to_dict()['signatures']) | |
204 self.assertEqual( | |
205 [expected_sig_id('key0'), expected_sig_id('key1')], | |
206 pkg.to_dict()['signature_ids']) | |
207 | |
208 def test_is_data_uploaded(self): | |
209 self.mocked_cas_service.uploaded.add(('SHA1', 'a'*40)) | |
210 self.assertTrue(self.service.is_data_uploaded('a/b', 'a'*40)) | |
211 self.assertFalse(self.service.is_data_uploaded('a/b', 'b'*40)) | |
212 | |
213 def test_create_upload_session(self): | |
214 upload_url, upload_session_id = self.service.create_upload_session( | |
215 'a/b', 'a'*40, auth.Identity.from_bytes('user:abc@example.com')) | |
216 self.assertEqual('http://upload_url', upload_url) | |
217 self.assertEqual('upload_session_id', upload_session_id) | |
218 | |
219 | |
220 class MockedCASService(object): | |
221 def __init__(self): | |
222 self.uploaded = set() | |
223 | |
224 def is_object_present(self, algo, digest): | |
225 return (algo, digest) in self.uploaded | |
226 | |
227 def create_upload_session(self, _algo, _digest, _caller): | |
228 class UploadSession(object): | |
229 upload_url = 'http://upload_url' | |
230 return UploadSession(), 'upload_session_id' | |
OLD | NEW |