OLD | NEW |
| (Empty) |
1 #!/usr/bin/env python | |
2 # Copyright (c) 2012 The Chromium Authors. All rights reserved. | |
3 # Use of this source code is governed by a BSD-style license that can be | |
4 # found in the LICENSE file. | |
5 | |
6 import hashlib | |
7 import json | |
8 import logging | |
9 import optparse | |
10 import os | |
11 import re | |
12 import sys | |
13 import unittest | |
14 import urllib2 | |
15 | |
16 BASE_DIR = os.path.dirname(os.path.abspath(__file__)) | |
17 | |
18 import fill | |
19 import local_gae | |
20 | |
21 | |
22 class TestCase(unittest.TestCase): | |
23 def setUp(self): | |
24 super(TestCase, self).setUp() | |
25 # Restart the server on each test. It's a bit slow but safer. | |
26 self.local_gae = local_gae.LocalGae() | |
27 self.local_gae.start_server(logging.getLogger().isEnabledFor(logging.DEBUG)) | |
28 self.url = 'http://127.0.0.1:%d/' % self.local_gae.port | |
29 self.clear_cookies() | |
30 | |
31 def tearDown(self): | |
32 if self.local_gae: | |
33 self.local_gae.stop_server() | |
34 self.local_gae = None | |
35 super(TestCase, self).tearDown() | |
36 | |
37 def get(self, suburl): | |
38 return self.local_gae.get(suburl) | |
39 | |
40 def post(self, suburl, data): | |
41 return self.local_gae.post(suburl, data) | |
42 | |
43 def clear_cookies(self): | |
44 self.local_gae.clear_cookies() | |
45 | |
46 def login(self, username, admin=False): | |
47 self.local_gae.login(username, admin) | |
48 | |
49 def set_admin_pwd(self, password): | |
50 # There will be no entities until main() has been called. So do a dummy | |
51 # request first. | |
52 hashvalue = hashlib.sha1(password).hexdigest() | |
53 try: | |
54 self.get('doesnt_exist') | |
55 except urllib2.HTTPError: | |
56 pass | |
57 | |
58 output = self.local_gae.query( | |
59 'import base_page\n' | |
60 | |
61 # First verify the default value exists. | |
62 'n = db.GqlQuery("SELECT * FROM Passwords").count()\n' | |
63 'assert n == 1, "n == 1"\n' | |
64 | |
65 # Then override its value with |password|. | |
66 'p = db.GqlQuery("SELECT * FROM Passwords").get()\n' | |
67 + ('p.password_sha1 = %r\n' % hashvalue) + | |
68 'p.put()\n' | |
69 'print db.GqlQuery("SELECT * FROM Passwords").count(),\n') | |
70 self.assertEqual(output, '1') | |
71 | |
72 def set_global_config(self, app_name, public_access): | |
73 cmd = ( | |
74 'import base_page\n' | |
75 | |
76 # Verify the default config exists. | |
77 'n = db.GqlQuery("SELECT * FROM GlobalConfig").count()\n' | |
78 'assert n == 1, "n == 1"\n' | |
79 | |
80 # Then make sure access is sane. | |
81 'config = base_page.GlobalConfig(app_name=%r)\n' % app_name + | |
82 'config.public_access = %r\n' % public_access + | |
83 'config.put()\n' | |
84 'print "ok",\n' | |
85 ) | |
86 output = self.local_gae.query(cmd) | |
87 self.assertEqual(output, 'ok') | |
88 | |
89 | |
90 class PublicTestCase(TestCase): | |
91 def setUp(self): | |
92 super(PublicTestCase, self).setUp() | |
93 self.set_global_config(app_name='bogus_app', public_access=True) | |
94 | |
95 | |
96 class StatusTest(PublicTestCase): | |
97 def test_all_status(self): | |
98 out = self.get('allstatus').splitlines() | |
99 out = [i for i in out if i] | |
100 self.assertEquals(2, len(out)) | |
101 self.assertEquals('Who,When,GeneralStatus,Message', out[0]) | |
102 self.assertTrue( | |
103 re.match('none,.+?, \d+?, .+?,open,welcome to status', out[1]), out[1]) | |
104 | |
105 def test_status(self): | |
106 self.assertEqual('1', self.get('status')) | |
107 | |
108 def test_current(self): | |
109 out = self.get('current') | |
110 self.assertTrue(100 < len(out)) | |
111 self.assertTrue(out.startswith('<html>')) | |
112 | |
113 def test_current_raw(self): | |
114 # Default value. | |
115 self.assertEqual('welcome to status', self.get('current?format=raw')) | |
116 | |
117 def test_current_json(self): | |
118 # pylint: disable=E1103 | |
119 out = json.loads(self.get('current?format=json')) | |
120 expected = [ | |
121 'date', 'username', 'message', 'general_state', 'can_commit_freely', | |
122 ] | |
123 # TODO(maruel): Test actual values. | |
124 self.assertEqual(sorted(expected), sorted(out.keys())) | |
125 | |
126 def test_status_push(self): | |
127 self.assertEqual('welcome to status', self.get('current?format=raw')) | |
128 self.assertEqual('welcome to status', self.get('current?format=raw')) | |
129 # Set a password, force status with password. | |
130 self.set_admin_pwd('bleh') | |
131 data = { | |
132 'message': 'foo', | |
133 'password': 'bleh', | |
134 'username': 'user1', | |
135 } | |
136 self.assertEqual('OK', self.post('status', data)) | |
137 self.assertEqual('foo', self.get('current?format=raw')) | |
138 data['message'] = 'bar' | |
139 data['password'] = 'wrong password' | |
140 self.assertRaises(urllib2.HTTPError, self.post, 'status', data) | |
141 # Wasn't updated since the password was wrong. | |
142 self.assertEqual('foo', self.get('current?format=raw')) | |
143 data['message'] = 'boo' | |
144 data['password'] = 'bleh' | |
145 self.assertEqual('OK', self.post('status', data)) | |
146 self.assertEqual('boo', self.get('current?format=raw')) | |
147 | |
148 def test_root(self): | |
149 self.assertTrue(100 < len(self.get(''))) | |
150 | |
151 | |
152 class LkgrTest(PublicTestCase): | |
153 def test_lkgr(self): | |
154 self.assertEqual('', self.get('lkgr')) | |
155 | |
156 def test_lkgr_set(self): | |
157 self.set_admin_pwd('bleh') | |
158 data = { | |
159 'revision': 42, | |
160 'password': 'bleh', | |
161 'success': '1', | |
162 'steps': '', | |
163 } | |
164 out = self.post('revisions', data) | |
165 self.assertEqual('', out) | |
166 self.assertEqual('42', self.get('lkgr')) | |
167 self.assertRaises(urllib2.HTTPError, self.get, 'git-lkgr') | |
168 data['git_hash'] = 'c305f265aba93cc594a0fece50346c3af7fe3301' | |
169 out = self.post('revisions', data) | |
170 self.assertEqual('', out) | |
171 self.assertEqual('c305f265aba93cc594a0fece50346c3af7fe3301', | |
172 self.get('git-lkgr')) | |
173 data['password'] = 'wrongpassword' | |
174 data['revision'] = 23 | |
175 self.assertRaises(urllib2.HTTPError, self.post, 'revisions', data) | |
176 self.assertEqual('42', self.get('lkgr')) | |
177 self.assertEqual('c305f265aba93cc594a0fece50346c3af7fe3301', | |
178 self.get('git-lkgr')) | |
179 data['password'] = 'bleh' | |
180 data['revision'] = 31337 | |
181 out = self.post('revisions', data) | |
182 self.assertEqual('', out) | |
183 self.assertEqual('31337', self.get('lkgr')) | |
184 self.assertEqual('c305f265aba93cc594a0fece50346c3af7fe3301', | |
185 self.get('git-lkgr')) | |
186 data['git_hash'] = '988881adc9fc3655077dc2d4d757d480b5ea0e11' | |
187 out = self.post('revisions', data) | |
188 self.assertEqual('', out) | |
189 self.assertEqual('31337', self.get('lkgr')) | |
190 self.assertEqual('988881adc9fc3655077dc2d4d757d480b5ea0e11', | |
191 self.get('git-lkgr')) | |
192 | |
193 | |
194 class CommitQueueTest(PublicTestCase): | |
195 def _fill(self): | |
196 # Example dump taken from a run. | |
197 total = 0 | |
198 for packet in fill.load_packets(): | |
199 total += int(self.post('cq/receiver', packet)) | |
200 self.assertEquals(9, total) | |
201 | |
202 def test_summary_json(self): | |
203 self._fill() | |
204 self.assertEquals(7, len(json.loads(self.get('cq/?format=json')))) | |
205 self.assertEquals([], json.loads(self.get('cq/doesntexist?format=json'))) | |
206 self.assertEquals( | |
207 4, | |
208 len(json.loads(self.get( | |
209 urllib2.quote('cq/bar@chromium.org') + '?format=json')))) | |
210 self.assertEquals( | |
211 3, | |
212 len(json.loads(self.get( | |
213 urllib2.quote('cq/joe@chromium.org') + '?format=json')))) | |
214 | |
215 | |
216 class AccessControl(TestCase): | |
217 def _check_post_thru_ui(self, fails=False, fails_main_page=False): | |
218 if fails_main_page: | |
219 self.assertRaises(urllib2.HTTPError, self.get, '') | |
220 self.assertRaises( | |
221 urllib2.HTTPError, self.post, '', | |
222 {'message': 'foo', 'last_status_key': 'junk'}) | |
223 else: | |
224 main_page = self.get('') | |
225 last_status_key = re.search( | |
226 r'name="last_status_key" value="(.*?)"', main_page) | |
227 if fails: | |
228 # last_status_key doesn't appear if you aren't an admin. | |
229 self.assertEqual(None, last_status_key) | |
230 self.assertRaises( | |
231 urllib2.HTTPError, self.post, '', | |
232 {'message': 'foo', 'last_status_key': 'junk'}) | |
233 else: | |
234 self.post('', {'message': 'foo', | |
235 'last_status_key': last_status_key.group(1)}) | |
236 self.assertEqual('foo', self.get('current?format=raw')) | |
237 | |
238 def _check_current_page(self, fails=False, seeks_login=False): | |
239 if fails: | |
240 self.assertRaises(urllib2.HTTPError, self.get, 'current') | |
241 elif seeks_login: | |
242 out = self.get('current') | |
243 self.assertTrue(100 < len(out)) | |
244 self.assertTrue(out.startswith('<html>')) | |
245 self.assertTrue('Login Required' in out) | |
246 else: | |
247 out = self.get('current') | |
248 self.assertTrue(100 < len(out)) | |
249 self.assertTrue(out.startswith('<html>')) | |
250 self.assertTrue('<title>Login</title>' not in out) | |
251 self.assertTrue('Login Required' not in out) | |
252 | |
253 def _check_current_raw_page(self, fails=False, seeks_login=False): | |
254 if fails: | |
255 self.assertRaises(urllib2.HTTPError, self.get, 'current?format=raw') | |
256 elif seeks_login: | |
257 out = self.get('current?format=raw') | |
258 self.assertTrue(100 < len(out)) | |
259 self.assertTrue(out.startswith('<html>')) | |
260 self.assertTrue('<title>Login</title>' in out) | |
261 else: | |
262 out = self.get('current?format=raw') | |
263 self.assertTrue(not out.startswith('<html>')) | |
264 self.assertTrue('<title>Login</title>' not in out) | |
265 self.assertTrue('Login Required' not in out) | |
266 | |
267 def _check_post_thru_status_fails(self): | |
268 self.assertRaises(urllib2.HTTPError, self.post, | |
269 'status', {'message': 'foo'}) | |
270 | |
271 def test_default_denies_chromium(self): | |
272 # Confirm default config does not allow chromium.org access. | |
273 self.login('bob@chromium.org') | |
274 self._check_current_page(fails=True) | |
275 self._check_current_raw_page(fails=True) | |
276 self._check_post_thru_ui(fails=True, fails_main_page=True) | |
277 self._check_post_thru_status_fails() | |
278 | |
279 def test_private_requires_login(self): | |
280 # Confirm private access redirects to a login screen. | |
281 self._check_current_page(seeks_login=True) | |
282 self._check_current_raw_page(seeks_login=True) | |
283 | |
284 def test_private_allows_google(self): | |
285 self.login('bob@google.com') | |
286 self._check_current_page() | |
287 self._check_current_raw_page() | |
288 self._check_post_thru_ui() | |
289 # Status, however, requires bot login. | |
290 self._check_post_thru_status_fails() | |
291 | |
292 def test_private_denies_other(self): | |
293 self.login('bob@example.com') | |
294 self._check_current_page(fails=True) | |
295 self._check_current_raw_page(fails=True) | |
296 self._check_post_thru_ui(fails=True, fails_main_page=True) | |
297 self._check_post_thru_status_fails() | |
298 | |
299 def test_public_allows_chromium(self): | |
300 self.set_global_config(app_name='foo', public_access=True) | |
301 self.login('bob@chromium.org') | |
302 self._check_current_page() | |
303 self._check_current_raw_page() | |
304 self._check_post_thru_ui() | |
305 # Status, however, requires bot login. | |
306 self._check_post_thru_status_fails() | |
307 | |
308 def test_public_is_limited(self): | |
309 self.set_global_config(app_name='foo', public_access=True) | |
310 self.login('bar@baz.com') | |
311 self._check_current_page() | |
312 self._check_current_raw_page() | |
313 self._check_post_thru_ui(fails=True) | |
314 self._check_post_thru_status_fails() | |
315 | |
316 def test_non_bot_admins_cant_forge(self): | |
317 self.login('admin@google.com') | |
318 data = { | |
319 'message': 'foo', | |
320 'username': 'bogus@google.com', | |
321 } | |
322 self.assertRaises(urllib2.HTTPError, self.post, 'status', data) | |
323 self.assertNotEqual('foo', self.get('current?format=raw')) | |
324 | |
325 def test_update_global_config(self): | |
326 """Verify updating the global config affects the active instance""" | |
327 result = self.local_gae.query( | |
328 'import base_page\n' | |
329 | |
330 # Verify the default config exists. | |
331 'n = base_page.GlobalConfig.all().count()\n' | |
332 'assert n == 1, "n == 1"\n' | |
333 | |
334 # Verify there is a config, and shows False. | |
335 'q = base_page.GlobalConfig.all()\n' | |
336 'assert q.count() == 1, "q.count() == 1"\n' | |
337 'config = q.get()\n' | |
338 'assert not config.public_access, "not config.public_access"\n' | |
339 | |
340 # Make the instance public. | |
341 'config.public_access = True\n' | |
342 'config.put()\n' | |
343 'print "ok",\n') | |
344 self.assertEqual('ok', result) | |
345 # Login and try various operations. | |
346 self.login('bob@chromium.org') | |
347 self._check_current_page() | |
348 self._check_current_raw_page() | |
349 self._check_post_thru_ui() | |
350 # Verify the config now shows True. | |
351 result = self.local_gae.query( | |
352 'import base_page\n' | |
353 'q = base_page.GlobalConfig.all()\n' | |
354 'assert q.count() == 1, "q.count() == 1"\n' | |
355 'print q.get().public_access\n') | |
356 self.assertEqual('True\n', result) | |
357 | |
358 | |
359 def _init_logging(argv): | |
360 """Set up our logging by re-using some of the unittest flags""" | |
361 parser = optparse.OptionParser() | |
362 parser.add_option('-v', action='count', default=0) | |
363 (opts, _) = parser.parse_args([x for x in argv if x.startswith('-v')]) | |
364 | |
365 levels = [logging.WARNING, logging.INFO, logging.DEBUG] | |
366 logging.basicConfig(level=levels[min(2, opts.v)]) | |
367 | |
368 | |
369 if __name__ == '__main__': | |
370 _init_logging(sys.argv) | |
371 unittest.main() | |
OLD | NEW |