Index: third_party/twisted_8_1/twisted/mail/test/test_smtp.py |
diff --git a/third_party/twisted_8_1/twisted/mail/test/test_smtp.py b/third_party/twisted_8_1/twisted/mail/test/test_smtp.py |
deleted file mode 100644 |
index 34d8bcf5dc14f7796270752320e54e3396313ca1..0000000000000000000000000000000000000000 |
--- a/third_party/twisted_8_1/twisted/mail/test/test_smtp.py |
+++ /dev/null |
@@ -1,982 +0,0 @@ |
-# Copyright (c) 2001-2007 Twisted Matrix Laboratories. |
-# See LICENSE for details. |
- |
-""" |
-Test cases for twisted.mail.smtp module. |
-""" |
- |
-from zope.interface import implements |
- |
-from twisted.trial import unittest, util |
-from twisted.protocols import basic, loopback |
-from twisted.mail import smtp |
-from twisted.internet import defer, protocol, reactor, interfaces |
-from twisted.internet import address, error, task |
-from twisted.test.test_protocols import StringIOWithoutClosing |
-from twisted.test.proto_helpers import StringTransport |
- |
-from twisted import cred |
-import twisted.cred.error |
-import twisted.cred.portal |
-import twisted.cred.checkers |
-import twisted.cred.credentials |
- |
-from twisted.cred.portal import IRealm, Portal |
-from twisted.cred.checkers import ICredentialsChecker, AllowAnonymousAccess |
-from twisted.cred.credentials import IAnonymous |
-from twisted.cred.error import UnauthorizedLogin |
- |
-from twisted.mail import imap4 |
- |
- |
-try: |
- from twisted.test.ssl_helpers import ClientTLSContext, ServerTLSContext |
-except ImportError: |
- ClientTLSContext = ServerTLSContext = None |
- |
-import re |
- |
-try: |
- from cStringIO import StringIO |
-except ImportError: |
- from StringIO import StringIO |
- |
-def spameater(*spam, **eggs): |
- return None |
- |
-class DummyMessage: |
- |
- def __init__(self, domain, user): |
- self.domain = domain |
- self.user = user |
- self.buffer = [] |
- |
- def lineReceived(self, line): |
- # Throw away the generated Received: header |
- if not re.match('Received: From yyy.com \(\[.*\]\) by localhost;', line): |
- self.buffer.append(line) |
- |
- def eomReceived(self): |
- message = '\n'.join(self.buffer) + '\n' |
- self.domain.messages[self.user.dest.local].append(message) |
- deferred = defer.Deferred() |
- deferred.callback("saved") |
- return deferred |
- |
- |
-class DummyDomain: |
- |
- def __init__(self, names): |
- self.messages = {} |
- for name in names: |
- self.messages[name] = [] |
- |
- def exists(self, user): |
- if self.messages.has_key(user.dest.local): |
- return defer.succeed(lambda: self.startMessage(user)) |
- return defer.fail(smtp.SMTPBadRcpt(user)) |
- |
- def startMessage(self, user): |
- return DummyMessage(self, user) |
- |
-class SMTPTestCase(unittest.TestCase): |
- |
- messages = [('foo@bar.com', ['foo@baz.com', 'qux@baz.com'], '''\ |
-Subject: urgent\015 |
-\015 |
-Someone set up us the bomb!\015 |
-''')] |
- |
- mbox = {'foo': ['Subject: urgent\n\nSomeone set up us the bomb!\n']} |
- |
- def setUp(self): |
- self.factory = smtp.SMTPFactory() |
- self.factory.domains = {} |
- self.factory.domains['baz.com'] = DummyDomain(['foo']) |
- self.output = StringIOWithoutClosing() |
- self.transport = protocol.FileWrapper(self.output) |
- |
- def testMessages(self): |
- from twisted.mail import protocols |
- protocol = protocols.DomainSMTP() |
- protocol.service = self.factory |
- protocol.factory = self.factory |
- protocol.receivedHeader = spameater |
- protocol.makeConnection(self.transport) |
- protocol.lineReceived('HELO yyy.com') |
- for message in self.messages: |
- protocol.lineReceived('MAIL FROM:<%s>' % message[0]) |
- for target in message[1]: |
- protocol.lineReceived('RCPT TO:<%s>' % target) |
- protocol.lineReceived('DATA') |
- protocol.dataReceived(message[2]) |
- protocol.lineReceived('.') |
- protocol.lineReceived('QUIT') |
- if self.mbox != self.factory.domains['baz.com'].messages: |
- raise AssertionError(self.factory.domains['baz.com'].messages) |
- protocol.setTimeout(None) |
- |
- testMessages.suppress = [util.suppress(message='DomainSMTP', category=DeprecationWarning)] |
- |
-mail = '''\ |
-Subject: hello |
- |
-Goodbye |
-''' |
- |
-class MyClient: |
- def __init__(self): |
- self.mail = 'moshez@foo.bar', ['moshez@foo.bar'], mail |
- |
- def getMailFrom(self): |
- return self.mail[0] |
- |
- def getMailTo(self): |
- return self.mail[1] |
- |
- def getMailData(self): |
- return StringIO(self.mail[2]) |
- |
- def sentMail(self, code, resp, numOk, addresses, log): |
- self.mail = None, None, None |
- |
-class MySMTPClient(MyClient, smtp.SMTPClient): |
- def __init__(self): |
- smtp.SMTPClient.__init__(self, 'foo.baz') |
- MyClient.__init__(self) |
- |
-class MyESMTPClient(MyClient, smtp.ESMTPClient): |
- def __init__(self, secret = '', contextFactory = None): |
- smtp.ESMTPClient.__init__(self, secret, contextFactory, 'foo.baz') |
- MyClient.__init__(self) |
- |
-class LoopbackMixin: |
- def loopback(self, server, client): |
- return loopback.loopbackTCP(server, client) |
- |
-class LoopbackTestCase(LoopbackMixin): |
- def testMessages(self): |
- factory = smtp.SMTPFactory() |
- factory.domains = {} |
- factory.domains['foo.bar'] = DummyDomain(['moshez']) |
- from twisted.mail.protocols import DomainSMTP |
- protocol = DomainSMTP() |
- protocol.service = factory |
- protocol.factory = factory |
- clientProtocol = self.clientClass() |
- return self.loopback(protocol, clientProtocol) |
- testMessages.suppress = [util.suppress(message='DomainSMTP', category=DeprecationWarning)] |
- |
-class LoopbackSMTPTestCase(LoopbackTestCase, unittest.TestCase): |
- clientClass = MySMTPClient |
- |
-class LoopbackESMTPTestCase(LoopbackTestCase, unittest.TestCase): |
- clientClass = MyESMTPClient |
- |
- |
-class FakeSMTPServer(basic.LineReceiver): |
- |
- clientData = [ |
- '220 hello', '250 nice to meet you', |
- '250 great', '250 great', '354 go on, lad' |
- ] |
- |
- def connectionMade(self): |
- self.buffer = [] |
- self.clientData = self.clientData[:] |
- self.clientData.reverse() |
- self.sendLine(self.clientData.pop()) |
- |
- def lineReceived(self, line): |
- self.buffer.append(line) |
- if line == "QUIT": |
- self.transport.write("221 see ya around\r\n") |
- self.transport.loseConnection() |
- elif line == ".": |
- self.transport.write("250 gotcha\r\n") |
- elif line == "RSET": |
- self.transport.loseConnection() |
- |
- if self.clientData: |
- self.sendLine(self.clientData.pop()) |
- |
- |
-class SMTPClientTestCase(unittest.TestCase, LoopbackMixin): |
- |
- expected_output = [ |
- 'HELO foo.baz', 'MAIL FROM:<moshez@foo.bar>', |
- 'RCPT TO:<moshez@foo.bar>', 'DATA', |
- 'Subject: hello', '', 'Goodbye', '.', 'RSET' |
- ] |
- |
- def testMessages(self): |
- # this test is disabled temporarily |
- client = MySMTPClient() |
- server = FakeSMTPServer() |
- d = self.loopback(server, client) |
- d.addCallback(lambda x : |
- self.assertEquals(server.buffer, self.expected_output)) |
- return d |
- |
-class DummySMTPMessage: |
- |
- def __init__(self, protocol, users): |
- self.protocol = protocol |
- self.users = users |
- self.buffer = [] |
- |
- def lineReceived(self, line): |
- self.buffer.append(line) |
- |
- def eomReceived(self): |
- message = '\n'.join(self.buffer) + '\n' |
- helo, origin = self.users[0].helo[0], str(self.users[0].orig) |
- recipients = [] |
- for user in self.users: |
- recipients.append(str(user)) |
- self.protocol.message[tuple(recipients)] = (helo, origin, recipients, message) |
- return defer.succeed("saved") |
- |
-class DummyProto: |
- def connectionMade(self): |
- self.dummyMixinBase.connectionMade(self) |
- self.message = {} |
- |
- def startMessage(self, users): |
- return DummySMTPMessage(self, users) |
- |
- def receivedHeader(*spam): |
- return None |
- |
- def validateTo(self, user): |
- self.delivery = DummyDelivery() |
- return lambda: self.startMessage([user]) |
- |
- def validateFrom(self, helo, origin): |
- return origin |
- |
-class DummySMTP(DummyProto, smtp.SMTP): |
- dummyMixinBase = smtp.SMTP |
- |
-class DummyESMTP(DummyProto, smtp.ESMTP): |
- dummyMixinBase = smtp.ESMTP |
- |
-class AnotherTestCase: |
- serverClass = None |
- clientClass = None |
- |
- messages = [ ('foo.com', 'moshez@foo.com', ['moshez@bar.com'], |
- 'moshez@foo.com', ['moshez@bar.com'], '''\ |
-From: Moshe |
-To: Moshe |
- |
-Hi, |
-how are you? |
-'''), |
- ('foo.com', 'tttt@rrr.com', ['uuu@ooo', 'yyy@eee'], |
- 'tttt@rrr.com', ['uuu@ooo', 'yyy@eee'], '''\ |
-Subject: pass |
- |
-..rrrr.. |
-'''), |
- ('foo.com', '@this,@is,@ignored:foo@bar.com', |
- ['@ignore,@this,@too:bar@foo.com'], |
- 'foo@bar.com', ['bar@foo.com'], '''\ |
-Subject: apa |
-To: foo |
- |
-123 |
-. |
-456 |
-'''), |
- ] |
- |
- data = [ |
- ('', '220.*\r\n$', None, None), |
- ('HELO foo.com\r\n', '250.*\r\n$', None, None), |
- ('RSET\r\n', '250.*\r\n$', None, None), |
- ] |
- for helo_, from_, to_, realfrom, realto, msg in messages: |
- data.append(('MAIL FROM:<%s>\r\n' % from_, '250.*\r\n', |
- None, None)) |
- for rcpt in to_: |
- data.append(('RCPT TO:<%s>\r\n' % rcpt, '250.*\r\n', |
- None, None)) |
- |
- data.append(('DATA\r\n','354.*\r\n', |
- msg, ('250.*\r\n', |
- (helo_, realfrom, realto, msg)))) |
- |
- |
- def testBuffer(self): |
- output = StringIOWithoutClosing() |
- a = self.serverClass() |
- class fooFactory: |
- domain = 'foo.com' |
- |
- a.factory = fooFactory() |
- a.makeConnection(protocol.FileWrapper(output)) |
- for (send, expect, msg, msgexpect) in self.data: |
- if send: |
- a.dataReceived(send) |
- data = output.getvalue() |
- output.truncate(0) |
- if not re.match(expect, data): |
- raise AssertionError, (send, expect, data) |
- if data[:3] == '354': |
- for line in msg.splitlines(): |
- if line and line[0] == '.': |
- line = '.' + line |
- a.dataReceived(line + '\r\n') |
- a.dataReceived('.\r\n') |
- # Special case for DATA. Now we want a 250, and then |
- # we compare the messages |
- data = output.getvalue() |
- output.truncate() |
- resp, msgdata = msgexpect |
- if not re.match(resp, data): |
- raise AssertionError, (resp, data) |
- for recip in msgdata[2]: |
- expected = list(msgdata[:]) |
- expected[2] = [recip] |
- self.assertEquals( |
- a.message[(recip,)], |
- tuple(expected) |
- ) |
- a.setTimeout(None) |
- |
- |
-class AnotherESMTPTestCase(AnotherTestCase, unittest.TestCase): |
- serverClass = DummyESMTP |
- clientClass = MyESMTPClient |
- |
-class AnotherSMTPTestCase(AnotherTestCase, unittest.TestCase): |
- serverClass = DummySMTP |
- clientClass = MySMTPClient |
- |
- |
- |
-class DummyChecker: |
- implements(cred.checkers.ICredentialsChecker) |
- |
- users = { |
- 'testuser': 'testpassword' |
- } |
- |
- credentialInterfaces = (cred.credentials.IUsernameHashedPassword,) |
- |
- def requestAvatarId(self, credentials): |
- return defer.maybeDeferred( |
- credentials.checkPassword, self.users[credentials.username] |
- ).addCallback(self._cbCheck, credentials.username) |
- |
- def _cbCheck(self, result, username): |
- if result: |
- return username |
- raise cred.error.UnauthorizedLogin() |
- |
-class DummyDelivery: |
- implements(smtp.IMessageDelivery) |
- |
- def validateTo(self, user): |
- return user |
- |
- def validateFrom(self, helo, origin): |
- return origin |
- |
- def receivedHeader(*args): |
- return None |
- |
-class DummyRealm: |
- def requestAvatar(self, avatarId, mind, *interfaces): |
- return smtp.IMessageDelivery, DummyDelivery(), lambda: None |
- |
-class AuthTestCase(unittest.TestCase, LoopbackMixin): |
- def testAuth(self): |
- realm = DummyRealm() |
- p = cred.portal.Portal(realm) |
- p.registerChecker(DummyChecker()) |
- |
- server = DummyESMTP({'CRAM-MD5': cred.credentials.CramMD5Credentials}) |
- server.portal = p |
- client = MyESMTPClient('testpassword') |
- |
- cAuth = imap4.CramMD5ClientAuthenticator('testuser') |
- client.registerAuthenticator(cAuth) |
- |
- d = self.loopback(server, client) |
- d.addCallback(lambda x : self.assertEquals(server.authenticated, 1)) |
- return d |
- |
-class SMTPHelperTestCase(unittest.TestCase): |
- def testMessageID(self): |
- d = {} |
- for i in range(1000): |
- m = smtp.messageid('testcase') |
- self.failIf(m in d) |
- d[m] = None |
- |
- def testQuoteAddr(self): |
- cases = [ |
- ['user@host.name', '<user@host.name>'], |
- ['"User Name" <user@host.name>', '<user@host.name>'], |
- [smtp.Address('someguy@someplace'), '<someguy@someplace>'], |
- ['', '<>'], |
- [smtp.Address(''), '<>'], |
- ] |
- |
- for (c, e) in cases: |
- self.assertEquals(smtp.quoteaddr(c), e) |
- |
- def testUser(self): |
- u = smtp.User('user@host', 'helo.host.name', None, None) |
- self.assertEquals(str(u), 'user@host') |
- |
- def testXtextEncoding(self): |
- cases = [ |
- ('Hello world', 'Hello+20world'), |
- ('Hello+world', 'Hello+2Bworld'), |
- ('\0\1\2\3\4\5', '+00+01+02+03+04+05'), |
- ('e=mc2@example.com', 'e+3Dmc2@example.com') |
- ] |
- |
- for (case, expected) in cases: |
- self.assertEquals(case.encode('xtext'), expected) |
- self.assertEquals(expected.decode('xtext'), case) |
- |
- |
-class NoticeTLSClient(MyESMTPClient): |
- tls = False |
- |
- def esmtpState_starttls(self, code, resp): |
- MyESMTPClient.esmtpState_starttls(self, code, resp) |
- self.tls = True |
- |
-class TLSTestCase(unittest.TestCase, LoopbackMixin): |
- def testTLS(self): |
- clientCTX = ClientTLSContext() |
- serverCTX = ServerTLSContext() |
- |
- client = NoticeTLSClient(contextFactory=clientCTX) |
- server = DummyESMTP(contextFactory=serverCTX) |
- |
- def check(ignored): |
- self.assertEquals(client.tls, True) |
- self.assertEquals(server.startedTLS, True) |
- |
- return self.loopback(server, client).addCallback(check) |
- |
-if ClientTLSContext is None: |
- for case in (TLSTestCase,): |
- case.skip = "OpenSSL not present" |
- |
-if not interfaces.IReactorSSL.providedBy(reactor): |
- for case in (TLSTestCase,): |
- case.skip = "Reactor doesn't support SSL" |
- |
-class EmptyLineTestCase(unittest.TestCase): |
- def testEmptyLineSyntaxError(self): |
- proto = smtp.SMTP() |
- output = StringIOWithoutClosing() |
- transport = protocol.FileWrapper(output) |
- proto.makeConnection(transport) |
- proto.lineReceived('') |
- proto.setTimeout(None) |
- |
- out = output.getvalue().splitlines() |
- self.assertEquals(len(out), 2) |
- self.failUnless(out[0].startswith('220')) |
- self.assertEquals(out[1], "500 Error: bad syntax") |
- |
- |
- |
-class TimeoutTestCase(unittest.TestCase, LoopbackMixin): |
- """ |
- Check that SMTP client factories correctly use the timeout. |
- """ |
- |
- def _timeoutTest(self, onDone, clientFactory): |
- """ |
- Connect the clientFactory, and check the timeout on the request. |
- """ |
- clock = task.Clock() |
- client = clientFactory.buildProtocol( |
- address.IPv4Address('TCP', 'example.net', 25)) |
- client.callLater = clock.callLater |
- t = StringTransport() |
- client.makeConnection(t) |
- t.protocol = client |
- def check(ign): |
- self.assertEquals(clock.seconds(), 0.5) |
- d = self.assertFailure(onDone, smtp.SMTPTimeoutError |
- ).addCallback(check) |
- # The first call should not trigger the timeout |
- clock.advance(0.1) |
- # But this one should |
- clock.advance(0.4) |
- return d |
- |
- |
- def test_SMTPClient(self): |
- """ |
- Test timeout for L{smtp.SMTPSenderFactory}: the response L{Deferred} |
- should be errback with a L{smtp.SMTPTimeoutError}. |
- """ |
- onDone = defer.Deferred() |
- clientFactory = smtp.SMTPSenderFactory( |
- 'source@address', 'recipient@address', |
- StringIO("Message body"), onDone, |
- retries=0, timeout=0.5) |
- return self._timeoutTest(onDone, clientFactory) |
- |
- |
- def test_ESMTPClient(self): |
- """ |
- Test timeout for L{smtp.ESMTPSenderFactory}: the response L{Deferred} |
- should be errback with a L{smtp.SMTPTimeoutError}. |
- """ |
- onDone = defer.Deferred() |
- clientFactory = smtp.ESMTPSenderFactory( |
- 'username', 'password', |
- 'source@address', 'recipient@address', |
- StringIO("Message body"), onDone, |
- retries=0, timeout=0.5) |
- return self._timeoutTest(onDone, clientFactory) |
- |
- |
- |
-class SingletonRealm(object): |
- """ |
- Trivial realm implementation which is constructed with an interface and an |
- avatar and returns that avatar when asked for that interface. |
- """ |
- implements(IRealm) |
- |
- def __init__(self, interface, avatar): |
- self.interface = interface |
- self.avatar = avatar |
- |
- |
- def requestAvatar(self, avatarId, mind, *interfaces): |
- for iface in interfaces: |
- if iface is self.interface: |
- return iface, self.avatar, lambda: None |
- |
- |
- |
-class NotImplementedDelivery(object): |
- """ |
- Non-implementation of L{smtp.IMessageDelivery} which only has methods which |
- raise L{NotImplementedError}. Subclassed by various tests to provide the |
- particular behavior being tested. |
- """ |
- def validateFrom(self, helo, origin): |
- raise NotImplementedError("This oughtn't be called in the course of this test.") |
- |
- |
- def validateTo(self, user): |
- raise NotImplementedError("This oughtn't be called in the course of this test.") |
- |
- |
- def receivedHeader(self, helo, origin, recipients): |
- raise NotImplementedError("This oughtn't be called in the course of this test.") |
- |
- |
- |
-class SMTPServerTestCase(unittest.TestCase): |
- """ |
- Test various behaviors of L{twisted.mail.smtp.SMTP} and |
- L{twisted.mail.smtp.ESMTP}. |
- """ |
- def testSMTPGreetingHost(self, serverClass=smtp.SMTP): |
- """ |
- Test that the specified hostname shows up in the SMTP server's |
- greeting. |
- """ |
- s = serverClass() |
- s.host = "example.com" |
- t = StringTransport() |
- s.makeConnection(t) |
- s.connectionLost(error.ConnectionDone()) |
- self.assertIn("example.com", t.value()) |
- |
- |
- def testSMTPGreetingNotExtended(self): |
- """ |
- Test that the string "ESMTP" does not appear in the SMTP server's |
- greeting since that string strongly suggests the presence of support |
- for various SMTP extensions which are not supported by L{smtp.SMTP}. |
- """ |
- s = smtp.SMTP() |
- t = StringTransport() |
- s.makeConnection(t) |
- s.connectionLost(error.ConnectionDone()) |
- self.assertNotIn("ESMTP", t.value()) |
- |
- |
- def testESMTPGreetingHost(self): |
- """ |
- Similar to testSMTPGreetingHost, but for the L{smtp.ESMTP} class. |
- """ |
- self.testSMTPGreetingHost(smtp.ESMTP) |
- |
- |
- def testESMTPGreetingExtended(self): |
- """ |
- Test that the string "ESMTP" does appear in the ESMTP server's |
- greeting since L{smtp.ESMTP} does support the SMTP extensions which |
- that advertises to the client. |
- """ |
- s = smtp.ESMTP() |
- t = StringTransport() |
- s.makeConnection(t) |
- s.connectionLost(error.ConnectionDone()) |
- self.assertIn("ESMTP", t.value()) |
- |
- |
- def test_acceptSenderAddress(self): |
- """ |
- Test that a C{MAIL FROM} command with an acceptable address is |
- responded to with the correct success code. |
- """ |
- class AcceptanceDelivery(NotImplementedDelivery): |
- """ |
- Delivery object which accepts all senders as valid. |
- """ |
- def validateFrom(self, helo, origin): |
- return origin |
- |
- realm = SingletonRealm(smtp.IMessageDelivery, AcceptanceDelivery()) |
- portal = Portal(realm, [AllowAnonymousAccess()]) |
- proto = smtp.SMTP() |
- proto.portal = portal |
- trans = StringTransport() |
- proto.makeConnection(trans) |
- |
- # Deal with the necessary preliminaries |
- proto.dataReceived('HELO example.com\r\n') |
- trans.clear() |
- |
- # Try to specify our sender address |
- proto.dataReceived('MAIL FROM:<alice@example.com>\r\n') |
- |
- # Clean up the protocol before doing anything that might raise an |
- # exception. |
- proto.connectionLost(error.ConnectionLost()) |
- |
- # Make sure that we received exactly the correct response |
- self.assertEqual( |
- trans.value(), |
- '250 Sender address accepted\r\n') |
- |
- |
- def test_deliveryRejectedSenderAddress(self): |
- """ |
- Test that a C{MAIL FROM} command with an address rejected by a |
- L{smtp.IMessageDelivery} instance is responded to with the correct |
- error code. |
- """ |
- class RejectionDelivery(NotImplementedDelivery): |
- """ |
- Delivery object which rejects all senders as invalid. |
- """ |
- def validateFrom(self, helo, origin): |
- raise smtp.SMTPBadSender(origin) |
- |
- realm = SingletonRealm(smtp.IMessageDelivery, RejectionDelivery()) |
- portal = Portal(realm, [AllowAnonymousAccess()]) |
- proto = smtp.SMTP() |
- proto.portal = portal |
- trans = StringTransport() |
- proto.makeConnection(trans) |
- |
- # Deal with the necessary preliminaries |
- proto.dataReceived('HELO example.com\r\n') |
- trans.clear() |
- |
- # Try to specify our sender address |
- proto.dataReceived('MAIL FROM:<alice@example.com>\r\n') |
- |
- # Clean up the protocol before doing anything that might raise an |
- # exception. |
- proto.connectionLost(error.ConnectionLost()) |
- |
- # Make sure that we received exactly the correct response |
- self.assertEqual( |
- trans.value(), |
- '550 Cannot receive from specified address ' |
- '<alice@example.com>: Sender not acceptable\r\n') |
- |
- |
- def test_portalRejectedSenderAddress(self): |
- """ |
- Test that a C{MAIL FROM} command with an address rejected by an |
- L{smtp.SMTP} instance's portal is responded to with the correct error |
- code. |
- """ |
- class DisallowAnonymousAccess(object): |
- """ |
- Checker for L{IAnonymous} which rejects authentication attempts. |
- """ |
- implements(ICredentialsChecker) |
- |
- credentialInterfaces = (IAnonymous,) |
- |
- def requestAvatarId(self, credentials): |
- return defer.fail(UnauthorizedLogin()) |
- |
- realm = SingletonRealm(smtp.IMessageDelivery, NotImplementedDelivery()) |
- portal = Portal(realm, [DisallowAnonymousAccess()]) |
- proto = smtp.SMTP() |
- proto.portal = portal |
- trans = StringTransport() |
- proto.makeConnection(trans) |
- |
- # Deal with the necessary preliminaries |
- proto.dataReceived('HELO example.com\r\n') |
- trans.clear() |
- |
- # Try to specify our sender address |
- proto.dataReceived('MAIL FROM:<alice@example.com>\r\n') |
- |
- # Clean up the protocol before doing anything that might raise an |
- # exception. |
- proto.connectionLost(error.ConnectionLost()) |
- |
- # Make sure that we received exactly the correct response |
- self.assertEqual( |
- trans.value(), |
- '550 Cannot receive from specified address ' |
- '<alice@example.com>: Sender not acceptable\r\n') |
- |
- |
- def test_portalRejectedAnonymousSender(self): |
- """ |
- Test that a C{MAIL FROM} command issued without first authenticating |
- when a portal has been configured to disallow anonymous logins is |
- responded to with the correct error code. |
- """ |
- realm = SingletonRealm(smtp.IMessageDelivery, NotImplementedDelivery()) |
- portal = Portal(realm, []) |
- proto = smtp.SMTP() |
- proto.portal = portal |
- trans = StringTransport() |
- proto.makeConnection(trans) |
- |
- # Deal with the necessary preliminaries |
- proto.dataReceived('HELO example.com\r\n') |
- trans.clear() |
- |
- # Try to specify our sender address |
- proto.dataReceived('MAIL FROM:<alice@example.com>\r\n') |
- |
- # Clean up the protocol before doing anything that might raise an |
- # exception. |
- proto.connectionLost(error.ConnectionLost()) |
- |
- # Make sure that we received exactly the correct response |
- self.assertEqual( |
- trans.value(), |
- '550 Cannot receive from specified address ' |
- '<alice@example.com>: Unauthenticated senders not allowed\r\n') |
- |
- |
- |
-class ESMTPAuthenticationTestCase(unittest.TestCase): |
- def assertServerResponse(self, bytes, response): |
- """ |
- Assert that when the given bytes are delivered to the ESMTP server |
- instance, it responds with the indicated lines. |
- |
- @type bytes: str |
- @type response: list of str |
- """ |
- self.transport.clear() |
- self.server.dataReceived(bytes) |
- self.assertEqual( |
- response, |
- self.transport.value().splitlines()) |
- |
- |
- def assertServerAuthenticated(self, loginArgs, username="username", password="password"): |
- """ |
- Assert that a login attempt has been made, that the credentials and |
- interfaces passed to it are correct, and that when the login request |
- is satisfied, a successful response is sent by the ESMTP server |
- instance. |
- |
- @param loginArgs: A C{list} previously passed to L{portalFactory}. |
- """ |
- d, credentials, mind, interfaces = loginArgs.pop() |
- self.assertEqual(loginArgs, []) |
- self.failUnless(twisted.cred.credentials.IUsernamePassword.providedBy(credentials)) |
- self.assertEqual(credentials.username, username) |
- self.failUnless(credentials.checkPassword(password)) |
- self.assertIn(smtp.IMessageDeliveryFactory, interfaces) |
- self.assertIn(smtp.IMessageDelivery, interfaces) |
- d.callback((smtp.IMessageDeliveryFactory, None, lambda: None)) |
- |
- self.assertEqual( |
- ["235 Authentication successful."], |
- self.transport.value().splitlines()) |
- |
- |
- def setUp(self): |
- """ |
- Create an ESMTP instance attached to a StringTransport. |
- """ |
- self.server = smtp.ESMTP({ |
- 'LOGIN': imap4.LOGINCredentials}) |
- self.server.host = 'localhost' |
- self.transport = StringTransport( |
- peerAddress=address.IPv4Address('TCP', '127.0.0.1', 12345)) |
- self.server.makeConnection(self.transport) |
- |
- |
- def tearDown(self): |
- """ |
- Disconnect the ESMTP instance to clean up its timeout DelayedCall. |
- """ |
- self.server.connectionLost(error.ConnectionDone()) |
- |
- |
- def portalFactory(self, loginList): |
- class DummyPortal: |
- def login(self, credentials, mind, *interfaces): |
- d = defer.Deferred() |
- loginList.append((d, credentials, mind, interfaces)) |
- return d |
- return DummyPortal() |
- |
- |
- def test_authenticationCapabilityAdvertised(self): |
- """ |
- Test that AUTH is advertised to clients which issue an EHLO command. |
- """ |
- self.transport.clear() |
- self.server.dataReceived('EHLO\r\n') |
- responseLines = self.transport.value().splitlines() |
- self.assertEqual( |
- responseLines[0], |
- "250-localhost Hello 127.0.0.1, nice to meet you") |
- self.assertEqual( |
- responseLines[1], |
- "250 AUTH LOGIN") |
- self.assertEqual(len(responseLines), 2) |
- |
- |
- def test_plainAuthentication(self): |
- """ |
- Test that the LOGIN authentication mechanism can be used |
- """ |
- loginArgs = [] |
- self.server.portal = self.portalFactory(loginArgs) |
- |
- self.server.dataReceived('EHLO\r\n') |
- self.transport.clear() |
- |
- self.assertServerResponse( |
- 'AUTH LOGIN\r\n', |
- ["334 " + "User Name\0".encode('base64').strip()]) |
- |
- self.assertServerResponse( |
- 'username'.encode('base64') + '\r\n', |
- ["334 " + "Password\0".encode('base64').strip()]) |
- |
- self.assertServerResponse( |
- 'password'.encode('base64').strip() + '\r\n', |
- []) |
- |
- self.assertServerAuthenticated(loginArgs) |
- |
- |
- def test_plainAuthenticationEmptyPassword(self): |
- """ |
- Test that giving an empty password for plain auth succeeds. |
- """ |
- loginArgs = [] |
- self.server.portal = self.portalFactory(loginArgs) |
- |
- self.server.dataReceived('EHLO\r\n') |
- self.transport.clear() |
- |
- self.assertServerResponse( |
- 'AUTH LOGIN\r\n', |
- ["334 " + "User Name\0".encode('base64').strip()]) |
- |
- self.assertServerResponse( |
- 'username'.encode('base64') + '\r\n', |
- ["334 " + "Password\0".encode('base64').strip()]) |
- |
- self.assertServerResponse('\r\n', []) |
- self.assertServerAuthenticated(loginArgs, password='') |
- |
- def test_plainAuthenticationInitialResponse(self): |
- """ |
- The response to the first challenge may be included on the AUTH command |
- line. Test that this is also supported. |
- """ |
- loginArgs = [] |
- self.server.portal = self.portalFactory(loginArgs) |
- |
- self.server.dataReceived('EHLO\r\n') |
- self.transport.clear() |
- |
- self.assertServerResponse( |
- 'AUTH LOGIN ' + "username".encode('base64').strip() + '\r\n', |
- ["334 " + "Password\0".encode('base64').strip()]) |
- |
- self.assertServerResponse( |
- 'password'.encode('base64').strip() + '\r\n', |
- []) |
- |
- self.assertServerAuthenticated(loginArgs) |
- |
- |
- def test_abortAuthentication(self): |
- """ |
- Test that a challenge/response sequence can be aborted by the client. |
- """ |
- loginArgs = [] |
- self.server.portal = self.portalFactory(loginArgs) |
- |
- self.server.dataReceived('EHLO\r\n') |
- self.server.dataReceived('AUTH LOGIN\r\n') |
- |
- self.assertServerResponse( |
- '*\r\n', |
- ['501 Authentication aborted']) |
- |
- |
- def test_invalidBase64EncodedResponse(self): |
- """ |
- Test that a response which is not properly Base64 encoded results in |
- the appropriate error code. |
- """ |
- loginArgs = [] |
- self.server.portal = self.portalFactory(loginArgs) |
- |
- self.server.dataReceived('EHLO\r\n') |
- self.server.dataReceived('AUTH LOGIN\r\n') |
- |
- self.assertServerResponse( |
- 'x\r\n', |
- ['501 Syntax error in parameters or arguments']) |
- |
- self.assertEqual(loginArgs, []) |
- |
- |
- def test_invalidBase64EncodedInitialResponse(self): |
- """ |
- Like L{test_invalidBase64EncodedResponse} but for the case of an |
- initial response included with the C{AUTH} command. |
- """ |
- loginArgs = [] |
- self.server.portal = self.portalFactory(loginArgs) |
- |
- self.server.dataReceived('EHLO\r\n') |
- self.assertServerResponse( |
- 'AUTH LOGIN x\r\n', |
- ['501 Syntax error in parameters or arguments']) |
- |
- self.assertEqual(loginArgs, []) |