OLD | NEW |
| (Empty) |
1 # Copyright (c) 2007-2008 Twisted Matrix Laboratories. | |
2 # See LICENSE for details. | |
3 | |
4 """ | |
5 Tests for L{strcred}. | |
6 """ | |
7 | |
8 import os | |
9 import StringIO | |
10 | |
11 from twisted import plugin | |
12 from twisted.trial import unittest | |
13 from twisted.cred import credentials, checkers, error, strcred | |
14 from twisted.plugins import cred_file, cred_anonymous | |
15 from twisted.python import usage | |
16 | |
17 | |
18 try: | |
19 import crypt | |
20 except ImportError: | |
21 crypt = None | |
22 | |
23 try: | |
24 import pwd | |
25 except ImportError: | |
26 pwd = None | |
27 | |
28 try: | |
29 import spwd | |
30 except ImportError: | |
31 spwd = None | |
32 | |
33 | |
34 | |
35 def getInvalidAuthType(): | |
36 """ | |
37 Helper method to produce an auth type that doesn't exist. | |
38 """ | |
39 invalidAuthType = 'ThisPluginDoesNotExist' | |
40 while (invalidAuthType in | |
41 [factory.authType for factory in strcred.findCheckerFactories()]): | |
42 invalidAuthType += '_' | |
43 return invalidAuthType | |
44 | |
45 | |
46 | |
47 class TestPublicAPI(unittest.TestCase): | |
48 | |
49 def test_emptyDescription(self): | |
50 """ | |
51 Test that the description string cannot be empty. | |
52 """ | |
53 iat = getInvalidAuthType() | |
54 self.assertRaises(strcred.InvalidAuthType, strcred.makeChecker, iat) | |
55 self.assertRaises(strcred.InvalidAuthType, strcred.findCheckerFactory, i
at) | |
56 | |
57 | |
58 def test_invalidAuthType(self): | |
59 """ | |
60 Test that an unrecognized auth type raises an exception. | |
61 """ | |
62 iat = getInvalidAuthType() | |
63 self.assertRaises(strcred.InvalidAuthType, strcred.makeChecker, iat) | |
64 self.assertRaises(strcred.InvalidAuthType, strcred.findCheckerFactory, i
at) | |
65 | |
66 | |
67 | |
68 class TestStrcredFunctions(unittest.TestCase): | |
69 | |
70 def test_findCheckerFactories(self): | |
71 """ | |
72 Test that findCheckerFactories returns all available plugins. | |
73 """ | |
74 availablePlugins = list(strcred.findCheckerFactories()) | |
75 for plg in plugin.getPlugins(strcred.ICheckerFactory): | |
76 self.assertIn(plg, availablePlugins) | |
77 | |
78 | |
79 def test_findCheckerFactory(self): | |
80 """ | |
81 Test that findCheckerFactory returns the first plugin | |
82 available for a given authentication type. | |
83 """ | |
84 self.assertIdentical(strcred.findCheckerFactory('file'), | |
85 cred_file.theFileCheckerFactory) | |
86 | |
87 | |
88 | |
89 class TestMemoryChecker(unittest.TestCase): | |
90 | |
91 def setUp(self): | |
92 self.admin = credentials.UsernamePassword('admin', 'asdf') | |
93 self.alice = credentials.UsernamePassword('alice', 'foo') | |
94 self.badPass = credentials.UsernamePassword('alice', 'foobar') | |
95 self.badUser = credentials.UsernamePassword('x', 'yz') | |
96 self.checker = strcred.makeChecker('memory:admin:asdf:alice:foo') | |
97 | |
98 | |
99 def test_isChecker(self): | |
100 """ | |
101 Verifies that strcred.makeChecker('memory') returns an object | |
102 that implements the L{ICredentialsChecker} interface. | |
103 """ | |
104 self.assertTrue(checkers.ICredentialsChecker.providedBy(self.checker)) | |
105 self.assertIn(credentials.IUsernamePassword, | |
106 self.checker.credentialInterfaces) | |
107 | |
108 | |
109 def test_badFormatArgString(self): | |
110 """ | |
111 Test that an argument string which does not contain user:pass | |
112 pairs (i.e., an odd number of ':' characters) raises an exception. | |
113 """ | |
114 self.assertRaises(strcred.InvalidAuthArgumentString, | |
115 strcred.makeChecker, 'memory:a:b:c') | |
116 | |
117 | |
118 def test_memoryCheckerSucceeds(self): | |
119 """ | |
120 Test that the checker works with valid credentials. | |
121 """ | |
122 def _gotAvatar(username): | |
123 self.assertEquals(username, self.admin.username) | |
124 return (self.checker | |
125 .requestAvatarId(self.admin) | |
126 .addCallback(_gotAvatar)) | |
127 | |
128 | |
129 def test_memoryCheckerFailsUsername(self): | |
130 """ | |
131 Test that the checker fails with an invalid username. | |
132 """ | |
133 return self.assertFailure(self.checker.requestAvatarId(self.badUser), | |
134 error.UnauthorizedLogin) | |
135 | |
136 | |
137 def test_memoryCheckerFailsPassword(self): | |
138 """ | |
139 Test that the checker fails with an invalid password. | |
140 """ | |
141 return self.assertFailure(self.checker.requestAvatarId(self.badPass), | |
142 error.UnauthorizedLogin) | |
143 | |
144 | |
145 | |
146 class TestAnonymousChecker(unittest.TestCase): | |
147 | |
148 def test_isChecker(self): | |
149 """ | |
150 Verifies that strcred.makeChecker('anonymous') returns an object | |
151 that implements the L{ICredentialsChecker} interface. | |
152 """ | |
153 checker = strcred.makeChecker('anonymous') | |
154 self.assertTrue(checkers.ICredentialsChecker.providedBy(checker)) | |
155 self.assertIn(credentials.IAnonymous, checker.credentialInterfaces) | |
156 | |
157 | |
158 def testAnonymousAccessSucceeds(self): | |
159 """ | |
160 Test that we can log in anonymously using this checker. | |
161 """ | |
162 checker = strcred.makeChecker('anonymous') | |
163 request = checker.requestAvatarId(credentials.Anonymous()) | |
164 def _gotAvatar(avatar): | |
165 self.assertIdentical(checkers.ANONYMOUS, avatar) | |
166 return request.addCallback(_gotAvatar) | |
167 | |
168 | |
169 | |
170 class TestUnixChecker(unittest.TestCase): | |
171 users = { | |
172 'admin': 'asdf', | |
173 'alice': 'foo', | |
174 } | |
175 | |
176 | |
177 def _pwd(self, username): | |
178 return (username, crypt.crypt(self.users[username], 'F/'), | |
179 1000, 1000, username, '/home/'+username, '/bin/sh') | |
180 | |
181 | |
182 def _spwd(self, username): | |
183 return (username, crypt.crypt(self.users[username], 'F/'), | |
184 0, 0, 99999, 7, -1, -1, -1) | |
185 | |
186 | |
187 def setUp(self): | |
188 self.admin = credentials.UsernamePassword('admin', 'asdf') | |
189 self.alice = credentials.UsernamePassword('alice', 'foo') | |
190 self.badPass = credentials.UsernamePassword('alice', 'foobar') | |
191 self.badUser = credentials.UsernamePassword('x', 'yz') | |
192 self.checker = strcred.makeChecker('unix') | |
193 # Hack around the pwd and spwd modules, since we can't really | |
194 # go about reading your /etc/passwd or /etc/shadow files | |
195 if pwd: | |
196 self._pwd_getpwnam = pwd.getpwnam | |
197 pwd.getpwnam = self._pwd | |
198 if spwd: | |
199 self._spwd_getspnam = spwd.getspnam | |
200 spwd.getspnam = self._spwd | |
201 | |
202 | |
203 def tearDown(self): | |
204 if pwd: | |
205 pwd.getpwnam = self._pwd_getpwnam | |
206 if spwd: | |
207 spwd.getspnam = self._spwd_getspnam | |
208 | |
209 | |
210 def test_isChecker(self): | |
211 """ | |
212 Verifies that strcred.makeChecker('unix') returns an object | |
213 that implements the L{ICredentialsChecker} interface. | |
214 """ | |
215 self.assertTrue(checkers.ICredentialsChecker.providedBy(self.checker)) | |
216 self.assertIn(credentials.IUsernamePassword, | |
217 self.checker.credentialInterfaces) | |
218 | |
219 | |
220 def test_unixCheckerSucceeds(self): | |
221 """ | |
222 Test that the checker works with valid credentials. | |
223 """ | |
224 def _gotAvatar(username): | |
225 self.assertEquals(username, self.admin.username) | |
226 return (self.checker | |
227 .requestAvatarId(self.admin) | |
228 .addCallback(_gotAvatar)) | |
229 | |
230 | |
231 def test_unixCheckerFailsUsername(self): | |
232 """ | |
233 Test that the checker fails with an invalid username. | |
234 """ | |
235 return self.assertFailure(self.checker.requestAvatarId(self.badUser), | |
236 error.UnauthorizedLogin) | |
237 | |
238 | |
239 def test_unixCheckerFailsPassword(self): | |
240 """ | |
241 Test that the checker fails with an invalid password. | |
242 """ | |
243 return self.assertFailure(self.checker.requestAvatarId(self.badPass), | |
244 error.UnauthorizedLogin) | |
245 | |
246 | |
247 if None in (pwd, spwd, crypt): | |
248 for method in (test_unixCheckerSucceeds, | |
249 test_unixCheckerFailsUsername, | |
250 test_unixCheckerFailsPassword): | |
251 method.skip = 'pwd and spwd are both unavailable' | |
252 | |
253 | |
254 | |
255 class TestFileDBChecker(unittest.TestCase): | |
256 """ | |
257 Test for the --auth=file:... file checker. | |
258 """ | |
259 | |
260 def setUp(self): | |
261 self.admin = credentials.UsernamePassword('admin', 'asdf') | |
262 self.alice = credentials.UsernamePassword('alice', 'foo') | |
263 self.badPass = credentials.UsernamePassword('alice', 'foobar') | |
264 self.badUser = credentials.UsernamePassword('x', 'yz') | |
265 self.filename = self.mktemp() | |
266 file(self.filename, 'w').write('admin:asdf\nalice:foo\n') | |
267 self.checker = strcred.makeChecker('file:' + self.filename) | |
268 | |
269 | |
270 def _fakeFilename(self): | |
271 filename = '/DoesNotExist' | |
272 while os.path.exists(filename): | |
273 filename += '_' | |
274 return filename | |
275 | |
276 | |
277 def test_isChecker(self): | |
278 """ | |
279 Verifies that strcred.makeChecker('memory') returns an object | |
280 that implements the L{ICredentialsChecker} interface. | |
281 """ | |
282 self.assertTrue(checkers.ICredentialsChecker.providedBy(self.checker)) | |
283 self.assertIn(credentials.IUsernamePassword, | |
284 self.checker.credentialInterfaces) | |
285 | |
286 | |
287 def test_fileCheckerSucceeds(self): | |
288 """ | |
289 Test that the checker works with valid credentials. | |
290 """ | |
291 def _gotAvatar(username): | |
292 self.assertEquals(username, self.admin.username) | |
293 return (self.checker | |
294 .requestAvatarId(self.admin) | |
295 .addCallback(_gotAvatar)) | |
296 | |
297 | |
298 def test_fileCheckerFailsUsername(self): | |
299 """ | |
300 Test that the checker fails with an invalid username. | |
301 """ | |
302 return self.assertFailure(self.checker.requestAvatarId(self.badUser), | |
303 error.UnauthorizedLogin) | |
304 | |
305 | |
306 def test_fileCheckerFailsPassword(self): | |
307 """ | |
308 Test that the checker fails with an invalid password. | |
309 """ | |
310 return self.assertFailure(self.checker.requestAvatarId(self.badPass), | |
311 error.UnauthorizedLogin) | |
312 | |
313 | |
314 def test_failsWithEmptyFilename(self): | |
315 """ | |
316 Test that an empty filename raises an error. | |
317 """ | |
318 self.assertRaises(ValueError, strcred.makeChecker, 'file') | |
319 self.assertRaises(ValueError, strcred.makeChecker, 'file:') | |
320 | |
321 | |
322 def test_warnWithBadFilename(self): | |
323 """ | |
324 When the file auth plugin is given a file that doesn't exist, it | |
325 should produce a warning. | |
326 """ | |
327 oldOutput = cred_file.theFileCheckerFactory.errorOutput | |
328 newOutput = StringIO.StringIO() | |
329 cred_file.theFileCheckerFactory.errorOutput = newOutput | |
330 checker = strcred.makeChecker('file:' + self._fakeFilename()) | |
331 cred_file.theFileCheckerFactory.errorOutput = oldOutput | |
332 self.assertIn(cred_file.invalidFileWarning, newOutput.getvalue()) | |
333 | |
334 | |
335 | |
336 class DummyOptions(usage.Options, strcred.AuthOptionMixin): | |
337 """ | |
338 Simple options for testing L{strcred.AuthOptionMixin}. | |
339 """ | |
340 | |
341 | |
342 | |
343 class TestCheckerOptions(unittest.TestCase): | |
344 | |
345 def test_createsList(self): | |
346 """ | |
347 Test that the --auth command line creates a list in the | |
348 Options instance and appends values to it. | |
349 """ | |
350 options = DummyOptions() | |
351 options.parseOptions(['--auth', 'memory']) | |
352 self.assertEqual(len(options['credCheckers']), 1) | |
353 options = DummyOptions() | |
354 options.parseOptions(['--auth', 'memory', '--auth', 'memory']) | |
355 self.assertEqual(len(options['credCheckers']), 2) | |
356 | |
357 | |
358 def test_invalidAuthError(self): | |
359 """ | |
360 Test that the --auth command line raises an exception when it | |
361 gets a parameter it doesn't understand. | |
362 """ | |
363 options = DummyOptions() | |
364 # If someone adds a 'ThisPluginDoesNotExist' then this unit | |
365 # test should still run. | |
366 invalidParameter = getInvalidAuthType() | |
367 self.assertRaises( | |
368 usage.UsageError, | |
369 options.parseOptions, ['--auth', invalidParameter]) | |
370 self.assertRaises( | |
371 usage.UsageError, | |
372 options.parseOptions, ['--help-auth-type', invalidParameter]) | |
373 | |
374 | |
375 def test_createsDictionary(self): | |
376 """ | |
377 Test that the --auth command line creates a dictionary | |
378 mapping supported interfaces to the list of credentials | |
379 checkers that support it. | |
380 """ | |
381 options = DummyOptions() | |
382 options.parseOptions(['--auth', 'memory', '--auth', 'anonymous']) | |
383 chd = options['credInterfaces'] | |
384 self.assertEquals(len(chd[credentials.IAnonymous]), 1) | |
385 self.assertEquals(len(chd[credentials.IUsernamePassword]), 1) | |
386 chdAnonymous = chd[credentials.IAnonymous][0] | |
387 chdUserPass = chd[credentials.IUsernamePassword][0] | |
388 self.assertTrue(checkers.ICredentialsChecker.providedBy(chdAnonymous)) | |
389 self.assertTrue(checkers.ICredentialsChecker.providedBy(chdUserPass)) | |
390 self.assertIn(credentials.IAnonymous, | |
391 chdAnonymous.credentialInterfaces) | |
392 self.assertIn(credentials.IUsernamePassword, | |
393 chdUserPass.credentialInterfaces) | |
394 | |
395 | |
396 def test_credInterfacesProvidesLists(self): | |
397 """ | |
398 Test that when two --auth arguments are passed along which | |
399 support the same interface, a list with both is created. | |
400 """ | |
401 options = DummyOptions() | |
402 options.parseOptions(['--auth', 'memory', '--auth', 'unix']) | |
403 self.assertEquals( | |
404 options['credCheckers'], | |
405 options['credInterfaces'][credentials.IUsernamePassword]) | |
406 | |
407 | |
408 def test_listDoesNotDisplayDuplicates(self): | |
409 """ | |
410 Test that the list for --help-auth does not duplicate items. | |
411 """ | |
412 authTypes = [] | |
413 options = DummyOptions() | |
414 for cf in options._checkerFactoriesForOptHelpAuth(): | |
415 self.assertNotIn(cf.authType, authTypes) | |
416 authTypes.append(cf.authType) | |
417 | |
418 | |
419 def test_displaysListCorrectly(self): | |
420 """ | |
421 Test that the --help-auth argument correctly displays all | |
422 available authentication plugins, then exits. | |
423 """ | |
424 newStdout = StringIO.StringIO() | |
425 options = DummyOptions() | |
426 options.authOutput = newStdout | |
427 self.assertRaises(SystemExit, options.parseOptions, ['--help-auth']) | |
428 for checkerFactory in strcred.findCheckerFactories(): | |
429 self.assertIn(checkerFactory.authType, newStdout.getvalue()) | |
430 | |
431 | |
432 def test_displaysHelpCorrectly(self): | |
433 """ | |
434 Test that the --help-auth-for argument will correctly display | |
435 the help file for a particular authentication plugin. | |
436 """ | |
437 newStdout = StringIO.StringIO() | |
438 options = DummyOptions() | |
439 options.authOutput = newStdout | |
440 self.assertRaises( | |
441 SystemExit, options.parseOptions, ['--help-auth-type', 'file']) | |
442 for line in cred_file.theFileCheckerFactory.authHelp: | |
443 if line.strip(): | |
444 self.assertIn(line.strip(), newStdout.getvalue()) | |
445 | |
446 | |
447 def test_unexpectedException(self): | |
448 """ | |
449 When the checker specified by --auth raises an unexpected error, it | |
450 should be caught and re-raised within a L{usage.UsageError}. | |
451 """ | |
452 options = DummyOptions() | |
453 err = self.assertRaises(usage.UsageError, options.parseOptions, | |
454 ['--auth', 'file']) | |
455 self.assertEquals(str(err), | |
456 "Unexpected error: 'file' requires a filename") | |
457 | |
458 | |
459 | |
460 class OptionsForUsernamePassword(usage.Options, strcred.AuthOptionMixin): | |
461 supportedInterfaces = (credentials.IUsernamePassword,) | |
462 | |
463 | |
464 | |
465 class OptionsForUsernameHashedPassword(usage.Options, strcred.AuthOptionMixin): | |
466 supportedInterfaces = (credentials.IUsernameHashedPassword,) | |
467 | |
468 | |
469 | |
470 class OptionsSupportsAllInterfaces(usage.Options, strcred.AuthOptionMixin): | |
471 supportedInterfaces = None | |
472 | |
473 | |
474 | |
475 class OptionsSupportsNoInterfaces(usage.Options, strcred.AuthOptionMixin): | |
476 supportedInterfaces = [] | |
477 | |
478 | |
479 | |
480 class TestLimitingInterfaces(unittest.TestCase): | |
481 """ | |
482 Tests functionality that allows an application to limit the | |
483 credential interfaces it can support. For the purposes of this | |
484 test, we use IUsernameHashedPassword, although this will never | |
485 really be used by the command line. | |
486 | |
487 (I have, to date, not thought of a half-decent way for a user to | |
488 specify a hash algorithm via the command-line. Nor do I think it's | |
489 very useful.) | |
490 | |
491 I should note that, at first, this test is counter-intuitive, | |
492 because we're using the checker with a pre-defined hash function | |
493 as the 'bad' checker. See the documentation for | |
494 L{twisted.cred.checkers.FilePasswordDB.hash} for more details. | |
495 """ | |
496 | |
497 def setUp(self): | |
498 self.filename = self.mktemp() | |
499 file(self.filename, 'w').write('admin:asdf\nalice:foo\n') | |
500 self.goodChecker = checkers.FilePasswordDB(self.filename) | |
501 self.badChecker = checkers.FilePasswordDB(self.filename, hash=self._hash
) | |
502 self.anonChecker = checkers.AllowAnonymousAccess() | |
503 | |
504 | |
505 def _hash(self, networkUsername, networkPassword, storedPassword): | |
506 """ | |
507 A dumb hash that doesn't really do anything. | |
508 """ | |
509 return networkPassword | |
510 | |
511 | |
512 def test_supportsInterface(self): | |
513 """ | |
514 Test that the supportsInterface method behaves appropriately. | |
515 """ | |
516 options = OptionsForUsernamePassword() | |
517 self.assertTrue( | |
518 options.supportsInterface(credentials.IUsernamePassword)) | |
519 self.assertFalse( | |
520 options.supportsInterface(credentials.IAnonymous)) | |
521 self.assertRaises( | |
522 strcred.UnsupportedInterfaces, options.addChecker, self.anonChecker) | |
523 | |
524 | |
525 def test_supportsAllInterfaces(self): | |
526 """ | |
527 Test that the supportsInterface method behaves appropriately | |
528 when the supportedInterfaces attribute is None. | |
529 """ | |
530 options = OptionsSupportsAllInterfaces() | |
531 self.assertTrue( | |
532 options.supportsInterface(credentials.IUsernamePassword)) | |
533 self.assertTrue( | |
534 options.supportsInterface(credentials.IAnonymous)) | |
535 | |
536 | |
537 def test_supportsCheckerFactory(self): | |
538 """ | |
539 Test that the supportsCheckerFactory method behaves appropriately. | |
540 """ | |
541 options = OptionsForUsernamePassword() | |
542 fileCF = cred_file.theFileCheckerFactory | |
543 anonCF = cred_anonymous.theAnonymousCheckerFactory | |
544 self.assertTrue(options.supportsCheckerFactory(fileCF)) | |
545 self.assertFalse(options.supportsCheckerFactory(anonCF)) | |
546 | |
547 | |
548 def test_canAddSupportedChecker(self): | |
549 """ | |
550 Test that when addChecker is called with a checker that | |
551 implements at least one of the interfaces our application | |
552 supports, it is successful. | |
553 """ | |
554 options = OptionsForUsernamePassword() | |
555 options.addChecker(self.goodChecker) | |
556 iface = options.supportedInterfaces[0] | |
557 # Test that we did get IUsernamePassword | |
558 self.assertIdentical(options['credInterfaces'][iface][0], self.goodCheck
er) | |
559 self.assertIdentical(options['credCheckers'][0], self.goodChecker) | |
560 # Test that we didn't get IUsernameHashedPassword | |
561 self.assertEquals(len(options['credInterfaces'][iface]), 1) | |
562 self.assertEquals(len(options['credCheckers']), 1) | |
563 | |
564 | |
565 def test_failOnAddingUnsupportedChecker(self): | |
566 """ | |
567 Test that when addChecker is called with a checker that does | |
568 not implement any supported interfaces, it fails. | |
569 """ | |
570 options = OptionsForUsernameHashedPassword() | |
571 self.assertRaises(strcred.UnsupportedInterfaces, | |
572 options.addChecker, self.badChecker) | |
573 | |
574 | |
575 def test_unsupportedInterfaceError(self): | |
576 """ | |
577 Test that the --auth command line raises an exception when it | |
578 gets a checker we don't support. | |
579 """ | |
580 options = OptionsSupportsNoInterfaces() | |
581 authType = cred_anonymous.theAnonymousCheckerFactory.authType | |
582 self.assertRaises( | |
583 usage.UsageError, | |
584 options.parseOptions, ['--auth', authType]) | |
585 | |
586 | |
587 def test_helpAuthLimitsOutput(self): | |
588 """ | |
589 Test that --help-auth will only list checkers that purport to | |
590 supply at least one of the credential interfaces our | |
591 application can use. | |
592 """ | |
593 options = OptionsForUsernamePassword() | |
594 for factory in options._checkerFactoriesForOptHelpAuth(): | |
595 invalid = True | |
596 for interface in factory.credentialInterfaces: | |
597 if options.supportsInterface(interface): | |
598 invalid = False | |
599 if invalid: | |
600 raise strcred.UnsupportedInterfaces() | |
601 | |
602 | |
603 def test_helpAuthTypeLimitsOutput(self): | |
604 """ | |
605 Test that --help-auth-type will display a warning if you get | |
606 help for an authType that does not supply at least one of the | |
607 credential interfaces our application can use. | |
608 """ | |
609 options = OptionsForUsernamePassword() | |
610 # Find an interface that we can use for our test | |
611 invalidFactory = None | |
612 for factory in strcred.findCheckerFactories(): | |
613 if not options.supportsCheckerFactory(factory): | |
614 invalidFactory = factory | |
615 break | |
616 self.assertNotIdentical(invalidFactory, None) | |
617 # Capture output and make sure the warning is there | |
618 newStdout = StringIO.StringIO() | |
619 options.authOutput = newStdout | |
620 self.assertRaises(SystemExit, options.parseOptions, | |
621 ['--help-auth-type', 'anonymous']) | |
622 self.assertIn(strcred.notSupportedWarning, newStdout.getvalue()) | |
623 | |
OLD | NEW |