Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(25)

Unified Diff: third_party/twisted_8_1/twisted/mail/test/test_mail.py

Issue 12261012: Remove third_party/twisted_8_1 (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/tools/build
Patch Set: Created 7 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: third_party/twisted_8_1/twisted/mail/test/test_mail.py
diff --git a/third_party/twisted_8_1/twisted/mail/test/test_mail.py b/third_party/twisted_8_1/twisted/mail/test/test_mail.py
deleted file mode 100644
index 5a888c8f4415df019f24a7a36498bd875e741baa..0000000000000000000000000000000000000000
--- a/third_party/twisted_8_1/twisted/mail/test/test_mail.py
+++ /dev/null
@@ -1,1863 +0,0 @@
-# Copyright (c) 2001-2008 Twisted Matrix Laboratories.
-# See LICENSE for details.
-
-"""
-Tests for large portions of L{twisted.mail}.
-"""
-
-import os
-import errno
-import md5
-import shutil
-import pickle
-import StringIO
-import rfc822
-import tempfile
-import signal
-
-from zope.interface import Interface, implements
-
-from twisted.trial import unittest
-from twisted.mail import smtp
-from twisted.mail import pop3
-from twisted.names import dns
-from twisted.internet import protocol
-from twisted.internet import defer
-from twisted.internet.defer import Deferred
-from twisted.internet import reactor
-from twisted.internet import interfaces
-from twisted.internet import task
-from twisted.internet.error import DNSLookupError, CannotListenError
-from twisted.internet.error import ProcessDone, ProcessTerminated
-from twisted.internet import address
-from twisted.python import failure
-from twisted.python.filepath import FilePath
-
-from twisted import mail
-import twisted.mail.mail
-import twisted.mail.maildir
-import twisted.mail.relay
-import twisted.mail.relaymanager
-import twisted.mail.protocols
-import twisted.mail.alias
-
-from twisted.names.error import DNSNameError
-from twisted.names.dns import RRHeader, Record_CNAME, Record_MX
-
-from twisted import cred
-import twisted.cred.credentials
-import twisted.cred.checkers
-import twisted.cred.portal
-
-from twisted.test.proto_helpers import LineSendingProtocol
-
-class DomainWithDefaultsTestCase(unittest.TestCase):
- def testMethods(self):
- d = dict([(x, x + 10) for x in range(10)])
- d = mail.mail.DomainWithDefaultDict(d, 'Default')
-
- self.assertEquals(len(d), 10)
- self.assertEquals(list(iter(d)), range(10))
- self.assertEquals(list(d.iterkeys()), list(iter(d)))
-
- items = list(d.iteritems())
- items.sort()
- self.assertEquals(items, [(x, x + 10) for x in range(10)])
-
- values = list(d.itervalues())
- values.sort()
- self.assertEquals(values, range(10, 20))
-
- items = d.items()
- items.sort()
- self.assertEquals(items, [(x, x + 10) for x in range(10)])
-
- values = d.values()
- values.sort()
- self.assertEquals(values, range(10, 20))
-
- for x in range(10):
- self.assertEquals(d[x], x + 10)
- self.assertEquals(d.get(x), x + 10)
- self.failUnless(x in d)
- self.failUnless(d.has_key(x))
-
- del d[2], d[4], d[6]
-
- self.assertEquals(len(d), 7)
- self.assertEquals(d[2], 'Default')
- self.assertEquals(d[4], 'Default')
- self.assertEquals(d[6], 'Default')
-
- d.update({'a': None, 'b': (), 'c': '*'})
- self.assertEquals(len(d), 10)
- self.assertEquals(d['a'], None)
- self.assertEquals(d['b'], ())
- self.assertEquals(d['c'], '*')
-
- d.clear()
- self.assertEquals(len(d), 0)
-
- self.assertEquals(d.setdefault('key', 'value'), 'value')
- self.assertEquals(d['key'], 'value')
-
- self.assertEquals(d.popitem(), ('key', 'value'))
- self.assertEquals(len(d), 0)
-
- dcopy = d.copy()
- self.assertEquals(d.domains, dcopy.domains)
- self.assertEquals(d.default, dcopy.default)
-
-
- def _stringificationTest(self, stringifier):
- """
- Assert that the class name of a L{mail.mail.DomainWithDefaultDict}
- instance and the string-formatted underlying domain dictionary both
- appear in the string produced by the given string-returning function.
-
- @type stringifier: one-argument callable
- @param stringifier: either C{str} or C{repr}, to be used to get a
- string to make assertions against.
- """
- domain = mail.mail.DomainWithDefaultDict({}, 'Default')
- self.assertIn(domain.__class__.__name__, stringifier(domain))
- domain['key'] = 'value'
- self.assertIn(str({'key': 'value'}), stringifier(domain))
-
-
- def test_str(self):
- """
- L{DomainWithDefaultDict.__str__} should return a string including
- the class name and the domain mapping held by the instance.
- """
- self._stringificationTest(str)
-
-
- def test_repr(self):
- """
- L{DomainWithDefaultDict.__repr__} should return a string including
- the class name and the domain mapping held by the instance.
- """
- self._stringificationTest(repr)
-
-
-
-class BounceTestCase(unittest.TestCase):
- def setUp(self):
- self.domain = mail.mail.BounceDomain()
-
- def testExists(self):
- self.assertRaises(smtp.AddressError, self.domain.exists, "any user")
-
- def testRelay(self):
- self.assertEquals(
- self.domain.willRelay("random q emailer", "protocol"),
- False
- )
-
- def testMessage(self):
- self.assertRaises(NotImplementedError, self.domain.startMessage, "whomever")
-
- def testAddUser(self):
- self.domain.addUser("bob", "password")
- self.assertRaises(smtp.SMTPBadRcpt, self.domain.exists, "bob")
-
-class FileMessageTestCase(unittest.TestCase):
- def setUp(self):
- self.name = "fileMessage.testFile"
- self.final = "final.fileMessage.testFile"
- self.f = file(self.name, 'w')
- self.fp = mail.mail.FileMessage(self.f, self.name, self.final)
-
- def tearDown(self):
- try:
- self.f.close()
- except:
- pass
- try:
- os.remove(self.name)
- except:
- pass
- try:
- os.remove(self.final)
- except:
- pass
-
- def testFinalName(self):
- return self.fp.eomReceived().addCallback(self._cbFinalName)
-
- def _cbFinalName(self, result):
- self.assertEquals(result, self.final)
- self.failUnless(self.f.closed)
- self.failIf(os.path.exists(self.name))
-
- def testContents(self):
- contents = "first line\nsecond line\nthird line\n"
- for line in contents.splitlines():
- self.fp.lineReceived(line)
- self.fp.eomReceived()
- self.assertEquals(file(self.final).read(), contents)
-
- def testInterrupted(self):
- contents = "first line\nsecond line\n"
- for line in contents.splitlines():
- self.fp.lineReceived(line)
- self.fp.connectionLost()
- self.failIf(os.path.exists(self.name))
- self.failIf(os.path.exists(self.final))
-
-class MailServiceTestCase(unittest.TestCase):
- def setUp(self):
- self.service = mail.mail.MailService()
-
- def testFactories(self):
- f = self.service.getPOP3Factory()
- self.failUnless(isinstance(f, protocol.ServerFactory))
- self.failUnless(f.buildProtocol(('127.0.0.1', 12345)), pop3.POP3)
-
- f = self.service.getSMTPFactory()
- self.failUnless(isinstance(f, protocol.ServerFactory))
- self.failUnless(f.buildProtocol(('127.0.0.1', 12345)), smtp.SMTP)
-
- f = self.service.getESMTPFactory()
- self.failUnless(isinstance(f, protocol.ServerFactory))
- self.failUnless(f.buildProtocol(('127.0.0.1', 12345)), smtp.ESMTP)
-
- def testPortals(self):
- o1 = object()
- o2 = object()
- self.service.portals['domain'] = o1
- self.service.portals[''] = o2
-
- self.failUnless(self.service.lookupPortal('domain') is o1)
- self.failUnless(self.service.defaultPortal() is o2)
-
-class FailingMaildirMailboxAppendMessageTask(mail.maildir._MaildirMailboxAppendMessageTask):
- _openstate = True
- _writestate = True
- _renamestate = True
- def osopen(self, fn, attr, mode):
- if self._openstate:
- return os.open(fn, attr, mode)
- else:
- raise OSError(errno.EPERM, "Faked Permission Problem")
- def oswrite(self, fh, data):
- if self._writestate:
- return os.write(fh, data)
- else:
- raise OSError(errno.ENOSPC, "Faked Space problem")
- def osrename(self, oldname, newname):
- if self._renamestate:
- return os.rename(oldname, newname)
- else:
- raise OSError(errno.EPERM, "Faked Permission Problem")
-
-class MaildirAppendStringTestCase(unittest.TestCase):
- def setUp(self):
- self.d = self.mktemp()
- mail.maildir.initializeMaildir(self.d)
-
- def tearDown(self):
- shutil.rmtree(self.d)
-
- def _append(self, ignored, mbox):
- d = mbox.appendMessage('TEST')
- return self.assertFailure(d, Exception)
-
- def _setState(self, ignored, mbox, rename=None, write=None, open=None):
- if rename is not None:
- mbox.AppendFactory._renameState = rename
- if write is not None:
- mbox.AppendFactory._writeState = write
- if open is not None:
- mbox.AppendFactory._openstate = open
-
- def testAppend(self):
- mbox = mail.maildir.MaildirMailbox(self.d)
- mbox.AppendFactory = FailingMaildirMailboxAppendMessageTask
- ds = []
- for i in xrange(1, 11):
- ds.append(mbox.appendMessage("X" * i))
- ds[-1].addCallback(self.assertEqual, None)
- d = defer.gatherResults(ds)
- d.addCallback(self._cbTestAppend, mbox)
- return d
-
- def _cbTestAppend(self, result, mbox):
- self.assertEquals(len(mbox.listMessages()),
- 10)
- self.assertEquals(len(mbox.getMessage(5).read()), 6)
- # test in the right order: last to first error location.
- mbox.AppendFactory._renamestate = False
- d = self._append(None, mbox)
- d.addCallback(self._setState, mbox, rename=True, write=False)
- d.addCallback(self._append, mbox)
- d.addCallback(self._setState, mbox, write=True, open=False)
- d.addCallback(self._append, mbox)
- d.addCallback(self._setState, mbox, open=True)
- return d
-
-
-class MaildirAppendFileTestCase(unittest.TestCase):
- def setUp(self):
- self.d = self.mktemp()
- mail.maildir.initializeMaildir(self.d)
-
- def tearDown(self):
- shutil.rmtree(self.d)
-
- def testAppend(self):
- mbox = mail.maildir.MaildirMailbox(self.d)
- ds = []
- def _check(res, t):
- t.close()
- self.assertEqual(res, None)
- for i in xrange(1, 11):
- temp = tempfile.TemporaryFile()
- temp.write("X" * i)
- temp.seek(0,0)
- ds.append(mbox.appendMessage(temp))
- ds[-1].addCallback(_check, temp)
- return defer.gatherResults(ds).addCallback(self._cbTestAppend, mbox)
-
- def _cbTestAppend(self, result, mbox):
- self.assertEquals(len(mbox.listMessages()),
- 10)
- self.assertEquals(len(mbox.getMessage(5).read()), 6)
-
-
-class MaildirTestCase(unittest.TestCase):
- def setUp(self):
- self.d = self.mktemp()
- mail.maildir.initializeMaildir(self.d)
-
- def tearDown(self):
- shutil.rmtree(self.d)
-
- def testInitializer(self):
- d = self.d
- trash = os.path.join(d, '.Trash')
-
- self.failUnless(os.path.exists(d) and os.path.isdir(d))
- self.failUnless(os.path.exists(os.path.join(d, 'new')))
- self.failUnless(os.path.exists(os.path.join(d, 'cur')))
- self.failUnless(os.path.exists(os.path.join(d, 'tmp')))
- self.failUnless(os.path.isdir(os.path.join(d, 'new')))
- self.failUnless(os.path.isdir(os.path.join(d, 'cur')))
- self.failUnless(os.path.isdir(os.path.join(d, 'tmp')))
-
- self.failUnless(os.path.exists(os.path.join(trash, 'new')))
- self.failUnless(os.path.exists(os.path.join(trash, 'cur')))
- self.failUnless(os.path.exists(os.path.join(trash, 'tmp')))
- self.failUnless(os.path.isdir(os.path.join(trash, 'new')))
- self.failUnless(os.path.isdir(os.path.join(trash, 'cur')))
- self.failUnless(os.path.isdir(os.path.join(trash, 'tmp')))
-
- def testMailbox(self):
- j = os.path.join
- n = mail.maildir._generateMaildirName
- msgs = [j(b, n()) for b in ('cur', 'new') for x in range(5)]
-
- # Toss a few files into the mailbox
- i = 1
- for f in msgs:
- f = file(j(self.d, f), 'w')
- f.write('x' * i)
- f.close()
- i = i + 1
-
- mb = mail.maildir.MaildirMailbox(self.d)
- self.assertEquals(mb.listMessages(), range(1, 11))
- self.assertEquals(mb.listMessages(1), 2)
- self.assertEquals(mb.listMessages(5), 6)
-
- self.assertEquals(mb.getMessage(6).read(), 'x' * 7)
- self.assertEquals(mb.getMessage(1).read(), 'x' * 2)
-
- d = {}
- for i in range(10):
- u = mb.getUidl(i)
- self.failIf(u in d)
- d[u] = None
-
- p, f = os.path.split(msgs[5])
-
- mb.deleteMessage(5)
- self.assertEquals(mb.listMessages(5), 0)
- self.failUnless(os.path.exists(j(self.d, '.Trash', 'cur', f)))
- self.failIf(os.path.exists(j(self.d, msgs[5])))
-
- mb.undeleteMessages()
- self.assertEquals(mb.listMessages(5), 6)
- self.failIf(os.path.exists(j(self.d, '.Trash', 'cur', f)))
- self.failUnless(os.path.exists(j(self.d, msgs[5])))
-
-class MaildirDirdbmDomainTestCase(unittest.TestCase):
- def setUp(self):
- self.P = self.mktemp()
- self.S = mail.mail.MailService()
- self.D = mail.maildir.MaildirDirdbmDomain(self.S, self.P)
-
- def tearDown(self):
- shutil.rmtree(self.P)
-
- def testAddUser(self):
- toAdd = (('user1', 'pwd1'), ('user2', 'pwd2'), ('user3', 'pwd3'))
- for (u, p) in toAdd:
- self.D.addUser(u, p)
-
- for (u, p) in toAdd:
- self.failUnless(u in self.D.dbm)
- self.assertEquals(self.D.dbm[u], p)
- self.failUnless(os.path.exists(os.path.join(self.P, u)))
-
- def testCredentials(self):
- creds = self.D.getCredentialsCheckers()
-
- self.assertEquals(len(creds), 1)
- self.failUnless(cred.checkers.ICredentialsChecker.providedBy(creds[0]))
- self.failUnless(cred.credentials.IUsernamePassword in creds[0].credentialInterfaces)
-
- def testRequestAvatar(self):
- class ISomething(Interface):
- pass
-
- self.D.addUser('user', 'password')
- self.assertRaises(
- NotImplementedError,
- self.D.requestAvatar, 'user', None, ISomething
- )
-
- t = self.D.requestAvatar('user', None, pop3.IMailbox)
- self.assertEquals(len(t), 3)
- self.failUnless(t[0] is pop3.IMailbox)
- self.failUnless(pop3.IMailbox.providedBy(t[1]))
-
- t[2]()
-
- def testRequestAvatarId(self):
- self.D.addUser('user', 'password')
- database = self.D.getCredentialsCheckers()[0]
-
- creds = cred.credentials.UsernamePassword('user', 'wrong password')
- self.assertRaises(
- cred.error.UnauthorizedLogin,
- database.requestAvatarId, creds
- )
-
- creds = cred.credentials.UsernamePassword('user', 'password')
- self.assertEquals(database.requestAvatarId(creds), 'user')
-
-
-class StubAliasableDomain(object):
- """
- Minimal testable implementation of IAliasableDomain.
- """
- implements(mail.mail.IAliasableDomain)
-
- def exists(self, user):
- """
- No test coverage for invocations of this method on domain objects,
- so we just won't implement it.
- """
- raise NotImplementedError()
-
-
- def addUser(self, user, password):
- """
- No test coverage for invocations of this method on domain objects,
- so we just won't implement it.
- """
- raise NotImplementedError()
-
-
- def getCredentialsCheckers(self):
- """
- This needs to succeed in order for other tests to complete
- successfully, but we don't actually assert anything about its
- behavior. Return an empty list. Sometime later we should return
- something else and assert that a portal got set up properly.
- """
- return []
-
-
- def setAliasGroup(self, aliases):
- """
- Just record the value so the test can check it later.
- """
- self.aliasGroup = aliases
-
-
-class ServiceDomainTestCase(unittest.TestCase):
- def setUp(self):
- self.S = mail.mail.MailService()
- self.D = mail.protocols.DomainDeliveryBase(self.S, None)
- self.D.service = self.S
- self.D.protocolName = 'TEST'
- self.D.host = 'hostname'
-
- self.tmpdir = self.mktemp()
- domain = mail.maildir.MaildirDirdbmDomain(self.S, self.tmpdir)
- domain.addUser('user', 'password')
- self.S.addDomain('test.domain', domain)
-
- def tearDown(self):
- shutil.rmtree(self.tmpdir)
-
-
- def testAddAliasableDomain(self):
- """
- Test that adding an IAliasableDomain to a mail service properly sets
- up alias group references and such.
- """
- aliases = object()
- domain = StubAliasableDomain()
- self.S.aliases = aliases
- self.S.addDomain('example.com', domain)
- self.assertIdentical(domain.aliasGroup, aliases)
-
-
- def testReceivedHeader(self):
- hdr = self.D.receivedHeader(
- ('remotehost', '123.232.101.234'),
- smtp.Address('<someguy@somplace>'),
- ['user@host.name']
- )
- fp = StringIO.StringIO(hdr)
- m = rfc822.Message(fp)
- self.assertEquals(len(m.items()), 1)
- self.failUnless(m.has_key('Received'))
-
- def testValidateTo(self):
- user = smtp.User('user@test.domain', 'helo', None, 'wherever@whatever')
- return defer.maybeDeferred(self.D.validateTo, user
- ).addCallback(self._cbValidateTo
- )
-
- def _cbValidateTo(self, result):
- self.failUnless(callable(result))
-
- def testValidateToBadUsername(self):
- user = smtp.User('resu@test.domain', 'helo', None, 'wherever@whatever')
- return self.assertFailure(
- defer.maybeDeferred(self.D.validateTo, user),
- smtp.SMTPBadRcpt)
-
- def testValidateToBadDomain(self):
- user = smtp.User('user@domain.test', 'helo', None, 'wherever@whatever')
- return self.assertFailure(
- defer.maybeDeferred(self.D.validateTo, user),
- smtp.SMTPBadRcpt)
-
- def testValidateFrom(self):
- helo = ('hostname', '127.0.0.1')
- origin = smtp.Address('<user@hostname>')
- self.failUnless(self.D.validateFrom(helo, origin) is origin)
-
- helo = ('hostname', '1.2.3.4')
- origin = smtp.Address('<user@hostname>')
- self.failUnless(self.D.validateFrom(helo, origin) is origin)
-
- helo = ('hostname', '1.2.3.4')
- origin = smtp.Address('<>')
- self.failUnless(self.D.validateFrom(helo, origin) is origin)
-
- self.assertRaises(
- smtp.SMTPBadSender,
- self.D.validateFrom, None, origin
- )
-
-class VirtualPOP3TestCase(unittest.TestCase):
- def setUp(self):
- self.tmpdir = self.mktemp()
- self.S = mail.mail.MailService()
- self.D = mail.maildir.MaildirDirdbmDomain(self.S, self.tmpdir)
- self.D.addUser('user', 'password')
- self.S.addDomain('test.domain', self.D)
-
- portal = cred.portal.Portal(self.D)
- map(portal.registerChecker, self.D.getCredentialsCheckers())
- self.S.portals[''] = self.S.portals['test.domain'] = portal
-
- self.P = mail.protocols.VirtualPOP3()
- self.P.service = self.S
- self.P.magic = '<unit test magic>'
-
- def tearDown(self):
- shutil.rmtree(self.tmpdir)
-
- def testAuthenticateAPOP(self):
- resp = md5.new(self.P.magic + 'password').hexdigest()
- return self.P.authenticateUserAPOP('user', resp
- ).addCallback(self._cbAuthenticateAPOP
- )
-
- def _cbAuthenticateAPOP(self, result):
- self.assertEquals(len(result), 3)
- self.assertEquals(result[0], pop3.IMailbox)
- self.failUnless(pop3.IMailbox.providedBy(result[1]))
- result[2]()
-
- def testAuthenticateIncorrectUserAPOP(self):
- resp = md5.new(self.P.magic + 'password').hexdigest()
- return self.assertFailure(
- self.P.authenticateUserAPOP('resu', resp),
- cred.error.UnauthorizedLogin)
-
- def testAuthenticateIncorrectResponseAPOP(self):
- resp = md5.new('wrong digest').hexdigest()
- return self.assertFailure(
- self.P.authenticateUserAPOP('user', resp),
- cred.error.UnauthorizedLogin)
-
- def testAuthenticatePASS(self):
- return self.P.authenticateUserPASS('user', 'password'
- ).addCallback(self._cbAuthenticatePASS
- )
-
- def _cbAuthenticatePASS(self, result):
- self.assertEquals(len(result), 3)
- self.assertEquals(result[0], pop3.IMailbox)
- self.failUnless(pop3.IMailbox.providedBy(result[1]))
- result[2]()
-
- def testAuthenticateBadUserPASS(self):
- return self.assertFailure(
- self.P.authenticateUserPASS('resu', 'password'),
- cred.error.UnauthorizedLogin)
-
- def testAuthenticateBadPasswordPASS(self):
- return self.assertFailure(
- self.P.authenticateUserPASS('user', 'wrong password'),
- cred.error.UnauthorizedLogin)
-
-class empty(smtp.User):
- def __init__(self):
- pass
-
-class RelayTestCase(unittest.TestCase):
- def testExists(self):
- service = mail.mail.MailService()
- domain = mail.relay.DomainQueuer(service)
-
- doRelay = [
- address.UNIXAddress('/var/run/mail-relay'),
- address.IPv4Address('TCP', '127.0.0.1', 12345),
- ]
-
- dontRelay = [
- address.IPv4Address('TCP', '192.168.2.1', 62),
- address.IPv4Address('TCP', '1.2.3.4', 1943),
- ]
-
- for peer in doRelay:
- user = empty()
- user.orig = 'user@host'
- user.dest = 'tsoh@resu'
- user.protocol = empty()
- user.protocol.transport = empty()
- user.protocol.transport.getPeer = lambda: peer
-
- self.failUnless(callable(domain.exists(user)))
-
- for peer in dontRelay:
- user = empty()
- user.orig = 'some@place'
- user.protocol = empty()
- user.protocol.transport = empty()
- user.protocol.transport.getPeer = lambda: peer
- user.dest = 'who@cares'
-
- self.assertRaises(smtp.SMTPBadRcpt, domain.exists, user)
-
-class RelayerTestCase(unittest.TestCase):
- def setUp(self):
- self.tmpdir = self.mktemp()
- os.mkdir(self.tmpdir)
- self.messageFiles = []
- for i in range(10):
- name = os.path.join(self.tmpdir, 'body-%d' % (i,))
- f = file(name + '-H', 'w')
- pickle.dump(['from-%d' % (i,), 'to-%d' % (i,)], f)
- f.close()
-
- f = file(name + '-D', 'w')
- f.write(name)
- f.seek(0, 0)
- self.messageFiles.append(name)
-
- self.R = mail.relay.RelayerMixin()
- self.R.loadMessages(self.messageFiles)
-
- def tearDown(self):
- shutil.rmtree(self.tmpdir)
-
- def testMailFrom(self):
- for i in range(10):
- self.assertEquals(self.R.getMailFrom(), 'from-%d' % (i,))
- self.R.sentMail(250, None, None, None, None)
- self.assertEquals(self.R.getMailFrom(), None)
-
- def testMailTo(self):
- for i in range(10):
- self.assertEquals(self.R.getMailTo(), ['to-%d' % (i,)])
- self.R.sentMail(250, None, None, None, None)
- self.assertEquals(self.R.getMailTo(), None)
-
- def testMailData(self):
- for i in range(10):
- name = os.path.join(self.tmpdir, 'body-%d' % (i,))
- self.assertEquals(self.R.getMailData().read(), name)
- self.R.sentMail(250, None, None, None, None)
- self.assertEquals(self.R.getMailData(), None)
-
-class Manager:
- def __init__(self):
- self.success = []
- self.failure = []
- self.done = []
-
- def notifySuccess(self, factory, message):
- self.success.append((factory, message))
-
- def notifyFailure(self, factory, message):
- self.failure.append((factory, message))
-
- def notifyDone(self, factory):
- self.done.append(factory)
-
-class ManagedRelayerTestCase(unittest.TestCase):
- def setUp(self):
- self.manager = Manager()
- self.messages = range(0, 20, 2)
- self.factory = object()
- self.relay = mail.relaymanager.ManagedRelayerMixin(self.manager)
- self.relay.messages = self.messages[:]
- self.relay.names = self.messages[:]
- self.relay.factory = self.factory
-
- def testSuccessfulSentMail(self):
- for i in self.messages:
- self.relay.sentMail(250, None, None, None, None)
-
- self.assertEquals(
- self.manager.success,
- [(self.factory, m) for m in self.messages]
- )
-
- def testFailedSentMail(self):
- for i in self.messages:
- self.relay.sentMail(550, None, None, None, None)
-
- self.assertEquals(
- self.manager.failure,
- [(self.factory, m) for m in self.messages]
- )
-
- def testConnectionLost(self):
- self.relay.connectionLost(failure.Failure(Exception()))
- self.assertEquals(self.manager.done, [self.factory])
-
-class DirectoryQueueTestCase(unittest.TestCase):
- def setUp(self):
- # This is almost a test case itself.
- self.tmpdir = self.mktemp()
- os.mkdir(self.tmpdir)
- self.queue = mail.relaymanager.Queue(self.tmpdir)
- self.queue.noisy = False
- for m in range(25):
- hdrF, msgF = self.queue.createNewMessage()
- pickle.dump(['header', m], hdrF)
- hdrF.close()
- msgF.lineReceived('body: %d' % (m,))
- msgF.eomReceived()
- self.queue.readDirectory()
-
- def tearDown(self):
- shutil.rmtree(self.tmpdir)
-
- def testWaiting(self):
- self.failUnless(self.queue.hasWaiting())
- self.assertEquals(len(self.queue.getWaiting()), 25)
-
- waiting = self.queue.getWaiting()
- self.queue.setRelaying(waiting[0])
- self.assertEquals(len(self.queue.getWaiting()), 24)
-
- self.queue.setWaiting(waiting[0])
- self.assertEquals(len(self.queue.getWaiting()), 25)
-
- def testRelaying(self):
- for m in self.queue.getWaiting():
- self.queue.setRelaying(m)
- self.assertEquals(
- len(self.queue.getRelayed()),
- 25 - len(self.queue.getWaiting())
- )
-
- self.failIf(self.queue.hasWaiting())
-
- relayed = self.queue.getRelayed()
- self.queue.setWaiting(relayed[0])
- self.assertEquals(len(self.queue.getWaiting()), 1)
- self.assertEquals(len(self.queue.getRelayed()), 24)
-
- def testDone(self):
- msg = self.queue.getWaiting()[0]
- self.queue.setRelaying(msg)
- self.queue.done(msg)
-
- self.assertEquals(len(self.queue.getWaiting()), 24)
- self.assertEquals(len(self.queue.getRelayed()), 0)
-
- self.failIf(msg in self.queue.getWaiting())
- self.failIf(msg in self.queue.getRelayed())
-
- def testEnvelope(self):
- envelopes = []
-
- for msg in self.queue.getWaiting():
- envelopes.append(self.queue.getEnvelope(msg))
-
- envelopes.sort()
- for i in range(25):
- self.assertEquals(
- envelopes.pop(0),
- ['header', i]
- )
-
-from twisted.names import server
-from twisted.names import client
-from twisted.names import common
-
-class TestAuthority(common.ResolverBase):
- def __init__(self):
- common.ResolverBase.__init__(self)
- self.addresses = {}
-
- def _lookup(self, name, cls, type, timeout = None):
- if name in self.addresses and type == dns.MX:
- results = []
- for a in self.addresses[name]:
- hdr = dns.RRHeader(
- name, dns.MX, dns.IN, 60, dns.Record_MX(0, a)
- )
- results.append(hdr)
- return defer.succeed((results, [], []))
- return defer.fail(failure.Failure(dns.DomainError(name)))
-
-def setUpDNS(self):
- self.auth = TestAuthority()
- factory = server.DNSServerFactory([self.auth])
- protocol = dns.DNSDatagramProtocol(factory)
- while 1:
- self.port = reactor.listenTCP(0, factory, interface='127.0.0.1')
- portNumber = self.port.getHost().port
-
- try:
- self.udpPort = reactor.listenUDP(portNumber, protocol, interface='127.0.0.1')
- except CannotListenError:
- self.port.stopListening()
- else:
- break
- self.resolver = client.Resolver(servers=[('127.0.0.1', portNumber)])
-
-
-def tearDownDNS(self):
- dl = []
- dl.append(defer.maybeDeferred(self.port.stopListening))
- dl.append(defer.maybeDeferred(self.udpPort.stopListening))
- if self.resolver.protocol.transport is not None:
- dl.append(defer.maybeDeferred(self.resolver.protocol.transport.stopListening))
- try:
- self.resolver._parseCall.cancel()
- except:
- pass
- return defer.DeferredList(dl)
-
-class MXTestCase(unittest.TestCase):
- """
- Tests for L{mail.relaymanager.MXCalculator}.
- """
- def setUp(self):
- setUpDNS(self)
- self.clock = task.Clock()
- self.mx = mail.relaymanager.MXCalculator(self.resolver, self.clock)
-
- def tearDown(self):
- return tearDownDNS(self)
-
-
- def test_defaultClock(self):
- """
- L{MXCalculator}'s default clock is C{twisted.internet.reactor}.
- """
- self.assertIdentical(
- mail.relaymanager.MXCalculator(self.resolver).clock,
- reactor)
-
-
- def testSimpleSuccess(self):
- self.auth.addresses['test.domain'] = ['the.email.test.domain']
- return self.mx.getMX('test.domain').addCallback(self._cbSimpleSuccess)
-
- def _cbSimpleSuccess(self, mx):
- self.assertEquals(mx.preference, 0)
- self.assertEquals(str(mx.name), 'the.email.test.domain')
-
- def testSimpleFailure(self):
- self.mx.fallbackToDomain = False
- return self.assertFailure(self.mx.getMX('test.domain'), IOError)
-
- def testSimpleFailureWithFallback(self):
- return self.assertFailure(self.mx.getMX('test.domain'), DNSLookupError)
-
-
- def _exchangeTest(self, domain, records, correctMailExchange):
- """
- Issue an MX request for the given domain and arrange for it to be
- responded to with the given records. Verify that the resulting mail
- exchange is the indicated host.
-
- @type domain: C{str}
- @type records: C{list} of L{RRHeader}
- @type correctMailExchange: C{str}
- @rtype: L{Deferred}
- """
- class DummyResolver(object):
- def lookupMailExchange(self, name):
- if name == domain:
- return defer.succeed((
- records,
- [],
- []))
- return defer.fail(DNSNameError(domain))
-
- self.mx.resolver = DummyResolver()
- d = self.mx.getMX(domain)
- def gotMailExchange(record):
- self.assertEqual(str(record.name), correctMailExchange)
- d.addCallback(gotMailExchange)
- return d
-
-
- def test_mailExchangePreference(self):
- """
- The MX record with the lowest preference is returned by
- L{MXCalculator.getMX}.
- """
- domain = "example.com"
- good = "good.example.com"
- bad = "bad.example.com"
-
- records = [
- RRHeader(name=domain,
- type=Record_MX.TYPE,
- payload=Record_MX(1, bad)),
- RRHeader(name=domain,
- type=Record_MX.TYPE,
- payload=Record_MX(0, good)),
- RRHeader(name=domain,
- type=Record_MX.TYPE,
- payload=Record_MX(2, bad))]
- return self._exchangeTest(domain, records, good)
-
-
- def test_badExchangeExcluded(self):
- """
- L{MXCalculator.getMX} returns the MX record with the lowest preference
- which is not also marked as bad.
- """
- domain = "example.com"
- good = "good.example.com"
- bad = "bad.example.com"
-
- records = [
- RRHeader(name=domain,
- type=Record_MX.TYPE,
- payload=Record_MX(0, bad)),
- RRHeader(name=domain,
- type=Record_MX.TYPE,
- payload=Record_MX(1, good))]
- self.mx.markBad(bad)
- return self._exchangeTest(domain, records, good)
-
-
- def test_fallbackForAllBadExchanges(self):
- """
- L{MXCalculator.getMX} returns the MX record with the lowest preference
- if all the MX records in the response have been marked bad.
- """
- domain = "example.com"
- bad = "bad.example.com"
- worse = "worse.example.com"
-
- records = [
- RRHeader(name=domain,
- type=Record_MX.TYPE,
- payload=Record_MX(0, bad)),
- RRHeader(name=domain,
- type=Record_MX.TYPE,
- payload=Record_MX(1, worse))]
- self.mx.markBad(bad)
- self.mx.markBad(worse)
- return self._exchangeTest(domain, records, bad)
-
-
- def test_badExchangeExpires(self):
- """
- L{MXCalculator.getMX} returns the MX record with the lowest preference
- if it was last marked bad longer than L{MXCalculator.timeOutBadMX}
- seconds ago.
- """
- domain = "example.com"
- good = "good.example.com"
- previouslyBad = "bad.example.com"
-
- records = [
- RRHeader(name=domain,
- type=Record_MX.TYPE,
- payload=Record_MX(0, previouslyBad)),
- RRHeader(name=domain,
- type=Record_MX.TYPE,
- payload=Record_MX(1, good))]
- self.mx.markBad(previouslyBad)
- self.clock.advance(self.mx.timeOutBadMX)
- return self._exchangeTest(domain, records, previouslyBad)
-
-
- def test_goodExchangeUsed(self):
- """
- L{MXCalculator.getMX} returns the MX record with the lowest preference
- if it was marked good after it was marked bad.
- """
- domain = "example.com"
- good = "good.example.com"
- previouslyBad = "bad.example.com"
-
- records = [
- RRHeader(name=domain,
- type=Record_MX.TYPE,
- payload=Record_MX(0, previouslyBad)),
- RRHeader(name=domain,
- type=Record_MX.TYPE,
- payload=Record_MX(1, good))]
- self.mx.markBad(previouslyBad)
- self.mx.markGood(previouslyBad)
- self.clock.advance(self.mx.timeOutBadMX)
- return self._exchangeTest(domain, records, previouslyBad)
-
-
- def test_successWithoutResults(self):
- """
- If an MX lookup succeeds but the result set is empty,
- L{MXCalculator.getMX} should try to look up an I{A} record for the
- requested name and call back its returned Deferred with that
- address.
- """
- ip = '1.2.3.4'
- domain = 'example.org'
-
- class DummyResolver(object):
- """
- Fake resolver which will respond to an MX lookup with an empty
- result set.
-
- @ivar mx: A dictionary mapping hostnames to three-tuples of
- results to be returned from I{MX} lookups.
-
- @ivar a: A dictionary mapping hostnames to addresses to be
- returned from I{A} lookups.
- """
- mx = {domain: ([], [], [])}
- a = {domain: ip}
-
- def lookupMailExchange(self, domain):
- return defer.succeed(self.mx[domain])
-
- def getHostByName(self, domain):
- return defer.succeed(self.a[domain])
-
- self.mx.resolver = DummyResolver()
- d = self.mx.getMX(domain)
- d.addCallback(self.assertEqual, Record_MX(name=ip))
- return d
-
-
- def test_failureWithSuccessfulFallback(self):
- """
- Test that if the MX record lookup fails, fallback is enabled, and an A
- record is available for the name, then the Deferred returned by
- L{MXCalculator.getMX} ultimately fires with a Record_MX instance which
- gives the address in the A record for the name.
- """
- class DummyResolver(object):
- """
- Fake resolver which will fail an MX lookup but then succeed a
- getHostByName call.
- """
- def lookupMailExchange(self, domain):
- return defer.fail(DNSNameError())
-
- def getHostByName(self, domain):
- return defer.succeed("1.2.3.4")
-
- self.mx.resolver = DummyResolver()
- d = self.mx.getMX("domain")
- d.addCallback(self.assertEqual, Record_MX(name="1.2.3.4"))
- return d
-
-
- def test_cnameWithoutGlueRecords(self):
- """
- If an MX lookup returns a single CNAME record as a result, MXCalculator
- will perform an MX lookup for the canonical name indicated and return
- the MX record which results.
- """
- alias = "alias.example.com"
- canonical = "canonical.example.com"
- exchange = "mail.example.com"
-
- class DummyResolver(object):
- """
- Fake resolver which will return a CNAME for an MX lookup of a name
- which is an alias and an MX for an MX lookup of the canonical name.
- """
- def lookupMailExchange(self, domain):
- if domain == alias:
- return defer.succeed((
- [RRHeader(name=domain,
- type=Record_CNAME.TYPE,
- payload=Record_CNAME(canonical))],
- [], []))
- elif domain == canonical:
- return defer.succeed((
- [RRHeader(name=domain,
- type=Record_MX.TYPE,
- payload=Record_MX(0, exchange))],
- [], []))
- else:
- return defer.fail(DNSNameError(domain))
-
- self.mx.resolver = DummyResolver()
- d = self.mx.getMX(alias)
- d.addCallback(self.assertEqual, Record_MX(name=exchange))
- return d
-
-
- def test_cnameChain(self):
- """
- If L{MXCalculator.getMX} encounters a CNAME chain which is longer than
- the length specified, the returned L{Deferred} should errback with
- L{CanonicalNameChainTooLong}.
- """
- class DummyResolver(object):
- """
- Fake resolver which generates a CNAME chain of infinite length in
- response to MX lookups.
- """
- chainCounter = 0
-
- def lookupMailExchange(self, domain):
- self.chainCounter += 1
- name = 'x-%d.example.com' % (self.chainCounter,)
- return defer.succeed((
- [RRHeader(name=domain,
- type=Record_CNAME.TYPE,
- payload=Record_CNAME(name))],
- [], []))
-
- cnameLimit = 3
- self.mx.resolver = DummyResolver()
- d = self.mx.getMX("mail.example.com", cnameLimit)
- self.assertFailure(
- d, twisted.mail.relaymanager.CanonicalNameChainTooLong)
- def cbChainTooLong(error):
- self.assertEqual(error.args[0], Record_CNAME("x-%d.example.com" % (cnameLimit + 1,)))
- self.assertEqual(self.mx.resolver.chainCounter, cnameLimit + 1)
- d.addCallback(cbChainTooLong)
- return d
-
-
- def test_cnameWithGlueRecords(self):
- """
- If an MX lookup returns a CNAME and the MX record for the CNAME, the
- L{Deferred} returned by L{MXCalculator.getMX} should be called back
- with the name from the MX record without further lookups being
- attempted.
- """
- lookedUp = []
- alias = "alias.example.com"
- canonical = "canonical.example.com"
- exchange = "mail.example.com"
-
- class DummyResolver(object):
- def lookupMailExchange(self, domain):
- if domain != alias or lookedUp:
- # Don't give back any results for anything except the alias
- # or on any request after the first.
- return ([], [], [])
- return defer.succeed((
- [RRHeader(name=alias,
- type=Record_CNAME.TYPE,
- payload=Record_CNAME(canonical)),
- RRHeader(name=canonical,
- type=Record_MX.TYPE,
- payload=Record_MX(name=exchange))],
- [], []))
-
- self.mx.resolver = DummyResolver()
- d = self.mx.getMX(alias)
- d.addCallback(self.assertEqual, Record_MX(name=exchange))
- return d
-
-
- def test_cnameLoopWithGlueRecords(self):
- """
- If an MX lookup returns two CNAME records which point to each other,
- the loop should be detected and the L{Deferred} returned by
- L{MXCalculator.getMX} should be errbacked with L{CanonicalNameLoop}.
- """
- firstAlias = "cname1.example.com"
- secondAlias = "cname2.example.com"
-
- class DummyResolver(object):
- def lookupMailExchange(self, domain):
- return defer.succeed((
- [RRHeader(name=firstAlias,
- type=Record_CNAME.TYPE,
- payload=Record_CNAME(secondAlias)),
- RRHeader(name=secondAlias,
- type=Record_CNAME.TYPE,
- payload=Record_CNAME(firstAlias))],
- [], []))
-
- self.mx.resolver = DummyResolver()
- d = self.mx.getMX(firstAlias)
- self.assertFailure(d, twisted.mail.relaymanager.CanonicalNameLoop)
- return d
-
-
- def testManyRecords(self):
- self.auth.addresses['test.domain'] = [
- 'mx1.test.domain', 'mx2.test.domain', 'mx3.test.domain'
- ]
- return self.mx.getMX('test.domain'
- ).addCallback(self._cbManyRecordsSuccessfulLookup
- )
-
- def _cbManyRecordsSuccessfulLookup(self, mx):
- self.failUnless(str(mx.name).split('.', 1)[0] in ('mx1', 'mx2', 'mx3'))
- self.mx.markBad(str(mx.name))
- return self.mx.getMX('test.domain'
- ).addCallback(self._cbManyRecordsDifferentResult, mx
- )
-
- def _cbManyRecordsDifferentResult(self, nextMX, mx):
- self.assertNotEqual(str(mx.name), str(nextMX.name))
- self.mx.markBad(str(nextMX.name))
-
- return self.mx.getMX('test.domain'
- ).addCallback(self._cbManyRecordsLastResult, mx, nextMX
- )
-
- def _cbManyRecordsLastResult(self, lastMX, mx, nextMX):
- self.assertNotEqual(str(mx.name), str(lastMX.name))
- self.assertNotEqual(str(nextMX.name), str(lastMX.name))
-
- self.mx.markBad(str(lastMX.name))
- self.mx.markGood(str(nextMX.name))
-
- return self.mx.getMX('test.domain'
- ).addCallback(self._cbManyRecordsRepeatSpecificResult, nextMX
- )
-
- def _cbManyRecordsRepeatSpecificResult(self, againMX, nextMX):
- self.assertEqual(str(againMX.name), str(nextMX.name))
-
-class LiveFireExercise(unittest.TestCase):
- if interfaces.IReactorUDP(reactor, None) is None:
- skip = "UDP support is required to determining MX records"
-
- def setUp(self):
- setUpDNS(self)
- self.tmpdirs = [
- 'domainDir', 'insertionDomain', 'insertionQueue',
- 'destinationDomain', 'destinationQueue'
- ]
-
- def tearDown(self):
- for d in self.tmpdirs:
- if os.path.exists(d):
- shutil.rmtree(d)
- return tearDownDNS(self)
-
- def testLocalDelivery(self):
- service = mail.mail.MailService()
- service.smtpPortal.registerChecker(cred.checkers.AllowAnonymousAccess())
- domain = mail.maildir.MaildirDirdbmDomain(service, 'domainDir')
- domain.addUser('user', 'password')
- service.addDomain('test.domain', domain)
- service.portals[''] = service.portals['test.domain']
- map(service.portals[''].registerChecker, domain.getCredentialsCheckers())
-
- service.setQueue(mail.relay.DomainQueuer(service))
- manager = mail.relaymanager.SmartHostSMTPRelayingManager(service.queue, None)
- helper = mail.relaymanager.RelayStateHelper(manager, 1)
-
- f = service.getSMTPFactory()
-
- self.smtpServer = reactor.listenTCP(0, f, interface='127.0.0.1')
-
- client = LineSendingProtocol([
- 'HELO meson',
- 'MAIL FROM: <user@hostname>',
- 'RCPT TO: <user@test.domain>',
- 'DATA',
- 'This is the message',
- '.',
- 'QUIT'
- ])
-
- done = Deferred()
- f = protocol.ClientFactory()
- f.protocol = lambda: client
- f.clientConnectionLost = lambda *args: done.callback(None)
- reactor.connectTCP('127.0.0.1', self.smtpServer.getHost().port, f)
-
- def finished(ign):
- mbox = domain.requestAvatar('user', None, pop3.IMailbox)[1]
- msg = mbox.getMessage(0).read()
- self.failIfEqual(msg.find('This is the message'), -1)
-
- return self.smtpServer.stopListening()
- done.addCallback(finished)
- return done
-
-
- def testRelayDelivery(self):
- # Here is the service we will connect to and send mail from
- insServ = mail.mail.MailService()
- insServ.smtpPortal.registerChecker(cred.checkers.AllowAnonymousAccess())
- domain = mail.maildir.MaildirDirdbmDomain(insServ, 'insertionDomain')
- insServ.addDomain('insertion.domain', domain)
- os.mkdir('insertionQueue')
- insServ.setQueue(mail.relaymanager.Queue('insertionQueue'))
- insServ.domains.setDefaultDomain(mail.relay.DomainQueuer(insServ))
- manager = mail.relaymanager.SmartHostSMTPRelayingManager(insServ.queue)
- manager.fArgs += ('test.identity.hostname',)
- helper = mail.relaymanager.RelayStateHelper(manager, 1)
- # Yoink! Now the internet obeys OUR every whim!
- manager.mxcalc = mail.relaymanager.MXCalculator(self.resolver)
- # And this is our whim.
- self.auth.addresses['destination.domain'] = ['127.0.0.1']
-
- f = insServ.getSMTPFactory()
- self.insServer = reactor.listenTCP(0, f, interface='127.0.0.1')
-
- # Here is the service the previous one will connect to for final
- # delivery
- destServ = mail.mail.MailService()
- destServ.smtpPortal.registerChecker(cred.checkers.AllowAnonymousAccess())
- domain = mail.maildir.MaildirDirdbmDomain(destServ, 'destinationDomain')
- domain.addUser('user', 'password')
- destServ.addDomain('destination.domain', domain)
- os.mkdir('destinationQueue')
- destServ.setQueue(mail.relaymanager.Queue('destinationQueue'))
- manager2 = mail.relaymanager.SmartHostSMTPRelayingManager(destServ.queue)
- helper = mail.relaymanager.RelayStateHelper(manager, 1)
- helper.startService()
-
- f = destServ.getSMTPFactory()
- self.destServer = reactor.listenTCP(0, f, interface='127.0.0.1')
-
- # Update the port number the *first* relay will connect to, because we can't use
- # port 25
- manager.PORT = self.destServer.getHost().port
-
- client = LineSendingProtocol([
- 'HELO meson',
- 'MAIL FROM: <user@wherever>',
- 'RCPT TO: <user@destination.domain>',
- 'DATA',
- 'This is the message',
- '.',
- 'QUIT'
- ])
-
- done = Deferred()
- f = protocol.ClientFactory()
- f.protocol = lambda: client
- f.clientConnectionLost = lambda *args: done.callback(None)
- reactor.connectTCP('127.0.0.1', self.insServer.getHost().port, f)
-
- def finished(ign):
- # First part of the delivery is done. Poke the queue manually now
- # so we don't have to wait for the queue to be flushed.
- delivery = manager.checkState()
- def delivered(ign):
- mbox = domain.requestAvatar('user', None, pop3.IMailbox)[1]
- msg = mbox.getMessage(0).read()
- self.failIfEqual(msg.find('This is the message'), -1)
-
- self.insServer.stopListening()
- self.destServer.stopListening()
- helper.stopService()
- delivery.addCallback(delivered)
- return delivery
- done.addCallback(finished)
- return done
-
-
-aliasFile = StringIO.StringIO("""\
-# Here's a comment
- # woop another one
-testuser: address1,address2, address3,
- continuation@address, |/bin/process/this
-
-usertwo:thisaddress,thataddress, lastaddress
-lastuser: :/includable, /filename, |/program, address
-""")
-
-class LineBufferMessage:
- def __init__(self):
- self.lines = []
- self.eom = False
- self.lost = False
-
- def lineReceived(self, line):
- self.lines.append(line)
-
- def eomReceived(self):
- self.eom = True
- return defer.succeed('<Whatever>')
-
- def connectionLost(self):
- self.lost = True
-
-class AliasTestCase(unittest.TestCase):
- lines = [
- 'First line',
- 'Next line',
- '',
- 'After a blank line',
- 'Last line'
- ]
-
- def setUp(self):
- aliasFile.seek(0)
-
- def testHandle(self):
- result = {}
- lines = [
- 'user: another@host\n',
- 'nextuser: |/bin/program\n',
- 'user: me@again\n',
- 'moreusers: :/etc/include/filename\n',
- 'multiuser: first@host, second@host,last@anotherhost',
- ]
-
- for l in lines:
- mail.alias.handle(result, l, 'TestCase', None)
-
- self.assertEquals(result['user'], ['another@host', 'me@again'])
- self.assertEquals(result['nextuser'], ['|/bin/program'])
- self.assertEquals(result['moreusers'], [':/etc/include/filename'])
- self.assertEquals(result['multiuser'], ['first@host', 'second@host', 'last@anotherhost'])
-
- def testFileLoader(self):
- domains = {'': object()}
- result = mail.alias.loadAliasFile(domains, fp=aliasFile)
-
- self.assertEquals(len(result), 3)
-
- group = result['testuser']
- s = str(group)
- for a in ('address1', 'address2', 'address3', 'continuation@address', '/bin/process/this'):
- self.failIfEqual(s.find(a), -1)
- self.assertEquals(len(group), 5)
-
- group = result['usertwo']
- s = str(group)
- for a in ('thisaddress', 'thataddress', 'lastaddress'):
- self.failIfEqual(s.find(a), -1)
- self.assertEquals(len(group), 3)
-
- group = result['lastuser']
- s = str(group)
- self.failUnlessEqual(s.find('/includable'), -1)
- for a in ('/filename', 'program', 'address'):
- self.failIfEqual(s.find(a), -1, '%s not found' % a)
- self.assertEquals(len(group), 3)
-
- def testMultiWrapper(self):
- msgs = LineBufferMessage(), LineBufferMessage(), LineBufferMessage()
- msg = mail.alias.MultiWrapper(msgs)
-
- for L in self.lines:
- msg.lineReceived(L)
- return msg.eomReceived().addCallback(self._cbMultiWrapper, msgs)
-
- def _cbMultiWrapper(self, ignored, msgs):
- for m in msgs:
- self.failUnless(m.eom)
- self.failIf(m.lost)
- self.assertEquals(self.lines, m.lines)
-
- def testFileAlias(self):
- tmpfile = self.mktemp()
- a = mail.alias.FileAlias(tmpfile, None, None)
- m = a.createMessageReceiver()
-
- for l in self.lines:
- m.lineReceived(l)
- return m.eomReceived().addCallback(self._cbTestFileAlias, tmpfile)
-
- def _cbTestFileAlias(self, ignored, tmpfile):
- lines = file(tmpfile).readlines()
- self.assertEquals([L[:-1] for L in lines], self.lines)
-
-
-
-class DummyProcess(object):
- __slots__ = ['onEnd']
-
-
-
-class MockProcessAlias(mail.alias.ProcessAlias):
- """
- A alias processor that doesn't actually launch processes.
- """
-
- def spawnProcess(self, proto, program, path):
- """
- Don't spawn a process.
- """
-
-
-
-class MockAliasGroup(mail.alias.AliasGroup):
- """
- An alias group using C{MockProcessAlias}.
- """
- processAliasFactory = MockProcessAlias
-
-
-
-class StubProcess(object):
- """
- Fake implementation of L{IProcessTransport}.
-
- @ivar signals: A list of all the signals which have been sent to this fake
- process.
- """
- def __init__(self):
- self.signals = []
-
-
- def loseConnection(self):
- """
- No-op implementation of disconnection.
- """
-
-
- def signalProcess(self, signal):
- """
- Record a signal sent to this process for later inspection.
- """
- self.signals.append(signal)
-
-
-
-class ProcessAliasTestCase(unittest.TestCase):
- """
- Tests for alias resolution.
- """
-
- lines = [
- 'First line',
- 'Next line',
- '',
- 'After a blank line',
- 'Last line'
- ]
-
- def exitStatus(self, code):
- """
- Construct a status from the given exit code.
-
- @type code: L{int} between 0 and 255 inclusive.
- @param code: The exit status which the code will represent.
-
- @rtype: L{int}
- @return: A status integer for the given exit code.
- """
- # /* Macros for constructing status values. */
- # #define __W_EXITCODE(ret, sig) ((ret) << 8 | (sig))
- status = (code << 8) | 0
-
- # Sanity check
- self.assertTrue(os.WIFEXITED(status))
- self.assertEqual(os.WEXITSTATUS(status), code)
- self.assertFalse(os.WIFSIGNALED(status))
-
- return status
-
-
- def signalStatus(self, signal):
- """
- Construct a status from the given signal.
-
- @type signal: L{int} between 0 and 255 inclusive.
- @param signal: The signal number which the status will represent.
-
- @rtype: L{int}
- @return: A status integer for the given signal.
- """
- # /* If WIFSIGNALED(STATUS), the terminating signal. */
- # #define __WTERMSIG(status) ((status) & 0x7f)
- # /* Nonzero if STATUS indicates termination by a signal. */
- # #define __WIFSIGNALED(status) \
- # (((signed char) (((status) & 0x7f) + 1) >> 1) > 0)
- status = signal
-
- # Sanity check
- self.assertTrue(os.WIFSIGNALED(status))
- self.assertEqual(os.WTERMSIG(status), signal)
- self.assertFalse(os.WIFEXITED(status))
-
- return status
-
-
- def setUp(self):
- """
- Replace L{smtp.DNSNAME} with a well-known value.
- """
- self.DNSNAME = smtp.DNSNAME
- smtp.DNSNAME = ''
-
-
- def tearDown(self):
- """
- Restore the original value of L{smtp.DNSNAME}.
- """
- smtp.DNSNAME = self.DNSNAME
-
-
- def test_processAlias(self):
- """
- Standard call to C{mail.alias.ProcessAlias}: check that the specified
- script is called, and that the input is correctly transferred to it.
- """
- sh = FilePath(self.mktemp())
- sh.setContent("""\
-#!/bin/sh
-rm -f process.alias.out
-while read i; do
- echo $i >> process.alias.out
-done""")
- os.chmod(sh.path, 0700)
- a = mail.alias.ProcessAlias(sh.path, None, None)
- m = a.createMessageReceiver()
-
- for l in self.lines:
- m.lineReceived(l)
-
- def _cbProcessAlias(ignored):
- lines = file('process.alias.out').readlines()
- self.assertEquals([L[:-1] for L in lines], self.lines)
-
- return m.eomReceived().addCallback(_cbProcessAlias)
-
-
- def test_processAliasTimeout(self):
- """
- If the alias child process does not exit within a particular period of
- time, the L{Deferred} returned by L{MessageWrapper.eomReceived} should
- fail with L{ProcessAliasTimeout} and send the I{KILL} signal to the
- child process..
- """
- reactor = task.Clock()
- transport = StubProcess()
- proto = mail.alias.ProcessAliasProtocol()
- proto.makeConnection(transport)
-
- receiver = mail.alias.MessageWrapper(proto, None, reactor)
- d = receiver.eomReceived()
- reactor.advance(receiver.completionTimeout)
- def timedOut(ignored):
- self.assertEqual(transport.signals, ['KILL'])
- # Now that it has been killed, disconnect the protocol associated
- # with it.
- proto.processEnded(
- ProcessTerminated(self.signalStatus(signal.SIGKILL)))
- self.assertFailure(d, mail.alias.ProcessAliasTimeout)
- d.addCallback(timedOut)
- return d
-
-
- def test_earlyProcessTermination(self):
- """
- If the process associated with an L{mail.alias.MessageWrapper} exits
- before I{eomReceived} is called, the L{Deferred} returned by
- I{eomReceived} should fail.
- """
- transport = StubProcess()
- protocol = mail.alias.ProcessAliasProtocol()
- protocol.makeConnection(transport)
- receiver = mail.alias.MessageWrapper(protocol, None, None)
- protocol.processEnded(failure.Failure(ProcessDone(0)))
- return self.assertFailure(receiver.eomReceived(), ProcessDone)
-
-
- def _terminationTest(self, status):
- """
- Verify that if the process associated with an
- L{mail.alias.MessageWrapper} exits with the given status, the
- L{Deferred} returned by I{eomReceived} fails with L{ProcessTerminated}.
- """
- transport = StubProcess()
- protocol = mail.alias.ProcessAliasProtocol()
- protocol.makeConnection(transport)
- receiver = mail.alias.MessageWrapper(protocol, None, None)
- protocol.processEnded(
- failure.Failure(ProcessTerminated(status)))
- return self.assertFailure(receiver.eomReceived(), ProcessTerminated)
-
-
- def test_errorProcessTermination(self):
- """
- If the process associated with an L{mail.alias.MessageWrapper} exits
- with a non-zero exit code, the L{Deferred} returned by I{eomReceived}
- should fail.
- """
- return self._terminationTest(self.exitStatus(1))
-
-
- def test_signalProcessTermination(self):
- """
- If the process associated with an L{mail.alias.MessageWrapper} exits
- because it received a signal, the L{Deferred} returned by
- I{eomReceived} should fail.
- """
- return self._terminationTest(self.signalStatus(signal.SIGHUP))
-
-
- def test_aliasResolution(self):
- """
- Check that the C{resolve} method of alias processors produce the correct
- set of objects:
- - direct alias with L{mail.alias.AddressAlias} if a simple input is passed
- - aliases in a file with L{mail.alias.FileWrapper} if an input in the format
- '/file' is given
- - aliases resulting of a process call wrapped by L{mail.alias.MessageWrapper}
- if the format is '|process'
- """
- aliases = {}
- domain = {'': TestDomain(aliases, ['user1', 'user2', 'user3'])}
- A1 = MockAliasGroup(['user1', '|echo', '/file'], domain, 'alias1')
- A2 = MockAliasGroup(['user2', 'user3'], domain, 'alias2')
- A3 = mail.alias.AddressAlias('alias1', domain, 'alias3')
- aliases.update({
- 'alias1': A1,
- 'alias2': A2,
- 'alias3': A3,
- })
-
- res1 = A1.resolve(aliases)
- r1 = map(str, res1.objs)
- r1.sort()
- expected = map(str, [
- mail.alias.AddressAlias('user1', None, None),
- mail.alias.MessageWrapper(DummyProcess(), 'echo'),
- mail.alias.FileWrapper('/file'),
- ])
- expected.sort()
- self.assertEquals(r1, expected)
-
- res2 = A2.resolve(aliases)
- r2 = map(str, res2.objs)
- r2.sort()
- expected = map(str, [
- mail.alias.AddressAlias('user2', None, None),
- mail.alias.AddressAlias('user3', None, None)
- ])
- expected.sort()
- self.assertEquals(r2, expected)
-
- res3 = A3.resolve(aliases)
- r3 = map(str, res3.objs)
- r3.sort()
- expected = map(str, [
- mail.alias.AddressAlias('user1', None, None),
- mail.alias.MessageWrapper(DummyProcess(), 'echo'),
- mail.alias.FileWrapper('/file'),
- ])
- expected.sort()
- self.assertEquals(r3, expected)
-
-
- def test_cyclicAlias(self):
- """
- Check that a cycle in alias resolution is correctly handled.
- """
- aliases = {}
- domain = {'': TestDomain(aliases, [])}
- A1 = mail.alias.AddressAlias('alias2', domain, 'alias1')
- A2 = mail.alias.AddressAlias('alias3', domain, 'alias2')
- A3 = mail.alias.AddressAlias('alias1', domain, 'alias3')
- aliases.update({
- 'alias1': A1,
- 'alias2': A2,
- 'alias3': A3
- })
-
- self.assertEquals(aliases['alias1'].resolve(aliases), None)
- self.assertEquals(aliases['alias2'].resolve(aliases), None)
- self.assertEquals(aliases['alias3'].resolve(aliases), None)
-
- A4 = MockAliasGroup(['|echo', 'alias1'], domain, 'alias4')
- aliases['alias4'] = A4
-
- res = A4.resolve(aliases)
- r = map(str, res.objs)
- r.sort()
- expected = map(str, [
- mail.alias.MessageWrapper(DummyProcess(), 'echo')
- ])
- expected.sort()
- self.assertEquals(r, expected)
-
-
-
-if interfaces.IReactorProcess(reactor, None) is None:
- ProcessAliasTestCase = "IReactorProcess not supported"
-
-
-
-class TestDomain:
- def __init__(self, aliases, users):
- self.aliases = aliases
- self.users = users
-
- def exists(self, user, memo=None):
- user = user.dest.local
- if user in self.users:
- return lambda: mail.alias.AddressAlias(user, None, None)
- try:
- a = self.aliases[user]
- except:
- raise smtp.SMTPBadRcpt(user)
- else:
- aliases = a.resolve(self.aliases, memo)
- if aliases:
- return lambda: aliases
- raise smtp.SMTPBadRcpt(user)
-
-
-from twisted.python.runtime import platformType
-import types
-if platformType != "posix":
- for o in locals().values():
- if isinstance(o, (types.ClassType, type)) and issubclass(o, unittest.TestCase):
- o.skip = "twisted.mail only works on posix"
« no previous file with comments | « third_party/twisted_8_1/twisted/mail/test/test_imap.py ('k') | third_party/twisted_8_1/twisted/mail/test/test_options.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698