Index: third_party/twisted_8_1/twisted/mail/test/test_pop3.py |
diff --git a/third_party/twisted_8_1/twisted/mail/test/test_pop3.py b/third_party/twisted_8_1/twisted/mail/test/test_pop3.py |
deleted file mode 100644 |
index 28302181b9e1c7d2bca30e6c93b530f870b84dc4..0000000000000000000000000000000000000000 |
--- a/third_party/twisted_8_1/twisted/mail/test/test_pop3.py |
+++ /dev/null |
@@ -1,1071 +0,0 @@ |
-# Copyright (c) 2001-2004 Twisted Matrix Laboratories. |
-# See LICENSE for details. |
- |
- |
-""" |
-Test cases for twisted.mail.pop3 module. |
-""" |
- |
-import StringIO |
-import string |
-import hmac |
-import base64 |
-import itertools |
- |
-from zope.interface import implements |
- |
-from twisted.internet import defer |
- |
-from twisted.trial import unittest, util |
-from twisted import mail |
-import twisted.mail.protocols |
-import twisted.mail.pop3 |
-import twisted.internet.protocol |
-from twisted import internet |
-from twisted.mail import pop3 |
-from twisted.protocols import loopback |
-from twisted.python import failure |
- |
-from twisted import cred |
-import twisted.cred.portal |
-import twisted.cred.checkers |
-import twisted.cred.credentials |
- |
-from twisted.test.proto_helpers import LineSendingProtocol |
- |
- |
-class UtilityTestCase(unittest.TestCase): |
- """ |
- Test the various helper functions and classes used by the POP3 server |
- protocol implementation. |
- """ |
- |
- def testLineBuffering(self): |
- """ |
- Test creating a LineBuffer and feeding it some lines. The lines should |
- build up in its internal buffer for a while and then get spat out to |
- the writer. |
- """ |
- output = [] |
- input = iter(itertools.cycle(['012', '345', '6', '7', '8', '9'])) |
- c = pop3._IteratorBuffer(output.extend, input, 6) |
- i = iter(c) |
- self.assertEquals(output, []) # nothing is buffer |
- i.next() |
- self.assertEquals(output, []) # '012' is buffered |
- i.next() |
- self.assertEquals(output, []) # '012345' is buffered |
- i.next() |
- self.assertEquals(output, ['012', '345', '6']) # nothing is buffered |
- for n in range(5): |
- i.next() |
- self.assertEquals(output, ['012', '345', '6', '7', '8', '9', '012', '345']) |
- |
- |
- def testFinishLineBuffering(self): |
- """ |
- Test that a LineBuffer flushes everything when its iterator is |
- exhausted, and itself raises StopIteration. |
- """ |
- output = [] |
- input = iter(['a', 'b', 'c']) |
- c = pop3._IteratorBuffer(output.extend, input, 5) |
- for i in c: |
- pass |
- self.assertEquals(output, ['a', 'b', 'c']) |
- |
- |
- def testSuccessResponseFormatter(self): |
- """ |
- Test that the thing that spits out POP3 'success responses' works |
- right. |
- """ |
- self.assertEquals( |
- pop3.successResponse('Great.'), |
- '+OK Great.\r\n') |
- |
- |
- def testStatLineFormatter(self): |
- """ |
- Test that the function which formats stat lines does so appropriately. |
- """ |
- statLine = list(pop3.formatStatResponse([]))[-1] |
- self.assertEquals(statLine, '+OK 0 0\r\n') |
- |
- statLine = list(pop3.formatStatResponse([10, 31, 0, 10101]))[-1] |
- self.assertEquals(statLine, '+OK 4 10142\r\n') |
- |
- |
- def testListLineFormatter(self): |
- """ |
- Test that the function which formats the lines in response to a LIST |
- command does so appropriately. |
- """ |
- listLines = list(pop3.formatListResponse([])) |
- self.assertEquals( |
- listLines, |
- ['+OK 0\r\n', '.\r\n']) |
- |
- listLines = list(pop3.formatListResponse([1, 2, 3, 100])) |
- self.assertEquals( |
- listLines, |
- ['+OK 4\r\n', '1 1\r\n', '2 2\r\n', '3 3\r\n', '4 100\r\n', '.\r\n']) |
- |
- |
- |
- def testUIDListLineFormatter(self): |
- """ |
- Test that the function which formats lines in response to a UIDL |
- command does so appropriately. |
- """ |
- UIDs = ['abc', 'def', 'ghi'] |
- listLines = list(pop3.formatUIDListResponse([], UIDs.__getitem__)) |
- self.assertEquals( |
- listLines, |
- ['+OK \r\n', '.\r\n']) |
- |
- listLines = list(pop3.formatUIDListResponse([123, 431, 591], UIDs.__getitem__)) |
- self.assertEquals( |
- listLines, |
- ['+OK \r\n', '1 abc\r\n', '2 def\r\n', '3 ghi\r\n', '.\r\n']) |
- |
- listLines = list(pop3.formatUIDListResponse([0, None, 591], UIDs.__getitem__)) |
- self.assertEquals( |
- listLines, |
- ['+OK \r\n', '1 abc\r\n', '3 ghi\r\n', '.\r\n']) |
- |
- |
- |
-class MyVirtualPOP3(mail.protocols.VirtualPOP3): |
- |
- magic = '<moshez>' |
- |
- def authenticateUserAPOP(self, user, digest): |
- user, domain = self.lookupDomain(user) |
- return self.service.domains['baz.com'].authenticateUserAPOP(user, digest, self.magic, domain) |
- |
-class DummyDomain: |
- |
- def __init__(self): |
- self.users = {} |
- |
- def addUser(self, name): |
- self.users[name] = [] |
- |
- def addMessage(self, name, message): |
- self.users[name].append(message) |
- |
- def authenticateUserAPOP(self, name, digest, magic, domain): |
- return pop3.IMailbox, ListMailbox(self.users[name]), lambda: None |
- |
- |
-class ListMailbox: |
- |
- def __init__(self, list): |
- self.list = list |
- |
- def listMessages(self, i=None): |
- if i is None: |
- return map(len, self.list) |
- return len(self.list[i]) |
- |
- def getMessage(self, i): |
- return StringIO.StringIO(self.list[i]) |
- |
- def getUidl(self, i): |
- return i |
- |
- def deleteMessage(self, i): |
- self.list[i] = '' |
- |
- def sync(self): |
- pass |
- |
-class MyPOP3Downloader(pop3.POP3Client): |
- |
- def handle_WELCOME(self, line): |
- pop3.POP3Client.handle_WELCOME(self, line) |
- self.apop('hello@baz.com', 'world') |
- |
- def handle_APOP(self, line): |
- parts = line.split() |
- code = parts[0] |
- data = (parts[1:] or ['NONE'])[0] |
- if code != '+OK': |
- print parts |
- raise AssertionError, 'code is ' + code |
- self.lines = [] |
- self.retr(1) |
- |
- def handle_RETR_continue(self, line): |
- self.lines.append(line) |
- |
- def handle_RETR_end(self): |
- self.message = '\n'.join(self.lines) + '\n' |
- self.quit() |
- |
- def handle_QUIT(self, line): |
- if line[:3] != '+OK': |
- raise AssertionError, 'code is ' + line |
- |
- |
-class POP3TestCase(unittest.TestCase): |
- |
- message = '''\ |
-Subject: urgent |
- |
-Someone set up us the bomb! |
-''' |
- |
- expectedOutput = '''\ |
-+OK <moshez>\015 |
-+OK Authentication succeeded\015 |
-+OK \015 |
-1 0\015 |
-.\015 |
-+OK %d\015 |
-Subject: urgent\015 |
-\015 |
-Someone set up us the bomb!\015 |
-.\015 |
-+OK \015 |
-''' % len(message) |
- |
- def setUp(self): |
- self.factory = internet.protocol.Factory() |
- self.factory.domains = {} |
- self.factory.domains['baz.com'] = DummyDomain() |
- self.factory.domains['baz.com'].addUser('hello') |
- self.factory.domains['baz.com'].addMessage('hello', self.message) |
- |
- def testMessages(self): |
- client = LineSendingProtocol([ |
- 'APOP hello@baz.com world', |
- 'UIDL', |
- 'RETR 1', |
- 'QUIT', |
- ]) |
- server = MyVirtualPOP3() |
- server.service = self.factory |
- def check(ignored): |
- output = '\r\n'.join(client.response) + '\r\n' |
- self.assertEquals(output, self.expectedOutput) |
- return loopback.loopbackTCP(server, client).addCallback(check) |
- |
- def testLoopback(self): |
- protocol = MyVirtualPOP3() |
- protocol.service = self.factory |
- clientProtocol = MyPOP3Downloader() |
- def check(ignored): |
- self.failUnlessEqual(clientProtocol.message, self.message) |
- protocol.connectionLost( |
- failure.Failure(Exception("Test harness disconnect"))) |
- d = loopback.loopbackAsync(protocol, clientProtocol) |
- return d.addCallback(check) |
- testLoopback.suppress = [util.suppress(message="twisted.mail.pop3.POP3Client is deprecated")] |
- |
- |
- |
-class DummyPOP3(pop3.POP3): |
- |
- magic = '<moshez>' |
- |
- def authenticateUserAPOP(self, user, password): |
- return pop3.IMailbox, DummyMailbox(ValueError), lambda: None |
- |
- |
- |
-class DummyMailbox(pop3.Mailbox): |
- |
- messages = [ |
-'''\ |
-From: moshe |
-To: moshe |
- |
-How are you, friend? |
-'''] |
- |
- def __init__(self, exceptionType): |
- self.messages = DummyMailbox.messages[:] |
- self.exceptionType = exceptionType |
- |
- def listMessages(self, i=None): |
- if i is None: |
- return map(len, self.messages) |
- if i >= len(self.messages): |
- raise self.exceptionType() |
- return len(self.messages[i]) |
- |
- def getMessage(self, i): |
- return StringIO.StringIO(self.messages[i]) |
- |
- def getUidl(self, i): |
- if i >= len(self.messages): |
- raise self.exceptionType() |
- return str(i) |
- |
- def deleteMessage(self, i): |
- self.messages[i] = '' |
- |
- |
-class AnotherPOP3TestCase(unittest.TestCase): |
- |
- def runTest(self, lines): |
- dummy = DummyPOP3() |
- client = LineSendingProtocol([ |
- "APOP moshez dummy", |
- "LIST", |
- "UIDL", |
- "RETR 1", |
- "RETR 2", |
- "DELE 1", |
- "RETR 1", |
- "QUIT", |
- ]) |
- d = loopback.loopbackAsync(dummy, client) |
- return d.addCallback(self._cbRunTest, client, dummy) |
- |
- def _cbRunTest(self, ignored, client, dummy): |
- expected_output = [ |
- '+OK <moshez>', |
- '+OK Authentication succeeded', |
- '+OK 1', |
- '1 44', |
- '.', |
- '+OK ', |
- '1 0', |
- '.', |
- '+OK 44', |
- 'From: moshe', |
- 'To: moshe', |
- '', |
- 'How are you, friend?', |
- '', |
- '.', |
- '-ERR Bad message number argument', |
- '+OK ', |
- '-ERR message deleted', |
- '+OK ', |
- '' |
- ] |
- self.failUnlessEqual('\r\n'.join(expected_output), |
- '\r\n'.join(client.response) + '\r\n') |
- dummy.connectionLost(failure.Failure(Exception("Test harness disconnect"))) |
- return ignored |
- |
- |
- def testBuffer(self): |
- lines = string.split('''\ |
-APOP moshez dummy |
-LIST |
-UIDL |
-RETR 1 |
-RETR 2 |
-DELE 1 |
-RETR 1 |
-QUIT''', '\n') |
- return self.runTest(lines) |
- |
- def testNoop(self): |
- lines = ['APOP spiv dummy', 'NOOP', 'QUIT'] |
- return self.runTest(lines) |
- |
- def testAuthListing(self): |
- p = DummyPOP3() |
- p.factory = internet.protocol.Factory() |
- p.factory.challengers = {'Auth1': None, 'secondAuth': None, 'authLast': None} |
- client = LineSendingProtocol([ |
- "AUTH", |
- "QUIT", |
- ]) |
- |
- d = loopback.loopbackAsync(p, client) |
- return d.addCallback(self._cbTestAuthListing, client) |
- |
- def _cbTestAuthListing(self, ignored, client): |
- self.failUnless(client.response[1].startswith('+OK')) |
- self.assertEquals(client.response[2:6], |
- ["AUTH1", "SECONDAUTH", "AUTHLAST", "."]) |
- |
- def testIllegalPASS(self): |
- dummy = DummyPOP3() |
- client = LineSendingProtocol([ |
- "PASS fooz", |
- "QUIT" |
- ]) |
- d = loopback.loopbackAsync(dummy, client) |
- return d.addCallback(self._cbTestIllegalPASS, client, dummy) |
- |
- def _cbTestIllegalPASS(self, ignored, client, dummy): |
- expected_output = '+OK <moshez>\r\n-ERR USER required before PASS\r\n+OK \r\n' |
- self.failUnlessEqual(expected_output, '\r\n'.join(client.response) + '\r\n') |
- dummy.connectionLost(failure.Failure(Exception("Test harness disconnect"))) |
- |
- def testEmptyPASS(self): |
- dummy = DummyPOP3() |
- client = LineSendingProtocol([ |
- "PASS ", |
- "QUIT" |
- ]) |
- d = loopback.loopbackAsync(dummy, client) |
- return d.addCallback(self._cbTestEmptyPASS, client, dummy) |
- |
- def _cbTestEmptyPASS(self, ignored, client, dummy): |
- expected_output = '+OK <moshez>\r\n-ERR USER required before PASS\r\n+OK \r\n' |
- self.failUnlessEqual(expected_output, '\r\n'.join(client.response) + '\r\n') |
- dummy.connectionLost(failure.Failure(Exception("Test harness disconnect"))) |
- |
- |
-class TestServerFactory: |
- implements(pop3.IServerFactory) |
- |
- def cap_IMPLEMENTATION(self): |
- return "Test Implementation String" |
- |
- def cap_EXPIRE(self): |
- return 60 |
- |
- challengers = {"SCHEME_1": None, "SCHEME_2": None} |
- |
- def cap_LOGIN_DELAY(self): |
- return 120 |
- |
- pue = True |
- def perUserExpiration(self): |
- return self.pue |
- |
- puld = True |
- def perUserLoginDelay(self): |
- return self.puld |
- |
- |
-class TestMailbox: |
- loginDelay = 100 |
- messageExpiration = 25 |
- |
- |
-class CapabilityTestCase(unittest.TestCase): |
- def setUp(self): |
- s = StringIO.StringIO() |
- p = pop3.POP3() |
- p.factory = TestServerFactory() |
- p.transport = internet.protocol.FileWrapper(s) |
- p.connectionMade() |
- p.do_CAPA() |
- |
- self.caps = p.listCapabilities() |
- self.pcaps = s.getvalue().splitlines() |
- |
- s = StringIO.StringIO() |
- p.mbox = TestMailbox() |
- p.transport = internet.protocol.FileWrapper(s) |
- p.do_CAPA() |
- |
- self.lpcaps = s.getvalue().splitlines() |
- p.connectionLost(failure.Failure(Exception("Test harness disconnect"))) |
- |
- def contained(self, s, *caps): |
- for c in caps: |
- self.assertIn(s, c) |
- |
- def testUIDL(self): |
- self.contained("UIDL", self.caps, self.pcaps, self.lpcaps) |
- |
- def testTOP(self): |
- self.contained("TOP", self.caps, self.pcaps, self.lpcaps) |
- |
- def testUSER(self): |
- self.contained("USER", self.caps, self.pcaps, self.lpcaps) |
- |
- def testEXPIRE(self): |
- self.contained("EXPIRE 60 USER", self.caps, self.pcaps) |
- self.contained("EXPIRE 25", self.lpcaps) |
- |
- def testIMPLEMENTATION(self): |
- self.contained( |
- "IMPLEMENTATION Test Implementation String", |
- self.caps, self.pcaps, self.lpcaps |
- ) |
- |
- def testSASL(self): |
- self.contained( |
- "SASL SCHEME_1 SCHEME_2", |
- self.caps, self.pcaps, self.lpcaps |
- ) |
- |
- def testLOGIN_DELAY(self): |
- self.contained("LOGIN-DELAY 120 USER", self.caps, self.pcaps) |
- self.assertIn("LOGIN-DELAY 100", self.lpcaps) |
- |
- |
- |
-class GlobalCapabilitiesTestCase(unittest.TestCase): |
- def setUp(self): |
- s = StringIO.StringIO() |
- p = pop3.POP3() |
- p.factory = TestServerFactory() |
- p.factory.pue = p.factory.puld = False |
- p.transport = internet.protocol.FileWrapper(s) |
- p.connectionMade() |
- p.do_CAPA() |
- |
- self.caps = p.listCapabilities() |
- self.pcaps = s.getvalue().splitlines() |
- |
- s = StringIO.StringIO() |
- p.mbox = TestMailbox() |
- p.transport = internet.protocol.FileWrapper(s) |
- p.do_CAPA() |
- |
- self.lpcaps = s.getvalue().splitlines() |
- p.connectionLost(failure.Failure(Exception("Test harness disconnect"))) |
- |
- def contained(self, s, *caps): |
- for c in caps: |
- self.assertIn(s, c) |
- |
- def testEXPIRE(self): |
- self.contained("EXPIRE 60", self.caps, self.pcaps, self.lpcaps) |
- |
- def testLOGIN_DELAY(self): |
- self.contained("LOGIN-DELAY 120", self.caps, self.pcaps, self.lpcaps) |
- |
- |
- |
-class TestRealm: |
- def requestAvatar(self, avatarId, mind, *interfaces): |
- if avatarId == 'testuser': |
- return pop3.IMailbox, DummyMailbox(ValueError), lambda: None |
- assert False |
- |
- |
- |
-class SASLTestCase(unittest.TestCase): |
- def testValidLogin(self): |
- p = pop3.POP3() |
- p.factory = TestServerFactory() |
- p.factory.challengers = {'CRAM-MD5': cred.credentials.CramMD5Credentials} |
- p.portal = cred.portal.Portal(TestRealm()) |
- ch = cred.checkers.InMemoryUsernamePasswordDatabaseDontUse() |
- ch.addUser('testuser', 'testpassword') |
- p.portal.registerChecker(ch) |
- |
- s = StringIO.StringIO() |
- p.transport = internet.protocol.FileWrapper(s) |
- p.connectionMade() |
- |
- p.lineReceived("CAPA") |
- self.failUnless(s.getvalue().find("SASL CRAM-MD5") >= 0) |
- |
- p.lineReceived("AUTH CRAM-MD5") |
- chal = s.getvalue().splitlines()[-1][2:] |
- chal = base64.decodestring(chal) |
- response = hmac.HMAC('testpassword', chal).hexdigest() |
- |
- p.lineReceived(base64.encodestring('testuser ' + response).rstrip('\n')) |
- self.failUnless(p.mbox) |
- self.failUnless(s.getvalue().splitlines()[-1].find("+OK") >= 0) |
- p.connectionLost(failure.Failure(Exception("Test harness disconnect"))) |
- |
- |
- |
-class CommandMixin: |
- """ |
- Tests for all the commands a POP3 server is allowed to receive. |
- """ |
- |
- extraMessage = '''\ |
-From: guy |
-To: fellow |
- |
-More message text for you. |
-''' |
- |
- |
- def setUp(self): |
- """ |
- Make a POP3 server protocol instance hooked up to a simple mailbox and |
- a transport that buffers output to a StringIO. |
- """ |
- p = pop3.POP3() |
- p.mbox = self.mailboxType(self.exceptionType) |
- p.schedule = list |
- self.pop3Server = p |
- |
- s = StringIO.StringIO() |
- p.transport = internet.protocol.FileWrapper(s) |
- p.connectionMade() |
- s.truncate(0) |
- self.pop3Transport = s |
- |
- |
- def tearDown(self): |
- """ |
- Disconnect the server protocol so it can clean up anything it might |
- need to clean up. |
- """ |
- self.pop3Server.connectionLost(failure.Failure(Exception("Test harness disconnect"))) |
- |
- |
- def _flush(self): |
- """ |
- Do some of the things that the reactor would take care of, if the |
- reactor were actually running. |
- """ |
- # Oh man FileWrapper is pooh. |
- self.pop3Server.transport._checkProducer() |
- |
- |
- def testLIST(self): |
- """ |
- Test the two forms of list: with a message index number, which should |
- return a short-form response, and without a message index number, which |
- should return a long-form response, one line per message. |
- """ |
- p = self.pop3Server |
- s = self.pop3Transport |
- |
- p.lineReceived("LIST 1") |
- self._flush() |
- self.assertEquals(s.getvalue(), "+OK 1 44\r\n") |
- s.truncate(0) |
- |
- p.lineReceived("LIST") |
- self._flush() |
- self.assertEquals(s.getvalue(), "+OK 1\r\n1 44\r\n.\r\n") |
- |
- |
- def testLISTWithBadArgument(self): |
- """ |
- Test that non-integers and out-of-bound integers produce appropriate |
- error responses. |
- """ |
- p = self.pop3Server |
- s = self.pop3Transport |
- |
- p.lineReceived("LIST a") |
- self.assertEquals( |
- s.getvalue(), |
- "-ERR Invalid message-number: 'a'\r\n") |
- s.truncate(0) |
- |
- p.lineReceived("LIST 0") |
- self.assertEquals( |
- s.getvalue(), |
- "-ERR Invalid message-number: 0\r\n") |
- s.truncate(0) |
- |
- p.lineReceived("LIST 2") |
- self.assertEquals( |
- s.getvalue(), |
- "-ERR Invalid message-number: 2\r\n") |
- s.truncate(0) |
- |
- |
- def testUIDL(self): |
- """ |
- Test the two forms of the UIDL command. These are just like the two |
- forms of the LIST command. |
- """ |
- p = self.pop3Server |
- s = self.pop3Transport |
- |
- p.lineReceived("UIDL 1") |
- self.assertEquals(s.getvalue(), "+OK 0\r\n") |
- s.truncate(0) |
- |
- p.lineReceived("UIDL") |
- self._flush() |
- self.assertEquals(s.getvalue(), "+OK \r\n1 0\r\n.\r\n") |
- |
- |
- def testUIDLWithBadArgument(self): |
- """ |
- Test that UIDL with a non-integer or an out-of-bounds integer produces |
- the appropriate error response. |
- """ |
- p = self.pop3Server |
- s = self.pop3Transport |
- |
- p.lineReceived("UIDL a") |
- self.assertEquals( |
- s.getvalue(), |
- "-ERR Bad message number argument\r\n") |
- s.truncate(0) |
- |
- p.lineReceived("UIDL 0") |
- self.assertEquals( |
- s.getvalue(), |
- "-ERR Bad message number argument\r\n") |
- s.truncate(0) |
- |
- p.lineReceived("UIDL 2") |
- self.assertEquals( |
- s.getvalue(), |
- "-ERR Bad message number argument\r\n") |
- s.truncate(0) |
- |
- |
- def testSTAT(self): |
- """ |
- Test the single form of the STAT command, which returns a short-form |
- response of the number of messages in the mailbox and their total size. |
- """ |
- p = self.pop3Server |
- s = self.pop3Transport |
- |
- p.lineReceived("STAT") |
- self._flush() |
- self.assertEquals(s.getvalue(), "+OK 1 44\r\n") |
- |
- |
- def testRETR(self): |
- """ |
- Test downloading a message. |
- """ |
- p = self.pop3Server |
- s = self.pop3Transport |
- |
- p.lineReceived("RETR 1") |
- self._flush() |
- self.assertEquals( |
- s.getvalue(), |
- "+OK 44\r\n" |
- "From: moshe\r\n" |
- "To: moshe\r\n" |
- "\r\n" |
- "How are you, friend?\r\n" |
- ".\r\n") |
- s.truncate(0) |
- |
- |
- def testRETRWithBadArgument(self): |
- """ |
- Test that trying to download a message with a bad argument, either not |
- an integer or an out-of-bounds integer, fails with the appropriate |
- error response. |
- """ |
- p = self.pop3Server |
- s = self.pop3Transport |
- |
- p.lineReceived("RETR a") |
- self.assertEquals( |
- s.getvalue(), |
- "-ERR Bad message number argument\r\n") |
- s.truncate(0) |
- |
- p.lineReceived("RETR 0") |
- self.assertEquals( |
- s.getvalue(), |
- "-ERR Bad message number argument\r\n") |
- s.truncate(0) |
- |
- p.lineReceived("RETR 2") |
- self.assertEquals( |
- s.getvalue(), |
- "-ERR Bad message number argument\r\n") |
- s.truncate(0) |
- |
- |
- def testTOP(self): |
- """ |
- Test downloading the headers and part of the body of a message. |
- """ |
- p = self.pop3Server |
- s = self.pop3Transport |
- p.mbox.messages.append(self.extraMessage) |
- |
- p.lineReceived("TOP 1 0") |
- self._flush() |
- self.assertEquals( |
- s.getvalue(), |
- "+OK Top of message follows\r\n" |
- "From: moshe\r\n" |
- "To: moshe\r\n" |
- "\r\n" |
- ".\r\n") |
- |
- |
- def testTOPWithBadArgument(self): |
- """ |
- Test that trying to download a message with a bad argument, either a |
- message number which isn't an integer or is an out-of-bounds integer or |
- a number of lines which isn't an integer or is a negative integer, |
- fails with the appropriate error response. |
- """ |
- p = self.pop3Server |
- s = self.pop3Transport |
- p.mbox.messages.append(self.extraMessage) |
- |
- p.lineReceived("TOP 1 a") |
- self.assertEquals( |
- s.getvalue(), |
- "-ERR Bad line count argument\r\n") |
- s.truncate(0) |
- |
- p.lineReceived("TOP 1 -1") |
- self.assertEquals( |
- s.getvalue(), |
- "-ERR Bad line count argument\r\n") |
- s.truncate(0) |
- |
- p.lineReceived("TOP a 1") |
- self.assertEquals( |
- s.getvalue(), |
- "-ERR Bad message number argument\r\n") |
- s.truncate(0) |
- |
- p.lineReceived("TOP 0 1") |
- self.assertEquals( |
- s.getvalue(), |
- "-ERR Bad message number argument\r\n") |
- s.truncate(0) |
- |
- p.lineReceived("TOP 3 1") |
- self.assertEquals( |
- s.getvalue(), |
- "-ERR Bad message number argument\r\n") |
- s.truncate(0) |
- |
- |
- def testLAST(self): |
- """ |
- Test the exceedingly pointless LAST command, which tells you the |
- highest message index which you have already downloaded. |
- """ |
- p = self.pop3Server |
- s = self.pop3Transport |
- p.mbox.messages.append(self.extraMessage) |
- |
- p.lineReceived('LAST') |
- self.assertEquals( |
- s.getvalue(), |
- "+OK 0\r\n") |
- s.truncate(0) |
- |
- |
- def testRetrieveUpdatesHighest(self): |
- """ |
- Test that issuing a RETR command updates the LAST response. |
- """ |
- p = self.pop3Server |
- s = self.pop3Transport |
- p.mbox.messages.append(self.extraMessage) |
- |
- p.lineReceived('RETR 2') |
- self._flush() |
- s.truncate(0) |
- p.lineReceived('LAST') |
- self.assertEquals( |
- s.getvalue(), |
- '+OK 2\r\n') |
- s.truncate(0) |
- |
- |
- def testTopUpdatesHighest(self): |
- """ |
- Test that issuing a TOP command updates the LAST response. |
- """ |
- p = self.pop3Server |
- s = self.pop3Transport |
- p.mbox.messages.append(self.extraMessage) |
- |
- p.lineReceived('TOP 2 10') |
- self._flush() |
- s.truncate(0) |
- p.lineReceived('LAST') |
- self.assertEquals( |
- s.getvalue(), |
- '+OK 2\r\n') |
- |
- |
- def testHighestOnlyProgresses(self): |
- """ |
- Test that downloading a message with a smaller index than the current |
- LAST response doesn't change the LAST response. |
- """ |
- p = self.pop3Server |
- s = self.pop3Transport |
- p.mbox.messages.append(self.extraMessage) |
- |
- p.lineReceived('RETR 2') |
- self._flush() |
- p.lineReceived('TOP 1 10') |
- self._flush() |
- s.truncate(0) |
- p.lineReceived('LAST') |
- self.assertEquals( |
- s.getvalue(), |
- '+OK 2\r\n') |
- |
- |
- def testResetClearsHighest(self): |
- """ |
- Test that issuing RSET changes the LAST response to 0. |
- """ |
- p = self.pop3Server |
- s = self.pop3Transport |
- p.mbox.messages.append(self.extraMessage) |
- |
- p.lineReceived('RETR 2') |
- self._flush() |
- p.lineReceived('RSET') |
- s.truncate(0) |
- p.lineReceived('LAST') |
- self.assertEquals( |
- s.getvalue(), |
- '+OK 0\r\n') |
- |
- |
- |
-_listMessageDeprecation = ( |
- "twisted.mail.pop3.IMailbox.listMessages may not " |
- "raise IndexError for out-of-bounds message numbers: " |
- "raise ValueError instead.") |
-_listMessageSuppression = util.suppress( |
- message=_listMessageDeprecation, |
- category=PendingDeprecationWarning) |
- |
-_getUidlDeprecation = ( |
- "twisted.mail.pop3.IMailbox.getUidl may not " |
- "raise IndexError for out-of-bounds message numbers: " |
- "raise ValueError instead.") |
-_getUidlSuppression = util.suppress( |
- message=_getUidlDeprecation, |
- category=PendingDeprecationWarning) |
- |
-class IndexErrorCommandTestCase(CommandMixin, unittest.TestCase): |
- """ |
- Run all of the command tests against a mailbox which raises IndexError |
- when an out of bounds request is made. This behavior will be deprecated |
- shortly and then removed. |
- """ |
- exceptionType = IndexError |
- mailboxType = DummyMailbox |
- |
- def testLISTWithBadArgument(self): |
- return CommandMixin.testLISTWithBadArgument(self) |
- testLISTWithBadArgument.suppress = [_listMessageSuppression] |
- |
- |
- def testUIDLWithBadArgument(self): |
- return CommandMixin.testUIDLWithBadArgument(self) |
- testUIDLWithBadArgument.suppress = [_getUidlSuppression] |
- |
- |
- def testTOPWithBadArgument(self): |
- return CommandMixin.testTOPWithBadArgument(self) |
- testTOPWithBadArgument.suppress = [_listMessageSuppression] |
- |
- |
- def testRETRWithBadArgument(self): |
- return CommandMixin.testRETRWithBadArgument(self) |
- testRETRWithBadArgument.suppress = [_listMessageSuppression] |
- |
- |
- |
-class ValueErrorCommandTestCase(CommandMixin, unittest.TestCase): |
- """ |
- Run all of the command tests against a mailbox which raises ValueError |
- when an out of bounds request is made. This is the correct behavior and |
- after support for mailboxes which raise IndexError is removed, this will |
- become just C{CommandTestCase}. |
- """ |
- exceptionType = ValueError |
- mailboxType = DummyMailbox |
- |
- |
- |
-class SyncDeferredMailbox(DummyMailbox): |
- """ |
- Mailbox which has a listMessages implementation which returns a Deferred |
- which has already fired. |
- """ |
- def listMessages(self, n=None): |
- return defer.succeed(DummyMailbox.listMessages(self, n)) |
- |
- |
- |
-class IndexErrorSyncDeferredCommandTestCase(IndexErrorCommandTestCase): |
- """ |
- Run all of the L{IndexErrorCommandTestCase} tests with a |
- synchronous-Deferred returning IMailbox implementation. |
- """ |
- mailboxType = SyncDeferredMailbox |
- |
- |
- |
-class ValueErrorSyncDeferredCommandTestCase(ValueErrorCommandTestCase): |
- """ |
- Run all of the L{ValueErrorCommandTestCase} tests with a |
- synchronous-Deferred returning IMailbox implementation. |
- """ |
- mailboxType = SyncDeferredMailbox |
- |
- |
- |
-class AsyncDeferredMailbox(DummyMailbox): |
- """ |
- Mailbox which has a listMessages implementation which returns a Deferred |
- which has not yet fired. |
- """ |
- def __init__(self, *a, **kw): |
- self.waiting = [] |
- DummyMailbox.__init__(self, *a, **kw) |
- |
- |
- def listMessages(self, n=None): |
- d = defer.Deferred() |
- # See AsyncDeferredMailbox._flush |
- self.waiting.append((d, DummyMailbox.listMessages(self, n))) |
- return d |
- |
- |
- |
-class IndexErrorAsyncDeferredCommandTestCase(IndexErrorCommandTestCase): |
- """ |
- Run all of the L{IndexErrorCommandTestCase} tests with an asynchronous-Deferred |
- returning IMailbox implementation. |
- """ |
- mailboxType = AsyncDeferredMailbox |
- |
- def _flush(self): |
- """ |
- Fire whatever Deferreds we've built up in our mailbox. |
- """ |
- while self.pop3Server.mbox.waiting: |
- d, a = self.pop3Server.mbox.waiting.pop() |
- d.callback(a) |
- IndexErrorCommandTestCase._flush(self) |
- |
- |
- |
-class ValueErrorAsyncDeferredCommandTestCase(ValueErrorCommandTestCase): |
- """ |
- Run all of the L{IndexErrorCommandTestCase} tests with an asynchronous-Deferred |
- returning IMailbox implementation. |
- """ |
- mailboxType = AsyncDeferredMailbox |
- |
- def _flush(self): |
- """ |
- Fire whatever Deferreds we've built up in our mailbox. |
- """ |
- while self.pop3Server.mbox.waiting: |
- d, a = self.pop3Server.mbox.waiting.pop() |
- d.callback(a) |
- ValueErrorCommandTestCase._flush(self) |
- |
-class POP3MiscTestCase(unittest.TestCase): |
- """ |
- Miscellaneous tests more to do with module/package structure than |
- anything to do with the Post Office Protocol. |
- """ |
- def test_all(self): |
- """ |
- This test checks that all names listed in |
- twisted.mail.pop3.__all__ are actually present in the module. |
- """ |
- mod = twisted.mail.pop3 |
- for attr in mod.__all__: |
- self.failUnless(hasattr(mod, attr)) |