| Index: third_party/twisted_8_1/twisted/mail/test/test_imap.py
|
| diff --git a/third_party/twisted_8_1/twisted/mail/test/test_imap.py b/third_party/twisted_8_1/twisted/mail/test/test_imap.py
|
| deleted file mode 100644
|
| index 6d11e1eac123517f69f4534e8812819741d90787..0000000000000000000000000000000000000000
|
| --- a/third_party/twisted_8_1/twisted/mail/test/test_imap.py
|
| +++ /dev/null
|
| @@ -1,3040 +0,0 @@
|
| -# -*- test-case-name: twisted.mail.test.test_imap -*-
|
| -# Copyright (c) 2001-2008 Twisted Matrix Laboratories.
|
| -# See LICENSE for details.
|
| -
|
| -
|
| -"""
|
| -Test case for twisted.mail.imap4
|
| -"""
|
| -
|
| -try:
|
| - from cStringIO import StringIO
|
| -except ImportError:
|
| - from StringIO import StringIO
|
| -
|
| -import os
|
| -import types
|
| -
|
| -from zope.interface import implements
|
| -
|
| -from twisted.mail.imap4 import MessageSet
|
| -from twisted.mail import imap4
|
| -from twisted.protocols import loopback
|
| -from twisted.internet import defer
|
| -from twisted.internet import error
|
| -from twisted.internet import reactor
|
| -from twisted.internet import interfaces
|
| -from twisted.internet.task import Clock
|
| -from twisted.trial import unittest
|
| -from twisted.python import util
|
| -from twisted.python import failure
|
| -
|
| -from twisted import cred
|
| -import twisted.cred.error
|
| -import twisted.cred.checkers
|
| -import twisted.cred.credentials
|
| -import twisted.cred.portal
|
| -
|
| -from twisted.test.proto_helpers import StringTransport, StringTransportWithDisconnection
|
| -
|
| -try:
|
| - from twisted.test.ssl_helpers import ClientTLSContext, ServerTLSContext
|
| -except ImportError:
|
| - ClientTLSContext = ServerTLSContext = None
|
| -
|
| -def strip(f):
|
| - return lambda result, f=f: f()
|
| -
|
| -def sortNest(l):
|
| - l = l[:]
|
| - l.sort()
|
| - for i in range(len(l)):
|
| - if isinstance(l[i], types.ListType):
|
| - l[i] = sortNest(l[i])
|
| - elif isinstance(l[i], types.TupleType):
|
| - l[i] = tuple(sortNest(list(l[i])))
|
| - return l
|
| -
|
| -class IMAP4UTF7TestCase(unittest.TestCase):
|
| - tests = [
|
| - [u'Hello world', 'Hello world'],
|
| - [u'Hello & world', 'Hello &- world'],
|
| - [u'Hello\xffworld', 'Hello&AP8-world'],
|
| - [u'\xff\xfe\xfd\xfc', '&AP8A,gD9APw-'],
|
| - [u'~peter/mail/\u65e5\u672c\u8a9e/\u53f0\u5317',
|
| - '~peter/mail/&ZeVnLIqe-/&U,BTFw-'], # example from RFC 2060
|
| - ]
|
| -
|
| - def test_encodeWithErrors(self):
|
| - """
|
| - Specifying an error policy to C{unicode.encode} with the
|
| - I{imap4-utf-7} codec should produce the same result as not
|
| - specifying the error policy.
|
| - """
|
| - text = u'Hello world'
|
| - self.assertEqual(
|
| - text.encode('imap4-utf-7', 'strict'),
|
| - text.encode('imap4-utf-7'))
|
| -
|
| -
|
| - def test_decodeWithErrors(self):
|
| - """
|
| - Similar to L{test_encodeWithErrors}, but for C{str.decode}.
|
| - """
|
| - bytes = 'Hello world'
|
| - self.assertEqual(
|
| - bytes.decode('imap4-utf-7', 'strict'),
|
| - bytes.decode('imap4-utf-7'))
|
| -
|
| -
|
| - def testEncode(self):
|
| - for (input, output) in self.tests:
|
| - self.assertEquals(input.encode('imap4-utf-7'), output)
|
| -
|
| - def testDecode(self):
|
| - for (input, output) in self.tests:
|
| - # XXX - Piece of *crap* 2.1
|
| - self.assertEquals(input, imap4.decoder(output)[0])
|
| -
|
| - def testPrintableSingletons(self):
|
| - # All printables represent themselves
|
| - for o in range(0x20, 0x26) + range(0x27, 0x7f):
|
| - self.failUnlessEqual(chr(o), chr(o).encode('imap4-utf-7'))
|
| - self.failUnlessEqual(chr(o), chr(o).decode('imap4-utf-7'))
|
| - self.failUnlessEqual('&'.encode('imap4-utf-7'), '&-')
|
| - self.failUnlessEqual('&-'.decode('imap4-utf-7'), '&')
|
| -
|
| -class BufferingConsumer:
|
| - def __init__(self):
|
| - self.buffer = []
|
| -
|
| - def write(self, bytes):
|
| - self.buffer.append(bytes)
|
| - if self.consumer:
|
| - self.consumer.resumeProducing()
|
| -
|
| - def registerProducer(self, consumer, streaming):
|
| - self.consumer = consumer
|
| - self.consumer.resumeProducing()
|
| -
|
| - def unregisterProducer(self):
|
| - self.consumer = None
|
| -
|
| -class MessageProducerTestCase(unittest.TestCase):
|
| - def testSinglePart(self):
|
| - body = 'This is body text. Rar.'
|
| - headers = util.OrderedDict()
|
| - headers['from'] = 'sender@host'
|
| - headers['to'] = 'recipient@domain'
|
| - headers['subject'] = 'booga booga boo'
|
| - headers['content-type'] = 'text/plain'
|
| -
|
| - msg = FakeyMessage(headers, (), None, body, 123, None )
|
| -
|
| - c = BufferingConsumer()
|
| - p = imap4.MessageProducer(msg)
|
| - d = p.beginProducing(c)
|
| -
|
| - def cbProduced(result):
|
| - self.assertIdentical(result, p)
|
| - self.assertEquals(
|
| - ''.join(c.buffer),
|
| -
|
| - '{119}\r\n'
|
| - 'From: sender@host\r\n'
|
| - 'To: recipient@domain\r\n'
|
| - 'Subject: booga booga boo\r\n'
|
| - 'Content-Type: text/plain\r\n'
|
| - '\r\n'
|
| - + body)
|
| - return d.addCallback(cbProduced)
|
| -
|
| -
|
| - def testSingleMultiPart(self):
|
| - outerBody = ''
|
| - innerBody = 'Contained body message text. Squarge.'
|
| - headers = util.OrderedDict()
|
| - headers['from'] = 'sender@host'
|
| - headers['to'] = 'recipient@domain'
|
| - headers['subject'] = 'booga booga boo'
|
| - headers['content-type'] = 'multipart/alternative; boundary="xyz"'
|
| -
|
| - innerHeaders = util.OrderedDict()
|
| - innerHeaders['subject'] = 'this is subject text'
|
| - innerHeaders['content-type'] = 'text/plain'
|
| - msg = FakeyMessage(headers, (), None, outerBody, 123,
|
| - [FakeyMessage(innerHeaders, (), None, innerBody,
|
| - None, None)],
|
| - )
|
| -
|
| - c = BufferingConsumer()
|
| - p = imap4.MessageProducer(msg)
|
| - d = p.beginProducing(c)
|
| -
|
| - def cbProduced(result):
|
| - self.failUnlessIdentical(result, p)
|
| -
|
| - self.assertEquals(
|
| - ''.join(c.buffer),
|
| -
|
| - '{239}\r\n'
|
| - 'From: sender@host\r\n'
|
| - 'To: recipient@domain\r\n'
|
| - 'Subject: booga booga boo\r\n'
|
| - 'Content-Type: multipart/alternative; boundary="xyz"\r\n'
|
| - '\r\n'
|
| - '\r\n'
|
| - '--xyz\r\n'
|
| - 'Subject: this is subject text\r\n'
|
| - 'Content-Type: text/plain\r\n'
|
| - '\r\n'
|
| - + innerBody
|
| - + '\r\n--xyz--\r\n')
|
| -
|
| - return d.addCallback(cbProduced)
|
| -
|
| -
|
| - def testMultipleMultiPart(self):
|
| - outerBody = ''
|
| - innerBody1 = 'Contained body message text. Squarge.'
|
| - innerBody2 = 'Secondary <i>message</i> text of squarge body.'
|
| - headers = util.OrderedDict()
|
| - headers['from'] = 'sender@host'
|
| - headers['to'] = 'recipient@domain'
|
| - headers['subject'] = 'booga booga boo'
|
| - headers['content-type'] = 'multipart/alternative; boundary="xyz"'
|
| - innerHeaders = util.OrderedDict()
|
| - innerHeaders['subject'] = 'this is subject text'
|
| - innerHeaders['content-type'] = 'text/plain'
|
| - innerHeaders2 = util.OrderedDict()
|
| - innerHeaders2['subject'] = '<b>this is subject</b>'
|
| - innerHeaders2['content-type'] = 'text/html'
|
| - msg = FakeyMessage(headers, (), None, outerBody, 123, [
|
| - FakeyMessage(innerHeaders, (), None, innerBody1, None, None),
|
| - FakeyMessage(innerHeaders2, (), None, innerBody2, None, None)
|
| - ],
|
| - )
|
| -
|
| - c = BufferingConsumer()
|
| - p = imap4.MessageProducer(msg)
|
| - d = p.beginProducing(c)
|
| -
|
| - def cbProduced(result):
|
| - self.failUnlessIdentical(result, p)
|
| -
|
| - self.assertEquals(
|
| - ''.join(c.buffer),
|
| -
|
| - '{354}\r\n'
|
| - 'From: sender@host\r\n'
|
| - 'To: recipient@domain\r\n'
|
| - 'Subject: booga booga boo\r\n'
|
| - 'Content-Type: multipart/alternative; boundary="xyz"\r\n'
|
| - '\r\n'
|
| - '\r\n'
|
| - '--xyz\r\n'
|
| - 'Subject: this is subject text\r\n'
|
| - 'Content-Type: text/plain\r\n'
|
| - '\r\n'
|
| - + innerBody1
|
| - + '\r\n--xyz\r\n'
|
| - 'Subject: <b>this is subject</b>\r\n'
|
| - 'Content-Type: text/html\r\n'
|
| - '\r\n'
|
| - + innerBody2
|
| - + '\r\n--xyz--\r\n')
|
| - return d.addCallback(cbProduced)
|
| -
|
| -
|
| -class IMAP4HelperTestCase(unittest.TestCase):
|
| - def testFileProducer(self):
|
| - b = (('x' * 1) + ('y' * 1) + ('z' * 1)) * 10
|
| - c = BufferingConsumer()
|
| - f = StringIO(b)
|
| - p = imap4.FileProducer(f)
|
| - d = p.beginProducing(c)
|
| -
|
| - def cbProduced(result):
|
| - self.failUnlessIdentical(result, p)
|
| - self.assertEquals(
|
| - ('{%d}\r\n' % len(b))+ b,
|
| - ''.join(c.buffer))
|
| - return d.addCallback(cbProduced)
|
| -
|
| - def testWildcard(self):
|
| - cases = [
|
| - ['foo/%gum/bar',
|
| - ['foo/bar', 'oo/lalagum/bar', 'foo/gumx/bar', 'foo/gum/baz'],
|
| - ['foo/xgum/bar', 'foo/gum/bar'],
|
| - ], ['foo/x%x/bar',
|
| - ['foo', 'bar', 'fuz fuz fuz', 'foo/*/bar', 'foo/xyz/bar', 'foo/xx/baz'],
|
| - ['foo/xyx/bar', 'foo/xx/bar', 'foo/xxxxxxxxxxxxxx/bar'],
|
| - ], ['foo/xyz*abc/bar',
|
| - ['foo/xyz/bar', 'foo/abc/bar', 'foo/xyzab/cbar', 'foo/xyza/bcbar'],
|
| - ['foo/xyzabc/bar', 'foo/xyz/abc/bar', 'foo/xyz/123/abc/bar'],
|
| - ]
|
| - ]
|
| -
|
| - for (wildcard, fail, succeed) in cases:
|
| - wildcard = imap4.wildcardToRegexp(wildcard, '/')
|
| - for x in fail:
|
| - self.failIf(wildcard.match(x))
|
| - for x in succeed:
|
| - self.failUnless(wildcard.match(x))
|
| -
|
| - def testWildcardNoDelim(self):
|
| - cases = [
|
| - ['foo/%gum/bar',
|
| - ['foo/bar', 'oo/lalagum/bar', 'foo/gumx/bar', 'foo/gum/baz'],
|
| - ['foo/xgum/bar', 'foo/gum/bar', 'foo/x/gum/bar'],
|
| - ], ['foo/x%x/bar',
|
| - ['foo', 'bar', 'fuz fuz fuz', 'foo/*/bar', 'foo/xyz/bar', 'foo/xx/baz'],
|
| - ['foo/xyx/bar', 'foo/xx/bar', 'foo/xxxxxxxxxxxxxx/bar', 'foo/x/x/bar'],
|
| - ], ['foo/xyz*abc/bar',
|
| - ['foo/xyz/bar', 'foo/abc/bar', 'foo/xyzab/cbar', 'foo/xyza/bcbar'],
|
| - ['foo/xyzabc/bar', 'foo/xyz/abc/bar', 'foo/xyz/123/abc/bar'],
|
| - ]
|
| - ]
|
| -
|
| - for (wildcard, fail, succeed) in cases:
|
| - wildcard = imap4.wildcardToRegexp(wildcard, None)
|
| - for x in fail:
|
| - self.failIf(wildcard.match(x), x)
|
| - for x in succeed:
|
| - self.failUnless(wildcard.match(x), x)
|
| -
|
| - def testHeaderFormatter(self):
|
| - cases = [
|
| - ({'Header1': 'Value1', 'Header2': 'Value2'}, 'Header2: Value2\r\nHeader1: Value1\r\n'),
|
| - ]
|
| -
|
| - for (input, output) in cases:
|
| - self.assertEquals(imap4._formatHeaders(input), output)
|
| -
|
| - def testMessageSet(self):
|
| - m1 = MessageSet()
|
| - m2 = MessageSet()
|
| -
|
| - self.assertEquals(m1, m2)
|
| -
|
| - m1 = m1 + (1, 3)
|
| - self.assertEquals(len(m1), 3)
|
| - self.assertEquals(list(m1), [1, 2, 3])
|
| -
|
| - m2 = m2 + (1, 3)
|
| - self.assertEquals(m1, m2)
|
| - self.assertEquals(list(m1 + m2), [1, 2, 3])
|
| -
|
| - def testQuotedSplitter(self):
|
| - cases = [
|
| - '''Hello World''',
|
| - '''Hello "World!"''',
|
| - '''World "Hello" "How are you?"''',
|
| - '''"Hello world" How "are you?"''',
|
| - '''foo bar "baz buz" NIL''',
|
| - '''foo bar "baz buz" "NIL"''',
|
| - '''foo NIL "baz buz" bar''',
|
| - '''foo "NIL" "baz buz" bar''',
|
| - '''"NIL" bar "baz buz" foo''',
|
| - ]
|
| -
|
| - answers = [
|
| - ['Hello', 'World'],
|
| - ['Hello', 'World!'],
|
| - ['World', 'Hello', 'How are you?'],
|
| - ['Hello world', 'How', 'are you?'],
|
| - ['foo', 'bar', 'baz buz', None],
|
| - ['foo', 'bar', 'baz buz', 'NIL'],
|
| - ['foo', None, 'baz buz', 'bar'],
|
| - ['foo', 'NIL', 'baz buz', 'bar'],
|
| - ['NIL', 'bar', 'baz buz', 'foo'],
|
| - ]
|
| -
|
| - errors = [
|
| - '"mismatched quote',
|
| - 'mismatched quote"',
|
| - 'mismatched"quote',
|
| - '"oops here is" another"',
|
| - ]
|
| -
|
| - for s in errors:
|
| - self.assertRaises(imap4.MismatchedQuoting, imap4.splitQuoted, s)
|
| -
|
| - for (case, expected) in zip(cases, answers):
|
| - self.assertEquals(imap4.splitQuoted(case), expected)
|
| -
|
| -
|
| - def testStringCollapser(self):
|
| - cases = [
|
| - ['a', 'b', 'c', 'd', 'e'],
|
| - ['a', ' ', '"', 'b', 'c', ' ', '"', ' ', 'd', 'e'],
|
| - [['a', 'b', 'c'], 'd', 'e'],
|
| - ['a', ['b', 'c', 'd'], 'e'],
|
| - ['a', 'b', ['c', 'd', 'e']],
|
| - ['"', 'a', ' ', '"', ['b', 'c', 'd'], '"', ' ', 'e', '"'],
|
| - ['a', ['"', ' ', 'b', 'c', ' ', ' ', '"'], 'd', 'e'],
|
| - ]
|
| -
|
| - answers = [
|
| - ['abcde'],
|
| - ['a', 'bc ', 'de'],
|
| - [['abc'], 'de'],
|
| - ['a', ['bcd'], 'e'],
|
| - ['ab', ['cde']],
|
| - ['a ', ['bcd'], ' e'],
|
| - ['a', [' bc '], 'de'],
|
| - ]
|
| -
|
| - for (case, expected) in zip(cases, answers):
|
| - self.assertEquals(imap4.collapseStrings(case), expected)
|
| -
|
| - def testParenParser(self):
|
| - s = '\r\n'.join(['xx'] * 4)
|
| - cases = [
|
| - '(BODY.PEEK[HEADER.FIELDS.NOT (subject bcc cc)] {%d}\r\n%s)' % (len(s), s,),
|
| -
|
| -# '(FLAGS (\Seen) INTERNALDATE "17-Jul-1996 02:44:25 -0700" '
|
| -# 'RFC822.SIZE 4286 ENVELOPE ("Wed, 17 Jul 1996 02:23:25 -0700 (PDT)" '
|
| -# '"IMAP4rev1 WG mtg summary and minutes" '
|
| -# '(("Terry Gray" NIL "gray" "cac.washington.edu")) '
|
| -# '(("Terry Gray" NIL "gray" "cac.washington.edu")) '
|
| -# '(("Terry Gray" NIL "gray" "cac.washington.edu")) '
|
| -# '((NIL NIL "imap" "cac.washington.edu")) '
|
| -# '((NIL NIL "minutes" "CNRI.Reston.VA.US") '
|
| -# '("John Klensin" NIL "KLENSIN" "INFOODS.MIT.EDU")) NIL NIL '
|
| -# '"<B27397-0100000@cac.washington.edu>") '
|
| -# 'BODY ("TEXT" "PLAIN" ("CHARSET" "US-ASCII") NIL NIL "7BIT" 3028 92))',
|
| -
|
| - '(FLAGS (\Seen) INTERNALDATE "17-Jul-1996 02:44:25 -0700" '
|
| - 'RFC822.SIZE 4286 ENVELOPE ("Wed, 17 Jul 1996 02:23:25 -0700 (PDT)" '
|
| - '"IMAP4rev1 WG mtg summary and minutes" '
|
| - '(("Terry Gray" NIL gray cac.washington.edu)) '
|
| - '(("Terry Gray" NIL gray cac.washington.edu)) '
|
| - '(("Terry Gray" NIL gray cac.washington.edu)) '
|
| - '((NIL NIL imap cac.washington.edu)) '
|
| - '((NIL NIL minutes CNRI.Reston.VA.US) '
|
| - '("John Klensin" NIL KLENSIN INFOODS.MIT.EDU)) NIL NIL '
|
| - '<B27397-0100000@cac.washington.edu>) '
|
| - 'BODY (TEXT PLAIN (CHARSET US-ASCII) NIL NIL 7BIT 3028 92))',
|
| - ]
|
| -
|
| - answers = [
|
| - ['BODY.PEEK', ['HEADER.FIELDS.NOT', ['subject', 'bcc', 'cc']], s],
|
| -
|
| - ['FLAGS', [r'\Seen'], 'INTERNALDATE',
|
| - '17-Jul-1996 02:44:25 -0700', 'RFC822.SIZE', '4286', 'ENVELOPE',
|
| - ['Wed, 17 Jul 1996 02:23:25 -0700 (PDT)',
|
| - 'IMAP4rev1 WG mtg summary and minutes', [["Terry Gray", None,
|
| - "gray", "cac.washington.edu"]], [["Terry Gray", None,
|
| - "gray", "cac.washington.edu"]], [["Terry Gray", None,
|
| - "gray", "cac.washington.edu"]], [[None, None, "imap",
|
| - "cac.washington.edu"]], [[None, None, "minutes",
|
| - "CNRI.Reston.VA.US"], ["John Klensin", None, "KLENSIN",
|
| - "INFOODS.MIT.EDU"]], None, None,
|
| - "<B27397-0100000@cac.washington.edu>"], "BODY", ["TEXT", "PLAIN",
|
| - ["CHARSET", "US-ASCII"], None, None, "7BIT", "3028", "92"]],
|
| - ]
|
| -
|
| - for (case, expected) in zip(cases, answers):
|
| - self.assertEquals(imap4.parseNestedParens(case), [expected])
|
| -
|
| - # XXX This code used to work, but changes occurred within the
|
| - # imap4.py module which made it no longer necessary for *all* of it
|
| - # to work. In particular, only the part that makes
|
| - # 'BODY.PEEK[HEADER.FIELDS.NOT (Subject Bcc Cc)]' come out correctly
|
| - # no longer needs to work. So, I am loathe to delete the entire
|
| - # section of the test. --exarkun
|
| - #
|
| -
|
| -# for (case, expected) in zip(answers, cases):
|
| -# self.assertEquals('(' + imap4.collapseNestedLists(case) + ')', expected)
|
| -
|
| - def testFetchParserSimple(self):
|
| - cases = [
|
| - ['ENVELOPE', 'Envelope'],
|
| - ['FLAGS', 'Flags'],
|
| - ['INTERNALDATE', 'InternalDate'],
|
| - ['RFC822.HEADER', 'RFC822Header'],
|
| - ['RFC822.SIZE', 'RFC822Size'],
|
| - ['RFC822.TEXT', 'RFC822Text'],
|
| - ['RFC822', 'RFC822'],
|
| - ['UID', 'UID'],
|
| - ['BODYSTRUCTURE', 'BodyStructure'],
|
| - ]
|
| -
|
| - for (inp, outp) in cases:
|
| - p = imap4._FetchParser()
|
| - p.parseString(inp)
|
| - self.assertEquals(len(p.result), 1)
|
| - self.failUnless(isinstance(p.result[0], getattr(p, outp)))
|
| -
|
| - def testFetchParserMacros(self):
|
| - cases = [
|
| - ['ALL', (4, ['flags', 'internaldate', 'rfc822.size', 'envelope'])],
|
| - ['FULL', (5, ['flags', 'internaldate', 'rfc822.size', 'envelope', 'body'])],
|
| - ['FAST', (3, ['flags', 'internaldate', 'rfc822.size'])],
|
| - ]
|
| -
|
| - for (inp, outp) in cases:
|
| - p = imap4._FetchParser()
|
| - p.parseString(inp)
|
| - self.assertEquals(len(p.result), outp[0])
|
| - p = [str(p).lower() for p in p.result]
|
| - p.sort()
|
| - outp[1].sort()
|
| - self.assertEquals(p, outp[1])
|
| -
|
| - def testFetchParserBody(self):
|
| - P = imap4._FetchParser
|
| -
|
| - p = P()
|
| - p.parseString('BODY')
|
| - self.assertEquals(len(p.result), 1)
|
| - self.failUnless(isinstance(p.result[0], p.Body))
|
| - self.assertEquals(p.result[0].peek, False)
|
| - self.assertEquals(p.result[0].header, None)
|
| - self.assertEquals(str(p.result[0]), 'BODY')
|
| -
|
| - p = P()
|
| - p.parseString('BODY.PEEK')
|
| - self.assertEquals(len(p.result), 1)
|
| - self.failUnless(isinstance(p.result[0], p.Body))
|
| - self.assertEquals(p.result[0].peek, True)
|
| - self.assertEquals(str(p.result[0]), 'BODY')
|
| -
|
| - p = P()
|
| - p.parseString('BODY[]')
|
| - self.assertEquals(len(p.result), 1)
|
| - self.failUnless(isinstance(p.result[0], p.Body))
|
| - self.assertEquals(p.result[0].empty, True)
|
| - self.assertEquals(str(p.result[0]), 'BODY[]')
|
| -
|
| - p = P()
|
| - p.parseString('BODY[HEADER]')
|
| - self.assertEquals(len(p.result), 1)
|
| - self.failUnless(isinstance(p.result[0], p.Body))
|
| - self.assertEquals(p.result[0].peek, False)
|
| - self.failUnless(isinstance(p.result[0].header, p.Header))
|
| - self.assertEquals(p.result[0].header.negate, True)
|
| - self.assertEquals(p.result[0].header.fields, ())
|
| - self.assertEquals(p.result[0].empty, False)
|
| - self.assertEquals(str(p.result[0]), 'BODY[HEADER]')
|
| -
|
| - p = P()
|
| - p.parseString('BODY.PEEK[HEADER]')
|
| - self.assertEquals(len(p.result), 1)
|
| - self.failUnless(isinstance(p.result[0], p.Body))
|
| - self.assertEquals(p.result[0].peek, True)
|
| - self.failUnless(isinstance(p.result[0].header, p.Header))
|
| - self.assertEquals(p.result[0].header.negate, True)
|
| - self.assertEquals(p.result[0].header.fields, ())
|
| - self.assertEquals(p.result[0].empty, False)
|
| - self.assertEquals(str(p.result[0]), 'BODY[HEADER]')
|
| -
|
| - p = P()
|
| - p.parseString('BODY[HEADER.FIELDS (Subject Cc Message-Id)]')
|
| - self.assertEquals(len(p.result), 1)
|
| - self.failUnless(isinstance(p.result[0], p.Body))
|
| - self.assertEquals(p.result[0].peek, False)
|
| - self.failUnless(isinstance(p.result[0].header, p.Header))
|
| - self.assertEquals(p.result[0].header.negate, False)
|
| - self.assertEquals(p.result[0].header.fields, ['SUBJECT', 'CC', 'MESSAGE-ID'])
|
| - self.assertEquals(p.result[0].empty, False)
|
| - self.assertEquals(str(p.result[0]), 'BODY[HEADER.FIELDS (Subject Cc Message-Id)]')
|
| -
|
| - p = P()
|
| - p.parseString('BODY.PEEK[HEADER.FIELDS (Subject Cc Message-Id)]')
|
| - self.assertEquals(len(p.result), 1)
|
| - self.failUnless(isinstance(p.result[0], p.Body))
|
| - self.assertEquals(p.result[0].peek, True)
|
| - self.failUnless(isinstance(p.result[0].header, p.Header))
|
| - self.assertEquals(p.result[0].header.negate, False)
|
| - self.assertEquals(p.result[0].header.fields, ['SUBJECT', 'CC', 'MESSAGE-ID'])
|
| - self.assertEquals(p.result[0].empty, False)
|
| - self.assertEquals(str(p.result[0]), 'BODY[HEADER.FIELDS (Subject Cc Message-Id)]')
|
| -
|
| - p = P()
|
| - p.parseString('BODY.PEEK[HEADER.FIELDS.NOT (Subject Cc Message-Id)]')
|
| - self.assertEquals(len(p.result), 1)
|
| - self.failUnless(isinstance(p.result[0], p.Body))
|
| - self.assertEquals(p.result[0].peek, True)
|
| - self.failUnless(isinstance(p.result[0].header, p.Header))
|
| - self.assertEquals(p.result[0].header.negate, True)
|
| - self.assertEquals(p.result[0].header.fields, ['SUBJECT', 'CC', 'MESSAGE-ID'])
|
| - self.assertEquals(p.result[0].empty, False)
|
| - self.assertEquals(str(p.result[0]), 'BODY[HEADER.FIELDS.NOT (Subject Cc Message-Id)]')
|
| -
|
| - p = P()
|
| - p.parseString('BODY[1.MIME]<10.50>')
|
| - self.assertEquals(len(p.result), 1)
|
| - self.failUnless(isinstance(p.result[0], p.Body))
|
| - self.assertEquals(p.result[0].peek, False)
|
| - self.failUnless(isinstance(p.result[0].mime, p.MIME))
|
| - self.assertEquals(p.result[0].part, (0,))
|
| - self.assertEquals(p.result[0].partialBegin, 10)
|
| - self.assertEquals(p.result[0].partialLength, 50)
|
| - self.assertEquals(p.result[0].empty, False)
|
| - self.assertEquals(str(p.result[0]), 'BODY[1.MIME]<10.50>')
|
| -
|
| - p = P()
|
| - p.parseString('BODY.PEEK[1.3.9.11.HEADER.FIELDS.NOT (Message-Id Date)]<103.69>')
|
| - self.assertEquals(len(p.result), 1)
|
| - self.failUnless(isinstance(p.result[0], p.Body))
|
| - self.assertEquals(p.result[0].peek, True)
|
| - self.failUnless(isinstance(p.result[0].header, p.Header))
|
| - self.assertEquals(p.result[0].part, (0, 2, 8, 10))
|
| - self.assertEquals(p.result[0].header.fields, ['MESSAGE-ID', 'DATE'])
|
| - self.assertEquals(p.result[0].partialBegin, 103)
|
| - self.assertEquals(p.result[0].partialLength, 69)
|
| - self.assertEquals(p.result[0].empty, False)
|
| - self.assertEquals(str(p.result[0]), 'BODY[1.3.9.11.HEADER.FIELDS.NOT (Message-Id Date)]<103.69>')
|
| -
|
| -
|
| - def testFiles(self):
|
| - inputStructure = [
|
| - 'foo', 'bar', 'baz', StringIO('this is a file\r\n'), 'buz'
|
| - ]
|
| -
|
| - output = '"foo" "bar" "baz" {16}\r\nthis is a file\r\n "buz"'
|
| -
|
| - self.assertEquals(imap4.collapseNestedLists(inputStructure), output)
|
| -
|
| - def testQuoteAvoider(self):
|
| - input = [
|
| - 'foo', imap4.DontQuoteMe('bar'), "baz", StringIO('this is a file\r\n'),
|
| - imap4.DontQuoteMe('buz'), ""
|
| - ]
|
| -
|
| - output = '"foo" bar "baz" {16}\r\nthis is a file\r\n buz ""'
|
| -
|
| - self.assertEquals(imap4.collapseNestedLists(input), output)
|
| -
|
| - def testLiterals(self):
|
| - cases = [
|
| - ('({10}\r\n0123456789)', [['0123456789']]),
|
| - ]
|
| -
|
| - for (case, expected) in cases:
|
| - self.assertEquals(imap4.parseNestedParens(case), expected)
|
| -
|
| - def testQueryBuilder(self):
|
| - inputs = [
|
| - imap4.Query(flagged=1),
|
| - imap4.Query(sorted=1, unflagged=1, deleted=1),
|
| - imap4.Or(imap4.Query(flagged=1), imap4.Query(deleted=1)),
|
| - imap4.Query(before='today'),
|
| - imap4.Or(
|
| - imap4.Query(deleted=1),
|
| - imap4.Query(unseen=1),
|
| - imap4.Query(new=1)
|
| - ),
|
| - imap4.Or(
|
| - imap4.Not(
|
| - imap4.Or(
|
| - imap4.Query(sorted=1, since='yesterday', smaller=1000),
|
| - imap4.Query(sorted=1, before='tuesday', larger=10000),
|
| - imap4.Query(sorted=1, unseen=1, deleted=1, before='today'),
|
| - imap4.Not(
|
| - imap4.Query(subject='spam')
|
| - ),
|
| - ),
|
| - ),
|
| - imap4.Not(
|
| - imap4.Query(uid='1:5')
|
| - ),
|
| - )
|
| - ]
|
| -
|
| - outputs = [
|
| - 'FLAGGED',
|
| - '(DELETED UNFLAGGED)',
|
| - '(OR FLAGGED DELETED)',
|
| - '(BEFORE "today")',
|
| - '(OR DELETED (OR UNSEEN NEW))',
|
| - '(OR (NOT (OR (SINCE "yesterday" SMALLER 1000) ' # Continuing
|
| - '(OR (BEFORE "tuesday" LARGER 10000) (OR (BEFORE ' # Some more
|
| - '"today" DELETED UNSEEN) (NOT (SUBJECT "spam")))))) ' # And more
|
| - '(NOT (UID 1:5)))',
|
| - ]
|
| -
|
| - for (query, expected) in zip(inputs, outputs):
|
| - self.assertEquals(query, expected)
|
| -
|
| - def testIdListParser(self):
|
| - inputs = [
|
| - '1:*',
|
| - '5:*',
|
| - '1:2,5:*',
|
| - '1',
|
| - '1,2',
|
| - '1,3,5',
|
| - '1:10',
|
| - '1:10,11',
|
| - '1:5,10:20',
|
| - '1,5:10',
|
| - '1,5:10,15:20',
|
| - '1:10,15,20:25',
|
| - ]
|
| -
|
| - outputs = [
|
| - MessageSet(1, None),
|
| - MessageSet(5, None),
|
| - MessageSet(5, None) + MessageSet(1, 2),
|
| - MessageSet(1),
|
| - MessageSet(1, 2),
|
| - MessageSet(1) + MessageSet(3) + MessageSet(5),
|
| - MessageSet(1, 10),
|
| - MessageSet(1, 11),
|
| - MessageSet(1, 5) + MessageSet(10, 20),
|
| - MessageSet(1) + MessageSet(5, 10),
|
| - MessageSet(1) + MessageSet(5, 10) + MessageSet(15, 20),
|
| - MessageSet(1, 10) + MessageSet(15) + MessageSet(20, 25),
|
| - ]
|
| -
|
| - lengths = [
|
| - None, None, None,
|
| - 1, 2, 3, 10, 11, 16, 7, 13, 17,
|
| - ]
|
| -
|
| - for (input, expected) in zip(inputs, outputs):
|
| - self.assertEquals(imap4.parseIdList(input), expected)
|
| -
|
| - for (input, expected) in zip(inputs, lengths):
|
| - try:
|
| - L = len(imap4.parseIdList(input))
|
| - except TypeError:
|
| - L = None
|
| - self.assertEquals(L, expected,
|
| - "len(%r) = %r != %r" % (input, L, expected))
|
| -
|
| -class SimpleMailbox:
|
| - implements(imap4.IMailboxInfo, imap4.IMailbox, imap4.ICloseableMailbox)
|
| -
|
| - flags = ('\\Flag1', 'Flag2', '\\AnotherSysFlag', 'LastFlag')
|
| - messages = []
|
| - mUID = 0
|
| - rw = 1
|
| - closed = False
|
| -
|
| - def __init__(self):
|
| - self.listeners = []
|
| - self.addListener = self.listeners.append
|
| - self.removeListener = self.listeners.remove
|
| -
|
| - def getFlags(self):
|
| - return self.flags
|
| -
|
| - def getUIDValidity(self):
|
| - return 42
|
| -
|
| - def getUIDNext(self):
|
| - return len(self.messages) + 1
|
| -
|
| - def getMessageCount(self):
|
| - return 9
|
| -
|
| - def getRecentCount(self):
|
| - return 3
|
| -
|
| - def getUnseenCount(self):
|
| - return 4
|
| -
|
| - def isWriteable(self):
|
| - return self.rw
|
| -
|
| - def destroy(self):
|
| - pass
|
| -
|
| - def getHierarchicalDelimiter(self):
|
| - return '/'
|
| -
|
| - def requestStatus(self, names):
|
| - r = {}
|
| - if 'MESSAGES' in names:
|
| - r['MESSAGES'] = self.getMessageCount()
|
| - if 'RECENT' in names:
|
| - r['RECENT'] = self.getRecentCount()
|
| - if 'UIDNEXT' in names:
|
| - r['UIDNEXT'] = self.getMessageCount() + 1
|
| - if 'UIDVALIDITY' in names:
|
| - r['UIDVALIDITY'] = self.getUID()
|
| - if 'UNSEEN' in names:
|
| - r['UNSEEN'] = self.getUnseenCount()
|
| - return defer.succeed(r)
|
| -
|
| - def addMessage(self, message, flags, date = None):
|
| - self.messages.append((message, flags, date, self.mUID))
|
| - self.mUID += 1
|
| - return defer.succeed(None)
|
| -
|
| - def expunge(self):
|
| - delete = []
|
| - for i in self.messages:
|
| - if '\\Deleted' in i[1]:
|
| - delete.append(i)
|
| - for i in delete:
|
| - self.messages.remove(i)
|
| - return [i[3] for i in delete]
|
| -
|
| - def close(self):
|
| - self.closed = True
|
| -
|
| -class Account(imap4.MemoryAccount):
|
| - mailboxFactory = SimpleMailbox
|
| - def _emptyMailbox(self, name, id):
|
| - return self.mailboxFactory()
|
| -
|
| - def select(self, name, rw=1):
|
| - mbox = imap4.MemoryAccount.select(self, name)
|
| - if mbox is not None:
|
| - mbox.rw = rw
|
| - return mbox
|
| -
|
| -class SimpleServer(imap4.IMAP4Server):
|
| - def __init__(self, *args, **kw):
|
| - imap4.IMAP4Server.__init__(self, *args, **kw)
|
| - realm = TestRealm()
|
| - realm.theAccount = Account('testuser')
|
| - portal = cred.portal.Portal(realm)
|
| - c = cred.checkers.InMemoryUsernamePasswordDatabaseDontUse()
|
| - self.checker = c
|
| - self.portal = portal
|
| - portal.registerChecker(c)
|
| - self.timeoutTest = False
|
| -
|
| - def lineReceived(self, line):
|
| - if self.timeoutTest:
|
| - #Do not send a respones
|
| - return
|
| -
|
| - imap4.IMAP4Server.lineReceived(self, line)
|
| -
|
| - _username = 'testuser'
|
| - _password = 'password-test'
|
| - def authenticateLogin(self, username, password):
|
| - if username == self._username and password == self._password:
|
| - return imap4.IAccount, self.theAccount, lambda: None
|
| - raise cred.error.UnauthorizedLogin()
|
| -
|
| -
|
| -class SimpleClient(imap4.IMAP4Client):
|
| - def __init__(self, deferred, contextFactory = None):
|
| - imap4.IMAP4Client.__init__(self, contextFactory)
|
| - self.deferred = deferred
|
| - self.events = []
|
| -
|
| - def serverGreeting(self, caps):
|
| - self.deferred.callback(None)
|
| -
|
| - def modeChanged(self, writeable):
|
| - self.events.append(['modeChanged', writeable])
|
| - self.transport.loseConnection()
|
| -
|
| - def flagsChanged(self, newFlags):
|
| - self.events.append(['flagsChanged', newFlags])
|
| - self.transport.loseConnection()
|
| -
|
| - def newMessages(self, exists, recent):
|
| - self.events.append(['newMessages', exists, recent])
|
| - self.transport.loseConnection()
|
| -
|
| - def fetchBodyParts(self, message, parts):
|
| - """Fetch some parts of the body.
|
| -
|
| - @param message: message with parts to fetch
|
| - @type message: C{str}
|
| - @param parts: a list of int/str
|
| - @type parts: C{list}
|
| - """
|
| - cmd = "%s (BODY[%s]" % (message, parts[0])
|
| - for p in parts[1:]:
|
| - cmd += " BODY[%s]" % p
|
| - cmd += ")"
|
| - d = self.sendCommand(imap4.Command("FETCH", cmd,
|
| - wantResponse=("FETCH",)))
|
| - d.addCallback(self.__cb_fetchBodyParts)
|
| - return d
|
| -
|
| - def __cb_fetchBodyParts(self, (lines, last)):
|
| - info = {}
|
| - for line in lines:
|
| - parts = line.split(None, 2)
|
| - if len(parts) == 3:
|
| - if parts[1] == "FETCH":
|
| - try:
|
| - mail_id = int(parts[0])
|
| - except ValueError:
|
| - raise imap4.IllegalServerResponse, line
|
| - else:
|
| - body_parts = imap4.parseNestedParens(parts[2])[0]
|
| - dict_parts = {}
|
| - for i in range(len(body_parts)/3):
|
| - dict_parts[body_parts[3*i+1][0]] = body_parts[3*i+2]
|
| - info[mail_id] = dict_parts
|
| - return info
|
| -
|
| -class IMAP4HelperMixin:
|
| - serverCTX = None
|
| - clientCTX = None
|
| -
|
| - def setUp(self):
|
| - d = defer.Deferred()
|
| - self.server = SimpleServer(contextFactory=self.serverCTX)
|
| - self.client = SimpleClient(d, contextFactory=self.clientCTX)
|
| - self.connected = d
|
| -
|
| - SimpleMailbox.messages = []
|
| - theAccount = Account('testuser')
|
| - theAccount.mboxType = SimpleMailbox
|
| - SimpleServer.theAccount = theAccount
|
| -
|
| - def tearDown(self):
|
| - del self.server
|
| - del self.client
|
| - del self.connected
|
| -
|
| - def _cbStopClient(self, ignore):
|
| - self.client.transport.loseConnection()
|
| -
|
| - def _ebGeneral(self, failure):
|
| - self.client.transport.loseConnection()
|
| - self.server.transport.loseConnection()
|
| - failure.printTraceback(open('failure.log', 'w'))
|
| - failure.printTraceback()
|
| - raise failure.value
|
| -
|
| - def loopback(self):
|
| - return loopback.loopbackAsync(self.server, self.client)
|
| -
|
| -class IMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase):
|
| - def testCapability(self):
|
| - caps = {}
|
| - def getCaps():
|
| - def gotCaps(c):
|
| - caps.update(c)
|
| - self.server.transport.loseConnection()
|
| - return self.client.getCapabilities().addCallback(gotCaps)
|
| - d1 = self.connected.addCallback(strip(getCaps)).addErrback(self._ebGeneral)
|
| - d = defer.gatherResults([self.loopback(), d1])
|
| - expected = {'IMAP4rev1': None, 'NAMESPACE': None, 'IDLE': None}
|
| - return d.addCallback(lambda _: self.assertEquals(expected, caps))
|
| -
|
| - def testCapabilityWithAuth(self):
|
| - caps = {}
|
| - self.server.challengers['CRAM-MD5'] = cred.credentials.CramMD5Credentials
|
| - def getCaps():
|
| - def gotCaps(c):
|
| - caps.update(c)
|
| - self.server.transport.loseConnection()
|
| - return self.client.getCapabilities().addCallback(gotCaps)
|
| - d1 = self.connected.addCallback(strip(getCaps)).addErrback(self._ebGeneral)
|
| - d = defer.gatherResults([self.loopback(), d1])
|
| -
|
| - expCap = {'IMAP4rev1': None, 'NAMESPACE': None,
|
| - 'IDLE': None, 'AUTH': ['CRAM-MD5']}
|
| -
|
| - return d.addCallback(lambda _: self.assertEquals(expCap, caps))
|
| -
|
| - def testLogout(self):
|
| - self.loggedOut = 0
|
| - def logout():
|
| - def setLoggedOut():
|
| - self.loggedOut = 1
|
| - self.client.logout().addCallback(strip(setLoggedOut))
|
| - self.connected.addCallback(strip(logout)).addErrback(self._ebGeneral)
|
| - d = self.loopback()
|
| - return d.addCallback(lambda _: self.assertEquals(self.loggedOut, 1))
|
| -
|
| - def testNoop(self):
|
| - self.responses = None
|
| - def noop():
|
| - def setResponses(responses):
|
| - self.responses = responses
|
| - self.server.transport.loseConnection()
|
| - self.client.noop().addCallback(setResponses)
|
| - self.connected.addCallback(strip(noop)).addErrback(self._ebGeneral)
|
| - d = self.loopback()
|
| - return d.addCallback(lambda _: self.assertEquals(self.responses, []))
|
| -
|
| - def testLogin(self):
|
| - def login():
|
| - d = self.client.login('testuser', 'password-test')
|
| - d.addCallback(self._cbStopClient)
|
| - d1 = self.connected.addCallback(strip(login)).addErrback(self._ebGeneral)
|
| - d = defer.gatherResults([d1, self.loopback()])
|
| - return d.addCallback(self._cbTestLogin)
|
| -
|
| - def _cbTestLogin(self, ignored):
|
| - self.assertEquals(self.server.account, SimpleServer.theAccount)
|
| - self.assertEquals(self.server.state, 'auth')
|
| -
|
| - def testFailedLogin(self):
|
| - def login():
|
| - d = self.client.login('testuser', 'wrong-password')
|
| - d.addBoth(self._cbStopClient)
|
| -
|
| - d1 = self.connected.addCallback(strip(login)).addErrback(self._ebGeneral)
|
| - d2 = self.loopback()
|
| - d = defer.gatherResults([d1, d2])
|
| - return d.addCallback(self._cbTestFailedLogin)
|
| -
|
| - def _cbTestFailedLogin(self, ignored):
|
| - self.assertEquals(self.server.account, None)
|
| - self.assertEquals(self.server.state, 'unauth')
|
| -
|
| -
|
| - def testLoginRequiringQuoting(self):
|
| - self.server._username = '{test}user'
|
| - self.server._password = '{test}password'
|
| -
|
| - def login():
|
| - d = self.client.login('{test}user', '{test}password')
|
| - d.addBoth(self._cbStopClient)
|
| -
|
| - d1 = self.connected.addCallback(strip(login)).addErrback(self._ebGeneral)
|
| - d = defer.gatherResults([self.loopback(), d1])
|
| - return d.addCallback(self._cbTestLoginRequiringQuoting)
|
| -
|
| - def _cbTestLoginRequiringQuoting(self, ignored):
|
| - self.assertEquals(self.server.account, SimpleServer.theAccount)
|
| - self.assertEquals(self.server.state, 'auth')
|
| -
|
| -
|
| - def testNamespace(self):
|
| - self.namespaceArgs = None
|
| - def login():
|
| - return self.client.login('testuser', 'password-test')
|
| - def namespace():
|
| - def gotNamespace(args):
|
| - self.namespaceArgs = args
|
| - self._cbStopClient(None)
|
| - return self.client.namespace().addCallback(gotNamespace)
|
| -
|
| - d1 = self.connected.addCallback(strip(login))
|
| - d1.addCallback(strip(namespace))
|
| - d1.addErrback(self._ebGeneral)
|
| - d2 = self.loopback()
|
| - d = defer.gatherResults([d1, d2])
|
| - d.addCallback(lambda _: self.assertEquals(self.namespaceArgs,
|
| - [[['', '/']], [], []]))
|
| - return d
|
| -
|
| - def testSelect(self):
|
| - SimpleServer.theAccount.addMailbox('test-mailbox')
|
| - self.selectedArgs = None
|
| - def login():
|
| - return self.client.login('testuser', 'password-test')
|
| - def select():
|
| - def selected(args):
|
| - self.selectedArgs = args
|
| - self._cbStopClient(None)
|
| - d = self.client.select('test-mailbox')
|
| - d.addCallback(selected)
|
| - return d
|
| -
|
| - d1 = self.connected.addCallback(strip(login))
|
| - d1.addCallback(strip(select))
|
| - d1.addErrback(self._ebGeneral)
|
| - d2 = self.loopback()
|
| - return defer.gatherResults([d1, d2]).addCallback(self._cbTestSelect)
|
| -
|
| - def _cbTestSelect(self, ignored):
|
| - mbox = SimpleServer.theAccount.mailboxes['TEST-MAILBOX']
|
| - self.assertEquals(self.server.mbox, mbox)
|
| - self.assertEquals(self.selectedArgs, {
|
| - 'EXISTS': 9, 'RECENT': 3, 'UIDVALIDITY': 42,
|
| - 'FLAGS': ('\\Flag1', 'Flag2', '\\AnotherSysFlag', 'LastFlag'),
|
| - 'READ-WRITE': 1
|
| - })
|
| -
|
| - def testExamine(self):
|
| - SimpleServer.theAccount.addMailbox('test-mailbox')
|
| - self.examinedArgs = None
|
| - def login():
|
| - return self.client.login('testuser', 'password-test')
|
| - def examine():
|
| - def examined(args):
|
| - self.examinedArgs = args
|
| - self._cbStopClient(None)
|
| - d = self.client.examine('test-mailbox')
|
| - d.addCallback(examined)
|
| - return d
|
| -
|
| - d1 = self.connected.addCallback(strip(login))
|
| - d1.addCallback(strip(examine))
|
| - d1.addErrback(self._ebGeneral)
|
| - d2 = self.loopback()
|
| - d = defer.gatherResults([d1, d2])
|
| - return d.addCallback(self._cbTestExamine)
|
| -
|
| - def _cbTestExamine(self, ignored):
|
| - mbox = SimpleServer.theAccount.mailboxes['TEST-MAILBOX']
|
| - self.assertEquals(self.server.mbox, mbox)
|
| - self.assertEquals(self.examinedArgs, {
|
| - 'EXISTS': 9, 'RECENT': 3, 'UIDVALIDITY': 42,
|
| - 'FLAGS': ('\\Flag1', 'Flag2', '\\AnotherSysFlag', 'LastFlag'),
|
| - 'READ-WRITE': 0
|
| - })
|
| -
|
| - def testCreate(self):
|
| - succeed = ('testbox', 'test/box', 'test/', 'test/box/box', 'INBOX')
|
| - fail = ('testbox', 'test/box')
|
| -
|
| - def cb(): self.result.append(1)
|
| - def eb(failure): self.result.append(0)
|
| -
|
| - def login():
|
| - return self.client.login('testuser', 'password-test')
|
| - def create():
|
| - for name in succeed + fail:
|
| - d = self.client.create(name)
|
| - d.addCallback(strip(cb)).addErrback(eb)
|
| - d.addCallbacks(self._cbStopClient, self._ebGeneral)
|
| -
|
| - self.result = []
|
| - d1 = self.connected.addCallback(strip(login)).addCallback(strip(create))
|
| - d2 = self.loopback()
|
| - d = defer.gatherResults([d1, d2])
|
| - return d.addCallback(self._cbTestCreate, succeed, fail)
|
| -
|
| - def _cbTestCreate(self, ignored, succeed, fail):
|
| - self.assertEquals(self.result, [1] * len(succeed) + [0] * len(fail))
|
| - mbox = SimpleServer.theAccount.mailboxes.keys()
|
| - answers = ['inbox', 'testbox', 'test/box', 'test', 'test/box/box']
|
| - mbox.sort()
|
| - answers.sort()
|
| - self.assertEquals(mbox, [a.upper() for a in answers])
|
| -
|
| - def testDelete(self):
|
| - SimpleServer.theAccount.addMailbox('delete/me')
|
| -
|
| - def login():
|
| - return self.client.login('testuser', 'password-test')
|
| - def delete():
|
| - return self.client.delete('delete/me')
|
| - d1 = self.connected.addCallback(strip(login))
|
| - d1.addCallbacks(strip(delete), self._ebGeneral)
|
| - d1.addCallbacks(self._cbStopClient, self._ebGeneral)
|
| - d2 = self.loopback()
|
| - d = defer.gatherResults([d1, d2])
|
| - d.addCallback(lambda _:
|
| - self.assertEquals(SimpleServer.theAccount.mailboxes.keys(), []))
|
| - return d
|
| -
|
| - def testIllegalInboxDelete(self):
|
| - self.stashed = None
|
| - def login():
|
| - return self.client.login('testuser', 'password-test')
|
| - def delete():
|
| - return self.client.delete('inbox')
|
| - def stash(result):
|
| - self.stashed = result
|
| -
|
| - d1 = self.connected.addCallback(strip(login))
|
| - d1.addCallbacks(strip(delete), self._ebGeneral)
|
| - d1.addBoth(stash)
|
| - d1.addCallbacks(self._cbStopClient, self._ebGeneral)
|
| - d2 = self.loopback()
|
| - d = defer.gatherResults([d1, d2])
|
| - d.addCallback(lambda _: self.failUnless(isinstance(self.stashed,
|
| - failure.Failure)))
|
| - return d
|
| -
|
| -
|
| - def testNonExistentDelete(self):
|
| - def login():
|
| - return self.client.login('testuser', 'password-test')
|
| - def delete():
|
| - return self.client.delete('delete/me')
|
| - def deleteFailed(failure):
|
| - self.failure = failure
|
| -
|
| - self.failure = None
|
| - d1 = self.connected.addCallback(strip(login))
|
| - d1.addCallback(strip(delete)).addErrback(deleteFailed)
|
| - d1.addCallbacks(self._cbStopClient, self._ebGeneral)
|
| - d2 = self.loopback()
|
| - d = defer.gatherResults([d1, d2])
|
| - d.addCallback(lambda _: self.assertEquals(str(self.failure.value),
|
| - 'No such mailbox'))
|
| - return d
|
| -
|
| -
|
| - def testIllegalDelete(self):
|
| - m = SimpleMailbox()
|
| - m.flags = (r'\Noselect',)
|
| - SimpleServer.theAccount.addMailbox('delete', m)
|
| - SimpleServer.theAccount.addMailbox('delete/me')
|
| -
|
| - def login():
|
| - return self.client.login('testuser', 'password-test')
|
| - def delete():
|
| - return self.client.delete('delete')
|
| - def deleteFailed(failure):
|
| - self.failure = failure
|
| -
|
| - self.failure = None
|
| - d1 = self.connected.addCallback(strip(login))
|
| - d1.addCallback(strip(delete)).addErrback(deleteFailed)
|
| - d1.addCallbacks(self._cbStopClient, self._ebGeneral)
|
| - d2 = self.loopback()
|
| - d = defer.gatherResults([d1, d2])
|
| - expected = "Hierarchically inferior mailboxes exist and \\Noselect is set"
|
| - d.addCallback(lambda _:
|
| - self.assertEquals(str(self.failure.value), expected))
|
| - return d
|
| -
|
| - def testRename(self):
|
| - SimpleServer.theAccount.addMailbox('oldmbox')
|
| - def login():
|
| - return self.client.login('testuser', 'password-test')
|
| - def rename():
|
| - return self.client.rename('oldmbox', 'newname')
|
| -
|
| - d1 = self.connected.addCallback(strip(login))
|
| - d1.addCallbacks(strip(rename), self._ebGeneral)
|
| - d1.addCallbacks(self._cbStopClient, self._ebGeneral)
|
| - d2 = self.loopback()
|
| - d = defer.gatherResults([d1, d2])
|
| - d.addCallback(lambda _:
|
| - self.assertEquals(SimpleServer.theAccount.mailboxes.keys(),
|
| - ['NEWNAME']))
|
| - return d
|
| -
|
| - def testIllegalInboxRename(self):
|
| - self.stashed = None
|
| - def login():
|
| - return self.client.login('testuser', 'password-test')
|
| - def rename():
|
| - return self.client.rename('inbox', 'frotz')
|
| - def stash(stuff):
|
| - self.stashed = stuff
|
| -
|
| - d1 = self.connected.addCallback(strip(login))
|
| - d1.addCallbacks(strip(rename), self._ebGeneral)
|
| - d1.addBoth(stash)
|
| - d1.addCallbacks(self._cbStopClient, self._ebGeneral)
|
| - d2 = self.loopback()
|
| - d = defer.gatherResults([d1, d2])
|
| - d.addCallback(lambda _:
|
| - self.failUnless(isinstance(self.stashed, failure.Failure)))
|
| - return d
|
| -
|
| - def testHierarchicalRename(self):
|
| - SimpleServer.theAccount.create('oldmbox/m1')
|
| - SimpleServer.theAccount.create('oldmbox/m2')
|
| - def login():
|
| - return self.client.login('testuser', 'password-test')
|
| - def rename():
|
| - return self.client.rename('oldmbox', 'newname')
|
| -
|
| - d1 = self.connected.addCallback(strip(login))
|
| - d1.addCallbacks(strip(rename), self._ebGeneral)
|
| - d1.addCallbacks(self._cbStopClient, self._ebGeneral)
|
| - d2 = self.loopback()
|
| - d = defer.gatherResults([d1, d2])
|
| - return d.addCallback(self._cbTestHierarchicalRename)
|
| -
|
| - def _cbTestHierarchicalRename(self, ignored):
|
| - mboxes = SimpleServer.theAccount.mailboxes.keys()
|
| - expected = ['newname', 'newname/m1', 'newname/m2']
|
| - mboxes.sort()
|
| - self.assertEquals(mboxes, [s.upper() for s in expected])
|
| -
|
| - def testSubscribe(self):
|
| - def login():
|
| - return self.client.login('testuser', 'password-test')
|
| - def subscribe():
|
| - return self.client.subscribe('this/mbox')
|
| -
|
| - d1 = self.connected.addCallback(strip(login))
|
| - d1.addCallbacks(strip(subscribe), self._ebGeneral)
|
| - d1.addCallbacks(self._cbStopClient, self._ebGeneral)
|
| - d2 = self.loopback()
|
| - d = defer.gatherResults([d1, d2])
|
| - d.addCallback(lambda _:
|
| - self.assertEquals(SimpleServer.theAccount.subscriptions,
|
| - ['THIS/MBOX']))
|
| - return d
|
| -
|
| - def testUnsubscribe(self):
|
| - SimpleServer.theAccount.subscriptions = ['THIS/MBOX', 'THAT/MBOX']
|
| - def login():
|
| - return self.client.login('testuser', 'password-test')
|
| - def unsubscribe():
|
| - return self.client.unsubscribe('this/mbox')
|
| -
|
| - d1 = self.connected.addCallback(strip(login))
|
| - d1.addCallbacks(strip(unsubscribe), self._ebGeneral)
|
| - d1.addCallbacks(self._cbStopClient, self._ebGeneral)
|
| - d2 = self.loopback()
|
| - d = defer.gatherResults([d1, d2])
|
| - d.addCallback(lambda _:
|
| - self.assertEquals(SimpleServer.theAccount.subscriptions,
|
| - ['THAT/MBOX']))
|
| - return d
|
| -
|
| - def _listSetup(self, f):
|
| - SimpleServer.theAccount.addMailbox('root/subthing')
|
| - SimpleServer.theAccount.addMailbox('root/another-thing')
|
| - SimpleServer.theAccount.addMailbox('non-root/subthing')
|
| -
|
| - def login():
|
| - return self.client.login('testuser', 'password-test')
|
| - def listed(answers):
|
| - self.listed = answers
|
| -
|
| - self.listed = None
|
| - d1 = self.connected.addCallback(strip(login))
|
| - d1.addCallbacks(strip(f), self._ebGeneral)
|
| - d1.addCallbacks(listed, self._ebGeneral)
|
| - d1.addCallbacks(self._cbStopClient, self._ebGeneral)
|
| - d2 = self.loopback()
|
| - return defer.gatherResults([d1, d2]).addCallback(lambda _: self.listed)
|
| -
|
| - def testList(self):
|
| - def list():
|
| - return self.client.list('root', '%')
|
| - d = self._listSetup(list)
|
| - d.addCallback(lambda listed: self.assertEquals(
|
| - sortNest(listed),
|
| - sortNest([
|
| - (SimpleMailbox.flags, "/", "ROOT/SUBTHING"),
|
| - (SimpleMailbox.flags, "/", "ROOT/ANOTHER-THING")
|
| - ])
|
| - ))
|
| - return d
|
| -
|
| - def testLSub(self):
|
| - SimpleServer.theAccount.subscribe('ROOT/SUBTHING')
|
| - def lsub():
|
| - return self.client.lsub('root', '%')
|
| - d = self._listSetup(lsub)
|
| - d.addCallback(self.assertEquals,
|
| - [(SimpleMailbox.flags, "/", "ROOT/SUBTHING")])
|
| - return d
|
| -
|
| - def testStatus(self):
|
| - SimpleServer.theAccount.addMailbox('root/subthing')
|
| - def login():
|
| - return self.client.login('testuser', 'password-test')
|
| - def status():
|
| - return self.client.status('root/subthing', 'MESSAGES', 'UIDNEXT', 'UNSEEN')
|
| - def statused(result):
|
| - self.statused = result
|
| -
|
| - self.statused = None
|
| - d1 = self.connected.addCallback(strip(login))
|
| - d1.addCallbacks(strip(status), self._ebGeneral)
|
| - d1.addCallbacks(statused, self._ebGeneral)
|
| - d1.addCallbacks(self._cbStopClient, self._ebGeneral)
|
| - d2 = self.loopback()
|
| - d = defer.gatherResults([d1, d2])
|
| - d.addCallback(lambda _: self.assertEquals(
|
| - self.statused,
|
| - {'MESSAGES': 9, 'UIDNEXT': '10', 'UNSEEN': 4}
|
| - ))
|
| - return d
|
| -
|
| - def testFailedStatus(self):
|
| - def login():
|
| - return self.client.login('testuser', 'password-test')
|
| - def status():
|
| - return self.client.status('root/nonexistent', 'MESSAGES', 'UIDNEXT', 'UNSEEN')
|
| - def statused(result):
|
| - self.statused = result
|
| - def failed(failure):
|
| - self.failure = failure
|
| -
|
| - self.statused = self.failure = None
|
| - d1 = self.connected.addCallback(strip(login))
|
| - d1.addCallbacks(strip(status), self._ebGeneral)
|
| - d1.addCallbacks(statused, failed)
|
| - d1.addCallbacks(self._cbStopClient, self._ebGeneral)
|
| - d2 = self.loopback()
|
| - return defer.gatherResults([d1, d2]).addCallback(self._cbTestFailedStatus)
|
| -
|
| - def _cbTestFailedStatus(self, ignored):
|
| - self.assertEquals(
|
| - self.statused, None
|
| - )
|
| - self.assertEquals(
|
| - self.failure.value.args,
|
| - ('Could not open mailbox',)
|
| - )
|
| -
|
| - def testFullAppend(self):
|
| - infile = util.sibpath(__file__, 'rfc822.message')
|
| - message = open(infile)
|
| - SimpleServer.theAccount.addMailbox('root/subthing')
|
| - def login():
|
| - return self.client.login('testuser', 'password-test')
|
| - def append():
|
| - return self.client.append(
|
| - 'root/subthing',
|
| - message,
|
| - ('\\SEEN', '\\DELETED'),
|
| - 'Tue, 17 Jun 2003 11:22:16 -0600 (MDT)',
|
| - )
|
| -
|
| - d1 = self.connected.addCallback(strip(login))
|
| - d1.addCallbacks(strip(append), self._ebGeneral)
|
| - d1.addCallbacks(self._cbStopClient, self._ebGeneral)
|
| - d2 = self.loopback()
|
| - d = defer.gatherResults([d1, d2])
|
| - return d.addCallback(self._cbTestFullAppend, infile)
|
| -
|
| - def _cbTestFullAppend(self, ignored, infile):
|
| - mb = SimpleServer.theAccount.mailboxes['ROOT/SUBTHING']
|
| - self.assertEquals(1, len(mb.messages))
|
| - self.assertEquals(
|
| - (['\\SEEN', '\\DELETED'], 'Tue, 17 Jun 2003 11:22:16 -0600 (MDT)', 0),
|
| - mb.messages[0][1:]
|
| - )
|
| - self.assertEquals(open(infile).read(), mb.messages[0][0].getvalue())
|
| -
|
| - def testPartialAppend(self):
|
| - infile = util.sibpath(__file__, 'rfc822.message')
|
| - message = open(infile)
|
| - SimpleServer.theAccount.addMailbox('PARTIAL/SUBTHING')
|
| - def login():
|
| - return self.client.login('testuser', 'password-test')
|
| - def append():
|
| - message = file(infile)
|
| - return self.client.sendCommand(
|
| - imap4.Command(
|
| - 'APPEND',
|
| - 'PARTIAL/SUBTHING (\\SEEN) "Right now" {%d}' % os.path.getsize(infile),
|
| - (), self.client._IMAP4Client__cbContinueAppend, message
|
| - )
|
| - )
|
| - d1 = self.connected.addCallback(strip(login))
|
| - d1.addCallbacks(strip(append), self._ebGeneral)
|
| - d1.addCallbacks(self._cbStopClient, self._ebGeneral)
|
| - d2 = self.loopback()
|
| - d = defer.gatherResults([d1, d2])
|
| - return d.addCallback(self._cbTestPartialAppend, infile)
|
| -
|
| - def _cbTestPartialAppend(self, ignored, infile):
|
| - mb = SimpleServer.theAccount.mailboxes['PARTIAL/SUBTHING']
|
| - self.assertEquals(1, len(mb.messages))
|
| - self.assertEquals(
|
| - (['\\SEEN'], 'Right now', 0),
|
| - mb.messages[0][1:]
|
| - )
|
| - self.assertEquals(open(infile).read(), mb.messages[0][0].getvalue())
|
| -
|
| - def testCheck(self):
|
| - SimpleServer.theAccount.addMailbox('root/subthing')
|
| - def login():
|
| - return self.client.login('testuser', 'password-test')
|
| - def select():
|
| - return self.client.select('root/subthing')
|
| - def check():
|
| - return self.client.check()
|
| -
|
| - d = self.connected.addCallback(strip(login))
|
| - d.addCallbacks(strip(select), self._ebGeneral)
|
| - d.addCallbacks(strip(check), self._ebGeneral)
|
| - d.addCallbacks(self._cbStopClient, self._ebGeneral)
|
| - return self.loopback()
|
| -
|
| - # Okay, that was fun
|
| -
|
| - def testClose(self):
|
| - m = SimpleMailbox()
|
| - m.messages = [
|
| - ('Message 1', ('\\Deleted', 'AnotherFlag'), None, 0),
|
| - ('Message 2', ('AnotherFlag',), None, 1),
|
| - ('Message 3', ('\\Deleted',), None, 2),
|
| - ]
|
| - SimpleServer.theAccount.addMailbox('mailbox', m)
|
| - def login():
|
| - return self.client.login('testuser', 'password-test')
|
| - def select():
|
| - return self.client.select('mailbox')
|
| - def close():
|
| - return self.client.close()
|
| -
|
| - d = self.connected.addCallback(strip(login))
|
| - d.addCallbacks(strip(select), self._ebGeneral)
|
| - d.addCallbacks(strip(close), self._ebGeneral)
|
| - d.addCallbacks(self._cbStopClient, self._ebGeneral)
|
| - d2 = self.loopback()
|
| - return defer.gatherResults([d, d2]).addCallback(self._cbTestClose, m)
|
| -
|
| - def _cbTestClose(self, ignored, m):
|
| - self.assertEquals(len(m.messages), 1)
|
| - self.assertEquals(m.messages[0], ('Message 2', ('AnotherFlag',), None, 1))
|
| - self.failUnless(m.closed)
|
| -
|
| - def testExpunge(self):
|
| - m = SimpleMailbox()
|
| - m.messages = [
|
| - ('Message 1', ('\\Deleted', 'AnotherFlag'), None, 0),
|
| - ('Message 2', ('AnotherFlag',), None, 1),
|
| - ('Message 3', ('\\Deleted',), None, 2),
|
| - ]
|
| - SimpleServer.theAccount.addMailbox('mailbox', m)
|
| - def login():
|
| - return self.client.login('testuser', 'password-test')
|
| - def select():
|
| - return self.client.select('mailbox')
|
| - def expunge():
|
| - return self.client.expunge()
|
| - def expunged(results):
|
| - self.failIf(self.server.mbox is None)
|
| - self.results = results
|
| -
|
| - self.results = None
|
| - d1 = self.connected.addCallback(strip(login))
|
| - d1.addCallbacks(strip(select), self._ebGeneral)
|
| - d1.addCallbacks(strip(expunge), self._ebGeneral)
|
| - d1.addCallbacks(expunged, self._ebGeneral)
|
| - d1.addCallbacks(self._cbStopClient, self._ebGeneral)
|
| - d2 = self.loopback()
|
| - d = defer.gatherResults([d1, d2])
|
| - return d.addCallback(self._cbTestExpunge, m)
|
| -
|
| - def _cbTestExpunge(self, ignored, m):
|
| - self.assertEquals(len(m.messages), 1)
|
| - self.assertEquals(m.messages[0], ('Message 2', ('AnotherFlag',), None, 1))
|
| -
|
| - self.assertEquals(self.results, [0, 2])
|
| -
|
| -class TestRealm:
|
| - theAccount = None
|
| -
|
| - def requestAvatar(self, avatarId, mind, *interfaces):
|
| - return imap4.IAccount, self.theAccount, lambda: None
|
| -
|
| -class TestChecker:
|
| - credentialInterfaces = (cred.credentials.IUsernameHashedPassword, cred.credentials.IUsernamePassword)
|
| -
|
| - users = {
|
| - 'testuser': 'secret'
|
| - }
|
| -
|
| - def requestAvatarId(self, credentials):
|
| - if credentials.username in self.users:
|
| - return defer.maybeDeferred(
|
| - credentials.checkPassword, self.users[credentials.username]
|
| - ).addCallback(self._cbCheck, credentials.username)
|
| -
|
| - def _cbCheck(self, result, username):
|
| - if result:
|
| - return username
|
| - raise cred.error.UnauthorizedLogin()
|
| -
|
| -class AuthenticatorTestCase(IMAP4HelperMixin, unittest.TestCase):
|
| - def setUp(self):
|
| - IMAP4HelperMixin.setUp(self)
|
| -
|
| - realm = TestRealm()
|
| - realm.theAccount = Account('testuser')
|
| - portal = cred.portal.Portal(realm)
|
| - portal.registerChecker(TestChecker())
|
| - self.server.portal = portal
|
| -
|
| - self.authenticated = 0
|
| - self.account = realm.theAccount
|
| -
|
| - def testCramMD5(self):
|
| - self.server.challengers['CRAM-MD5'] = cred.credentials.CramMD5Credentials
|
| - cAuth = imap4.CramMD5ClientAuthenticator('testuser')
|
| - self.client.registerAuthenticator(cAuth)
|
| -
|
| - def auth():
|
| - return self.client.authenticate('secret')
|
| - def authed():
|
| - self.authenticated = 1
|
| -
|
| - d1 = self.connected.addCallback(strip(auth))
|
| - d1.addCallbacks(strip(authed), self._ebGeneral)
|
| - d1.addCallbacks(self._cbStopClient, self._ebGeneral)
|
| - d2 = self.loopback()
|
| - d = defer.gatherResults([d1, d2])
|
| - return d.addCallback(self._cbTestCramMD5)
|
| -
|
| - def _cbTestCramMD5(self, ignored):
|
| - self.assertEquals(self.authenticated, 1)
|
| - self.assertEquals(self.server.account, self.account)
|
| -
|
| - def testFailedCramMD5(self):
|
| - self.server.challengers['CRAM-MD5'] = cred.credentials.CramMD5Credentials
|
| - cAuth = imap4.CramMD5ClientAuthenticator('testuser')
|
| - self.client.registerAuthenticator(cAuth)
|
| -
|
| - def misauth():
|
| - return self.client.authenticate('not the secret')
|
| - def authed():
|
| - self.authenticated = 1
|
| - def misauthed():
|
| - self.authenticated = -1
|
| -
|
| - d1 = self.connected.addCallback(strip(misauth))
|
| - d1.addCallbacks(strip(authed), strip(misauthed))
|
| - d1.addCallbacks(self._cbStopClient, self._ebGeneral)
|
| - d = defer.gatherResults([self.loopback(), d1])
|
| - return d.addCallback(self._cbTestFailedCramMD5)
|
| -
|
| - def _cbTestFailedCramMD5(self, ignored):
|
| - self.assertEquals(self.authenticated, -1)
|
| - self.assertEquals(self.server.account, None)
|
| -
|
| - def testLOGIN(self):
|
| - self.server.challengers['LOGIN'] = imap4.LOGINCredentials
|
| - cAuth = imap4.LOGINAuthenticator('testuser')
|
| - self.client.registerAuthenticator(cAuth)
|
| -
|
| - def auth():
|
| - return self.client.authenticate('secret')
|
| - def authed():
|
| - self.authenticated = 1
|
| -
|
| - d1 = self.connected.addCallback(strip(auth))
|
| - d1.addCallbacks(strip(authed), self._ebGeneral)
|
| - d1.addCallbacks(self._cbStopClient, self._ebGeneral)
|
| - d = defer.gatherResults([self.loopback(), d1])
|
| - return d.addCallback(self._cbTestLOGIN)
|
| -
|
| - def _cbTestLOGIN(self, ignored):
|
| - self.assertEquals(self.authenticated, 1)
|
| - self.assertEquals(self.server.account, self.account)
|
| -
|
| - def testFailedLOGIN(self):
|
| - self.server.challengers['LOGIN'] = imap4.LOGINCredentials
|
| - cAuth = imap4.LOGINAuthenticator('testuser')
|
| - self.client.registerAuthenticator(cAuth)
|
| -
|
| - def misauth():
|
| - return self.client.authenticate('not the secret')
|
| - def authed():
|
| - self.authenticated = 1
|
| - def misauthed():
|
| - self.authenticated = -1
|
| -
|
| - d1 = self.connected.addCallback(strip(misauth))
|
| - d1.addCallbacks(strip(authed), strip(misauthed))
|
| - d1.addCallbacks(self._cbStopClient, self._ebGeneral)
|
| - d = defer.gatherResults([self.loopback(), d1])
|
| - return d.addCallback(self._cbTestFailedLOGIN)
|
| -
|
| - def _cbTestFailedLOGIN(self, ignored):
|
| - self.assertEquals(self.authenticated, -1)
|
| - self.assertEquals(self.server.account, None)
|
| -
|
| - def testPLAIN(self):
|
| - self.server.challengers['PLAIN'] = imap4.PLAINCredentials
|
| - cAuth = imap4.PLAINAuthenticator('testuser')
|
| - self.client.registerAuthenticator(cAuth)
|
| -
|
| - def auth():
|
| - return self.client.authenticate('secret')
|
| - def authed():
|
| - self.authenticated = 1
|
| -
|
| - d1 = self.connected.addCallback(strip(auth))
|
| - d1.addCallbacks(strip(authed), self._ebGeneral)
|
| - d1.addCallbacks(self._cbStopClient, self._ebGeneral)
|
| - d = defer.gatherResults([self.loopback(), d1])
|
| - return d.addCallback(self._cbTestPLAIN)
|
| -
|
| - def _cbTestPLAIN(self, ignored):
|
| - self.assertEquals(self.authenticated, 1)
|
| - self.assertEquals(self.server.account, self.account)
|
| -
|
| - def testFailedPLAIN(self):
|
| - self.server.challengers['PLAIN'] = imap4.PLAINCredentials
|
| - cAuth = imap4.PLAINAuthenticator('testuser')
|
| - self.client.registerAuthenticator(cAuth)
|
| -
|
| - def misauth():
|
| - return self.client.authenticate('not the secret')
|
| - def authed():
|
| - self.authenticated = 1
|
| - def misauthed():
|
| - self.authenticated = -1
|
| -
|
| - d1 = self.connected.addCallback(strip(misauth))
|
| - d1.addCallbacks(strip(authed), strip(misauthed))
|
| - d1.addCallbacks(self._cbStopClient, self._ebGeneral)
|
| - d = defer.gatherResults([self.loopback(), d1])
|
| - return d.addCallback(self._cbTestFailedPLAIN)
|
| -
|
| - def _cbTestFailedPLAIN(self, ignored):
|
| - self.assertEquals(self.authenticated, -1)
|
| - self.assertEquals(self.server.account, None)
|
| -
|
| -
|
| -class UnsolicitedResponseTestCase(IMAP4HelperMixin, unittest.TestCase):
|
| - def testReadWrite(self):
|
| - def login():
|
| - return self.client.login('testuser', 'password-test')
|
| - def loggedIn():
|
| - self.server.modeChanged(1)
|
| -
|
| - d1 = self.connected.addCallback(strip(login))
|
| - d1.addCallback(strip(loggedIn)).addErrback(self._ebGeneral)
|
| - d = defer.gatherResults([self.loopback(), d1])
|
| - return d.addCallback(self._cbTestReadWrite)
|
| -
|
| - def _cbTestReadWrite(self, ignored):
|
| - E = self.client.events
|
| - self.assertEquals(E, [['modeChanged', 1]])
|
| -
|
| - def testReadOnly(self):
|
| - def login():
|
| - return self.client.login('testuser', 'password-test')
|
| - def loggedIn():
|
| - self.server.modeChanged(0)
|
| -
|
| - d1 = self.connected.addCallback(strip(login))
|
| - d1.addCallback(strip(loggedIn)).addErrback(self._ebGeneral)
|
| - d = defer.gatherResults([self.loopback(), d1])
|
| - return d.addCallback(self._cbTestReadOnly)
|
| -
|
| - def _cbTestReadOnly(self, ignored):
|
| - E = self.client.events
|
| - self.assertEquals(E, [['modeChanged', 0]])
|
| -
|
| - def testFlagChange(self):
|
| - flags = {
|
| - 1: ['\\Answered', '\\Deleted'],
|
| - 5: [],
|
| - 10: ['\\Recent']
|
| - }
|
| - def login():
|
| - return self.client.login('testuser', 'password-test')
|
| - def loggedIn():
|
| - self.server.flagsChanged(flags)
|
| -
|
| - d1 = self.connected.addCallback(strip(login))
|
| - d1.addCallback(strip(loggedIn)).addErrback(self._ebGeneral)
|
| - d = defer.gatherResults([self.loopback(), d1])
|
| - return d.addCallback(self._cbTestFlagChange, flags)
|
| -
|
| - def _cbTestFlagChange(self, ignored, flags):
|
| - E = self.client.events
|
| - expect = [['flagsChanged', {x[0]: x[1]}] for x in flags.items()]
|
| - E.sort()
|
| - expect.sort()
|
| - self.assertEquals(E, expect)
|
| -
|
| - def testNewMessages(self):
|
| - def login():
|
| - return self.client.login('testuser', 'password-test')
|
| - def loggedIn():
|
| - self.server.newMessages(10, None)
|
| -
|
| - d1 = self.connected.addCallback(strip(login))
|
| - d1.addCallback(strip(loggedIn)).addErrback(self._ebGeneral)
|
| - d = defer.gatherResults([self.loopback(), d1])
|
| - return d.addCallback(self._cbTestNewMessages)
|
| -
|
| - def _cbTestNewMessages(self, ignored):
|
| - E = self.client.events
|
| - self.assertEquals(E, [['newMessages', 10, None]])
|
| -
|
| - def testNewRecentMessages(self):
|
| - def login():
|
| - return self.client.login('testuser', 'password-test')
|
| - def loggedIn():
|
| - self.server.newMessages(None, 10)
|
| -
|
| - d1 = self.connected.addCallback(strip(login))
|
| - d1.addCallback(strip(loggedIn)).addErrback(self._ebGeneral)
|
| - d = defer.gatherResults([self.loopback(), d1])
|
| - return d.addCallback(self._cbTestNewRecentMessages)
|
| -
|
| - def _cbTestNewRecentMessages(self, ignored):
|
| - E = self.client.events
|
| - self.assertEquals(E, [['newMessages', None, 10]])
|
| -
|
| - def testNewMessagesAndRecent(self):
|
| - def login():
|
| - return self.client.login('testuser', 'password-test')
|
| - def loggedIn():
|
| - self.server.newMessages(20, 10)
|
| -
|
| - d1 = self.connected.addCallback(strip(login))
|
| - d1.addCallback(strip(loggedIn)).addErrback(self._ebGeneral)
|
| - d = defer.gatherResults([self.loopback(), d1])
|
| - return d.addCallback(self._cbTestNewMessagesAndRecent)
|
| -
|
| - def _cbTestNewMessagesAndRecent(self, ignored):
|
| - E = self.client.events
|
| - self.assertEquals(E, [['newMessages', 20, None], ['newMessages', None, 10]])
|
| -
|
| -
|
| -class ClientCapabilityTests(unittest.TestCase):
|
| - """
|
| - Tests for issuance of the CAPABILITY command and handling of its response.
|
| - """
|
| - def setUp(self):
|
| - """
|
| - Create an L{imap4.IMAP4Client} connected to a L{StringTransport}.
|
| - """
|
| - self.transport = StringTransport()
|
| - self.protocol = imap4.IMAP4Client()
|
| - self.protocol.makeConnection(self.transport)
|
| - self.protocol.dataReceived('* OK [IMAP4rev1]\r\n')
|
| -
|
| -
|
| - def test_simpleAtoms(self):
|
| - """
|
| - A capability response consisting only of atoms without C{'='} in them
|
| - should result in a dict mapping those atoms to C{None}.
|
| - """
|
| - capabilitiesResult = self.protocol.getCapabilities(useCache=False)
|
| - self.protocol.dataReceived('* CAPABILITY IMAP4rev1 LOGINDISABLED\r\n')
|
| - self.protocol.dataReceived('0001 OK Capability completed.\r\n')
|
| - def gotCapabilities(capabilities):
|
| - self.assertEqual(
|
| - capabilities, {'IMAP4rev1': None, 'LOGINDISABLED': None})
|
| - capabilitiesResult.addCallback(gotCapabilities)
|
| - return capabilitiesResult
|
| -
|
| -
|
| - def test_categoryAtoms(self):
|
| - """
|
| - A capability response consisting of atoms including C{'='} should have
|
| - those atoms split on that byte and have capabilities in the same
|
| - category aggregated into lists in the resulting dictionary.
|
| -
|
| - (n.b. - I made up the word "category atom"; the protocol has no notion
|
| - of structure here, but rather allows each capability to define the
|
| - semantics of its entry in the capability response in a freeform manner.
|
| - If I had realized this earlier, the API for capabilities would look
|
| - different. As it is, we can hope that no one defines any crazy
|
| - semantics which are incompatible with this API, or try to figure out a
|
| - better API when someone does. -exarkun)
|
| - """
|
| - capabilitiesResult = self.protocol.getCapabilities(useCache=False)
|
| - self.protocol.dataReceived('* CAPABILITY IMAP4rev1 AUTH=LOGIN AUTH=PLAIN\r\n')
|
| - self.protocol.dataReceived('0001 OK Capability completed.\r\n')
|
| - def gotCapabilities(capabilities):
|
| - self.assertEqual(
|
| - capabilities, {'IMAP4rev1': None, 'AUTH': ['LOGIN', 'PLAIN']})
|
| - capabilitiesResult.addCallback(gotCapabilities)
|
| - return capabilitiesResult
|
| -
|
| -
|
| - def test_mixedAtoms(self):
|
| - """
|
| - A capability response consisting of both simple and category atoms of
|
| - the same type should result in a list containing C{None} as well as the
|
| - values for the category.
|
| - """
|
| - capabilitiesResult = self.protocol.getCapabilities(useCache=False)
|
| - # Exercise codepath for both orderings of =-having and =-missing
|
| - # capabilities.
|
| - self.protocol.dataReceived(
|
| - '* CAPABILITY IMAP4rev1 FOO FOO=BAR BAR=FOO BAR\r\n')
|
| - self.protocol.dataReceived('0001 OK Capability completed.\r\n')
|
| - def gotCapabilities(capabilities):
|
| - self.assertEqual(capabilities, {'IMAP4rev1': None,
|
| - 'FOO': [None, 'BAR'],
|
| - 'BAR': ['FOO', None]})
|
| - capabilitiesResult.addCallback(gotCapabilities)
|
| - return capabilitiesResult
|
| -
|
| -
|
| -
|
| -
|
| -class HandCraftedTestCase(IMAP4HelperMixin, unittest.TestCase):
|
| - def testTrailingLiteral(self):
|
| - transport = StringTransport()
|
| - c = imap4.IMAP4Client()
|
| - c.makeConnection(transport)
|
| - c.lineReceived('* OK [IMAP4rev1]')
|
| -
|
| - def cbSelect(ignored):
|
| - d = c.fetchMessage('1')
|
| - c.dataReceived('* 1 FETCH (RFC822 {10}\r\n0123456789\r\n RFC822.SIZE 10)\r\n')
|
| - c.dataReceived('0003 OK FETCH\r\n')
|
| - return d
|
| -
|
| - def cbLogin(ignored):
|
| - d = c.select('inbox')
|
| - c.lineReceived('0002 OK SELECT')
|
| - d.addCallback(cbSelect)
|
| - return d
|
| -
|
| - d = c.login('blah', 'blah')
|
| - c.dataReceived('0001 OK LOGIN\r\n')
|
| - d.addCallback(cbLogin)
|
| - return d
|
| -
|
| - def testPathelogicalScatteringOfLiterals(self):
|
| - self.server.checker.addUser('testuser', 'password-test')
|
| - transport = StringTransport()
|
| - self.server.makeConnection(transport)
|
| -
|
| - transport.clear()
|
| - self.server.dataReceived("01 LOGIN {8}\r\n")
|
| - self.assertEquals(transport.value(), "+ Ready for 8 octets of text\r\n")
|
| -
|
| - transport.clear()
|
| - self.server.dataReceived("testuser {13}\r\n")
|
| - self.assertEquals(transport.value(), "+ Ready for 13 octets of text\r\n")
|
| -
|
| - transport.clear()
|
| - self.server.dataReceived("password-test\r\n")
|
| - self.assertEquals(transport.value(), "01 OK LOGIN succeeded\r\n")
|
| - self.assertEquals(self.server.state, 'auth')
|
| -
|
| - self.server.connectionLost(error.ConnectionDone("Connection done."))
|
| -
|
| - def testUnsolicitedResponseMixedWithSolicitedResponse(self):
|
| -
|
| - class StillSimplerClient(imap4.IMAP4Client):
|
| - events = []
|
| - def flagsChanged(self, newFlags):
|
| - self.events.append(['flagsChanged', newFlags])
|
| -
|
| - transport = StringTransport()
|
| - c = StillSimplerClient()
|
| - c.makeConnection(transport)
|
| - c.lineReceived('* OK [IMAP4rev1]')
|
| -
|
| - def login():
|
| - d = c.login('blah', 'blah')
|
| - c.dataReceived('0001 OK LOGIN\r\n')
|
| - return d
|
| - def select():
|
| - d = c.select('inbox')
|
| - c.lineReceived('0002 OK SELECT')
|
| - return d
|
| - def fetch():
|
| - d = c.fetchSpecific('1:*',
|
| - headerType='HEADER.FIELDS',
|
| - headerArgs=['SUBJECT'])
|
| - c.dataReceived('* 1 FETCH (BODY[HEADER.FIELDS ("SUBJECT")] {38}\r\n')
|
| - c.dataReceived('Subject: Suprise for your woman...\r\n')
|
| - c.dataReceived('\r\n')
|
| - c.dataReceived(')\r\n')
|
| - c.dataReceived('* 1 FETCH (FLAGS (\Seen))\r\n')
|
| - c.dataReceived('* 2 FETCH (BODY[HEADER.FIELDS ("SUBJECT")] {75}\r\n')
|
| - c.dataReceived('Subject: What you been doing. Order your meds here . ,. handcuff madsen\r\n')
|
| - c.dataReceived('\r\n')
|
| - c.dataReceived(')\r\n')
|
| - c.dataReceived('0003 OK FETCH completed\r\n')
|
| - return d
|
| - def test(res):
|
| - self.assertEquals(res, {
|
| - 1: [['BODY', ['HEADER.FIELDS', ['SUBJECT']],
|
| - 'Subject: Suprise for your woman...\r\n\r\n']],
|
| - 2: [['BODY', ['HEADER.FIELDS', ['SUBJECT']],
|
| - 'Subject: What you been doing. Order your meds here . ,. handcuff madsen\r\n\r\n']]
|
| - })
|
| -
|
| - self.assertEquals(c.events, [['flagsChanged', {1: ['\\Seen']}]])
|
| -
|
| - return login(
|
| - ).addCallback(strip(select)
|
| - ).addCallback(strip(fetch)
|
| - ).addCallback(test)
|
| -
|
| -
|
| - def test_literalWithoutPrecedingWhitespace(self):
|
| - """
|
| - Literals should be recognized even when they are not preceded by
|
| - whitespace.
|
| - """
|
| - transport = StringTransport()
|
| - protocol = imap4.IMAP4Client()
|
| -
|
| - protocol.makeConnection(transport)
|
| - protocol.lineReceived('* OK [IMAP4rev1]')
|
| -
|
| - def login():
|
| - d = protocol.login('blah', 'blah')
|
| - protocol.dataReceived('0001 OK LOGIN\r\n')
|
| - return d
|
| - def select():
|
| - d = protocol.select('inbox')
|
| - protocol.lineReceived('0002 OK SELECT')
|
| - return d
|
| - def fetch():
|
| - d = protocol.fetchSpecific('1:*',
|
| - headerType='HEADER.FIELDS',
|
| - headerArgs=['SUBJECT'])
|
| - protocol.dataReceived(
|
| - '* 1 FETCH (BODY[HEADER.FIELDS ({7}\r\nSUBJECT)] "Hello")\r\n')
|
| - protocol.dataReceived('0003 OK FETCH completed\r\n')
|
| - return d
|
| - def test(result):
|
| - self.assertEqual(
|
| - result, {1: [['BODY', ['HEADER.FIELDS', ['SUBJECT']], 'Hello']]})
|
| -
|
| - d = login()
|
| - d.addCallback(strip(select))
|
| - d.addCallback(strip(fetch))
|
| - d.addCallback(test)
|
| - return d
|
| -
|
| -
|
| - def test_nonIntegerLiteralLength(self):
|
| - """
|
| - If the server sends a literal length which cannot be parsed as an
|
| - integer, L{IMAP4Client.lineReceived} should cause the protocol to be
|
| - disconnected by raising L{imap4.IllegalServerResponse}.
|
| - """
|
| - transport = StringTransport()
|
| - protocol = imap4.IMAP4Client()
|
| -
|
| - protocol.makeConnection(transport)
|
| - protocol.lineReceived('* OK [IMAP4rev1]')
|
| -
|
| - def login():
|
| - d = protocol.login('blah', 'blah')
|
| - protocol.dataReceived('0001 OK LOGIN\r\n')
|
| - return d
|
| - def select():
|
| - d = protocol.select('inbox')
|
| - protocol.lineReceived('0002 OK SELECT')
|
| - return d
|
| - def fetch():
|
| - d = protocol.fetchSpecific('1:*',
|
| - headerType='HEADER.FIELDS',
|
| - headerArgs=['SUBJECT'])
|
| - self.assertRaises(
|
| - imap4.IllegalServerResponse,
|
| - protocol.dataReceived,
|
| - '* 1 FETCH {xyz}\r\n...')
|
| - d = login()
|
| - d.addCallback(strip(select))
|
| - d.addCallback(strip(fetch))
|
| - return d
|
| -
|
| -
|
| -
|
| -class FakeyServer(imap4.IMAP4Server):
|
| - state = 'select'
|
| - timeout = None
|
| -
|
| - def sendServerGreeting(self):
|
| - pass
|
| -
|
| -class FakeyMessage:
|
| - implements(imap4.IMessage)
|
| -
|
| - def __init__(self, headers, flags, date, body, uid, subpart):
|
| - self.headers = headers
|
| - self.flags = flags
|
| - self.body = StringIO(body)
|
| - self.size = len(body)
|
| - self.date = date
|
| - self.uid = uid
|
| - self.subpart = subpart
|
| -
|
| - def getHeaders(self, negate, *names):
|
| - self.got_headers = negate, names
|
| - return self.headers
|
| -
|
| - def getFlags(self):
|
| - return self.flags
|
| -
|
| - def getInternalDate(self):
|
| - return self.date
|
| -
|
| - def getBodyFile(self):
|
| - return self.body
|
| -
|
| - def getSize(self):
|
| - return self.size
|
| -
|
| - def getUID(self):
|
| - return self.uid
|
| -
|
| - def isMultipart(self):
|
| - return self.subpart is not None
|
| -
|
| - def getSubPart(self, part):
|
| - self.got_subpart = part
|
| - return self.subpart[part]
|
| -
|
| -class NewStoreTestCase(unittest.TestCase, IMAP4HelperMixin):
|
| - result = None
|
| - storeArgs = None
|
| -
|
| - def setUp(self):
|
| - self.received_messages = self.received_uid = None
|
| -
|
| - self.server = imap4.IMAP4Server()
|
| - self.server.state = 'select'
|
| - self.server.mbox = self
|
| - self.connected = defer.Deferred()
|
| - self.client = SimpleClient(self.connected)
|
| -
|
| - def addListener(self, x):
|
| - pass
|
| - def removeListener(self, x):
|
| - pass
|
| -
|
| - def store(self, *args, **kw):
|
| - self.storeArgs = args, kw
|
| - return self.response
|
| -
|
| - def _storeWork(self):
|
| - def connected():
|
| - return self.function(self.messages, self.flags, self.silent, self.uid)
|
| - def result(R):
|
| - self.result = R
|
| -
|
| - self.connected.addCallback(strip(connected)
|
| - ).addCallback(result
|
| - ).addCallback(self._cbStopClient
|
| - ).addErrback(self._ebGeneral)
|
| -
|
| - def check(ignored):
|
| - self.assertEquals(self.result, self.expected)
|
| - self.assertEquals(self.storeArgs, self.expectedArgs)
|
| - d = loopback.loopbackTCP(self.server, self.client, noisy=False)
|
| - d.addCallback(check)
|
| - return d
|
| -
|
| - def testSetFlags(self, uid=0):
|
| - self.function = self.client.setFlags
|
| - self.messages = '1,5,9'
|
| - self.flags = ['\\A', '\\B', 'C']
|
| - self.silent = False
|
| - self.uid = uid
|
| - self.response = {
|
| - 1: ['\\A', '\\B', 'C'],
|
| - 5: ['\\A', '\\B', 'C'],
|
| - 9: ['\\A', '\\B', 'C'],
|
| - }
|
| - self.expected = {
|
| - 1: {'FLAGS': ['\\A', '\\B', 'C']},
|
| - 5: {'FLAGS': ['\\A', '\\B', 'C']},
|
| - 9: {'FLAGS': ['\\A', '\\B', 'C']},
|
| - }
|
| - msg = imap4.MessageSet()
|
| - msg.add(1)
|
| - msg.add(5)
|
| - msg.add(9)
|
| - self.expectedArgs = ((msg, ['\\A', '\\B', 'C'], 0), {'uid': 0})
|
| - return self._storeWork()
|
| -
|
| -
|
| -class NewFetchTestCase(unittest.TestCase, IMAP4HelperMixin):
|
| - def setUp(self):
|
| - self.received_messages = self.received_uid = None
|
| - self.result = None
|
| -
|
| - self.server = imap4.IMAP4Server()
|
| - self.server.state = 'select'
|
| - self.server.mbox = self
|
| - self.connected = defer.Deferred()
|
| - self.client = SimpleClient(self.connected)
|
| -
|
| - def addListener(self, x):
|
| - pass
|
| - def removeListener(self, x):
|
| - pass
|
| -
|
| - def fetch(self, messages, uid):
|
| - self.received_messages = messages
|
| - self.received_uid = uid
|
| - return iter(zip(range(len(self.msgObjs)), self.msgObjs))
|
| -
|
| - def _fetchWork(self, uid):
|
| - if uid:
|
| - for (i, msg) in zip(range(len(self.msgObjs)), self.msgObjs):
|
| - self.expected[i]['UID'] = str(msg.getUID())
|
| -
|
| - def result(R):
|
| - self.result = R
|
| -
|
| - self.connected.addCallback(lambda _: self.function(self.messages, uid)
|
| - ).addCallback(result
|
| - ).addCallback(self._cbStopClient
|
| - ).addErrback(self._ebGeneral)
|
| -
|
| - d = loopback.loopbackTCP(self.server, self.client, noisy=False)
|
| - d.addCallback(lambda x : self.assertEquals(self.result, self.expected))
|
| - return d
|
| -
|
| - def testFetchUID(self):
|
| - self.function = lambda m, u: self.client.fetchUID(m)
|
| -
|
| - self.messages = '7'
|
| - self.msgObjs = [
|
| - FakeyMessage({}, (), '', '', 12345, None),
|
| - FakeyMessage({}, (), '', '', 999, None),
|
| - FakeyMessage({}, (), '', '', 10101, None),
|
| - ]
|
| - self.expected = {
|
| - 0: {'UID': '12345'},
|
| - 1: {'UID': '999'},
|
| - 2: {'UID': '10101'},
|
| - }
|
| - return self._fetchWork(0)
|
| -
|
| - def testFetchFlags(self, uid=0):
|
| - self.function = self.client.fetchFlags
|
| - self.messages = '9'
|
| - self.msgObjs = [
|
| - FakeyMessage({}, ['FlagA', 'FlagB', '\\FlagC'], '', '', 54321, None),
|
| - FakeyMessage({}, ['\\FlagC', 'FlagA', 'FlagB'], '', '', 12345, None),
|
| - ]
|
| - self.expected = {
|
| - 0: {'FLAGS': ['FlagA', 'FlagB', '\\FlagC']},
|
| - 1: {'FLAGS': ['\\FlagC', 'FlagA', 'FlagB']},
|
| - }
|
| - return self._fetchWork(uid)
|
| -
|
| - def testFetchFlagsUID(self):
|
| - return self.testFetchFlags(1)
|
| -
|
| - def testFetchInternalDate(self, uid=0):
|
| - self.function = self.client.fetchInternalDate
|
| - self.messages = '13'
|
| - self.msgObjs = [
|
| - FakeyMessage({}, (), 'Fri, 02 Nov 2003 21:25:10 GMT', '', 23232, None),
|
| - FakeyMessage({}, (), 'Thu, 29 Dec 2013 11:31:52 EST', '', 101, None),
|
| - FakeyMessage({}, (), 'Mon, 10 Mar 1992 02:44:30 CST', '', 202, None),
|
| - FakeyMessage({}, (), 'Sat, 11 Jan 2000 14:40:24 PST', '', 303, None),
|
| - ]
|
| - self.expected = {
|
| - 0: {'INTERNALDATE': '02-Nov-2003 21:25:10 +0000'},
|
| - 1: {'INTERNALDATE': '29-Dec-2013 11:31:52 -0500'},
|
| - 2: {'INTERNALDATE': '10-Mar-1992 02:44:30 -0600'},
|
| - 3: {'INTERNALDATE': '11-Jan-2000 14:40:24 -0800'},
|
| - }
|
| - return self._fetchWork(uid)
|
| -
|
| - def testFetchInternalDateUID(self):
|
| - return self.testFetchInternalDate(1)
|
| -
|
| - def testFetchEnvelope(self, uid=0):
|
| - self.function = self.client.fetchEnvelope
|
| - self.messages = '15'
|
| - self.msgObjs = [
|
| - FakeyMessage({
|
| - 'from': 'user@domain', 'to': 'resu@domain',
|
| - 'date': 'thursday', 'subject': 'it is a message',
|
| - 'message-id': 'id-id-id-yayaya'}, (), '', '', 65656,
|
| - None),
|
| - ]
|
| - self.expected = {
|
| - 0: {'ENVELOPE':
|
| - ['thursday', 'it is a message',
|
| - [[None, None, 'user', 'domain']],
|
| - [[None, None, 'user', 'domain']],
|
| - [[None, None, 'user', 'domain']],
|
| - [[None, None, 'resu', 'domain']],
|
| - None, None, None, 'id-id-id-yayaya']
|
| - }
|
| - }
|
| - return self._fetchWork(uid)
|
| -
|
| - def testFetchEnvelopeUID(self):
|
| - return self.testFetchEnvelope(1)
|
| -
|
| - def testFetchBodyStructure(self, uid=0):
|
| - self.function = self.client.fetchBodyStructure
|
| - self.messages = '3:9,10:*'
|
| - self.msgObjs = [FakeyMessage({
|
| - 'content-type': 'text/plain; name=thing; key="value"',
|
| - 'content-id': 'this-is-the-content-id',
|
| - 'content-description': 'describing-the-content-goes-here!',
|
| - 'content-transfer-encoding': '8BIT',
|
| - }, (), '', 'Body\nText\nGoes\nHere\n', 919293, None)]
|
| - self.expected = {0: {'BODYSTRUCTURE': [
|
| - 'text', 'plain', [['name', 'thing'], ['key', 'value']],
|
| - 'this-is-the-content-id', 'describing-the-content-goes-here!',
|
| - '8BIT', '20', '4', None, None, None]}}
|
| - return self._fetchWork(uid)
|
| -
|
| - def testFetchBodyStructureUID(self):
|
| - return self.testFetchBodyStructure(1)
|
| -
|
| - def testFetchSimplifiedBody(self, uid=0):
|
| - self.function = self.client.fetchSimplifiedBody
|
| - self.messages = '21'
|
| - self.msgObjs = [FakeyMessage({}, (), '', 'Yea whatever', 91825,
|
| - [FakeyMessage({'content-type': 'image/jpg'}, (), '',
|
| - 'Body Body Body', None, None
|
| - )]
|
| - )]
|
| - self.expected = {0:
|
| - {'BODY':
|
| - [None, None, [], None, None, None,
|
| - '12'
|
| - ]
|
| - }
|
| - }
|
| -
|
| - return self._fetchWork(uid)
|
| -
|
| - def testFetchSimplifiedBodyUID(self):
|
| - return self.testFetchSimplifiedBody(1)
|
| -
|
| - def testFetchSimplifiedBodyText(self, uid=0):
|
| - self.function = self.client.fetchSimplifiedBody
|
| - self.messages = '21'
|
| - self.msgObjs = [FakeyMessage({'content-type': 'text/plain'},
|
| - (), '', 'Yea whatever', 91825, None)]
|
| - self.expected = {0:
|
| - {'BODY':
|
| - ['text', 'plain', [], None, None, None,
|
| - '12', '1'
|
| - ]
|
| - }
|
| - }
|
| -
|
| - return self._fetchWork(uid)
|
| -
|
| - def testFetchSimplifiedBodyTextUID(self):
|
| - return self.testFetchSimplifiedBodyText(1)
|
| -
|
| - def testFetchSimplifiedBodyRFC822(self, uid=0):
|
| - self.function = self.client.fetchSimplifiedBody
|
| - self.messages = '21'
|
| - self.msgObjs = [FakeyMessage({'content-type': 'message/rfc822'},
|
| - (), '', 'Yea whatever', 91825,
|
| - [FakeyMessage({'content-type': 'image/jpg'}, (), '',
|
| - 'Body Body Body', None, None
|
| - )]
|
| - )]
|
| - self.expected = {0:
|
| - {'BODY':
|
| - ['message', 'rfc822', [], None, None, None,
|
| - '12', [None, None, [[None, None, None]],
|
| - [[None, None, None]], None, None, None,
|
| - None, None, None], ['image', 'jpg', [],
|
| - None, None, None, '14'], '1'
|
| - ]
|
| - }
|
| - }
|
| -
|
| - return self._fetchWork(uid)
|
| -
|
| - def testFetchSimplifiedBodyRFC822UID(self):
|
| - return self.testFetchSimplifiedBodyRFC822(1)
|
| -
|
| - def testFetchMessage(self, uid=0):
|
| - self.function = self.client.fetchMessage
|
| - self.messages = '1,3,7,10101'
|
| - self.msgObjs = [
|
| - FakeyMessage({'Header': 'Value'}, (), '', 'BODY TEXT\r\n', 91, None),
|
| - ]
|
| - self.expected = {
|
| - 0: {'RFC822': 'Header: Value\r\n\r\nBODY TEXT\r\n'}
|
| - }
|
| - return self._fetchWork(uid)
|
| -
|
| - def testFetchMessageUID(self):
|
| - return self.testFetchMessage(1)
|
| -
|
| - def testFetchHeaders(self, uid=0):
|
| - self.function = self.client.fetchHeaders
|
| - self.messages = '9,6,2'
|
| - self.msgObjs = [
|
| - FakeyMessage({'H1': 'V1', 'H2': 'V2'}, (), '', '', 99, None),
|
| - ]
|
| - self.expected = {
|
| - 0: {'RFC822.HEADER': imap4._formatHeaders({'H1': 'V1', 'H2': 'V2'})},
|
| - }
|
| - return self._fetchWork(uid)
|
| -
|
| - def testFetchHeadersUID(self):
|
| - return self.testFetchHeaders(1)
|
| -
|
| - def testFetchBody(self, uid=0):
|
| - self.function = self.client.fetchBody
|
| - self.messages = '1,2,3,4,5,6,7'
|
| - self.msgObjs = [
|
| - FakeyMessage({'Header': 'Value'}, (), '', 'Body goes here\r\n', 171, None),
|
| - ]
|
| - self.expected = {
|
| - 0: {'RFC822.TEXT': 'Body goes here\r\n'},
|
| - }
|
| - return self._fetchWork(uid)
|
| -
|
| - def testFetchBodyUID(self):
|
| - return self.testFetchBody(1)
|
| -
|
| - def testFetchBodyParts(self):
|
| - self.function = self.client.fetchBodyParts
|
| - self.messages = '1'
|
| - parts = [1, 2]
|
| - outerBody = ''
|
| - innerBody1 = 'Contained body message text. Squarge.'
|
| - innerBody2 = 'Secondary <i>message</i> text of squarge body.'
|
| - headers = util.OrderedDict()
|
| - headers['from'] = 'sender@host'
|
| - headers['to'] = 'recipient@domain'
|
| - headers['subject'] = 'booga booga boo'
|
| - headers['content-type'] = 'multipart/alternative; boundary="xyz"'
|
| - innerHeaders = util.OrderedDict()
|
| - innerHeaders['subject'] = 'this is subject text'
|
| - innerHeaders['content-type'] = 'text/plain'
|
| - innerHeaders2 = util.OrderedDict()
|
| - innerHeaders2['subject'] = '<b>this is subject</b>'
|
| - innerHeaders2['content-type'] = 'text/html'
|
| - self.msgObjs = [FakeyMessage(
|
| - headers, (), None, outerBody, 123,
|
| - [FakeyMessage(innerHeaders, (), None, innerBody1, None, None),
|
| - FakeyMessage(innerHeaders2, (), None, innerBody2, None, None)])]
|
| - self.expected = {
|
| - 0: {'1': innerBody1, '2': innerBody2},
|
| - }
|
| -
|
| - def result(R):
|
| - self.result = R
|
| -
|
| - self.connected.addCallback(lambda _: self.function(self.messages, parts))
|
| - self.connected.addCallback(result)
|
| - self.connected.addCallback(self._cbStopClient)
|
| - self.connected.addErrback(self._ebGeneral)
|
| -
|
| - d = loopback.loopbackTCP(self.server, self.client, noisy=False)
|
| - d.addCallback(lambda ign: self.assertEquals(self.result, self.expected))
|
| - return d
|
| -
|
| -
|
| - def test_fetchBodyPartOfNonMultipart(self):
|
| - """
|
| - Single-part messages have an implicit first part which clients
|
| - should be able to retrieve explicitly. Test that a client
|
| - requesting part 1 of a text/plain message receives the body of the
|
| - text/plain part.
|
| - """
|
| - self.function = self.client.fetchBodyParts
|
| - self.messages = '1'
|
| - parts = [1]
|
| - outerBody = 'DA body'
|
| - headers = util.OrderedDict()
|
| - headers['from'] = 'sender@host'
|
| - headers['to'] = 'recipient@domain'
|
| - headers['subject'] = 'booga booga boo'
|
| - headers['content-type'] = 'text/plain'
|
| - self.msgObjs = [FakeyMessage(
|
| - headers, (), None, outerBody, 123, None)]
|
| -
|
| - self.expected = {
|
| - 0: {'1': outerBody},
|
| - }
|
| -
|
| - def result(R):
|
| - self.result = R
|
| -
|
| - self.connected.addCallback(lambda _: self.function(self.messages, parts))
|
| - self.connected.addCallback(result)
|
| - self.connected.addCallback(self._cbStopClient)
|
| - self.connected.addErrback(self._ebGeneral)
|
| -
|
| - d = loopback.loopbackTCP(self.server, self.client, noisy=False)
|
| - d.addCallback(lambda ign: self.assertEquals(self.result, self.expected))
|
| - return d
|
| -
|
| -
|
| - def testFetchSize(self, uid=0):
|
| - self.function = self.client.fetchSize
|
| - self.messages = '1:100,2:*'
|
| - self.msgObjs = [
|
| - FakeyMessage({}, (), '', 'x' * 20, 123, None),
|
| - ]
|
| - self.expected = {
|
| - 0: {'RFC822.SIZE': '20'},
|
| - }
|
| - return self._fetchWork(uid)
|
| -
|
| - def testFetchSizeUID(self):
|
| - return self.testFetchSize(1)
|
| -
|
| - def testFetchFull(self, uid=0):
|
| - self.function = self.client.fetchFull
|
| - self.messages = '1,3'
|
| - self.msgObjs = [
|
| - FakeyMessage({}, ('\\XYZ', '\\YZX', 'Abc'),
|
| - 'Sun, 25 Jul 2010 06:20:30 -0400 (EDT)',
|
| - 'xyz' * 2, 654, None),
|
| - FakeyMessage({}, ('\\One', '\\Two', 'Three'),
|
| - 'Mon, 14 Apr 2003 19:43:44 -0400',
|
| - 'abc' * 4, 555, None),
|
| - ]
|
| - self.expected = {
|
| - 0: {'FLAGS': ['\\XYZ', '\\YZX', 'Abc'],
|
| - 'INTERNALDATE': '25-Jul-2010 06:20:30 -0400',
|
| - 'RFC822.SIZE': '6',
|
| - 'ENVELOPE': [None, None, [[None, None, None]], [[None, None, None]], None, None, None, None, None, None],
|
| - 'BODY': [None, None, [], None, None, None, '6']},
|
| - 1: {'FLAGS': ['\\One', '\\Two', 'Three'],
|
| - 'INTERNALDATE': '14-Apr-2003 19:43:44 -0400',
|
| - 'RFC822.SIZE': '12',
|
| - 'ENVELOPE': [None, None, [[None, None, None]], [[None, None, None]], None, None, None, None, None, None],
|
| - 'BODY': [None, None, [], None, None, None, '12']},
|
| - }
|
| - return self._fetchWork(uid)
|
| -
|
| - def testFetchFullUID(self):
|
| - return self.testFetchFull(1)
|
| -
|
| - def testFetchAll(self, uid=0):
|
| - self.function = self.client.fetchAll
|
| - self.messages = '1,2:3'
|
| - self.msgObjs = [
|
| - FakeyMessage({}, (), 'Mon, 14 Apr 2003 19:43:44 +0400',
|
| - 'Lalala', 10101, None),
|
| - FakeyMessage({}, (), 'Tue, 15 Apr 2003 19:43:44 +0200',
|
| - 'Alalal', 20202, None),
|
| - ]
|
| - self.expected = {
|
| - 0: {'ENVELOPE': [None, None, [[None, None, None]], [[None, None, None]], None, None, None, None, None, None],
|
| - 'RFC822.SIZE': '6',
|
| - 'INTERNALDATE': '14-Apr-2003 19:43:44 +0400',
|
| - 'FLAGS': []},
|
| - 1: {'ENVELOPE': [None, None, [[None, None, None]], [[None, None, None]], None, None, None, None, None, None],
|
| - 'RFC822.SIZE': '6',
|
| - 'INTERNALDATE': '15-Apr-2003 19:43:44 +0200',
|
| - 'FLAGS': []},
|
| - }
|
| - return self._fetchWork(uid)
|
| -
|
| - def testFetchAllUID(self):
|
| - return self.testFetchAll(1)
|
| -
|
| - def testFetchFast(self, uid=0):
|
| - self.function = self.client.fetchFast
|
| - self.messages = '1'
|
| - self.msgObjs = [
|
| - FakeyMessage({}, ('\\X',), '19 Mar 2003 19:22:21 -0500', '', 9, None),
|
| - ]
|
| - self.expected = {
|
| - 0: {'FLAGS': ['\\X'],
|
| - 'INTERNALDATE': '19-Mar-2003 19:22:21 -0500',
|
| - 'RFC822.SIZE': '0'},
|
| - }
|
| - return self._fetchWork(uid)
|
| -
|
| - def testFetchFastUID(self):
|
| - return self.testFetchFast(1)
|
| -
|
| -
|
| -class FetchSearchStoreTestCase(unittest.TestCase, IMAP4HelperMixin):
|
| - implements(imap4.ISearchableMailbox)
|
| -
|
| - def setUp(self):
|
| - self.expected = self.result = None
|
| - self.server_received_query = None
|
| - self.server_received_uid = None
|
| - self.server_received_parts = None
|
| - self.server_received_messages = None
|
| -
|
| - self.server = imap4.IMAP4Server()
|
| - self.server.state = 'select'
|
| - self.server.mbox = self
|
| - self.connected = defer.Deferred()
|
| - self.client = SimpleClient(self.connected)
|
| -
|
| - def search(self, query, uid):
|
| - self.server_received_query = query
|
| - self.server_received_uid = uid
|
| - return self.expected
|
| -
|
| - def addListener(self, *a, **kw):
|
| - pass
|
| - removeListener = addListener
|
| -
|
| - def _searchWork(self, uid):
|
| - def search():
|
| - return self.client.search(self.query, uid=uid)
|
| - def result(R):
|
| - self.result = R
|
| -
|
| - self.connected.addCallback(strip(search)
|
| - ).addCallback(result
|
| - ).addCallback(self._cbStopClient
|
| - ).addErrback(self._ebGeneral)
|
| -
|
| - def check(ignored):
|
| - # Ensure no short-circuiting wierdness is going on
|
| - self.failIf(self.result is self.expected)
|
| -
|
| - self.assertEquals(self.result, self.expected)
|
| - self.assertEquals(self.uid, self.server_received_uid)
|
| - self.assertEquals(
|
| - imap4.parseNestedParens(self.query),
|
| - self.server_received_query
|
| - )
|
| - d = loopback.loopbackTCP(self.server, self.client, noisy=False)
|
| - d.addCallback(check)
|
| - return d
|
| -
|
| - def testSearch(self):
|
| - self.query = imap4.Or(
|
| - imap4.Query(header=('subject', 'substring')),
|
| - imap4.Query(larger=1024, smaller=4096),
|
| - )
|
| - self.expected = [1, 4, 5, 7]
|
| - self.uid = 0
|
| - return self._searchWork(0)
|
| -
|
| - def testUIDSearch(self):
|
| - self.query = imap4.Or(
|
| - imap4.Query(header=('subject', 'substring')),
|
| - imap4.Query(larger=1024, smaller=4096),
|
| - )
|
| - self.uid = 1
|
| - self.expected = [1, 2, 3]
|
| - return self._searchWork(1)
|
| -
|
| - def getUID(self, msg):
|
| - try:
|
| - return self.expected[msg]['UID']
|
| - except (TypeError, IndexError):
|
| - return self.expected[msg-1]
|
| - except KeyError:
|
| - return 42
|
| -
|
| - def fetch(self, messages, uid):
|
| - self.server_received_uid = uid
|
| - self.server_received_messages = str(messages)
|
| - return self.expected
|
| -
|
| - def _fetchWork(self, fetch):
|
| - def result(R):
|
| - self.result = R
|
| -
|
| - self.connected.addCallback(strip(fetch)
|
| - ).addCallback(result
|
| - ).addCallback(self._cbStopClient
|
| - ).addErrback(self._ebGeneral)
|
| -
|
| - def check(ignored):
|
| - # Ensure no short-circuiting wierdness is going on
|
| - self.failIf(self.result is self.expected)
|
| -
|
| - self.parts and self.parts.sort()
|
| - self.server_received_parts and self.server_received_parts.sort()
|
| -
|
| - if self.uid:
|
| - for (k, v) in self.expected.items():
|
| - v['UID'] = str(k)
|
| -
|
| - self.assertEquals(self.result, self.expected)
|
| - self.assertEquals(self.uid, self.server_received_uid)
|
| - self.assertEquals(self.parts, self.server_received_parts)
|
| - self.assertEquals(imap4.parseIdList(self.messages),
|
| - imap4.parseIdList(self.server_received_messages))
|
| -
|
| - d = loopback.loopbackTCP(self.server, self.client, noisy=False)
|
| - d.addCallback(check)
|
| - return d
|
| -
|
| -class FakeMailbox:
|
| - def __init__(self):
|
| - self.args = []
|
| - def addMessage(self, body, flags, date):
|
| - self.args.append((body, flags, date))
|
| - return defer.succeed(None)
|
| -
|
| -class FeaturefulMessage:
|
| - implements(imap4.IMessageFile)
|
| -
|
| - def getFlags(self):
|
| - return 'flags'
|
| -
|
| - def getInternalDate(self):
|
| - return 'internaldate'
|
| -
|
| - def open(self):
|
| - return StringIO("open")
|
| -
|
| -class MessageCopierMailbox:
|
| - implements(imap4.IMessageCopier)
|
| -
|
| - def __init__(self):
|
| - self.msgs = []
|
| -
|
| - def copy(self, msg):
|
| - self.msgs.append(msg)
|
| - return len(self.msgs)
|
| -
|
| -class CopyWorkerTestCase(unittest.TestCase):
|
| - def testFeaturefulMessage(self):
|
| - s = imap4.IMAP4Server()
|
| -
|
| - # Yes. I am grabbing this uber-non-public method to test it.
|
| - # It is complex. It needs to be tested directly!
|
| - # Perhaps it should be refactored, simplified, or split up into
|
| - # not-so-private components, but that is a task for another day.
|
| -
|
| - # Ha ha! Addendum! Soon it will be split up, and this test will
|
| - # be re-written to just use the default adapter for IMailbox to
|
| - # IMessageCopier and call .copy on that adapter.
|
| - f = s._IMAP4Server__cbCopy
|
| -
|
| - m = FakeMailbox()
|
| - d = f([(i, FeaturefulMessage()) for i in range(1, 11)], 'tag', m)
|
| -
|
| - def cbCopy(results):
|
| - for a in m.args:
|
| - self.assertEquals(a[0].read(), "open")
|
| - self.assertEquals(a[1], "flags")
|
| - self.assertEquals(a[2], "internaldate")
|
| -
|
| - for (status, result) in results:
|
| - self.failUnless(status)
|
| - self.assertEquals(result, None)
|
| -
|
| - return d.addCallback(cbCopy)
|
| -
|
| -
|
| - def testUnfeaturefulMessage(self):
|
| - s = imap4.IMAP4Server()
|
| -
|
| - # See above comment
|
| - f = s._IMAP4Server__cbCopy
|
| -
|
| - m = FakeMailbox()
|
| - msgs = [FakeyMessage({'Header-Counter': str(i)}, (), 'Date', 'Body %d' % (i,), i + 10, None) for i in range(1, 11)]
|
| - d = f([im for im in zip(range(1, 11), msgs)], 'tag', m)
|
| -
|
| - def cbCopy(results):
|
| - seen = []
|
| - for a in m.args:
|
| - seen.append(a[0].read())
|
| - self.assertEquals(a[1], ())
|
| - self.assertEquals(a[2], "Date")
|
| -
|
| - seen.sort()
|
| - exp = ["Header-Counter: %d\r\n\r\nBody %d" % (i, i) for i in range(1, 11)]
|
| - exp.sort()
|
| - self.assertEquals(seen, exp)
|
| -
|
| - for (status, result) in results:
|
| - self.failUnless(status)
|
| - self.assertEquals(result, None)
|
| -
|
| - return d.addCallback(cbCopy)
|
| -
|
| - def testMessageCopier(self):
|
| - s = imap4.IMAP4Server()
|
| -
|
| - # See above comment
|
| - f = s._IMAP4Server__cbCopy
|
| -
|
| - m = MessageCopierMailbox()
|
| - msgs = [object() for i in range(1, 11)]
|
| - d = f([im for im in zip(range(1, 11), msgs)], 'tag', m)
|
| -
|
| - def cbCopy(results):
|
| - self.assertEquals(results, zip([1] * 10, range(1, 11)))
|
| - for (orig, new) in zip(msgs, m.msgs):
|
| - self.assertIdentical(orig, new)
|
| -
|
| - return d.addCallback(cbCopy)
|
| -
|
| -
|
| -class TLSTestCase(IMAP4HelperMixin, unittest.TestCase):
|
| - serverCTX = ServerTLSContext and ServerTLSContext()
|
| - clientCTX = ClientTLSContext and ClientTLSContext()
|
| -
|
| - def loopback(self):
|
| - return loopback.loopbackTCP(self.server, self.client, noisy=False)
|
| -
|
| - def testAPileOfThings(self):
|
| - SimpleServer.theAccount.addMailbox('inbox')
|
| - called = []
|
| - def login():
|
| - called.append(None)
|
| - return self.client.login('testuser', 'password-test')
|
| - def list():
|
| - called.append(None)
|
| - return self.client.list('inbox', '%')
|
| - def status():
|
| - called.append(None)
|
| - return self.client.status('inbox', 'UIDNEXT')
|
| - def examine():
|
| - called.append(None)
|
| - return self.client.examine('inbox')
|
| - def logout():
|
| - called.append(None)
|
| - return self.client.logout()
|
| -
|
| - self.client.requireTransportSecurity = True
|
| -
|
| - methods = [login, list, status, examine, logout]
|
| - map(self.connected.addCallback, map(strip, methods))
|
| - self.connected.addCallbacks(self._cbStopClient, self._ebGeneral)
|
| - def check(ignored):
|
| - self.assertEquals(self.server.startedTLS, True)
|
| - self.assertEquals(self.client.startedTLS, True)
|
| - self.assertEquals(len(called), len(methods))
|
| - d = self.loopback()
|
| - d.addCallback(check)
|
| - return d
|
| -
|
| - def testLoginLogin(self):
|
| - self.server.checker.addUser('testuser', 'password-test')
|
| - success = []
|
| - self.client.registerAuthenticator(imap4.LOGINAuthenticator('testuser'))
|
| - self.connected.addCallback(
|
| - lambda _: self.client.authenticate('password-test')
|
| - ).addCallback(
|
| - lambda _: self.client.logout()
|
| - ).addCallback(success.append
|
| - ).addCallback(self._cbStopClient
|
| - ).addErrback(self._ebGeneral)
|
| -
|
| - d = self.loopback()
|
| - d.addCallback(lambda x : self.assertEquals(len(success), 1))
|
| - return d
|
| -
|
| - def testStartTLS(self):
|
| - success = []
|
| - self.connected.addCallback(lambda _: self.client.startTLS())
|
| - self.connected.addCallback(lambda _: self.assertNotEquals(-1, self.client.transport.__class__.__name__.find('TLS')))
|
| - self.connected.addCallback(self._cbStopClient)
|
| - self.connected.addCallback(success.append)
|
| - self.connected.addErrback(self._ebGeneral)
|
| -
|
| - d = self.loopback()
|
| - d.addCallback(lambda x : self.failUnless(success))
|
| - return d
|
| -
|
| - def testFailedStartTLS(self):
|
| - failure = []
|
| - def breakServerTLS(ign):
|
| - self.server.canStartTLS = False
|
| -
|
| - self.connected.addCallback(breakServerTLS)
|
| - self.connected.addCallback(lambda ign: self.client.startTLS())
|
| - self.connected.addErrback(lambda err: failure.append(err.trap(imap4.IMAP4Exception)))
|
| - self.connected.addCallback(self._cbStopClient)
|
| - self.connected.addErrback(self._ebGeneral)
|
| -
|
| - def check(ignored):
|
| - self.failUnless(failure)
|
| - self.assertIdentical(failure[0], imap4.IMAP4Exception)
|
| - return self.loopback().addCallback(check)
|
| -
|
| -
|
| -
|
| -class SlowMailbox(SimpleMailbox):
|
| - howSlow = 2
|
| - callLater = None
|
| - fetchDeferred = None
|
| -
|
| - # Not a very nice implementation of fetch(), but it'll
|
| - # do for the purposes of testing.
|
| - def fetch(self, messages, uid):
|
| - d = defer.Deferred()
|
| - self.callLater(self.howSlow, d.callback, ())
|
| - self.fetchDeferred.callback(None)
|
| - return d
|
| -
|
| -class Timeout(IMAP4HelperMixin, unittest.TestCase):
|
| -
|
| - def test_serverTimeout(self):
|
| - """
|
| - The *client* has a timeout mechanism which will close connections that
|
| - are inactive for a period.
|
| - """
|
| - c = Clock()
|
| - self.server.timeoutTest = True
|
| - self.client.timeout = 5 #seconds
|
| - self.client.callLater = c.callLater
|
| - self.selectedArgs = None
|
| -
|
| - def login():
|
| - d = self.client.login('testuser', 'password-test')
|
| - c.advance(5)
|
| - d.addErrback(timedOut)
|
| - return d
|
| -
|
| - def timedOut(failure):
|
| - self._cbStopClient(None)
|
| - failure.trap(error.TimeoutError)
|
| -
|
| - d = self.connected.addCallback(strip(login))
|
| - d.addErrback(self._ebGeneral)
|
| - return defer.gatherResults([d, self.loopback()])
|
| -
|
| -
|
| - def test_longFetchDoesntTimeout(self):
|
| - """
|
| - The connection timeout does not take effect during fetches.
|
| - """
|
| - c = Clock()
|
| - SlowMailbox.callLater = c.callLater
|
| - SlowMailbox.fetchDeferred = defer.Deferred()
|
| - self.server.callLater = c.callLater
|
| - SimpleServer.theAccount.mailboxFactory = SlowMailbox
|
| - SimpleServer.theAccount.addMailbox('mailbox-test')
|
| -
|
| - self.server.setTimeout(1)
|
| -
|
| - def login():
|
| - return self.client.login('testuser', 'password-test')
|
| - def select():
|
| - self.server.setTimeout(1)
|
| - return self.client.select('mailbox-test')
|
| - def fetch():
|
| - return self.client.fetchUID('1:*')
|
| - def stillConnected():
|
| - self.assertNotEquals(self.server.state, 'timeout')
|
| -
|
| - def cbAdvance(ignored):
|
| - for i in xrange(4):
|
| - c.advance(.5)
|
| -
|
| - SlowMailbox.fetchDeferred.addCallback(cbAdvance)
|
| -
|
| - d1 = self.connected.addCallback(strip(login))
|
| - d1.addCallback(strip(select))
|
| - d1.addCallback(strip(fetch))
|
| - d1.addCallback(strip(stillConnected))
|
| - d1.addCallback(self._cbStopClient)
|
| - d1.addErrback(self._ebGeneral)
|
| - d = defer.gatherResults([d1, self.loopback()])
|
| - return d
|
| -
|
| -
|
| - def test_idleClientDoesDisconnect(self):
|
| - """
|
| - The *server* has a timeout mechanism which will close connections that
|
| - are inactive for a period.
|
| - """
|
| - c = Clock()
|
| - # Hook up our server protocol
|
| - transport = StringTransportWithDisconnection()
|
| - transport.protocol = self.server
|
| - self.server.callLater = c.callLater
|
| - self.server.makeConnection(transport)
|
| -
|
| - # Make sure we can notice when the connection goes away
|
| - lost = []
|
| - connLost = self.server.connectionLost
|
| - self.server.connectionLost = lambda reason: (lost.append(None), connLost(reason))[1]
|
| -
|
| - # 2/3rds of the idle timeout elapses...
|
| - c.pump([0.0] + [self.server.timeOut / 3.0] * 2)
|
| - self.failIf(lost, lost)
|
| -
|
| - # Now some more
|
| - c.pump([0.0, self.server.timeOut / 2.0])
|
| - self.failUnless(lost)
|
| -
|
| -
|
| -
|
| -class Disconnection(unittest.TestCase):
|
| - def testClientDisconnectFailsDeferreds(self):
|
| - c = imap4.IMAP4Client()
|
| - t = StringTransportWithDisconnection()
|
| - c.makeConnection(t)
|
| - d = self.assertFailure(c.login('testuser', 'example.com'), error.ConnectionDone)
|
| - c.connectionLost(error.ConnectionDone("Connection closed"))
|
| - return d
|
| -
|
| -
|
| -
|
| -class SynchronousMailbox(object):
|
| - """
|
| - Trivial, in-memory mailbox implementation which can produce a message
|
| - synchronously.
|
| - """
|
| - def __init__(self, messages):
|
| - self.messages = messages
|
| -
|
| -
|
| - def fetch(self, msgset, uid):
|
| - assert not uid, "Cannot handle uid requests."
|
| - for msg in msgset:
|
| - yield msg, self.messages[msg - 1]
|
| -
|
| -
|
| -
|
| -class StringTransportConsumer(StringTransport):
|
| - producer = None
|
| - streaming = None
|
| -
|
| - def registerProducer(self, producer, streaming):
|
| - self.producer = producer
|
| - self.streaming = streaming
|
| -
|
| -
|
| -
|
| -class Pipelining(unittest.TestCase):
|
| - """
|
| - Tests for various aspects of the IMAP4 server's pipelining support.
|
| - """
|
| - messages = [
|
| - FakeyMessage({}, [], '', '0', None, None),
|
| - FakeyMessage({}, [], '', '1', None, None),
|
| - FakeyMessage({}, [], '', '2', None, None),
|
| - ]
|
| -
|
| - def setUp(self):
|
| - self.iterators = []
|
| -
|
| - self.transport = StringTransportConsumer()
|
| - self.server = imap4.IMAP4Server(None, None, self.iterateInReactor)
|
| - self.server.makeConnection(self.transport)
|
| -
|
| -
|
| - def iterateInReactor(self, iterator):
|
| - d = defer.Deferred()
|
| - self.iterators.append((iterator, d))
|
| - return d
|
| -
|
| -
|
| - def tearDown(self):
|
| - self.server.connectionLost(failure.Failure(error.ConnectionDone()))
|
| -
|
| -
|
| - def test_synchronousFetch(self):
|
| - """
|
| - Test that pipelined FETCH commands which can be responded to
|
| - synchronously are responded to correctly.
|
| - """
|
| - mailbox = SynchronousMailbox(self.messages)
|
| -
|
| - # Skip over authentication and folder selection
|
| - self.server.state = 'select'
|
| - self.server.mbox = mailbox
|
| -
|
| - # Get rid of any greeting junk
|
| - self.transport.clear()
|
| -
|
| - # Here's some pipelined stuff
|
| - self.server.dataReceived(
|
| - '01 FETCH 1 BODY[]\r\n'
|
| - '02 FETCH 2 BODY[]\r\n'
|
| - '03 FETCH 3 BODY[]\r\n')
|
| -
|
| - # Flush anything the server has scheduled to run
|
| - while self.iterators:
|
| - for e in self.iterators[0][0]:
|
| - break
|
| - else:
|
| - self.iterators.pop(0)[1].callback(None)
|
| -
|
| - # The bodies are empty because we aren't simulating a transport
|
| - # exactly correctly (we have StringTransportConsumer but we never
|
| - # call resumeProducing on its producer). It doesn't matter: just
|
| - # make sure the surrounding structure is okay, and that no
|
| - # exceptions occurred.
|
| - self.assertEquals(
|
| - self.transport.value(),
|
| - '* 1 FETCH (BODY[] )\r\n'
|
| - '01 OK FETCH completed\r\n'
|
| - '* 2 FETCH (BODY[] )\r\n'
|
| - '02 OK FETCH completed\r\n'
|
| - '* 3 FETCH (BODY[] )\r\n'
|
| - '03 OK FETCH completed\r\n')
|
| -
|
| -
|
| -
|
| -if ClientTLSContext is None:
|
| - for case in (TLSTestCase,):
|
| - case.skip = "OpenSSL not present"
|
| -elif interfaces.IReactorSSL(reactor, None) is None:
|
| - for case in (TLSTestCase,):
|
| - case.skip = "Reactor doesn't support SSL"
|
|
|