Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(71)

Side by Side Diff: third_party/twisted_8_1/twisted/mail/test/test_pop3.py

Issue 12261012: Remove third_party/twisted_8_1 (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/tools/build
Patch Set: Created 7 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
(Empty)
1 # Copyright (c) 2001-2004 Twisted Matrix Laboratories.
2 # See LICENSE for details.
3
4
5 """
6 Test cases for twisted.mail.pop3 module.
7 """
8
9 import StringIO
10 import string
11 import hmac
12 import base64
13 import itertools
14
15 from zope.interface import implements
16
17 from twisted.internet import defer
18
19 from twisted.trial import unittest, util
20 from twisted import mail
21 import twisted.mail.protocols
22 import twisted.mail.pop3
23 import twisted.internet.protocol
24 from twisted import internet
25 from twisted.mail import pop3
26 from twisted.protocols import loopback
27 from twisted.python import failure
28
29 from twisted import cred
30 import twisted.cred.portal
31 import twisted.cred.checkers
32 import twisted.cred.credentials
33
34 from twisted.test.proto_helpers import LineSendingProtocol
35
36
37 class UtilityTestCase(unittest.TestCase):
38 """
39 Test the various helper functions and classes used by the POP3 server
40 protocol implementation.
41 """
42
43 def testLineBuffering(self):
44 """
45 Test creating a LineBuffer and feeding it some lines. The lines should
46 build up in its internal buffer for a while and then get spat out to
47 the writer.
48 """
49 output = []
50 input = iter(itertools.cycle(['012', '345', '6', '7', '8', '9']))
51 c = pop3._IteratorBuffer(output.extend, input, 6)
52 i = iter(c)
53 self.assertEquals(output, []) # nothing is buffer
54 i.next()
55 self.assertEquals(output, []) # '012' is buffered
56 i.next()
57 self.assertEquals(output, []) # '012345' is buffered
58 i.next()
59 self.assertEquals(output, ['012', '345', '6']) # nothing is buffered
60 for n in range(5):
61 i.next()
62 self.assertEquals(output, ['012', '345', '6', '7', '8', '9', '012', '345 '])
63
64
65 def testFinishLineBuffering(self):
66 """
67 Test that a LineBuffer flushes everything when its iterator is
68 exhausted, and itself raises StopIteration.
69 """
70 output = []
71 input = iter(['a', 'b', 'c'])
72 c = pop3._IteratorBuffer(output.extend, input, 5)
73 for i in c:
74 pass
75 self.assertEquals(output, ['a', 'b', 'c'])
76
77
78 def testSuccessResponseFormatter(self):
79 """
80 Test that the thing that spits out POP3 'success responses' works
81 right.
82 """
83 self.assertEquals(
84 pop3.successResponse('Great.'),
85 '+OK Great.\r\n')
86
87
88 def testStatLineFormatter(self):
89 """
90 Test that the function which formats stat lines does so appropriately.
91 """
92 statLine = list(pop3.formatStatResponse([]))[-1]
93 self.assertEquals(statLine, '+OK 0 0\r\n')
94
95 statLine = list(pop3.formatStatResponse([10, 31, 0, 10101]))[-1]
96 self.assertEquals(statLine, '+OK 4 10142\r\n')
97
98
99 def testListLineFormatter(self):
100 """
101 Test that the function which formats the lines in response to a LIST
102 command does so appropriately.
103 """
104 listLines = list(pop3.formatListResponse([]))
105 self.assertEquals(
106 listLines,
107 ['+OK 0\r\n', '.\r\n'])
108
109 listLines = list(pop3.formatListResponse([1, 2, 3, 100]))
110 self.assertEquals(
111 listLines,
112 ['+OK 4\r\n', '1 1\r\n', '2 2\r\n', '3 3\r\n', '4 100\r\n', '.\r\n'] )
113
114
115
116 def testUIDListLineFormatter(self):
117 """
118 Test that the function which formats lines in response to a UIDL
119 command does so appropriately.
120 """
121 UIDs = ['abc', 'def', 'ghi']
122 listLines = list(pop3.formatUIDListResponse([], UIDs.__getitem__))
123 self.assertEquals(
124 listLines,
125 ['+OK \r\n', '.\r\n'])
126
127 listLines = list(pop3.formatUIDListResponse([123, 431, 591], UIDs.__geti tem__))
128 self.assertEquals(
129 listLines,
130 ['+OK \r\n', '1 abc\r\n', '2 def\r\n', '3 ghi\r\n', '.\r\n'])
131
132 listLines = list(pop3.formatUIDListResponse([0, None, 591], UIDs.__getit em__))
133 self.assertEquals(
134 listLines,
135 ['+OK \r\n', '1 abc\r\n', '3 ghi\r\n', '.\r\n'])
136
137
138
139 class MyVirtualPOP3(mail.protocols.VirtualPOP3):
140
141 magic = '<moshez>'
142
143 def authenticateUserAPOP(self, user, digest):
144 user, domain = self.lookupDomain(user)
145 return self.service.domains['baz.com'].authenticateUserAPOP(user, digest , self.magic, domain)
146
147 class DummyDomain:
148
149 def __init__(self):
150 self.users = {}
151
152 def addUser(self, name):
153 self.users[name] = []
154
155 def addMessage(self, name, message):
156 self.users[name].append(message)
157
158 def authenticateUserAPOP(self, name, digest, magic, domain):
159 return pop3.IMailbox, ListMailbox(self.users[name]), lambda: None
160
161
162 class ListMailbox:
163
164 def __init__(self, list):
165 self.list = list
166
167 def listMessages(self, i=None):
168 if i is None:
169 return map(len, self.list)
170 return len(self.list[i])
171
172 def getMessage(self, i):
173 return StringIO.StringIO(self.list[i])
174
175 def getUidl(self, i):
176 return i
177
178 def deleteMessage(self, i):
179 self.list[i] = ''
180
181 def sync(self):
182 pass
183
184 class MyPOP3Downloader(pop3.POP3Client):
185
186 def handle_WELCOME(self, line):
187 pop3.POP3Client.handle_WELCOME(self, line)
188 self.apop('hello@baz.com', 'world')
189
190 def handle_APOP(self, line):
191 parts = line.split()
192 code = parts[0]
193 data = (parts[1:] or ['NONE'])[0]
194 if code != '+OK':
195 print parts
196 raise AssertionError, 'code is ' + code
197 self.lines = []
198 self.retr(1)
199
200 def handle_RETR_continue(self, line):
201 self.lines.append(line)
202
203 def handle_RETR_end(self):
204 self.message = '\n'.join(self.lines) + '\n'
205 self.quit()
206
207 def handle_QUIT(self, line):
208 if line[:3] != '+OK':
209 raise AssertionError, 'code is ' + line
210
211
212 class POP3TestCase(unittest.TestCase):
213
214 message = '''\
215 Subject: urgent
216
217 Someone set up us the bomb!
218 '''
219
220 expectedOutput = '''\
221 +OK <moshez>\015
222 +OK Authentication succeeded\015
223 +OK \015
224 1 0\015
225 .\015
226 +OK %d\015
227 Subject: urgent\015
228 \015
229 Someone set up us the bomb!\015
230 .\015
231 +OK \015
232 ''' % len(message)
233
234 def setUp(self):
235 self.factory = internet.protocol.Factory()
236 self.factory.domains = {}
237 self.factory.domains['baz.com'] = DummyDomain()
238 self.factory.domains['baz.com'].addUser('hello')
239 self.factory.domains['baz.com'].addMessage('hello', self.message)
240
241 def testMessages(self):
242 client = LineSendingProtocol([
243 'APOP hello@baz.com world',
244 'UIDL',
245 'RETR 1',
246 'QUIT',
247 ])
248 server = MyVirtualPOP3()
249 server.service = self.factory
250 def check(ignored):
251 output = '\r\n'.join(client.response) + '\r\n'
252 self.assertEquals(output, self.expectedOutput)
253 return loopback.loopbackTCP(server, client).addCallback(check)
254
255 def testLoopback(self):
256 protocol = MyVirtualPOP3()
257 protocol.service = self.factory
258 clientProtocol = MyPOP3Downloader()
259 def check(ignored):
260 self.failUnlessEqual(clientProtocol.message, self.message)
261 protocol.connectionLost(
262 failure.Failure(Exception("Test harness disconnect")))
263 d = loopback.loopbackAsync(protocol, clientProtocol)
264 return d.addCallback(check)
265 testLoopback.suppress = [util.suppress(message="twisted.mail.pop3.POP3Client is deprecated")]
266
267
268
269 class DummyPOP3(pop3.POP3):
270
271 magic = '<moshez>'
272
273 def authenticateUserAPOP(self, user, password):
274 return pop3.IMailbox, DummyMailbox(ValueError), lambda: None
275
276
277
278 class DummyMailbox(pop3.Mailbox):
279
280 messages = [
281 '''\
282 From: moshe
283 To: moshe
284
285 How are you, friend?
286 ''']
287
288 def __init__(self, exceptionType):
289 self.messages = DummyMailbox.messages[:]
290 self.exceptionType = exceptionType
291
292 def listMessages(self, i=None):
293 if i is None:
294 return map(len, self.messages)
295 if i >= len(self.messages):
296 raise self.exceptionType()
297 return len(self.messages[i])
298
299 def getMessage(self, i):
300 return StringIO.StringIO(self.messages[i])
301
302 def getUidl(self, i):
303 if i >= len(self.messages):
304 raise self.exceptionType()
305 return str(i)
306
307 def deleteMessage(self, i):
308 self.messages[i] = ''
309
310
311 class AnotherPOP3TestCase(unittest.TestCase):
312
313 def runTest(self, lines):
314 dummy = DummyPOP3()
315 client = LineSendingProtocol([
316 "APOP moshez dummy",
317 "LIST",
318 "UIDL",
319 "RETR 1",
320 "RETR 2",
321 "DELE 1",
322 "RETR 1",
323 "QUIT",
324 ])
325 d = loopback.loopbackAsync(dummy, client)
326 return d.addCallback(self._cbRunTest, client, dummy)
327
328 def _cbRunTest(self, ignored, client, dummy):
329 expected_output = [
330 '+OK <moshez>',
331 '+OK Authentication succeeded',
332 '+OK 1',
333 '1 44',
334 '.',
335 '+OK ',
336 '1 0',
337 '.',
338 '+OK 44',
339 'From: moshe',
340 'To: moshe',
341 '',
342 'How are you, friend?',
343 '',
344 '.',
345 '-ERR Bad message number argument',
346 '+OK ',
347 '-ERR message deleted',
348 '+OK ',
349 ''
350 ]
351 self.failUnlessEqual('\r\n'.join(expected_output),
352 '\r\n'.join(client.response) + '\r\n')
353 dummy.connectionLost(failure.Failure(Exception("Test harness disconnect" )))
354 return ignored
355
356
357 def testBuffer(self):
358 lines = string.split('''\
359 APOP moshez dummy
360 LIST
361 UIDL
362 RETR 1
363 RETR 2
364 DELE 1
365 RETR 1
366 QUIT''', '\n')
367 return self.runTest(lines)
368
369 def testNoop(self):
370 lines = ['APOP spiv dummy', 'NOOP', 'QUIT']
371 return self.runTest(lines)
372
373 def testAuthListing(self):
374 p = DummyPOP3()
375 p.factory = internet.protocol.Factory()
376 p.factory.challengers = {'Auth1': None, 'secondAuth': None, 'authLast': None}
377 client = LineSendingProtocol([
378 "AUTH",
379 "QUIT",
380 ])
381
382 d = loopback.loopbackAsync(p, client)
383 return d.addCallback(self._cbTestAuthListing, client)
384
385 def _cbTestAuthListing(self, ignored, client):
386 self.failUnless(client.response[1].startswith('+OK'))
387 self.assertEquals(client.response[2:6],
388 ["AUTH1", "SECONDAUTH", "AUTHLAST", "."])
389
390 def testIllegalPASS(self):
391 dummy = DummyPOP3()
392 client = LineSendingProtocol([
393 "PASS fooz",
394 "QUIT"
395 ])
396 d = loopback.loopbackAsync(dummy, client)
397 return d.addCallback(self._cbTestIllegalPASS, client, dummy)
398
399 def _cbTestIllegalPASS(self, ignored, client, dummy):
400 expected_output = '+OK <moshez>\r\n-ERR USER required before PASS\r\n+OK \r\n'
401 self.failUnlessEqual(expected_output, '\r\n'.join(client.response) + '\r \n')
402 dummy.connectionLost(failure.Failure(Exception("Test harness disconnect" )))
403
404 def testEmptyPASS(self):
405 dummy = DummyPOP3()
406 client = LineSendingProtocol([
407 "PASS ",
408 "QUIT"
409 ])
410 d = loopback.loopbackAsync(dummy, client)
411 return d.addCallback(self._cbTestEmptyPASS, client, dummy)
412
413 def _cbTestEmptyPASS(self, ignored, client, dummy):
414 expected_output = '+OK <moshez>\r\n-ERR USER required before PASS\r\n+OK \r\n'
415 self.failUnlessEqual(expected_output, '\r\n'.join(client.response) + '\r \n')
416 dummy.connectionLost(failure.Failure(Exception("Test harness disconnect" )))
417
418
419 class TestServerFactory:
420 implements(pop3.IServerFactory)
421
422 def cap_IMPLEMENTATION(self):
423 return "Test Implementation String"
424
425 def cap_EXPIRE(self):
426 return 60
427
428 challengers = {"SCHEME_1": None, "SCHEME_2": None}
429
430 def cap_LOGIN_DELAY(self):
431 return 120
432
433 pue = True
434 def perUserExpiration(self):
435 return self.pue
436
437 puld = True
438 def perUserLoginDelay(self):
439 return self.puld
440
441
442 class TestMailbox:
443 loginDelay = 100
444 messageExpiration = 25
445
446
447 class CapabilityTestCase(unittest.TestCase):
448 def setUp(self):
449 s = StringIO.StringIO()
450 p = pop3.POP3()
451 p.factory = TestServerFactory()
452 p.transport = internet.protocol.FileWrapper(s)
453 p.connectionMade()
454 p.do_CAPA()
455
456 self.caps = p.listCapabilities()
457 self.pcaps = s.getvalue().splitlines()
458
459 s = StringIO.StringIO()
460 p.mbox = TestMailbox()
461 p.transport = internet.protocol.FileWrapper(s)
462 p.do_CAPA()
463
464 self.lpcaps = s.getvalue().splitlines()
465 p.connectionLost(failure.Failure(Exception("Test harness disconnect")))
466
467 def contained(self, s, *caps):
468 for c in caps:
469 self.assertIn(s, c)
470
471 def testUIDL(self):
472 self.contained("UIDL", self.caps, self.pcaps, self.lpcaps)
473
474 def testTOP(self):
475 self.contained("TOP", self.caps, self.pcaps, self.lpcaps)
476
477 def testUSER(self):
478 self.contained("USER", self.caps, self.pcaps, self.lpcaps)
479
480 def testEXPIRE(self):
481 self.contained("EXPIRE 60 USER", self.caps, self.pcaps)
482 self.contained("EXPIRE 25", self.lpcaps)
483
484 def testIMPLEMENTATION(self):
485 self.contained(
486 "IMPLEMENTATION Test Implementation String",
487 self.caps, self.pcaps, self.lpcaps
488 )
489
490 def testSASL(self):
491 self.contained(
492 "SASL SCHEME_1 SCHEME_2",
493 self.caps, self.pcaps, self.lpcaps
494 )
495
496 def testLOGIN_DELAY(self):
497 self.contained("LOGIN-DELAY 120 USER", self.caps, self.pcaps)
498 self.assertIn("LOGIN-DELAY 100", self.lpcaps)
499
500
501
502 class GlobalCapabilitiesTestCase(unittest.TestCase):
503 def setUp(self):
504 s = StringIO.StringIO()
505 p = pop3.POP3()
506 p.factory = TestServerFactory()
507 p.factory.pue = p.factory.puld = False
508 p.transport = internet.protocol.FileWrapper(s)
509 p.connectionMade()
510 p.do_CAPA()
511
512 self.caps = p.listCapabilities()
513 self.pcaps = s.getvalue().splitlines()
514
515 s = StringIO.StringIO()
516 p.mbox = TestMailbox()
517 p.transport = internet.protocol.FileWrapper(s)
518 p.do_CAPA()
519
520 self.lpcaps = s.getvalue().splitlines()
521 p.connectionLost(failure.Failure(Exception("Test harness disconnect")))
522
523 def contained(self, s, *caps):
524 for c in caps:
525 self.assertIn(s, c)
526
527 def testEXPIRE(self):
528 self.contained("EXPIRE 60", self.caps, self.pcaps, self.lpcaps)
529
530 def testLOGIN_DELAY(self):
531 self.contained("LOGIN-DELAY 120", self.caps, self.pcaps, self.lpcaps)
532
533
534
535 class TestRealm:
536 def requestAvatar(self, avatarId, mind, *interfaces):
537 if avatarId == 'testuser':
538 return pop3.IMailbox, DummyMailbox(ValueError), lambda: None
539 assert False
540
541
542
543 class SASLTestCase(unittest.TestCase):
544 def testValidLogin(self):
545 p = pop3.POP3()
546 p.factory = TestServerFactory()
547 p.factory.challengers = {'CRAM-MD5': cred.credentials.CramMD5Credentials }
548 p.portal = cred.portal.Portal(TestRealm())
549 ch = cred.checkers.InMemoryUsernamePasswordDatabaseDontUse()
550 ch.addUser('testuser', 'testpassword')
551 p.portal.registerChecker(ch)
552
553 s = StringIO.StringIO()
554 p.transport = internet.protocol.FileWrapper(s)
555 p.connectionMade()
556
557 p.lineReceived("CAPA")
558 self.failUnless(s.getvalue().find("SASL CRAM-MD5") >= 0)
559
560 p.lineReceived("AUTH CRAM-MD5")
561 chal = s.getvalue().splitlines()[-1][2:]
562 chal = base64.decodestring(chal)
563 response = hmac.HMAC('testpassword', chal).hexdigest()
564
565 p.lineReceived(base64.encodestring('testuser ' + response).rstrip('\n'))
566 self.failUnless(p.mbox)
567 self.failUnless(s.getvalue().splitlines()[-1].find("+OK") >= 0)
568 p.connectionLost(failure.Failure(Exception("Test harness disconnect")))
569
570
571
572 class CommandMixin:
573 """
574 Tests for all the commands a POP3 server is allowed to receive.
575 """
576
577 extraMessage = '''\
578 From: guy
579 To: fellow
580
581 More message text for you.
582 '''
583
584
585 def setUp(self):
586 """
587 Make a POP3 server protocol instance hooked up to a simple mailbox and
588 a transport that buffers output to a StringIO.
589 """
590 p = pop3.POP3()
591 p.mbox = self.mailboxType(self.exceptionType)
592 p.schedule = list
593 self.pop3Server = p
594
595 s = StringIO.StringIO()
596 p.transport = internet.protocol.FileWrapper(s)
597 p.connectionMade()
598 s.truncate(0)
599 self.pop3Transport = s
600
601
602 def tearDown(self):
603 """
604 Disconnect the server protocol so it can clean up anything it might
605 need to clean up.
606 """
607 self.pop3Server.connectionLost(failure.Failure(Exception("Test harness d isconnect")))
608
609
610 def _flush(self):
611 """
612 Do some of the things that the reactor would take care of, if the
613 reactor were actually running.
614 """
615 # Oh man FileWrapper is pooh.
616 self.pop3Server.transport._checkProducer()
617
618
619 def testLIST(self):
620 """
621 Test the two forms of list: with a message index number, which should
622 return a short-form response, and without a message index number, which
623 should return a long-form response, one line per message.
624 """
625 p = self.pop3Server
626 s = self.pop3Transport
627
628 p.lineReceived("LIST 1")
629 self._flush()
630 self.assertEquals(s.getvalue(), "+OK 1 44\r\n")
631 s.truncate(0)
632
633 p.lineReceived("LIST")
634 self._flush()
635 self.assertEquals(s.getvalue(), "+OK 1\r\n1 44\r\n.\r\n")
636
637
638 def testLISTWithBadArgument(self):
639 """
640 Test that non-integers and out-of-bound integers produce appropriate
641 error responses.
642 """
643 p = self.pop3Server
644 s = self.pop3Transport
645
646 p.lineReceived("LIST a")
647 self.assertEquals(
648 s.getvalue(),
649 "-ERR Invalid message-number: 'a'\r\n")
650 s.truncate(0)
651
652 p.lineReceived("LIST 0")
653 self.assertEquals(
654 s.getvalue(),
655 "-ERR Invalid message-number: 0\r\n")
656 s.truncate(0)
657
658 p.lineReceived("LIST 2")
659 self.assertEquals(
660 s.getvalue(),
661 "-ERR Invalid message-number: 2\r\n")
662 s.truncate(0)
663
664
665 def testUIDL(self):
666 """
667 Test the two forms of the UIDL command. These are just like the two
668 forms of the LIST command.
669 """
670 p = self.pop3Server
671 s = self.pop3Transport
672
673 p.lineReceived("UIDL 1")
674 self.assertEquals(s.getvalue(), "+OK 0\r\n")
675 s.truncate(0)
676
677 p.lineReceived("UIDL")
678 self._flush()
679 self.assertEquals(s.getvalue(), "+OK \r\n1 0\r\n.\r\n")
680
681
682 def testUIDLWithBadArgument(self):
683 """
684 Test that UIDL with a non-integer or an out-of-bounds integer produces
685 the appropriate error response.
686 """
687 p = self.pop3Server
688 s = self.pop3Transport
689
690 p.lineReceived("UIDL a")
691 self.assertEquals(
692 s.getvalue(),
693 "-ERR Bad message number argument\r\n")
694 s.truncate(0)
695
696 p.lineReceived("UIDL 0")
697 self.assertEquals(
698 s.getvalue(),
699 "-ERR Bad message number argument\r\n")
700 s.truncate(0)
701
702 p.lineReceived("UIDL 2")
703 self.assertEquals(
704 s.getvalue(),
705 "-ERR Bad message number argument\r\n")
706 s.truncate(0)
707
708
709 def testSTAT(self):
710 """
711 Test the single form of the STAT command, which returns a short-form
712 response of the number of messages in the mailbox and their total size.
713 """
714 p = self.pop3Server
715 s = self.pop3Transport
716
717 p.lineReceived("STAT")
718 self._flush()
719 self.assertEquals(s.getvalue(), "+OK 1 44\r\n")
720
721
722 def testRETR(self):
723 """
724 Test downloading a message.
725 """
726 p = self.pop3Server
727 s = self.pop3Transport
728
729 p.lineReceived("RETR 1")
730 self._flush()
731 self.assertEquals(
732 s.getvalue(),
733 "+OK 44\r\n"
734 "From: moshe\r\n"
735 "To: moshe\r\n"
736 "\r\n"
737 "How are you, friend?\r\n"
738 ".\r\n")
739 s.truncate(0)
740
741
742 def testRETRWithBadArgument(self):
743 """
744 Test that trying to download a message with a bad argument, either not
745 an integer or an out-of-bounds integer, fails with the appropriate
746 error response.
747 """
748 p = self.pop3Server
749 s = self.pop3Transport
750
751 p.lineReceived("RETR a")
752 self.assertEquals(
753 s.getvalue(),
754 "-ERR Bad message number argument\r\n")
755 s.truncate(0)
756
757 p.lineReceived("RETR 0")
758 self.assertEquals(
759 s.getvalue(),
760 "-ERR Bad message number argument\r\n")
761 s.truncate(0)
762
763 p.lineReceived("RETR 2")
764 self.assertEquals(
765 s.getvalue(),
766 "-ERR Bad message number argument\r\n")
767 s.truncate(0)
768
769
770 def testTOP(self):
771 """
772 Test downloading the headers and part of the body of a message.
773 """
774 p = self.pop3Server
775 s = self.pop3Transport
776 p.mbox.messages.append(self.extraMessage)
777
778 p.lineReceived("TOP 1 0")
779 self._flush()
780 self.assertEquals(
781 s.getvalue(),
782 "+OK Top of message follows\r\n"
783 "From: moshe\r\n"
784 "To: moshe\r\n"
785 "\r\n"
786 ".\r\n")
787
788
789 def testTOPWithBadArgument(self):
790 """
791 Test that trying to download a message with a bad argument, either a
792 message number which isn't an integer or is an out-of-bounds integer or
793 a number of lines which isn't an integer or is a negative integer,
794 fails with the appropriate error response.
795 """
796 p = self.pop3Server
797 s = self.pop3Transport
798 p.mbox.messages.append(self.extraMessage)
799
800 p.lineReceived("TOP 1 a")
801 self.assertEquals(
802 s.getvalue(),
803 "-ERR Bad line count argument\r\n")
804 s.truncate(0)
805
806 p.lineReceived("TOP 1 -1")
807 self.assertEquals(
808 s.getvalue(),
809 "-ERR Bad line count argument\r\n")
810 s.truncate(0)
811
812 p.lineReceived("TOP a 1")
813 self.assertEquals(
814 s.getvalue(),
815 "-ERR Bad message number argument\r\n")
816 s.truncate(0)
817
818 p.lineReceived("TOP 0 1")
819 self.assertEquals(
820 s.getvalue(),
821 "-ERR Bad message number argument\r\n")
822 s.truncate(0)
823
824 p.lineReceived("TOP 3 1")
825 self.assertEquals(
826 s.getvalue(),
827 "-ERR Bad message number argument\r\n")
828 s.truncate(0)
829
830
831 def testLAST(self):
832 """
833 Test the exceedingly pointless LAST command, which tells you the
834 highest message index which you have already downloaded.
835 """
836 p = self.pop3Server
837 s = self.pop3Transport
838 p.mbox.messages.append(self.extraMessage)
839
840 p.lineReceived('LAST')
841 self.assertEquals(
842 s.getvalue(),
843 "+OK 0\r\n")
844 s.truncate(0)
845
846
847 def testRetrieveUpdatesHighest(self):
848 """
849 Test that issuing a RETR command updates the LAST response.
850 """
851 p = self.pop3Server
852 s = self.pop3Transport
853 p.mbox.messages.append(self.extraMessage)
854
855 p.lineReceived('RETR 2')
856 self._flush()
857 s.truncate(0)
858 p.lineReceived('LAST')
859 self.assertEquals(
860 s.getvalue(),
861 '+OK 2\r\n')
862 s.truncate(0)
863
864
865 def testTopUpdatesHighest(self):
866 """
867 Test that issuing a TOP command updates the LAST response.
868 """
869 p = self.pop3Server
870 s = self.pop3Transport
871 p.mbox.messages.append(self.extraMessage)
872
873 p.lineReceived('TOP 2 10')
874 self._flush()
875 s.truncate(0)
876 p.lineReceived('LAST')
877 self.assertEquals(
878 s.getvalue(),
879 '+OK 2\r\n')
880
881
882 def testHighestOnlyProgresses(self):
883 """
884 Test that downloading a message with a smaller index than the current
885 LAST response doesn't change the LAST response.
886 """
887 p = self.pop3Server
888 s = self.pop3Transport
889 p.mbox.messages.append(self.extraMessage)
890
891 p.lineReceived('RETR 2')
892 self._flush()
893 p.lineReceived('TOP 1 10')
894 self._flush()
895 s.truncate(0)
896 p.lineReceived('LAST')
897 self.assertEquals(
898 s.getvalue(),
899 '+OK 2\r\n')
900
901
902 def testResetClearsHighest(self):
903 """
904 Test that issuing RSET changes the LAST response to 0.
905 """
906 p = self.pop3Server
907 s = self.pop3Transport
908 p.mbox.messages.append(self.extraMessage)
909
910 p.lineReceived('RETR 2')
911 self._flush()
912 p.lineReceived('RSET')
913 s.truncate(0)
914 p.lineReceived('LAST')
915 self.assertEquals(
916 s.getvalue(),
917 '+OK 0\r\n')
918
919
920
921 _listMessageDeprecation = (
922 "twisted.mail.pop3.IMailbox.listMessages may not "
923 "raise IndexError for out-of-bounds message numbers: "
924 "raise ValueError instead.")
925 _listMessageSuppression = util.suppress(
926 message=_listMessageDeprecation,
927 category=PendingDeprecationWarning)
928
929 _getUidlDeprecation = (
930 "twisted.mail.pop3.IMailbox.getUidl may not "
931 "raise IndexError for out-of-bounds message numbers: "
932 "raise ValueError instead.")
933 _getUidlSuppression = util.suppress(
934 message=_getUidlDeprecation,
935 category=PendingDeprecationWarning)
936
937 class IndexErrorCommandTestCase(CommandMixin, unittest.TestCase):
938 """
939 Run all of the command tests against a mailbox which raises IndexError
940 when an out of bounds request is made. This behavior will be deprecated
941 shortly and then removed.
942 """
943 exceptionType = IndexError
944 mailboxType = DummyMailbox
945
946 def testLISTWithBadArgument(self):
947 return CommandMixin.testLISTWithBadArgument(self)
948 testLISTWithBadArgument.suppress = [_listMessageSuppression]
949
950
951 def testUIDLWithBadArgument(self):
952 return CommandMixin.testUIDLWithBadArgument(self)
953 testUIDLWithBadArgument.suppress = [_getUidlSuppression]
954
955
956 def testTOPWithBadArgument(self):
957 return CommandMixin.testTOPWithBadArgument(self)
958 testTOPWithBadArgument.suppress = [_listMessageSuppression]
959
960
961 def testRETRWithBadArgument(self):
962 return CommandMixin.testRETRWithBadArgument(self)
963 testRETRWithBadArgument.suppress = [_listMessageSuppression]
964
965
966
967 class ValueErrorCommandTestCase(CommandMixin, unittest.TestCase):
968 """
969 Run all of the command tests against a mailbox which raises ValueError
970 when an out of bounds request is made. This is the correct behavior and
971 after support for mailboxes which raise IndexError is removed, this will
972 become just C{CommandTestCase}.
973 """
974 exceptionType = ValueError
975 mailboxType = DummyMailbox
976
977
978
979 class SyncDeferredMailbox(DummyMailbox):
980 """
981 Mailbox which has a listMessages implementation which returns a Deferred
982 which has already fired.
983 """
984 def listMessages(self, n=None):
985 return defer.succeed(DummyMailbox.listMessages(self, n))
986
987
988
989 class IndexErrorSyncDeferredCommandTestCase(IndexErrorCommandTestCase):
990 """
991 Run all of the L{IndexErrorCommandTestCase} tests with a
992 synchronous-Deferred returning IMailbox implementation.
993 """
994 mailboxType = SyncDeferredMailbox
995
996
997
998 class ValueErrorSyncDeferredCommandTestCase(ValueErrorCommandTestCase):
999 """
1000 Run all of the L{ValueErrorCommandTestCase} tests with a
1001 synchronous-Deferred returning IMailbox implementation.
1002 """
1003 mailboxType = SyncDeferredMailbox
1004
1005
1006
1007 class AsyncDeferredMailbox(DummyMailbox):
1008 """
1009 Mailbox which has a listMessages implementation which returns a Deferred
1010 which has not yet fired.
1011 """
1012 def __init__(self, *a, **kw):
1013 self.waiting = []
1014 DummyMailbox.__init__(self, *a, **kw)
1015
1016
1017 def listMessages(self, n=None):
1018 d = defer.Deferred()
1019 # See AsyncDeferredMailbox._flush
1020 self.waiting.append((d, DummyMailbox.listMessages(self, n)))
1021 return d
1022
1023
1024
1025 class IndexErrorAsyncDeferredCommandTestCase(IndexErrorCommandTestCase):
1026 """
1027 Run all of the L{IndexErrorCommandTestCase} tests with an asynchronous-Defer red
1028 returning IMailbox implementation.
1029 """
1030 mailboxType = AsyncDeferredMailbox
1031
1032 def _flush(self):
1033 """
1034 Fire whatever Deferreds we've built up in our mailbox.
1035 """
1036 while self.pop3Server.mbox.waiting:
1037 d, a = self.pop3Server.mbox.waiting.pop()
1038 d.callback(a)
1039 IndexErrorCommandTestCase._flush(self)
1040
1041
1042
1043 class ValueErrorAsyncDeferredCommandTestCase(ValueErrorCommandTestCase):
1044 """
1045 Run all of the L{IndexErrorCommandTestCase} tests with an asynchronous-Defer red
1046 returning IMailbox implementation.
1047 """
1048 mailboxType = AsyncDeferredMailbox
1049
1050 def _flush(self):
1051 """
1052 Fire whatever Deferreds we've built up in our mailbox.
1053 """
1054 while self.pop3Server.mbox.waiting:
1055 d, a = self.pop3Server.mbox.waiting.pop()
1056 d.callback(a)
1057 ValueErrorCommandTestCase._flush(self)
1058
1059 class POP3MiscTestCase(unittest.TestCase):
1060 """
1061 Miscellaneous tests more to do with module/package structure than
1062 anything to do with the Post Office Protocol.
1063 """
1064 def test_all(self):
1065 """
1066 This test checks that all names listed in
1067 twisted.mail.pop3.__all__ are actually present in the module.
1068 """
1069 mod = twisted.mail.pop3
1070 for attr in mod.__all__:
1071 self.failUnless(hasattr(mod, attr))
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698