Index: third_party/twisted_8_1/twisted/conch/test/test_userauth.py |
diff --git a/third_party/twisted_8_1/twisted/conch/test/test_userauth.py b/third_party/twisted_8_1/twisted/conch/test/test_userauth.py |
deleted file mode 100644 |
index b0b601f915f06445d621de06d6c9fa2195c13bbd..0000000000000000000000000000000000000000 |
--- a/third_party/twisted_8_1/twisted/conch/test/test_userauth.py |
+++ /dev/null |
@@ -1,176 +0,0 @@ |
-from zope.interface import implements |
- |
-from twisted.cred.checkers import ICredentialsChecker |
-from twisted.cred.credentials import IUsernamePassword |
-from twisted.cred.error import UnauthorizedLogin |
-from twisted.cred.portal import IRealm, Portal |
- |
-from twisted.conch.error import ConchError |
-from twisted.conch.ssh import userauth |
-from twisted.conch.ssh.common import NS |
-from twisted.conch.ssh.transport import SSHServerTransport |
- |
-from twisted.internet import defer |
- |
-from twisted.trial import unittest |
- |
- |
- |
-class FakeTransport(SSHServerTransport): |
- """ |
- L{userauth.SSHUserAuthServer} expects an SSH transport which has a factory |
- attribute which has a portal attribute. Because the portal is important for |
- testing authentication, we need to be able to provide an interesting portal |
- object to the C{SSHUserAuthServer}. |
- |
- In addition, we want to be able to capture any packets sent over the |
- transport. |
- """ |
- |
- |
- class Service(object): |
- name = 'nancy' |
- |
- def serviceStarted(self): |
- pass |
- |
- |
- class Factory(object): |
- def _makeService(self): |
- return FakeTransport.Service() |
- |
- def getService(self, transport, nextService): |
- # This has to return a callable. |
- return self._makeService |
- |
- |
- def __init__(self, portal): |
- self.factory = self.Factory() |
- self.factory.portal = portal |
- self.packets = [] |
- |
- |
- def sendPacket(self, messageType, message): |
- self.packets.append((messageType, message)) |
- |
- |
- def isEncrypted(self, direction): |
- """ |
- Pretend that this transport encrypts traffic in both directions. The |
- SSHUserAuthServer disables password authentication if the transport |
- isn't encrypted. |
- """ |
- return True |
- |
- |
- |
-class Realm(object): |
- """ |
- A mock realm for testing L{userauth.SSHUserAuthServer}. |
- |
- This realm is not actually used in the course of testing, so it returns the |
- simplest thing that could possibly work. |
- """ |
- |
- implements(IRealm) |
- |
- def requestAvatar(self, avatarId, mind, *interfaces): |
- return defer.succeed((interfaces[0], None, lambda: None)) |
- |
- |
- |
-class MockChecker(object): |
- """ |
- A very simple username/password checker which authenticates anyone whose |
- password matches their username and rejects all others. |
- """ |
- |
- credentialInterfaces = (IUsernamePassword,) |
- implements(ICredentialsChecker) |
- |
- |
- def requestAvatarId(self, creds): |
- if creds.username == creds.password: |
- return defer.succeed(creds.username) |
- return defer.fail(UnauthorizedLogin("Invalid username/password pair")) |
- |
- |
- |
-class TestSSHUserAuthServer(unittest.TestCase): |
- """Tests for SSHUserAuthServer.""" |
- |
- def setUp(self): |
- self.realm = Realm() |
- portal = Portal(self.realm) |
- portal.registerChecker(MockChecker()) |
- self.authServer = userauth.SSHUserAuthServer() |
- self.authServer.transport = FakeTransport(portal) |
- self.authServer.serviceStarted() |
- |
- |
- def tearDown(self): |
- self.authServer.serviceStopped() |
- self.authServer = None |
- |
- |
- def test_successfulAuthentication(self): |
- """ |
- When provided with correct authentication information, the server |
- should respond by sending a MSG_USERAUTH_SUCCESS message with no other |
- data. |
- |
- See RFC 4252, Section 5.1. |
- """ |
- packet = NS('foo') + NS('none') + NS('password') + chr(0) + NS('foo') |
- d = self.authServer.ssh_USERAUTH_REQUEST(packet) |
- |
- def check(ignored): |
- # Check that the server reports the failure, including 'password' |
- # as a valid authentication type. |
- self.assertEqual( |
- self.authServer.transport.packets, |
- [(userauth.MSG_USERAUTH_SUCCESS, '')]) |
- return d.addCallback(check) |
- |
- |
- def test_failedAuthentication(self): |
- """ |
- When provided with invalid authentication details, the server should |
- respond by sending a MSG_USERAUTH_FAILURE message which states whether |
- the authentication was partially successful, and provides other, open |
- options for authentication. |
- |
- See RFC 4252, Section 5.1. |
- """ |
- # packet = username, next_service, authentication type, FALSE, password |
- packet = NS('foo') + NS('none') + NS('password') + chr(0) + NS('bar') |
- d = self.authServer.ssh_USERAUTH_REQUEST(packet) |
- |
- def check(ignored): |
- # Check that the server reports the failure, including 'password' |
- # as a valid authentication type. |
- self.assertEqual( |
- self.authServer.transport.packets, |
- [(userauth.MSG_USERAUTH_FAILURE, NS('password') + chr(0))]) |
- return d.addCallback(check) |
- |
- |
- def test_requestRaisesConchError(self): |
- """ |
- ssh_USERAUTH_REQUEST should raise a ConchError if tryAuth returns |
- None. Added to catch a bug noticed by pyflakes. This is a whitebox |
- test. |
- """ |
- def mockTryAuth(kind, user, data): |
- return None |
- |
- def mockEbBadAuth(reason): |
- reason.trap(ConchError) |
- |
- self.patch(self.authServer, 'tryAuth', mockTryAuth) |
- self.patch(self.authServer, '_ebBadAuth', mockEbBadAuth) |
- |
- packet = NS('user') + NS('none') + NS('public-key') + NS('data') |
- # If an error other than ConchError is raised, this will trigger an |
- # exception. |
- return self.authServer.ssh_USERAUTH_REQUEST(packet) |