| Index: third_party/twisted_8_1/twisted/test/test_newcred.py
|
| diff --git a/third_party/twisted_8_1/twisted/test/test_newcred.py b/third_party/twisted_8_1/twisted/test/test_newcred.py
|
| deleted file mode 100644
|
| index 623bb3bfbd62047343bc7b1c782c8e20915c4905..0000000000000000000000000000000000000000
|
| --- a/third_party/twisted_8_1/twisted/test/test_newcred.py
|
| +++ /dev/null
|
| @@ -1,444 +0,0 @@
|
| -# Copyright (c) 2001-2007 Twisted Matrix Laboratories.
|
| -# See LICENSE for details.
|
| -
|
| -"""
|
| -Now with 30% more starch.
|
| -"""
|
| -
|
| -
|
| -import hmac
|
| -from zope.interface import implements, Interface
|
| -
|
| -from twisted.trial import unittest
|
| -from twisted.cred import portal, checkers, credentials, error
|
| -from twisted.python import components
|
| -from twisted.internet import defer
|
| -from twisted.internet.defer import deferredGenerator as dG, waitForDeferred as wFD
|
| -
|
| -try:
|
| - from crypt import crypt
|
| -except ImportError:
|
| - crypt = None
|
| -
|
| -try:
|
| - from twisted.cred.pamauth import callIntoPAM
|
| -except ImportError:
|
| - pamauth = None
|
| -else:
|
| - from twisted.cred import pamauth
|
| -
|
| -class ITestable(Interface):
|
| - pass
|
| -
|
| -class TestAvatar:
|
| - def __init__(self, name):
|
| - self.name = name
|
| - self.loggedIn = False
|
| - self.loggedOut = False
|
| -
|
| - def login(self):
|
| - assert not self.loggedIn
|
| - self.loggedIn = True
|
| -
|
| - def logout(self):
|
| - self.loggedOut = True
|
| -
|
| -class Testable(components.Adapter):
|
| - implements(ITestable)
|
| -
|
| -# components.Interface(TestAvatar).adaptWith(Testable, ITestable)
|
| -
|
| -components.registerAdapter(Testable, TestAvatar, ITestable)
|
| -
|
| -class IDerivedCredentials(credentials.IUsernamePassword):
|
| - pass
|
| -
|
| -class DerivedCredentials(object):
|
| - implements(IDerivedCredentials, ITestable)
|
| -
|
| - def __init__(self, username, password):
|
| - self.username = username
|
| - self.password = password
|
| -
|
| - def checkPassword(self, password):
|
| - return password == self.password
|
| -
|
| -
|
| -class TestRealm:
|
| - implements(portal.IRealm)
|
| - def __init__(self):
|
| - self.avatars = {}
|
| -
|
| - def requestAvatar(self, avatarId, mind, *interfaces):
|
| - if self.avatars.has_key(avatarId):
|
| - avatar = self.avatars[avatarId]
|
| - else:
|
| - avatar = TestAvatar(avatarId)
|
| - self.avatars[avatarId] = avatar
|
| - avatar.login()
|
| - return (interfaces[0], interfaces[0](avatar),
|
| - avatar.logout)
|
| -
|
| -class NewCredTest(unittest.TestCase):
|
| - def setUp(self):
|
| - r = self.realm = TestRealm()
|
| - p = self.portal = portal.Portal(r)
|
| - up = self.checker = checkers.InMemoryUsernamePasswordDatabaseDontUse()
|
| - up.addUser("bob", "hello")
|
| - p.registerChecker(up)
|
| -
|
| - def testListCheckers(self):
|
| - expected = [credentials.IUsernamePassword, credentials.IUsernameHashedPassword]
|
| - got = self.portal.listCredentialsInterfaces()
|
| - expected.sort()
|
| - got.sort()
|
| - self.assertEquals(got, expected)
|
| -
|
| - def testBasicLogin(self):
|
| - l = []; f = []
|
| - self.portal.login(credentials.UsernamePassword("bob", "hello"),
|
| - self, ITestable).addCallback(
|
| - l.append).addErrback(f.append)
|
| - if f:
|
| - raise f[0]
|
| - # print l[0].getBriefTraceback()
|
| - iface, impl, logout = l[0]
|
| - # whitebox
|
| - self.assertEquals(iface, ITestable)
|
| - self.failUnless(iface.providedBy(impl),
|
| - "%s does not implement %s" % (impl, iface))
|
| - # greybox
|
| - self.failUnless(impl.original.loggedIn)
|
| - self.failUnless(not impl.original.loggedOut)
|
| - logout()
|
| - self.failUnless(impl.original.loggedOut)
|
| -
|
| - def test_derivedInterface(self):
|
| - """
|
| - Login with credentials implementing an interface inheriting from an
|
| - interface registered with a checker (but not itself registered).
|
| - """
|
| - l = []
|
| - f = []
|
| - self.portal.login(DerivedCredentials("bob", "hello"), self, ITestable
|
| - ).addCallback(l.append
|
| - ).addErrback(f.append)
|
| - if f:
|
| - raise f[0]
|
| - iface, impl, logout = l[0]
|
| - # whitebox
|
| - self.assertEquals(iface, ITestable)
|
| - self.failUnless(iface.providedBy(impl),
|
| - "%s does not implement %s" % (impl, iface))
|
| - # greybox
|
| - self.failUnless(impl.original.loggedIn)
|
| - self.failUnless(not impl.original.loggedOut)
|
| - logout()
|
| - self.failUnless(impl.original.loggedOut)
|
| -
|
| - def testFailedLogin(self):
|
| - l = []
|
| - self.portal.login(credentials.UsernamePassword("bob", "h3llo"),
|
| - self, ITestable).addErrback(
|
| - lambda x: x.trap(error.UnauthorizedLogin)).addCallback(l.append)
|
| - self.failUnless(l)
|
| - self.failUnlessEqual(error.UnauthorizedLogin, l[0])
|
| -
|
| - def testFailedLoginName(self):
|
| - l = []
|
| - self.portal.login(credentials.UsernamePassword("jay", "hello"),
|
| - self, ITestable).addErrback(
|
| - lambda x: x.trap(error.UnauthorizedLogin)).addCallback(l.append)
|
| - self.failUnless(l)
|
| - self.failUnlessEqual(error.UnauthorizedLogin, l[0])
|
| -
|
| -
|
| -class CramMD5CredentialsTestCase(unittest.TestCase):
|
| - def testIdempotentChallenge(self):
|
| - c = credentials.CramMD5Credentials()
|
| - chal = c.getChallenge()
|
| - self.assertEquals(chal, c.getChallenge())
|
| -
|
| - def testCheckPassword(self):
|
| - c = credentials.CramMD5Credentials()
|
| - chal = c.getChallenge()
|
| - c.response = hmac.HMAC('secret', chal).hexdigest()
|
| - self.failUnless(c.checkPassword('secret'))
|
| -
|
| - def testWrongPassword(self):
|
| - c = credentials.CramMD5Credentials()
|
| - self.failIf(c.checkPassword('secret'))
|
| -
|
| -class OnDiskDatabaseTestCase(unittest.TestCase):
|
| - users = [
|
| - ('user1', 'pass1'),
|
| - ('user2', 'pass2'),
|
| - ('user3', 'pass3'),
|
| - ]
|
| -
|
| -
|
| - def testUserLookup(self):
|
| - dbfile = self.mktemp()
|
| - db = checkers.FilePasswordDB(dbfile)
|
| - f = file(dbfile, 'w')
|
| - for (u, p) in self.users:
|
| - f.write('%s:%s\n' % (u, p))
|
| - f.close()
|
| -
|
| - for (u, p) in self.users:
|
| - self.failUnlessRaises(KeyError, db.getUser, u.upper())
|
| - self.assertEquals(db.getUser(u), (u, p))
|
| -
|
| - def testCaseInSensitivity(self):
|
| - dbfile = self.mktemp()
|
| - db = checkers.FilePasswordDB(dbfile, caseSensitive=0)
|
| - f = file(dbfile, 'w')
|
| - for (u, p) in self.users:
|
| - f.write('%s:%s\n' % (u, p))
|
| - f.close()
|
| -
|
| - for (u, p) in self.users:
|
| - self.assertEquals(db.getUser(u.upper()), (u, p))
|
| -
|
| - def testRequestAvatarId(self):
|
| - dbfile = self.mktemp()
|
| - db = checkers.FilePasswordDB(dbfile, caseSensitive=0)
|
| - f = file(dbfile, 'w')
|
| - for (u, p) in self.users:
|
| - f.write('%s:%s\n' % (u, p))
|
| - f.close()
|
| - creds = [credentials.UsernamePassword(u, p) for u, p in self.users]
|
| - d = defer.gatherResults(
|
| - [defer.maybeDeferred(db.requestAvatarId, c) for c in creds])
|
| - d.addCallback(self.assertEquals, [u for u, p in self.users])
|
| - return d
|
| -
|
| - def testRequestAvatarId_hashed(self):
|
| - dbfile = self.mktemp()
|
| - db = checkers.FilePasswordDB(dbfile, caseSensitive=0)
|
| - f = file(dbfile, 'w')
|
| - for (u, p) in self.users:
|
| - f.write('%s:%s\n' % (u, p))
|
| - f.close()
|
| - creds = [credentials.UsernameHashedPassword(u, p) for u, p in self.users]
|
| - d = defer.gatherResults(
|
| - [defer.maybeDeferred(db.requestAvatarId, c) for c in creds])
|
| - d.addCallback(self.assertEquals, [u for u, p in self.users])
|
| - return d
|
| -
|
| -
|
| -
|
| -class HashedPasswordOnDiskDatabaseTestCase(unittest.TestCase):
|
| - users = [
|
| - ('user1', 'pass1'),
|
| - ('user2', 'pass2'),
|
| - ('user3', 'pass3'),
|
| - ]
|
| -
|
| -
|
| - def hash(self, u, p, s):
|
| - return crypt(p, s)
|
| -
|
| - def setUp(self):
|
| - dbfile = self.mktemp()
|
| - self.db = checkers.FilePasswordDB(dbfile, hash=self.hash)
|
| - f = file(dbfile, 'w')
|
| - for (u, p) in self.users:
|
| - f.write('%s:%s\n' % (u, crypt(p, u[:2])))
|
| - f.close()
|
| - r = TestRealm()
|
| - self.port = portal.Portal(r)
|
| - self.port.registerChecker(self.db)
|
| -
|
| - def testGoodCredentials(self):
|
| - goodCreds = [credentials.UsernamePassword(u, p) for u, p in self.users]
|
| - d = defer.gatherResults([self.db.requestAvatarId(c) for c in goodCreds])
|
| - d.addCallback(self.assertEquals, [u for u, p in self.users])
|
| - return d
|
| -
|
| - def testGoodCredentials_login(self):
|
| - goodCreds = [credentials.UsernamePassword(u, p) for u, p in self.users]
|
| - d = defer.gatherResults([self.port.login(c, None, ITestable)
|
| - for c in goodCreds])
|
| - d.addCallback(lambda x: [a.original.name for i, a, l in x])
|
| - d.addCallback(self.assertEquals, [u for u, p in self.users])
|
| - return d
|
| -
|
| - def testBadCredentials(self):
|
| - badCreds = [credentials.UsernamePassword(u, 'wrong password')
|
| - for u, p in self.users]
|
| - d = defer.DeferredList([self.port.login(c, None, ITestable)
|
| - for c in badCreds], consumeErrors=True)
|
| - d.addCallback(self._assertFailures, error.UnauthorizedLogin)
|
| - return d
|
| -
|
| - def testHashedCredentials(self):
|
| - hashedCreds = [credentials.UsernameHashedPassword(u, crypt(p, u[:2]))
|
| - for u, p in self.users]
|
| - d = defer.DeferredList([self.port.login(c, None, ITestable)
|
| - for c in hashedCreds], consumeErrors=True)
|
| - d.addCallback(self._assertFailures, error.UnhandledCredentials)
|
| - return d
|
| -
|
| - def _assertFailures(self, failures, *expectedFailures):
|
| - for flag, failure in failures:
|
| - self.failUnlessEqual(flag, defer.FAILURE)
|
| - failure.trap(*expectedFailures)
|
| - return None
|
| -
|
| - if crypt is None:
|
| - skip = "crypt module not available"
|
| -
|
| -class PluggableAuthenticationModulesTest(unittest.TestCase):
|
| -
|
| - def setUp(self):
|
| - """
|
| - Replace L{pamauth.callIntoPAM} with a dummy implementation with
|
| - easily-controlled behavior.
|
| - """
|
| - self._oldCallIntoPAM = pamauth.callIntoPAM
|
| - pamauth.callIntoPAM = self.callIntoPAM
|
| -
|
| -
|
| - def tearDown(self):
|
| - """
|
| - Restore the original value of L{pamauth.callIntoPAM}.
|
| - """
|
| - pamauth.callIntoPAM = self._oldCallIntoPAM
|
| -
|
| -
|
| - def callIntoPAM(self, service, user, conv):
|
| - if service != 'Twisted':
|
| - raise error.UnauthorizedLogin('bad service: %s' % service)
|
| - if user != 'testuser':
|
| - raise error.UnauthorizedLogin('bad username: %s' % user)
|
| - questions = [
|
| - (1, "Password"),
|
| - (2, "Message w/ Input"),
|
| - (3, "Message w/o Input"),
|
| - ]
|
| - replies = conv(questions)
|
| - if replies != [
|
| - ("password", 0),
|
| - ("entry", 0),
|
| - ("", 0)
|
| - ]:
|
| - raise error.UnauthorizedLogin('bad conversion: %s' % repr(replies))
|
| - return 1
|
| -
|
| - def _makeConv(self, d):
|
| - def conv(questions):
|
| - return defer.succeed([(d[t], 0) for t, q in questions])
|
| - return conv
|
| -
|
| - def testRequestAvatarId(self):
|
| - db = checkers.PluggableAuthenticationModulesChecker()
|
| - conv = self._makeConv({1:'password', 2:'entry', 3:''})
|
| - creds = credentials.PluggableAuthenticationModules('testuser',
|
| - conv)
|
| - d = db.requestAvatarId(creds)
|
| - d.addCallback(self.assertEquals, 'testuser')
|
| - return d
|
| -
|
| - def testBadCredentials(self):
|
| - db = checkers.PluggableAuthenticationModulesChecker()
|
| - conv = self._makeConv({1:'', 2:'', 3:''})
|
| - creds = credentials.PluggableAuthenticationModules('testuser',
|
| - conv)
|
| - d = db.requestAvatarId(creds)
|
| - self.assertFailure(d, error.UnauthorizedLogin)
|
| - return d
|
| -
|
| - def testBadUsername(self):
|
| - db = checkers.PluggableAuthenticationModulesChecker()
|
| - conv = self._makeConv({1:'password', 2:'entry', 3:''})
|
| - creds = credentials.PluggableAuthenticationModules('baduser',
|
| - conv)
|
| - d = db.requestAvatarId(creds)
|
| - self.assertFailure(d, error.UnauthorizedLogin)
|
| - return d
|
| -
|
| - if not pamauth:
|
| - skip = "Can't run without PyPAM"
|
| -
|
| -class CheckersMixin:
|
| - def testPositive(self):
|
| - for chk in self.getCheckers():
|
| - for (cred, avatarId) in self.getGoodCredentials():
|
| - r = wFD(chk.requestAvatarId(cred))
|
| - yield r
|
| - self.assertEquals(r.getResult(), avatarId)
|
| - testPositive = dG(testPositive)
|
| -
|
| - def testNegative(self):
|
| - for chk in self.getCheckers():
|
| - for cred in self.getBadCredentials():
|
| - r = wFD(chk.requestAvatarId(cred))
|
| - yield r
|
| - self.assertRaises(error.UnauthorizedLogin, r.getResult)
|
| - testNegative = dG(testNegative)
|
| -
|
| -class HashlessFilePasswordDBMixin:
|
| - credClass = credentials.UsernamePassword
|
| - diskHash = None
|
| - networkHash = staticmethod(lambda x: x)
|
| -
|
| - _validCredentials = [
|
| - ('user1', 'password1'),
|
| - ('user2', 'password2'),
|
| - ('user3', 'password3')]
|
| -
|
| - def getGoodCredentials(self):
|
| - for u, p in self._validCredentials:
|
| - yield self.credClass(u, self.networkHash(p)), u
|
| -
|
| - def getBadCredentials(self):
|
| - for u, p in [('user1', 'password3'),
|
| - ('user2', 'password1'),
|
| - ('bloof', 'blarf')]:
|
| - yield self.credClass(u, self.networkHash(p))
|
| -
|
| - def getCheckers(self):
|
| - diskHash = self.diskHash or (lambda x: x)
|
| - hashCheck = self.diskHash and (lambda username, password, stored: self.diskHash(password))
|
| -
|
| - for cache in True, False:
|
| - fn = self.mktemp()
|
| - fObj = file(fn, 'w')
|
| - for u, p in self._validCredentials:
|
| - fObj.write('%s:%s\n' % (u, diskHash(p)))
|
| - fObj.close()
|
| - yield checkers.FilePasswordDB(fn, cache=cache, hash=hashCheck)
|
| -
|
| - fn = self.mktemp()
|
| - fObj = file(fn, 'w')
|
| - for u, p in self._validCredentials:
|
| - fObj.write('%s dingle dongle %s\n' % (diskHash(p), u))
|
| - fObj.close()
|
| - yield checkers.FilePasswordDB(fn, ' ', 3, 0, cache=cache, hash=hashCheck)
|
| -
|
| - fn = self.mktemp()
|
| - fObj = file(fn, 'w')
|
| - for u, p in self._validCredentials:
|
| - fObj.write('zip,zap,%s,zup,%s\n' % (u.title(), diskHash(p)))
|
| - fObj.close()
|
| - yield checkers.FilePasswordDB(fn, ',', 2, 4, False, cache=cache, hash=hashCheck)
|
| -
|
| -class LocallyHashedFilePasswordDBMixin(HashlessFilePasswordDBMixin):
|
| - diskHash = staticmethod(lambda x: x.encode('hex'))
|
| -
|
| -class NetworkHashedFilePasswordDBMixin(HashlessFilePasswordDBMixin):
|
| - networkHash = staticmethod(lambda x: x.encode('hex'))
|
| - class credClass(credentials.UsernameHashedPassword):
|
| - def checkPassword(self, password):
|
| - return self.hashed.decode('hex') == password
|
| -
|
| -class HashlessFilePasswordDBCheckerTestCase(HashlessFilePasswordDBMixin, CheckersMixin, unittest.TestCase):
|
| - pass
|
| -
|
| -class LocallyHashedFilePasswordDBCheckerTestCase(LocallyHashedFilePasswordDBMixin, CheckersMixin, unittest.TestCase):
|
| - pass
|
| -
|
| -class NetworkHashedFilePasswordDBCheckerTestCase(NetworkHashedFilePasswordDBMixin, CheckersMixin, unittest.TestCase):
|
| - pass
|
| -
|
|
|