OLD | NEW |
(Empty) | |
| 1 # Copyright 2016 The Chromium Authors. All rights reserved. |
| 2 # Use of this source code is govered by a BSD-style |
| 3 # license that can be found in the LICENSE file or at |
| 4 # https://developers.google.com/open-source/licenses/bsd |
| 5 |
| 6 """Tests for the usergroup service.""" |
| 7 |
| 8 import collections |
| 9 import unittest |
| 10 |
| 11 import mox |
| 12 |
| 13 from google.appengine.ext import testbed |
| 14 |
| 15 from framework import permissions |
| 16 from framework import sql |
| 17 from proto import usergroup_pb2 |
| 18 from services import service_manager |
| 19 from services import usergroup_svc |
| 20 from testing import fake |
| 21 |
| 22 |
| 23 def MakeUserGroupService(cache_manager, my_mox): |
| 24 usergroup_service = usergroup_svc.UserGroupService(cache_manager) |
| 25 usergroup_service.usergroup_tbl = my_mox.CreateMock(sql.SQLTableManager) |
| 26 usergroup_service.usergroupsettings_tbl = my_mox.CreateMock( |
| 27 sql.SQLTableManager) |
| 28 usergroup_service.usergroupprojects_tbl = my_mox.CreateMock( |
| 29 sql.SQLTableManager) |
| 30 return usergroup_service |
| 31 |
| 32 |
| 33 class MembershipTwoLevelCacheTest(unittest.TestCase): |
| 34 |
| 35 def setUp(self): |
| 36 self.mox = mox.Mox() |
| 37 self.cache_manager = fake.CacheManager() |
| 38 self.usergroup_service = MakeUserGroupService(self.cache_manager, self.mox) |
| 39 |
| 40 def testDeserializeMemberships(self): |
| 41 memberships_rows = [(111L, 777L), (111L, 888L), (222L, 888L)] |
| 42 actual = self.usergroup_service.memberships_2lc._DeserializeMemberships( |
| 43 memberships_rows) |
| 44 self.assertItemsEqual([111L, 222L], actual.keys()) |
| 45 self.assertItemsEqual([777L, 888L], actual[111L]) |
| 46 self.assertItemsEqual([888L], actual[222L]) |
| 47 |
| 48 |
| 49 class UserGroupServiceTest(unittest.TestCase): |
| 50 |
| 51 def setUp(self): |
| 52 self.testbed = testbed.Testbed() |
| 53 self.testbed.activate() |
| 54 self.testbed.init_memcache_stub() |
| 55 |
| 56 self.mox = mox.Mox() |
| 57 self.cnxn = 'fake connection' |
| 58 self.cache_manager = fake.CacheManager() |
| 59 self.usergroup_service = MakeUserGroupService(self.cache_manager, self.mox) |
| 60 self.services = service_manager.Services( |
| 61 user=fake.UserService(), |
| 62 usergroup=self.usergroup_service, |
| 63 project=fake.ProjectService()) |
| 64 |
| 65 def tearDown(self): |
| 66 self.testbed.deactivate() |
| 67 self.mox.UnsetStubs() |
| 68 self.mox.ResetAll() |
| 69 |
| 70 def SetUpCreateGroup( |
| 71 self, group_id, visiblity, external_group_type=None): |
| 72 self.SetUpUpdateSettings(group_id, visiblity, external_group_type) |
| 73 |
| 74 def testCreateGroup_Normal(self): |
| 75 self.services.user.TestAddUser('group@example.com', 888L) |
| 76 self.SetUpCreateGroup(888L, 'anyone') |
| 77 self.mox.ReplayAll() |
| 78 actual_group_id = self.usergroup_service.CreateGroup( |
| 79 self.cnxn, self.services, 'group@example.com', 'anyone') |
| 80 self.mox.VerifyAll() |
| 81 self.assertEqual(888L, actual_group_id) |
| 82 |
| 83 def testCreateGroup_Import(self): |
| 84 self.services.user.TestAddUser('troopers', 888L) |
| 85 self.SetUpCreateGroup(888L, 'owners', 'mdb') |
| 86 self.mox.ReplayAll() |
| 87 actual_group_id = self.usergroup_service.CreateGroup( |
| 88 self.cnxn, self.services, 'troopers', 'owners', 'mdb') |
| 89 self.mox.VerifyAll() |
| 90 self.assertEqual(888L, actual_group_id) |
| 91 |
| 92 def SetUpDetermineWhichUserIDsAreGroups(self, ids_to_query, mock_group_ids): |
| 93 self.usergroup_service.usergroupsettings_tbl.Select( |
| 94 self.cnxn, cols=['group_id'], group_id=ids_to_query).AndReturn( |
| 95 (gid,) for gid in mock_group_ids) |
| 96 |
| 97 def testDetermineWhichUserIDsAreGroups_NoGroups(self): |
| 98 self.SetUpDetermineWhichUserIDsAreGroups([], []) |
| 99 self.mox.ReplayAll() |
| 100 actual_group_ids = self.usergroup_service.DetermineWhichUserIDsAreGroups( |
| 101 self.cnxn, []) |
| 102 self.mox.VerifyAll() |
| 103 self.assertEqual([], actual_group_ids) |
| 104 |
| 105 def testDetermineWhichUserIDsAreGroups_SomeGroups(self): |
| 106 user_ids = [111, 222, 333] |
| 107 group_ids = [888, 999] |
| 108 self.SetUpDetermineWhichUserIDsAreGroups(user_ids + group_ids, group_ids) |
| 109 self.mox.ReplayAll() |
| 110 actual_group_ids = self.usergroup_service.DetermineWhichUserIDsAreGroups( |
| 111 self.cnxn, user_ids + group_ids) |
| 112 self.mox.VerifyAll() |
| 113 self.assertEqual(group_ids, actual_group_ids) |
| 114 |
| 115 def SetUpLookupAllMemberships(self, user_ids, mock_membership_rows): |
| 116 self.usergroup_service.usergroup_tbl.Select( |
| 117 self.cnxn, cols=['user_id', 'group_id'], distinct=True, |
| 118 user_id=user_ids).AndReturn(mock_membership_rows) |
| 119 |
| 120 def testLookupAllMemberships(self): |
| 121 self.usergroup_service.group_dag.initialized = True |
| 122 self.usergroup_service.memberships_2lc.CacheItem(111L, {888L, 999L}) |
| 123 self.SetUpLookupAllMemberships([222L], [(222L, 777L), (222L, 999L)]) |
| 124 self.mox.ReplayAll() |
| 125 actual_membership_dict = self.usergroup_service.LookupAllMemberships( |
| 126 self.cnxn, [111L, 222L]) |
| 127 self.mox.VerifyAll() |
| 128 self.assertEqual( |
| 129 {111L: {888L, 999}, 222L: {777L, 999L}}, |
| 130 actual_membership_dict) |
| 131 |
| 132 def SetUpRemoveMembers(self, group_id, member_ids): |
| 133 self.usergroup_service.usergroup_tbl.Delete( |
| 134 self.cnxn, group_id=group_id, user_id=member_ids) |
| 135 |
| 136 def testRemoveMembers(self): |
| 137 self.usergroup_service.group_dag.initialized = True |
| 138 self.SetUpRemoveMembers(888L, [111L, 222L]) |
| 139 self.SetUpLookupAllMembers([111L, 222L], [], {}, {}) |
| 140 self.mox.ReplayAll() |
| 141 self.usergroup_service.RemoveMembers(self.cnxn, 888L, [111L, 222L]) |
| 142 self.mox.VerifyAll() |
| 143 |
| 144 def testUpdateMembers(self): |
| 145 self.usergroup_service.group_dag.initialized = True |
| 146 self.usergroup_service.usergroup_tbl.Delete( |
| 147 self.cnxn, group_id=888L, user_id=[111L, 222L]) |
| 148 self.usergroup_service.usergroup_tbl.InsertRows( |
| 149 self.cnxn, ['user_id', 'group_id', 'role'], |
| 150 [(111L, 888L, 'member'), (222L, 888L, 'member')]) |
| 151 self.SetUpLookupAllMembers([111L, 222L], [], {}, {}) |
| 152 self.mox.ReplayAll() |
| 153 self.usergroup_service.UpdateMembers( |
| 154 self.cnxn, 888L, [111L, 222L], 'member') |
| 155 self.mox.VerifyAll() |
| 156 |
| 157 def testUpdateMembers_CircleDetection(self): |
| 158 # Two groups: 888 and 999 while 999 is a member of 888. |
| 159 self.SetUpDAG([(888,), (999,)], [(999, 888)]) |
| 160 self.mox.ReplayAll() |
| 161 self.assertRaises( |
| 162 usergroup_svc.CircularGroupException, |
| 163 self.usergroup_service.UpdateMembers, self.cnxn, 999, [888], 'member') |
| 164 self.mox.VerifyAll() |
| 165 |
| 166 def SetUpLookupAllMembers( |
| 167 self, group_ids, direct_member_rows, |
| 168 descedants_dict, indirect_member_rows_dict): |
| 169 self.usergroup_service.usergroup_tbl.Select( |
| 170 self.cnxn, cols=['user_id', 'group_id', 'role'], distinct=True, |
| 171 group_id=group_ids).AndReturn(direct_member_rows) |
| 172 for gid in group_ids: |
| 173 self.usergroup_service.usergroup_tbl.Select( |
| 174 self.cnxn, cols=['user_id'], distinct=True, |
| 175 group_id=descedants_dict.get(gid, [])).AndReturn( |
| 176 indirect_member_rows_dict.get(gid, [])) |
| 177 |
| 178 def testLookupAllMembers(self): |
| 179 self.usergroup_service.group_dag.initialized = True |
| 180 self.usergroup_service.group_dag.user_group_children = ( |
| 181 collections.defaultdict(list)) |
| 182 self.usergroup_service.group_dag.user_group_children[777] = [888] |
| 183 self.usergroup_service.group_dag.user_group_children[888] = [999] |
| 184 self.SetUpLookupAllMembers( |
| 185 [777], |
| 186 [(888, 777, 'member'), (111, 888, 'member'), (999, 888, 'member'), |
| 187 (222, 999, 'member')], |
| 188 {777: [888, 999]}, |
| 189 {777: [(111,), (222,), (999,)]}) |
| 190 |
| 191 self.mox.ReplayAll() |
| 192 members_dict, owners_dict = self.usergroup_service.LookupAllMembers( |
| 193 self.cnxn, [777]) |
| 194 self.mox.VerifyAll() |
| 195 self.assertItemsEqual([111, 222, 888, 999], members_dict[777]) |
| 196 self.assertItemsEqual([], owners_dict[777]) |
| 197 |
| 198 def testExpandAnyUserGroups_NoneRequested(self): |
| 199 self.SetUpDetermineWhichUserIDsAreGroups([], []) |
| 200 self.SetUpLookupMembers({}) |
| 201 self.mox.ReplayAll() |
| 202 direct_ids, indirect_ids = self.usergroup_service.ExpandAnyUserGroups( |
| 203 self.cnxn, []) |
| 204 self.mox.VerifyAll() |
| 205 self.assertItemsEqual([], direct_ids) |
| 206 self.assertItemsEqual([], indirect_ids) |
| 207 |
| 208 def testExpandAnyUserGroups_NoGroups(self): |
| 209 self.SetUpDetermineWhichUserIDsAreGroups([111, 222], []) |
| 210 self.SetUpLookupMembers({}) |
| 211 self.mox.ReplayAll() |
| 212 direct_ids, indirect_ids = self.usergroup_service.ExpandAnyUserGroups( |
| 213 self.cnxn, [111, 222]) |
| 214 self.mox.VerifyAll() |
| 215 self.assertItemsEqual([111, 222], direct_ids) |
| 216 self.assertItemsEqual([], indirect_ids) |
| 217 |
| 218 def testExpandAnyUserGroups_WithGroups(self): |
| 219 self.usergroup_service.group_dag.initialized = True |
| 220 self.SetUpDetermineWhichUserIDsAreGroups([111, 222, 888], [888]) |
| 221 self.SetUpLookupAllMembers( |
| 222 [888], [(222, 888, 'member'), (333, 888, 'member')], {}, {}) |
| 223 self.mox.ReplayAll() |
| 224 direct_ids, indirect_ids = self.usergroup_service.ExpandAnyUserGroups( |
| 225 self.cnxn, [111, 222, 888]) |
| 226 self.mox.VerifyAll() |
| 227 self.assertItemsEqual([111, 222], direct_ids) |
| 228 self.assertItemsEqual([333, 222], indirect_ids) |
| 229 |
| 230 def SetUpLookupMembers(self, group_member_dict): |
| 231 mock_membership_rows = [] |
| 232 group_ids = [] |
| 233 for gid, members in group_member_dict.iteritems(): |
| 234 group_ids.append(gid) |
| 235 mock_membership_rows.extend([(uid, gid, 'member') for uid in members]) |
| 236 group_ids.sort() |
| 237 self.usergroup_service.usergroup_tbl.Select( |
| 238 self.cnxn, cols=['user_id','group_id', 'role'], distinct=True, |
| 239 group_id=group_ids).AndReturn(mock_membership_rows) |
| 240 |
| 241 def testLookupMembers_NoneRequested(self): |
| 242 self.SetUpLookupMembers({}) |
| 243 self.mox.ReplayAll() |
| 244 member_ids, _ = self.usergroup_service.LookupMembers(self.cnxn, []) |
| 245 self.mox.VerifyAll() |
| 246 self.assertItemsEqual({}, member_ids) |
| 247 |
| 248 def testLookupMembers_Nonexistent(self): |
| 249 """If some requested groups don't exist, they are ignored.""" |
| 250 self.SetUpLookupMembers({777: []}) |
| 251 self.mox.ReplayAll() |
| 252 member_ids, _ = self.usergroup_service.LookupMembers(self.cnxn, [777]) |
| 253 self.mox.VerifyAll() |
| 254 self.assertItemsEqual([], member_ids[777]) |
| 255 |
| 256 def testLookupMembers_AllEmpty(self): |
| 257 """Requesting all empty groups results in no members.""" |
| 258 self.SetUpLookupMembers({888: [], 999: []}) |
| 259 self.mox.ReplayAll() |
| 260 member_ids, _ = self.usergroup_service.LookupMembers(self.cnxn, [888, 999]) |
| 261 self.mox.VerifyAll() |
| 262 self.assertItemsEqual([], member_ids[888]) |
| 263 |
| 264 def testLookupMembers_OneGroup(self): |
| 265 self.SetUpLookupMembers({888: [111, 222]}) |
| 266 self.mox.ReplayAll() |
| 267 member_ids, _ = self.usergroup_service.LookupMembers(self.cnxn, [888]) |
| 268 self.mox.VerifyAll() |
| 269 self.assertItemsEqual([111, 222], member_ids[888]) |
| 270 |
| 271 def testLookupMembers_GroupsAndNonGroups(self): |
| 272 """We ignore any non-groups passed in.""" |
| 273 self.SetUpLookupMembers({111: [], 333: [], 888: [111, 222]}) |
| 274 self.mox.ReplayAll() |
| 275 member_ids, _ = self.usergroup_service.LookupMembers( |
| 276 self.cnxn, [111, 333, 888]) |
| 277 self.mox.VerifyAll() |
| 278 self.assertItemsEqual([111, 222], member_ids[888]) |
| 279 |
| 280 def testLookupMembers_OverlappingGroups(self): |
| 281 """We get the union of IDs. Imagine 888 = {111} and 999 = {111, 222}.""" |
| 282 self.SetUpLookupMembers({888: [111], 999: [111, 222]}) |
| 283 self.mox.ReplayAll() |
| 284 member_ids, _ = self.usergroup_service.LookupMembers(self.cnxn, [888, 999]) |
| 285 self.mox.VerifyAll() |
| 286 self.assertItemsEqual([111, 222], member_ids[999]) |
| 287 self.assertItemsEqual([111], member_ids[888]) |
| 288 |
| 289 def testLookupVisibleMembers_LimitedVisiblity(self): |
| 290 """We get only the member IDs in groups that the user is allowed to see.""" |
| 291 self.usergroup_service.group_dag.initialized = True |
| 292 self.SetUpGetGroupSettings( |
| 293 [888, 999], |
| 294 [(888, 'anyone', None, 0), (999, 'members', None, 0)]) |
| 295 self.SetUpLookupMembers({888: [111], 999: [111]}) |
| 296 self.SetUpLookupAllMembers( |
| 297 [888, 999], [(111, 888, 'member'), (111, 999, 'member')], {}, {}) |
| 298 self.mox.ReplayAll() |
| 299 member_ids, _ = self.usergroup_service.LookupVisibleMembers( |
| 300 self.cnxn, [888, 999], permissions.USER_PERMISSIONSET, set(), |
| 301 self.services) |
| 302 self.mox.VerifyAll() |
| 303 self.assertItemsEqual([111], member_ids[888]) |
| 304 self.assertNotIn(999, member_ids) |
| 305 |
| 306 def SetUpGetAllUserGroupsInfo(self, mock_settings_rows, mock_count_rows, |
| 307 mock_friends=None): |
| 308 mock_friends = mock_friends or [] |
| 309 self.usergroup_service.usergroupsettings_tbl.Select( |
| 310 self.cnxn, cols=['group_id', 'email', 'who_can_view_members', |
| 311 'external_group_type', 'last_sync_time'], |
| 312 left_joins=[('User ON UserGroupSettings.group_id = User.user_id', [])] |
| 313 ).AndReturn(mock_settings_rows) |
| 314 self.usergroup_service.usergroup_tbl.Select( |
| 315 self.cnxn, cols=['group_id', 'COUNT(*)'], |
| 316 group_by=['group_id']).AndReturn(mock_count_rows) |
| 317 |
| 318 group_ids = [g[0] for g in mock_settings_rows] |
| 319 self.usergroup_service.usergroupprojects_tbl.Select( |
| 320 self.cnxn, cols=usergroup_svc.USERGROUPPROJECTS_COLS, |
| 321 group_id=group_ids).AndReturn(mock_friends) |
| 322 |
| 323 def testGetAllUserGroupsInfo(self): |
| 324 self.SetUpGetAllUserGroupsInfo( |
| 325 [(888L, 'group@example.com', 'anyone', None, 0)], |
| 326 [(888L, 12)]) |
| 327 self.mox.ReplayAll() |
| 328 actual_infos = self.usergroup_service.GetAllUserGroupsInfo(self.cnxn) |
| 329 self.mox.VerifyAll() |
| 330 self.assertEqual(1, len(actual_infos)) |
| 331 addr, count, group_settings, group_id = actual_infos[0] |
| 332 self.assertEqual('group@example.com', addr) |
| 333 self.assertEqual(12, count) |
| 334 self.assertEqual(usergroup_pb2.MemberVisibility.ANYONE, |
| 335 group_settings.who_can_view_members) |
| 336 self.assertEqual(888L, group_id) |
| 337 |
| 338 def SetUpGetGroupSettings(self, group_ids, mock_result_rows, |
| 339 mock_friends=None): |
| 340 mock_friends = mock_friends or [] |
| 341 self.usergroup_service.usergroupsettings_tbl.Select( |
| 342 self.cnxn, cols=usergroup_svc.USERGROUPSETTINGS_COLS, |
| 343 group_id=group_ids).AndReturn(mock_result_rows) |
| 344 self.usergroup_service.usergroupprojects_tbl.Select( |
| 345 self.cnxn, cols=usergroup_svc.USERGROUPPROJECTS_COLS, |
| 346 group_id=group_ids).AndReturn(mock_friends) |
| 347 |
| 348 def testGetGroupSettings_NoGroupsRequested(self): |
| 349 self.SetUpGetGroupSettings([], []) |
| 350 self.mox.ReplayAll() |
| 351 actual_settings_dict = self.usergroup_service.GetAllGroupSettings( |
| 352 self.cnxn, []) |
| 353 self.mox.VerifyAll() |
| 354 self.assertEqual({}, actual_settings_dict) |
| 355 |
| 356 def testGetGroupSettings_NoGroupsFound(self): |
| 357 self.SetUpGetGroupSettings([777L], []) |
| 358 self.mox.ReplayAll() |
| 359 actual_settings_dict = self.usergroup_service.GetAllGroupSettings( |
| 360 self.cnxn, [777L]) |
| 361 self.mox.VerifyAll() |
| 362 self.assertEqual({}, actual_settings_dict) |
| 363 |
| 364 def testGetGroupSettings_SomeGroups(self): |
| 365 self.SetUpGetGroupSettings( |
| 366 [777L, 888L, 999L], |
| 367 [(888L, 'anyone', None, 0), (999L, 'members', None, 0)]) |
| 368 self.mox.ReplayAll() |
| 369 actual_settings_dict = self.usergroup_service.GetAllGroupSettings( |
| 370 self.cnxn, [777L, 888L, 999L]) |
| 371 self.mox.VerifyAll() |
| 372 self.assertEqual( |
| 373 {888L: usergroup_pb2.MakeSettings('anyone'), |
| 374 999L: usergroup_pb2.MakeSettings('members')}, |
| 375 actual_settings_dict) |
| 376 |
| 377 def testGetGroupSettings_NoSuchGroup(self): |
| 378 self.SetUpGetGroupSettings([777L], []) |
| 379 self.mox.ReplayAll() |
| 380 actual_settings = self.usergroup_service.GetGroupSettings(self.cnxn, 777L) |
| 381 self.mox.VerifyAll() |
| 382 self.assertEqual(None, actual_settings) |
| 383 |
| 384 def testGetGroupSettings_Found(self): |
| 385 self.SetUpGetGroupSettings([888L], [(888L, 'anyone', None, 0)]) |
| 386 self.mox.ReplayAll() |
| 387 actual_settings = self.usergroup_service.GetGroupSettings(self.cnxn, 888) |
| 388 self.mox.VerifyAll() |
| 389 self.assertEqual( |
| 390 usergroup_pb2.MemberVisibility.ANYONE, |
| 391 actual_settings.who_can_view_members) |
| 392 |
| 393 def testGetGroupSettings_Import(self): |
| 394 self.SetUpGetGroupSettings([888L], [(888L, 'owners', 'mdb', 0)]) |
| 395 self.mox.ReplayAll() |
| 396 actual_settings = self.usergroup_service.GetGroupSettings(self.cnxn, 888) |
| 397 self.mox.VerifyAll() |
| 398 self.assertEqual( |
| 399 usergroup_pb2.MemberVisibility.OWNERS, |
| 400 actual_settings.who_can_view_members) |
| 401 self.assertEqual( |
| 402 usergroup_pb2.GroupType.MDB, |
| 403 actual_settings.ext_group_type) |
| 404 |
| 405 def SetUpUpdateSettings(self, group_id, visiblity, external_group_type=None, |
| 406 last_sync_time=0, friend_projects=None): |
| 407 friend_projects = friend_projects or [] |
| 408 self.usergroup_service.usergroupsettings_tbl.InsertRow( |
| 409 self.cnxn, group_id=group_id, who_can_view_members=visiblity, |
| 410 external_group_type=external_group_type, |
| 411 last_sync_time=last_sync_time, |
| 412 replace=True) |
| 413 self.usergroup_service.usergroupprojects_tbl.Delete( |
| 414 self.cnxn, group_id=group_id) |
| 415 if friend_projects: |
| 416 rows = [(group_id, p_id) for p_id in friend_projects] |
| 417 self.usergroup_service.usergroupprojects_tbl.InsertRows( |
| 418 self.cnxn, ['group_id', 'project_id'], rows) |
| 419 |
| 420 def testUpdateSettings_Normal(self): |
| 421 self.SetUpUpdateSettings(888L, 'anyone') |
| 422 self.mox.ReplayAll() |
| 423 self.usergroup_service.UpdateSettings( |
| 424 self.cnxn, 888L, usergroup_pb2.MakeSettings('anyone')) |
| 425 self.mox.VerifyAll() |
| 426 |
| 427 def testUpdateSettings_Import(self): |
| 428 self.SetUpUpdateSettings(888L, 'owners', 'mdb') |
| 429 self.mox.ReplayAll() |
| 430 self.usergroup_service.UpdateSettings( |
| 431 self.cnxn, 888L, |
| 432 usergroup_pb2.MakeSettings('owners', 'mdb')) |
| 433 self.mox.VerifyAll() |
| 434 |
| 435 def SetUpDAG(self, group_id_rows, usergroup_rows): |
| 436 self.usergroup_service.usergroupsettings_tbl.Select( |
| 437 self.cnxn, cols=['group_id']).AndReturn(group_id_rows) |
| 438 self.usergroup_service.usergroup_tbl.Select( |
| 439 self.cnxn, cols=['user_id', 'group_id'], distinct=True, |
| 440 user_id=[r[0] for r in group_id_rows]).AndReturn(usergroup_rows) |
| 441 |
| 442 def testDAG_Build(self): |
| 443 # Old entries should go away after rebuilding |
| 444 self.usergroup_service.group_dag.user_group_parents = ( |
| 445 collections.defaultdict(list)) |
| 446 self.usergroup_service.group_dag.user_group_parents[111] = [222] |
| 447 # Two groups: 888 and 999 while 999 is a member of 888. |
| 448 self.SetUpDAG([(888,), (999,)], [(999, 888)]) |
| 449 self.mox.ReplayAll() |
| 450 self.usergroup_service.group_dag.Build(self.cnxn) |
| 451 self.mox.VerifyAll() |
| 452 self.assertIn(888, self.usergroup_service.group_dag.user_group_children) |
| 453 self.assertIn(999, self.usergroup_service.group_dag.user_group_parents) |
| 454 self.assertNotIn(111, self.usergroup_service.group_dag.user_group_parents) |
| 455 |
| 456 def testDAG_GetAllAncestors(self): |
| 457 # Three groups: 777, 888 and 999. |
| 458 # 999 is a direct member of 888, and 888 is a direct member of 777. |
| 459 self.SetUpDAG([(777,), (888,), (999,)], [(999, 888), (888, 777)]) |
| 460 self.mox.ReplayAll() |
| 461 ancestors = self.usergroup_service.group_dag.GetAllAncestors( |
| 462 self.cnxn, 999) |
| 463 self.mox.VerifyAll() |
| 464 ancestors.sort() |
| 465 self.assertEqual([777, 888], ancestors) |
| 466 |
| 467 def testDAG_GetAllAncestorsDiamond(self): |
| 468 # Four groups: 666, 777, 888 and 999. |
| 469 # 999 is a direct member of both 888 and 777, |
| 470 # 888 is a direct member of 666, and 777 is also a direct member of 666. |
| 471 self.SetUpDAG([(666, ), (777,), (888,), (999,)], |
| 472 [(999, 888), (999, 777), (888, 666), (777, 666)]) |
| 473 self.mox.ReplayAll() |
| 474 ancestors = self.usergroup_service.group_dag.GetAllAncestors( |
| 475 self.cnxn, 999) |
| 476 self.mox.VerifyAll() |
| 477 ancestors.sort() |
| 478 self.assertEqual([666, 777, 888], ancestors) |
| 479 |
| 480 def testDAG_GetAllDescendants(self): |
| 481 # Four groups: 666, 777, 888 and 999. |
| 482 # 999 is a direct member of both 888 and 777, |
| 483 # 888 is a direct member of 666, and 777 is also a direct member of 666. |
| 484 self.SetUpDAG([(666, ), (777,), (888,), (999,)], |
| 485 [(999, 888), (999, 777), (888, 666), (777, 666)]) |
| 486 self.mox.ReplayAll() |
| 487 descendants = self.usergroup_service.group_dag.GetAllDescendants( |
| 488 self.cnxn, 666) |
| 489 self.mox.VerifyAll() |
| 490 descendants.sort() |
| 491 self.assertEqual([777, 888, 999], descendants) |
| 492 |
| 493 def testDAG_IsChild(self): |
| 494 # Four groups: 666, 777, 888 and 999. |
| 495 # 999 is a direct member of both 888 and 777, |
| 496 # 888 is a direct member of 666, and 777 is also a direct member of 666. |
| 497 self.SetUpDAG([(666, ), (777,), (888,), (999,)], |
| 498 [(999, 888), (999, 777), (888, 666), (777, 666)]) |
| 499 self.mox.ReplayAll() |
| 500 result1 = self.usergroup_service.group_dag.IsChild( |
| 501 self.cnxn, 777, 666) |
| 502 result2 = self.usergroup_service.group_dag.IsChild( |
| 503 self.cnxn, 777, 888) |
| 504 self.mox.VerifyAll() |
| 505 self.assertTrue(result1) |
| 506 self.assertFalse(result2) |
| 507 |
| 508 |
| 509 if __name__ == '__main__': |
| 510 unittest.main() |
OLD | NEW |