| OLD | NEW |
| (Empty) |
| 1 #!/usr/bin/env python | |
| 2 # Copyright (c) 2012 The Chromium Authors. All rights reserved. | |
| 3 # Use of this source code is governed by a BSD-style license that can be | |
| 4 # found in the LICENSE file. | |
| 5 | |
| 6 import hashlib | |
| 7 import json | |
| 8 import logging | |
| 9 import optparse | |
| 10 import os | |
| 11 import re | |
| 12 import sys | |
| 13 import unittest | |
| 14 import urllib2 | |
| 15 | |
| 16 BASE_DIR = os.path.dirname(os.path.abspath(__file__)) | |
| 17 | |
| 18 import fill | |
| 19 import local_gae | |
| 20 | |
| 21 | |
| 22 class TestCase(unittest.TestCase): | |
| 23 def setUp(self): | |
| 24 super(TestCase, self).setUp() | |
| 25 # Restart the server on each test. It's a bit slow but safer. | |
| 26 self.local_gae = local_gae.LocalGae() | |
| 27 self.local_gae.start_server(logging.getLogger().isEnabledFor(logging.DEBUG)) | |
| 28 self.url = 'http://127.0.0.1:%d/' % self.local_gae.port | |
| 29 self.clear_cookies() | |
| 30 | |
| 31 def tearDown(self): | |
| 32 if self.local_gae: | |
| 33 self.local_gae.stop_server() | |
| 34 self.local_gae = None | |
| 35 super(TestCase, self).tearDown() | |
| 36 | |
| 37 def get(self, suburl): | |
| 38 return self.local_gae.get(suburl) | |
| 39 | |
| 40 def post(self, suburl, data): | |
| 41 return self.local_gae.post(suburl, data) | |
| 42 | |
| 43 def clear_cookies(self): | |
| 44 self.local_gae.clear_cookies() | |
| 45 | |
| 46 def login(self, username, admin=False): | |
| 47 self.local_gae.login(username, admin) | |
| 48 | |
| 49 def set_admin_pwd(self, password): | |
| 50 # There will be no entities until main() has been called. So do a dummy | |
| 51 # request first. | |
| 52 hashvalue = hashlib.sha1(password).hexdigest() | |
| 53 try: | |
| 54 self.get('doesnt_exist') | |
| 55 except urllib2.HTTPError: | |
| 56 pass | |
| 57 | |
| 58 output = self.local_gae.query( | |
| 59 'import base_page\n' | |
| 60 | |
| 61 # First verify the default value exists. | |
| 62 'n = db.GqlQuery("SELECT * FROM Passwords").count()\n' | |
| 63 'assert n == 1, "n == 1"\n' | |
| 64 | |
| 65 # Then override its value with |password|. | |
| 66 'p = db.GqlQuery("SELECT * FROM Passwords").get()\n' | |
| 67 + ('p.password_sha1 = %r\n' % hashvalue) + | |
| 68 'p.put()\n' | |
| 69 'print db.GqlQuery("SELECT * FROM Passwords").count(),\n') | |
| 70 self.assertEqual(output, '1') | |
| 71 | |
| 72 def set_global_config(self, app_name, public_access): | |
| 73 cmd = ( | |
| 74 'import base_page\n' | |
| 75 | |
| 76 # Verify the default config exists. | |
| 77 'n = db.GqlQuery("SELECT * FROM GlobalConfig").count()\n' | |
| 78 'assert n == 1, "n == 1"\n' | |
| 79 | |
| 80 # Then make sure access is sane. | |
| 81 'config = base_page.GlobalConfig(app_name=%r)\n' % app_name + | |
| 82 'config.public_access = %r\n' % public_access + | |
| 83 'config.put()\n' | |
| 84 'print "ok",\n' | |
| 85 ) | |
| 86 output = self.local_gae.query(cmd) | |
| 87 self.assertEqual(output, 'ok') | |
| 88 | |
| 89 | |
| 90 class PublicTestCase(TestCase): | |
| 91 def setUp(self): | |
| 92 super(PublicTestCase, self).setUp() | |
| 93 self.set_global_config(app_name='bogus_app', public_access=True) | |
| 94 | |
| 95 | |
| 96 class StatusTest(PublicTestCase): | |
| 97 def test_all_status(self): | |
| 98 out = self.get('allstatus').splitlines() | |
| 99 out = [i for i in out if i] | |
| 100 self.assertEquals(2, len(out)) | |
| 101 self.assertEquals('Who,When,GeneralStatus,Message', out[0]) | |
| 102 self.assertTrue( | |
| 103 re.match('none,.+?, \d+?, .+?,open,welcome to status', out[1]), out[1]) | |
| 104 | |
| 105 def test_status(self): | |
| 106 self.assertEqual('1', self.get('status')) | |
| 107 | |
| 108 def test_current(self): | |
| 109 out = self.get('current') | |
| 110 self.assertTrue(100 < len(out)) | |
| 111 self.assertTrue(out.startswith('<html>')) | |
| 112 | |
| 113 def test_current_raw(self): | |
| 114 # Default value. | |
| 115 self.assertEqual('welcome to status', self.get('current?format=raw')) | |
| 116 | |
| 117 def test_current_json(self): | |
| 118 # pylint: disable=E1103 | |
| 119 out = json.loads(self.get('current?format=json')) | |
| 120 expected = [ | |
| 121 'date', 'username', 'message', 'general_state', 'can_commit_freely', | |
| 122 ] | |
| 123 # TODO(maruel): Test actual values. | |
| 124 self.assertEqual(sorted(expected), sorted(out.keys())) | |
| 125 | |
| 126 def test_status_push(self): | |
| 127 self.assertEqual('welcome to status', self.get('current?format=raw')) | |
| 128 self.assertEqual('welcome to status', self.get('current?format=raw')) | |
| 129 # Set a password, force status with password. | |
| 130 self.set_admin_pwd('bleh') | |
| 131 data = { | |
| 132 'message': 'foo', | |
| 133 'password': 'bleh', | |
| 134 'username': 'user1', | |
| 135 } | |
| 136 self.assertEqual('OK', self.post('status', data)) | |
| 137 self.assertEqual('foo', self.get('current?format=raw')) | |
| 138 data['message'] = 'bar' | |
| 139 data['password'] = 'wrong password' | |
| 140 self.assertRaises(urllib2.HTTPError, self.post, 'status', data) | |
| 141 # Wasn't updated since the password was wrong. | |
| 142 self.assertEqual('foo', self.get('current?format=raw')) | |
| 143 data['message'] = 'boo' | |
| 144 data['password'] = 'bleh' | |
| 145 self.assertEqual('OK', self.post('status', data)) | |
| 146 self.assertEqual('boo', self.get('current?format=raw')) | |
| 147 | |
| 148 def test_root(self): | |
| 149 self.assertTrue(100 < len(self.get(''))) | |
| 150 | |
| 151 | |
| 152 class LkgrTest(PublicTestCase): | |
| 153 def test_lkgr(self): | |
| 154 self.assertEqual('', self.get('lkgr')) | |
| 155 | |
| 156 def test_lkgr_set(self): | |
| 157 self.set_admin_pwd('bleh') | |
| 158 data = { | |
| 159 'revision': 42, | |
| 160 'password': 'bleh', | |
| 161 'success': '1', | |
| 162 'steps': '', | |
| 163 } | |
| 164 out = self.post('revisions', data) | |
| 165 self.assertEqual('', out) | |
| 166 self.assertEqual('42', self.get('lkgr')) | |
| 167 self.assertRaises(urllib2.HTTPError, self.get, 'git-lkgr') | |
| 168 data['git_hash'] = 'c305f265aba93cc594a0fece50346c3af7fe3301' | |
| 169 out = self.post('revisions', data) | |
| 170 self.assertEqual('', out) | |
| 171 self.assertEqual('c305f265aba93cc594a0fece50346c3af7fe3301', | |
| 172 self.get('git-lkgr')) | |
| 173 data['password'] = 'wrongpassword' | |
| 174 data['revision'] = 23 | |
| 175 self.assertRaises(urllib2.HTTPError, self.post, 'revisions', data) | |
| 176 self.assertEqual('42', self.get('lkgr')) | |
| 177 self.assertEqual('c305f265aba93cc594a0fece50346c3af7fe3301', | |
| 178 self.get('git-lkgr')) | |
| 179 data['password'] = 'bleh' | |
| 180 data['revision'] = 31337 | |
| 181 out = self.post('revisions', data) | |
| 182 self.assertEqual('', out) | |
| 183 self.assertEqual('31337', self.get('lkgr')) | |
| 184 self.assertEqual('c305f265aba93cc594a0fece50346c3af7fe3301', | |
| 185 self.get('git-lkgr')) | |
| 186 data['git_hash'] = '988881adc9fc3655077dc2d4d757d480b5ea0e11' | |
| 187 out = self.post('revisions', data) | |
| 188 self.assertEqual('', out) | |
| 189 self.assertEqual('31337', self.get('lkgr')) | |
| 190 self.assertEqual('988881adc9fc3655077dc2d4d757d480b5ea0e11', | |
| 191 self.get('git-lkgr')) | |
| 192 | |
| 193 | |
| 194 class CommitQueueTest(PublicTestCase): | |
| 195 def _fill(self): | |
| 196 # Example dump taken from a run. | |
| 197 total = 0 | |
| 198 for packet in fill.load_packets(): | |
| 199 total += int(self.post('cq/receiver', packet)) | |
| 200 self.assertEquals(9, total) | |
| 201 | |
| 202 def test_summary_json(self): | |
| 203 self._fill() | |
| 204 self.assertEquals(7, len(json.loads(self.get('cq/?format=json')))) | |
| 205 self.assertEquals([], json.loads(self.get('cq/doesntexist?format=json'))) | |
| 206 self.assertEquals( | |
| 207 4, | |
| 208 len(json.loads(self.get( | |
| 209 urllib2.quote('cq/bar@chromium.org') + '?format=json')))) | |
| 210 self.assertEquals( | |
| 211 3, | |
| 212 len(json.loads(self.get( | |
| 213 urllib2.quote('cq/joe@chromium.org') + '?format=json')))) | |
| 214 | |
| 215 | |
| 216 class AccessControl(TestCase): | |
| 217 def _check_post_thru_ui(self, fails=False, fails_main_page=False): | |
| 218 if fails_main_page: | |
| 219 self.assertRaises(urllib2.HTTPError, self.get, '') | |
| 220 self.assertRaises( | |
| 221 urllib2.HTTPError, self.post, '', | |
| 222 {'message': 'foo', 'last_status_key': 'junk'}) | |
| 223 else: | |
| 224 main_page = self.get('') | |
| 225 last_status_key = re.search( | |
| 226 r'name="last_status_key" value="(.*?)"', main_page) | |
| 227 if fails: | |
| 228 # last_status_key doesn't appear if you aren't an admin. | |
| 229 self.assertEqual(None, last_status_key) | |
| 230 self.assertRaises( | |
| 231 urllib2.HTTPError, self.post, '', | |
| 232 {'message': 'foo', 'last_status_key': 'junk'}) | |
| 233 else: | |
| 234 self.post('', {'message': 'foo', | |
| 235 'last_status_key': last_status_key.group(1)}) | |
| 236 self.assertEqual('foo', self.get('current?format=raw')) | |
| 237 | |
| 238 def _check_current_page(self, fails=False, seeks_login=False): | |
| 239 if fails: | |
| 240 self.assertRaises(urllib2.HTTPError, self.get, 'current') | |
| 241 elif seeks_login: | |
| 242 out = self.get('current') | |
| 243 self.assertTrue(100 < len(out)) | |
| 244 self.assertTrue(out.startswith('<html>')) | |
| 245 self.assertTrue('Login Required' in out) | |
| 246 else: | |
| 247 out = self.get('current') | |
| 248 self.assertTrue(100 < len(out)) | |
| 249 self.assertTrue(out.startswith('<html>')) | |
| 250 self.assertTrue('<title>Login</title>' not in out) | |
| 251 self.assertTrue('Login Required' not in out) | |
| 252 | |
| 253 def _check_current_raw_page(self, fails=False, seeks_login=False): | |
| 254 if fails: | |
| 255 self.assertRaises(urllib2.HTTPError, self.get, 'current?format=raw') | |
| 256 elif seeks_login: | |
| 257 out = self.get('current?format=raw') | |
| 258 self.assertTrue(100 < len(out)) | |
| 259 self.assertTrue(out.startswith('<html>')) | |
| 260 self.assertTrue('<title>Login</title>' in out) | |
| 261 else: | |
| 262 out = self.get('current?format=raw') | |
| 263 self.assertTrue(not out.startswith('<html>')) | |
| 264 self.assertTrue('<title>Login</title>' not in out) | |
| 265 self.assertTrue('Login Required' not in out) | |
| 266 | |
| 267 def _check_post_thru_status_fails(self): | |
| 268 self.assertRaises(urllib2.HTTPError, self.post, | |
| 269 'status', {'message': 'foo'}) | |
| 270 | |
| 271 def test_default_denies_chromium(self): | |
| 272 # Confirm default config does not allow chromium.org access. | |
| 273 self.login('bob@chromium.org') | |
| 274 self._check_current_page(fails=True) | |
| 275 self._check_current_raw_page(fails=True) | |
| 276 self._check_post_thru_ui(fails=True, fails_main_page=True) | |
| 277 self._check_post_thru_status_fails() | |
| 278 | |
| 279 def test_private_requires_login(self): | |
| 280 # Confirm private access redirects to a login screen. | |
| 281 self._check_current_page(seeks_login=True) | |
| 282 self._check_current_raw_page(seeks_login=True) | |
| 283 | |
| 284 def test_private_allows_google(self): | |
| 285 self.login('bob@google.com') | |
| 286 self._check_current_page() | |
| 287 self._check_current_raw_page() | |
| 288 self._check_post_thru_ui() | |
| 289 # Status, however, requires bot login. | |
| 290 self._check_post_thru_status_fails() | |
| 291 | |
| 292 def test_private_denies_other(self): | |
| 293 self.login('bob@example.com') | |
| 294 self._check_current_page(fails=True) | |
| 295 self._check_current_raw_page(fails=True) | |
| 296 self._check_post_thru_ui(fails=True, fails_main_page=True) | |
| 297 self._check_post_thru_status_fails() | |
| 298 | |
| 299 def test_public_allows_chromium(self): | |
| 300 self.set_global_config(app_name='foo', public_access=True) | |
| 301 self.login('bob@chromium.org') | |
| 302 self._check_current_page() | |
| 303 self._check_current_raw_page() | |
| 304 self._check_post_thru_ui() | |
| 305 # Status, however, requires bot login. | |
| 306 self._check_post_thru_status_fails() | |
| 307 | |
| 308 def test_public_is_limited(self): | |
| 309 self.set_global_config(app_name='foo', public_access=True) | |
| 310 self.login('bar@baz.com') | |
| 311 self._check_current_page() | |
| 312 self._check_current_raw_page() | |
| 313 self._check_post_thru_ui(fails=True) | |
| 314 self._check_post_thru_status_fails() | |
| 315 | |
| 316 def test_non_bot_admins_cant_forge(self): | |
| 317 self.login('admin@google.com') | |
| 318 data = { | |
| 319 'message': 'foo', | |
| 320 'username': 'bogus@google.com', | |
| 321 } | |
| 322 self.assertRaises(urllib2.HTTPError, self.post, 'status', data) | |
| 323 self.assertNotEqual('foo', self.get('current?format=raw')) | |
| 324 | |
| 325 def test_update_global_config(self): | |
| 326 """Verify updating the global config affects the active instance""" | |
| 327 result = self.local_gae.query( | |
| 328 'import base_page\n' | |
| 329 | |
| 330 # Verify the default config exists. | |
| 331 'n = base_page.GlobalConfig.all().count()\n' | |
| 332 'assert n == 1, "n == 1"\n' | |
| 333 | |
| 334 # Verify there is a config, and shows False. | |
| 335 'q = base_page.GlobalConfig.all()\n' | |
| 336 'assert q.count() == 1, "q.count() == 1"\n' | |
| 337 'config = q.get()\n' | |
| 338 'assert not config.public_access, "not config.public_access"\n' | |
| 339 | |
| 340 # Make the instance public. | |
| 341 'config.public_access = True\n' | |
| 342 'config.put()\n' | |
| 343 'print "ok",\n') | |
| 344 self.assertEqual('ok', result) | |
| 345 # Login and try various operations. | |
| 346 self.login('bob@chromium.org') | |
| 347 self._check_current_page() | |
| 348 self._check_current_raw_page() | |
| 349 self._check_post_thru_ui() | |
| 350 # Verify the config now shows True. | |
| 351 result = self.local_gae.query( | |
| 352 'import base_page\n' | |
| 353 'q = base_page.GlobalConfig.all()\n' | |
| 354 'assert q.count() == 1, "q.count() == 1"\n' | |
| 355 'print q.get().public_access\n') | |
| 356 self.assertEqual('True\n', result) | |
| 357 | |
| 358 | |
| 359 def _init_logging(argv): | |
| 360 """Set up our logging by re-using some of the unittest flags""" | |
| 361 parser = optparse.OptionParser() | |
| 362 parser.add_option('-v', action='count', default=0) | |
| 363 (opts, _) = parser.parse_args([x for x in argv if x.startswith('-v')]) | |
| 364 | |
| 365 levels = [logging.WARNING, logging.INFO, logging.DEBUG] | |
| 366 logging.basicConfig(level=levels[min(2, opts.v)]) | |
| 367 | |
| 368 | |
| 369 if __name__ == '__main__': | |
| 370 _init_logging(sys.argv) | |
| 371 unittest.main() | |
| OLD | NEW |