Index: appengine/monorail/framework/test/monorailrequest_test.py |
diff --git a/appengine/monorail/framework/test/monorailrequest_test.py b/appengine/monorail/framework/test/monorailrequest_test.py |
new file mode 100644 |
index 0000000000000000000000000000000000000000..6a94c936927b8aab50a9066b16bdffbae875ff19 |
--- /dev/null |
+++ b/appengine/monorail/framework/test/monorailrequest_test.py |
@@ -0,0 +1,413 @@ |
+# Copyright 2016 The Chromium Authors. All rights reserved. |
+# Use of this source code is govered by a BSD-style |
+# license that can be found in the LICENSE file or at |
+# https://developers.google.com/open-source/licenses/bsd |
+ |
+"""Unit tests for the monorailrequest module.""" |
+ |
+import re |
+import unittest |
+ |
+import mox |
+ |
+from google.appengine.api import users |
+ |
+import webapp2 |
+ |
+from framework import monorailrequest |
+from framework import permissions |
+from framework import profiler |
+from proto import project_pb2 |
+from proto import tracker_pb2 |
+from services import service_manager |
+from testing import fake |
+from testing import testing_helpers |
+from tracker import tracker_constants |
+ |
+ |
+class HostportReTest(unittest.TestCase): |
+ |
+ def testGood(self): |
+ test_data = [ |
+ 'localhost:8080', |
+ 'app.appspot.com', |
+ 'bugs-staging.chromium.org', |
+ 'vers10n-h3x-dot-app-id.appspot.com', |
+ ] |
+ for hostport in test_data: |
+ self.assertTrue(monorailrequest._HOSTPORT_RE.match(hostport), |
+ msg='Incorrectly rejected %r' % hostport) |
+ |
+ def testBad(self): |
+ test_data = [ |
+ '', |
+ ' ', |
+ '\t', |
+ '\n', |
+ '\'', |
+ '"', |
+ 'version"cruft-dot-app-id.appspot.com', |
+ '\nother header', |
+ 'version&cruft-dot-app-id.appspot.com', |
+ ] |
+ for hostport in test_data: |
+ self.assertFalse(monorailrequest._HOSTPORT_RE.match(hostport), |
+ msg='Incorrectly accepted %r' % hostport) |
+ |
+class AuthDataTest(unittest.TestCase): |
+ |
+ def setUp(self): |
+ self.mox = mox.Mox() |
+ |
+ def tearDown(self): |
+ self.mox.UnsetStubs() |
+ |
+ def testGetUserID(self): |
+ pass # TODO(jrobbins): re-impement |
+ |
+ def testExamineRequestUserID(self): |
+ pass # TODO(jrobbins): re-implement |
+ |
+ |
+class MonorailRequestUnitTest(unittest.TestCase): |
+ |
+ def setUp(self): |
+ self.services = service_manager.Services( |
+ project=fake.ProjectService(), |
+ user=fake.UserService(), |
+ usergroup=fake.UserGroupService()) |
+ self.project = self.services.project.TestAddProject('proj') |
+ self.services.user.TestAddUser('jrobbins@example.com', 111) |
+ |
+ self.profiler = profiler.Profiler() |
+ self.mox = mox.Mox() |
+ self.mox.StubOutWithMock(users, 'get_current_user') |
+ users.get_current_user().AndReturn(None) |
+ self.mox.ReplayAll() |
+ |
+ def tearDown(self): |
+ self.mox.UnsetStubs() |
+ |
+ def testGetIntParamConvertsQueryParamToInt(self): |
+ notice_id = 12345 |
+ mr = testing_helpers.MakeMonorailRequest( |
+ path='/foo?notice=%s' % notice_id) |
+ |
+ value = mr.GetIntParam('notice') |
+ self.assert_(isinstance(value, int)) |
+ self.assertEqual(notice_id, value) |
+ |
+ def testGetIntParamConvertsQueryParamToLong(self): |
+ notice_id = 12345678901234567890 |
+ mr = testing_helpers.MakeMonorailRequest( |
+ path='/foo?notice=%s' % notice_id) |
+ |
+ value = mr.GetIntParam('notice') |
+ self.assertTrue(isinstance(value, long)) |
+ self.assertEqual(notice_id, value) |
+ |
+ def testGetIntListParamNoParam(self): |
+ mr = monorailrequest.MonorailRequest() |
+ mr.ParseRequest( |
+ webapp2.Request.blank('servlet'), self.services, self.profiler) |
+ self.assertEquals(mr.GetIntListParam('ids'), None) |
+ self.assertEquals(mr.GetIntListParam('ids', default_value=['test']), |
+ ['test']) |
+ |
+ def testGetIntListParamOneValue(self): |
+ mr = monorailrequest.MonorailRequest() |
+ mr.ParseRequest( |
+ webapp2.Request.blank('servlet?ids=11'), self.services, self.profiler) |
+ self.assertEquals(mr.GetIntListParam('ids'), [11]) |
+ self.assertEquals(mr.GetIntListParam('ids', default_value=['test']), |
+ [11]) |
+ |
+ def testGetIntListParamMultiValue(self): |
+ mr = monorailrequest.MonorailRequest() |
+ mr.ParseRequest( |
+ webapp2.Request.blank('servlet?ids=21,22,23'), self.services, |
+ self.profiler) |
+ self.assertEquals(mr.GetIntListParam('ids'), [21, 22, 23]) |
+ self.assertEquals(mr.GetIntListParam('ids', default_value=['test']), |
+ [21, 22, 23]) |
+ |
+ def testGetIntListParamBogusValue(self): |
+ mr = monorailrequest.MonorailRequest() |
+ mr.ParseRequest( |
+ webapp2.Request.blank('servlet?ids=not_an_int'), self.services, |
+ self.profiler) |
+ self.assertEquals(mr.GetIntListParam('ids'), None) |
+ self.assertEquals(mr.GetIntListParam('ids', default_value=['test']), |
+ ['test']) |
+ |
+ def testGetIntListParamMalformed(self): |
+ mr = monorailrequest.MonorailRequest() |
+ mr.ParseRequest( |
+ webapp2.Request.blank('servlet?ids=31,32,,'), self.services, |
+ self.profiler) |
+ self.assertEquals(mr.GetIntListParam('ids'), None) |
+ self.assertEquals(mr.GetIntListParam('ids', default_value=['test']), |
+ ['test']) |
+ |
+ def testDefaultValuesNoUrl(self): |
+ """If request has no param, default param values should be used.""" |
+ mr = monorailrequest.MonorailRequest() |
+ mr.ParseRequest( |
+ webapp2.Request.blank('servlet'), self.services, self.profiler) |
+ self.assertEquals(mr.GetParam('r', 3), 3) |
+ self.assertEquals(mr.GetIntParam('r', 3), 3) |
+ self.assertEquals(mr.GetPositiveIntParam('r', 3), 3) |
+ self.assertEquals(mr.GetIntListParam('r', [3, 4]), [3, 4]) |
+ |
+ def _MRWithMockRequest( |
+ self, path, headers=None, *mr_args, **mr_kwargs): |
+ request = webapp2.Request.blank(path, headers=headers) |
+ mr = monorailrequest.MonorailRequest(*mr_args, **mr_kwargs) |
+ mr.ParseRequest(request, self.services, self.profiler) |
+ return mr |
+ |
+ def testParseQueryParameters(self): |
+ mr = self._MRWithMockRequest( |
+ '/p/proj/issues/list?q=foo+OR+bar&num=50') |
+ self.assertEquals('foo OR bar', mr.query) |
+ self.assertEquals(50, mr.num) |
+ |
+ def testParseRequest_Scheme(self): |
+ mr = self._MRWithMockRequest('/p/proj/') |
+ self.assertEquals('http', mr.request.scheme) |
+ |
+ def testParseRequest_HostportAndCurrentPageURL(self): |
+ mr = self._MRWithMockRequest('/p/proj/', headers={ |
+ 'Host': 'example.com', |
+ 'Cookie': 'asdf', |
+ }) |
+ self.assertEquals('http', mr.request.scheme) |
+ self.assertEquals('example.com', mr.request.host) |
+ self.assertEquals('http://example.com/p/proj/', mr.current_page_url) |
+ |
+ def testViewedUser_WithEmail(self): |
+ mr = self._MRWithMockRequest('/u/jrobbins@example.com/') |
+ self.assertEquals('jrobbins@example.com', mr.viewed_username) |
+ self.assertEquals(111, mr.viewed_user_auth.user_id) |
+ self.assertEquals( |
+ self.services.user.GetUser('fake cnxn', 111), |
+ mr.viewed_user_auth.user_pb) |
+ |
+ def testViewedUser_WithUserID(self): |
+ mr = self._MRWithMockRequest('/u/111/') |
+ self.assertEquals('jrobbins@example.com', mr.viewed_username) |
+ self.assertEquals(111, mr.viewed_user_auth.user_id) |
+ self.assertEquals( |
+ self.services.user.GetUser('fake cnxn', 111), |
+ mr.viewed_user_auth.user_pb) |
+ |
+ def testViewedUser_NoSuchEmail(self): |
+ try: |
+ self._MRWithMockRequest('/u/unknownuser@example.com/') |
+ self.fail() |
+ except webapp2.HTTPException as e: |
+ self.assertEquals(404, e.code) |
+ |
+ def testViewedUser_NoSuchUserID(self): |
+ with self.assertRaises(webapp2.HTTPException) as e: |
+ self._MRWithMockRequest('/u/234521111/') |
+ self.assertEquals(404, e.code) |
+ |
+ def testGetParam(self): |
+ mr = testing_helpers.MakeMonorailRequest( |
+ path='/foo?syn=error!&a=a&empty=', |
+ params=dict(over1='over_value1', over2='over_value2')) |
+ |
+ # test tampering |
+ self.assertRaises(monorailrequest.InputException, mr.GetParam, 'a', |
+ antitamper_re=re.compile(r'^$')) |
+ self.assertRaises(monorailrequest.InputException, mr.GetParam, |
+ 'undefined', default_value='default', |
+ antitamper_re=re.compile(r'^$')) |
+ |
+ # test empty value |
+ self.assertEquals('', mr.GetParam( |
+ 'empty', default_value='default', antitamper_re=re.compile(r'^$'))) |
+ |
+ # test default |
+ self.assertEquals('default', mr.GetParam( |
+ 'undefined', default_value='default')) |
+ |
+ def testComputeColSpec(self): |
+ # No config passed, and nothing in URL |
+ mr = testing_helpers.MakeMonorailRequest( |
+ path='/p/proj/issues/detail?id=123') |
+ mr.ComputeColSpec(None) |
+ self.assertEquals(tracker_constants.DEFAULT_COL_SPEC, mr.col_spec) |
+ |
+ # No config passed, but set in URL |
+ mr = testing_helpers.MakeMonorailRequest( |
+ path='/p/proj/issues/detail?id=123&colspec=a b C') |
+ mr.ComputeColSpec(None) |
+ self.assertEquals('a b C', mr.col_spec) |
+ |
+ config = tracker_pb2.ProjectIssueConfig() |
+ |
+ # No default in the config, and nothing in URL |
+ mr = testing_helpers.MakeMonorailRequest( |
+ path='/p/proj/issues/detail?id=123') |
+ mr.ComputeColSpec(config) |
+ self.assertEquals(tracker_constants.DEFAULT_COL_SPEC, mr.col_spec) |
+ |
+ # No default in the config, but set in URL |
+ mr = testing_helpers.MakeMonorailRequest( |
+ path='/p/proj/issues/detail?id=123&colspec=a b C') |
+ mr.ComputeColSpec(config) |
+ self.assertEquals('a b C', mr.col_spec) |
+ |
+ config.default_col_spec = 'd e f' |
+ |
+ # Default in the config, and nothing in URL |
+ mr = testing_helpers.MakeMonorailRequest( |
+ path='/p/proj/issues/detail?id=123') |
+ mr.ComputeColSpec(config) |
+ self.assertEquals('d e f', mr.col_spec) |
+ |
+ # Default in the config, but overrided via URL |
+ mr = testing_helpers.MakeMonorailRequest( |
+ path='/p/proj/issues/detail?id=123&colspec=a b C') |
+ mr.ComputeColSpec(config) |
+ self.assertEquals('a b C', mr.col_spec) |
+ |
+ def testComputeColSpec_XSS(self): |
+ config_1 = tracker_pb2.ProjectIssueConfig() |
+ config_2 = tracker_pb2.ProjectIssueConfig() |
+ config_2.default_col_spec = "id '+alert(1)+'" |
+ mr_1 = testing_helpers.MakeMonorailRequest( |
+ path='/p/proj/issues/detail?id=123') |
+ mr_2 = testing_helpers.MakeMonorailRequest( |
+ path="/p/proj/issues/detail?id=123&colspec=id '+alert(1)+'") |
+ |
+ # Normal colspec in config but malicious request |
+ self.assertRaises( |
+ monorailrequest.InputException, |
+ mr_2.ComputeColSpec, config_1) |
+ |
+ # Malicious colspec in config but normal request |
+ self.assertRaises( |
+ monorailrequest.InputException, |
+ mr_1.ComputeColSpec, config_2) |
+ |
+ # Malicious colspec in config and malicious request |
+ self.assertRaises( |
+ monorailrequest.InputException, |
+ mr_2.ComputeColSpec, config_2) |
+ |
+ |
+class TestMonorailRequestFunctions(unittest.TestCase): |
+ |
+ def testExtractPathIdenifiers_ProjectOnly(self): |
+ username, project_name = monorailrequest._ParsePathIdentifiers( |
+ '/p/proj/issues/list?q=foo+OR+bar&ts=1234') |
+ self.assertIsNone(username) |
+ self.assertEquals('proj', project_name) |
+ |
+ def testExtractPathIdenifiers_ViewedUserOnly(self): |
+ username, project_name = monorailrequest._ParsePathIdentifiers( |
+ '/u/jrobbins@example.com/') |
+ self.assertEquals('jrobbins@example.com', username) |
+ self.assertIsNone(project_name) |
+ |
+ def testExtractPathIdenifiers_ViewedUserURLSpace(self): |
+ username, project_name = monorailrequest._ParsePathIdentifiers( |
+ '/u/jrobbins@example.com/updates') |
+ self.assertEquals('jrobbins@example.com', username) |
+ self.assertIsNone(project_name) |
+ |
+ def testExtractPathIdenifiers_ViewedGroupURLSpace(self): |
+ username, project_name = monorailrequest._ParsePathIdentifiers( |
+ '/g/user-group@example.com/updates') |
+ self.assertEquals('user-group@example.com', username) |
+ self.assertIsNone(project_name) |
+ |
+ def testParseColSpec(self): |
+ parse = monorailrequest.ParseColSpec |
+ self.assertEqual(['PageName', 'Summary', 'Changed', 'ChangedBy'], |
+ parse(u'PageName Summary Changed ChangedBy')) |
+ self.assertEqual(['Foo-Bar', 'Foo-Bar-Baz', 'Release-1.2', 'Hey', 'There'], |
+ parse('Foo-Bar Foo-Bar-Baz Release-1.2 Hey!There')) |
+ self.assertEqual( |
+ ['\xe7\xaa\xbf\xe8\x8b\xa5\xe7\xb9\xb9'.decode('utf-8'), |
+ '\xe5\x9f\xba\xe5\x9c\xb0\xe3\x81\xaf'.decode('utf-8')], |
+ parse('\xe7\xaa\xbf\xe8\x8b\xa5\xe7\xb9\xb9 ' |
+ '\xe5\x9f\xba\xe5\x9c\xb0\xe3\x81\xaf'.decode('utf-8'))) |
+ |
+ |
+class TestPermissionLookup(unittest.TestCase): |
+ OWNER_ID = 1 |
+ OTHER_USER_ID = 2 |
+ |
+ def setUp(self): |
+ self.services = service_manager.Services( |
+ project=fake.ProjectService(), |
+ user=fake.UserService(), |
+ usergroup=fake.UserGroupService()) |
+ self.services.user.TestAddUser('owner@gmail.com', self.OWNER_ID) |
+ self.services.user.TestAddUser('user@gmail.com', self.OTHER_USER_ID) |
+ self.live_project = self.services.project.TestAddProject( |
+ 'live', owner_ids=[self.OWNER_ID]) |
+ self.archived_project = self.services.project.TestAddProject( |
+ 'archived', owner_ids=[self.OWNER_ID], |
+ state=project_pb2.ProjectState.ARCHIVED) |
+ self.members_only_project = self.services.project.TestAddProject( |
+ 'members-only', owner_ids=[self.OWNER_ID], |
+ access=project_pb2.ProjectAccess.MEMBERS_ONLY) |
+ |
+ self.mox = mox.Mox() |
+ |
+ def tearDown(self): |
+ self.mox.UnsetStubs() |
+ |
+ def CheckPermissions(self, perms, expect_view, expect_commit, expect_edit): |
+ may_view = perms.HasPerm(permissions.VIEW, None, None) |
+ self.assertEqual(expect_view, may_view) |
+ may_commit = perms.HasPerm(permissions.COMMIT, None, None) |
+ self.assertEqual(expect_commit, may_commit) |
+ may_edit = perms.HasPerm(permissions.EDIT_PROJECT, None, None) |
+ self.assertEqual(expect_edit, may_edit) |
+ |
+ def MakeRequestAsUser(self, project_name, email): |
+ self.mox.StubOutWithMock(users, 'get_current_user') |
+ users.get_current_user().AndReturn(testing_helpers.Blank( |
+ email=lambda: email)) |
+ self.mox.ReplayAll() |
+ |
+ request = webapp2.Request.blank('/p/' + project_name) |
+ mr = monorailrequest.MonorailRequest() |
+ prof = profiler.Profiler() |
+ with prof.Phase('parse user info'): |
+ mr.ParseRequest(request, self.services, prof) |
+ return mr |
+ |
+ def testOwnerPermissions_Live(self): |
+ mr = self.MakeRequestAsUser('live', 'owner@gmail.com') |
+ self.CheckPermissions(mr.perms, True, True, True) |
+ |
+ def testOwnerPermissions_Archived(self): |
+ mr = self.MakeRequestAsUser('archived', 'owner@gmail.com') |
+ self.CheckPermissions(mr.perms, True, False, True) |
+ |
+ def testOwnerPermissions_MembersOnly(self): |
+ mr = self.MakeRequestAsUser('members-only', 'owner@gmail.com') |
+ self.CheckPermissions(mr.perms, True, True, True) |
+ |
+ def testExternalUserPermissions_Live(self): |
+ mr = self.MakeRequestAsUser('live', 'user@gmail.com') |
+ self.CheckPermissions(mr.perms, True, False, False) |
+ |
+ def testExternalUserPermissions_Archived(self): |
+ mr = self.MakeRequestAsUser('archived', 'user@gmail.com') |
+ self.CheckPermissions(mr.perms, False, False, False) |
+ |
+ def testExternalUserPermissions_MembersOnly(self): |
+ mr = self.MakeRequestAsUser('members-only', 'user@gmail.com') |
+ self.CheckPermissions(mr.perms, False, False, False) |
+ |
+ |
+if __name__ == '__main__': |
+ unittest.main() |