Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1)

Unified Diff: third_party/twisted_8_1/twisted/mail/test/test_imap.py

Issue 12261012: Remove third_party/twisted_8_1 (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/tools/build
Patch Set: Created 7 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: third_party/twisted_8_1/twisted/mail/test/test_imap.py
diff --git a/third_party/twisted_8_1/twisted/mail/test/test_imap.py b/third_party/twisted_8_1/twisted/mail/test/test_imap.py
deleted file mode 100644
index 6d11e1eac123517f69f4534e8812819741d90787..0000000000000000000000000000000000000000
--- a/third_party/twisted_8_1/twisted/mail/test/test_imap.py
+++ /dev/null
@@ -1,3040 +0,0 @@
-# -*- test-case-name: twisted.mail.test.test_imap -*-
-# Copyright (c) 2001-2008 Twisted Matrix Laboratories.
-# See LICENSE for details.
-
-
-"""
-Test case for twisted.mail.imap4
-"""
-
-try:
- from cStringIO import StringIO
-except ImportError:
- from StringIO import StringIO
-
-import os
-import types
-
-from zope.interface import implements
-
-from twisted.mail.imap4 import MessageSet
-from twisted.mail import imap4
-from twisted.protocols import loopback
-from twisted.internet import defer
-from twisted.internet import error
-from twisted.internet import reactor
-from twisted.internet import interfaces
-from twisted.internet.task import Clock
-from twisted.trial import unittest
-from twisted.python import util
-from twisted.python import failure
-
-from twisted import cred
-import twisted.cred.error
-import twisted.cred.checkers
-import twisted.cred.credentials
-import twisted.cred.portal
-
-from twisted.test.proto_helpers import StringTransport, StringTransportWithDisconnection
-
-try:
- from twisted.test.ssl_helpers import ClientTLSContext, ServerTLSContext
-except ImportError:
- ClientTLSContext = ServerTLSContext = None
-
-def strip(f):
- return lambda result, f=f: f()
-
-def sortNest(l):
- l = l[:]
- l.sort()
- for i in range(len(l)):
- if isinstance(l[i], types.ListType):
- l[i] = sortNest(l[i])
- elif isinstance(l[i], types.TupleType):
- l[i] = tuple(sortNest(list(l[i])))
- return l
-
-class IMAP4UTF7TestCase(unittest.TestCase):
- tests = [
- [u'Hello world', 'Hello world'],
- [u'Hello & world', 'Hello &- world'],
- [u'Hello\xffworld', 'Hello&AP8-world'],
- [u'\xff\xfe\xfd\xfc', '&AP8A,gD9APw-'],
- [u'~peter/mail/\u65e5\u672c\u8a9e/\u53f0\u5317',
- '~peter/mail/&ZeVnLIqe-/&U,BTFw-'], # example from RFC 2060
- ]
-
- def test_encodeWithErrors(self):
- """
- Specifying an error policy to C{unicode.encode} with the
- I{imap4-utf-7} codec should produce the same result as not
- specifying the error policy.
- """
- text = u'Hello world'
- self.assertEqual(
- text.encode('imap4-utf-7', 'strict'),
- text.encode('imap4-utf-7'))
-
-
- def test_decodeWithErrors(self):
- """
- Similar to L{test_encodeWithErrors}, but for C{str.decode}.
- """
- bytes = 'Hello world'
- self.assertEqual(
- bytes.decode('imap4-utf-7', 'strict'),
- bytes.decode('imap4-utf-7'))
-
-
- def testEncode(self):
- for (input, output) in self.tests:
- self.assertEquals(input.encode('imap4-utf-7'), output)
-
- def testDecode(self):
- for (input, output) in self.tests:
- # XXX - Piece of *crap* 2.1
- self.assertEquals(input, imap4.decoder(output)[0])
-
- def testPrintableSingletons(self):
- # All printables represent themselves
- for o in range(0x20, 0x26) + range(0x27, 0x7f):
- self.failUnlessEqual(chr(o), chr(o).encode('imap4-utf-7'))
- self.failUnlessEqual(chr(o), chr(o).decode('imap4-utf-7'))
- self.failUnlessEqual('&'.encode('imap4-utf-7'), '&-')
- self.failUnlessEqual('&-'.decode('imap4-utf-7'), '&')
-
-class BufferingConsumer:
- def __init__(self):
- self.buffer = []
-
- def write(self, bytes):
- self.buffer.append(bytes)
- if self.consumer:
- self.consumer.resumeProducing()
-
- def registerProducer(self, consumer, streaming):
- self.consumer = consumer
- self.consumer.resumeProducing()
-
- def unregisterProducer(self):
- self.consumer = None
-
-class MessageProducerTestCase(unittest.TestCase):
- def testSinglePart(self):
- body = 'This is body text. Rar.'
- headers = util.OrderedDict()
- headers['from'] = 'sender@host'
- headers['to'] = 'recipient@domain'
- headers['subject'] = 'booga booga boo'
- headers['content-type'] = 'text/plain'
-
- msg = FakeyMessage(headers, (), None, body, 123, None )
-
- c = BufferingConsumer()
- p = imap4.MessageProducer(msg)
- d = p.beginProducing(c)
-
- def cbProduced(result):
- self.assertIdentical(result, p)
- self.assertEquals(
- ''.join(c.buffer),
-
- '{119}\r\n'
- 'From: sender@host\r\n'
- 'To: recipient@domain\r\n'
- 'Subject: booga booga boo\r\n'
- 'Content-Type: text/plain\r\n'
- '\r\n'
- + body)
- return d.addCallback(cbProduced)
-
-
- def testSingleMultiPart(self):
- outerBody = ''
- innerBody = 'Contained body message text. Squarge.'
- headers = util.OrderedDict()
- headers['from'] = 'sender@host'
- headers['to'] = 'recipient@domain'
- headers['subject'] = 'booga booga boo'
- headers['content-type'] = 'multipart/alternative; boundary="xyz"'
-
- innerHeaders = util.OrderedDict()
- innerHeaders['subject'] = 'this is subject text'
- innerHeaders['content-type'] = 'text/plain'
- msg = FakeyMessage(headers, (), None, outerBody, 123,
- [FakeyMessage(innerHeaders, (), None, innerBody,
- None, None)],
- )
-
- c = BufferingConsumer()
- p = imap4.MessageProducer(msg)
- d = p.beginProducing(c)
-
- def cbProduced(result):
- self.failUnlessIdentical(result, p)
-
- self.assertEquals(
- ''.join(c.buffer),
-
- '{239}\r\n'
- 'From: sender@host\r\n'
- 'To: recipient@domain\r\n'
- 'Subject: booga booga boo\r\n'
- 'Content-Type: multipart/alternative; boundary="xyz"\r\n'
- '\r\n'
- '\r\n'
- '--xyz\r\n'
- 'Subject: this is subject text\r\n'
- 'Content-Type: text/plain\r\n'
- '\r\n'
- + innerBody
- + '\r\n--xyz--\r\n')
-
- return d.addCallback(cbProduced)
-
-
- def testMultipleMultiPart(self):
- outerBody = ''
- innerBody1 = 'Contained body message text. Squarge.'
- innerBody2 = 'Secondary <i>message</i> text of squarge body.'
- headers = util.OrderedDict()
- headers['from'] = 'sender@host'
- headers['to'] = 'recipient@domain'
- headers['subject'] = 'booga booga boo'
- headers['content-type'] = 'multipart/alternative; boundary="xyz"'
- innerHeaders = util.OrderedDict()
- innerHeaders['subject'] = 'this is subject text'
- innerHeaders['content-type'] = 'text/plain'
- innerHeaders2 = util.OrderedDict()
- innerHeaders2['subject'] = '<b>this is subject</b>'
- innerHeaders2['content-type'] = 'text/html'
- msg = FakeyMessage(headers, (), None, outerBody, 123, [
- FakeyMessage(innerHeaders, (), None, innerBody1, None, None),
- FakeyMessage(innerHeaders2, (), None, innerBody2, None, None)
- ],
- )
-
- c = BufferingConsumer()
- p = imap4.MessageProducer(msg)
- d = p.beginProducing(c)
-
- def cbProduced(result):
- self.failUnlessIdentical(result, p)
-
- self.assertEquals(
- ''.join(c.buffer),
-
- '{354}\r\n'
- 'From: sender@host\r\n'
- 'To: recipient@domain\r\n'
- 'Subject: booga booga boo\r\n'
- 'Content-Type: multipart/alternative; boundary="xyz"\r\n'
- '\r\n'
- '\r\n'
- '--xyz\r\n'
- 'Subject: this is subject text\r\n'
- 'Content-Type: text/plain\r\n'
- '\r\n'
- + innerBody1
- + '\r\n--xyz\r\n'
- 'Subject: <b>this is subject</b>\r\n'
- 'Content-Type: text/html\r\n'
- '\r\n'
- + innerBody2
- + '\r\n--xyz--\r\n')
- return d.addCallback(cbProduced)
-
-
-class IMAP4HelperTestCase(unittest.TestCase):
- def testFileProducer(self):
- b = (('x' * 1) + ('y' * 1) + ('z' * 1)) * 10
- c = BufferingConsumer()
- f = StringIO(b)
- p = imap4.FileProducer(f)
- d = p.beginProducing(c)
-
- def cbProduced(result):
- self.failUnlessIdentical(result, p)
- self.assertEquals(
- ('{%d}\r\n' % len(b))+ b,
- ''.join(c.buffer))
- return d.addCallback(cbProduced)
-
- def testWildcard(self):
- cases = [
- ['foo/%gum/bar',
- ['foo/bar', 'oo/lalagum/bar', 'foo/gumx/bar', 'foo/gum/baz'],
- ['foo/xgum/bar', 'foo/gum/bar'],
- ], ['foo/x%x/bar',
- ['foo', 'bar', 'fuz fuz fuz', 'foo/*/bar', 'foo/xyz/bar', 'foo/xx/baz'],
- ['foo/xyx/bar', 'foo/xx/bar', 'foo/xxxxxxxxxxxxxx/bar'],
- ], ['foo/xyz*abc/bar',
- ['foo/xyz/bar', 'foo/abc/bar', 'foo/xyzab/cbar', 'foo/xyza/bcbar'],
- ['foo/xyzabc/bar', 'foo/xyz/abc/bar', 'foo/xyz/123/abc/bar'],
- ]
- ]
-
- for (wildcard, fail, succeed) in cases:
- wildcard = imap4.wildcardToRegexp(wildcard, '/')
- for x in fail:
- self.failIf(wildcard.match(x))
- for x in succeed:
- self.failUnless(wildcard.match(x))
-
- def testWildcardNoDelim(self):
- cases = [
- ['foo/%gum/bar',
- ['foo/bar', 'oo/lalagum/bar', 'foo/gumx/bar', 'foo/gum/baz'],
- ['foo/xgum/bar', 'foo/gum/bar', 'foo/x/gum/bar'],
- ], ['foo/x%x/bar',
- ['foo', 'bar', 'fuz fuz fuz', 'foo/*/bar', 'foo/xyz/bar', 'foo/xx/baz'],
- ['foo/xyx/bar', 'foo/xx/bar', 'foo/xxxxxxxxxxxxxx/bar', 'foo/x/x/bar'],
- ], ['foo/xyz*abc/bar',
- ['foo/xyz/bar', 'foo/abc/bar', 'foo/xyzab/cbar', 'foo/xyza/bcbar'],
- ['foo/xyzabc/bar', 'foo/xyz/abc/bar', 'foo/xyz/123/abc/bar'],
- ]
- ]
-
- for (wildcard, fail, succeed) in cases:
- wildcard = imap4.wildcardToRegexp(wildcard, None)
- for x in fail:
- self.failIf(wildcard.match(x), x)
- for x in succeed:
- self.failUnless(wildcard.match(x), x)
-
- def testHeaderFormatter(self):
- cases = [
- ({'Header1': 'Value1', 'Header2': 'Value2'}, 'Header2: Value2\r\nHeader1: Value1\r\n'),
- ]
-
- for (input, output) in cases:
- self.assertEquals(imap4._formatHeaders(input), output)
-
- def testMessageSet(self):
- m1 = MessageSet()
- m2 = MessageSet()
-
- self.assertEquals(m1, m2)
-
- m1 = m1 + (1, 3)
- self.assertEquals(len(m1), 3)
- self.assertEquals(list(m1), [1, 2, 3])
-
- m2 = m2 + (1, 3)
- self.assertEquals(m1, m2)
- self.assertEquals(list(m1 + m2), [1, 2, 3])
-
- def testQuotedSplitter(self):
- cases = [
- '''Hello World''',
- '''Hello "World!"''',
- '''World "Hello" "How are you?"''',
- '''"Hello world" How "are you?"''',
- '''foo bar "baz buz" NIL''',
- '''foo bar "baz buz" "NIL"''',
- '''foo NIL "baz buz" bar''',
- '''foo "NIL" "baz buz" bar''',
- '''"NIL" bar "baz buz" foo''',
- ]
-
- answers = [
- ['Hello', 'World'],
- ['Hello', 'World!'],
- ['World', 'Hello', 'How are you?'],
- ['Hello world', 'How', 'are you?'],
- ['foo', 'bar', 'baz buz', None],
- ['foo', 'bar', 'baz buz', 'NIL'],
- ['foo', None, 'baz buz', 'bar'],
- ['foo', 'NIL', 'baz buz', 'bar'],
- ['NIL', 'bar', 'baz buz', 'foo'],
- ]
-
- errors = [
- '"mismatched quote',
- 'mismatched quote"',
- 'mismatched"quote',
- '"oops here is" another"',
- ]
-
- for s in errors:
- self.assertRaises(imap4.MismatchedQuoting, imap4.splitQuoted, s)
-
- for (case, expected) in zip(cases, answers):
- self.assertEquals(imap4.splitQuoted(case), expected)
-
-
- def testStringCollapser(self):
- cases = [
- ['a', 'b', 'c', 'd', 'e'],
- ['a', ' ', '"', 'b', 'c', ' ', '"', ' ', 'd', 'e'],
- [['a', 'b', 'c'], 'd', 'e'],
- ['a', ['b', 'c', 'd'], 'e'],
- ['a', 'b', ['c', 'd', 'e']],
- ['"', 'a', ' ', '"', ['b', 'c', 'd'], '"', ' ', 'e', '"'],
- ['a', ['"', ' ', 'b', 'c', ' ', ' ', '"'], 'd', 'e'],
- ]
-
- answers = [
- ['abcde'],
- ['a', 'bc ', 'de'],
- [['abc'], 'de'],
- ['a', ['bcd'], 'e'],
- ['ab', ['cde']],
- ['a ', ['bcd'], ' e'],
- ['a', [' bc '], 'de'],
- ]
-
- for (case, expected) in zip(cases, answers):
- self.assertEquals(imap4.collapseStrings(case), expected)
-
- def testParenParser(self):
- s = '\r\n'.join(['xx'] * 4)
- cases = [
- '(BODY.PEEK[HEADER.FIELDS.NOT (subject bcc cc)] {%d}\r\n%s)' % (len(s), s,),
-
-# '(FLAGS (\Seen) INTERNALDATE "17-Jul-1996 02:44:25 -0700" '
-# 'RFC822.SIZE 4286 ENVELOPE ("Wed, 17 Jul 1996 02:23:25 -0700 (PDT)" '
-# '"IMAP4rev1 WG mtg summary and minutes" '
-# '(("Terry Gray" NIL "gray" "cac.washington.edu")) '
-# '(("Terry Gray" NIL "gray" "cac.washington.edu")) '
-# '(("Terry Gray" NIL "gray" "cac.washington.edu")) '
-# '((NIL NIL "imap" "cac.washington.edu")) '
-# '((NIL NIL "minutes" "CNRI.Reston.VA.US") '
-# '("John Klensin" NIL "KLENSIN" "INFOODS.MIT.EDU")) NIL NIL '
-# '"<B27397-0100000@cac.washington.edu>") '
-# 'BODY ("TEXT" "PLAIN" ("CHARSET" "US-ASCII") NIL NIL "7BIT" 3028 92))',
-
- '(FLAGS (\Seen) INTERNALDATE "17-Jul-1996 02:44:25 -0700" '
- 'RFC822.SIZE 4286 ENVELOPE ("Wed, 17 Jul 1996 02:23:25 -0700 (PDT)" '
- '"IMAP4rev1 WG mtg summary and minutes" '
- '(("Terry Gray" NIL gray cac.washington.edu)) '
- '(("Terry Gray" NIL gray cac.washington.edu)) '
- '(("Terry Gray" NIL gray cac.washington.edu)) '
- '((NIL NIL imap cac.washington.edu)) '
- '((NIL NIL minutes CNRI.Reston.VA.US) '
- '("John Klensin" NIL KLENSIN INFOODS.MIT.EDU)) NIL NIL '
- '<B27397-0100000@cac.washington.edu>) '
- 'BODY (TEXT PLAIN (CHARSET US-ASCII) NIL NIL 7BIT 3028 92))',
- ]
-
- answers = [
- ['BODY.PEEK', ['HEADER.FIELDS.NOT', ['subject', 'bcc', 'cc']], s],
-
- ['FLAGS', [r'\Seen'], 'INTERNALDATE',
- '17-Jul-1996 02:44:25 -0700', 'RFC822.SIZE', '4286', 'ENVELOPE',
- ['Wed, 17 Jul 1996 02:23:25 -0700 (PDT)',
- 'IMAP4rev1 WG mtg summary and minutes', [["Terry Gray", None,
- "gray", "cac.washington.edu"]], [["Terry Gray", None,
- "gray", "cac.washington.edu"]], [["Terry Gray", None,
- "gray", "cac.washington.edu"]], [[None, None, "imap",
- "cac.washington.edu"]], [[None, None, "minutes",
- "CNRI.Reston.VA.US"], ["John Klensin", None, "KLENSIN",
- "INFOODS.MIT.EDU"]], None, None,
- "<B27397-0100000@cac.washington.edu>"], "BODY", ["TEXT", "PLAIN",
- ["CHARSET", "US-ASCII"], None, None, "7BIT", "3028", "92"]],
- ]
-
- for (case, expected) in zip(cases, answers):
- self.assertEquals(imap4.parseNestedParens(case), [expected])
-
- # XXX This code used to work, but changes occurred within the
- # imap4.py module which made it no longer necessary for *all* of it
- # to work. In particular, only the part that makes
- # 'BODY.PEEK[HEADER.FIELDS.NOT (Subject Bcc Cc)]' come out correctly
- # no longer needs to work. So, I am loathe to delete the entire
- # section of the test. --exarkun
- #
-
-# for (case, expected) in zip(answers, cases):
-# self.assertEquals('(' + imap4.collapseNestedLists(case) + ')', expected)
-
- def testFetchParserSimple(self):
- cases = [
- ['ENVELOPE', 'Envelope'],
- ['FLAGS', 'Flags'],
- ['INTERNALDATE', 'InternalDate'],
- ['RFC822.HEADER', 'RFC822Header'],
- ['RFC822.SIZE', 'RFC822Size'],
- ['RFC822.TEXT', 'RFC822Text'],
- ['RFC822', 'RFC822'],
- ['UID', 'UID'],
- ['BODYSTRUCTURE', 'BodyStructure'],
- ]
-
- for (inp, outp) in cases:
- p = imap4._FetchParser()
- p.parseString(inp)
- self.assertEquals(len(p.result), 1)
- self.failUnless(isinstance(p.result[0], getattr(p, outp)))
-
- def testFetchParserMacros(self):
- cases = [
- ['ALL', (4, ['flags', 'internaldate', 'rfc822.size', 'envelope'])],
- ['FULL', (5, ['flags', 'internaldate', 'rfc822.size', 'envelope', 'body'])],
- ['FAST', (3, ['flags', 'internaldate', 'rfc822.size'])],
- ]
-
- for (inp, outp) in cases:
- p = imap4._FetchParser()
- p.parseString(inp)
- self.assertEquals(len(p.result), outp[0])
- p = [str(p).lower() for p in p.result]
- p.sort()
- outp[1].sort()
- self.assertEquals(p, outp[1])
-
- def testFetchParserBody(self):
- P = imap4._FetchParser
-
- p = P()
- p.parseString('BODY')
- self.assertEquals(len(p.result), 1)
- self.failUnless(isinstance(p.result[0], p.Body))
- self.assertEquals(p.result[0].peek, False)
- self.assertEquals(p.result[0].header, None)
- self.assertEquals(str(p.result[0]), 'BODY')
-
- p = P()
- p.parseString('BODY.PEEK')
- self.assertEquals(len(p.result), 1)
- self.failUnless(isinstance(p.result[0], p.Body))
- self.assertEquals(p.result[0].peek, True)
- self.assertEquals(str(p.result[0]), 'BODY')
-
- p = P()
- p.parseString('BODY[]')
- self.assertEquals(len(p.result), 1)
- self.failUnless(isinstance(p.result[0], p.Body))
- self.assertEquals(p.result[0].empty, True)
- self.assertEquals(str(p.result[0]), 'BODY[]')
-
- p = P()
- p.parseString('BODY[HEADER]')
- self.assertEquals(len(p.result), 1)
- self.failUnless(isinstance(p.result[0], p.Body))
- self.assertEquals(p.result[0].peek, False)
- self.failUnless(isinstance(p.result[0].header, p.Header))
- self.assertEquals(p.result[0].header.negate, True)
- self.assertEquals(p.result[0].header.fields, ())
- self.assertEquals(p.result[0].empty, False)
- self.assertEquals(str(p.result[0]), 'BODY[HEADER]')
-
- p = P()
- p.parseString('BODY.PEEK[HEADER]')
- self.assertEquals(len(p.result), 1)
- self.failUnless(isinstance(p.result[0], p.Body))
- self.assertEquals(p.result[0].peek, True)
- self.failUnless(isinstance(p.result[0].header, p.Header))
- self.assertEquals(p.result[0].header.negate, True)
- self.assertEquals(p.result[0].header.fields, ())
- self.assertEquals(p.result[0].empty, False)
- self.assertEquals(str(p.result[0]), 'BODY[HEADER]')
-
- p = P()
- p.parseString('BODY[HEADER.FIELDS (Subject Cc Message-Id)]')
- self.assertEquals(len(p.result), 1)
- self.failUnless(isinstance(p.result[0], p.Body))
- self.assertEquals(p.result[0].peek, False)
- self.failUnless(isinstance(p.result[0].header, p.Header))
- self.assertEquals(p.result[0].header.negate, False)
- self.assertEquals(p.result[0].header.fields, ['SUBJECT', 'CC', 'MESSAGE-ID'])
- self.assertEquals(p.result[0].empty, False)
- self.assertEquals(str(p.result[0]), 'BODY[HEADER.FIELDS (Subject Cc Message-Id)]')
-
- p = P()
- p.parseString('BODY.PEEK[HEADER.FIELDS (Subject Cc Message-Id)]')
- self.assertEquals(len(p.result), 1)
- self.failUnless(isinstance(p.result[0], p.Body))
- self.assertEquals(p.result[0].peek, True)
- self.failUnless(isinstance(p.result[0].header, p.Header))
- self.assertEquals(p.result[0].header.negate, False)
- self.assertEquals(p.result[0].header.fields, ['SUBJECT', 'CC', 'MESSAGE-ID'])
- self.assertEquals(p.result[0].empty, False)
- self.assertEquals(str(p.result[0]), 'BODY[HEADER.FIELDS (Subject Cc Message-Id)]')
-
- p = P()
- p.parseString('BODY.PEEK[HEADER.FIELDS.NOT (Subject Cc Message-Id)]')
- self.assertEquals(len(p.result), 1)
- self.failUnless(isinstance(p.result[0], p.Body))
- self.assertEquals(p.result[0].peek, True)
- self.failUnless(isinstance(p.result[0].header, p.Header))
- self.assertEquals(p.result[0].header.negate, True)
- self.assertEquals(p.result[0].header.fields, ['SUBJECT', 'CC', 'MESSAGE-ID'])
- self.assertEquals(p.result[0].empty, False)
- self.assertEquals(str(p.result[0]), 'BODY[HEADER.FIELDS.NOT (Subject Cc Message-Id)]')
-
- p = P()
- p.parseString('BODY[1.MIME]<10.50>')
- self.assertEquals(len(p.result), 1)
- self.failUnless(isinstance(p.result[0], p.Body))
- self.assertEquals(p.result[0].peek, False)
- self.failUnless(isinstance(p.result[0].mime, p.MIME))
- self.assertEquals(p.result[0].part, (0,))
- self.assertEquals(p.result[0].partialBegin, 10)
- self.assertEquals(p.result[0].partialLength, 50)
- self.assertEquals(p.result[0].empty, False)
- self.assertEquals(str(p.result[0]), 'BODY[1.MIME]<10.50>')
-
- p = P()
- p.parseString('BODY.PEEK[1.3.9.11.HEADER.FIELDS.NOT (Message-Id Date)]<103.69>')
- self.assertEquals(len(p.result), 1)
- self.failUnless(isinstance(p.result[0], p.Body))
- self.assertEquals(p.result[0].peek, True)
- self.failUnless(isinstance(p.result[0].header, p.Header))
- self.assertEquals(p.result[0].part, (0, 2, 8, 10))
- self.assertEquals(p.result[0].header.fields, ['MESSAGE-ID', 'DATE'])
- self.assertEquals(p.result[0].partialBegin, 103)
- self.assertEquals(p.result[0].partialLength, 69)
- self.assertEquals(p.result[0].empty, False)
- self.assertEquals(str(p.result[0]), 'BODY[1.3.9.11.HEADER.FIELDS.NOT (Message-Id Date)]<103.69>')
-
-
- def testFiles(self):
- inputStructure = [
- 'foo', 'bar', 'baz', StringIO('this is a file\r\n'), 'buz'
- ]
-
- output = '"foo" "bar" "baz" {16}\r\nthis is a file\r\n "buz"'
-
- self.assertEquals(imap4.collapseNestedLists(inputStructure), output)
-
- def testQuoteAvoider(self):
- input = [
- 'foo', imap4.DontQuoteMe('bar'), "baz", StringIO('this is a file\r\n'),
- imap4.DontQuoteMe('buz'), ""
- ]
-
- output = '"foo" bar "baz" {16}\r\nthis is a file\r\n buz ""'
-
- self.assertEquals(imap4.collapseNestedLists(input), output)
-
- def testLiterals(self):
- cases = [
- ('({10}\r\n0123456789)', [['0123456789']]),
- ]
-
- for (case, expected) in cases:
- self.assertEquals(imap4.parseNestedParens(case), expected)
-
- def testQueryBuilder(self):
- inputs = [
- imap4.Query(flagged=1),
- imap4.Query(sorted=1, unflagged=1, deleted=1),
- imap4.Or(imap4.Query(flagged=1), imap4.Query(deleted=1)),
- imap4.Query(before='today'),
- imap4.Or(
- imap4.Query(deleted=1),
- imap4.Query(unseen=1),
- imap4.Query(new=1)
- ),
- imap4.Or(
- imap4.Not(
- imap4.Or(
- imap4.Query(sorted=1, since='yesterday', smaller=1000),
- imap4.Query(sorted=1, before='tuesday', larger=10000),
- imap4.Query(sorted=1, unseen=1, deleted=1, before='today'),
- imap4.Not(
- imap4.Query(subject='spam')
- ),
- ),
- ),
- imap4.Not(
- imap4.Query(uid='1:5')
- ),
- )
- ]
-
- outputs = [
- 'FLAGGED',
- '(DELETED UNFLAGGED)',
- '(OR FLAGGED DELETED)',
- '(BEFORE "today")',
- '(OR DELETED (OR UNSEEN NEW))',
- '(OR (NOT (OR (SINCE "yesterday" SMALLER 1000) ' # Continuing
- '(OR (BEFORE "tuesday" LARGER 10000) (OR (BEFORE ' # Some more
- '"today" DELETED UNSEEN) (NOT (SUBJECT "spam")))))) ' # And more
- '(NOT (UID 1:5)))',
- ]
-
- for (query, expected) in zip(inputs, outputs):
- self.assertEquals(query, expected)
-
- def testIdListParser(self):
- inputs = [
- '1:*',
- '5:*',
- '1:2,5:*',
- '1',
- '1,2',
- '1,3,5',
- '1:10',
- '1:10,11',
- '1:5,10:20',
- '1,5:10',
- '1,5:10,15:20',
- '1:10,15,20:25',
- ]
-
- outputs = [
- MessageSet(1, None),
- MessageSet(5, None),
- MessageSet(5, None) + MessageSet(1, 2),
- MessageSet(1),
- MessageSet(1, 2),
- MessageSet(1) + MessageSet(3) + MessageSet(5),
- MessageSet(1, 10),
- MessageSet(1, 11),
- MessageSet(1, 5) + MessageSet(10, 20),
- MessageSet(1) + MessageSet(5, 10),
- MessageSet(1) + MessageSet(5, 10) + MessageSet(15, 20),
- MessageSet(1, 10) + MessageSet(15) + MessageSet(20, 25),
- ]
-
- lengths = [
- None, None, None,
- 1, 2, 3, 10, 11, 16, 7, 13, 17,
- ]
-
- for (input, expected) in zip(inputs, outputs):
- self.assertEquals(imap4.parseIdList(input), expected)
-
- for (input, expected) in zip(inputs, lengths):
- try:
- L = len(imap4.parseIdList(input))
- except TypeError:
- L = None
- self.assertEquals(L, expected,
- "len(%r) = %r != %r" % (input, L, expected))
-
-class SimpleMailbox:
- implements(imap4.IMailboxInfo, imap4.IMailbox, imap4.ICloseableMailbox)
-
- flags = ('\\Flag1', 'Flag2', '\\AnotherSysFlag', 'LastFlag')
- messages = []
- mUID = 0
- rw = 1
- closed = False
-
- def __init__(self):
- self.listeners = []
- self.addListener = self.listeners.append
- self.removeListener = self.listeners.remove
-
- def getFlags(self):
- return self.flags
-
- def getUIDValidity(self):
- return 42
-
- def getUIDNext(self):
- return len(self.messages) + 1
-
- def getMessageCount(self):
- return 9
-
- def getRecentCount(self):
- return 3
-
- def getUnseenCount(self):
- return 4
-
- def isWriteable(self):
- return self.rw
-
- def destroy(self):
- pass
-
- def getHierarchicalDelimiter(self):
- return '/'
-
- def requestStatus(self, names):
- r = {}
- if 'MESSAGES' in names:
- r['MESSAGES'] = self.getMessageCount()
- if 'RECENT' in names:
- r['RECENT'] = self.getRecentCount()
- if 'UIDNEXT' in names:
- r['UIDNEXT'] = self.getMessageCount() + 1
- if 'UIDVALIDITY' in names:
- r['UIDVALIDITY'] = self.getUID()
- if 'UNSEEN' in names:
- r['UNSEEN'] = self.getUnseenCount()
- return defer.succeed(r)
-
- def addMessage(self, message, flags, date = None):
- self.messages.append((message, flags, date, self.mUID))
- self.mUID += 1
- return defer.succeed(None)
-
- def expunge(self):
- delete = []
- for i in self.messages:
- if '\\Deleted' in i[1]:
- delete.append(i)
- for i in delete:
- self.messages.remove(i)
- return [i[3] for i in delete]
-
- def close(self):
- self.closed = True
-
-class Account(imap4.MemoryAccount):
- mailboxFactory = SimpleMailbox
- def _emptyMailbox(self, name, id):
- return self.mailboxFactory()
-
- def select(self, name, rw=1):
- mbox = imap4.MemoryAccount.select(self, name)
- if mbox is not None:
- mbox.rw = rw
- return mbox
-
-class SimpleServer(imap4.IMAP4Server):
- def __init__(self, *args, **kw):
- imap4.IMAP4Server.__init__(self, *args, **kw)
- realm = TestRealm()
- realm.theAccount = Account('testuser')
- portal = cred.portal.Portal(realm)
- c = cred.checkers.InMemoryUsernamePasswordDatabaseDontUse()
- self.checker = c
- self.portal = portal
- portal.registerChecker(c)
- self.timeoutTest = False
-
- def lineReceived(self, line):
- if self.timeoutTest:
- #Do not send a respones
- return
-
- imap4.IMAP4Server.lineReceived(self, line)
-
- _username = 'testuser'
- _password = 'password-test'
- def authenticateLogin(self, username, password):
- if username == self._username and password == self._password:
- return imap4.IAccount, self.theAccount, lambda: None
- raise cred.error.UnauthorizedLogin()
-
-
-class SimpleClient(imap4.IMAP4Client):
- def __init__(self, deferred, contextFactory = None):
- imap4.IMAP4Client.__init__(self, contextFactory)
- self.deferred = deferred
- self.events = []
-
- def serverGreeting(self, caps):
- self.deferred.callback(None)
-
- def modeChanged(self, writeable):
- self.events.append(['modeChanged', writeable])
- self.transport.loseConnection()
-
- def flagsChanged(self, newFlags):
- self.events.append(['flagsChanged', newFlags])
- self.transport.loseConnection()
-
- def newMessages(self, exists, recent):
- self.events.append(['newMessages', exists, recent])
- self.transport.loseConnection()
-
- def fetchBodyParts(self, message, parts):
- """Fetch some parts of the body.
-
- @param message: message with parts to fetch
- @type message: C{str}
- @param parts: a list of int/str
- @type parts: C{list}
- """
- cmd = "%s (BODY[%s]" % (message, parts[0])
- for p in parts[1:]:
- cmd += " BODY[%s]" % p
- cmd += ")"
- d = self.sendCommand(imap4.Command("FETCH", cmd,
- wantResponse=("FETCH",)))
- d.addCallback(self.__cb_fetchBodyParts)
- return d
-
- def __cb_fetchBodyParts(self, (lines, last)):
- info = {}
- for line in lines:
- parts = line.split(None, 2)
- if len(parts) == 3:
- if parts[1] == "FETCH":
- try:
- mail_id = int(parts[0])
- except ValueError:
- raise imap4.IllegalServerResponse, line
- else:
- body_parts = imap4.parseNestedParens(parts[2])[0]
- dict_parts = {}
- for i in range(len(body_parts)/3):
- dict_parts[body_parts[3*i+1][0]] = body_parts[3*i+2]
- info[mail_id] = dict_parts
- return info
-
-class IMAP4HelperMixin:
- serverCTX = None
- clientCTX = None
-
- def setUp(self):
- d = defer.Deferred()
- self.server = SimpleServer(contextFactory=self.serverCTX)
- self.client = SimpleClient(d, contextFactory=self.clientCTX)
- self.connected = d
-
- SimpleMailbox.messages = []
- theAccount = Account('testuser')
- theAccount.mboxType = SimpleMailbox
- SimpleServer.theAccount = theAccount
-
- def tearDown(self):
- del self.server
- del self.client
- del self.connected
-
- def _cbStopClient(self, ignore):
- self.client.transport.loseConnection()
-
- def _ebGeneral(self, failure):
- self.client.transport.loseConnection()
- self.server.transport.loseConnection()
- failure.printTraceback(open('failure.log', 'w'))
- failure.printTraceback()
- raise failure.value
-
- def loopback(self):
- return loopback.loopbackAsync(self.server, self.client)
-
-class IMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase):
- def testCapability(self):
- caps = {}
- def getCaps():
- def gotCaps(c):
- caps.update(c)
- self.server.transport.loseConnection()
- return self.client.getCapabilities().addCallback(gotCaps)
- d1 = self.connected.addCallback(strip(getCaps)).addErrback(self._ebGeneral)
- d = defer.gatherResults([self.loopback(), d1])
- expected = {'IMAP4rev1': None, 'NAMESPACE': None, 'IDLE': None}
- return d.addCallback(lambda _: self.assertEquals(expected, caps))
-
- def testCapabilityWithAuth(self):
- caps = {}
- self.server.challengers['CRAM-MD5'] = cred.credentials.CramMD5Credentials
- def getCaps():
- def gotCaps(c):
- caps.update(c)
- self.server.transport.loseConnection()
- return self.client.getCapabilities().addCallback(gotCaps)
- d1 = self.connected.addCallback(strip(getCaps)).addErrback(self._ebGeneral)
- d = defer.gatherResults([self.loopback(), d1])
-
- expCap = {'IMAP4rev1': None, 'NAMESPACE': None,
- 'IDLE': None, 'AUTH': ['CRAM-MD5']}
-
- return d.addCallback(lambda _: self.assertEquals(expCap, caps))
-
- def testLogout(self):
- self.loggedOut = 0
- def logout():
- def setLoggedOut():
- self.loggedOut = 1
- self.client.logout().addCallback(strip(setLoggedOut))
- self.connected.addCallback(strip(logout)).addErrback(self._ebGeneral)
- d = self.loopback()
- return d.addCallback(lambda _: self.assertEquals(self.loggedOut, 1))
-
- def testNoop(self):
- self.responses = None
- def noop():
- def setResponses(responses):
- self.responses = responses
- self.server.transport.loseConnection()
- self.client.noop().addCallback(setResponses)
- self.connected.addCallback(strip(noop)).addErrback(self._ebGeneral)
- d = self.loopback()
- return d.addCallback(lambda _: self.assertEquals(self.responses, []))
-
- def testLogin(self):
- def login():
- d = self.client.login('testuser', 'password-test')
- d.addCallback(self._cbStopClient)
- d1 = self.connected.addCallback(strip(login)).addErrback(self._ebGeneral)
- d = defer.gatherResults([d1, self.loopback()])
- return d.addCallback(self._cbTestLogin)
-
- def _cbTestLogin(self, ignored):
- self.assertEquals(self.server.account, SimpleServer.theAccount)
- self.assertEquals(self.server.state, 'auth')
-
- def testFailedLogin(self):
- def login():
- d = self.client.login('testuser', 'wrong-password')
- d.addBoth(self._cbStopClient)
-
- d1 = self.connected.addCallback(strip(login)).addErrback(self._ebGeneral)
- d2 = self.loopback()
- d = defer.gatherResults([d1, d2])
- return d.addCallback(self._cbTestFailedLogin)
-
- def _cbTestFailedLogin(self, ignored):
- self.assertEquals(self.server.account, None)
- self.assertEquals(self.server.state, 'unauth')
-
-
- def testLoginRequiringQuoting(self):
- self.server._username = '{test}user'
- self.server._password = '{test}password'
-
- def login():
- d = self.client.login('{test}user', '{test}password')
- d.addBoth(self._cbStopClient)
-
- d1 = self.connected.addCallback(strip(login)).addErrback(self._ebGeneral)
- d = defer.gatherResults([self.loopback(), d1])
- return d.addCallback(self._cbTestLoginRequiringQuoting)
-
- def _cbTestLoginRequiringQuoting(self, ignored):
- self.assertEquals(self.server.account, SimpleServer.theAccount)
- self.assertEquals(self.server.state, 'auth')
-
-
- def testNamespace(self):
- self.namespaceArgs = None
- def login():
- return self.client.login('testuser', 'password-test')
- def namespace():
- def gotNamespace(args):
- self.namespaceArgs = args
- self._cbStopClient(None)
- return self.client.namespace().addCallback(gotNamespace)
-
- d1 = self.connected.addCallback(strip(login))
- d1.addCallback(strip(namespace))
- d1.addErrback(self._ebGeneral)
- d2 = self.loopback()
- d = defer.gatherResults([d1, d2])
- d.addCallback(lambda _: self.assertEquals(self.namespaceArgs,
- [[['', '/']], [], []]))
- return d
-
- def testSelect(self):
- SimpleServer.theAccount.addMailbox('test-mailbox')
- self.selectedArgs = None
- def login():
- return self.client.login('testuser', 'password-test')
- def select():
- def selected(args):
- self.selectedArgs = args
- self._cbStopClient(None)
- d = self.client.select('test-mailbox')
- d.addCallback(selected)
- return d
-
- d1 = self.connected.addCallback(strip(login))
- d1.addCallback(strip(select))
- d1.addErrback(self._ebGeneral)
- d2 = self.loopback()
- return defer.gatherResults([d1, d2]).addCallback(self._cbTestSelect)
-
- def _cbTestSelect(self, ignored):
- mbox = SimpleServer.theAccount.mailboxes['TEST-MAILBOX']
- self.assertEquals(self.server.mbox, mbox)
- self.assertEquals(self.selectedArgs, {
- 'EXISTS': 9, 'RECENT': 3, 'UIDVALIDITY': 42,
- 'FLAGS': ('\\Flag1', 'Flag2', '\\AnotherSysFlag', 'LastFlag'),
- 'READ-WRITE': 1
- })
-
- def testExamine(self):
- SimpleServer.theAccount.addMailbox('test-mailbox')
- self.examinedArgs = None
- def login():
- return self.client.login('testuser', 'password-test')
- def examine():
- def examined(args):
- self.examinedArgs = args
- self._cbStopClient(None)
- d = self.client.examine('test-mailbox')
- d.addCallback(examined)
- return d
-
- d1 = self.connected.addCallback(strip(login))
- d1.addCallback(strip(examine))
- d1.addErrback(self._ebGeneral)
- d2 = self.loopback()
- d = defer.gatherResults([d1, d2])
- return d.addCallback(self._cbTestExamine)
-
- def _cbTestExamine(self, ignored):
- mbox = SimpleServer.theAccount.mailboxes['TEST-MAILBOX']
- self.assertEquals(self.server.mbox, mbox)
- self.assertEquals(self.examinedArgs, {
- 'EXISTS': 9, 'RECENT': 3, 'UIDVALIDITY': 42,
- 'FLAGS': ('\\Flag1', 'Flag2', '\\AnotherSysFlag', 'LastFlag'),
- 'READ-WRITE': 0
- })
-
- def testCreate(self):
- succeed = ('testbox', 'test/box', 'test/', 'test/box/box', 'INBOX')
- fail = ('testbox', 'test/box')
-
- def cb(): self.result.append(1)
- def eb(failure): self.result.append(0)
-
- def login():
- return self.client.login('testuser', 'password-test')
- def create():
- for name in succeed + fail:
- d = self.client.create(name)
- d.addCallback(strip(cb)).addErrback(eb)
- d.addCallbacks(self._cbStopClient, self._ebGeneral)
-
- self.result = []
- d1 = self.connected.addCallback(strip(login)).addCallback(strip(create))
- d2 = self.loopback()
- d = defer.gatherResults([d1, d2])
- return d.addCallback(self._cbTestCreate, succeed, fail)
-
- def _cbTestCreate(self, ignored, succeed, fail):
- self.assertEquals(self.result, [1] * len(succeed) + [0] * len(fail))
- mbox = SimpleServer.theAccount.mailboxes.keys()
- answers = ['inbox', 'testbox', 'test/box', 'test', 'test/box/box']
- mbox.sort()
- answers.sort()
- self.assertEquals(mbox, [a.upper() for a in answers])
-
- def testDelete(self):
- SimpleServer.theAccount.addMailbox('delete/me')
-
- def login():
- return self.client.login('testuser', 'password-test')
- def delete():
- return self.client.delete('delete/me')
- d1 = self.connected.addCallback(strip(login))
- d1.addCallbacks(strip(delete), self._ebGeneral)
- d1.addCallbacks(self._cbStopClient, self._ebGeneral)
- d2 = self.loopback()
- d = defer.gatherResults([d1, d2])
- d.addCallback(lambda _:
- self.assertEquals(SimpleServer.theAccount.mailboxes.keys(), []))
- return d
-
- def testIllegalInboxDelete(self):
- self.stashed = None
- def login():
- return self.client.login('testuser', 'password-test')
- def delete():
- return self.client.delete('inbox')
- def stash(result):
- self.stashed = result
-
- d1 = self.connected.addCallback(strip(login))
- d1.addCallbacks(strip(delete), self._ebGeneral)
- d1.addBoth(stash)
- d1.addCallbacks(self._cbStopClient, self._ebGeneral)
- d2 = self.loopback()
- d = defer.gatherResults([d1, d2])
- d.addCallback(lambda _: self.failUnless(isinstance(self.stashed,
- failure.Failure)))
- return d
-
-
- def testNonExistentDelete(self):
- def login():
- return self.client.login('testuser', 'password-test')
- def delete():
- return self.client.delete('delete/me')
- def deleteFailed(failure):
- self.failure = failure
-
- self.failure = None
- d1 = self.connected.addCallback(strip(login))
- d1.addCallback(strip(delete)).addErrback(deleteFailed)
- d1.addCallbacks(self._cbStopClient, self._ebGeneral)
- d2 = self.loopback()
- d = defer.gatherResults([d1, d2])
- d.addCallback(lambda _: self.assertEquals(str(self.failure.value),
- 'No such mailbox'))
- return d
-
-
- def testIllegalDelete(self):
- m = SimpleMailbox()
- m.flags = (r'\Noselect',)
- SimpleServer.theAccount.addMailbox('delete', m)
- SimpleServer.theAccount.addMailbox('delete/me')
-
- def login():
- return self.client.login('testuser', 'password-test')
- def delete():
- return self.client.delete('delete')
- def deleteFailed(failure):
- self.failure = failure
-
- self.failure = None
- d1 = self.connected.addCallback(strip(login))
- d1.addCallback(strip(delete)).addErrback(deleteFailed)
- d1.addCallbacks(self._cbStopClient, self._ebGeneral)
- d2 = self.loopback()
- d = defer.gatherResults([d1, d2])
- expected = "Hierarchically inferior mailboxes exist and \\Noselect is set"
- d.addCallback(lambda _:
- self.assertEquals(str(self.failure.value), expected))
- return d
-
- def testRename(self):
- SimpleServer.theAccount.addMailbox('oldmbox')
- def login():
- return self.client.login('testuser', 'password-test')
- def rename():
- return self.client.rename('oldmbox', 'newname')
-
- d1 = self.connected.addCallback(strip(login))
- d1.addCallbacks(strip(rename), self._ebGeneral)
- d1.addCallbacks(self._cbStopClient, self._ebGeneral)
- d2 = self.loopback()
- d = defer.gatherResults([d1, d2])
- d.addCallback(lambda _:
- self.assertEquals(SimpleServer.theAccount.mailboxes.keys(),
- ['NEWNAME']))
- return d
-
- def testIllegalInboxRename(self):
- self.stashed = None
- def login():
- return self.client.login('testuser', 'password-test')
- def rename():
- return self.client.rename('inbox', 'frotz')
- def stash(stuff):
- self.stashed = stuff
-
- d1 = self.connected.addCallback(strip(login))
- d1.addCallbacks(strip(rename), self._ebGeneral)
- d1.addBoth(stash)
- d1.addCallbacks(self._cbStopClient, self._ebGeneral)
- d2 = self.loopback()
- d = defer.gatherResults([d1, d2])
- d.addCallback(lambda _:
- self.failUnless(isinstance(self.stashed, failure.Failure)))
- return d
-
- def testHierarchicalRename(self):
- SimpleServer.theAccount.create('oldmbox/m1')
- SimpleServer.theAccount.create('oldmbox/m2')
- def login():
- return self.client.login('testuser', 'password-test')
- def rename():
- return self.client.rename('oldmbox', 'newname')
-
- d1 = self.connected.addCallback(strip(login))
- d1.addCallbacks(strip(rename), self._ebGeneral)
- d1.addCallbacks(self._cbStopClient, self._ebGeneral)
- d2 = self.loopback()
- d = defer.gatherResults([d1, d2])
- return d.addCallback(self._cbTestHierarchicalRename)
-
- def _cbTestHierarchicalRename(self, ignored):
- mboxes = SimpleServer.theAccount.mailboxes.keys()
- expected = ['newname', 'newname/m1', 'newname/m2']
- mboxes.sort()
- self.assertEquals(mboxes, [s.upper() for s in expected])
-
- def testSubscribe(self):
- def login():
- return self.client.login('testuser', 'password-test')
- def subscribe():
- return self.client.subscribe('this/mbox')
-
- d1 = self.connected.addCallback(strip(login))
- d1.addCallbacks(strip(subscribe), self._ebGeneral)
- d1.addCallbacks(self._cbStopClient, self._ebGeneral)
- d2 = self.loopback()
- d = defer.gatherResults([d1, d2])
- d.addCallback(lambda _:
- self.assertEquals(SimpleServer.theAccount.subscriptions,
- ['THIS/MBOX']))
- return d
-
- def testUnsubscribe(self):
- SimpleServer.theAccount.subscriptions = ['THIS/MBOX', 'THAT/MBOX']
- def login():
- return self.client.login('testuser', 'password-test')
- def unsubscribe():
- return self.client.unsubscribe('this/mbox')
-
- d1 = self.connected.addCallback(strip(login))
- d1.addCallbacks(strip(unsubscribe), self._ebGeneral)
- d1.addCallbacks(self._cbStopClient, self._ebGeneral)
- d2 = self.loopback()
- d = defer.gatherResults([d1, d2])
- d.addCallback(lambda _:
- self.assertEquals(SimpleServer.theAccount.subscriptions,
- ['THAT/MBOX']))
- return d
-
- def _listSetup(self, f):
- SimpleServer.theAccount.addMailbox('root/subthing')
- SimpleServer.theAccount.addMailbox('root/another-thing')
- SimpleServer.theAccount.addMailbox('non-root/subthing')
-
- def login():
- return self.client.login('testuser', 'password-test')
- def listed(answers):
- self.listed = answers
-
- self.listed = None
- d1 = self.connected.addCallback(strip(login))
- d1.addCallbacks(strip(f), self._ebGeneral)
- d1.addCallbacks(listed, self._ebGeneral)
- d1.addCallbacks(self._cbStopClient, self._ebGeneral)
- d2 = self.loopback()
- return defer.gatherResults([d1, d2]).addCallback(lambda _: self.listed)
-
- def testList(self):
- def list():
- return self.client.list('root', '%')
- d = self._listSetup(list)
- d.addCallback(lambda listed: self.assertEquals(
- sortNest(listed),
- sortNest([
- (SimpleMailbox.flags, "/", "ROOT/SUBTHING"),
- (SimpleMailbox.flags, "/", "ROOT/ANOTHER-THING")
- ])
- ))
- return d
-
- def testLSub(self):
- SimpleServer.theAccount.subscribe('ROOT/SUBTHING')
- def lsub():
- return self.client.lsub('root', '%')
- d = self._listSetup(lsub)
- d.addCallback(self.assertEquals,
- [(SimpleMailbox.flags, "/", "ROOT/SUBTHING")])
- return d
-
- def testStatus(self):
- SimpleServer.theAccount.addMailbox('root/subthing')
- def login():
- return self.client.login('testuser', 'password-test')
- def status():
- return self.client.status('root/subthing', 'MESSAGES', 'UIDNEXT', 'UNSEEN')
- def statused(result):
- self.statused = result
-
- self.statused = None
- d1 = self.connected.addCallback(strip(login))
- d1.addCallbacks(strip(status), self._ebGeneral)
- d1.addCallbacks(statused, self._ebGeneral)
- d1.addCallbacks(self._cbStopClient, self._ebGeneral)
- d2 = self.loopback()
- d = defer.gatherResults([d1, d2])
- d.addCallback(lambda _: self.assertEquals(
- self.statused,
- {'MESSAGES': 9, 'UIDNEXT': '10', 'UNSEEN': 4}
- ))
- return d
-
- def testFailedStatus(self):
- def login():
- return self.client.login('testuser', 'password-test')
- def status():
- return self.client.status('root/nonexistent', 'MESSAGES', 'UIDNEXT', 'UNSEEN')
- def statused(result):
- self.statused = result
- def failed(failure):
- self.failure = failure
-
- self.statused = self.failure = None
- d1 = self.connected.addCallback(strip(login))
- d1.addCallbacks(strip(status), self._ebGeneral)
- d1.addCallbacks(statused, failed)
- d1.addCallbacks(self._cbStopClient, self._ebGeneral)
- d2 = self.loopback()
- return defer.gatherResults([d1, d2]).addCallback(self._cbTestFailedStatus)
-
- def _cbTestFailedStatus(self, ignored):
- self.assertEquals(
- self.statused, None
- )
- self.assertEquals(
- self.failure.value.args,
- ('Could not open mailbox',)
- )
-
- def testFullAppend(self):
- infile = util.sibpath(__file__, 'rfc822.message')
- message = open(infile)
- SimpleServer.theAccount.addMailbox('root/subthing')
- def login():
- return self.client.login('testuser', 'password-test')
- def append():
- return self.client.append(
- 'root/subthing',
- message,
- ('\\SEEN', '\\DELETED'),
- 'Tue, 17 Jun 2003 11:22:16 -0600 (MDT)',
- )
-
- d1 = self.connected.addCallback(strip(login))
- d1.addCallbacks(strip(append), self._ebGeneral)
- d1.addCallbacks(self._cbStopClient, self._ebGeneral)
- d2 = self.loopback()
- d = defer.gatherResults([d1, d2])
- return d.addCallback(self._cbTestFullAppend, infile)
-
- def _cbTestFullAppend(self, ignored, infile):
- mb = SimpleServer.theAccount.mailboxes['ROOT/SUBTHING']
- self.assertEquals(1, len(mb.messages))
- self.assertEquals(
- (['\\SEEN', '\\DELETED'], 'Tue, 17 Jun 2003 11:22:16 -0600 (MDT)', 0),
- mb.messages[0][1:]
- )
- self.assertEquals(open(infile).read(), mb.messages[0][0].getvalue())
-
- def testPartialAppend(self):
- infile = util.sibpath(__file__, 'rfc822.message')
- message = open(infile)
- SimpleServer.theAccount.addMailbox('PARTIAL/SUBTHING')
- def login():
- return self.client.login('testuser', 'password-test')
- def append():
- message = file(infile)
- return self.client.sendCommand(
- imap4.Command(
- 'APPEND',
- 'PARTIAL/SUBTHING (\\SEEN) "Right now" {%d}' % os.path.getsize(infile),
- (), self.client._IMAP4Client__cbContinueAppend, message
- )
- )
- d1 = self.connected.addCallback(strip(login))
- d1.addCallbacks(strip(append), self._ebGeneral)
- d1.addCallbacks(self._cbStopClient, self._ebGeneral)
- d2 = self.loopback()
- d = defer.gatherResults([d1, d2])
- return d.addCallback(self._cbTestPartialAppend, infile)
-
- def _cbTestPartialAppend(self, ignored, infile):
- mb = SimpleServer.theAccount.mailboxes['PARTIAL/SUBTHING']
- self.assertEquals(1, len(mb.messages))
- self.assertEquals(
- (['\\SEEN'], 'Right now', 0),
- mb.messages[0][1:]
- )
- self.assertEquals(open(infile).read(), mb.messages[0][0].getvalue())
-
- def testCheck(self):
- SimpleServer.theAccount.addMailbox('root/subthing')
- def login():
- return self.client.login('testuser', 'password-test')
- def select():
- return self.client.select('root/subthing')
- def check():
- return self.client.check()
-
- d = self.connected.addCallback(strip(login))
- d.addCallbacks(strip(select), self._ebGeneral)
- d.addCallbacks(strip(check), self._ebGeneral)
- d.addCallbacks(self._cbStopClient, self._ebGeneral)
- return self.loopback()
-
- # Okay, that was fun
-
- def testClose(self):
- m = SimpleMailbox()
- m.messages = [
- ('Message 1', ('\\Deleted', 'AnotherFlag'), None, 0),
- ('Message 2', ('AnotherFlag',), None, 1),
- ('Message 3', ('\\Deleted',), None, 2),
- ]
- SimpleServer.theAccount.addMailbox('mailbox', m)
- def login():
- return self.client.login('testuser', 'password-test')
- def select():
- return self.client.select('mailbox')
- def close():
- return self.client.close()
-
- d = self.connected.addCallback(strip(login))
- d.addCallbacks(strip(select), self._ebGeneral)
- d.addCallbacks(strip(close), self._ebGeneral)
- d.addCallbacks(self._cbStopClient, self._ebGeneral)
- d2 = self.loopback()
- return defer.gatherResults([d, d2]).addCallback(self._cbTestClose, m)
-
- def _cbTestClose(self, ignored, m):
- self.assertEquals(len(m.messages), 1)
- self.assertEquals(m.messages[0], ('Message 2', ('AnotherFlag',), None, 1))
- self.failUnless(m.closed)
-
- def testExpunge(self):
- m = SimpleMailbox()
- m.messages = [
- ('Message 1', ('\\Deleted', 'AnotherFlag'), None, 0),
- ('Message 2', ('AnotherFlag',), None, 1),
- ('Message 3', ('\\Deleted',), None, 2),
- ]
- SimpleServer.theAccount.addMailbox('mailbox', m)
- def login():
- return self.client.login('testuser', 'password-test')
- def select():
- return self.client.select('mailbox')
- def expunge():
- return self.client.expunge()
- def expunged(results):
- self.failIf(self.server.mbox is None)
- self.results = results
-
- self.results = None
- d1 = self.connected.addCallback(strip(login))
- d1.addCallbacks(strip(select), self._ebGeneral)
- d1.addCallbacks(strip(expunge), self._ebGeneral)
- d1.addCallbacks(expunged, self._ebGeneral)
- d1.addCallbacks(self._cbStopClient, self._ebGeneral)
- d2 = self.loopback()
- d = defer.gatherResults([d1, d2])
- return d.addCallback(self._cbTestExpunge, m)
-
- def _cbTestExpunge(self, ignored, m):
- self.assertEquals(len(m.messages), 1)
- self.assertEquals(m.messages[0], ('Message 2', ('AnotherFlag',), None, 1))
-
- self.assertEquals(self.results, [0, 2])
-
-class TestRealm:
- theAccount = None
-
- def requestAvatar(self, avatarId, mind, *interfaces):
- return imap4.IAccount, self.theAccount, lambda: None
-
-class TestChecker:
- credentialInterfaces = (cred.credentials.IUsernameHashedPassword, cred.credentials.IUsernamePassword)
-
- users = {
- 'testuser': 'secret'
- }
-
- def requestAvatarId(self, credentials):
- if credentials.username in self.users:
- return defer.maybeDeferred(
- credentials.checkPassword, self.users[credentials.username]
- ).addCallback(self._cbCheck, credentials.username)
-
- def _cbCheck(self, result, username):
- if result:
- return username
- raise cred.error.UnauthorizedLogin()
-
-class AuthenticatorTestCase(IMAP4HelperMixin, unittest.TestCase):
- def setUp(self):
- IMAP4HelperMixin.setUp(self)
-
- realm = TestRealm()
- realm.theAccount = Account('testuser')
- portal = cred.portal.Portal(realm)
- portal.registerChecker(TestChecker())
- self.server.portal = portal
-
- self.authenticated = 0
- self.account = realm.theAccount
-
- def testCramMD5(self):
- self.server.challengers['CRAM-MD5'] = cred.credentials.CramMD5Credentials
- cAuth = imap4.CramMD5ClientAuthenticator('testuser')
- self.client.registerAuthenticator(cAuth)
-
- def auth():
- return self.client.authenticate('secret')
- def authed():
- self.authenticated = 1
-
- d1 = self.connected.addCallback(strip(auth))
- d1.addCallbacks(strip(authed), self._ebGeneral)
- d1.addCallbacks(self._cbStopClient, self._ebGeneral)
- d2 = self.loopback()
- d = defer.gatherResults([d1, d2])
- return d.addCallback(self._cbTestCramMD5)
-
- def _cbTestCramMD5(self, ignored):
- self.assertEquals(self.authenticated, 1)
- self.assertEquals(self.server.account, self.account)
-
- def testFailedCramMD5(self):
- self.server.challengers['CRAM-MD5'] = cred.credentials.CramMD5Credentials
- cAuth = imap4.CramMD5ClientAuthenticator('testuser')
- self.client.registerAuthenticator(cAuth)
-
- def misauth():
- return self.client.authenticate('not the secret')
- def authed():
- self.authenticated = 1
- def misauthed():
- self.authenticated = -1
-
- d1 = self.connected.addCallback(strip(misauth))
- d1.addCallbacks(strip(authed), strip(misauthed))
- d1.addCallbacks(self._cbStopClient, self._ebGeneral)
- d = defer.gatherResults([self.loopback(), d1])
- return d.addCallback(self._cbTestFailedCramMD5)
-
- def _cbTestFailedCramMD5(self, ignored):
- self.assertEquals(self.authenticated, -1)
- self.assertEquals(self.server.account, None)
-
- def testLOGIN(self):
- self.server.challengers['LOGIN'] = imap4.LOGINCredentials
- cAuth = imap4.LOGINAuthenticator('testuser')
- self.client.registerAuthenticator(cAuth)
-
- def auth():
- return self.client.authenticate('secret')
- def authed():
- self.authenticated = 1
-
- d1 = self.connected.addCallback(strip(auth))
- d1.addCallbacks(strip(authed), self._ebGeneral)
- d1.addCallbacks(self._cbStopClient, self._ebGeneral)
- d = defer.gatherResults([self.loopback(), d1])
- return d.addCallback(self._cbTestLOGIN)
-
- def _cbTestLOGIN(self, ignored):
- self.assertEquals(self.authenticated, 1)
- self.assertEquals(self.server.account, self.account)
-
- def testFailedLOGIN(self):
- self.server.challengers['LOGIN'] = imap4.LOGINCredentials
- cAuth = imap4.LOGINAuthenticator('testuser')
- self.client.registerAuthenticator(cAuth)
-
- def misauth():
- return self.client.authenticate('not the secret')
- def authed():
- self.authenticated = 1
- def misauthed():
- self.authenticated = -1
-
- d1 = self.connected.addCallback(strip(misauth))
- d1.addCallbacks(strip(authed), strip(misauthed))
- d1.addCallbacks(self._cbStopClient, self._ebGeneral)
- d = defer.gatherResults([self.loopback(), d1])
- return d.addCallback(self._cbTestFailedLOGIN)
-
- def _cbTestFailedLOGIN(self, ignored):
- self.assertEquals(self.authenticated, -1)
- self.assertEquals(self.server.account, None)
-
- def testPLAIN(self):
- self.server.challengers['PLAIN'] = imap4.PLAINCredentials
- cAuth = imap4.PLAINAuthenticator('testuser')
- self.client.registerAuthenticator(cAuth)
-
- def auth():
- return self.client.authenticate('secret')
- def authed():
- self.authenticated = 1
-
- d1 = self.connected.addCallback(strip(auth))
- d1.addCallbacks(strip(authed), self._ebGeneral)
- d1.addCallbacks(self._cbStopClient, self._ebGeneral)
- d = defer.gatherResults([self.loopback(), d1])
- return d.addCallback(self._cbTestPLAIN)
-
- def _cbTestPLAIN(self, ignored):
- self.assertEquals(self.authenticated, 1)
- self.assertEquals(self.server.account, self.account)
-
- def testFailedPLAIN(self):
- self.server.challengers['PLAIN'] = imap4.PLAINCredentials
- cAuth = imap4.PLAINAuthenticator('testuser')
- self.client.registerAuthenticator(cAuth)
-
- def misauth():
- return self.client.authenticate('not the secret')
- def authed():
- self.authenticated = 1
- def misauthed():
- self.authenticated = -1
-
- d1 = self.connected.addCallback(strip(misauth))
- d1.addCallbacks(strip(authed), strip(misauthed))
- d1.addCallbacks(self._cbStopClient, self._ebGeneral)
- d = defer.gatherResults([self.loopback(), d1])
- return d.addCallback(self._cbTestFailedPLAIN)
-
- def _cbTestFailedPLAIN(self, ignored):
- self.assertEquals(self.authenticated, -1)
- self.assertEquals(self.server.account, None)
-
-
-class UnsolicitedResponseTestCase(IMAP4HelperMixin, unittest.TestCase):
- def testReadWrite(self):
- def login():
- return self.client.login('testuser', 'password-test')
- def loggedIn():
- self.server.modeChanged(1)
-
- d1 = self.connected.addCallback(strip(login))
- d1.addCallback(strip(loggedIn)).addErrback(self._ebGeneral)
- d = defer.gatherResults([self.loopback(), d1])
- return d.addCallback(self._cbTestReadWrite)
-
- def _cbTestReadWrite(self, ignored):
- E = self.client.events
- self.assertEquals(E, [['modeChanged', 1]])
-
- def testReadOnly(self):
- def login():
- return self.client.login('testuser', 'password-test')
- def loggedIn():
- self.server.modeChanged(0)
-
- d1 = self.connected.addCallback(strip(login))
- d1.addCallback(strip(loggedIn)).addErrback(self._ebGeneral)
- d = defer.gatherResults([self.loopback(), d1])
- return d.addCallback(self._cbTestReadOnly)
-
- def _cbTestReadOnly(self, ignored):
- E = self.client.events
- self.assertEquals(E, [['modeChanged', 0]])
-
- def testFlagChange(self):
- flags = {
- 1: ['\\Answered', '\\Deleted'],
- 5: [],
- 10: ['\\Recent']
- }
- def login():
- return self.client.login('testuser', 'password-test')
- def loggedIn():
- self.server.flagsChanged(flags)
-
- d1 = self.connected.addCallback(strip(login))
- d1.addCallback(strip(loggedIn)).addErrback(self._ebGeneral)
- d = defer.gatherResults([self.loopback(), d1])
- return d.addCallback(self._cbTestFlagChange, flags)
-
- def _cbTestFlagChange(self, ignored, flags):
- E = self.client.events
- expect = [['flagsChanged', {x[0]: x[1]}] for x in flags.items()]
- E.sort()
- expect.sort()
- self.assertEquals(E, expect)
-
- def testNewMessages(self):
- def login():
- return self.client.login('testuser', 'password-test')
- def loggedIn():
- self.server.newMessages(10, None)
-
- d1 = self.connected.addCallback(strip(login))
- d1.addCallback(strip(loggedIn)).addErrback(self._ebGeneral)
- d = defer.gatherResults([self.loopback(), d1])
- return d.addCallback(self._cbTestNewMessages)
-
- def _cbTestNewMessages(self, ignored):
- E = self.client.events
- self.assertEquals(E, [['newMessages', 10, None]])
-
- def testNewRecentMessages(self):
- def login():
- return self.client.login('testuser', 'password-test')
- def loggedIn():
- self.server.newMessages(None, 10)
-
- d1 = self.connected.addCallback(strip(login))
- d1.addCallback(strip(loggedIn)).addErrback(self._ebGeneral)
- d = defer.gatherResults([self.loopback(), d1])
- return d.addCallback(self._cbTestNewRecentMessages)
-
- def _cbTestNewRecentMessages(self, ignored):
- E = self.client.events
- self.assertEquals(E, [['newMessages', None, 10]])
-
- def testNewMessagesAndRecent(self):
- def login():
- return self.client.login('testuser', 'password-test')
- def loggedIn():
- self.server.newMessages(20, 10)
-
- d1 = self.connected.addCallback(strip(login))
- d1.addCallback(strip(loggedIn)).addErrback(self._ebGeneral)
- d = defer.gatherResults([self.loopback(), d1])
- return d.addCallback(self._cbTestNewMessagesAndRecent)
-
- def _cbTestNewMessagesAndRecent(self, ignored):
- E = self.client.events
- self.assertEquals(E, [['newMessages', 20, None], ['newMessages', None, 10]])
-
-
-class ClientCapabilityTests(unittest.TestCase):
- """
- Tests for issuance of the CAPABILITY command and handling of its response.
- """
- def setUp(self):
- """
- Create an L{imap4.IMAP4Client} connected to a L{StringTransport}.
- """
- self.transport = StringTransport()
- self.protocol = imap4.IMAP4Client()
- self.protocol.makeConnection(self.transport)
- self.protocol.dataReceived('* OK [IMAP4rev1]\r\n')
-
-
- def test_simpleAtoms(self):
- """
- A capability response consisting only of atoms without C{'='} in them
- should result in a dict mapping those atoms to C{None}.
- """
- capabilitiesResult = self.protocol.getCapabilities(useCache=False)
- self.protocol.dataReceived('* CAPABILITY IMAP4rev1 LOGINDISABLED\r\n')
- self.protocol.dataReceived('0001 OK Capability completed.\r\n')
- def gotCapabilities(capabilities):
- self.assertEqual(
- capabilities, {'IMAP4rev1': None, 'LOGINDISABLED': None})
- capabilitiesResult.addCallback(gotCapabilities)
- return capabilitiesResult
-
-
- def test_categoryAtoms(self):
- """
- A capability response consisting of atoms including C{'='} should have
- those atoms split on that byte and have capabilities in the same
- category aggregated into lists in the resulting dictionary.
-
- (n.b. - I made up the word "category atom"; the protocol has no notion
- of structure here, but rather allows each capability to define the
- semantics of its entry in the capability response in a freeform manner.
- If I had realized this earlier, the API for capabilities would look
- different. As it is, we can hope that no one defines any crazy
- semantics which are incompatible with this API, or try to figure out a
- better API when someone does. -exarkun)
- """
- capabilitiesResult = self.protocol.getCapabilities(useCache=False)
- self.protocol.dataReceived('* CAPABILITY IMAP4rev1 AUTH=LOGIN AUTH=PLAIN\r\n')
- self.protocol.dataReceived('0001 OK Capability completed.\r\n')
- def gotCapabilities(capabilities):
- self.assertEqual(
- capabilities, {'IMAP4rev1': None, 'AUTH': ['LOGIN', 'PLAIN']})
- capabilitiesResult.addCallback(gotCapabilities)
- return capabilitiesResult
-
-
- def test_mixedAtoms(self):
- """
- A capability response consisting of both simple and category atoms of
- the same type should result in a list containing C{None} as well as the
- values for the category.
- """
- capabilitiesResult = self.protocol.getCapabilities(useCache=False)
- # Exercise codepath for both orderings of =-having and =-missing
- # capabilities.
- self.protocol.dataReceived(
- '* CAPABILITY IMAP4rev1 FOO FOO=BAR BAR=FOO BAR\r\n')
- self.protocol.dataReceived('0001 OK Capability completed.\r\n')
- def gotCapabilities(capabilities):
- self.assertEqual(capabilities, {'IMAP4rev1': None,
- 'FOO': [None, 'BAR'],
- 'BAR': ['FOO', None]})
- capabilitiesResult.addCallback(gotCapabilities)
- return capabilitiesResult
-
-
-
-
-class HandCraftedTestCase(IMAP4HelperMixin, unittest.TestCase):
- def testTrailingLiteral(self):
- transport = StringTransport()
- c = imap4.IMAP4Client()
- c.makeConnection(transport)
- c.lineReceived('* OK [IMAP4rev1]')
-
- def cbSelect(ignored):
- d = c.fetchMessage('1')
- c.dataReceived('* 1 FETCH (RFC822 {10}\r\n0123456789\r\n RFC822.SIZE 10)\r\n')
- c.dataReceived('0003 OK FETCH\r\n')
- return d
-
- def cbLogin(ignored):
- d = c.select('inbox')
- c.lineReceived('0002 OK SELECT')
- d.addCallback(cbSelect)
- return d
-
- d = c.login('blah', 'blah')
- c.dataReceived('0001 OK LOGIN\r\n')
- d.addCallback(cbLogin)
- return d
-
- def testPathelogicalScatteringOfLiterals(self):
- self.server.checker.addUser('testuser', 'password-test')
- transport = StringTransport()
- self.server.makeConnection(transport)
-
- transport.clear()
- self.server.dataReceived("01 LOGIN {8}\r\n")
- self.assertEquals(transport.value(), "+ Ready for 8 octets of text\r\n")
-
- transport.clear()
- self.server.dataReceived("testuser {13}\r\n")
- self.assertEquals(transport.value(), "+ Ready for 13 octets of text\r\n")
-
- transport.clear()
- self.server.dataReceived("password-test\r\n")
- self.assertEquals(transport.value(), "01 OK LOGIN succeeded\r\n")
- self.assertEquals(self.server.state, 'auth')
-
- self.server.connectionLost(error.ConnectionDone("Connection done."))
-
- def testUnsolicitedResponseMixedWithSolicitedResponse(self):
-
- class StillSimplerClient(imap4.IMAP4Client):
- events = []
- def flagsChanged(self, newFlags):
- self.events.append(['flagsChanged', newFlags])
-
- transport = StringTransport()
- c = StillSimplerClient()
- c.makeConnection(transport)
- c.lineReceived('* OK [IMAP4rev1]')
-
- def login():
- d = c.login('blah', 'blah')
- c.dataReceived('0001 OK LOGIN\r\n')
- return d
- def select():
- d = c.select('inbox')
- c.lineReceived('0002 OK SELECT')
- return d
- def fetch():
- d = c.fetchSpecific('1:*',
- headerType='HEADER.FIELDS',
- headerArgs=['SUBJECT'])
- c.dataReceived('* 1 FETCH (BODY[HEADER.FIELDS ("SUBJECT")] {38}\r\n')
- c.dataReceived('Subject: Suprise for your woman...\r\n')
- c.dataReceived('\r\n')
- c.dataReceived(')\r\n')
- c.dataReceived('* 1 FETCH (FLAGS (\Seen))\r\n')
- c.dataReceived('* 2 FETCH (BODY[HEADER.FIELDS ("SUBJECT")] {75}\r\n')
- c.dataReceived('Subject: What you been doing. Order your meds here . ,. handcuff madsen\r\n')
- c.dataReceived('\r\n')
- c.dataReceived(')\r\n')
- c.dataReceived('0003 OK FETCH completed\r\n')
- return d
- def test(res):
- self.assertEquals(res, {
- 1: [['BODY', ['HEADER.FIELDS', ['SUBJECT']],
- 'Subject: Suprise for your woman...\r\n\r\n']],
- 2: [['BODY', ['HEADER.FIELDS', ['SUBJECT']],
- 'Subject: What you been doing. Order your meds here . ,. handcuff madsen\r\n\r\n']]
- })
-
- self.assertEquals(c.events, [['flagsChanged', {1: ['\\Seen']}]])
-
- return login(
- ).addCallback(strip(select)
- ).addCallback(strip(fetch)
- ).addCallback(test)
-
-
- def test_literalWithoutPrecedingWhitespace(self):
- """
- Literals should be recognized even when they are not preceded by
- whitespace.
- """
- transport = StringTransport()
- protocol = imap4.IMAP4Client()
-
- protocol.makeConnection(transport)
- protocol.lineReceived('* OK [IMAP4rev1]')
-
- def login():
- d = protocol.login('blah', 'blah')
- protocol.dataReceived('0001 OK LOGIN\r\n')
- return d
- def select():
- d = protocol.select('inbox')
- protocol.lineReceived('0002 OK SELECT')
- return d
- def fetch():
- d = protocol.fetchSpecific('1:*',
- headerType='HEADER.FIELDS',
- headerArgs=['SUBJECT'])
- protocol.dataReceived(
- '* 1 FETCH (BODY[HEADER.FIELDS ({7}\r\nSUBJECT)] "Hello")\r\n')
- protocol.dataReceived('0003 OK FETCH completed\r\n')
- return d
- def test(result):
- self.assertEqual(
- result, {1: [['BODY', ['HEADER.FIELDS', ['SUBJECT']], 'Hello']]})
-
- d = login()
- d.addCallback(strip(select))
- d.addCallback(strip(fetch))
- d.addCallback(test)
- return d
-
-
- def test_nonIntegerLiteralLength(self):
- """
- If the server sends a literal length which cannot be parsed as an
- integer, L{IMAP4Client.lineReceived} should cause the protocol to be
- disconnected by raising L{imap4.IllegalServerResponse}.
- """
- transport = StringTransport()
- protocol = imap4.IMAP4Client()
-
- protocol.makeConnection(transport)
- protocol.lineReceived('* OK [IMAP4rev1]')
-
- def login():
- d = protocol.login('blah', 'blah')
- protocol.dataReceived('0001 OK LOGIN\r\n')
- return d
- def select():
- d = protocol.select('inbox')
- protocol.lineReceived('0002 OK SELECT')
- return d
- def fetch():
- d = protocol.fetchSpecific('1:*',
- headerType='HEADER.FIELDS',
- headerArgs=['SUBJECT'])
- self.assertRaises(
- imap4.IllegalServerResponse,
- protocol.dataReceived,
- '* 1 FETCH {xyz}\r\n...')
- d = login()
- d.addCallback(strip(select))
- d.addCallback(strip(fetch))
- return d
-
-
-
-class FakeyServer(imap4.IMAP4Server):
- state = 'select'
- timeout = None
-
- def sendServerGreeting(self):
- pass
-
-class FakeyMessage:
- implements(imap4.IMessage)
-
- def __init__(self, headers, flags, date, body, uid, subpart):
- self.headers = headers
- self.flags = flags
- self.body = StringIO(body)
- self.size = len(body)
- self.date = date
- self.uid = uid
- self.subpart = subpart
-
- def getHeaders(self, negate, *names):
- self.got_headers = negate, names
- return self.headers
-
- def getFlags(self):
- return self.flags
-
- def getInternalDate(self):
- return self.date
-
- def getBodyFile(self):
- return self.body
-
- def getSize(self):
- return self.size
-
- def getUID(self):
- return self.uid
-
- def isMultipart(self):
- return self.subpart is not None
-
- def getSubPart(self, part):
- self.got_subpart = part
- return self.subpart[part]
-
-class NewStoreTestCase(unittest.TestCase, IMAP4HelperMixin):
- result = None
- storeArgs = None
-
- def setUp(self):
- self.received_messages = self.received_uid = None
-
- self.server = imap4.IMAP4Server()
- self.server.state = 'select'
- self.server.mbox = self
- self.connected = defer.Deferred()
- self.client = SimpleClient(self.connected)
-
- def addListener(self, x):
- pass
- def removeListener(self, x):
- pass
-
- def store(self, *args, **kw):
- self.storeArgs = args, kw
- return self.response
-
- def _storeWork(self):
- def connected():
- return self.function(self.messages, self.flags, self.silent, self.uid)
- def result(R):
- self.result = R
-
- self.connected.addCallback(strip(connected)
- ).addCallback(result
- ).addCallback(self._cbStopClient
- ).addErrback(self._ebGeneral)
-
- def check(ignored):
- self.assertEquals(self.result, self.expected)
- self.assertEquals(self.storeArgs, self.expectedArgs)
- d = loopback.loopbackTCP(self.server, self.client, noisy=False)
- d.addCallback(check)
- return d
-
- def testSetFlags(self, uid=0):
- self.function = self.client.setFlags
- self.messages = '1,5,9'
- self.flags = ['\\A', '\\B', 'C']
- self.silent = False
- self.uid = uid
- self.response = {
- 1: ['\\A', '\\B', 'C'],
- 5: ['\\A', '\\B', 'C'],
- 9: ['\\A', '\\B', 'C'],
- }
- self.expected = {
- 1: {'FLAGS': ['\\A', '\\B', 'C']},
- 5: {'FLAGS': ['\\A', '\\B', 'C']},
- 9: {'FLAGS': ['\\A', '\\B', 'C']},
- }
- msg = imap4.MessageSet()
- msg.add(1)
- msg.add(5)
- msg.add(9)
- self.expectedArgs = ((msg, ['\\A', '\\B', 'C'], 0), {'uid': 0})
- return self._storeWork()
-
-
-class NewFetchTestCase(unittest.TestCase, IMAP4HelperMixin):
- def setUp(self):
- self.received_messages = self.received_uid = None
- self.result = None
-
- self.server = imap4.IMAP4Server()
- self.server.state = 'select'
- self.server.mbox = self
- self.connected = defer.Deferred()
- self.client = SimpleClient(self.connected)
-
- def addListener(self, x):
- pass
- def removeListener(self, x):
- pass
-
- def fetch(self, messages, uid):
- self.received_messages = messages
- self.received_uid = uid
- return iter(zip(range(len(self.msgObjs)), self.msgObjs))
-
- def _fetchWork(self, uid):
- if uid:
- for (i, msg) in zip(range(len(self.msgObjs)), self.msgObjs):
- self.expected[i]['UID'] = str(msg.getUID())
-
- def result(R):
- self.result = R
-
- self.connected.addCallback(lambda _: self.function(self.messages, uid)
- ).addCallback(result
- ).addCallback(self._cbStopClient
- ).addErrback(self._ebGeneral)
-
- d = loopback.loopbackTCP(self.server, self.client, noisy=False)
- d.addCallback(lambda x : self.assertEquals(self.result, self.expected))
- return d
-
- def testFetchUID(self):
- self.function = lambda m, u: self.client.fetchUID(m)
-
- self.messages = '7'
- self.msgObjs = [
- FakeyMessage({}, (), '', '', 12345, None),
- FakeyMessage({}, (), '', '', 999, None),
- FakeyMessage({}, (), '', '', 10101, None),
- ]
- self.expected = {
- 0: {'UID': '12345'},
- 1: {'UID': '999'},
- 2: {'UID': '10101'},
- }
- return self._fetchWork(0)
-
- def testFetchFlags(self, uid=0):
- self.function = self.client.fetchFlags
- self.messages = '9'
- self.msgObjs = [
- FakeyMessage({}, ['FlagA', 'FlagB', '\\FlagC'], '', '', 54321, None),
- FakeyMessage({}, ['\\FlagC', 'FlagA', 'FlagB'], '', '', 12345, None),
- ]
- self.expected = {
- 0: {'FLAGS': ['FlagA', 'FlagB', '\\FlagC']},
- 1: {'FLAGS': ['\\FlagC', 'FlagA', 'FlagB']},
- }
- return self._fetchWork(uid)
-
- def testFetchFlagsUID(self):
- return self.testFetchFlags(1)
-
- def testFetchInternalDate(self, uid=0):
- self.function = self.client.fetchInternalDate
- self.messages = '13'
- self.msgObjs = [
- FakeyMessage({}, (), 'Fri, 02 Nov 2003 21:25:10 GMT', '', 23232, None),
- FakeyMessage({}, (), 'Thu, 29 Dec 2013 11:31:52 EST', '', 101, None),
- FakeyMessage({}, (), 'Mon, 10 Mar 1992 02:44:30 CST', '', 202, None),
- FakeyMessage({}, (), 'Sat, 11 Jan 2000 14:40:24 PST', '', 303, None),
- ]
- self.expected = {
- 0: {'INTERNALDATE': '02-Nov-2003 21:25:10 +0000'},
- 1: {'INTERNALDATE': '29-Dec-2013 11:31:52 -0500'},
- 2: {'INTERNALDATE': '10-Mar-1992 02:44:30 -0600'},
- 3: {'INTERNALDATE': '11-Jan-2000 14:40:24 -0800'},
- }
- return self._fetchWork(uid)
-
- def testFetchInternalDateUID(self):
- return self.testFetchInternalDate(1)
-
- def testFetchEnvelope(self, uid=0):
- self.function = self.client.fetchEnvelope
- self.messages = '15'
- self.msgObjs = [
- FakeyMessage({
- 'from': 'user@domain', 'to': 'resu@domain',
- 'date': 'thursday', 'subject': 'it is a message',
- 'message-id': 'id-id-id-yayaya'}, (), '', '', 65656,
- None),
- ]
- self.expected = {
- 0: {'ENVELOPE':
- ['thursday', 'it is a message',
- [[None, None, 'user', 'domain']],
- [[None, None, 'user', 'domain']],
- [[None, None, 'user', 'domain']],
- [[None, None, 'resu', 'domain']],
- None, None, None, 'id-id-id-yayaya']
- }
- }
- return self._fetchWork(uid)
-
- def testFetchEnvelopeUID(self):
- return self.testFetchEnvelope(1)
-
- def testFetchBodyStructure(self, uid=0):
- self.function = self.client.fetchBodyStructure
- self.messages = '3:9,10:*'
- self.msgObjs = [FakeyMessage({
- 'content-type': 'text/plain; name=thing; key="value"',
- 'content-id': 'this-is-the-content-id',
- 'content-description': 'describing-the-content-goes-here!',
- 'content-transfer-encoding': '8BIT',
- }, (), '', 'Body\nText\nGoes\nHere\n', 919293, None)]
- self.expected = {0: {'BODYSTRUCTURE': [
- 'text', 'plain', [['name', 'thing'], ['key', 'value']],
- 'this-is-the-content-id', 'describing-the-content-goes-here!',
- '8BIT', '20', '4', None, None, None]}}
- return self._fetchWork(uid)
-
- def testFetchBodyStructureUID(self):
- return self.testFetchBodyStructure(1)
-
- def testFetchSimplifiedBody(self, uid=0):
- self.function = self.client.fetchSimplifiedBody
- self.messages = '21'
- self.msgObjs = [FakeyMessage({}, (), '', 'Yea whatever', 91825,
- [FakeyMessage({'content-type': 'image/jpg'}, (), '',
- 'Body Body Body', None, None
- )]
- )]
- self.expected = {0:
- {'BODY':
- [None, None, [], None, None, None,
- '12'
- ]
- }
- }
-
- return self._fetchWork(uid)
-
- def testFetchSimplifiedBodyUID(self):
- return self.testFetchSimplifiedBody(1)
-
- def testFetchSimplifiedBodyText(self, uid=0):
- self.function = self.client.fetchSimplifiedBody
- self.messages = '21'
- self.msgObjs = [FakeyMessage({'content-type': 'text/plain'},
- (), '', 'Yea whatever', 91825, None)]
- self.expected = {0:
- {'BODY':
- ['text', 'plain', [], None, None, None,
- '12', '1'
- ]
- }
- }
-
- return self._fetchWork(uid)
-
- def testFetchSimplifiedBodyTextUID(self):
- return self.testFetchSimplifiedBodyText(1)
-
- def testFetchSimplifiedBodyRFC822(self, uid=0):
- self.function = self.client.fetchSimplifiedBody
- self.messages = '21'
- self.msgObjs = [FakeyMessage({'content-type': 'message/rfc822'},
- (), '', 'Yea whatever', 91825,
- [FakeyMessage({'content-type': 'image/jpg'}, (), '',
- 'Body Body Body', None, None
- )]
- )]
- self.expected = {0:
- {'BODY':
- ['message', 'rfc822', [], None, None, None,
- '12', [None, None, [[None, None, None]],
- [[None, None, None]], None, None, None,
- None, None, None], ['image', 'jpg', [],
- None, None, None, '14'], '1'
- ]
- }
- }
-
- return self._fetchWork(uid)
-
- def testFetchSimplifiedBodyRFC822UID(self):
- return self.testFetchSimplifiedBodyRFC822(1)
-
- def testFetchMessage(self, uid=0):
- self.function = self.client.fetchMessage
- self.messages = '1,3,7,10101'
- self.msgObjs = [
- FakeyMessage({'Header': 'Value'}, (), '', 'BODY TEXT\r\n', 91, None),
- ]
- self.expected = {
- 0: {'RFC822': 'Header: Value\r\n\r\nBODY TEXT\r\n'}
- }
- return self._fetchWork(uid)
-
- def testFetchMessageUID(self):
- return self.testFetchMessage(1)
-
- def testFetchHeaders(self, uid=0):
- self.function = self.client.fetchHeaders
- self.messages = '9,6,2'
- self.msgObjs = [
- FakeyMessage({'H1': 'V1', 'H2': 'V2'}, (), '', '', 99, None),
- ]
- self.expected = {
- 0: {'RFC822.HEADER': imap4._formatHeaders({'H1': 'V1', 'H2': 'V2'})},
- }
- return self._fetchWork(uid)
-
- def testFetchHeadersUID(self):
- return self.testFetchHeaders(1)
-
- def testFetchBody(self, uid=0):
- self.function = self.client.fetchBody
- self.messages = '1,2,3,4,5,6,7'
- self.msgObjs = [
- FakeyMessage({'Header': 'Value'}, (), '', 'Body goes here\r\n', 171, None),
- ]
- self.expected = {
- 0: {'RFC822.TEXT': 'Body goes here\r\n'},
- }
- return self._fetchWork(uid)
-
- def testFetchBodyUID(self):
- return self.testFetchBody(1)
-
- def testFetchBodyParts(self):
- self.function = self.client.fetchBodyParts
- self.messages = '1'
- parts = [1, 2]
- outerBody = ''
- innerBody1 = 'Contained body message text. Squarge.'
- innerBody2 = 'Secondary <i>message</i> text of squarge body.'
- headers = util.OrderedDict()
- headers['from'] = 'sender@host'
- headers['to'] = 'recipient@domain'
- headers['subject'] = 'booga booga boo'
- headers['content-type'] = 'multipart/alternative; boundary="xyz"'
- innerHeaders = util.OrderedDict()
- innerHeaders['subject'] = 'this is subject text'
- innerHeaders['content-type'] = 'text/plain'
- innerHeaders2 = util.OrderedDict()
- innerHeaders2['subject'] = '<b>this is subject</b>'
- innerHeaders2['content-type'] = 'text/html'
- self.msgObjs = [FakeyMessage(
- headers, (), None, outerBody, 123,
- [FakeyMessage(innerHeaders, (), None, innerBody1, None, None),
- FakeyMessage(innerHeaders2, (), None, innerBody2, None, None)])]
- self.expected = {
- 0: {'1': innerBody1, '2': innerBody2},
- }
-
- def result(R):
- self.result = R
-
- self.connected.addCallback(lambda _: self.function(self.messages, parts))
- self.connected.addCallback(result)
- self.connected.addCallback(self._cbStopClient)
- self.connected.addErrback(self._ebGeneral)
-
- d = loopback.loopbackTCP(self.server, self.client, noisy=False)
- d.addCallback(lambda ign: self.assertEquals(self.result, self.expected))
- return d
-
-
- def test_fetchBodyPartOfNonMultipart(self):
- """
- Single-part messages have an implicit first part which clients
- should be able to retrieve explicitly. Test that a client
- requesting part 1 of a text/plain message receives the body of the
- text/plain part.
- """
- self.function = self.client.fetchBodyParts
- self.messages = '1'
- parts = [1]
- outerBody = 'DA body'
- headers = util.OrderedDict()
- headers['from'] = 'sender@host'
- headers['to'] = 'recipient@domain'
- headers['subject'] = 'booga booga boo'
- headers['content-type'] = 'text/plain'
- self.msgObjs = [FakeyMessage(
- headers, (), None, outerBody, 123, None)]
-
- self.expected = {
- 0: {'1': outerBody},
- }
-
- def result(R):
- self.result = R
-
- self.connected.addCallback(lambda _: self.function(self.messages, parts))
- self.connected.addCallback(result)
- self.connected.addCallback(self._cbStopClient)
- self.connected.addErrback(self._ebGeneral)
-
- d = loopback.loopbackTCP(self.server, self.client, noisy=False)
- d.addCallback(lambda ign: self.assertEquals(self.result, self.expected))
- return d
-
-
- def testFetchSize(self, uid=0):
- self.function = self.client.fetchSize
- self.messages = '1:100,2:*'
- self.msgObjs = [
- FakeyMessage({}, (), '', 'x' * 20, 123, None),
- ]
- self.expected = {
- 0: {'RFC822.SIZE': '20'},
- }
- return self._fetchWork(uid)
-
- def testFetchSizeUID(self):
- return self.testFetchSize(1)
-
- def testFetchFull(self, uid=0):
- self.function = self.client.fetchFull
- self.messages = '1,3'
- self.msgObjs = [
- FakeyMessage({}, ('\\XYZ', '\\YZX', 'Abc'),
- 'Sun, 25 Jul 2010 06:20:30 -0400 (EDT)',
- 'xyz' * 2, 654, None),
- FakeyMessage({}, ('\\One', '\\Two', 'Three'),
- 'Mon, 14 Apr 2003 19:43:44 -0400',
- 'abc' * 4, 555, None),
- ]
- self.expected = {
- 0: {'FLAGS': ['\\XYZ', '\\YZX', 'Abc'],
- 'INTERNALDATE': '25-Jul-2010 06:20:30 -0400',
- 'RFC822.SIZE': '6',
- 'ENVELOPE': [None, None, [[None, None, None]], [[None, None, None]], None, None, None, None, None, None],
- 'BODY': [None, None, [], None, None, None, '6']},
- 1: {'FLAGS': ['\\One', '\\Two', 'Three'],
- 'INTERNALDATE': '14-Apr-2003 19:43:44 -0400',
- 'RFC822.SIZE': '12',
- 'ENVELOPE': [None, None, [[None, None, None]], [[None, None, None]], None, None, None, None, None, None],
- 'BODY': [None, None, [], None, None, None, '12']},
- }
- return self._fetchWork(uid)
-
- def testFetchFullUID(self):
- return self.testFetchFull(1)
-
- def testFetchAll(self, uid=0):
- self.function = self.client.fetchAll
- self.messages = '1,2:3'
- self.msgObjs = [
- FakeyMessage({}, (), 'Mon, 14 Apr 2003 19:43:44 +0400',
- 'Lalala', 10101, None),
- FakeyMessage({}, (), 'Tue, 15 Apr 2003 19:43:44 +0200',
- 'Alalal', 20202, None),
- ]
- self.expected = {
- 0: {'ENVELOPE': [None, None, [[None, None, None]], [[None, None, None]], None, None, None, None, None, None],
- 'RFC822.SIZE': '6',
- 'INTERNALDATE': '14-Apr-2003 19:43:44 +0400',
- 'FLAGS': []},
- 1: {'ENVELOPE': [None, None, [[None, None, None]], [[None, None, None]], None, None, None, None, None, None],
- 'RFC822.SIZE': '6',
- 'INTERNALDATE': '15-Apr-2003 19:43:44 +0200',
- 'FLAGS': []},
- }
- return self._fetchWork(uid)
-
- def testFetchAllUID(self):
- return self.testFetchAll(1)
-
- def testFetchFast(self, uid=0):
- self.function = self.client.fetchFast
- self.messages = '1'
- self.msgObjs = [
- FakeyMessage({}, ('\\X',), '19 Mar 2003 19:22:21 -0500', '', 9, None),
- ]
- self.expected = {
- 0: {'FLAGS': ['\\X'],
- 'INTERNALDATE': '19-Mar-2003 19:22:21 -0500',
- 'RFC822.SIZE': '0'},
- }
- return self._fetchWork(uid)
-
- def testFetchFastUID(self):
- return self.testFetchFast(1)
-
-
-class FetchSearchStoreTestCase(unittest.TestCase, IMAP4HelperMixin):
- implements(imap4.ISearchableMailbox)
-
- def setUp(self):
- self.expected = self.result = None
- self.server_received_query = None
- self.server_received_uid = None
- self.server_received_parts = None
- self.server_received_messages = None
-
- self.server = imap4.IMAP4Server()
- self.server.state = 'select'
- self.server.mbox = self
- self.connected = defer.Deferred()
- self.client = SimpleClient(self.connected)
-
- def search(self, query, uid):
- self.server_received_query = query
- self.server_received_uid = uid
- return self.expected
-
- def addListener(self, *a, **kw):
- pass
- removeListener = addListener
-
- def _searchWork(self, uid):
- def search():
- return self.client.search(self.query, uid=uid)
- def result(R):
- self.result = R
-
- self.connected.addCallback(strip(search)
- ).addCallback(result
- ).addCallback(self._cbStopClient
- ).addErrback(self._ebGeneral)
-
- def check(ignored):
- # Ensure no short-circuiting wierdness is going on
- self.failIf(self.result is self.expected)
-
- self.assertEquals(self.result, self.expected)
- self.assertEquals(self.uid, self.server_received_uid)
- self.assertEquals(
- imap4.parseNestedParens(self.query),
- self.server_received_query
- )
- d = loopback.loopbackTCP(self.server, self.client, noisy=False)
- d.addCallback(check)
- return d
-
- def testSearch(self):
- self.query = imap4.Or(
- imap4.Query(header=('subject', 'substring')),
- imap4.Query(larger=1024, smaller=4096),
- )
- self.expected = [1, 4, 5, 7]
- self.uid = 0
- return self._searchWork(0)
-
- def testUIDSearch(self):
- self.query = imap4.Or(
- imap4.Query(header=('subject', 'substring')),
- imap4.Query(larger=1024, smaller=4096),
- )
- self.uid = 1
- self.expected = [1, 2, 3]
- return self._searchWork(1)
-
- def getUID(self, msg):
- try:
- return self.expected[msg]['UID']
- except (TypeError, IndexError):
- return self.expected[msg-1]
- except KeyError:
- return 42
-
- def fetch(self, messages, uid):
- self.server_received_uid = uid
- self.server_received_messages = str(messages)
- return self.expected
-
- def _fetchWork(self, fetch):
- def result(R):
- self.result = R
-
- self.connected.addCallback(strip(fetch)
- ).addCallback(result
- ).addCallback(self._cbStopClient
- ).addErrback(self._ebGeneral)
-
- def check(ignored):
- # Ensure no short-circuiting wierdness is going on
- self.failIf(self.result is self.expected)
-
- self.parts and self.parts.sort()
- self.server_received_parts and self.server_received_parts.sort()
-
- if self.uid:
- for (k, v) in self.expected.items():
- v['UID'] = str(k)
-
- self.assertEquals(self.result, self.expected)
- self.assertEquals(self.uid, self.server_received_uid)
- self.assertEquals(self.parts, self.server_received_parts)
- self.assertEquals(imap4.parseIdList(self.messages),
- imap4.parseIdList(self.server_received_messages))
-
- d = loopback.loopbackTCP(self.server, self.client, noisy=False)
- d.addCallback(check)
- return d
-
-class FakeMailbox:
- def __init__(self):
- self.args = []
- def addMessage(self, body, flags, date):
- self.args.append((body, flags, date))
- return defer.succeed(None)
-
-class FeaturefulMessage:
- implements(imap4.IMessageFile)
-
- def getFlags(self):
- return 'flags'
-
- def getInternalDate(self):
- return 'internaldate'
-
- def open(self):
- return StringIO("open")
-
-class MessageCopierMailbox:
- implements(imap4.IMessageCopier)
-
- def __init__(self):
- self.msgs = []
-
- def copy(self, msg):
- self.msgs.append(msg)
- return len(self.msgs)
-
-class CopyWorkerTestCase(unittest.TestCase):
- def testFeaturefulMessage(self):
- s = imap4.IMAP4Server()
-
- # Yes. I am grabbing this uber-non-public method to test it.
- # It is complex. It needs to be tested directly!
- # Perhaps it should be refactored, simplified, or split up into
- # not-so-private components, but that is a task for another day.
-
- # Ha ha! Addendum! Soon it will be split up, and this test will
- # be re-written to just use the default adapter for IMailbox to
- # IMessageCopier and call .copy on that adapter.
- f = s._IMAP4Server__cbCopy
-
- m = FakeMailbox()
- d = f([(i, FeaturefulMessage()) for i in range(1, 11)], 'tag', m)
-
- def cbCopy(results):
- for a in m.args:
- self.assertEquals(a[0].read(), "open")
- self.assertEquals(a[1], "flags")
- self.assertEquals(a[2], "internaldate")
-
- for (status, result) in results:
- self.failUnless(status)
- self.assertEquals(result, None)
-
- return d.addCallback(cbCopy)
-
-
- def testUnfeaturefulMessage(self):
- s = imap4.IMAP4Server()
-
- # See above comment
- f = s._IMAP4Server__cbCopy
-
- m = FakeMailbox()
- msgs = [FakeyMessage({'Header-Counter': str(i)}, (), 'Date', 'Body %d' % (i,), i + 10, None) for i in range(1, 11)]
- d = f([im for im in zip(range(1, 11), msgs)], 'tag', m)
-
- def cbCopy(results):
- seen = []
- for a in m.args:
- seen.append(a[0].read())
- self.assertEquals(a[1], ())
- self.assertEquals(a[2], "Date")
-
- seen.sort()
- exp = ["Header-Counter: %d\r\n\r\nBody %d" % (i, i) for i in range(1, 11)]
- exp.sort()
- self.assertEquals(seen, exp)
-
- for (status, result) in results:
- self.failUnless(status)
- self.assertEquals(result, None)
-
- return d.addCallback(cbCopy)
-
- def testMessageCopier(self):
- s = imap4.IMAP4Server()
-
- # See above comment
- f = s._IMAP4Server__cbCopy
-
- m = MessageCopierMailbox()
- msgs = [object() for i in range(1, 11)]
- d = f([im for im in zip(range(1, 11), msgs)], 'tag', m)
-
- def cbCopy(results):
- self.assertEquals(results, zip([1] * 10, range(1, 11)))
- for (orig, new) in zip(msgs, m.msgs):
- self.assertIdentical(orig, new)
-
- return d.addCallback(cbCopy)
-
-
-class TLSTestCase(IMAP4HelperMixin, unittest.TestCase):
- serverCTX = ServerTLSContext and ServerTLSContext()
- clientCTX = ClientTLSContext and ClientTLSContext()
-
- def loopback(self):
- return loopback.loopbackTCP(self.server, self.client, noisy=False)
-
- def testAPileOfThings(self):
- SimpleServer.theAccount.addMailbox('inbox')
- called = []
- def login():
- called.append(None)
- return self.client.login('testuser', 'password-test')
- def list():
- called.append(None)
- return self.client.list('inbox', '%')
- def status():
- called.append(None)
- return self.client.status('inbox', 'UIDNEXT')
- def examine():
- called.append(None)
- return self.client.examine('inbox')
- def logout():
- called.append(None)
- return self.client.logout()
-
- self.client.requireTransportSecurity = True
-
- methods = [login, list, status, examine, logout]
- map(self.connected.addCallback, map(strip, methods))
- self.connected.addCallbacks(self._cbStopClient, self._ebGeneral)
- def check(ignored):
- self.assertEquals(self.server.startedTLS, True)
- self.assertEquals(self.client.startedTLS, True)
- self.assertEquals(len(called), len(methods))
- d = self.loopback()
- d.addCallback(check)
- return d
-
- def testLoginLogin(self):
- self.server.checker.addUser('testuser', 'password-test')
- success = []
- self.client.registerAuthenticator(imap4.LOGINAuthenticator('testuser'))
- self.connected.addCallback(
- lambda _: self.client.authenticate('password-test')
- ).addCallback(
- lambda _: self.client.logout()
- ).addCallback(success.append
- ).addCallback(self._cbStopClient
- ).addErrback(self._ebGeneral)
-
- d = self.loopback()
- d.addCallback(lambda x : self.assertEquals(len(success), 1))
- return d
-
- def testStartTLS(self):
- success = []
- self.connected.addCallback(lambda _: self.client.startTLS())
- self.connected.addCallback(lambda _: self.assertNotEquals(-1, self.client.transport.__class__.__name__.find('TLS')))
- self.connected.addCallback(self._cbStopClient)
- self.connected.addCallback(success.append)
- self.connected.addErrback(self._ebGeneral)
-
- d = self.loopback()
- d.addCallback(lambda x : self.failUnless(success))
- return d
-
- def testFailedStartTLS(self):
- failure = []
- def breakServerTLS(ign):
- self.server.canStartTLS = False
-
- self.connected.addCallback(breakServerTLS)
- self.connected.addCallback(lambda ign: self.client.startTLS())
- self.connected.addErrback(lambda err: failure.append(err.trap(imap4.IMAP4Exception)))
- self.connected.addCallback(self._cbStopClient)
- self.connected.addErrback(self._ebGeneral)
-
- def check(ignored):
- self.failUnless(failure)
- self.assertIdentical(failure[0], imap4.IMAP4Exception)
- return self.loopback().addCallback(check)
-
-
-
-class SlowMailbox(SimpleMailbox):
- howSlow = 2
- callLater = None
- fetchDeferred = None
-
- # Not a very nice implementation of fetch(), but it'll
- # do for the purposes of testing.
- def fetch(self, messages, uid):
- d = defer.Deferred()
- self.callLater(self.howSlow, d.callback, ())
- self.fetchDeferred.callback(None)
- return d
-
-class Timeout(IMAP4HelperMixin, unittest.TestCase):
-
- def test_serverTimeout(self):
- """
- The *client* has a timeout mechanism which will close connections that
- are inactive for a period.
- """
- c = Clock()
- self.server.timeoutTest = True
- self.client.timeout = 5 #seconds
- self.client.callLater = c.callLater
- self.selectedArgs = None
-
- def login():
- d = self.client.login('testuser', 'password-test')
- c.advance(5)
- d.addErrback(timedOut)
- return d
-
- def timedOut(failure):
- self._cbStopClient(None)
- failure.trap(error.TimeoutError)
-
- d = self.connected.addCallback(strip(login))
- d.addErrback(self._ebGeneral)
- return defer.gatherResults([d, self.loopback()])
-
-
- def test_longFetchDoesntTimeout(self):
- """
- The connection timeout does not take effect during fetches.
- """
- c = Clock()
- SlowMailbox.callLater = c.callLater
- SlowMailbox.fetchDeferred = defer.Deferred()
- self.server.callLater = c.callLater
- SimpleServer.theAccount.mailboxFactory = SlowMailbox
- SimpleServer.theAccount.addMailbox('mailbox-test')
-
- self.server.setTimeout(1)
-
- def login():
- return self.client.login('testuser', 'password-test')
- def select():
- self.server.setTimeout(1)
- return self.client.select('mailbox-test')
- def fetch():
- return self.client.fetchUID('1:*')
- def stillConnected():
- self.assertNotEquals(self.server.state, 'timeout')
-
- def cbAdvance(ignored):
- for i in xrange(4):
- c.advance(.5)
-
- SlowMailbox.fetchDeferred.addCallback(cbAdvance)
-
- d1 = self.connected.addCallback(strip(login))
- d1.addCallback(strip(select))
- d1.addCallback(strip(fetch))
- d1.addCallback(strip(stillConnected))
- d1.addCallback(self._cbStopClient)
- d1.addErrback(self._ebGeneral)
- d = defer.gatherResults([d1, self.loopback()])
- return d
-
-
- def test_idleClientDoesDisconnect(self):
- """
- The *server* has a timeout mechanism which will close connections that
- are inactive for a period.
- """
- c = Clock()
- # Hook up our server protocol
- transport = StringTransportWithDisconnection()
- transport.protocol = self.server
- self.server.callLater = c.callLater
- self.server.makeConnection(transport)
-
- # Make sure we can notice when the connection goes away
- lost = []
- connLost = self.server.connectionLost
- self.server.connectionLost = lambda reason: (lost.append(None), connLost(reason))[1]
-
- # 2/3rds of the idle timeout elapses...
- c.pump([0.0] + [self.server.timeOut / 3.0] * 2)
- self.failIf(lost, lost)
-
- # Now some more
- c.pump([0.0, self.server.timeOut / 2.0])
- self.failUnless(lost)
-
-
-
-class Disconnection(unittest.TestCase):
- def testClientDisconnectFailsDeferreds(self):
- c = imap4.IMAP4Client()
- t = StringTransportWithDisconnection()
- c.makeConnection(t)
- d = self.assertFailure(c.login('testuser', 'example.com'), error.ConnectionDone)
- c.connectionLost(error.ConnectionDone("Connection closed"))
- return d
-
-
-
-class SynchronousMailbox(object):
- """
- Trivial, in-memory mailbox implementation which can produce a message
- synchronously.
- """
- def __init__(self, messages):
- self.messages = messages
-
-
- def fetch(self, msgset, uid):
- assert not uid, "Cannot handle uid requests."
- for msg in msgset:
- yield msg, self.messages[msg - 1]
-
-
-
-class StringTransportConsumer(StringTransport):
- producer = None
- streaming = None
-
- def registerProducer(self, producer, streaming):
- self.producer = producer
- self.streaming = streaming
-
-
-
-class Pipelining(unittest.TestCase):
- """
- Tests for various aspects of the IMAP4 server's pipelining support.
- """
- messages = [
- FakeyMessage({}, [], '', '0', None, None),
- FakeyMessage({}, [], '', '1', None, None),
- FakeyMessage({}, [], '', '2', None, None),
- ]
-
- def setUp(self):
- self.iterators = []
-
- self.transport = StringTransportConsumer()
- self.server = imap4.IMAP4Server(None, None, self.iterateInReactor)
- self.server.makeConnection(self.transport)
-
-
- def iterateInReactor(self, iterator):
- d = defer.Deferred()
- self.iterators.append((iterator, d))
- return d
-
-
- def tearDown(self):
- self.server.connectionLost(failure.Failure(error.ConnectionDone()))
-
-
- def test_synchronousFetch(self):
- """
- Test that pipelined FETCH commands which can be responded to
- synchronously are responded to correctly.
- """
- mailbox = SynchronousMailbox(self.messages)
-
- # Skip over authentication and folder selection
- self.server.state = 'select'
- self.server.mbox = mailbox
-
- # Get rid of any greeting junk
- self.transport.clear()
-
- # Here's some pipelined stuff
- self.server.dataReceived(
- '01 FETCH 1 BODY[]\r\n'
- '02 FETCH 2 BODY[]\r\n'
- '03 FETCH 3 BODY[]\r\n')
-
- # Flush anything the server has scheduled to run
- while self.iterators:
- for e in self.iterators[0][0]:
- break
- else:
- self.iterators.pop(0)[1].callback(None)
-
- # The bodies are empty because we aren't simulating a transport
- # exactly correctly (we have StringTransportConsumer but we never
- # call resumeProducing on its producer). It doesn't matter: just
- # make sure the surrounding structure is okay, and that no
- # exceptions occurred.
- self.assertEquals(
- self.transport.value(),
- '* 1 FETCH (BODY[] )\r\n'
- '01 OK FETCH completed\r\n'
- '* 2 FETCH (BODY[] )\r\n'
- '02 OK FETCH completed\r\n'
- '* 3 FETCH (BODY[] )\r\n'
- '03 OK FETCH completed\r\n')
-
-
-
-if ClientTLSContext is None:
- for case in (TLSTestCase,):
- case.skip = "OpenSSL not present"
-elif interfaces.IReactorSSL(reactor, None) is None:
- for case in (TLSTestCase,):
- case.skip = "Reactor doesn't support SSL"
« no previous file with comments | « third_party/twisted_8_1/twisted/mail/test/test_bounce.py ('k') | third_party/twisted_8_1/twisted/mail/test/test_mail.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698