Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(254)

Side by Side Diff: third_party/twisted_8_1/twisted/mail/test/test_mail.py

Issue 12261012: Remove third_party/twisted_8_1 (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/tools/build
Patch Set: Created 7 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
(Empty)
1 # Copyright (c) 2001-2008 Twisted Matrix Laboratories.
2 # See LICENSE for details.
3
4 """
5 Tests for large portions of L{twisted.mail}.
6 """
7
8 import os
9 import errno
10 import md5
11 import shutil
12 import pickle
13 import StringIO
14 import rfc822
15 import tempfile
16 import signal
17
18 from zope.interface import Interface, implements
19
20 from twisted.trial import unittest
21 from twisted.mail import smtp
22 from twisted.mail import pop3
23 from twisted.names import dns
24 from twisted.internet import protocol
25 from twisted.internet import defer
26 from twisted.internet.defer import Deferred
27 from twisted.internet import reactor
28 from twisted.internet import interfaces
29 from twisted.internet import task
30 from twisted.internet.error import DNSLookupError, CannotListenError
31 from twisted.internet.error import ProcessDone, ProcessTerminated
32 from twisted.internet import address
33 from twisted.python import failure
34 from twisted.python.filepath import FilePath
35
36 from twisted import mail
37 import twisted.mail.mail
38 import twisted.mail.maildir
39 import twisted.mail.relay
40 import twisted.mail.relaymanager
41 import twisted.mail.protocols
42 import twisted.mail.alias
43
44 from twisted.names.error import DNSNameError
45 from twisted.names.dns import RRHeader, Record_CNAME, Record_MX
46
47 from twisted import cred
48 import twisted.cred.credentials
49 import twisted.cred.checkers
50 import twisted.cred.portal
51
52 from twisted.test.proto_helpers import LineSendingProtocol
53
54 class DomainWithDefaultsTestCase(unittest.TestCase):
55 def testMethods(self):
56 d = dict([(x, x + 10) for x in range(10)])
57 d = mail.mail.DomainWithDefaultDict(d, 'Default')
58
59 self.assertEquals(len(d), 10)
60 self.assertEquals(list(iter(d)), range(10))
61 self.assertEquals(list(d.iterkeys()), list(iter(d)))
62
63 items = list(d.iteritems())
64 items.sort()
65 self.assertEquals(items, [(x, x + 10) for x in range(10)])
66
67 values = list(d.itervalues())
68 values.sort()
69 self.assertEquals(values, range(10, 20))
70
71 items = d.items()
72 items.sort()
73 self.assertEquals(items, [(x, x + 10) for x in range(10)])
74
75 values = d.values()
76 values.sort()
77 self.assertEquals(values, range(10, 20))
78
79 for x in range(10):
80 self.assertEquals(d[x], x + 10)
81 self.assertEquals(d.get(x), x + 10)
82 self.failUnless(x in d)
83 self.failUnless(d.has_key(x))
84
85 del d[2], d[4], d[6]
86
87 self.assertEquals(len(d), 7)
88 self.assertEquals(d[2], 'Default')
89 self.assertEquals(d[4], 'Default')
90 self.assertEquals(d[6], 'Default')
91
92 d.update({'a': None, 'b': (), 'c': '*'})
93 self.assertEquals(len(d), 10)
94 self.assertEquals(d['a'], None)
95 self.assertEquals(d['b'], ())
96 self.assertEquals(d['c'], '*')
97
98 d.clear()
99 self.assertEquals(len(d), 0)
100
101 self.assertEquals(d.setdefault('key', 'value'), 'value')
102 self.assertEquals(d['key'], 'value')
103
104 self.assertEquals(d.popitem(), ('key', 'value'))
105 self.assertEquals(len(d), 0)
106
107 dcopy = d.copy()
108 self.assertEquals(d.domains, dcopy.domains)
109 self.assertEquals(d.default, dcopy.default)
110
111
112 def _stringificationTest(self, stringifier):
113 """
114 Assert that the class name of a L{mail.mail.DomainWithDefaultDict}
115 instance and the string-formatted underlying domain dictionary both
116 appear in the string produced by the given string-returning function.
117
118 @type stringifier: one-argument callable
119 @param stringifier: either C{str} or C{repr}, to be used to get a
120 string to make assertions against.
121 """
122 domain = mail.mail.DomainWithDefaultDict({}, 'Default')
123 self.assertIn(domain.__class__.__name__, stringifier(domain))
124 domain['key'] = 'value'
125 self.assertIn(str({'key': 'value'}), stringifier(domain))
126
127
128 def test_str(self):
129 """
130 L{DomainWithDefaultDict.__str__} should return a string including
131 the class name and the domain mapping held by the instance.
132 """
133 self._stringificationTest(str)
134
135
136 def test_repr(self):
137 """
138 L{DomainWithDefaultDict.__repr__} should return a string including
139 the class name and the domain mapping held by the instance.
140 """
141 self._stringificationTest(repr)
142
143
144
145 class BounceTestCase(unittest.TestCase):
146 def setUp(self):
147 self.domain = mail.mail.BounceDomain()
148
149 def testExists(self):
150 self.assertRaises(smtp.AddressError, self.domain.exists, "any user")
151
152 def testRelay(self):
153 self.assertEquals(
154 self.domain.willRelay("random q emailer", "protocol"),
155 False
156 )
157
158 def testMessage(self):
159 self.assertRaises(NotImplementedError, self.domain.startMessage, "whomev er")
160
161 def testAddUser(self):
162 self.domain.addUser("bob", "password")
163 self.assertRaises(smtp.SMTPBadRcpt, self.domain.exists, "bob")
164
165 class FileMessageTestCase(unittest.TestCase):
166 def setUp(self):
167 self.name = "fileMessage.testFile"
168 self.final = "final.fileMessage.testFile"
169 self.f = file(self.name, 'w')
170 self.fp = mail.mail.FileMessage(self.f, self.name, self.final)
171
172 def tearDown(self):
173 try:
174 self.f.close()
175 except:
176 pass
177 try:
178 os.remove(self.name)
179 except:
180 pass
181 try:
182 os.remove(self.final)
183 except:
184 pass
185
186 def testFinalName(self):
187 return self.fp.eomReceived().addCallback(self._cbFinalName)
188
189 def _cbFinalName(self, result):
190 self.assertEquals(result, self.final)
191 self.failUnless(self.f.closed)
192 self.failIf(os.path.exists(self.name))
193
194 def testContents(self):
195 contents = "first line\nsecond line\nthird line\n"
196 for line in contents.splitlines():
197 self.fp.lineReceived(line)
198 self.fp.eomReceived()
199 self.assertEquals(file(self.final).read(), contents)
200
201 def testInterrupted(self):
202 contents = "first line\nsecond line\n"
203 for line in contents.splitlines():
204 self.fp.lineReceived(line)
205 self.fp.connectionLost()
206 self.failIf(os.path.exists(self.name))
207 self.failIf(os.path.exists(self.final))
208
209 class MailServiceTestCase(unittest.TestCase):
210 def setUp(self):
211 self.service = mail.mail.MailService()
212
213 def testFactories(self):
214 f = self.service.getPOP3Factory()
215 self.failUnless(isinstance(f, protocol.ServerFactory))
216 self.failUnless(f.buildProtocol(('127.0.0.1', 12345)), pop3.POP3)
217
218 f = self.service.getSMTPFactory()
219 self.failUnless(isinstance(f, protocol.ServerFactory))
220 self.failUnless(f.buildProtocol(('127.0.0.1', 12345)), smtp.SMTP)
221
222 f = self.service.getESMTPFactory()
223 self.failUnless(isinstance(f, protocol.ServerFactory))
224 self.failUnless(f.buildProtocol(('127.0.0.1', 12345)), smtp.ESMTP)
225
226 def testPortals(self):
227 o1 = object()
228 o2 = object()
229 self.service.portals['domain'] = o1
230 self.service.portals[''] = o2
231
232 self.failUnless(self.service.lookupPortal('domain') is o1)
233 self.failUnless(self.service.defaultPortal() is o2)
234
235 class FailingMaildirMailboxAppendMessageTask(mail.maildir._MaildirMailboxAppendM essageTask):
236 _openstate = True
237 _writestate = True
238 _renamestate = True
239 def osopen(self, fn, attr, mode):
240 if self._openstate:
241 return os.open(fn, attr, mode)
242 else:
243 raise OSError(errno.EPERM, "Faked Permission Problem")
244 def oswrite(self, fh, data):
245 if self._writestate:
246 return os.write(fh, data)
247 else:
248 raise OSError(errno.ENOSPC, "Faked Space problem")
249 def osrename(self, oldname, newname):
250 if self._renamestate:
251 return os.rename(oldname, newname)
252 else:
253 raise OSError(errno.EPERM, "Faked Permission Problem")
254
255 class MaildirAppendStringTestCase(unittest.TestCase):
256 def setUp(self):
257 self.d = self.mktemp()
258 mail.maildir.initializeMaildir(self.d)
259
260 def tearDown(self):
261 shutil.rmtree(self.d)
262
263 def _append(self, ignored, mbox):
264 d = mbox.appendMessage('TEST')
265 return self.assertFailure(d, Exception)
266
267 def _setState(self, ignored, mbox, rename=None, write=None, open=None):
268 if rename is not None:
269 mbox.AppendFactory._renameState = rename
270 if write is not None:
271 mbox.AppendFactory._writeState = write
272 if open is not None:
273 mbox.AppendFactory._openstate = open
274
275 def testAppend(self):
276 mbox = mail.maildir.MaildirMailbox(self.d)
277 mbox.AppendFactory = FailingMaildirMailboxAppendMessageTask
278 ds = []
279 for i in xrange(1, 11):
280 ds.append(mbox.appendMessage("X" * i))
281 ds[-1].addCallback(self.assertEqual, None)
282 d = defer.gatherResults(ds)
283 d.addCallback(self._cbTestAppend, mbox)
284 return d
285
286 def _cbTestAppend(self, result, mbox):
287 self.assertEquals(len(mbox.listMessages()),
288 10)
289 self.assertEquals(len(mbox.getMessage(5).read()), 6)
290 # test in the right order: last to first error location.
291 mbox.AppendFactory._renamestate = False
292 d = self._append(None, mbox)
293 d.addCallback(self._setState, mbox, rename=True, write=False)
294 d.addCallback(self._append, mbox)
295 d.addCallback(self._setState, mbox, write=True, open=False)
296 d.addCallback(self._append, mbox)
297 d.addCallback(self._setState, mbox, open=True)
298 return d
299
300
301 class MaildirAppendFileTestCase(unittest.TestCase):
302 def setUp(self):
303 self.d = self.mktemp()
304 mail.maildir.initializeMaildir(self.d)
305
306 def tearDown(self):
307 shutil.rmtree(self.d)
308
309 def testAppend(self):
310 mbox = mail.maildir.MaildirMailbox(self.d)
311 ds = []
312 def _check(res, t):
313 t.close()
314 self.assertEqual(res, None)
315 for i in xrange(1, 11):
316 temp = tempfile.TemporaryFile()
317 temp.write("X" * i)
318 temp.seek(0,0)
319 ds.append(mbox.appendMessage(temp))
320 ds[-1].addCallback(_check, temp)
321 return defer.gatherResults(ds).addCallback(self._cbTestAppend, mbox)
322
323 def _cbTestAppend(self, result, mbox):
324 self.assertEquals(len(mbox.listMessages()),
325 10)
326 self.assertEquals(len(mbox.getMessage(5).read()), 6)
327
328
329 class MaildirTestCase(unittest.TestCase):
330 def setUp(self):
331 self.d = self.mktemp()
332 mail.maildir.initializeMaildir(self.d)
333
334 def tearDown(self):
335 shutil.rmtree(self.d)
336
337 def testInitializer(self):
338 d = self.d
339 trash = os.path.join(d, '.Trash')
340
341 self.failUnless(os.path.exists(d) and os.path.isdir(d))
342 self.failUnless(os.path.exists(os.path.join(d, 'new')))
343 self.failUnless(os.path.exists(os.path.join(d, 'cur')))
344 self.failUnless(os.path.exists(os.path.join(d, 'tmp')))
345 self.failUnless(os.path.isdir(os.path.join(d, 'new')))
346 self.failUnless(os.path.isdir(os.path.join(d, 'cur')))
347 self.failUnless(os.path.isdir(os.path.join(d, 'tmp')))
348
349 self.failUnless(os.path.exists(os.path.join(trash, 'new')))
350 self.failUnless(os.path.exists(os.path.join(trash, 'cur')))
351 self.failUnless(os.path.exists(os.path.join(trash, 'tmp')))
352 self.failUnless(os.path.isdir(os.path.join(trash, 'new')))
353 self.failUnless(os.path.isdir(os.path.join(trash, 'cur')))
354 self.failUnless(os.path.isdir(os.path.join(trash, 'tmp')))
355
356 def testMailbox(self):
357 j = os.path.join
358 n = mail.maildir._generateMaildirName
359 msgs = [j(b, n()) for b in ('cur', 'new') for x in range(5)]
360
361 # Toss a few files into the mailbox
362 i = 1
363 for f in msgs:
364 f = file(j(self.d, f), 'w')
365 f.write('x' * i)
366 f.close()
367 i = i + 1
368
369 mb = mail.maildir.MaildirMailbox(self.d)
370 self.assertEquals(mb.listMessages(), range(1, 11))
371 self.assertEquals(mb.listMessages(1), 2)
372 self.assertEquals(mb.listMessages(5), 6)
373
374 self.assertEquals(mb.getMessage(6).read(), 'x' * 7)
375 self.assertEquals(mb.getMessage(1).read(), 'x' * 2)
376
377 d = {}
378 for i in range(10):
379 u = mb.getUidl(i)
380 self.failIf(u in d)
381 d[u] = None
382
383 p, f = os.path.split(msgs[5])
384
385 mb.deleteMessage(5)
386 self.assertEquals(mb.listMessages(5), 0)
387 self.failUnless(os.path.exists(j(self.d, '.Trash', 'cur', f)))
388 self.failIf(os.path.exists(j(self.d, msgs[5])))
389
390 mb.undeleteMessages()
391 self.assertEquals(mb.listMessages(5), 6)
392 self.failIf(os.path.exists(j(self.d, '.Trash', 'cur', f)))
393 self.failUnless(os.path.exists(j(self.d, msgs[5])))
394
395 class MaildirDirdbmDomainTestCase(unittest.TestCase):
396 def setUp(self):
397 self.P = self.mktemp()
398 self.S = mail.mail.MailService()
399 self.D = mail.maildir.MaildirDirdbmDomain(self.S, self.P)
400
401 def tearDown(self):
402 shutil.rmtree(self.P)
403
404 def testAddUser(self):
405 toAdd = (('user1', 'pwd1'), ('user2', 'pwd2'), ('user3', 'pwd3'))
406 for (u, p) in toAdd:
407 self.D.addUser(u, p)
408
409 for (u, p) in toAdd:
410 self.failUnless(u in self.D.dbm)
411 self.assertEquals(self.D.dbm[u], p)
412 self.failUnless(os.path.exists(os.path.join(self.P, u)))
413
414 def testCredentials(self):
415 creds = self.D.getCredentialsCheckers()
416
417 self.assertEquals(len(creds), 1)
418 self.failUnless(cred.checkers.ICredentialsChecker.providedBy(creds[0]))
419 self.failUnless(cred.credentials.IUsernamePassword in creds[0].credentia lInterfaces)
420
421 def testRequestAvatar(self):
422 class ISomething(Interface):
423 pass
424
425 self.D.addUser('user', 'password')
426 self.assertRaises(
427 NotImplementedError,
428 self.D.requestAvatar, 'user', None, ISomething
429 )
430
431 t = self.D.requestAvatar('user', None, pop3.IMailbox)
432 self.assertEquals(len(t), 3)
433 self.failUnless(t[0] is pop3.IMailbox)
434 self.failUnless(pop3.IMailbox.providedBy(t[1]))
435
436 t[2]()
437
438 def testRequestAvatarId(self):
439 self.D.addUser('user', 'password')
440 database = self.D.getCredentialsCheckers()[0]
441
442 creds = cred.credentials.UsernamePassword('user', 'wrong password')
443 self.assertRaises(
444 cred.error.UnauthorizedLogin,
445 database.requestAvatarId, creds
446 )
447
448 creds = cred.credentials.UsernamePassword('user', 'password')
449 self.assertEquals(database.requestAvatarId(creds), 'user')
450
451
452 class StubAliasableDomain(object):
453 """
454 Minimal testable implementation of IAliasableDomain.
455 """
456 implements(mail.mail.IAliasableDomain)
457
458 def exists(self, user):
459 """
460 No test coverage for invocations of this method on domain objects,
461 so we just won't implement it.
462 """
463 raise NotImplementedError()
464
465
466 def addUser(self, user, password):
467 """
468 No test coverage for invocations of this method on domain objects,
469 so we just won't implement it.
470 """
471 raise NotImplementedError()
472
473
474 def getCredentialsCheckers(self):
475 """
476 This needs to succeed in order for other tests to complete
477 successfully, but we don't actually assert anything about its
478 behavior. Return an empty list. Sometime later we should return
479 something else and assert that a portal got set up properly.
480 """
481 return []
482
483
484 def setAliasGroup(self, aliases):
485 """
486 Just record the value so the test can check it later.
487 """
488 self.aliasGroup = aliases
489
490
491 class ServiceDomainTestCase(unittest.TestCase):
492 def setUp(self):
493 self.S = mail.mail.MailService()
494 self.D = mail.protocols.DomainDeliveryBase(self.S, None)
495 self.D.service = self.S
496 self.D.protocolName = 'TEST'
497 self.D.host = 'hostname'
498
499 self.tmpdir = self.mktemp()
500 domain = mail.maildir.MaildirDirdbmDomain(self.S, self.tmpdir)
501 domain.addUser('user', 'password')
502 self.S.addDomain('test.domain', domain)
503
504 def tearDown(self):
505 shutil.rmtree(self.tmpdir)
506
507
508 def testAddAliasableDomain(self):
509 """
510 Test that adding an IAliasableDomain to a mail service properly sets
511 up alias group references and such.
512 """
513 aliases = object()
514 domain = StubAliasableDomain()
515 self.S.aliases = aliases
516 self.S.addDomain('example.com', domain)
517 self.assertIdentical(domain.aliasGroup, aliases)
518
519
520 def testReceivedHeader(self):
521 hdr = self.D.receivedHeader(
522 ('remotehost', '123.232.101.234'),
523 smtp.Address('<someguy@somplace>'),
524 ['user@host.name']
525 )
526 fp = StringIO.StringIO(hdr)
527 m = rfc822.Message(fp)
528 self.assertEquals(len(m.items()), 1)
529 self.failUnless(m.has_key('Received'))
530
531 def testValidateTo(self):
532 user = smtp.User('user@test.domain', 'helo', None, 'wherever@whatever')
533 return defer.maybeDeferred(self.D.validateTo, user
534 ).addCallback(self._cbValidateTo
535 )
536
537 def _cbValidateTo(self, result):
538 self.failUnless(callable(result))
539
540 def testValidateToBadUsername(self):
541 user = smtp.User('resu@test.domain', 'helo', None, 'wherever@whatever')
542 return self.assertFailure(
543 defer.maybeDeferred(self.D.validateTo, user),
544 smtp.SMTPBadRcpt)
545
546 def testValidateToBadDomain(self):
547 user = smtp.User('user@domain.test', 'helo', None, 'wherever@whatever')
548 return self.assertFailure(
549 defer.maybeDeferred(self.D.validateTo, user),
550 smtp.SMTPBadRcpt)
551
552 def testValidateFrom(self):
553 helo = ('hostname', '127.0.0.1')
554 origin = smtp.Address('<user@hostname>')
555 self.failUnless(self.D.validateFrom(helo, origin) is origin)
556
557 helo = ('hostname', '1.2.3.4')
558 origin = smtp.Address('<user@hostname>')
559 self.failUnless(self.D.validateFrom(helo, origin) is origin)
560
561 helo = ('hostname', '1.2.3.4')
562 origin = smtp.Address('<>')
563 self.failUnless(self.D.validateFrom(helo, origin) is origin)
564
565 self.assertRaises(
566 smtp.SMTPBadSender,
567 self.D.validateFrom, None, origin
568 )
569
570 class VirtualPOP3TestCase(unittest.TestCase):
571 def setUp(self):
572 self.tmpdir = self.mktemp()
573 self.S = mail.mail.MailService()
574 self.D = mail.maildir.MaildirDirdbmDomain(self.S, self.tmpdir)
575 self.D.addUser('user', 'password')
576 self.S.addDomain('test.domain', self.D)
577
578 portal = cred.portal.Portal(self.D)
579 map(portal.registerChecker, self.D.getCredentialsCheckers())
580 self.S.portals[''] = self.S.portals['test.domain'] = portal
581
582 self.P = mail.protocols.VirtualPOP3()
583 self.P.service = self.S
584 self.P.magic = '<unit test magic>'
585
586 def tearDown(self):
587 shutil.rmtree(self.tmpdir)
588
589 def testAuthenticateAPOP(self):
590 resp = md5.new(self.P.magic + 'password').hexdigest()
591 return self.P.authenticateUserAPOP('user', resp
592 ).addCallback(self._cbAuthenticateAPOP
593 )
594
595 def _cbAuthenticateAPOP(self, result):
596 self.assertEquals(len(result), 3)
597 self.assertEquals(result[0], pop3.IMailbox)
598 self.failUnless(pop3.IMailbox.providedBy(result[1]))
599 result[2]()
600
601 def testAuthenticateIncorrectUserAPOP(self):
602 resp = md5.new(self.P.magic + 'password').hexdigest()
603 return self.assertFailure(
604 self.P.authenticateUserAPOP('resu', resp),
605 cred.error.UnauthorizedLogin)
606
607 def testAuthenticateIncorrectResponseAPOP(self):
608 resp = md5.new('wrong digest').hexdigest()
609 return self.assertFailure(
610 self.P.authenticateUserAPOP('user', resp),
611 cred.error.UnauthorizedLogin)
612
613 def testAuthenticatePASS(self):
614 return self.P.authenticateUserPASS('user', 'password'
615 ).addCallback(self._cbAuthenticatePASS
616 )
617
618 def _cbAuthenticatePASS(self, result):
619 self.assertEquals(len(result), 3)
620 self.assertEquals(result[0], pop3.IMailbox)
621 self.failUnless(pop3.IMailbox.providedBy(result[1]))
622 result[2]()
623
624 def testAuthenticateBadUserPASS(self):
625 return self.assertFailure(
626 self.P.authenticateUserPASS('resu', 'password'),
627 cred.error.UnauthorizedLogin)
628
629 def testAuthenticateBadPasswordPASS(self):
630 return self.assertFailure(
631 self.P.authenticateUserPASS('user', 'wrong password'),
632 cred.error.UnauthorizedLogin)
633
634 class empty(smtp.User):
635 def __init__(self):
636 pass
637
638 class RelayTestCase(unittest.TestCase):
639 def testExists(self):
640 service = mail.mail.MailService()
641 domain = mail.relay.DomainQueuer(service)
642
643 doRelay = [
644 address.UNIXAddress('/var/run/mail-relay'),
645 address.IPv4Address('TCP', '127.0.0.1', 12345),
646 ]
647
648 dontRelay = [
649 address.IPv4Address('TCP', '192.168.2.1', 62),
650 address.IPv4Address('TCP', '1.2.3.4', 1943),
651 ]
652
653 for peer in doRelay:
654 user = empty()
655 user.orig = 'user@host'
656 user.dest = 'tsoh@resu'
657 user.protocol = empty()
658 user.protocol.transport = empty()
659 user.protocol.transport.getPeer = lambda: peer
660
661 self.failUnless(callable(domain.exists(user)))
662
663 for peer in dontRelay:
664 user = empty()
665 user.orig = 'some@place'
666 user.protocol = empty()
667 user.protocol.transport = empty()
668 user.protocol.transport.getPeer = lambda: peer
669 user.dest = 'who@cares'
670
671 self.assertRaises(smtp.SMTPBadRcpt, domain.exists, user)
672
673 class RelayerTestCase(unittest.TestCase):
674 def setUp(self):
675 self.tmpdir = self.mktemp()
676 os.mkdir(self.tmpdir)
677 self.messageFiles = []
678 for i in range(10):
679 name = os.path.join(self.tmpdir, 'body-%d' % (i,))
680 f = file(name + '-H', 'w')
681 pickle.dump(['from-%d' % (i,), 'to-%d' % (i,)], f)
682 f.close()
683
684 f = file(name + '-D', 'w')
685 f.write(name)
686 f.seek(0, 0)
687 self.messageFiles.append(name)
688
689 self.R = mail.relay.RelayerMixin()
690 self.R.loadMessages(self.messageFiles)
691
692 def tearDown(self):
693 shutil.rmtree(self.tmpdir)
694
695 def testMailFrom(self):
696 for i in range(10):
697 self.assertEquals(self.R.getMailFrom(), 'from-%d' % (i,))
698 self.R.sentMail(250, None, None, None, None)
699 self.assertEquals(self.R.getMailFrom(), None)
700
701 def testMailTo(self):
702 for i in range(10):
703 self.assertEquals(self.R.getMailTo(), ['to-%d' % (i,)])
704 self.R.sentMail(250, None, None, None, None)
705 self.assertEquals(self.R.getMailTo(), None)
706
707 def testMailData(self):
708 for i in range(10):
709 name = os.path.join(self.tmpdir, 'body-%d' % (i,))
710 self.assertEquals(self.R.getMailData().read(), name)
711 self.R.sentMail(250, None, None, None, None)
712 self.assertEquals(self.R.getMailData(), None)
713
714 class Manager:
715 def __init__(self):
716 self.success = []
717 self.failure = []
718 self.done = []
719
720 def notifySuccess(self, factory, message):
721 self.success.append((factory, message))
722
723 def notifyFailure(self, factory, message):
724 self.failure.append((factory, message))
725
726 def notifyDone(self, factory):
727 self.done.append(factory)
728
729 class ManagedRelayerTestCase(unittest.TestCase):
730 def setUp(self):
731 self.manager = Manager()
732 self.messages = range(0, 20, 2)
733 self.factory = object()
734 self.relay = mail.relaymanager.ManagedRelayerMixin(self.manager)
735 self.relay.messages = self.messages[:]
736 self.relay.names = self.messages[:]
737 self.relay.factory = self.factory
738
739 def testSuccessfulSentMail(self):
740 for i in self.messages:
741 self.relay.sentMail(250, None, None, None, None)
742
743 self.assertEquals(
744 self.manager.success,
745 [(self.factory, m) for m in self.messages]
746 )
747
748 def testFailedSentMail(self):
749 for i in self.messages:
750 self.relay.sentMail(550, None, None, None, None)
751
752 self.assertEquals(
753 self.manager.failure,
754 [(self.factory, m) for m in self.messages]
755 )
756
757 def testConnectionLost(self):
758 self.relay.connectionLost(failure.Failure(Exception()))
759 self.assertEquals(self.manager.done, [self.factory])
760
761 class DirectoryQueueTestCase(unittest.TestCase):
762 def setUp(self):
763 # This is almost a test case itself.
764 self.tmpdir = self.mktemp()
765 os.mkdir(self.tmpdir)
766 self.queue = mail.relaymanager.Queue(self.tmpdir)
767 self.queue.noisy = False
768 for m in range(25):
769 hdrF, msgF = self.queue.createNewMessage()
770 pickle.dump(['header', m], hdrF)
771 hdrF.close()
772 msgF.lineReceived('body: %d' % (m,))
773 msgF.eomReceived()
774 self.queue.readDirectory()
775
776 def tearDown(self):
777 shutil.rmtree(self.tmpdir)
778
779 def testWaiting(self):
780 self.failUnless(self.queue.hasWaiting())
781 self.assertEquals(len(self.queue.getWaiting()), 25)
782
783 waiting = self.queue.getWaiting()
784 self.queue.setRelaying(waiting[0])
785 self.assertEquals(len(self.queue.getWaiting()), 24)
786
787 self.queue.setWaiting(waiting[0])
788 self.assertEquals(len(self.queue.getWaiting()), 25)
789
790 def testRelaying(self):
791 for m in self.queue.getWaiting():
792 self.queue.setRelaying(m)
793 self.assertEquals(
794 len(self.queue.getRelayed()),
795 25 - len(self.queue.getWaiting())
796 )
797
798 self.failIf(self.queue.hasWaiting())
799
800 relayed = self.queue.getRelayed()
801 self.queue.setWaiting(relayed[0])
802 self.assertEquals(len(self.queue.getWaiting()), 1)
803 self.assertEquals(len(self.queue.getRelayed()), 24)
804
805 def testDone(self):
806 msg = self.queue.getWaiting()[0]
807 self.queue.setRelaying(msg)
808 self.queue.done(msg)
809
810 self.assertEquals(len(self.queue.getWaiting()), 24)
811 self.assertEquals(len(self.queue.getRelayed()), 0)
812
813 self.failIf(msg in self.queue.getWaiting())
814 self.failIf(msg in self.queue.getRelayed())
815
816 def testEnvelope(self):
817 envelopes = []
818
819 for msg in self.queue.getWaiting():
820 envelopes.append(self.queue.getEnvelope(msg))
821
822 envelopes.sort()
823 for i in range(25):
824 self.assertEquals(
825 envelopes.pop(0),
826 ['header', i]
827 )
828
829 from twisted.names import server
830 from twisted.names import client
831 from twisted.names import common
832
833 class TestAuthority(common.ResolverBase):
834 def __init__(self):
835 common.ResolverBase.__init__(self)
836 self.addresses = {}
837
838 def _lookup(self, name, cls, type, timeout = None):
839 if name in self.addresses and type == dns.MX:
840 results = []
841 for a in self.addresses[name]:
842 hdr = dns.RRHeader(
843 name, dns.MX, dns.IN, 60, dns.Record_MX(0, a)
844 )
845 results.append(hdr)
846 return defer.succeed((results, [], []))
847 return defer.fail(failure.Failure(dns.DomainError(name)))
848
849 def setUpDNS(self):
850 self.auth = TestAuthority()
851 factory = server.DNSServerFactory([self.auth])
852 protocol = dns.DNSDatagramProtocol(factory)
853 while 1:
854 self.port = reactor.listenTCP(0, factory, interface='127.0.0.1')
855 portNumber = self.port.getHost().port
856
857 try:
858 self.udpPort = reactor.listenUDP(portNumber, protocol, interface='12 7.0.0.1')
859 except CannotListenError:
860 self.port.stopListening()
861 else:
862 break
863 self.resolver = client.Resolver(servers=[('127.0.0.1', portNumber)])
864
865
866 def tearDownDNS(self):
867 dl = []
868 dl.append(defer.maybeDeferred(self.port.stopListening))
869 dl.append(defer.maybeDeferred(self.udpPort.stopListening))
870 if self.resolver.protocol.transport is not None:
871 dl.append(defer.maybeDeferred(self.resolver.protocol.transport.stopListe ning))
872 try:
873 self.resolver._parseCall.cancel()
874 except:
875 pass
876 return defer.DeferredList(dl)
877
878 class MXTestCase(unittest.TestCase):
879 """
880 Tests for L{mail.relaymanager.MXCalculator}.
881 """
882 def setUp(self):
883 setUpDNS(self)
884 self.clock = task.Clock()
885 self.mx = mail.relaymanager.MXCalculator(self.resolver, self.clock)
886
887 def tearDown(self):
888 return tearDownDNS(self)
889
890
891 def test_defaultClock(self):
892 """
893 L{MXCalculator}'s default clock is C{twisted.internet.reactor}.
894 """
895 self.assertIdentical(
896 mail.relaymanager.MXCalculator(self.resolver).clock,
897 reactor)
898
899
900 def testSimpleSuccess(self):
901 self.auth.addresses['test.domain'] = ['the.email.test.domain']
902 return self.mx.getMX('test.domain').addCallback(self._cbSimpleSuccess)
903
904 def _cbSimpleSuccess(self, mx):
905 self.assertEquals(mx.preference, 0)
906 self.assertEquals(str(mx.name), 'the.email.test.domain')
907
908 def testSimpleFailure(self):
909 self.mx.fallbackToDomain = False
910 return self.assertFailure(self.mx.getMX('test.domain'), IOError)
911
912 def testSimpleFailureWithFallback(self):
913 return self.assertFailure(self.mx.getMX('test.domain'), DNSLookupError)
914
915
916 def _exchangeTest(self, domain, records, correctMailExchange):
917 """
918 Issue an MX request for the given domain and arrange for it to be
919 responded to with the given records. Verify that the resulting mail
920 exchange is the indicated host.
921
922 @type domain: C{str}
923 @type records: C{list} of L{RRHeader}
924 @type correctMailExchange: C{str}
925 @rtype: L{Deferred}
926 """
927 class DummyResolver(object):
928 def lookupMailExchange(self, name):
929 if name == domain:
930 return defer.succeed((
931 records,
932 [],
933 []))
934 return defer.fail(DNSNameError(domain))
935
936 self.mx.resolver = DummyResolver()
937 d = self.mx.getMX(domain)
938 def gotMailExchange(record):
939 self.assertEqual(str(record.name), correctMailExchange)
940 d.addCallback(gotMailExchange)
941 return d
942
943
944 def test_mailExchangePreference(self):
945 """
946 The MX record with the lowest preference is returned by
947 L{MXCalculator.getMX}.
948 """
949 domain = "example.com"
950 good = "good.example.com"
951 bad = "bad.example.com"
952
953 records = [
954 RRHeader(name=domain,
955 type=Record_MX.TYPE,
956 payload=Record_MX(1, bad)),
957 RRHeader(name=domain,
958 type=Record_MX.TYPE,
959 payload=Record_MX(0, good)),
960 RRHeader(name=domain,
961 type=Record_MX.TYPE,
962 payload=Record_MX(2, bad))]
963 return self._exchangeTest(domain, records, good)
964
965
966 def test_badExchangeExcluded(self):
967 """
968 L{MXCalculator.getMX} returns the MX record with the lowest preference
969 which is not also marked as bad.
970 """
971 domain = "example.com"
972 good = "good.example.com"
973 bad = "bad.example.com"
974
975 records = [
976 RRHeader(name=domain,
977 type=Record_MX.TYPE,
978 payload=Record_MX(0, bad)),
979 RRHeader(name=domain,
980 type=Record_MX.TYPE,
981 payload=Record_MX(1, good))]
982 self.mx.markBad(bad)
983 return self._exchangeTest(domain, records, good)
984
985
986 def test_fallbackForAllBadExchanges(self):
987 """
988 L{MXCalculator.getMX} returns the MX record with the lowest preference
989 if all the MX records in the response have been marked bad.
990 """
991 domain = "example.com"
992 bad = "bad.example.com"
993 worse = "worse.example.com"
994
995 records = [
996 RRHeader(name=domain,
997 type=Record_MX.TYPE,
998 payload=Record_MX(0, bad)),
999 RRHeader(name=domain,
1000 type=Record_MX.TYPE,
1001 payload=Record_MX(1, worse))]
1002 self.mx.markBad(bad)
1003 self.mx.markBad(worse)
1004 return self._exchangeTest(domain, records, bad)
1005
1006
1007 def test_badExchangeExpires(self):
1008 """
1009 L{MXCalculator.getMX} returns the MX record with the lowest preference
1010 if it was last marked bad longer than L{MXCalculator.timeOutBadMX}
1011 seconds ago.
1012 """
1013 domain = "example.com"
1014 good = "good.example.com"
1015 previouslyBad = "bad.example.com"
1016
1017 records = [
1018 RRHeader(name=domain,
1019 type=Record_MX.TYPE,
1020 payload=Record_MX(0, previouslyBad)),
1021 RRHeader(name=domain,
1022 type=Record_MX.TYPE,
1023 payload=Record_MX(1, good))]
1024 self.mx.markBad(previouslyBad)
1025 self.clock.advance(self.mx.timeOutBadMX)
1026 return self._exchangeTest(domain, records, previouslyBad)
1027
1028
1029 def test_goodExchangeUsed(self):
1030 """
1031 L{MXCalculator.getMX} returns the MX record with the lowest preference
1032 if it was marked good after it was marked bad.
1033 """
1034 domain = "example.com"
1035 good = "good.example.com"
1036 previouslyBad = "bad.example.com"
1037
1038 records = [
1039 RRHeader(name=domain,
1040 type=Record_MX.TYPE,
1041 payload=Record_MX(0, previouslyBad)),
1042 RRHeader(name=domain,
1043 type=Record_MX.TYPE,
1044 payload=Record_MX(1, good))]
1045 self.mx.markBad(previouslyBad)
1046 self.mx.markGood(previouslyBad)
1047 self.clock.advance(self.mx.timeOutBadMX)
1048 return self._exchangeTest(domain, records, previouslyBad)
1049
1050
1051 def test_successWithoutResults(self):
1052 """
1053 If an MX lookup succeeds but the result set is empty,
1054 L{MXCalculator.getMX} should try to look up an I{A} record for the
1055 requested name and call back its returned Deferred with that
1056 address.
1057 """
1058 ip = '1.2.3.4'
1059 domain = 'example.org'
1060
1061 class DummyResolver(object):
1062 """
1063 Fake resolver which will respond to an MX lookup with an empty
1064 result set.
1065
1066 @ivar mx: A dictionary mapping hostnames to three-tuples of
1067 results to be returned from I{MX} lookups.
1068
1069 @ivar a: A dictionary mapping hostnames to addresses to be
1070 returned from I{A} lookups.
1071 """
1072 mx = {domain: ([], [], [])}
1073 a = {domain: ip}
1074
1075 def lookupMailExchange(self, domain):
1076 return defer.succeed(self.mx[domain])
1077
1078 def getHostByName(self, domain):
1079 return defer.succeed(self.a[domain])
1080
1081 self.mx.resolver = DummyResolver()
1082 d = self.mx.getMX(domain)
1083 d.addCallback(self.assertEqual, Record_MX(name=ip))
1084 return d
1085
1086
1087 def test_failureWithSuccessfulFallback(self):
1088 """
1089 Test that if the MX record lookup fails, fallback is enabled, and an A
1090 record is available for the name, then the Deferred returned by
1091 L{MXCalculator.getMX} ultimately fires with a Record_MX instance which
1092 gives the address in the A record for the name.
1093 """
1094 class DummyResolver(object):
1095 """
1096 Fake resolver which will fail an MX lookup but then succeed a
1097 getHostByName call.
1098 """
1099 def lookupMailExchange(self, domain):
1100 return defer.fail(DNSNameError())
1101
1102 def getHostByName(self, domain):
1103 return defer.succeed("1.2.3.4")
1104
1105 self.mx.resolver = DummyResolver()
1106 d = self.mx.getMX("domain")
1107 d.addCallback(self.assertEqual, Record_MX(name="1.2.3.4"))
1108 return d
1109
1110
1111 def test_cnameWithoutGlueRecords(self):
1112 """
1113 If an MX lookup returns a single CNAME record as a result, MXCalculator
1114 will perform an MX lookup for the canonical name indicated and return
1115 the MX record which results.
1116 """
1117 alias = "alias.example.com"
1118 canonical = "canonical.example.com"
1119 exchange = "mail.example.com"
1120
1121 class DummyResolver(object):
1122 """
1123 Fake resolver which will return a CNAME for an MX lookup of a name
1124 which is an alias and an MX for an MX lookup of the canonical name.
1125 """
1126 def lookupMailExchange(self, domain):
1127 if domain == alias:
1128 return defer.succeed((
1129 [RRHeader(name=domain,
1130 type=Record_CNAME.TYPE,
1131 payload=Record_CNAME(canonical))],
1132 [], []))
1133 elif domain == canonical:
1134 return defer.succeed((
1135 [RRHeader(name=domain,
1136 type=Record_MX.TYPE,
1137 payload=Record_MX(0, exchange))],
1138 [], []))
1139 else:
1140 return defer.fail(DNSNameError(domain))
1141
1142 self.mx.resolver = DummyResolver()
1143 d = self.mx.getMX(alias)
1144 d.addCallback(self.assertEqual, Record_MX(name=exchange))
1145 return d
1146
1147
1148 def test_cnameChain(self):
1149 """
1150 If L{MXCalculator.getMX} encounters a CNAME chain which is longer than
1151 the length specified, the returned L{Deferred} should errback with
1152 L{CanonicalNameChainTooLong}.
1153 """
1154 class DummyResolver(object):
1155 """
1156 Fake resolver which generates a CNAME chain of infinite length in
1157 response to MX lookups.
1158 """
1159 chainCounter = 0
1160
1161 def lookupMailExchange(self, domain):
1162 self.chainCounter += 1
1163 name = 'x-%d.example.com' % (self.chainCounter,)
1164 return defer.succeed((
1165 [RRHeader(name=domain,
1166 type=Record_CNAME.TYPE,
1167 payload=Record_CNAME(name))],
1168 [], []))
1169
1170 cnameLimit = 3
1171 self.mx.resolver = DummyResolver()
1172 d = self.mx.getMX("mail.example.com", cnameLimit)
1173 self.assertFailure(
1174 d, twisted.mail.relaymanager.CanonicalNameChainTooLong)
1175 def cbChainTooLong(error):
1176 self.assertEqual(error.args[0], Record_CNAME("x-%d.example.com" % (c nameLimit + 1,)))
1177 self.assertEqual(self.mx.resolver.chainCounter, cnameLimit + 1)
1178 d.addCallback(cbChainTooLong)
1179 return d
1180
1181
1182 def test_cnameWithGlueRecords(self):
1183 """
1184 If an MX lookup returns a CNAME and the MX record for the CNAME, the
1185 L{Deferred} returned by L{MXCalculator.getMX} should be called back
1186 with the name from the MX record without further lookups being
1187 attempted.
1188 """
1189 lookedUp = []
1190 alias = "alias.example.com"
1191 canonical = "canonical.example.com"
1192 exchange = "mail.example.com"
1193
1194 class DummyResolver(object):
1195 def lookupMailExchange(self, domain):
1196 if domain != alias or lookedUp:
1197 # Don't give back any results for anything except the alias
1198 # or on any request after the first.
1199 return ([], [], [])
1200 return defer.succeed((
1201 [RRHeader(name=alias,
1202 type=Record_CNAME.TYPE,
1203 payload=Record_CNAME(canonical)),
1204 RRHeader(name=canonical,
1205 type=Record_MX.TYPE,
1206 payload=Record_MX(name=exchange))],
1207 [], []))
1208
1209 self.mx.resolver = DummyResolver()
1210 d = self.mx.getMX(alias)
1211 d.addCallback(self.assertEqual, Record_MX(name=exchange))
1212 return d
1213
1214
1215 def test_cnameLoopWithGlueRecords(self):
1216 """
1217 If an MX lookup returns two CNAME records which point to each other,
1218 the loop should be detected and the L{Deferred} returned by
1219 L{MXCalculator.getMX} should be errbacked with L{CanonicalNameLoop}.
1220 """
1221 firstAlias = "cname1.example.com"
1222 secondAlias = "cname2.example.com"
1223
1224 class DummyResolver(object):
1225 def lookupMailExchange(self, domain):
1226 return defer.succeed((
1227 [RRHeader(name=firstAlias,
1228 type=Record_CNAME.TYPE,
1229 payload=Record_CNAME(secondAlias)),
1230 RRHeader(name=secondAlias,
1231 type=Record_CNAME.TYPE,
1232 payload=Record_CNAME(firstAlias))],
1233 [], []))
1234
1235 self.mx.resolver = DummyResolver()
1236 d = self.mx.getMX(firstAlias)
1237 self.assertFailure(d, twisted.mail.relaymanager.CanonicalNameLoop)
1238 return d
1239
1240
1241 def testManyRecords(self):
1242 self.auth.addresses['test.domain'] = [
1243 'mx1.test.domain', 'mx2.test.domain', 'mx3.test.domain'
1244 ]
1245 return self.mx.getMX('test.domain'
1246 ).addCallback(self._cbManyRecordsSuccessfulLookup
1247 )
1248
1249 def _cbManyRecordsSuccessfulLookup(self, mx):
1250 self.failUnless(str(mx.name).split('.', 1)[0] in ('mx1', 'mx2', 'mx3'))
1251 self.mx.markBad(str(mx.name))
1252 return self.mx.getMX('test.domain'
1253 ).addCallback(self._cbManyRecordsDifferentResult, mx
1254 )
1255
1256 def _cbManyRecordsDifferentResult(self, nextMX, mx):
1257 self.assertNotEqual(str(mx.name), str(nextMX.name))
1258 self.mx.markBad(str(nextMX.name))
1259
1260 return self.mx.getMX('test.domain'
1261 ).addCallback(self._cbManyRecordsLastResult, mx, nextMX
1262 )
1263
1264 def _cbManyRecordsLastResult(self, lastMX, mx, nextMX):
1265 self.assertNotEqual(str(mx.name), str(lastMX.name))
1266 self.assertNotEqual(str(nextMX.name), str(lastMX.name))
1267
1268 self.mx.markBad(str(lastMX.name))
1269 self.mx.markGood(str(nextMX.name))
1270
1271 return self.mx.getMX('test.domain'
1272 ).addCallback(self._cbManyRecordsRepeatSpecificResult, nextMX
1273 )
1274
1275 def _cbManyRecordsRepeatSpecificResult(self, againMX, nextMX):
1276 self.assertEqual(str(againMX.name), str(nextMX.name))
1277
1278 class LiveFireExercise(unittest.TestCase):
1279 if interfaces.IReactorUDP(reactor, None) is None:
1280 skip = "UDP support is required to determining MX records"
1281
1282 def setUp(self):
1283 setUpDNS(self)
1284 self.tmpdirs = [
1285 'domainDir', 'insertionDomain', 'insertionQueue',
1286 'destinationDomain', 'destinationQueue'
1287 ]
1288
1289 def tearDown(self):
1290 for d in self.tmpdirs:
1291 if os.path.exists(d):
1292 shutil.rmtree(d)
1293 return tearDownDNS(self)
1294
1295 def testLocalDelivery(self):
1296 service = mail.mail.MailService()
1297 service.smtpPortal.registerChecker(cred.checkers.AllowAnonymousAccess())
1298 domain = mail.maildir.MaildirDirdbmDomain(service, 'domainDir')
1299 domain.addUser('user', 'password')
1300 service.addDomain('test.domain', domain)
1301 service.portals[''] = service.portals['test.domain']
1302 map(service.portals[''].registerChecker, domain.getCredentialsCheckers() )
1303
1304 service.setQueue(mail.relay.DomainQueuer(service))
1305 manager = mail.relaymanager.SmartHostSMTPRelayingManager(service.queue, None)
1306 helper = mail.relaymanager.RelayStateHelper(manager, 1)
1307
1308 f = service.getSMTPFactory()
1309
1310 self.smtpServer = reactor.listenTCP(0, f, interface='127.0.0.1')
1311
1312 client = LineSendingProtocol([
1313 'HELO meson',
1314 'MAIL FROM: <user@hostname>',
1315 'RCPT TO: <user@test.domain>',
1316 'DATA',
1317 'This is the message',
1318 '.',
1319 'QUIT'
1320 ])
1321
1322 done = Deferred()
1323 f = protocol.ClientFactory()
1324 f.protocol = lambda: client
1325 f.clientConnectionLost = lambda *args: done.callback(None)
1326 reactor.connectTCP('127.0.0.1', self.smtpServer.getHost().port, f)
1327
1328 def finished(ign):
1329 mbox = domain.requestAvatar('user', None, pop3.IMailbox)[1]
1330 msg = mbox.getMessage(0).read()
1331 self.failIfEqual(msg.find('This is the message'), -1)
1332
1333 return self.smtpServer.stopListening()
1334 done.addCallback(finished)
1335 return done
1336
1337
1338 def testRelayDelivery(self):
1339 # Here is the service we will connect to and send mail from
1340 insServ = mail.mail.MailService()
1341 insServ.smtpPortal.registerChecker(cred.checkers.AllowAnonymousAccess())
1342 domain = mail.maildir.MaildirDirdbmDomain(insServ, 'insertionDomain')
1343 insServ.addDomain('insertion.domain', domain)
1344 os.mkdir('insertionQueue')
1345 insServ.setQueue(mail.relaymanager.Queue('insertionQueue'))
1346 insServ.domains.setDefaultDomain(mail.relay.DomainQueuer(insServ))
1347 manager = mail.relaymanager.SmartHostSMTPRelayingManager(insServ.queue)
1348 manager.fArgs += ('test.identity.hostname',)
1349 helper = mail.relaymanager.RelayStateHelper(manager, 1)
1350 # Yoink! Now the internet obeys OUR every whim!
1351 manager.mxcalc = mail.relaymanager.MXCalculator(self.resolver)
1352 # And this is our whim.
1353 self.auth.addresses['destination.domain'] = ['127.0.0.1']
1354
1355 f = insServ.getSMTPFactory()
1356 self.insServer = reactor.listenTCP(0, f, interface='127.0.0.1')
1357
1358 # Here is the service the previous one will connect to for final
1359 # delivery
1360 destServ = mail.mail.MailService()
1361 destServ.smtpPortal.registerChecker(cred.checkers.AllowAnonymousAccess() )
1362 domain = mail.maildir.MaildirDirdbmDomain(destServ, 'destinationDomain')
1363 domain.addUser('user', 'password')
1364 destServ.addDomain('destination.domain', domain)
1365 os.mkdir('destinationQueue')
1366 destServ.setQueue(mail.relaymanager.Queue('destinationQueue'))
1367 manager2 = mail.relaymanager.SmartHostSMTPRelayingManager(destServ.queue )
1368 helper = mail.relaymanager.RelayStateHelper(manager, 1)
1369 helper.startService()
1370
1371 f = destServ.getSMTPFactory()
1372 self.destServer = reactor.listenTCP(0, f, interface='127.0.0.1')
1373
1374 # Update the port number the *first* relay will connect to, because we c an't use
1375 # port 25
1376 manager.PORT = self.destServer.getHost().port
1377
1378 client = LineSendingProtocol([
1379 'HELO meson',
1380 'MAIL FROM: <user@wherever>',
1381 'RCPT TO: <user@destination.domain>',
1382 'DATA',
1383 'This is the message',
1384 '.',
1385 'QUIT'
1386 ])
1387
1388 done = Deferred()
1389 f = protocol.ClientFactory()
1390 f.protocol = lambda: client
1391 f.clientConnectionLost = lambda *args: done.callback(None)
1392 reactor.connectTCP('127.0.0.1', self.insServer.getHost().port, f)
1393
1394 def finished(ign):
1395 # First part of the delivery is done. Poke the queue manually now
1396 # so we don't have to wait for the queue to be flushed.
1397 delivery = manager.checkState()
1398 def delivered(ign):
1399 mbox = domain.requestAvatar('user', None, pop3.IMailbox)[1]
1400 msg = mbox.getMessage(0).read()
1401 self.failIfEqual(msg.find('This is the message'), -1)
1402
1403 self.insServer.stopListening()
1404 self.destServer.stopListening()
1405 helper.stopService()
1406 delivery.addCallback(delivered)
1407 return delivery
1408 done.addCallback(finished)
1409 return done
1410
1411
1412 aliasFile = StringIO.StringIO("""\
1413 # Here's a comment
1414 # woop another one
1415 testuser: address1,address2, address3,
1416 continuation@address, |/bin/process/this
1417
1418 usertwo:thisaddress,thataddress, lastaddress
1419 lastuser: :/includable, /filename, |/program, address
1420 """)
1421
1422 class LineBufferMessage:
1423 def __init__(self):
1424 self.lines = []
1425 self.eom = False
1426 self.lost = False
1427
1428 def lineReceived(self, line):
1429 self.lines.append(line)
1430
1431 def eomReceived(self):
1432 self.eom = True
1433 return defer.succeed('<Whatever>')
1434
1435 def connectionLost(self):
1436 self.lost = True
1437
1438 class AliasTestCase(unittest.TestCase):
1439 lines = [
1440 'First line',
1441 'Next line',
1442 '',
1443 'After a blank line',
1444 'Last line'
1445 ]
1446
1447 def setUp(self):
1448 aliasFile.seek(0)
1449
1450 def testHandle(self):
1451 result = {}
1452 lines = [
1453 'user: another@host\n',
1454 'nextuser: |/bin/program\n',
1455 'user: me@again\n',
1456 'moreusers: :/etc/include/filename\n',
1457 'multiuser: first@host, second@host,last@anotherhost',
1458 ]
1459
1460 for l in lines:
1461 mail.alias.handle(result, l, 'TestCase', None)
1462
1463 self.assertEquals(result['user'], ['another@host', 'me@again'])
1464 self.assertEquals(result['nextuser'], ['|/bin/program'])
1465 self.assertEquals(result['moreusers'], [':/etc/include/filename'])
1466 self.assertEquals(result['multiuser'], ['first@host', 'second@host', 'la st@anotherhost'])
1467
1468 def testFileLoader(self):
1469 domains = {'': object()}
1470 result = mail.alias.loadAliasFile(domains, fp=aliasFile)
1471
1472 self.assertEquals(len(result), 3)
1473
1474 group = result['testuser']
1475 s = str(group)
1476 for a in ('address1', 'address2', 'address3', 'continuation@address', '/ bin/process/this'):
1477 self.failIfEqual(s.find(a), -1)
1478 self.assertEquals(len(group), 5)
1479
1480 group = result['usertwo']
1481 s = str(group)
1482 for a in ('thisaddress', 'thataddress', 'lastaddress'):
1483 self.failIfEqual(s.find(a), -1)
1484 self.assertEquals(len(group), 3)
1485
1486 group = result['lastuser']
1487 s = str(group)
1488 self.failUnlessEqual(s.find('/includable'), -1)
1489 for a in ('/filename', 'program', 'address'):
1490 self.failIfEqual(s.find(a), -1, '%s not found' % a)
1491 self.assertEquals(len(group), 3)
1492
1493 def testMultiWrapper(self):
1494 msgs = LineBufferMessage(), LineBufferMessage(), LineBufferMessage()
1495 msg = mail.alias.MultiWrapper(msgs)
1496
1497 for L in self.lines:
1498 msg.lineReceived(L)
1499 return msg.eomReceived().addCallback(self._cbMultiWrapper, msgs)
1500
1501 def _cbMultiWrapper(self, ignored, msgs):
1502 for m in msgs:
1503 self.failUnless(m.eom)
1504 self.failIf(m.lost)
1505 self.assertEquals(self.lines, m.lines)
1506
1507 def testFileAlias(self):
1508 tmpfile = self.mktemp()
1509 a = mail.alias.FileAlias(tmpfile, None, None)
1510 m = a.createMessageReceiver()
1511
1512 for l in self.lines:
1513 m.lineReceived(l)
1514 return m.eomReceived().addCallback(self._cbTestFileAlias, tmpfile)
1515
1516 def _cbTestFileAlias(self, ignored, tmpfile):
1517 lines = file(tmpfile).readlines()
1518 self.assertEquals([L[:-1] for L in lines], self.lines)
1519
1520
1521
1522 class DummyProcess(object):
1523 __slots__ = ['onEnd']
1524
1525
1526
1527 class MockProcessAlias(mail.alias.ProcessAlias):
1528 """
1529 A alias processor that doesn't actually launch processes.
1530 """
1531
1532 def spawnProcess(self, proto, program, path):
1533 """
1534 Don't spawn a process.
1535 """
1536
1537
1538
1539 class MockAliasGroup(mail.alias.AliasGroup):
1540 """
1541 An alias group using C{MockProcessAlias}.
1542 """
1543 processAliasFactory = MockProcessAlias
1544
1545
1546
1547 class StubProcess(object):
1548 """
1549 Fake implementation of L{IProcessTransport}.
1550
1551 @ivar signals: A list of all the signals which have been sent to this fake
1552 process.
1553 """
1554 def __init__(self):
1555 self.signals = []
1556
1557
1558 def loseConnection(self):
1559 """
1560 No-op implementation of disconnection.
1561 """
1562
1563
1564 def signalProcess(self, signal):
1565 """
1566 Record a signal sent to this process for later inspection.
1567 """
1568 self.signals.append(signal)
1569
1570
1571
1572 class ProcessAliasTestCase(unittest.TestCase):
1573 """
1574 Tests for alias resolution.
1575 """
1576
1577 lines = [
1578 'First line',
1579 'Next line',
1580 '',
1581 'After a blank line',
1582 'Last line'
1583 ]
1584
1585 def exitStatus(self, code):
1586 """
1587 Construct a status from the given exit code.
1588
1589 @type code: L{int} between 0 and 255 inclusive.
1590 @param code: The exit status which the code will represent.
1591
1592 @rtype: L{int}
1593 @return: A status integer for the given exit code.
1594 """
1595 # /* Macros for constructing status values. */
1596 # #define __W_EXITCODE(ret, sig) ((ret) << 8 | (sig))
1597 status = (code << 8) | 0
1598
1599 # Sanity check
1600 self.assertTrue(os.WIFEXITED(status))
1601 self.assertEqual(os.WEXITSTATUS(status), code)
1602 self.assertFalse(os.WIFSIGNALED(status))
1603
1604 return status
1605
1606
1607 def signalStatus(self, signal):
1608 """
1609 Construct a status from the given signal.
1610
1611 @type signal: L{int} between 0 and 255 inclusive.
1612 @param signal: The signal number which the status will represent.
1613
1614 @rtype: L{int}
1615 @return: A status integer for the given signal.
1616 """
1617 # /* If WIFSIGNALED(STATUS), the terminating signal. */
1618 # #define __WTERMSIG(status) ((status) & 0x7f)
1619 # /* Nonzero if STATUS indicates termination by a signal. */
1620 # #define __WIFSIGNALED(status) \
1621 # (((signed char) (((status) & 0x7f) + 1) >> 1) > 0)
1622 status = signal
1623
1624 # Sanity check
1625 self.assertTrue(os.WIFSIGNALED(status))
1626 self.assertEqual(os.WTERMSIG(status), signal)
1627 self.assertFalse(os.WIFEXITED(status))
1628
1629 return status
1630
1631
1632 def setUp(self):
1633 """
1634 Replace L{smtp.DNSNAME} with a well-known value.
1635 """
1636 self.DNSNAME = smtp.DNSNAME
1637 smtp.DNSNAME = ''
1638
1639
1640 def tearDown(self):
1641 """
1642 Restore the original value of L{smtp.DNSNAME}.
1643 """
1644 smtp.DNSNAME = self.DNSNAME
1645
1646
1647 def test_processAlias(self):
1648 """
1649 Standard call to C{mail.alias.ProcessAlias}: check that the specified
1650 script is called, and that the input is correctly transferred to it.
1651 """
1652 sh = FilePath(self.mktemp())
1653 sh.setContent("""\
1654 #!/bin/sh
1655 rm -f process.alias.out
1656 while read i; do
1657 echo $i >> process.alias.out
1658 done""")
1659 os.chmod(sh.path, 0700)
1660 a = mail.alias.ProcessAlias(sh.path, None, None)
1661 m = a.createMessageReceiver()
1662
1663 for l in self.lines:
1664 m.lineReceived(l)
1665
1666 def _cbProcessAlias(ignored):
1667 lines = file('process.alias.out').readlines()
1668 self.assertEquals([L[:-1] for L in lines], self.lines)
1669
1670 return m.eomReceived().addCallback(_cbProcessAlias)
1671
1672
1673 def test_processAliasTimeout(self):
1674 """
1675 If the alias child process does not exit within a particular period of
1676 time, the L{Deferred} returned by L{MessageWrapper.eomReceived} should
1677 fail with L{ProcessAliasTimeout} and send the I{KILL} signal to the
1678 child process..
1679 """
1680 reactor = task.Clock()
1681 transport = StubProcess()
1682 proto = mail.alias.ProcessAliasProtocol()
1683 proto.makeConnection(transport)
1684
1685 receiver = mail.alias.MessageWrapper(proto, None, reactor)
1686 d = receiver.eomReceived()
1687 reactor.advance(receiver.completionTimeout)
1688 def timedOut(ignored):
1689 self.assertEqual(transport.signals, ['KILL'])
1690 # Now that it has been killed, disconnect the protocol associated
1691 # with it.
1692 proto.processEnded(
1693 ProcessTerminated(self.signalStatus(signal.SIGKILL)))
1694 self.assertFailure(d, mail.alias.ProcessAliasTimeout)
1695 d.addCallback(timedOut)
1696 return d
1697
1698
1699 def test_earlyProcessTermination(self):
1700 """
1701 If the process associated with an L{mail.alias.MessageWrapper} exits
1702 before I{eomReceived} is called, the L{Deferred} returned by
1703 I{eomReceived} should fail.
1704 """
1705 transport = StubProcess()
1706 protocol = mail.alias.ProcessAliasProtocol()
1707 protocol.makeConnection(transport)
1708 receiver = mail.alias.MessageWrapper(protocol, None, None)
1709 protocol.processEnded(failure.Failure(ProcessDone(0)))
1710 return self.assertFailure(receiver.eomReceived(), ProcessDone)
1711
1712
1713 def _terminationTest(self, status):
1714 """
1715 Verify that if the process associated with an
1716 L{mail.alias.MessageWrapper} exits with the given status, the
1717 L{Deferred} returned by I{eomReceived} fails with L{ProcessTerminated}.
1718 """
1719 transport = StubProcess()
1720 protocol = mail.alias.ProcessAliasProtocol()
1721 protocol.makeConnection(transport)
1722 receiver = mail.alias.MessageWrapper(protocol, None, None)
1723 protocol.processEnded(
1724 failure.Failure(ProcessTerminated(status)))
1725 return self.assertFailure(receiver.eomReceived(), ProcessTerminated)
1726
1727
1728 def test_errorProcessTermination(self):
1729 """
1730 If the process associated with an L{mail.alias.MessageWrapper} exits
1731 with a non-zero exit code, the L{Deferred} returned by I{eomReceived}
1732 should fail.
1733 """
1734 return self._terminationTest(self.exitStatus(1))
1735
1736
1737 def test_signalProcessTermination(self):
1738 """
1739 If the process associated with an L{mail.alias.MessageWrapper} exits
1740 because it received a signal, the L{Deferred} returned by
1741 I{eomReceived} should fail.
1742 """
1743 return self._terminationTest(self.signalStatus(signal.SIGHUP))
1744
1745
1746 def test_aliasResolution(self):
1747 """
1748 Check that the C{resolve} method of alias processors produce the correct
1749 set of objects:
1750 - direct alias with L{mail.alias.AddressAlias} if a simple input is passed
1751 - aliases in a file with L{mail.alias.FileWrapper} if an input in th e format
1752 '/file' is given
1753 - aliases resulting of a process call wrapped by L{mail.alias.Messag eWrapper}
1754 if the format is '|process'
1755 """
1756 aliases = {}
1757 domain = {'': TestDomain(aliases, ['user1', 'user2', 'user3'])}
1758 A1 = MockAliasGroup(['user1', '|echo', '/file'], domain, 'alias1')
1759 A2 = MockAliasGroup(['user2', 'user3'], domain, 'alias2')
1760 A3 = mail.alias.AddressAlias('alias1', domain, 'alias3')
1761 aliases.update({
1762 'alias1': A1,
1763 'alias2': A2,
1764 'alias3': A3,
1765 })
1766
1767 res1 = A1.resolve(aliases)
1768 r1 = map(str, res1.objs)
1769 r1.sort()
1770 expected = map(str, [
1771 mail.alias.AddressAlias('user1', None, None),
1772 mail.alias.MessageWrapper(DummyProcess(), 'echo'),
1773 mail.alias.FileWrapper('/file'),
1774 ])
1775 expected.sort()
1776 self.assertEquals(r1, expected)
1777
1778 res2 = A2.resolve(aliases)
1779 r2 = map(str, res2.objs)
1780 r2.sort()
1781 expected = map(str, [
1782 mail.alias.AddressAlias('user2', None, None),
1783 mail.alias.AddressAlias('user3', None, None)
1784 ])
1785 expected.sort()
1786 self.assertEquals(r2, expected)
1787
1788 res3 = A3.resolve(aliases)
1789 r3 = map(str, res3.objs)
1790 r3.sort()
1791 expected = map(str, [
1792 mail.alias.AddressAlias('user1', None, None),
1793 mail.alias.MessageWrapper(DummyProcess(), 'echo'),
1794 mail.alias.FileWrapper('/file'),
1795 ])
1796 expected.sort()
1797 self.assertEquals(r3, expected)
1798
1799
1800 def test_cyclicAlias(self):
1801 """
1802 Check that a cycle in alias resolution is correctly handled.
1803 """
1804 aliases = {}
1805 domain = {'': TestDomain(aliases, [])}
1806 A1 = mail.alias.AddressAlias('alias2', domain, 'alias1')
1807 A2 = mail.alias.AddressAlias('alias3', domain, 'alias2')
1808 A3 = mail.alias.AddressAlias('alias1', domain, 'alias3')
1809 aliases.update({
1810 'alias1': A1,
1811 'alias2': A2,
1812 'alias3': A3
1813 })
1814
1815 self.assertEquals(aliases['alias1'].resolve(aliases), None)
1816 self.assertEquals(aliases['alias2'].resolve(aliases), None)
1817 self.assertEquals(aliases['alias3'].resolve(aliases), None)
1818
1819 A4 = MockAliasGroup(['|echo', 'alias1'], domain, 'alias4')
1820 aliases['alias4'] = A4
1821
1822 res = A4.resolve(aliases)
1823 r = map(str, res.objs)
1824 r.sort()
1825 expected = map(str, [
1826 mail.alias.MessageWrapper(DummyProcess(), 'echo')
1827 ])
1828 expected.sort()
1829 self.assertEquals(r, expected)
1830
1831
1832
1833 if interfaces.IReactorProcess(reactor, None) is None:
1834 ProcessAliasTestCase = "IReactorProcess not supported"
1835
1836
1837
1838 class TestDomain:
1839 def __init__(self, aliases, users):
1840 self.aliases = aliases
1841 self.users = users
1842
1843 def exists(self, user, memo=None):
1844 user = user.dest.local
1845 if user in self.users:
1846 return lambda: mail.alias.AddressAlias(user, None, None)
1847 try:
1848 a = self.aliases[user]
1849 except:
1850 raise smtp.SMTPBadRcpt(user)
1851 else:
1852 aliases = a.resolve(self.aliases, memo)
1853 if aliases:
1854 return lambda: aliases
1855 raise smtp.SMTPBadRcpt(user)
1856
1857
1858 from twisted.python.runtime import platformType
1859 import types
1860 if platformType != "posix":
1861 for o in locals().values():
1862 if isinstance(o, (types.ClassType, type)) and issubclass(o, unittest.Tes tCase):
1863 o.skip = "twisted.mail only works on posix"
OLDNEW
« no previous file with comments | « third_party/twisted_8_1/twisted/mail/test/test_imap.py ('k') | third_party/twisted_8_1/twisted/mail/test/test_options.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698