Index: third_party/twisted_8_1/twisted/mail/test/test_mail.py |
diff --git a/third_party/twisted_8_1/twisted/mail/test/test_mail.py b/third_party/twisted_8_1/twisted/mail/test/test_mail.py |
deleted file mode 100644 |
index 5a888c8f4415df019f24a7a36498bd875e741baa..0000000000000000000000000000000000000000 |
--- a/third_party/twisted_8_1/twisted/mail/test/test_mail.py |
+++ /dev/null |
@@ -1,1863 +0,0 @@ |
-# Copyright (c) 2001-2008 Twisted Matrix Laboratories. |
-# See LICENSE for details. |
- |
-""" |
-Tests for large portions of L{twisted.mail}. |
-""" |
- |
-import os |
-import errno |
-import md5 |
-import shutil |
-import pickle |
-import StringIO |
-import rfc822 |
-import tempfile |
-import signal |
- |
-from zope.interface import Interface, implements |
- |
-from twisted.trial import unittest |
-from twisted.mail import smtp |
-from twisted.mail import pop3 |
-from twisted.names import dns |
-from twisted.internet import protocol |
-from twisted.internet import defer |
-from twisted.internet.defer import Deferred |
-from twisted.internet import reactor |
-from twisted.internet import interfaces |
-from twisted.internet import task |
-from twisted.internet.error import DNSLookupError, CannotListenError |
-from twisted.internet.error import ProcessDone, ProcessTerminated |
-from twisted.internet import address |
-from twisted.python import failure |
-from twisted.python.filepath import FilePath |
- |
-from twisted import mail |
-import twisted.mail.mail |
-import twisted.mail.maildir |
-import twisted.mail.relay |
-import twisted.mail.relaymanager |
-import twisted.mail.protocols |
-import twisted.mail.alias |
- |
-from twisted.names.error import DNSNameError |
-from twisted.names.dns import RRHeader, Record_CNAME, Record_MX |
- |
-from twisted import cred |
-import twisted.cred.credentials |
-import twisted.cred.checkers |
-import twisted.cred.portal |
- |
-from twisted.test.proto_helpers import LineSendingProtocol |
- |
-class DomainWithDefaultsTestCase(unittest.TestCase): |
- def testMethods(self): |
- d = dict([(x, x + 10) for x in range(10)]) |
- d = mail.mail.DomainWithDefaultDict(d, 'Default') |
- |
- self.assertEquals(len(d), 10) |
- self.assertEquals(list(iter(d)), range(10)) |
- self.assertEquals(list(d.iterkeys()), list(iter(d))) |
- |
- items = list(d.iteritems()) |
- items.sort() |
- self.assertEquals(items, [(x, x + 10) for x in range(10)]) |
- |
- values = list(d.itervalues()) |
- values.sort() |
- self.assertEquals(values, range(10, 20)) |
- |
- items = d.items() |
- items.sort() |
- self.assertEquals(items, [(x, x + 10) for x in range(10)]) |
- |
- values = d.values() |
- values.sort() |
- self.assertEquals(values, range(10, 20)) |
- |
- for x in range(10): |
- self.assertEquals(d[x], x + 10) |
- self.assertEquals(d.get(x), x + 10) |
- self.failUnless(x in d) |
- self.failUnless(d.has_key(x)) |
- |
- del d[2], d[4], d[6] |
- |
- self.assertEquals(len(d), 7) |
- self.assertEquals(d[2], 'Default') |
- self.assertEquals(d[4], 'Default') |
- self.assertEquals(d[6], 'Default') |
- |
- d.update({'a': None, 'b': (), 'c': '*'}) |
- self.assertEquals(len(d), 10) |
- self.assertEquals(d['a'], None) |
- self.assertEquals(d['b'], ()) |
- self.assertEquals(d['c'], '*') |
- |
- d.clear() |
- self.assertEquals(len(d), 0) |
- |
- self.assertEquals(d.setdefault('key', 'value'), 'value') |
- self.assertEquals(d['key'], 'value') |
- |
- self.assertEquals(d.popitem(), ('key', 'value')) |
- self.assertEquals(len(d), 0) |
- |
- dcopy = d.copy() |
- self.assertEquals(d.domains, dcopy.domains) |
- self.assertEquals(d.default, dcopy.default) |
- |
- |
- def _stringificationTest(self, stringifier): |
- """ |
- Assert that the class name of a L{mail.mail.DomainWithDefaultDict} |
- instance and the string-formatted underlying domain dictionary both |
- appear in the string produced by the given string-returning function. |
- |
- @type stringifier: one-argument callable |
- @param stringifier: either C{str} or C{repr}, to be used to get a |
- string to make assertions against. |
- """ |
- domain = mail.mail.DomainWithDefaultDict({}, 'Default') |
- self.assertIn(domain.__class__.__name__, stringifier(domain)) |
- domain['key'] = 'value' |
- self.assertIn(str({'key': 'value'}), stringifier(domain)) |
- |
- |
- def test_str(self): |
- """ |
- L{DomainWithDefaultDict.__str__} should return a string including |
- the class name and the domain mapping held by the instance. |
- """ |
- self._stringificationTest(str) |
- |
- |
- def test_repr(self): |
- """ |
- L{DomainWithDefaultDict.__repr__} should return a string including |
- the class name and the domain mapping held by the instance. |
- """ |
- self._stringificationTest(repr) |
- |
- |
- |
-class BounceTestCase(unittest.TestCase): |
- def setUp(self): |
- self.domain = mail.mail.BounceDomain() |
- |
- def testExists(self): |
- self.assertRaises(smtp.AddressError, self.domain.exists, "any user") |
- |
- def testRelay(self): |
- self.assertEquals( |
- self.domain.willRelay("random q emailer", "protocol"), |
- False |
- ) |
- |
- def testMessage(self): |
- self.assertRaises(NotImplementedError, self.domain.startMessage, "whomever") |
- |
- def testAddUser(self): |
- self.domain.addUser("bob", "password") |
- self.assertRaises(smtp.SMTPBadRcpt, self.domain.exists, "bob") |
- |
-class FileMessageTestCase(unittest.TestCase): |
- def setUp(self): |
- self.name = "fileMessage.testFile" |
- self.final = "final.fileMessage.testFile" |
- self.f = file(self.name, 'w') |
- self.fp = mail.mail.FileMessage(self.f, self.name, self.final) |
- |
- def tearDown(self): |
- try: |
- self.f.close() |
- except: |
- pass |
- try: |
- os.remove(self.name) |
- except: |
- pass |
- try: |
- os.remove(self.final) |
- except: |
- pass |
- |
- def testFinalName(self): |
- return self.fp.eomReceived().addCallback(self._cbFinalName) |
- |
- def _cbFinalName(self, result): |
- self.assertEquals(result, self.final) |
- self.failUnless(self.f.closed) |
- self.failIf(os.path.exists(self.name)) |
- |
- def testContents(self): |
- contents = "first line\nsecond line\nthird line\n" |
- for line in contents.splitlines(): |
- self.fp.lineReceived(line) |
- self.fp.eomReceived() |
- self.assertEquals(file(self.final).read(), contents) |
- |
- def testInterrupted(self): |
- contents = "first line\nsecond line\n" |
- for line in contents.splitlines(): |
- self.fp.lineReceived(line) |
- self.fp.connectionLost() |
- self.failIf(os.path.exists(self.name)) |
- self.failIf(os.path.exists(self.final)) |
- |
-class MailServiceTestCase(unittest.TestCase): |
- def setUp(self): |
- self.service = mail.mail.MailService() |
- |
- def testFactories(self): |
- f = self.service.getPOP3Factory() |
- self.failUnless(isinstance(f, protocol.ServerFactory)) |
- self.failUnless(f.buildProtocol(('127.0.0.1', 12345)), pop3.POP3) |
- |
- f = self.service.getSMTPFactory() |
- self.failUnless(isinstance(f, protocol.ServerFactory)) |
- self.failUnless(f.buildProtocol(('127.0.0.1', 12345)), smtp.SMTP) |
- |
- f = self.service.getESMTPFactory() |
- self.failUnless(isinstance(f, protocol.ServerFactory)) |
- self.failUnless(f.buildProtocol(('127.0.0.1', 12345)), smtp.ESMTP) |
- |
- def testPortals(self): |
- o1 = object() |
- o2 = object() |
- self.service.portals['domain'] = o1 |
- self.service.portals[''] = o2 |
- |
- self.failUnless(self.service.lookupPortal('domain') is o1) |
- self.failUnless(self.service.defaultPortal() is o2) |
- |
-class FailingMaildirMailboxAppendMessageTask(mail.maildir._MaildirMailboxAppendMessageTask): |
- _openstate = True |
- _writestate = True |
- _renamestate = True |
- def osopen(self, fn, attr, mode): |
- if self._openstate: |
- return os.open(fn, attr, mode) |
- else: |
- raise OSError(errno.EPERM, "Faked Permission Problem") |
- def oswrite(self, fh, data): |
- if self._writestate: |
- return os.write(fh, data) |
- else: |
- raise OSError(errno.ENOSPC, "Faked Space problem") |
- def osrename(self, oldname, newname): |
- if self._renamestate: |
- return os.rename(oldname, newname) |
- else: |
- raise OSError(errno.EPERM, "Faked Permission Problem") |
- |
-class MaildirAppendStringTestCase(unittest.TestCase): |
- def setUp(self): |
- self.d = self.mktemp() |
- mail.maildir.initializeMaildir(self.d) |
- |
- def tearDown(self): |
- shutil.rmtree(self.d) |
- |
- def _append(self, ignored, mbox): |
- d = mbox.appendMessage('TEST') |
- return self.assertFailure(d, Exception) |
- |
- def _setState(self, ignored, mbox, rename=None, write=None, open=None): |
- if rename is not None: |
- mbox.AppendFactory._renameState = rename |
- if write is not None: |
- mbox.AppendFactory._writeState = write |
- if open is not None: |
- mbox.AppendFactory._openstate = open |
- |
- def testAppend(self): |
- mbox = mail.maildir.MaildirMailbox(self.d) |
- mbox.AppendFactory = FailingMaildirMailboxAppendMessageTask |
- ds = [] |
- for i in xrange(1, 11): |
- ds.append(mbox.appendMessage("X" * i)) |
- ds[-1].addCallback(self.assertEqual, None) |
- d = defer.gatherResults(ds) |
- d.addCallback(self._cbTestAppend, mbox) |
- return d |
- |
- def _cbTestAppend(self, result, mbox): |
- self.assertEquals(len(mbox.listMessages()), |
- 10) |
- self.assertEquals(len(mbox.getMessage(5).read()), 6) |
- # test in the right order: last to first error location. |
- mbox.AppendFactory._renamestate = False |
- d = self._append(None, mbox) |
- d.addCallback(self._setState, mbox, rename=True, write=False) |
- d.addCallback(self._append, mbox) |
- d.addCallback(self._setState, mbox, write=True, open=False) |
- d.addCallback(self._append, mbox) |
- d.addCallback(self._setState, mbox, open=True) |
- return d |
- |
- |
-class MaildirAppendFileTestCase(unittest.TestCase): |
- def setUp(self): |
- self.d = self.mktemp() |
- mail.maildir.initializeMaildir(self.d) |
- |
- def tearDown(self): |
- shutil.rmtree(self.d) |
- |
- def testAppend(self): |
- mbox = mail.maildir.MaildirMailbox(self.d) |
- ds = [] |
- def _check(res, t): |
- t.close() |
- self.assertEqual(res, None) |
- for i in xrange(1, 11): |
- temp = tempfile.TemporaryFile() |
- temp.write("X" * i) |
- temp.seek(0,0) |
- ds.append(mbox.appendMessage(temp)) |
- ds[-1].addCallback(_check, temp) |
- return defer.gatherResults(ds).addCallback(self._cbTestAppend, mbox) |
- |
- def _cbTestAppend(self, result, mbox): |
- self.assertEquals(len(mbox.listMessages()), |
- 10) |
- self.assertEquals(len(mbox.getMessage(5).read()), 6) |
- |
- |
-class MaildirTestCase(unittest.TestCase): |
- def setUp(self): |
- self.d = self.mktemp() |
- mail.maildir.initializeMaildir(self.d) |
- |
- def tearDown(self): |
- shutil.rmtree(self.d) |
- |
- def testInitializer(self): |
- d = self.d |
- trash = os.path.join(d, '.Trash') |
- |
- self.failUnless(os.path.exists(d) and os.path.isdir(d)) |
- self.failUnless(os.path.exists(os.path.join(d, 'new'))) |
- self.failUnless(os.path.exists(os.path.join(d, 'cur'))) |
- self.failUnless(os.path.exists(os.path.join(d, 'tmp'))) |
- self.failUnless(os.path.isdir(os.path.join(d, 'new'))) |
- self.failUnless(os.path.isdir(os.path.join(d, 'cur'))) |
- self.failUnless(os.path.isdir(os.path.join(d, 'tmp'))) |
- |
- self.failUnless(os.path.exists(os.path.join(trash, 'new'))) |
- self.failUnless(os.path.exists(os.path.join(trash, 'cur'))) |
- self.failUnless(os.path.exists(os.path.join(trash, 'tmp'))) |
- self.failUnless(os.path.isdir(os.path.join(trash, 'new'))) |
- self.failUnless(os.path.isdir(os.path.join(trash, 'cur'))) |
- self.failUnless(os.path.isdir(os.path.join(trash, 'tmp'))) |
- |
- def testMailbox(self): |
- j = os.path.join |
- n = mail.maildir._generateMaildirName |
- msgs = [j(b, n()) for b in ('cur', 'new') for x in range(5)] |
- |
- # Toss a few files into the mailbox |
- i = 1 |
- for f in msgs: |
- f = file(j(self.d, f), 'w') |
- f.write('x' * i) |
- f.close() |
- i = i + 1 |
- |
- mb = mail.maildir.MaildirMailbox(self.d) |
- self.assertEquals(mb.listMessages(), range(1, 11)) |
- self.assertEquals(mb.listMessages(1), 2) |
- self.assertEquals(mb.listMessages(5), 6) |
- |
- self.assertEquals(mb.getMessage(6).read(), 'x' * 7) |
- self.assertEquals(mb.getMessage(1).read(), 'x' * 2) |
- |
- d = {} |
- for i in range(10): |
- u = mb.getUidl(i) |
- self.failIf(u in d) |
- d[u] = None |
- |
- p, f = os.path.split(msgs[5]) |
- |
- mb.deleteMessage(5) |
- self.assertEquals(mb.listMessages(5), 0) |
- self.failUnless(os.path.exists(j(self.d, '.Trash', 'cur', f))) |
- self.failIf(os.path.exists(j(self.d, msgs[5]))) |
- |
- mb.undeleteMessages() |
- self.assertEquals(mb.listMessages(5), 6) |
- self.failIf(os.path.exists(j(self.d, '.Trash', 'cur', f))) |
- self.failUnless(os.path.exists(j(self.d, msgs[5]))) |
- |
-class MaildirDirdbmDomainTestCase(unittest.TestCase): |
- def setUp(self): |
- self.P = self.mktemp() |
- self.S = mail.mail.MailService() |
- self.D = mail.maildir.MaildirDirdbmDomain(self.S, self.P) |
- |
- def tearDown(self): |
- shutil.rmtree(self.P) |
- |
- def testAddUser(self): |
- toAdd = (('user1', 'pwd1'), ('user2', 'pwd2'), ('user3', 'pwd3')) |
- for (u, p) in toAdd: |
- self.D.addUser(u, p) |
- |
- for (u, p) in toAdd: |
- self.failUnless(u in self.D.dbm) |
- self.assertEquals(self.D.dbm[u], p) |
- self.failUnless(os.path.exists(os.path.join(self.P, u))) |
- |
- def testCredentials(self): |
- creds = self.D.getCredentialsCheckers() |
- |
- self.assertEquals(len(creds), 1) |
- self.failUnless(cred.checkers.ICredentialsChecker.providedBy(creds[0])) |
- self.failUnless(cred.credentials.IUsernamePassword in creds[0].credentialInterfaces) |
- |
- def testRequestAvatar(self): |
- class ISomething(Interface): |
- pass |
- |
- self.D.addUser('user', 'password') |
- self.assertRaises( |
- NotImplementedError, |
- self.D.requestAvatar, 'user', None, ISomething |
- ) |
- |
- t = self.D.requestAvatar('user', None, pop3.IMailbox) |
- self.assertEquals(len(t), 3) |
- self.failUnless(t[0] is pop3.IMailbox) |
- self.failUnless(pop3.IMailbox.providedBy(t[1])) |
- |
- t[2]() |
- |
- def testRequestAvatarId(self): |
- self.D.addUser('user', 'password') |
- database = self.D.getCredentialsCheckers()[0] |
- |
- creds = cred.credentials.UsernamePassword('user', 'wrong password') |
- self.assertRaises( |
- cred.error.UnauthorizedLogin, |
- database.requestAvatarId, creds |
- ) |
- |
- creds = cred.credentials.UsernamePassword('user', 'password') |
- self.assertEquals(database.requestAvatarId(creds), 'user') |
- |
- |
-class StubAliasableDomain(object): |
- """ |
- Minimal testable implementation of IAliasableDomain. |
- """ |
- implements(mail.mail.IAliasableDomain) |
- |
- def exists(self, user): |
- """ |
- No test coverage for invocations of this method on domain objects, |
- so we just won't implement it. |
- """ |
- raise NotImplementedError() |
- |
- |
- def addUser(self, user, password): |
- """ |
- No test coverage for invocations of this method on domain objects, |
- so we just won't implement it. |
- """ |
- raise NotImplementedError() |
- |
- |
- def getCredentialsCheckers(self): |
- """ |
- This needs to succeed in order for other tests to complete |
- successfully, but we don't actually assert anything about its |
- behavior. Return an empty list. Sometime later we should return |
- something else and assert that a portal got set up properly. |
- """ |
- return [] |
- |
- |
- def setAliasGroup(self, aliases): |
- """ |
- Just record the value so the test can check it later. |
- """ |
- self.aliasGroup = aliases |
- |
- |
-class ServiceDomainTestCase(unittest.TestCase): |
- def setUp(self): |
- self.S = mail.mail.MailService() |
- self.D = mail.protocols.DomainDeliveryBase(self.S, None) |
- self.D.service = self.S |
- self.D.protocolName = 'TEST' |
- self.D.host = 'hostname' |
- |
- self.tmpdir = self.mktemp() |
- domain = mail.maildir.MaildirDirdbmDomain(self.S, self.tmpdir) |
- domain.addUser('user', 'password') |
- self.S.addDomain('test.domain', domain) |
- |
- def tearDown(self): |
- shutil.rmtree(self.tmpdir) |
- |
- |
- def testAddAliasableDomain(self): |
- """ |
- Test that adding an IAliasableDomain to a mail service properly sets |
- up alias group references and such. |
- """ |
- aliases = object() |
- domain = StubAliasableDomain() |
- self.S.aliases = aliases |
- self.S.addDomain('example.com', domain) |
- self.assertIdentical(domain.aliasGroup, aliases) |
- |
- |
- def testReceivedHeader(self): |
- hdr = self.D.receivedHeader( |
- ('remotehost', '123.232.101.234'), |
- smtp.Address('<someguy@somplace>'), |
- ['user@host.name'] |
- ) |
- fp = StringIO.StringIO(hdr) |
- m = rfc822.Message(fp) |
- self.assertEquals(len(m.items()), 1) |
- self.failUnless(m.has_key('Received')) |
- |
- def testValidateTo(self): |
- user = smtp.User('user@test.domain', 'helo', None, 'wherever@whatever') |
- return defer.maybeDeferred(self.D.validateTo, user |
- ).addCallback(self._cbValidateTo |
- ) |
- |
- def _cbValidateTo(self, result): |
- self.failUnless(callable(result)) |
- |
- def testValidateToBadUsername(self): |
- user = smtp.User('resu@test.domain', 'helo', None, 'wherever@whatever') |
- return self.assertFailure( |
- defer.maybeDeferred(self.D.validateTo, user), |
- smtp.SMTPBadRcpt) |
- |
- def testValidateToBadDomain(self): |
- user = smtp.User('user@domain.test', 'helo', None, 'wherever@whatever') |
- return self.assertFailure( |
- defer.maybeDeferred(self.D.validateTo, user), |
- smtp.SMTPBadRcpt) |
- |
- def testValidateFrom(self): |
- helo = ('hostname', '127.0.0.1') |
- origin = smtp.Address('<user@hostname>') |
- self.failUnless(self.D.validateFrom(helo, origin) is origin) |
- |
- helo = ('hostname', '1.2.3.4') |
- origin = smtp.Address('<user@hostname>') |
- self.failUnless(self.D.validateFrom(helo, origin) is origin) |
- |
- helo = ('hostname', '1.2.3.4') |
- origin = smtp.Address('<>') |
- self.failUnless(self.D.validateFrom(helo, origin) is origin) |
- |
- self.assertRaises( |
- smtp.SMTPBadSender, |
- self.D.validateFrom, None, origin |
- ) |
- |
-class VirtualPOP3TestCase(unittest.TestCase): |
- def setUp(self): |
- self.tmpdir = self.mktemp() |
- self.S = mail.mail.MailService() |
- self.D = mail.maildir.MaildirDirdbmDomain(self.S, self.tmpdir) |
- self.D.addUser('user', 'password') |
- self.S.addDomain('test.domain', self.D) |
- |
- portal = cred.portal.Portal(self.D) |
- map(portal.registerChecker, self.D.getCredentialsCheckers()) |
- self.S.portals[''] = self.S.portals['test.domain'] = portal |
- |
- self.P = mail.protocols.VirtualPOP3() |
- self.P.service = self.S |
- self.P.magic = '<unit test magic>' |
- |
- def tearDown(self): |
- shutil.rmtree(self.tmpdir) |
- |
- def testAuthenticateAPOP(self): |
- resp = md5.new(self.P.magic + 'password').hexdigest() |
- return self.P.authenticateUserAPOP('user', resp |
- ).addCallback(self._cbAuthenticateAPOP |
- ) |
- |
- def _cbAuthenticateAPOP(self, result): |
- self.assertEquals(len(result), 3) |
- self.assertEquals(result[0], pop3.IMailbox) |
- self.failUnless(pop3.IMailbox.providedBy(result[1])) |
- result[2]() |
- |
- def testAuthenticateIncorrectUserAPOP(self): |
- resp = md5.new(self.P.magic + 'password').hexdigest() |
- return self.assertFailure( |
- self.P.authenticateUserAPOP('resu', resp), |
- cred.error.UnauthorizedLogin) |
- |
- def testAuthenticateIncorrectResponseAPOP(self): |
- resp = md5.new('wrong digest').hexdigest() |
- return self.assertFailure( |
- self.P.authenticateUserAPOP('user', resp), |
- cred.error.UnauthorizedLogin) |
- |
- def testAuthenticatePASS(self): |
- return self.P.authenticateUserPASS('user', 'password' |
- ).addCallback(self._cbAuthenticatePASS |
- ) |
- |
- def _cbAuthenticatePASS(self, result): |
- self.assertEquals(len(result), 3) |
- self.assertEquals(result[0], pop3.IMailbox) |
- self.failUnless(pop3.IMailbox.providedBy(result[1])) |
- result[2]() |
- |
- def testAuthenticateBadUserPASS(self): |
- return self.assertFailure( |
- self.P.authenticateUserPASS('resu', 'password'), |
- cred.error.UnauthorizedLogin) |
- |
- def testAuthenticateBadPasswordPASS(self): |
- return self.assertFailure( |
- self.P.authenticateUserPASS('user', 'wrong password'), |
- cred.error.UnauthorizedLogin) |
- |
-class empty(smtp.User): |
- def __init__(self): |
- pass |
- |
-class RelayTestCase(unittest.TestCase): |
- def testExists(self): |
- service = mail.mail.MailService() |
- domain = mail.relay.DomainQueuer(service) |
- |
- doRelay = [ |
- address.UNIXAddress('/var/run/mail-relay'), |
- address.IPv4Address('TCP', '127.0.0.1', 12345), |
- ] |
- |
- dontRelay = [ |
- address.IPv4Address('TCP', '192.168.2.1', 62), |
- address.IPv4Address('TCP', '1.2.3.4', 1943), |
- ] |
- |
- for peer in doRelay: |
- user = empty() |
- user.orig = 'user@host' |
- user.dest = 'tsoh@resu' |
- user.protocol = empty() |
- user.protocol.transport = empty() |
- user.protocol.transport.getPeer = lambda: peer |
- |
- self.failUnless(callable(domain.exists(user))) |
- |
- for peer in dontRelay: |
- user = empty() |
- user.orig = 'some@place' |
- user.protocol = empty() |
- user.protocol.transport = empty() |
- user.protocol.transport.getPeer = lambda: peer |
- user.dest = 'who@cares' |
- |
- self.assertRaises(smtp.SMTPBadRcpt, domain.exists, user) |
- |
-class RelayerTestCase(unittest.TestCase): |
- def setUp(self): |
- self.tmpdir = self.mktemp() |
- os.mkdir(self.tmpdir) |
- self.messageFiles = [] |
- for i in range(10): |
- name = os.path.join(self.tmpdir, 'body-%d' % (i,)) |
- f = file(name + '-H', 'w') |
- pickle.dump(['from-%d' % (i,), 'to-%d' % (i,)], f) |
- f.close() |
- |
- f = file(name + '-D', 'w') |
- f.write(name) |
- f.seek(0, 0) |
- self.messageFiles.append(name) |
- |
- self.R = mail.relay.RelayerMixin() |
- self.R.loadMessages(self.messageFiles) |
- |
- def tearDown(self): |
- shutil.rmtree(self.tmpdir) |
- |
- def testMailFrom(self): |
- for i in range(10): |
- self.assertEquals(self.R.getMailFrom(), 'from-%d' % (i,)) |
- self.R.sentMail(250, None, None, None, None) |
- self.assertEquals(self.R.getMailFrom(), None) |
- |
- def testMailTo(self): |
- for i in range(10): |
- self.assertEquals(self.R.getMailTo(), ['to-%d' % (i,)]) |
- self.R.sentMail(250, None, None, None, None) |
- self.assertEquals(self.R.getMailTo(), None) |
- |
- def testMailData(self): |
- for i in range(10): |
- name = os.path.join(self.tmpdir, 'body-%d' % (i,)) |
- self.assertEquals(self.R.getMailData().read(), name) |
- self.R.sentMail(250, None, None, None, None) |
- self.assertEquals(self.R.getMailData(), None) |
- |
-class Manager: |
- def __init__(self): |
- self.success = [] |
- self.failure = [] |
- self.done = [] |
- |
- def notifySuccess(self, factory, message): |
- self.success.append((factory, message)) |
- |
- def notifyFailure(self, factory, message): |
- self.failure.append((factory, message)) |
- |
- def notifyDone(self, factory): |
- self.done.append(factory) |
- |
-class ManagedRelayerTestCase(unittest.TestCase): |
- def setUp(self): |
- self.manager = Manager() |
- self.messages = range(0, 20, 2) |
- self.factory = object() |
- self.relay = mail.relaymanager.ManagedRelayerMixin(self.manager) |
- self.relay.messages = self.messages[:] |
- self.relay.names = self.messages[:] |
- self.relay.factory = self.factory |
- |
- def testSuccessfulSentMail(self): |
- for i in self.messages: |
- self.relay.sentMail(250, None, None, None, None) |
- |
- self.assertEquals( |
- self.manager.success, |
- [(self.factory, m) for m in self.messages] |
- ) |
- |
- def testFailedSentMail(self): |
- for i in self.messages: |
- self.relay.sentMail(550, None, None, None, None) |
- |
- self.assertEquals( |
- self.manager.failure, |
- [(self.factory, m) for m in self.messages] |
- ) |
- |
- def testConnectionLost(self): |
- self.relay.connectionLost(failure.Failure(Exception())) |
- self.assertEquals(self.manager.done, [self.factory]) |
- |
-class DirectoryQueueTestCase(unittest.TestCase): |
- def setUp(self): |
- # This is almost a test case itself. |
- self.tmpdir = self.mktemp() |
- os.mkdir(self.tmpdir) |
- self.queue = mail.relaymanager.Queue(self.tmpdir) |
- self.queue.noisy = False |
- for m in range(25): |
- hdrF, msgF = self.queue.createNewMessage() |
- pickle.dump(['header', m], hdrF) |
- hdrF.close() |
- msgF.lineReceived('body: %d' % (m,)) |
- msgF.eomReceived() |
- self.queue.readDirectory() |
- |
- def tearDown(self): |
- shutil.rmtree(self.tmpdir) |
- |
- def testWaiting(self): |
- self.failUnless(self.queue.hasWaiting()) |
- self.assertEquals(len(self.queue.getWaiting()), 25) |
- |
- waiting = self.queue.getWaiting() |
- self.queue.setRelaying(waiting[0]) |
- self.assertEquals(len(self.queue.getWaiting()), 24) |
- |
- self.queue.setWaiting(waiting[0]) |
- self.assertEquals(len(self.queue.getWaiting()), 25) |
- |
- def testRelaying(self): |
- for m in self.queue.getWaiting(): |
- self.queue.setRelaying(m) |
- self.assertEquals( |
- len(self.queue.getRelayed()), |
- 25 - len(self.queue.getWaiting()) |
- ) |
- |
- self.failIf(self.queue.hasWaiting()) |
- |
- relayed = self.queue.getRelayed() |
- self.queue.setWaiting(relayed[0]) |
- self.assertEquals(len(self.queue.getWaiting()), 1) |
- self.assertEquals(len(self.queue.getRelayed()), 24) |
- |
- def testDone(self): |
- msg = self.queue.getWaiting()[0] |
- self.queue.setRelaying(msg) |
- self.queue.done(msg) |
- |
- self.assertEquals(len(self.queue.getWaiting()), 24) |
- self.assertEquals(len(self.queue.getRelayed()), 0) |
- |
- self.failIf(msg in self.queue.getWaiting()) |
- self.failIf(msg in self.queue.getRelayed()) |
- |
- def testEnvelope(self): |
- envelopes = [] |
- |
- for msg in self.queue.getWaiting(): |
- envelopes.append(self.queue.getEnvelope(msg)) |
- |
- envelopes.sort() |
- for i in range(25): |
- self.assertEquals( |
- envelopes.pop(0), |
- ['header', i] |
- ) |
- |
-from twisted.names import server |
-from twisted.names import client |
-from twisted.names import common |
- |
-class TestAuthority(common.ResolverBase): |
- def __init__(self): |
- common.ResolverBase.__init__(self) |
- self.addresses = {} |
- |
- def _lookup(self, name, cls, type, timeout = None): |
- if name in self.addresses and type == dns.MX: |
- results = [] |
- for a in self.addresses[name]: |
- hdr = dns.RRHeader( |
- name, dns.MX, dns.IN, 60, dns.Record_MX(0, a) |
- ) |
- results.append(hdr) |
- return defer.succeed((results, [], [])) |
- return defer.fail(failure.Failure(dns.DomainError(name))) |
- |
-def setUpDNS(self): |
- self.auth = TestAuthority() |
- factory = server.DNSServerFactory([self.auth]) |
- protocol = dns.DNSDatagramProtocol(factory) |
- while 1: |
- self.port = reactor.listenTCP(0, factory, interface='127.0.0.1') |
- portNumber = self.port.getHost().port |
- |
- try: |
- self.udpPort = reactor.listenUDP(portNumber, protocol, interface='127.0.0.1') |
- except CannotListenError: |
- self.port.stopListening() |
- else: |
- break |
- self.resolver = client.Resolver(servers=[('127.0.0.1', portNumber)]) |
- |
- |
-def tearDownDNS(self): |
- dl = [] |
- dl.append(defer.maybeDeferred(self.port.stopListening)) |
- dl.append(defer.maybeDeferred(self.udpPort.stopListening)) |
- if self.resolver.protocol.transport is not None: |
- dl.append(defer.maybeDeferred(self.resolver.protocol.transport.stopListening)) |
- try: |
- self.resolver._parseCall.cancel() |
- except: |
- pass |
- return defer.DeferredList(dl) |
- |
-class MXTestCase(unittest.TestCase): |
- """ |
- Tests for L{mail.relaymanager.MXCalculator}. |
- """ |
- def setUp(self): |
- setUpDNS(self) |
- self.clock = task.Clock() |
- self.mx = mail.relaymanager.MXCalculator(self.resolver, self.clock) |
- |
- def tearDown(self): |
- return tearDownDNS(self) |
- |
- |
- def test_defaultClock(self): |
- """ |
- L{MXCalculator}'s default clock is C{twisted.internet.reactor}. |
- """ |
- self.assertIdentical( |
- mail.relaymanager.MXCalculator(self.resolver).clock, |
- reactor) |
- |
- |
- def testSimpleSuccess(self): |
- self.auth.addresses['test.domain'] = ['the.email.test.domain'] |
- return self.mx.getMX('test.domain').addCallback(self._cbSimpleSuccess) |
- |
- def _cbSimpleSuccess(self, mx): |
- self.assertEquals(mx.preference, 0) |
- self.assertEquals(str(mx.name), 'the.email.test.domain') |
- |
- def testSimpleFailure(self): |
- self.mx.fallbackToDomain = False |
- return self.assertFailure(self.mx.getMX('test.domain'), IOError) |
- |
- def testSimpleFailureWithFallback(self): |
- return self.assertFailure(self.mx.getMX('test.domain'), DNSLookupError) |
- |
- |
- def _exchangeTest(self, domain, records, correctMailExchange): |
- """ |
- Issue an MX request for the given domain and arrange for it to be |
- responded to with the given records. Verify that the resulting mail |
- exchange is the indicated host. |
- |
- @type domain: C{str} |
- @type records: C{list} of L{RRHeader} |
- @type correctMailExchange: C{str} |
- @rtype: L{Deferred} |
- """ |
- class DummyResolver(object): |
- def lookupMailExchange(self, name): |
- if name == domain: |
- return defer.succeed(( |
- records, |
- [], |
- [])) |
- return defer.fail(DNSNameError(domain)) |
- |
- self.mx.resolver = DummyResolver() |
- d = self.mx.getMX(domain) |
- def gotMailExchange(record): |
- self.assertEqual(str(record.name), correctMailExchange) |
- d.addCallback(gotMailExchange) |
- return d |
- |
- |
- def test_mailExchangePreference(self): |
- """ |
- The MX record with the lowest preference is returned by |
- L{MXCalculator.getMX}. |
- """ |
- domain = "example.com" |
- good = "good.example.com" |
- bad = "bad.example.com" |
- |
- records = [ |
- RRHeader(name=domain, |
- type=Record_MX.TYPE, |
- payload=Record_MX(1, bad)), |
- RRHeader(name=domain, |
- type=Record_MX.TYPE, |
- payload=Record_MX(0, good)), |
- RRHeader(name=domain, |
- type=Record_MX.TYPE, |
- payload=Record_MX(2, bad))] |
- return self._exchangeTest(domain, records, good) |
- |
- |
- def test_badExchangeExcluded(self): |
- """ |
- L{MXCalculator.getMX} returns the MX record with the lowest preference |
- which is not also marked as bad. |
- """ |
- domain = "example.com" |
- good = "good.example.com" |
- bad = "bad.example.com" |
- |
- records = [ |
- RRHeader(name=domain, |
- type=Record_MX.TYPE, |
- payload=Record_MX(0, bad)), |
- RRHeader(name=domain, |
- type=Record_MX.TYPE, |
- payload=Record_MX(1, good))] |
- self.mx.markBad(bad) |
- return self._exchangeTest(domain, records, good) |
- |
- |
- def test_fallbackForAllBadExchanges(self): |
- """ |
- L{MXCalculator.getMX} returns the MX record with the lowest preference |
- if all the MX records in the response have been marked bad. |
- """ |
- domain = "example.com" |
- bad = "bad.example.com" |
- worse = "worse.example.com" |
- |
- records = [ |
- RRHeader(name=domain, |
- type=Record_MX.TYPE, |
- payload=Record_MX(0, bad)), |
- RRHeader(name=domain, |
- type=Record_MX.TYPE, |
- payload=Record_MX(1, worse))] |
- self.mx.markBad(bad) |
- self.mx.markBad(worse) |
- return self._exchangeTest(domain, records, bad) |
- |
- |
- def test_badExchangeExpires(self): |
- """ |
- L{MXCalculator.getMX} returns the MX record with the lowest preference |
- if it was last marked bad longer than L{MXCalculator.timeOutBadMX} |
- seconds ago. |
- """ |
- domain = "example.com" |
- good = "good.example.com" |
- previouslyBad = "bad.example.com" |
- |
- records = [ |
- RRHeader(name=domain, |
- type=Record_MX.TYPE, |
- payload=Record_MX(0, previouslyBad)), |
- RRHeader(name=domain, |
- type=Record_MX.TYPE, |
- payload=Record_MX(1, good))] |
- self.mx.markBad(previouslyBad) |
- self.clock.advance(self.mx.timeOutBadMX) |
- return self._exchangeTest(domain, records, previouslyBad) |
- |
- |
- def test_goodExchangeUsed(self): |
- """ |
- L{MXCalculator.getMX} returns the MX record with the lowest preference |
- if it was marked good after it was marked bad. |
- """ |
- domain = "example.com" |
- good = "good.example.com" |
- previouslyBad = "bad.example.com" |
- |
- records = [ |
- RRHeader(name=domain, |
- type=Record_MX.TYPE, |
- payload=Record_MX(0, previouslyBad)), |
- RRHeader(name=domain, |
- type=Record_MX.TYPE, |
- payload=Record_MX(1, good))] |
- self.mx.markBad(previouslyBad) |
- self.mx.markGood(previouslyBad) |
- self.clock.advance(self.mx.timeOutBadMX) |
- return self._exchangeTest(domain, records, previouslyBad) |
- |
- |
- def test_successWithoutResults(self): |
- """ |
- If an MX lookup succeeds but the result set is empty, |
- L{MXCalculator.getMX} should try to look up an I{A} record for the |
- requested name and call back its returned Deferred with that |
- address. |
- """ |
- ip = '1.2.3.4' |
- domain = 'example.org' |
- |
- class DummyResolver(object): |
- """ |
- Fake resolver which will respond to an MX lookup with an empty |
- result set. |
- |
- @ivar mx: A dictionary mapping hostnames to three-tuples of |
- results to be returned from I{MX} lookups. |
- |
- @ivar a: A dictionary mapping hostnames to addresses to be |
- returned from I{A} lookups. |
- """ |
- mx = {domain: ([], [], [])} |
- a = {domain: ip} |
- |
- def lookupMailExchange(self, domain): |
- return defer.succeed(self.mx[domain]) |
- |
- def getHostByName(self, domain): |
- return defer.succeed(self.a[domain]) |
- |
- self.mx.resolver = DummyResolver() |
- d = self.mx.getMX(domain) |
- d.addCallback(self.assertEqual, Record_MX(name=ip)) |
- return d |
- |
- |
- def test_failureWithSuccessfulFallback(self): |
- """ |
- Test that if the MX record lookup fails, fallback is enabled, and an A |
- record is available for the name, then the Deferred returned by |
- L{MXCalculator.getMX} ultimately fires with a Record_MX instance which |
- gives the address in the A record for the name. |
- """ |
- class DummyResolver(object): |
- """ |
- Fake resolver which will fail an MX lookup but then succeed a |
- getHostByName call. |
- """ |
- def lookupMailExchange(self, domain): |
- return defer.fail(DNSNameError()) |
- |
- def getHostByName(self, domain): |
- return defer.succeed("1.2.3.4") |
- |
- self.mx.resolver = DummyResolver() |
- d = self.mx.getMX("domain") |
- d.addCallback(self.assertEqual, Record_MX(name="1.2.3.4")) |
- return d |
- |
- |
- def test_cnameWithoutGlueRecords(self): |
- """ |
- If an MX lookup returns a single CNAME record as a result, MXCalculator |
- will perform an MX lookup for the canonical name indicated and return |
- the MX record which results. |
- """ |
- alias = "alias.example.com" |
- canonical = "canonical.example.com" |
- exchange = "mail.example.com" |
- |
- class DummyResolver(object): |
- """ |
- Fake resolver which will return a CNAME for an MX lookup of a name |
- which is an alias and an MX for an MX lookup of the canonical name. |
- """ |
- def lookupMailExchange(self, domain): |
- if domain == alias: |
- return defer.succeed(( |
- [RRHeader(name=domain, |
- type=Record_CNAME.TYPE, |
- payload=Record_CNAME(canonical))], |
- [], [])) |
- elif domain == canonical: |
- return defer.succeed(( |
- [RRHeader(name=domain, |
- type=Record_MX.TYPE, |
- payload=Record_MX(0, exchange))], |
- [], [])) |
- else: |
- return defer.fail(DNSNameError(domain)) |
- |
- self.mx.resolver = DummyResolver() |
- d = self.mx.getMX(alias) |
- d.addCallback(self.assertEqual, Record_MX(name=exchange)) |
- return d |
- |
- |
- def test_cnameChain(self): |
- """ |
- If L{MXCalculator.getMX} encounters a CNAME chain which is longer than |
- the length specified, the returned L{Deferred} should errback with |
- L{CanonicalNameChainTooLong}. |
- """ |
- class DummyResolver(object): |
- """ |
- Fake resolver which generates a CNAME chain of infinite length in |
- response to MX lookups. |
- """ |
- chainCounter = 0 |
- |
- def lookupMailExchange(self, domain): |
- self.chainCounter += 1 |
- name = 'x-%d.example.com' % (self.chainCounter,) |
- return defer.succeed(( |
- [RRHeader(name=domain, |
- type=Record_CNAME.TYPE, |
- payload=Record_CNAME(name))], |
- [], [])) |
- |
- cnameLimit = 3 |
- self.mx.resolver = DummyResolver() |
- d = self.mx.getMX("mail.example.com", cnameLimit) |
- self.assertFailure( |
- d, twisted.mail.relaymanager.CanonicalNameChainTooLong) |
- def cbChainTooLong(error): |
- self.assertEqual(error.args[0], Record_CNAME("x-%d.example.com" % (cnameLimit + 1,))) |
- self.assertEqual(self.mx.resolver.chainCounter, cnameLimit + 1) |
- d.addCallback(cbChainTooLong) |
- return d |
- |
- |
- def test_cnameWithGlueRecords(self): |
- """ |
- If an MX lookup returns a CNAME and the MX record for the CNAME, the |
- L{Deferred} returned by L{MXCalculator.getMX} should be called back |
- with the name from the MX record without further lookups being |
- attempted. |
- """ |
- lookedUp = [] |
- alias = "alias.example.com" |
- canonical = "canonical.example.com" |
- exchange = "mail.example.com" |
- |
- class DummyResolver(object): |
- def lookupMailExchange(self, domain): |
- if domain != alias or lookedUp: |
- # Don't give back any results for anything except the alias |
- # or on any request after the first. |
- return ([], [], []) |
- return defer.succeed(( |
- [RRHeader(name=alias, |
- type=Record_CNAME.TYPE, |
- payload=Record_CNAME(canonical)), |
- RRHeader(name=canonical, |
- type=Record_MX.TYPE, |
- payload=Record_MX(name=exchange))], |
- [], [])) |
- |
- self.mx.resolver = DummyResolver() |
- d = self.mx.getMX(alias) |
- d.addCallback(self.assertEqual, Record_MX(name=exchange)) |
- return d |
- |
- |
- def test_cnameLoopWithGlueRecords(self): |
- """ |
- If an MX lookup returns two CNAME records which point to each other, |
- the loop should be detected and the L{Deferred} returned by |
- L{MXCalculator.getMX} should be errbacked with L{CanonicalNameLoop}. |
- """ |
- firstAlias = "cname1.example.com" |
- secondAlias = "cname2.example.com" |
- |
- class DummyResolver(object): |
- def lookupMailExchange(self, domain): |
- return defer.succeed(( |
- [RRHeader(name=firstAlias, |
- type=Record_CNAME.TYPE, |
- payload=Record_CNAME(secondAlias)), |
- RRHeader(name=secondAlias, |
- type=Record_CNAME.TYPE, |
- payload=Record_CNAME(firstAlias))], |
- [], [])) |
- |
- self.mx.resolver = DummyResolver() |
- d = self.mx.getMX(firstAlias) |
- self.assertFailure(d, twisted.mail.relaymanager.CanonicalNameLoop) |
- return d |
- |
- |
- def testManyRecords(self): |
- self.auth.addresses['test.domain'] = [ |
- 'mx1.test.domain', 'mx2.test.domain', 'mx3.test.domain' |
- ] |
- return self.mx.getMX('test.domain' |
- ).addCallback(self._cbManyRecordsSuccessfulLookup |
- ) |
- |
- def _cbManyRecordsSuccessfulLookup(self, mx): |
- self.failUnless(str(mx.name).split('.', 1)[0] in ('mx1', 'mx2', 'mx3')) |
- self.mx.markBad(str(mx.name)) |
- return self.mx.getMX('test.domain' |
- ).addCallback(self._cbManyRecordsDifferentResult, mx |
- ) |
- |
- def _cbManyRecordsDifferentResult(self, nextMX, mx): |
- self.assertNotEqual(str(mx.name), str(nextMX.name)) |
- self.mx.markBad(str(nextMX.name)) |
- |
- return self.mx.getMX('test.domain' |
- ).addCallback(self._cbManyRecordsLastResult, mx, nextMX |
- ) |
- |
- def _cbManyRecordsLastResult(self, lastMX, mx, nextMX): |
- self.assertNotEqual(str(mx.name), str(lastMX.name)) |
- self.assertNotEqual(str(nextMX.name), str(lastMX.name)) |
- |
- self.mx.markBad(str(lastMX.name)) |
- self.mx.markGood(str(nextMX.name)) |
- |
- return self.mx.getMX('test.domain' |
- ).addCallback(self._cbManyRecordsRepeatSpecificResult, nextMX |
- ) |
- |
- def _cbManyRecordsRepeatSpecificResult(self, againMX, nextMX): |
- self.assertEqual(str(againMX.name), str(nextMX.name)) |
- |
-class LiveFireExercise(unittest.TestCase): |
- if interfaces.IReactorUDP(reactor, None) is None: |
- skip = "UDP support is required to determining MX records" |
- |
- def setUp(self): |
- setUpDNS(self) |
- self.tmpdirs = [ |
- 'domainDir', 'insertionDomain', 'insertionQueue', |
- 'destinationDomain', 'destinationQueue' |
- ] |
- |
- def tearDown(self): |
- for d in self.tmpdirs: |
- if os.path.exists(d): |
- shutil.rmtree(d) |
- return tearDownDNS(self) |
- |
- def testLocalDelivery(self): |
- service = mail.mail.MailService() |
- service.smtpPortal.registerChecker(cred.checkers.AllowAnonymousAccess()) |
- domain = mail.maildir.MaildirDirdbmDomain(service, 'domainDir') |
- domain.addUser('user', 'password') |
- service.addDomain('test.domain', domain) |
- service.portals[''] = service.portals['test.domain'] |
- map(service.portals[''].registerChecker, domain.getCredentialsCheckers()) |
- |
- service.setQueue(mail.relay.DomainQueuer(service)) |
- manager = mail.relaymanager.SmartHostSMTPRelayingManager(service.queue, None) |
- helper = mail.relaymanager.RelayStateHelper(manager, 1) |
- |
- f = service.getSMTPFactory() |
- |
- self.smtpServer = reactor.listenTCP(0, f, interface='127.0.0.1') |
- |
- client = LineSendingProtocol([ |
- 'HELO meson', |
- 'MAIL FROM: <user@hostname>', |
- 'RCPT TO: <user@test.domain>', |
- 'DATA', |
- 'This is the message', |
- '.', |
- 'QUIT' |
- ]) |
- |
- done = Deferred() |
- f = protocol.ClientFactory() |
- f.protocol = lambda: client |
- f.clientConnectionLost = lambda *args: done.callback(None) |
- reactor.connectTCP('127.0.0.1', self.smtpServer.getHost().port, f) |
- |
- def finished(ign): |
- mbox = domain.requestAvatar('user', None, pop3.IMailbox)[1] |
- msg = mbox.getMessage(0).read() |
- self.failIfEqual(msg.find('This is the message'), -1) |
- |
- return self.smtpServer.stopListening() |
- done.addCallback(finished) |
- return done |
- |
- |
- def testRelayDelivery(self): |
- # Here is the service we will connect to and send mail from |
- insServ = mail.mail.MailService() |
- insServ.smtpPortal.registerChecker(cred.checkers.AllowAnonymousAccess()) |
- domain = mail.maildir.MaildirDirdbmDomain(insServ, 'insertionDomain') |
- insServ.addDomain('insertion.domain', domain) |
- os.mkdir('insertionQueue') |
- insServ.setQueue(mail.relaymanager.Queue('insertionQueue')) |
- insServ.domains.setDefaultDomain(mail.relay.DomainQueuer(insServ)) |
- manager = mail.relaymanager.SmartHostSMTPRelayingManager(insServ.queue) |
- manager.fArgs += ('test.identity.hostname',) |
- helper = mail.relaymanager.RelayStateHelper(manager, 1) |
- # Yoink! Now the internet obeys OUR every whim! |
- manager.mxcalc = mail.relaymanager.MXCalculator(self.resolver) |
- # And this is our whim. |
- self.auth.addresses['destination.domain'] = ['127.0.0.1'] |
- |
- f = insServ.getSMTPFactory() |
- self.insServer = reactor.listenTCP(0, f, interface='127.0.0.1') |
- |
- # Here is the service the previous one will connect to for final |
- # delivery |
- destServ = mail.mail.MailService() |
- destServ.smtpPortal.registerChecker(cred.checkers.AllowAnonymousAccess()) |
- domain = mail.maildir.MaildirDirdbmDomain(destServ, 'destinationDomain') |
- domain.addUser('user', 'password') |
- destServ.addDomain('destination.domain', domain) |
- os.mkdir('destinationQueue') |
- destServ.setQueue(mail.relaymanager.Queue('destinationQueue')) |
- manager2 = mail.relaymanager.SmartHostSMTPRelayingManager(destServ.queue) |
- helper = mail.relaymanager.RelayStateHelper(manager, 1) |
- helper.startService() |
- |
- f = destServ.getSMTPFactory() |
- self.destServer = reactor.listenTCP(0, f, interface='127.0.0.1') |
- |
- # Update the port number the *first* relay will connect to, because we can't use |
- # port 25 |
- manager.PORT = self.destServer.getHost().port |
- |
- client = LineSendingProtocol([ |
- 'HELO meson', |
- 'MAIL FROM: <user@wherever>', |
- 'RCPT TO: <user@destination.domain>', |
- 'DATA', |
- 'This is the message', |
- '.', |
- 'QUIT' |
- ]) |
- |
- done = Deferred() |
- f = protocol.ClientFactory() |
- f.protocol = lambda: client |
- f.clientConnectionLost = lambda *args: done.callback(None) |
- reactor.connectTCP('127.0.0.1', self.insServer.getHost().port, f) |
- |
- def finished(ign): |
- # First part of the delivery is done. Poke the queue manually now |
- # so we don't have to wait for the queue to be flushed. |
- delivery = manager.checkState() |
- def delivered(ign): |
- mbox = domain.requestAvatar('user', None, pop3.IMailbox)[1] |
- msg = mbox.getMessage(0).read() |
- self.failIfEqual(msg.find('This is the message'), -1) |
- |
- self.insServer.stopListening() |
- self.destServer.stopListening() |
- helper.stopService() |
- delivery.addCallback(delivered) |
- return delivery |
- done.addCallback(finished) |
- return done |
- |
- |
-aliasFile = StringIO.StringIO("""\ |
-# Here's a comment |
- # woop another one |
-testuser: address1,address2, address3, |
- continuation@address, |/bin/process/this |
- |
-usertwo:thisaddress,thataddress, lastaddress |
-lastuser: :/includable, /filename, |/program, address |
-""") |
- |
-class LineBufferMessage: |
- def __init__(self): |
- self.lines = [] |
- self.eom = False |
- self.lost = False |
- |
- def lineReceived(self, line): |
- self.lines.append(line) |
- |
- def eomReceived(self): |
- self.eom = True |
- return defer.succeed('<Whatever>') |
- |
- def connectionLost(self): |
- self.lost = True |
- |
-class AliasTestCase(unittest.TestCase): |
- lines = [ |
- 'First line', |
- 'Next line', |
- '', |
- 'After a blank line', |
- 'Last line' |
- ] |
- |
- def setUp(self): |
- aliasFile.seek(0) |
- |
- def testHandle(self): |
- result = {} |
- lines = [ |
- 'user: another@host\n', |
- 'nextuser: |/bin/program\n', |
- 'user: me@again\n', |
- 'moreusers: :/etc/include/filename\n', |
- 'multiuser: first@host, second@host,last@anotherhost', |
- ] |
- |
- for l in lines: |
- mail.alias.handle(result, l, 'TestCase', None) |
- |
- self.assertEquals(result['user'], ['another@host', 'me@again']) |
- self.assertEquals(result['nextuser'], ['|/bin/program']) |
- self.assertEquals(result['moreusers'], [':/etc/include/filename']) |
- self.assertEquals(result['multiuser'], ['first@host', 'second@host', 'last@anotherhost']) |
- |
- def testFileLoader(self): |
- domains = {'': object()} |
- result = mail.alias.loadAliasFile(domains, fp=aliasFile) |
- |
- self.assertEquals(len(result), 3) |
- |
- group = result['testuser'] |
- s = str(group) |
- for a in ('address1', 'address2', 'address3', 'continuation@address', '/bin/process/this'): |
- self.failIfEqual(s.find(a), -1) |
- self.assertEquals(len(group), 5) |
- |
- group = result['usertwo'] |
- s = str(group) |
- for a in ('thisaddress', 'thataddress', 'lastaddress'): |
- self.failIfEqual(s.find(a), -1) |
- self.assertEquals(len(group), 3) |
- |
- group = result['lastuser'] |
- s = str(group) |
- self.failUnlessEqual(s.find('/includable'), -1) |
- for a in ('/filename', 'program', 'address'): |
- self.failIfEqual(s.find(a), -1, '%s not found' % a) |
- self.assertEquals(len(group), 3) |
- |
- def testMultiWrapper(self): |
- msgs = LineBufferMessage(), LineBufferMessage(), LineBufferMessage() |
- msg = mail.alias.MultiWrapper(msgs) |
- |
- for L in self.lines: |
- msg.lineReceived(L) |
- return msg.eomReceived().addCallback(self._cbMultiWrapper, msgs) |
- |
- def _cbMultiWrapper(self, ignored, msgs): |
- for m in msgs: |
- self.failUnless(m.eom) |
- self.failIf(m.lost) |
- self.assertEquals(self.lines, m.lines) |
- |
- def testFileAlias(self): |
- tmpfile = self.mktemp() |
- a = mail.alias.FileAlias(tmpfile, None, None) |
- m = a.createMessageReceiver() |
- |
- for l in self.lines: |
- m.lineReceived(l) |
- return m.eomReceived().addCallback(self._cbTestFileAlias, tmpfile) |
- |
- def _cbTestFileAlias(self, ignored, tmpfile): |
- lines = file(tmpfile).readlines() |
- self.assertEquals([L[:-1] for L in lines], self.lines) |
- |
- |
- |
-class DummyProcess(object): |
- __slots__ = ['onEnd'] |
- |
- |
- |
-class MockProcessAlias(mail.alias.ProcessAlias): |
- """ |
- A alias processor that doesn't actually launch processes. |
- """ |
- |
- def spawnProcess(self, proto, program, path): |
- """ |
- Don't spawn a process. |
- """ |
- |
- |
- |
-class MockAliasGroup(mail.alias.AliasGroup): |
- """ |
- An alias group using C{MockProcessAlias}. |
- """ |
- processAliasFactory = MockProcessAlias |
- |
- |
- |
-class StubProcess(object): |
- """ |
- Fake implementation of L{IProcessTransport}. |
- |
- @ivar signals: A list of all the signals which have been sent to this fake |
- process. |
- """ |
- def __init__(self): |
- self.signals = [] |
- |
- |
- def loseConnection(self): |
- """ |
- No-op implementation of disconnection. |
- """ |
- |
- |
- def signalProcess(self, signal): |
- """ |
- Record a signal sent to this process for later inspection. |
- """ |
- self.signals.append(signal) |
- |
- |
- |
-class ProcessAliasTestCase(unittest.TestCase): |
- """ |
- Tests for alias resolution. |
- """ |
- |
- lines = [ |
- 'First line', |
- 'Next line', |
- '', |
- 'After a blank line', |
- 'Last line' |
- ] |
- |
- def exitStatus(self, code): |
- """ |
- Construct a status from the given exit code. |
- |
- @type code: L{int} between 0 and 255 inclusive. |
- @param code: The exit status which the code will represent. |
- |
- @rtype: L{int} |
- @return: A status integer for the given exit code. |
- """ |
- # /* Macros for constructing status values. */ |
- # #define __W_EXITCODE(ret, sig) ((ret) << 8 | (sig)) |
- status = (code << 8) | 0 |
- |
- # Sanity check |
- self.assertTrue(os.WIFEXITED(status)) |
- self.assertEqual(os.WEXITSTATUS(status), code) |
- self.assertFalse(os.WIFSIGNALED(status)) |
- |
- return status |
- |
- |
- def signalStatus(self, signal): |
- """ |
- Construct a status from the given signal. |
- |
- @type signal: L{int} between 0 and 255 inclusive. |
- @param signal: The signal number which the status will represent. |
- |
- @rtype: L{int} |
- @return: A status integer for the given signal. |
- """ |
- # /* If WIFSIGNALED(STATUS), the terminating signal. */ |
- # #define __WTERMSIG(status) ((status) & 0x7f) |
- # /* Nonzero if STATUS indicates termination by a signal. */ |
- # #define __WIFSIGNALED(status) \ |
- # (((signed char) (((status) & 0x7f) + 1) >> 1) > 0) |
- status = signal |
- |
- # Sanity check |
- self.assertTrue(os.WIFSIGNALED(status)) |
- self.assertEqual(os.WTERMSIG(status), signal) |
- self.assertFalse(os.WIFEXITED(status)) |
- |
- return status |
- |
- |
- def setUp(self): |
- """ |
- Replace L{smtp.DNSNAME} with a well-known value. |
- """ |
- self.DNSNAME = smtp.DNSNAME |
- smtp.DNSNAME = '' |
- |
- |
- def tearDown(self): |
- """ |
- Restore the original value of L{smtp.DNSNAME}. |
- """ |
- smtp.DNSNAME = self.DNSNAME |
- |
- |
- def test_processAlias(self): |
- """ |
- Standard call to C{mail.alias.ProcessAlias}: check that the specified |
- script is called, and that the input is correctly transferred to it. |
- """ |
- sh = FilePath(self.mktemp()) |
- sh.setContent("""\ |
-#!/bin/sh |
-rm -f process.alias.out |
-while read i; do |
- echo $i >> process.alias.out |
-done""") |
- os.chmod(sh.path, 0700) |
- a = mail.alias.ProcessAlias(sh.path, None, None) |
- m = a.createMessageReceiver() |
- |
- for l in self.lines: |
- m.lineReceived(l) |
- |
- def _cbProcessAlias(ignored): |
- lines = file('process.alias.out').readlines() |
- self.assertEquals([L[:-1] for L in lines], self.lines) |
- |
- return m.eomReceived().addCallback(_cbProcessAlias) |
- |
- |
- def test_processAliasTimeout(self): |
- """ |
- If the alias child process does not exit within a particular period of |
- time, the L{Deferred} returned by L{MessageWrapper.eomReceived} should |
- fail with L{ProcessAliasTimeout} and send the I{KILL} signal to the |
- child process.. |
- """ |
- reactor = task.Clock() |
- transport = StubProcess() |
- proto = mail.alias.ProcessAliasProtocol() |
- proto.makeConnection(transport) |
- |
- receiver = mail.alias.MessageWrapper(proto, None, reactor) |
- d = receiver.eomReceived() |
- reactor.advance(receiver.completionTimeout) |
- def timedOut(ignored): |
- self.assertEqual(transport.signals, ['KILL']) |
- # Now that it has been killed, disconnect the protocol associated |
- # with it. |
- proto.processEnded( |
- ProcessTerminated(self.signalStatus(signal.SIGKILL))) |
- self.assertFailure(d, mail.alias.ProcessAliasTimeout) |
- d.addCallback(timedOut) |
- return d |
- |
- |
- def test_earlyProcessTermination(self): |
- """ |
- If the process associated with an L{mail.alias.MessageWrapper} exits |
- before I{eomReceived} is called, the L{Deferred} returned by |
- I{eomReceived} should fail. |
- """ |
- transport = StubProcess() |
- protocol = mail.alias.ProcessAliasProtocol() |
- protocol.makeConnection(transport) |
- receiver = mail.alias.MessageWrapper(protocol, None, None) |
- protocol.processEnded(failure.Failure(ProcessDone(0))) |
- return self.assertFailure(receiver.eomReceived(), ProcessDone) |
- |
- |
- def _terminationTest(self, status): |
- """ |
- Verify that if the process associated with an |
- L{mail.alias.MessageWrapper} exits with the given status, the |
- L{Deferred} returned by I{eomReceived} fails with L{ProcessTerminated}. |
- """ |
- transport = StubProcess() |
- protocol = mail.alias.ProcessAliasProtocol() |
- protocol.makeConnection(transport) |
- receiver = mail.alias.MessageWrapper(protocol, None, None) |
- protocol.processEnded( |
- failure.Failure(ProcessTerminated(status))) |
- return self.assertFailure(receiver.eomReceived(), ProcessTerminated) |
- |
- |
- def test_errorProcessTermination(self): |
- """ |
- If the process associated with an L{mail.alias.MessageWrapper} exits |
- with a non-zero exit code, the L{Deferred} returned by I{eomReceived} |
- should fail. |
- """ |
- return self._terminationTest(self.exitStatus(1)) |
- |
- |
- def test_signalProcessTermination(self): |
- """ |
- If the process associated with an L{mail.alias.MessageWrapper} exits |
- because it received a signal, the L{Deferred} returned by |
- I{eomReceived} should fail. |
- """ |
- return self._terminationTest(self.signalStatus(signal.SIGHUP)) |
- |
- |
- def test_aliasResolution(self): |
- """ |
- Check that the C{resolve} method of alias processors produce the correct |
- set of objects: |
- - direct alias with L{mail.alias.AddressAlias} if a simple input is passed |
- - aliases in a file with L{mail.alias.FileWrapper} if an input in the format |
- '/file' is given |
- - aliases resulting of a process call wrapped by L{mail.alias.MessageWrapper} |
- if the format is '|process' |
- """ |
- aliases = {} |
- domain = {'': TestDomain(aliases, ['user1', 'user2', 'user3'])} |
- A1 = MockAliasGroup(['user1', '|echo', '/file'], domain, 'alias1') |
- A2 = MockAliasGroup(['user2', 'user3'], domain, 'alias2') |
- A3 = mail.alias.AddressAlias('alias1', domain, 'alias3') |
- aliases.update({ |
- 'alias1': A1, |
- 'alias2': A2, |
- 'alias3': A3, |
- }) |
- |
- res1 = A1.resolve(aliases) |
- r1 = map(str, res1.objs) |
- r1.sort() |
- expected = map(str, [ |
- mail.alias.AddressAlias('user1', None, None), |
- mail.alias.MessageWrapper(DummyProcess(), 'echo'), |
- mail.alias.FileWrapper('/file'), |
- ]) |
- expected.sort() |
- self.assertEquals(r1, expected) |
- |
- res2 = A2.resolve(aliases) |
- r2 = map(str, res2.objs) |
- r2.sort() |
- expected = map(str, [ |
- mail.alias.AddressAlias('user2', None, None), |
- mail.alias.AddressAlias('user3', None, None) |
- ]) |
- expected.sort() |
- self.assertEquals(r2, expected) |
- |
- res3 = A3.resolve(aliases) |
- r3 = map(str, res3.objs) |
- r3.sort() |
- expected = map(str, [ |
- mail.alias.AddressAlias('user1', None, None), |
- mail.alias.MessageWrapper(DummyProcess(), 'echo'), |
- mail.alias.FileWrapper('/file'), |
- ]) |
- expected.sort() |
- self.assertEquals(r3, expected) |
- |
- |
- def test_cyclicAlias(self): |
- """ |
- Check that a cycle in alias resolution is correctly handled. |
- """ |
- aliases = {} |
- domain = {'': TestDomain(aliases, [])} |
- A1 = mail.alias.AddressAlias('alias2', domain, 'alias1') |
- A2 = mail.alias.AddressAlias('alias3', domain, 'alias2') |
- A3 = mail.alias.AddressAlias('alias1', domain, 'alias3') |
- aliases.update({ |
- 'alias1': A1, |
- 'alias2': A2, |
- 'alias3': A3 |
- }) |
- |
- self.assertEquals(aliases['alias1'].resolve(aliases), None) |
- self.assertEquals(aliases['alias2'].resolve(aliases), None) |
- self.assertEquals(aliases['alias3'].resolve(aliases), None) |
- |
- A4 = MockAliasGroup(['|echo', 'alias1'], domain, 'alias4') |
- aliases['alias4'] = A4 |
- |
- res = A4.resolve(aliases) |
- r = map(str, res.objs) |
- r.sort() |
- expected = map(str, [ |
- mail.alias.MessageWrapper(DummyProcess(), 'echo') |
- ]) |
- expected.sort() |
- self.assertEquals(r, expected) |
- |
- |
- |
-if interfaces.IReactorProcess(reactor, None) is None: |
- ProcessAliasTestCase = "IReactorProcess not supported" |
- |
- |
- |
-class TestDomain: |
- def __init__(self, aliases, users): |
- self.aliases = aliases |
- self.users = users |
- |
- def exists(self, user, memo=None): |
- user = user.dest.local |
- if user in self.users: |
- return lambda: mail.alias.AddressAlias(user, None, None) |
- try: |
- a = self.aliases[user] |
- except: |
- raise smtp.SMTPBadRcpt(user) |
- else: |
- aliases = a.resolve(self.aliases, memo) |
- if aliases: |
- return lambda: aliases |
- raise smtp.SMTPBadRcpt(user) |
- |
- |
-from twisted.python.runtime import platformType |
-import types |
-if platformType != "posix": |
- for o in locals().values(): |
- if isinstance(o, (types.ClassType, type)) and issubclass(o, unittest.TestCase): |
- o.skip = "twisted.mail only works on posix" |