Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(93)

Side by Side Diff: third_party/twisted_8_1/twisted/mail/test/test_smtp.py

Issue 12261012: Remove third_party/twisted_8_1 (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/tools/build
Patch Set: Created 7 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
(Empty)
1 # Copyright (c) 2001-2007 Twisted Matrix Laboratories.
2 # See LICENSE for details.
3
4 """
5 Test cases for twisted.mail.smtp module.
6 """
7
8 from zope.interface import implements
9
10 from twisted.trial import unittest, util
11 from twisted.protocols import basic, loopback
12 from twisted.mail import smtp
13 from twisted.internet import defer, protocol, reactor, interfaces
14 from twisted.internet import address, error, task
15 from twisted.test.test_protocols import StringIOWithoutClosing
16 from twisted.test.proto_helpers import StringTransport
17
18 from twisted import cred
19 import twisted.cred.error
20 import twisted.cred.portal
21 import twisted.cred.checkers
22 import twisted.cred.credentials
23
24 from twisted.cred.portal import IRealm, Portal
25 from twisted.cred.checkers import ICredentialsChecker, AllowAnonymousAccess
26 from twisted.cred.credentials import IAnonymous
27 from twisted.cred.error import UnauthorizedLogin
28
29 from twisted.mail import imap4
30
31
32 try:
33 from twisted.test.ssl_helpers import ClientTLSContext, ServerTLSContext
34 except ImportError:
35 ClientTLSContext = ServerTLSContext = None
36
37 import re
38
39 try:
40 from cStringIO import StringIO
41 except ImportError:
42 from StringIO import StringIO
43
44 def spameater(*spam, **eggs):
45 return None
46
47 class DummyMessage:
48
49 def __init__(self, domain, user):
50 self.domain = domain
51 self.user = user
52 self.buffer = []
53
54 def lineReceived(self, line):
55 # Throw away the generated Received: header
56 if not re.match('Received: From yyy.com \(\[.*\]\) by localhost;', line) :
57 self.buffer.append(line)
58
59 def eomReceived(self):
60 message = '\n'.join(self.buffer) + '\n'
61 self.domain.messages[self.user.dest.local].append(message)
62 deferred = defer.Deferred()
63 deferred.callback("saved")
64 return deferred
65
66
67 class DummyDomain:
68
69 def __init__(self, names):
70 self.messages = {}
71 for name in names:
72 self.messages[name] = []
73
74 def exists(self, user):
75 if self.messages.has_key(user.dest.local):
76 return defer.succeed(lambda: self.startMessage(user))
77 return defer.fail(smtp.SMTPBadRcpt(user))
78
79 def startMessage(self, user):
80 return DummyMessage(self, user)
81
82 class SMTPTestCase(unittest.TestCase):
83
84 messages = [('foo@bar.com', ['foo@baz.com', 'qux@baz.com'], '''\
85 Subject: urgent\015
86 \015
87 Someone set up us the bomb!\015
88 ''')]
89
90 mbox = {'foo': ['Subject: urgent\n\nSomeone set up us the bomb!\n']}
91
92 def setUp(self):
93 self.factory = smtp.SMTPFactory()
94 self.factory.domains = {}
95 self.factory.domains['baz.com'] = DummyDomain(['foo'])
96 self.output = StringIOWithoutClosing()
97 self.transport = protocol.FileWrapper(self.output)
98
99 def testMessages(self):
100 from twisted.mail import protocols
101 protocol = protocols.DomainSMTP()
102 protocol.service = self.factory
103 protocol.factory = self.factory
104 protocol.receivedHeader = spameater
105 protocol.makeConnection(self.transport)
106 protocol.lineReceived('HELO yyy.com')
107 for message in self.messages:
108 protocol.lineReceived('MAIL FROM:<%s>' % message[0])
109 for target in message[1]:
110 protocol.lineReceived('RCPT TO:<%s>' % target)
111 protocol.lineReceived('DATA')
112 protocol.dataReceived(message[2])
113 protocol.lineReceived('.')
114 protocol.lineReceived('QUIT')
115 if self.mbox != self.factory.domains['baz.com'].messages:
116 raise AssertionError(self.factory.domains['baz.com'].messages)
117 protocol.setTimeout(None)
118
119 testMessages.suppress = [util.suppress(message='DomainSMTP', category=Deprec ationWarning)]
120
121 mail = '''\
122 Subject: hello
123
124 Goodbye
125 '''
126
127 class MyClient:
128 def __init__(self):
129 self.mail = 'moshez@foo.bar', ['moshez@foo.bar'], mail
130
131 def getMailFrom(self):
132 return self.mail[0]
133
134 def getMailTo(self):
135 return self.mail[1]
136
137 def getMailData(self):
138 return StringIO(self.mail[2])
139
140 def sentMail(self, code, resp, numOk, addresses, log):
141 self.mail = None, None, None
142
143 class MySMTPClient(MyClient, smtp.SMTPClient):
144 def __init__(self):
145 smtp.SMTPClient.__init__(self, 'foo.baz')
146 MyClient.__init__(self)
147
148 class MyESMTPClient(MyClient, smtp.ESMTPClient):
149 def __init__(self, secret = '', contextFactory = None):
150 smtp.ESMTPClient.__init__(self, secret, contextFactory, 'foo.baz')
151 MyClient.__init__(self)
152
153 class LoopbackMixin:
154 def loopback(self, server, client):
155 return loopback.loopbackTCP(server, client)
156
157 class LoopbackTestCase(LoopbackMixin):
158 def testMessages(self):
159 factory = smtp.SMTPFactory()
160 factory.domains = {}
161 factory.domains['foo.bar'] = DummyDomain(['moshez'])
162 from twisted.mail.protocols import DomainSMTP
163 protocol = DomainSMTP()
164 protocol.service = factory
165 protocol.factory = factory
166 clientProtocol = self.clientClass()
167 return self.loopback(protocol, clientProtocol)
168 testMessages.suppress = [util.suppress(message='DomainSMTP', category=Deprec ationWarning)]
169
170 class LoopbackSMTPTestCase(LoopbackTestCase, unittest.TestCase):
171 clientClass = MySMTPClient
172
173 class LoopbackESMTPTestCase(LoopbackTestCase, unittest.TestCase):
174 clientClass = MyESMTPClient
175
176
177 class FakeSMTPServer(basic.LineReceiver):
178
179 clientData = [
180 '220 hello', '250 nice to meet you',
181 '250 great', '250 great', '354 go on, lad'
182 ]
183
184 def connectionMade(self):
185 self.buffer = []
186 self.clientData = self.clientData[:]
187 self.clientData.reverse()
188 self.sendLine(self.clientData.pop())
189
190 def lineReceived(self, line):
191 self.buffer.append(line)
192 if line == "QUIT":
193 self.transport.write("221 see ya around\r\n")
194 self.transport.loseConnection()
195 elif line == ".":
196 self.transport.write("250 gotcha\r\n")
197 elif line == "RSET":
198 self.transport.loseConnection()
199
200 if self.clientData:
201 self.sendLine(self.clientData.pop())
202
203
204 class SMTPClientTestCase(unittest.TestCase, LoopbackMixin):
205
206 expected_output = [
207 'HELO foo.baz', 'MAIL FROM:<moshez@foo.bar>',
208 'RCPT TO:<moshez@foo.bar>', 'DATA',
209 'Subject: hello', '', 'Goodbye', '.', 'RSET'
210 ]
211
212 def testMessages(self):
213 # this test is disabled temporarily
214 client = MySMTPClient()
215 server = FakeSMTPServer()
216 d = self.loopback(server, client)
217 d.addCallback(lambda x :
218 self.assertEquals(server.buffer, self.expected_output))
219 return d
220
221 class DummySMTPMessage:
222
223 def __init__(self, protocol, users):
224 self.protocol = protocol
225 self.users = users
226 self.buffer = []
227
228 def lineReceived(self, line):
229 self.buffer.append(line)
230
231 def eomReceived(self):
232 message = '\n'.join(self.buffer) + '\n'
233 helo, origin = self.users[0].helo[0], str(self.users[0].orig)
234 recipients = []
235 for user in self.users:
236 recipients.append(str(user))
237 self.protocol.message[tuple(recipients)] = (helo, origin, recipients, me ssage)
238 return defer.succeed("saved")
239
240 class DummyProto:
241 def connectionMade(self):
242 self.dummyMixinBase.connectionMade(self)
243 self.message = {}
244
245 def startMessage(self, users):
246 return DummySMTPMessage(self, users)
247
248 def receivedHeader(*spam):
249 return None
250
251 def validateTo(self, user):
252 self.delivery = DummyDelivery()
253 return lambda: self.startMessage([user])
254
255 def validateFrom(self, helo, origin):
256 return origin
257
258 class DummySMTP(DummyProto, smtp.SMTP):
259 dummyMixinBase = smtp.SMTP
260
261 class DummyESMTP(DummyProto, smtp.ESMTP):
262 dummyMixinBase = smtp.ESMTP
263
264 class AnotherTestCase:
265 serverClass = None
266 clientClass = None
267
268 messages = [ ('foo.com', 'moshez@foo.com', ['moshez@bar.com'],
269 'moshez@foo.com', ['moshez@bar.com'], '''\
270 From: Moshe
271 To: Moshe
272
273 Hi,
274 how are you?
275 '''),
276 ('foo.com', 'tttt@rrr.com', ['uuu@ooo', 'yyy@eee'],
277 'tttt@rrr.com', ['uuu@ooo', 'yyy@eee'], '''\
278 Subject: pass
279
280 ..rrrr..
281 '''),
282 ('foo.com', '@this,@is,@ignored:foo@bar.com',
283 ['@ignore,@this,@too:bar@foo.com'],
284 'foo@bar.com', ['bar@foo.com'], '''\
285 Subject: apa
286 To: foo
287
288 123
289 .
290 456
291 '''),
292 ]
293
294 data = [
295 ('', '220.*\r\n$', None, None),
296 ('HELO foo.com\r\n', '250.*\r\n$', None, None),
297 ('RSET\r\n', '250.*\r\n$', None, None),
298 ]
299 for helo_, from_, to_, realfrom, realto, msg in messages:
300 data.append(('MAIL FROM:<%s>\r\n' % from_, '250.*\r\n',
301 None, None))
302 for rcpt in to_:
303 data.append(('RCPT TO:<%s>\r\n' % rcpt, '250.*\r\n',
304 None, None))
305
306 data.append(('DATA\r\n','354.*\r\n',
307 msg, ('250.*\r\n',
308 (helo_, realfrom, realto, msg))))
309
310
311 def testBuffer(self):
312 output = StringIOWithoutClosing()
313 a = self.serverClass()
314 class fooFactory:
315 domain = 'foo.com'
316
317 a.factory = fooFactory()
318 a.makeConnection(protocol.FileWrapper(output))
319 for (send, expect, msg, msgexpect) in self.data:
320 if send:
321 a.dataReceived(send)
322 data = output.getvalue()
323 output.truncate(0)
324 if not re.match(expect, data):
325 raise AssertionError, (send, expect, data)
326 if data[:3] == '354':
327 for line in msg.splitlines():
328 if line and line[0] == '.':
329 line = '.' + line
330 a.dataReceived(line + '\r\n')
331 a.dataReceived('.\r\n')
332 # Special case for DATA. Now we want a 250, and then
333 # we compare the messages
334 data = output.getvalue()
335 output.truncate()
336 resp, msgdata = msgexpect
337 if not re.match(resp, data):
338 raise AssertionError, (resp, data)
339 for recip in msgdata[2]:
340 expected = list(msgdata[:])
341 expected[2] = [recip]
342 self.assertEquals(
343 a.message[(recip,)],
344 tuple(expected)
345 )
346 a.setTimeout(None)
347
348
349 class AnotherESMTPTestCase(AnotherTestCase, unittest.TestCase):
350 serverClass = DummyESMTP
351 clientClass = MyESMTPClient
352
353 class AnotherSMTPTestCase(AnotherTestCase, unittest.TestCase):
354 serverClass = DummySMTP
355 clientClass = MySMTPClient
356
357
358
359 class DummyChecker:
360 implements(cred.checkers.ICredentialsChecker)
361
362 users = {
363 'testuser': 'testpassword'
364 }
365
366 credentialInterfaces = (cred.credentials.IUsernameHashedPassword,)
367
368 def requestAvatarId(self, credentials):
369 return defer.maybeDeferred(
370 credentials.checkPassword, self.users[credentials.username]
371 ).addCallback(self._cbCheck, credentials.username)
372
373 def _cbCheck(self, result, username):
374 if result:
375 return username
376 raise cred.error.UnauthorizedLogin()
377
378 class DummyDelivery:
379 implements(smtp.IMessageDelivery)
380
381 def validateTo(self, user):
382 return user
383
384 def validateFrom(self, helo, origin):
385 return origin
386
387 def receivedHeader(*args):
388 return None
389
390 class DummyRealm:
391 def requestAvatar(self, avatarId, mind, *interfaces):
392 return smtp.IMessageDelivery, DummyDelivery(), lambda: None
393
394 class AuthTestCase(unittest.TestCase, LoopbackMixin):
395 def testAuth(self):
396 realm = DummyRealm()
397 p = cred.portal.Portal(realm)
398 p.registerChecker(DummyChecker())
399
400 server = DummyESMTP({'CRAM-MD5': cred.credentials.CramMD5Credentials})
401 server.portal = p
402 client = MyESMTPClient('testpassword')
403
404 cAuth = imap4.CramMD5ClientAuthenticator('testuser')
405 client.registerAuthenticator(cAuth)
406
407 d = self.loopback(server, client)
408 d.addCallback(lambda x : self.assertEquals(server.authenticated, 1))
409 return d
410
411 class SMTPHelperTestCase(unittest.TestCase):
412 def testMessageID(self):
413 d = {}
414 for i in range(1000):
415 m = smtp.messageid('testcase')
416 self.failIf(m in d)
417 d[m] = None
418
419 def testQuoteAddr(self):
420 cases = [
421 ['user@host.name', '<user@host.name>'],
422 ['"User Name" <user@host.name>', '<user@host.name>'],
423 [smtp.Address('someguy@someplace'), '<someguy@someplace>'],
424 ['', '<>'],
425 [smtp.Address(''), '<>'],
426 ]
427
428 for (c, e) in cases:
429 self.assertEquals(smtp.quoteaddr(c), e)
430
431 def testUser(self):
432 u = smtp.User('user@host', 'helo.host.name', None, None)
433 self.assertEquals(str(u), 'user@host')
434
435 def testXtextEncoding(self):
436 cases = [
437 ('Hello world', 'Hello+20world'),
438 ('Hello+world', 'Hello+2Bworld'),
439 ('\0\1\2\3\4\5', '+00+01+02+03+04+05'),
440 ('e=mc2@example.com', 'e+3Dmc2@example.com')
441 ]
442
443 for (case, expected) in cases:
444 self.assertEquals(case.encode('xtext'), expected)
445 self.assertEquals(expected.decode('xtext'), case)
446
447
448 class NoticeTLSClient(MyESMTPClient):
449 tls = False
450
451 def esmtpState_starttls(self, code, resp):
452 MyESMTPClient.esmtpState_starttls(self, code, resp)
453 self.tls = True
454
455 class TLSTestCase(unittest.TestCase, LoopbackMixin):
456 def testTLS(self):
457 clientCTX = ClientTLSContext()
458 serverCTX = ServerTLSContext()
459
460 client = NoticeTLSClient(contextFactory=clientCTX)
461 server = DummyESMTP(contextFactory=serverCTX)
462
463 def check(ignored):
464 self.assertEquals(client.tls, True)
465 self.assertEquals(server.startedTLS, True)
466
467 return self.loopback(server, client).addCallback(check)
468
469 if ClientTLSContext is None:
470 for case in (TLSTestCase,):
471 case.skip = "OpenSSL not present"
472
473 if not interfaces.IReactorSSL.providedBy(reactor):
474 for case in (TLSTestCase,):
475 case.skip = "Reactor doesn't support SSL"
476
477 class EmptyLineTestCase(unittest.TestCase):
478 def testEmptyLineSyntaxError(self):
479 proto = smtp.SMTP()
480 output = StringIOWithoutClosing()
481 transport = protocol.FileWrapper(output)
482 proto.makeConnection(transport)
483 proto.lineReceived('')
484 proto.setTimeout(None)
485
486 out = output.getvalue().splitlines()
487 self.assertEquals(len(out), 2)
488 self.failUnless(out[0].startswith('220'))
489 self.assertEquals(out[1], "500 Error: bad syntax")
490
491
492
493 class TimeoutTestCase(unittest.TestCase, LoopbackMixin):
494 """
495 Check that SMTP client factories correctly use the timeout.
496 """
497
498 def _timeoutTest(self, onDone, clientFactory):
499 """
500 Connect the clientFactory, and check the timeout on the request.
501 """
502 clock = task.Clock()
503 client = clientFactory.buildProtocol(
504 address.IPv4Address('TCP', 'example.net', 25))
505 client.callLater = clock.callLater
506 t = StringTransport()
507 client.makeConnection(t)
508 t.protocol = client
509 def check(ign):
510 self.assertEquals(clock.seconds(), 0.5)
511 d = self.assertFailure(onDone, smtp.SMTPTimeoutError
512 ).addCallback(check)
513 # The first call should not trigger the timeout
514 clock.advance(0.1)
515 # But this one should
516 clock.advance(0.4)
517 return d
518
519
520 def test_SMTPClient(self):
521 """
522 Test timeout for L{smtp.SMTPSenderFactory}: the response L{Deferred}
523 should be errback with a L{smtp.SMTPTimeoutError}.
524 """
525 onDone = defer.Deferred()
526 clientFactory = smtp.SMTPSenderFactory(
527 'source@address', 'recipient@address',
528 StringIO("Message body"), onDone,
529 retries=0, timeout=0.5)
530 return self._timeoutTest(onDone, clientFactory)
531
532
533 def test_ESMTPClient(self):
534 """
535 Test timeout for L{smtp.ESMTPSenderFactory}: the response L{Deferred}
536 should be errback with a L{smtp.SMTPTimeoutError}.
537 """
538 onDone = defer.Deferred()
539 clientFactory = smtp.ESMTPSenderFactory(
540 'username', 'password',
541 'source@address', 'recipient@address',
542 StringIO("Message body"), onDone,
543 retries=0, timeout=0.5)
544 return self._timeoutTest(onDone, clientFactory)
545
546
547
548 class SingletonRealm(object):
549 """
550 Trivial realm implementation which is constructed with an interface and an
551 avatar and returns that avatar when asked for that interface.
552 """
553 implements(IRealm)
554
555 def __init__(self, interface, avatar):
556 self.interface = interface
557 self.avatar = avatar
558
559
560 def requestAvatar(self, avatarId, mind, *interfaces):
561 for iface in interfaces:
562 if iface is self.interface:
563 return iface, self.avatar, lambda: None
564
565
566
567 class NotImplementedDelivery(object):
568 """
569 Non-implementation of L{smtp.IMessageDelivery} which only has methods which
570 raise L{NotImplementedError}. Subclassed by various tests to provide the
571 particular behavior being tested.
572 """
573 def validateFrom(self, helo, origin):
574 raise NotImplementedError("This oughtn't be called in the course of this test.")
575
576
577 def validateTo(self, user):
578 raise NotImplementedError("This oughtn't be called in the course of this test.")
579
580
581 def receivedHeader(self, helo, origin, recipients):
582 raise NotImplementedError("This oughtn't be called in the course of this test.")
583
584
585
586 class SMTPServerTestCase(unittest.TestCase):
587 """
588 Test various behaviors of L{twisted.mail.smtp.SMTP} and
589 L{twisted.mail.smtp.ESMTP}.
590 """
591 def testSMTPGreetingHost(self, serverClass=smtp.SMTP):
592 """
593 Test that the specified hostname shows up in the SMTP server's
594 greeting.
595 """
596 s = serverClass()
597 s.host = "example.com"
598 t = StringTransport()
599 s.makeConnection(t)
600 s.connectionLost(error.ConnectionDone())
601 self.assertIn("example.com", t.value())
602
603
604 def testSMTPGreetingNotExtended(self):
605 """
606 Test that the string "ESMTP" does not appear in the SMTP server's
607 greeting since that string strongly suggests the presence of support
608 for various SMTP extensions which are not supported by L{smtp.SMTP}.
609 """
610 s = smtp.SMTP()
611 t = StringTransport()
612 s.makeConnection(t)
613 s.connectionLost(error.ConnectionDone())
614 self.assertNotIn("ESMTP", t.value())
615
616
617 def testESMTPGreetingHost(self):
618 """
619 Similar to testSMTPGreetingHost, but for the L{smtp.ESMTP} class.
620 """
621 self.testSMTPGreetingHost(smtp.ESMTP)
622
623
624 def testESMTPGreetingExtended(self):
625 """
626 Test that the string "ESMTP" does appear in the ESMTP server's
627 greeting since L{smtp.ESMTP} does support the SMTP extensions which
628 that advertises to the client.
629 """
630 s = smtp.ESMTP()
631 t = StringTransport()
632 s.makeConnection(t)
633 s.connectionLost(error.ConnectionDone())
634 self.assertIn("ESMTP", t.value())
635
636
637 def test_acceptSenderAddress(self):
638 """
639 Test that a C{MAIL FROM} command with an acceptable address is
640 responded to with the correct success code.
641 """
642 class AcceptanceDelivery(NotImplementedDelivery):
643 """
644 Delivery object which accepts all senders as valid.
645 """
646 def validateFrom(self, helo, origin):
647 return origin
648
649 realm = SingletonRealm(smtp.IMessageDelivery, AcceptanceDelivery())
650 portal = Portal(realm, [AllowAnonymousAccess()])
651 proto = smtp.SMTP()
652 proto.portal = portal
653 trans = StringTransport()
654 proto.makeConnection(trans)
655
656 # Deal with the necessary preliminaries
657 proto.dataReceived('HELO example.com\r\n')
658 trans.clear()
659
660 # Try to specify our sender address
661 proto.dataReceived('MAIL FROM:<alice@example.com>\r\n')
662
663 # Clean up the protocol before doing anything that might raise an
664 # exception.
665 proto.connectionLost(error.ConnectionLost())
666
667 # Make sure that we received exactly the correct response
668 self.assertEqual(
669 trans.value(),
670 '250 Sender address accepted\r\n')
671
672
673 def test_deliveryRejectedSenderAddress(self):
674 """
675 Test that a C{MAIL FROM} command with an address rejected by a
676 L{smtp.IMessageDelivery} instance is responded to with the correct
677 error code.
678 """
679 class RejectionDelivery(NotImplementedDelivery):
680 """
681 Delivery object which rejects all senders as invalid.
682 """
683 def validateFrom(self, helo, origin):
684 raise smtp.SMTPBadSender(origin)
685
686 realm = SingletonRealm(smtp.IMessageDelivery, RejectionDelivery())
687 portal = Portal(realm, [AllowAnonymousAccess()])
688 proto = smtp.SMTP()
689 proto.portal = portal
690 trans = StringTransport()
691 proto.makeConnection(trans)
692
693 # Deal with the necessary preliminaries
694 proto.dataReceived('HELO example.com\r\n')
695 trans.clear()
696
697 # Try to specify our sender address
698 proto.dataReceived('MAIL FROM:<alice@example.com>\r\n')
699
700 # Clean up the protocol before doing anything that might raise an
701 # exception.
702 proto.connectionLost(error.ConnectionLost())
703
704 # Make sure that we received exactly the correct response
705 self.assertEqual(
706 trans.value(),
707 '550 Cannot receive from specified address '
708 '<alice@example.com>: Sender not acceptable\r\n')
709
710
711 def test_portalRejectedSenderAddress(self):
712 """
713 Test that a C{MAIL FROM} command with an address rejected by an
714 L{smtp.SMTP} instance's portal is responded to with the correct error
715 code.
716 """
717 class DisallowAnonymousAccess(object):
718 """
719 Checker for L{IAnonymous} which rejects authentication attempts.
720 """
721 implements(ICredentialsChecker)
722
723 credentialInterfaces = (IAnonymous,)
724
725 def requestAvatarId(self, credentials):
726 return defer.fail(UnauthorizedLogin())
727
728 realm = SingletonRealm(smtp.IMessageDelivery, NotImplementedDelivery())
729 portal = Portal(realm, [DisallowAnonymousAccess()])
730 proto = smtp.SMTP()
731 proto.portal = portal
732 trans = StringTransport()
733 proto.makeConnection(trans)
734
735 # Deal with the necessary preliminaries
736 proto.dataReceived('HELO example.com\r\n')
737 trans.clear()
738
739 # Try to specify our sender address
740 proto.dataReceived('MAIL FROM:<alice@example.com>\r\n')
741
742 # Clean up the protocol before doing anything that might raise an
743 # exception.
744 proto.connectionLost(error.ConnectionLost())
745
746 # Make sure that we received exactly the correct response
747 self.assertEqual(
748 trans.value(),
749 '550 Cannot receive from specified address '
750 '<alice@example.com>: Sender not acceptable\r\n')
751
752
753 def test_portalRejectedAnonymousSender(self):
754 """
755 Test that a C{MAIL FROM} command issued without first authenticating
756 when a portal has been configured to disallow anonymous logins is
757 responded to with the correct error code.
758 """
759 realm = SingletonRealm(smtp.IMessageDelivery, NotImplementedDelivery())
760 portal = Portal(realm, [])
761 proto = smtp.SMTP()
762 proto.portal = portal
763 trans = StringTransport()
764 proto.makeConnection(trans)
765
766 # Deal with the necessary preliminaries
767 proto.dataReceived('HELO example.com\r\n')
768 trans.clear()
769
770 # Try to specify our sender address
771 proto.dataReceived('MAIL FROM:<alice@example.com>\r\n')
772
773 # Clean up the protocol before doing anything that might raise an
774 # exception.
775 proto.connectionLost(error.ConnectionLost())
776
777 # Make sure that we received exactly the correct response
778 self.assertEqual(
779 trans.value(),
780 '550 Cannot receive from specified address '
781 '<alice@example.com>: Unauthenticated senders not allowed\r\n')
782
783
784
785 class ESMTPAuthenticationTestCase(unittest.TestCase):
786 def assertServerResponse(self, bytes, response):
787 """
788 Assert that when the given bytes are delivered to the ESMTP server
789 instance, it responds with the indicated lines.
790
791 @type bytes: str
792 @type response: list of str
793 """
794 self.transport.clear()
795 self.server.dataReceived(bytes)
796 self.assertEqual(
797 response,
798 self.transport.value().splitlines())
799
800
801 def assertServerAuthenticated(self, loginArgs, username="username", password ="password"):
802 """
803 Assert that a login attempt has been made, that the credentials and
804 interfaces passed to it are correct, and that when the login request
805 is satisfied, a successful response is sent by the ESMTP server
806 instance.
807
808 @param loginArgs: A C{list} previously passed to L{portalFactory}.
809 """
810 d, credentials, mind, interfaces = loginArgs.pop()
811 self.assertEqual(loginArgs, [])
812 self.failUnless(twisted.cred.credentials.IUsernamePassword.providedBy(cr edentials))
813 self.assertEqual(credentials.username, username)
814 self.failUnless(credentials.checkPassword(password))
815 self.assertIn(smtp.IMessageDeliveryFactory, interfaces)
816 self.assertIn(smtp.IMessageDelivery, interfaces)
817 d.callback((smtp.IMessageDeliveryFactory, None, lambda: None))
818
819 self.assertEqual(
820 ["235 Authentication successful."],
821 self.transport.value().splitlines())
822
823
824 def setUp(self):
825 """
826 Create an ESMTP instance attached to a StringTransport.
827 """
828 self.server = smtp.ESMTP({
829 'LOGIN': imap4.LOGINCredentials})
830 self.server.host = 'localhost'
831 self.transport = StringTransport(
832 peerAddress=address.IPv4Address('TCP', '127.0.0.1', 12345))
833 self.server.makeConnection(self.transport)
834
835
836 def tearDown(self):
837 """
838 Disconnect the ESMTP instance to clean up its timeout DelayedCall.
839 """
840 self.server.connectionLost(error.ConnectionDone())
841
842
843 def portalFactory(self, loginList):
844 class DummyPortal:
845 def login(self, credentials, mind, *interfaces):
846 d = defer.Deferred()
847 loginList.append((d, credentials, mind, interfaces))
848 return d
849 return DummyPortal()
850
851
852 def test_authenticationCapabilityAdvertised(self):
853 """
854 Test that AUTH is advertised to clients which issue an EHLO command.
855 """
856 self.transport.clear()
857 self.server.dataReceived('EHLO\r\n')
858 responseLines = self.transport.value().splitlines()
859 self.assertEqual(
860 responseLines[0],
861 "250-localhost Hello 127.0.0.1, nice to meet you")
862 self.assertEqual(
863 responseLines[1],
864 "250 AUTH LOGIN")
865 self.assertEqual(len(responseLines), 2)
866
867
868 def test_plainAuthentication(self):
869 """
870 Test that the LOGIN authentication mechanism can be used
871 """
872 loginArgs = []
873 self.server.portal = self.portalFactory(loginArgs)
874
875 self.server.dataReceived('EHLO\r\n')
876 self.transport.clear()
877
878 self.assertServerResponse(
879 'AUTH LOGIN\r\n',
880 ["334 " + "User Name\0".encode('base64').strip()])
881
882 self.assertServerResponse(
883 'username'.encode('base64') + '\r\n',
884 ["334 " + "Password\0".encode('base64').strip()])
885
886 self.assertServerResponse(
887 'password'.encode('base64').strip() + '\r\n',
888 [])
889
890 self.assertServerAuthenticated(loginArgs)
891
892
893 def test_plainAuthenticationEmptyPassword(self):
894 """
895 Test that giving an empty password for plain auth succeeds.
896 """
897 loginArgs = []
898 self.server.portal = self.portalFactory(loginArgs)
899
900 self.server.dataReceived('EHLO\r\n')
901 self.transport.clear()
902
903 self.assertServerResponse(
904 'AUTH LOGIN\r\n',
905 ["334 " + "User Name\0".encode('base64').strip()])
906
907 self.assertServerResponse(
908 'username'.encode('base64') + '\r\n',
909 ["334 " + "Password\0".encode('base64').strip()])
910
911 self.assertServerResponse('\r\n', [])
912 self.assertServerAuthenticated(loginArgs, password='')
913
914 def test_plainAuthenticationInitialResponse(self):
915 """
916 The response to the first challenge may be included on the AUTH command
917 line. Test that this is also supported.
918 """
919 loginArgs = []
920 self.server.portal = self.portalFactory(loginArgs)
921
922 self.server.dataReceived('EHLO\r\n')
923 self.transport.clear()
924
925 self.assertServerResponse(
926 'AUTH LOGIN ' + "username".encode('base64').strip() + '\r\n',
927 ["334 " + "Password\0".encode('base64').strip()])
928
929 self.assertServerResponse(
930 'password'.encode('base64').strip() + '\r\n',
931 [])
932
933 self.assertServerAuthenticated(loginArgs)
934
935
936 def test_abortAuthentication(self):
937 """
938 Test that a challenge/response sequence can be aborted by the client.
939 """
940 loginArgs = []
941 self.server.portal = self.portalFactory(loginArgs)
942
943 self.server.dataReceived('EHLO\r\n')
944 self.server.dataReceived('AUTH LOGIN\r\n')
945
946 self.assertServerResponse(
947 '*\r\n',
948 ['501 Authentication aborted'])
949
950
951 def test_invalidBase64EncodedResponse(self):
952 """
953 Test that a response which is not properly Base64 encoded results in
954 the appropriate error code.
955 """
956 loginArgs = []
957 self.server.portal = self.portalFactory(loginArgs)
958
959 self.server.dataReceived('EHLO\r\n')
960 self.server.dataReceived('AUTH LOGIN\r\n')
961
962 self.assertServerResponse(
963 'x\r\n',
964 ['501 Syntax error in parameters or arguments'])
965
966 self.assertEqual(loginArgs, [])
967
968
969 def test_invalidBase64EncodedInitialResponse(self):
970 """
971 Like L{test_invalidBase64EncodedResponse} but for the case of an
972 initial response included with the C{AUTH} command.
973 """
974 loginArgs = []
975 self.server.portal = self.portalFactory(loginArgs)
976
977 self.server.dataReceived('EHLO\r\n')
978 self.assertServerResponse(
979 'AUTH LOGIN x\r\n',
980 ['501 Syntax error in parameters or arguments'])
981
982 self.assertEqual(loginArgs, [])
OLDNEW
« no previous file with comments | « third_party/twisted_8_1/twisted/mail/test/test_pop3client.py ('k') | third_party/twisted_8_1/twisted/mail/topfiles/NEWS » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698