Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(113)

Side by Side Diff: third_party/twisted_8_1/twisted/mail/test/test_imap.py

Issue 12261012: Remove third_party/twisted_8_1 (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/tools/build
Patch Set: Created 7 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
(Empty)
1 # -*- test-case-name: twisted.mail.test.test_imap -*-
2 # Copyright (c) 2001-2008 Twisted Matrix Laboratories.
3 # See LICENSE for details.
4
5
6 """
7 Test case for twisted.mail.imap4
8 """
9
10 try:
11 from cStringIO import StringIO
12 except ImportError:
13 from StringIO import StringIO
14
15 import os
16 import types
17
18 from zope.interface import implements
19
20 from twisted.mail.imap4 import MessageSet
21 from twisted.mail import imap4
22 from twisted.protocols import loopback
23 from twisted.internet import defer
24 from twisted.internet import error
25 from twisted.internet import reactor
26 from twisted.internet import interfaces
27 from twisted.internet.task import Clock
28 from twisted.trial import unittest
29 from twisted.python import util
30 from twisted.python import failure
31
32 from twisted import cred
33 import twisted.cred.error
34 import twisted.cred.checkers
35 import twisted.cred.credentials
36 import twisted.cred.portal
37
38 from twisted.test.proto_helpers import StringTransport, StringTransportWithDisco nnection
39
40 try:
41 from twisted.test.ssl_helpers import ClientTLSContext, ServerTLSContext
42 except ImportError:
43 ClientTLSContext = ServerTLSContext = None
44
45 def strip(f):
46 return lambda result, f=f: f()
47
48 def sortNest(l):
49 l = l[:]
50 l.sort()
51 for i in range(len(l)):
52 if isinstance(l[i], types.ListType):
53 l[i] = sortNest(l[i])
54 elif isinstance(l[i], types.TupleType):
55 l[i] = tuple(sortNest(list(l[i])))
56 return l
57
58 class IMAP4UTF7TestCase(unittest.TestCase):
59 tests = [
60 [u'Hello world', 'Hello world'],
61 [u'Hello & world', 'Hello &- world'],
62 [u'Hello\xffworld', 'Hello&AP8-world'],
63 [u'\xff\xfe\xfd\xfc', '&AP8A,gD9APw-'],
64 [u'~peter/mail/\u65e5\u672c\u8a9e/\u53f0\u5317',
65 '~peter/mail/&ZeVnLIqe-/&U,BTFw-'], # example from RFC 2060
66 ]
67
68 def test_encodeWithErrors(self):
69 """
70 Specifying an error policy to C{unicode.encode} with the
71 I{imap4-utf-7} codec should produce the same result as not
72 specifying the error policy.
73 """
74 text = u'Hello world'
75 self.assertEqual(
76 text.encode('imap4-utf-7', 'strict'),
77 text.encode('imap4-utf-7'))
78
79
80 def test_decodeWithErrors(self):
81 """
82 Similar to L{test_encodeWithErrors}, but for C{str.decode}.
83 """
84 bytes = 'Hello world'
85 self.assertEqual(
86 bytes.decode('imap4-utf-7', 'strict'),
87 bytes.decode('imap4-utf-7'))
88
89
90 def testEncode(self):
91 for (input, output) in self.tests:
92 self.assertEquals(input.encode('imap4-utf-7'), output)
93
94 def testDecode(self):
95 for (input, output) in self.tests:
96 # XXX - Piece of *crap* 2.1
97 self.assertEquals(input, imap4.decoder(output)[0])
98
99 def testPrintableSingletons(self):
100 # All printables represent themselves
101 for o in range(0x20, 0x26) + range(0x27, 0x7f):
102 self.failUnlessEqual(chr(o), chr(o).encode('imap4-utf-7'))
103 self.failUnlessEqual(chr(o), chr(o).decode('imap4-utf-7'))
104 self.failUnlessEqual('&'.encode('imap4-utf-7'), '&-')
105 self.failUnlessEqual('&-'.decode('imap4-utf-7'), '&')
106
107 class BufferingConsumer:
108 def __init__(self):
109 self.buffer = []
110
111 def write(self, bytes):
112 self.buffer.append(bytes)
113 if self.consumer:
114 self.consumer.resumeProducing()
115
116 def registerProducer(self, consumer, streaming):
117 self.consumer = consumer
118 self.consumer.resumeProducing()
119
120 def unregisterProducer(self):
121 self.consumer = None
122
123 class MessageProducerTestCase(unittest.TestCase):
124 def testSinglePart(self):
125 body = 'This is body text. Rar.'
126 headers = util.OrderedDict()
127 headers['from'] = 'sender@host'
128 headers['to'] = 'recipient@domain'
129 headers['subject'] = 'booga booga boo'
130 headers['content-type'] = 'text/plain'
131
132 msg = FakeyMessage(headers, (), None, body, 123, None )
133
134 c = BufferingConsumer()
135 p = imap4.MessageProducer(msg)
136 d = p.beginProducing(c)
137
138 def cbProduced(result):
139 self.assertIdentical(result, p)
140 self.assertEquals(
141 ''.join(c.buffer),
142
143 '{119}\r\n'
144 'From: sender@host\r\n'
145 'To: recipient@domain\r\n'
146 'Subject: booga booga boo\r\n'
147 'Content-Type: text/plain\r\n'
148 '\r\n'
149 + body)
150 return d.addCallback(cbProduced)
151
152
153 def testSingleMultiPart(self):
154 outerBody = ''
155 innerBody = 'Contained body message text. Squarge.'
156 headers = util.OrderedDict()
157 headers['from'] = 'sender@host'
158 headers['to'] = 'recipient@domain'
159 headers['subject'] = 'booga booga boo'
160 headers['content-type'] = 'multipart/alternative; boundary="xyz"'
161
162 innerHeaders = util.OrderedDict()
163 innerHeaders['subject'] = 'this is subject text'
164 innerHeaders['content-type'] = 'text/plain'
165 msg = FakeyMessage(headers, (), None, outerBody, 123,
166 [FakeyMessage(innerHeaders, (), None, innerBody,
167 None, None)],
168 )
169
170 c = BufferingConsumer()
171 p = imap4.MessageProducer(msg)
172 d = p.beginProducing(c)
173
174 def cbProduced(result):
175 self.failUnlessIdentical(result, p)
176
177 self.assertEquals(
178 ''.join(c.buffer),
179
180 '{239}\r\n'
181 'From: sender@host\r\n'
182 'To: recipient@domain\r\n'
183 'Subject: booga booga boo\r\n'
184 'Content-Type: multipart/alternative; boundary="xyz"\r\n'
185 '\r\n'
186 '\r\n'
187 '--xyz\r\n'
188 'Subject: this is subject text\r\n'
189 'Content-Type: text/plain\r\n'
190 '\r\n'
191 + innerBody
192 + '\r\n--xyz--\r\n')
193
194 return d.addCallback(cbProduced)
195
196
197 def testMultipleMultiPart(self):
198 outerBody = ''
199 innerBody1 = 'Contained body message text. Squarge.'
200 innerBody2 = 'Secondary <i>message</i> text of squarge body.'
201 headers = util.OrderedDict()
202 headers['from'] = 'sender@host'
203 headers['to'] = 'recipient@domain'
204 headers['subject'] = 'booga booga boo'
205 headers['content-type'] = 'multipart/alternative; boundary="xyz"'
206 innerHeaders = util.OrderedDict()
207 innerHeaders['subject'] = 'this is subject text'
208 innerHeaders['content-type'] = 'text/plain'
209 innerHeaders2 = util.OrderedDict()
210 innerHeaders2['subject'] = '<b>this is subject</b>'
211 innerHeaders2['content-type'] = 'text/html'
212 msg = FakeyMessage(headers, (), None, outerBody, 123, [
213 FakeyMessage(innerHeaders, (), None, innerBody1, None, None),
214 FakeyMessage(innerHeaders2, (), None, innerBody2, None, None)
215 ],
216 )
217
218 c = BufferingConsumer()
219 p = imap4.MessageProducer(msg)
220 d = p.beginProducing(c)
221
222 def cbProduced(result):
223 self.failUnlessIdentical(result, p)
224
225 self.assertEquals(
226 ''.join(c.buffer),
227
228 '{354}\r\n'
229 'From: sender@host\r\n'
230 'To: recipient@domain\r\n'
231 'Subject: booga booga boo\r\n'
232 'Content-Type: multipart/alternative; boundary="xyz"\r\n'
233 '\r\n'
234 '\r\n'
235 '--xyz\r\n'
236 'Subject: this is subject text\r\n'
237 'Content-Type: text/plain\r\n'
238 '\r\n'
239 + innerBody1
240 + '\r\n--xyz\r\n'
241 'Subject: <b>this is subject</b>\r\n'
242 'Content-Type: text/html\r\n'
243 '\r\n'
244 + innerBody2
245 + '\r\n--xyz--\r\n')
246 return d.addCallback(cbProduced)
247
248
249 class IMAP4HelperTestCase(unittest.TestCase):
250 def testFileProducer(self):
251 b = (('x' * 1) + ('y' * 1) + ('z' * 1)) * 10
252 c = BufferingConsumer()
253 f = StringIO(b)
254 p = imap4.FileProducer(f)
255 d = p.beginProducing(c)
256
257 def cbProduced(result):
258 self.failUnlessIdentical(result, p)
259 self.assertEquals(
260 ('{%d}\r\n' % len(b))+ b,
261 ''.join(c.buffer))
262 return d.addCallback(cbProduced)
263
264 def testWildcard(self):
265 cases = [
266 ['foo/%gum/bar',
267 ['foo/bar', 'oo/lalagum/bar', 'foo/gumx/bar', 'foo/gum/baz'],
268 ['foo/xgum/bar', 'foo/gum/bar'],
269 ], ['foo/x%x/bar',
270 ['foo', 'bar', 'fuz fuz fuz', 'foo/*/bar', 'foo/xyz/bar', 'foo/x x/baz'],
271 ['foo/xyx/bar', 'foo/xx/bar', 'foo/xxxxxxxxxxxxxx/bar'],
272 ], ['foo/xyz*abc/bar',
273 ['foo/xyz/bar', 'foo/abc/bar', 'foo/xyzab/cbar', 'foo/xyza/bcbar '],
274 ['foo/xyzabc/bar', 'foo/xyz/abc/bar', 'foo/xyz/123/abc/bar'],
275 ]
276 ]
277
278 for (wildcard, fail, succeed) in cases:
279 wildcard = imap4.wildcardToRegexp(wildcard, '/')
280 for x in fail:
281 self.failIf(wildcard.match(x))
282 for x in succeed:
283 self.failUnless(wildcard.match(x))
284
285 def testWildcardNoDelim(self):
286 cases = [
287 ['foo/%gum/bar',
288 ['foo/bar', 'oo/lalagum/bar', 'foo/gumx/bar', 'foo/gum/baz'],
289 ['foo/xgum/bar', 'foo/gum/bar', 'foo/x/gum/bar'],
290 ], ['foo/x%x/bar',
291 ['foo', 'bar', 'fuz fuz fuz', 'foo/*/bar', 'foo/xyz/bar', 'foo/x x/baz'],
292 ['foo/xyx/bar', 'foo/xx/bar', 'foo/xxxxxxxxxxxxxx/bar', 'foo/x/x /bar'],
293 ], ['foo/xyz*abc/bar',
294 ['foo/xyz/bar', 'foo/abc/bar', 'foo/xyzab/cbar', 'foo/xyza/bcbar '],
295 ['foo/xyzabc/bar', 'foo/xyz/abc/bar', 'foo/xyz/123/abc/bar'],
296 ]
297 ]
298
299 for (wildcard, fail, succeed) in cases:
300 wildcard = imap4.wildcardToRegexp(wildcard, None)
301 for x in fail:
302 self.failIf(wildcard.match(x), x)
303 for x in succeed:
304 self.failUnless(wildcard.match(x), x)
305
306 def testHeaderFormatter(self):
307 cases = [
308 ({'Header1': 'Value1', 'Header2': 'Value2'}, 'Header2: Value2\r\nHea der1: Value1\r\n'),
309 ]
310
311 for (input, output) in cases:
312 self.assertEquals(imap4._formatHeaders(input), output)
313
314 def testMessageSet(self):
315 m1 = MessageSet()
316 m2 = MessageSet()
317
318 self.assertEquals(m1, m2)
319
320 m1 = m1 + (1, 3)
321 self.assertEquals(len(m1), 3)
322 self.assertEquals(list(m1), [1, 2, 3])
323
324 m2 = m2 + (1, 3)
325 self.assertEquals(m1, m2)
326 self.assertEquals(list(m1 + m2), [1, 2, 3])
327
328 def testQuotedSplitter(self):
329 cases = [
330 '''Hello World''',
331 '''Hello "World!"''',
332 '''World "Hello" "How are you?"''',
333 '''"Hello world" How "are you?"''',
334 '''foo bar "baz buz" NIL''',
335 '''foo bar "baz buz" "NIL"''',
336 '''foo NIL "baz buz" bar''',
337 '''foo "NIL" "baz buz" bar''',
338 '''"NIL" bar "baz buz" foo''',
339 ]
340
341 answers = [
342 ['Hello', 'World'],
343 ['Hello', 'World!'],
344 ['World', 'Hello', 'How are you?'],
345 ['Hello world', 'How', 'are you?'],
346 ['foo', 'bar', 'baz buz', None],
347 ['foo', 'bar', 'baz buz', 'NIL'],
348 ['foo', None, 'baz buz', 'bar'],
349 ['foo', 'NIL', 'baz buz', 'bar'],
350 ['NIL', 'bar', 'baz buz', 'foo'],
351 ]
352
353 errors = [
354 '"mismatched quote',
355 'mismatched quote"',
356 'mismatched"quote',
357 '"oops here is" another"',
358 ]
359
360 for s in errors:
361 self.assertRaises(imap4.MismatchedQuoting, imap4.splitQuoted, s)
362
363 for (case, expected) in zip(cases, answers):
364 self.assertEquals(imap4.splitQuoted(case), expected)
365
366
367 def testStringCollapser(self):
368 cases = [
369 ['a', 'b', 'c', 'd', 'e'],
370 ['a', ' ', '"', 'b', 'c', ' ', '"', ' ', 'd', 'e'],
371 [['a', 'b', 'c'], 'd', 'e'],
372 ['a', ['b', 'c', 'd'], 'e'],
373 ['a', 'b', ['c', 'd', 'e']],
374 ['"', 'a', ' ', '"', ['b', 'c', 'd'], '"', ' ', 'e', '"'],
375 ['a', ['"', ' ', 'b', 'c', ' ', ' ', '"'], 'd', 'e'],
376 ]
377
378 answers = [
379 ['abcde'],
380 ['a', 'bc ', 'de'],
381 [['abc'], 'de'],
382 ['a', ['bcd'], 'e'],
383 ['ab', ['cde']],
384 ['a ', ['bcd'], ' e'],
385 ['a', [' bc '], 'de'],
386 ]
387
388 for (case, expected) in zip(cases, answers):
389 self.assertEquals(imap4.collapseStrings(case), expected)
390
391 def testParenParser(self):
392 s = '\r\n'.join(['xx'] * 4)
393 cases = [
394 '(BODY.PEEK[HEADER.FIELDS.NOT (subject bcc cc)] {%d}\r\n%s)' % (len( s), s,),
395
396 # '(FLAGS (\Seen) INTERNALDATE "17-Jul-1996 02:44:25 -0700" '
397 # 'RFC822.SIZE 4286 ENVELOPE ("Wed, 17 Jul 1996 02:23:25 -0700 (PDT)" '
398 # '"IMAP4rev1 WG mtg summary and minutes" '
399 # '(("Terry Gray" NIL "gray" "cac.washington.edu")) '
400 # '(("Terry Gray" NIL "gray" "cac.washington.edu")) '
401 # '(("Terry Gray" NIL "gray" "cac.washington.edu")) '
402 # '((NIL NIL "imap" "cac.washington.edu")) '
403 # '((NIL NIL "minutes" "CNRI.Reston.VA.US") '
404 # '("John Klensin" NIL "KLENSIN" "INFOODS.MIT.EDU")) NIL NIL '
405 # '"<B27397-0100000@cac.washington.edu>") '
406 # 'BODY ("TEXT" "PLAIN" ("CHARSET" "US-ASCII") NIL NIL "7BIT" 3028 92 ))',
407
408 '(FLAGS (\Seen) INTERNALDATE "17-Jul-1996 02:44:25 -0700" '
409 'RFC822.SIZE 4286 ENVELOPE ("Wed, 17 Jul 1996 02:23:25 -0700 (PDT)" '
410 '"IMAP4rev1 WG mtg summary and minutes" '
411 '(("Terry Gray" NIL gray cac.washington.edu)) '
412 '(("Terry Gray" NIL gray cac.washington.edu)) '
413 '(("Terry Gray" NIL gray cac.washington.edu)) '
414 '((NIL NIL imap cac.washington.edu)) '
415 '((NIL NIL minutes CNRI.Reston.VA.US) '
416 '("John Klensin" NIL KLENSIN INFOODS.MIT.EDU)) NIL NIL '
417 '<B27397-0100000@cac.washington.edu>) '
418 'BODY (TEXT PLAIN (CHARSET US-ASCII) NIL NIL 7BIT 3028 92))',
419 ]
420
421 answers = [
422 ['BODY.PEEK', ['HEADER.FIELDS.NOT', ['subject', 'bcc', 'cc']], s],
423
424 ['FLAGS', [r'\Seen'], 'INTERNALDATE',
425 '17-Jul-1996 02:44:25 -0700', 'RFC822.SIZE', '4286', 'ENVELOPE',
426 ['Wed, 17 Jul 1996 02:23:25 -0700 (PDT)',
427 'IMAP4rev1 WG mtg summary and minutes', [["Terry Gray", None,
428 "gray", "cac.washington.edu"]], [["Terry Gray", None,
429 "gray", "cac.washington.edu"]], [["Terry Gray", None,
430 "gray", "cac.washington.edu"]], [[None, None, "imap",
431 "cac.washington.edu"]], [[None, None, "minutes",
432 "CNRI.Reston.VA.US"], ["John Klensin", None, "KLENSIN",
433 "INFOODS.MIT.EDU"]], None, None,
434 "<B27397-0100000@cac.washington.edu>"], "BODY", ["TEXT", "PLAIN",
435 ["CHARSET", "US-ASCII"], None, None, "7BIT", "3028", "92"]],
436 ]
437
438 for (case, expected) in zip(cases, answers):
439 self.assertEquals(imap4.parseNestedParens(case), [expected])
440
441 # XXX This code used to work, but changes occurred within the
442 # imap4.py module which made it no longer necessary for *all* of it
443 # to work. In particular, only the part that makes
444 # 'BODY.PEEK[HEADER.FIELDS.NOT (Subject Bcc Cc)]' come out correctly
445 # no longer needs to work. So, I am loathe to delete the entire
446 # section of the test. --exarkun
447 #
448
449 # for (case, expected) in zip(answers, cases):
450 # self.assertEquals('(' + imap4.collapseNestedLists(case) + ')', expe cted)
451
452 def testFetchParserSimple(self):
453 cases = [
454 ['ENVELOPE', 'Envelope'],
455 ['FLAGS', 'Flags'],
456 ['INTERNALDATE', 'InternalDate'],
457 ['RFC822.HEADER', 'RFC822Header'],
458 ['RFC822.SIZE', 'RFC822Size'],
459 ['RFC822.TEXT', 'RFC822Text'],
460 ['RFC822', 'RFC822'],
461 ['UID', 'UID'],
462 ['BODYSTRUCTURE', 'BodyStructure'],
463 ]
464
465 for (inp, outp) in cases:
466 p = imap4._FetchParser()
467 p.parseString(inp)
468 self.assertEquals(len(p.result), 1)
469 self.failUnless(isinstance(p.result[0], getattr(p, outp)))
470
471 def testFetchParserMacros(self):
472 cases = [
473 ['ALL', (4, ['flags', 'internaldate', 'rfc822.size', 'envelope'])],
474 ['FULL', (5, ['flags', 'internaldate', 'rfc822.size', 'envelope', 'b ody'])],
475 ['FAST', (3, ['flags', 'internaldate', 'rfc822.size'])],
476 ]
477
478 for (inp, outp) in cases:
479 p = imap4._FetchParser()
480 p.parseString(inp)
481 self.assertEquals(len(p.result), outp[0])
482 p = [str(p).lower() for p in p.result]
483 p.sort()
484 outp[1].sort()
485 self.assertEquals(p, outp[1])
486
487 def testFetchParserBody(self):
488 P = imap4._FetchParser
489
490 p = P()
491 p.parseString('BODY')
492 self.assertEquals(len(p.result), 1)
493 self.failUnless(isinstance(p.result[0], p.Body))
494 self.assertEquals(p.result[0].peek, False)
495 self.assertEquals(p.result[0].header, None)
496 self.assertEquals(str(p.result[0]), 'BODY')
497
498 p = P()
499 p.parseString('BODY.PEEK')
500 self.assertEquals(len(p.result), 1)
501 self.failUnless(isinstance(p.result[0], p.Body))
502 self.assertEquals(p.result[0].peek, True)
503 self.assertEquals(str(p.result[0]), 'BODY')
504
505 p = P()
506 p.parseString('BODY[]')
507 self.assertEquals(len(p.result), 1)
508 self.failUnless(isinstance(p.result[0], p.Body))
509 self.assertEquals(p.result[0].empty, True)
510 self.assertEquals(str(p.result[0]), 'BODY[]')
511
512 p = P()
513 p.parseString('BODY[HEADER]')
514 self.assertEquals(len(p.result), 1)
515 self.failUnless(isinstance(p.result[0], p.Body))
516 self.assertEquals(p.result[0].peek, False)
517 self.failUnless(isinstance(p.result[0].header, p.Header))
518 self.assertEquals(p.result[0].header.negate, True)
519 self.assertEquals(p.result[0].header.fields, ())
520 self.assertEquals(p.result[0].empty, False)
521 self.assertEquals(str(p.result[0]), 'BODY[HEADER]')
522
523 p = P()
524 p.parseString('BODY.PEEK[HEADER]')
525 self.assertEquals(len(p.result), 1)
526 self.failUnless(isinstance(p.result[0], p.Body))
527 self.assertEquals(p.result[0].peek, True)
528 self.failUnless(isinstance(p.result[0].header, p.Header))
529 self.assertEquals(p.result[0].header.negate, True)
530 self.assertEquals(p.result[0].header.fields, ())
531 self.assertEquals(p.result[0].empty, False)
532 self.assertEquals(str(p.result[0]), 'BODY[HEADER]')
533
534 p = P()
535 p.parseString('BODY[HEADER.FIELDS (Subject Cc Message-Id)]')
536 self.assertEquals(len(p.result), 1)
537 self.failUnless(isinstance(p.result[0], p.Body))
538 self.assertEquals(p.result[0].peek, False)
539 self.failUnless(isinstance(p.result[0].header, p.Header))
540 self.assertEquals(p.result[0].header.negate, False)
541 self.assertEquals(p.result[0].header.fields, ['SUBJECT', 'CC', 'MESSAGE- ID'])
542 self.assertEquals(p.result[0].empty, False)
543 self.assertEquals(str(p.result[0]), 'BODY[HEADER.FIELDS (Subject Cc Mess age-Id)]')
544
545 p = P()
546 p.parseString('BODY.PEEK[HEADER.FIELDS (Subject Cc Message-Id)]')
547 self.assertEquals(len(p.result), 1)
548 self.failUnless(isinstance(p.result[0], p.Body))
549 self.assertEquals(p.result[0].peek, True)
550 self.failUnless(isinstance(p.result[0].header, p.Header))
551 self.assertEquals(p.result[0].header.negate, False)
552 self.assertEquals(p.result[0].header.fields, ['SUBJECT', 'CC', 'MESSAGE- ID'])
553 self.assertEquals(p.result[0].empty, False)
554 self.assertEquals(str(p.result[0]), 'BODY[HEADER.FIELDS (Subject Cc Mess age-Id)]')
555
556 p = P()
557 p.parseString('BODY.PEEK[HEADER.FIELDS.NOT (Subject Cc Message-Id)]')
558 self.assertEquals(len(p.result), 1)
559 self.failUnless(isinstance(p.result[0], p.Body))
560 self.assertEquals(p.result[0].peek, True)
561 self.failUnless(isinstance(p.result[0].header, p.Header))
562 self.assertEquals(p.result[0].header.negate, True)
563 self.assertEquals(p.result[0].header.fields, ['SUBJECT', 'CC', 'MESSAGE- ID'])
564 self.assertEquals(p.result[0].empty, False)
565 self.assertEquals(str(p.result[0]), 'BODY[HEADER.FIELDS.NOT (Subject Cc Message-Id)]')
566
567 p = P()
568 p.parseString('BODY[1.MIME]<10.50>')
569 self.assertEquals(len(p.result), 1)
570 self.failUnless(isinstance(p.result[0], p.Body))
571 self.assertEquals(p.result[0].peek, False)
572 self.failUnless(isinstance(p.result[0].mime, p.MIME))
573 self.assertEquals(p.result[0].part, (0,))
574 self.assertEquals(p.result[0].partialBegin, 10)
575 self.assertEquals(p.result[0].partialLength, 50)
576 self.assertEquals(p.result[0].empty, False)
577 self.assertEquals(str(p.result[0]), 'BODY[1.MIME]<10.50>')
578
579 p = P()
580 p.parseString('BODY.PEEK[1.3.9.11.HEADER.FIELDS.NOT (Message-Id Date)]<1 03.69>')
581 self.assertEquals(len(p.result), 1)
582 self.failUnless(isinstance(p.result[0], p.Body))
583 self.assertEquals(p.result[0].peek, True)
584 self.failUnless(isinstance(p.result[0].header, p.Header))
585 self.assertEquals(p.result[0].part, (0, 2, 8, 10))
586 self.assertEquals(p.result[0].header.fields, ['MESSAGE-ID', 'DATE'])
587 self.assertEquals(p.result[0].partialBegin, 103)
588 self.assertEquals(p.result[0].partialLength, 69)
589 self.assertEquals(p.result[0].empty, False)
590 self.assertEquals(str(p.result[0]), 'BODY[1.3.9.11.HEADER.FIELDS.NOT (Me ssage-Id Date)]<103.69>')
591
592
593 def testFiles(self):
594 inputStructure = [
595 'foo', 'bar', 'baz', StringIO('this is a file\r\n'), 'buz'
596 ]
597
598 output = '"foo" "bar" "baz" {16}\r\nthis is a file\r\n "buz"'
599
600 self.assertEquals(imap4.collapseNestedLists(inputStructure), output)
601
602 def testQuoteAvoider(self):
603 input = [
604 'foo', imap4.DontQuoteMe('bar'), "baz", StringIO('this is a file\r\n '),
605 imap4.DontQuoteMe('buz'), ""
606 ]
607
608 output = '"foo" bar "baz" {16}\r\nthis is a file\r\n buz ""'
609
610 self.assertEquals(imap4.collapseNestedLists(input), output)
611
612 def testLiterals(self):
613 cases = [
614 ('({10}\r\n0123456789)', [['0123456789']]),
615 ]
616
617 for (case, expected) in cases:
618 self.assertEquals(imap4.parseNestedParens(case), expected)
619
620 def testQueryBuilder(self):
621 inputs = [
622 imap4.Query(flagged=1),
623 imap4.Query(sorted=1, unflagged=1, deleted=1),
624 imap4.Or(imap4.Query(flagged=1), imap4.Query(deleted=1)),
625 imap4.Query(before='today'),
626 imap4.Or(
627 imap4.Query(deleted=1),
628 imap4.Query(unseen=1),
629 imap4.Query(new=1)
630 ),
631 imap4.Or(
632 imap4.Not(
633 imap4.Or(
634 imap4.Query(sorted=1, since='yesterday', smaller=1000),
635 imap4.Query(sorted=1, before='tuesday', larger=10000),
636 imap4.Query(sorted=1, unseen=1, deleted=1, before='today '),
637 imap4.Not(
638 imap4.Query(subject='spam')
639 ),
640 ),
641 ),
642 imap4.Not(
643 imap4.Query(uid='1:5')
644 ),
645 )
646 ]
647
648 outputs = [
649 'FLAGGED',
650 '(DELETED UNFLAGGED)',
651 '(OR FLAGGED DELETED)',
652 '(BEFORE "today")',
653 '(OR DELETED (OR UNSEEN NEW))',
654 '(OR (NOT (OR (SINCE "yesterday" SMALLER 1000) ' # Continuing
655 '(OR (BEFORE "tuesday" LARGER 10000) (OR (BEFORE ' # Some more
656 '"today" DELETED UNSEEN) (NOT (SUBJECT "spam")))))) ' # And more
657 '(NOT (UID 1:5)))',
658 ]
659
660 for (query, expected) in zip(inputs, outputs):
661 self.assertEquals(query, expected)
662
663 def testIdListParser(self):
664 inputs = [
665 '1:*',
666 '5:*',
667 '1:2,5:*',
668 '1',
669 '1,2',
670 '1,3,5',
671 '1:10',
672 '1:10,11',
673 '1:5,10:20',
674 '1,5:10',
675 '1,5:10,15:20',
676 '1:10,15,20:25',
677 ]
678
679 outputs = [
680 MessageSet(1, None),
681 MessageSet(5, None),
682 MessageSet(5, None) + MessageSet(1, 2),
683 MessageSet(1),
684 MessageSet(1, 2),
685 MessageSet(1) + MessageSet(3) + MessageSet(5),
686 MessageSet(1, 10),
687 MessageSet(1, 11),
688 MessageSet(1, 5) + MessageSet(10, 20),
689 MessageSet(1) + MessageSet(5, 10),
690 MessageSet(1) + MessageSet(5, 10) + MessageSet(15, 20),
691 MessageSet(1, 10) + MessageSet(15) + MessageSet(20, 25),
692 ]
693
694 lengths = [
695 None, None, None,
696 1, 2, 3, 10, 11, 16, 7, 13, 17,
697 ]
698
699 for (input, expected) in zip(inputs, outputs):
700 self.assertEquals(imap4.parseIdList(input), expected)
701
702 for (input, expected) in zip(inputs, lengths):
703 try:
704 L = len(imap4.parseIdList(input))
705 except TypeError:
706 L = None
707 self.assertEquals(L, expected,
708 "len(%r) = %r != %r" % (input, L, expected))
709
710 class SimpleMailbox:
711 implements(imap4.IMailboxInfo, imap4.IMailbox, imap4.ICloseableMailbox)
712
713 flags = ('\\Flag1', 'Flag2', '\\AnotherSysFlag', 'LastFlag')
714 messages = []
715 mUID = 0
716 rw = 1
717 closed = False
718
719 def __init__(self):
720 self.listeners = []
721 self.addListener = self.listeners.append
722 self.removeListener = self.listeners.remove
723
724 def getFlags(self):
725 return self.flags
726
727 def getUIDValidity(self):
728 return 42
729
730 def getUIDNext(self):
731 return len(self.messages) + 1
732
733 def getMessageCount(self):
734 return 9
735
736 def getRecentCount(self):
737 return 3
738
739 def getUnseenCount(self):
740 return 4
741
742 def isWriteable(self):
743 return self.rw
744
745 def destroy(self):
746 pass
747
748 def getHierarchicalDelimiter(self):
749 return '/'
750
751 def requestStatus(self, names):
752 r = {}
753 if 'MESSAGES' in names:
754 r['MESSAGES'] = self.getMessageCount()
755 if 'RECENT' in names:
756 r['RECENT'] = self.getRecentCount()
757 if 'UIDNEXT' in names:
758 r['UIDNEXT'] = self.getMessageCount() + 1
759 if 'UIDVALIDITY' in names:
760 r['UIDVALIDITY'] = self.getUID()
761 if 'UNSEEN' in names:
762 r['UNSEEN'] = self.getUnseenCount()
763 return defer.succeed(r)
764
765 def addMessage(self, message, flags, date = None):
766 self.messages.append((message, flags, date, self.mUID))
767 self.mUID += 1
768 return defer.succeed(None)
769
770 def expunge(self):
771 delete = []
772 for i in self.messages:
773 if '\\Deleted' in i[1]:
774 delete.append(i)
775 for i in delete:
776 self.messages.remove(i)
777 return [i[3] for i in delete]
778
779 def close(self):
780 self.closed = True
781
782 class Account(imap4.MemoryAccount):
783 mailboxFactory = SimpleMailbox
784 def _emptyMailbox(self, name, id):
785 return self.mailboxFactory()
786
787 def select(self, name, rw=1):
788 mbox = imap4.MemoryAccount.select(self, name)
789 if mbox is not None:
790 mbox.rw = rw
791 return mbox
792
793 class SimpleServer(imap4.IMAP4Server):
794 def __init__(self, *args, **kw):
795 imap4.IMAP4Server.__init__(self, *args, **kw)
796 realm = TestRealm()
797 realm.theAccount = Account('testuser')
798 portal = cred.portal.Portal(realm)
799 c = cred.checkers.InMemoryUsernamePasswordDatabaseDontUse()
800 self.checker = c
801 self.portal = portal
802 portal.registerChecker(c)
803 self.timeoutTest = False
804
805 def lineReceived(self, line):
806 if self.timeoutTest:
807 #Do not send a respones
808 return
809
810 imap4.IMAP4Server.lineReceived(self, line)
811
812 _username = 'testuser'
813 _password = 'password-test'
814 def authenticateLogin(self, username, password):
815 if username == self._username and password == self._password:
816 return imap4.IAccount, self.theAccount, lambda: None
817 raise cred.error.UnauthorizedLogin()
818
819
820 class SimpleClient(imap4.IMAP4Client):
821 def __init__(self, deferred, contextFactory = None):
822 imap4.IMAP4Client.__init__(self, contextFactory)
823 self.deferred = deferred
824 self.events = []
825
826 def serverGreeting(self, caps):
827 self.deferred.callback(None)
828
829 def modeChanged(self, writeable):
830 self.events.append(['modeChanged', writeable])
831 self.transport.loseConnection()
832
833 def flagsChanged(self, newFlags):
834 self.events.append(['flagsChanged', newFlags])
835 self.transport.loseConnection()
836
837 def newMessages(self, exists, recent):
838 self.events.append(['newMessages', exists, recent])
839 self.transport.loseConnection()
840
841 def fetchBodyParts(self, message, parts):
842 """Fetch some parts of the body.
843
844 @param message: message with parts to fetch
845 @type message: C{str}
846 @param parts: a list of int/str
847 @type parts: C{list}
848 """
849 cmd = "%s (BODY[%s]" % (message, parts[0])
850 for p in parts[1:]:
851 cmd += " BODY[%s]" % p
852 cmd += ")"
853 d = self.sendCommand(imap4.Command("FETCH", cmd,
854 wantResponse=("FETCH",)))
855 d.addCallback(self.__cb_fetchBodyParts)
856 return d
857
858 def __cb_fetchBodyParts(self, (lines, last)):
859 info = {}
860 for line in lines:
861 parts = line.split(None, 2)
862 if len(parts) == 3:
863 if parts[1] == "FETCH":
864 try:
865 mail_id = int(parts[0])
866 except ValueError:
867 raise imap4.IllegalServerResponse, line
868 else:
869 body_parts = imap4.parseNestedParens(parts[2])[0]
870 dict_parts = {}
871 for i in range(len(body_parts)/3):
872 dict_parts[body_parts[3*i+1][0]] = body_parts[3*i+2]
873 info[mail_id] = dict_parts
874 return info
875
876 class IMAP4HelperMixin:
877 serverCTX = None
878 clientCTX = None
879
880 def setUp(self):
881 d = defer.Deferred()
882 self.server = SimpleServer(contextFactory=self.serverCTX)
883 self.client = SimpleClient(d, contextFactory=self.clientCTX)
884 self.connected = d
885
886 SimpleMailbox.messages = []
887 theAccount = Account('testuser')
888 theAccount.mboxType = SimpleMailbox
889 SimpleServer.theAccount = theAccount
890
891 def tearDown(self):
892 del self.server
893 del self.client
894 del self.connected
895
896 def _cbStopClient(self, ignore):
897 self.client.transport.loseConnection()
898
899 def _ebGeneral(self, failure):
900 self.client.transport.loseConnection()
901 self.server.transport.loseConnection()
902 failure.printTraceback(open('failure.log', 'w'))
903 failure.printTraceback()
904 raise failure.value
905
906 def loopback(self):
907 return loopback.loopbackAsync(self.server, self.client)
908
909 class IMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase):
910 def testCapability(self):
911 caps = {}
912 def getCaps():
913 def gotCaps(c):
914 caps.update(c)
915 self.server.transport.loseConnection()
916 return self.client.getCapabilities().addCallback(gotCaps)
917 d1 = self.connected.addCallback(strip(getCaps)).addErrback(self._ebGener al)
918 d = defer.gatherResults([self.loopback(), d1])
919 expected = {'IMAP4rev1': None, 'NAMESPACE': None, 'IDLE': None}
920 return d.addCallback(lambda _: self.assertEquals(expected, caps))
921
922 def testCapabilityWithAuth(self):
923 caps = {}
924 self.server.challengers['CRAM-MD5'] = cred.credentials.CramMD5Credential s
925 def getCaps():
926 def gotCaps(c):
927 caps.update(c)
928 self.server.transport.loseConnection()
929 return self.client.getCapabilities().addCallback(gotCaps)
930 d1 = self.connected.addCallback(strip(getCaps)).addErrback(self._ebGener al)
931 d = defer.gatherResults([self.loopback(), d1])
932
933 expCap = {'IMAP4rev1': None, 'NAMESPACE': None,
934 'IDLE': None, 'AUTH': ['CRAM-MD5']}
935
936 return d.addCallback(lambda _: self.assertEquals(expCap, caps))
937
938 def testLogout(self):
939 self.loggedOut = 0
940 def logout():
941 def setLoggedOut():
942 self.loggedOut = 1
943 self.client.logout().addCallback(strip(setLoggedOut))
944 self.connected.addCallback(strip(logout)).addErrback(self._ebGeneral)
945 d = self.loopback()
946 return d.addCallback(lambda _: self.assertEquals(self.loggedOut, 1))
947
948 def testNoop(self):
949 self.responses = None
950 def noop():
951 def setResponses(responses):
952 self.responses = responses
953 self.server.transport.loseConnection()
954 self.client.noop().addCallback(setResponses)
955 self.connected.addCallback(strip(noop)).addErrback(self._ebGeneral)
956 d = self.loopback()
957 return d.addCallback(lambda _: self.assertEquals(self.responses, []))
958
959 def testLogin(self):
960 def login():
961 d = self.client.login('testuser', 'password-test')
962 d.addCallback(self._cbStopClient)
963 d1 = self.connected.addCallback(strip(login)).addErrback(self._ebGeneral )
964 d = defer.gatherResults([d1, self.loopback()])
965 return d.addCallback(self._cbTestLogin)
966
967 def _cbTestLogin(self, ignored):
968 self.assertEquals(self.server.account, SimpleServer.theAccount)
969 self.assertEquals(self.server.state, 'auth')
970
971 def testFailedLogin(self):
972 def login():
973 d = self.client.login('testuser', 'wrong-password')
974 d.addBoth(self._cbStopClient)
975
976 d1 = self.connected.addCallback(strip(login)).addErrback(self._ebGeneral )
977 d2 = self.loopback()
978 d = defer.gatherResults([d1, d2])
979 return d.addCallback(self._cbTestFailedLogin)
980
981 def _cbTestFailedLogin(self, ignored):
982 self.assertEquals(self.server.account, None)
983 self.assertEquals(self.server.state, 'unauth')
984
985
986 def testLoginRequiringQuoting(self):
987 self.server._username = '{test}user'
988 self.server._password = '{test}password'
989
990 def login():
991 d = self.client.login('{test}user', '{test}password')
992 d.addBoth(self._cbStopClient)
993
994 d1 = self.connected.addCallback(strip(login)).addErrback(self._ebGeneral )
995 d = defer.gatherResults([self.loopback(), d1])
996 return d.addCallback(self._cbTestLoginRequiringQuoting)
997
998 def _cbTestLoginRequiringQuoting(self, ignored):
999 self.assertEquals(self.server.account, SimpleServer.theAccount)
1000 self.assertEquals(self.server.state, 'auth')
1001
1002
1003 def testNamespace(self):
1004 self.namespaceArgs = None
1005 def login():
1006 return self.client.login('testuser', 'password-test')
1007 def namespace():
1008 def gotNamespace(args):
1009 self.namespaceArgs = args
1010 self._cbStopClient(None)
1011 return self.client.namespace().addCallback(gotNamespace)
1012
1013 d1 = self.connected.addCallback(strip(login))
1014 d1.addCallback(strip(namespace))
1015 d1.addErrback(self._ebGeneral)
1016 d2 = self.loopback()
1017 d = defer.gatherResults([d1, d2])
1018 d.addCallback(lambda _: self.assertEquals(self.namespaceArgs,
1019 [[['', '/']], [], []]))
1020 return d
1021
1022 def testSelect(self):
1023 SimpleServer.theAccount.addMailbox('test-mailbox')
1024 self.selectedArgs = None
1025 def login():
1026 return self.client.login('testuser', 'password-test')
1027 def select():
1028 def selected(args):
1029 self.selectedArgs = args
1030 self._cbStopClient(None)
1031 d = self.client.select('test-mailbox')
1032 d.addCallback(selected)
1033 return d
1034
1035 d1 = self.connected.addCallback(strip(login))
1036 d1.addCallback(strip(select))
1037 d1.addErrback(self._ebGeneral)
1038 d2 = self.loopback()
1039 return defer.gatherResults([d1, d2]).addCallback(self._cbTestSelect)
1040
1041 def _cbTestSelect(self, ignored):
1042 mbox = SimpleServer.theAccount.mailboxes['TEST-MAILBOX']
1043 self.assertEquals(self.server.mbox, mbox)
1044 self.assertEquals(self.selectedArgs, {
1045 'EXISTS': 9, 'RECENT': 3, 'UIDVALIDITY': 42,
1046 'FLAGS': ('\\Flag1', 'Flag2', '\\AnotherSysFlag', 'LastFlag'),
1047 'READ-WRITE': 1
1048 })
1049
1050 def testExamine(self):
1051 SimpleServer.theAccount.addMailbox('test-mailbox')
1052 self.examinedArgs = None
1053 def login():
1054 return self.client.login('testuser', 'password-test')
1055 def examine():
1056 def examined(args):
1057 self.examinedArgs = args
1058 self._cbStopClient(None)
1059 d = self.client.examine('test-mailbox')
1060 d.addCallback(examined)
1061 return d
1062
1063 d1 = self.connected.addCallback(strip(login))
1064 d1.addCallback(strip(examine))
1065 d1.addErrback(self._ebGeneral)
1066 d2 = self.loopback()
1067 d = defer.gatherResults([d1, d2])
1068 return d.addCallback(self._cbTestExamine)
1069
1070 def _cbTestExamine(self, ignored):
1071 mbox = SimpleServer.theAccount.mailboxes['TEST-MAILBOX']
1072 self.assertEquals(self.server.mbox, mbox)
1073 self.assertEquals(self.examinedArgs, {
1074 'EXISTS': 9, 'RECENT': 3, 'UIDVALIDITY': 42,
1075 'FLAGS': ('\\Flag1', 'Flag2', '\\AnotherSysFlag', 'LastFlag'),
1076 'READ-WRITE': 0
1077 })
1078
1079 def testCreate(self):
1080 succeed = ('testbox', 'test/box', 'test/', 'test/box/box', 'INBOX')
1081 fail = ('testbox', 'test/box')
1082
1083 def cb(): self.result.append(1)
1084 def eb(failure): self.result.append(0)
1085
1086 def login():
1087 return self.client.login('testuser', 'password-test')
1088 def create():
1089 for name in succeed + fail:
1090 d = self.client.create(name)
1091 d.addCallback(strip(cb)).addErrback(eb)
1092 d.addCallbacks(self._cbStopClient, self._ebGeneral)
1093
1094 self.result = []
1095 d1 = self.connected.addCallback(strip(login)).addCallback(strip(create))
1096 d2 = self.loopback()
1097 d = defer.gatherResults([d1, d2])
1098 return d.addCallback(self._cbTestCreate, succeed, fail)
1099
1100 def _cbTestCreate(self, ignored, succeed, fail):
1101 self.assertEquals(self.result, [1] * len(succeed) + [0] * len(fail))
1102 mbox = SimpleServer.theAccount.mailboxes.keys()
1103 answers = ['inbox', 'testbox', 'test/box', 'test', 'test/box/box']
1104 mbox.sort()
1105 answers.sort()
1106 self.assertEquals(mbox, [a.upper() for a in answers])
1107
1108 def testDelete(self):
1109 SimpleServer.theAccount.addMailbox('delete/me')
1110
1111 def login():
1112 return self.client.login('testuser', 'password-test')
1113 def delete():
1114 return self.client.delete('delete/me')
1115 d1 = self.connected.addCallback(strip(login))
1116 d1.addCallbacks(strip(delete), self._ebGeneral)
1117 d1.addCallbacks(self._cbStopClient, self._ebGeneral)
1118 d2 = self.loopback()
1119 d = defer.gatherResults([d1, d2])
1120 d.addCallback(lambda _:
1121 self.assertEquals(SimpleServer.theAccount.mailboxes.keys() , []))
1122 return d
1123
1124 def testIllegalInboxDelete(self):
1125 self.stashed = None
1126 def login():
1127 return self.client.login('testuser', 'password-test')
1128 def delete():
1129 return self.client.delete('inbox')
1130 def stash(result):
1131 self.stashed = result
1132
1133 d1 = self.connected.addCallback(strip(login))
1134 d1.addCallbacks(strip(delete), self._ebGeneral)
1135 d1.addBoth(stash)
1136 d1.addCallbacks(self._cbStopClient, self._ebGeneral)
1137 d2 = self.loopback()
1138 d = defer.gatherResults([d1, d2])
1139 d.addCallback(lambda _: self.failUnless(isinstance(self.stashed,
1140 failure.Failure)))
1141 return d
1142
1143
1144 def testNonExistentDelete(self):
1145 def login():
1146 return self.client.login('testuser', 'password-test')
1147 def delete():
1148 return self.client.delete('delete/me')
1149 def deleteFailed(failure):
1150 self.failure = failure
1151
1152 self.failure = None
1153 d1 = self.connected.addCallback(strip(login))
1154 d1.addCallback(strip(delete)).addErrback(deleteFailed)
1155 d1.addCallbacks(self._cbStopClient, self._ebGeneral)
1156 d2 = self.loopback()
1157 d = defer.gatherResults([d1, d2])
1158 d.addCallback(lambda _: self.assertEquals(str(self.failure.value),
1159 'No such mailbox'))
1160 return d
1161
1162
1163 def testIllegalDelete(self):
1164 m = SimpleMailbox()
1165 m.flags = (r'\Noselect',)
1166 SimpleServer.theAccount.addMailbox('delete', m)
1167 SimpleServer.theAccount.addMailbox('delete/me')
1168
1169 def login():
1170 return self.client.login('testuser', 'password-test')
1171 def delete():
1172 return self.client.delete('delete')
1173 def deleteFailed(failure):
1174 self.failure = failure
1175
1176 self.failure = None
1177 d1 = self.connected.addCallback(strip(login))
1178 d1.addCallback(strip(delete)).addErrback(deleteFailed)
1179 d1.addCallbacks(self._cbStopClient, self._ebGeneral)
1180 d2 = self.loopback()
1181 d = defer.gatherResults([d1, d2])
1182 expected = "Hierarchically inferior mailboxes exist and \\Noselect is se t"
1183 d.addCallback(lambda _:
1184 self.assertEquals(str(self.failure.value), expected))
1185 return d
1186
1187 def testRename(self):
1188 SimpleServer.theAccount.addMailbox('oldmbox')
1189 def login():
1190 return self.client.login('testuser', 'password-test')
1191 def rename():
1192 return self.client.rename('oldmbox', 'newname')
1193
1194 d1 = self.connected.addCallback(strip(login))
1195 d1.addCallbacks(strip(rename), self._ebGeneral)
1196 d1.addCallbacks(self._cbStopClient, self._ebGeneral)
1197 d2 = self.loopback()
1198 d = defer.gatherResults([d1, d2])
1199 d.addCallback(lambda _:
1200 self.assertEquals(SimpleServer.theAccount.mailboxes.keys() ,
1201 ['NEWNAME']))
1202 return d
1203
1204 def testIllegalInboxRename(self):
1205 self.stashed = None
1206 def login():
1207 return self.client.login('testuser', 'password-test')
1208 def rename():
1209 return self.client.rename('inbox', 'frotz')
1210 def stash(stuff):
1211 self.stashed = stuff
1212
1213 d1 = self.connected.addCallback(strip(login))
1214 d1.addCallbacks(strip(rename), self._ebGeneral)
1215 d1.addBoth(stash)
1216 d1.addCallbacks(self._cbStopClient, self._ebGeneral)
1217 d2 = self.loopback()
1218 d = defer.gatherResults([d1, d2])
1219 d.addCallback(lambda _:
1220 self.failUnless(isinstance(self.stashed, failure.Failure)) )
1221 return d
1222
1223 def testHierarchicalRename(self):
1224 SimpleServer.theAccount.create('oldmbox/m1')
1225 SimpleServer.theAccount.create('oldmbox/m2')
1226 def login():
1227 return self.client.login('testuser', 'password-test')
1228 def rename():
1229 return self.client.rename('oldmbox', 'newname')
1230
1231 d1 = self.connected.addCallback(strip(login))
1232 d1.addCallbacks(strip(rename), self._ebGeneral)
1233 d1.addCallbacks(self._cbStopClient, self._ebGeneral)
1234 d2 = self.loopback()
1235 d = defer.gatherResults([d1, d2])
1236 return d.addCallback(self._cbTestHierarchicalRename)
1237
1238 def _cbTestHierarchicalRename(self, ignored):
1239 mboxes = SimpleServer.theAccount.mailboxes.keys()
1240 expected = ['newname', 'newname/m1', 'newname/m2']
1241 mboxes.sort()
1242 self.assertEquals(mboxes, [s.upper() for s in expected])
1243
1244 def testSubscribe(self):
1245 def login():
1246 return self.client.login('testuser', 'password-test')
1247 def subscribe():
1248 return self.client.subscribe('this/mbox')
1249
1250 d1 = self.connected.addCallback(strip(login))
1251 d1.addCallbacks(strip(subscribe), self._ebGeneral)
1252 d1.addCallbacks(self._cbStopClient, self._ebGeneral)
1253 d2 = self.loopback()
1254 d = defer.gatherResults([d1, d2])
1255 d.addCallback(lambda _:
1256 self.assertEquals(SimpleServer.theAccount.subscriptions,
1257 ['THIS/MBOX']))
1258 return d
1259
1260 def testUnsubscribe(self):
1261 SimpleServer.theAccount.subscriptions = ['THIS/MBOX', 'THAT/MBOX']
1262 def login():
1263 return self.client.login('testuser', 'password-test')
1264 def unsubscribe():
1265 return self.client.unsubscribe('this/mbox')
1266
1267 d1 = self.connected.addCallback(strip(login))
1268 d1.addCallbacks(strip(unsubscribe), self._ebGeneral)
1269 d1.addCallbacks(self._cbStopClient, self._ebGeneral)
1270 d2 = self.loopback()
1271 d = defer.gatherResults([d1, d2])
1272 d.addCallback(lambda _:
1273 self.assertEquals(SimpleServer.theAccount.subscriptions,
1274 ['THAT/MBOX']))
1275 return d
1276
1277 def _listSetup(self, f):
1278 SimpleServer.theAccount.addMailbox('root/subthing')
1279 SimpleServer.theAccount.addMailbox('root/another-thing')
1280 SimpleServer.theAccount.addMailbox('non-root/subthing')
1281
1282 def login():
1283 return self.client.login('testuser', 'password-test')
1284 def listed(answers):
1285 self.listed = answers
1286
1287 self.listed = None
1288 d1 = self.connected.addCallback(strip(login))
1289 d1.addCallbacks(strip(f), self._ebGeneral)
1290 d1.addCallbacks(listed, self._ebGeneral)
1291 d1.addCallbacks(self._cbStopClient, self._ebGeneral)
1292 d2 = self.loopback()
1293 return defer.gatherResults([d1, d2]).addCallback(lambda _: self.listed)
1294
1295 def testList(self):
1296 def list():
1297 return self.client.list('root', '%')
1298 d = self._listSetup(list)
1299 d.addCallback(lambda listed: self.assertEquals(
1300 sortNest(listed),
1301 sortNest([
1302 (SimpleMailbox.flags, "/", "ROOT/SUBTHING"),
1303 (SimpleMailbox.flags, "/", "ROOT/ANOTHER-THING")
1304 ])
1305 ))
1306 return d
1307
1308 def testLSub(self):
1309 SimpleServer.theAccount.subscribe('ROOT/SUBTHING')
1310 def lsub():
1311 return self.client.lsub('root', '%')
1312 d = self._listSetup(lsub)
1313 d.addCallback(self.assertEquals,
1314 [(SimpleMailbox.flags, "/", "ROOT/SUBTHING")])
1315 return d
1316
1317 def testStatus(self):
1318 SimpleServer.theAccount.addMailbox('root/subthing')
1319 def login():
1320 return self.client.login('testuser', 'password-test')
1321 def status():
1322 return self.client.status('root/subthing', 'MESSAGES', 'UIDNEXT', 'U NSEEN')
1323 def statused(result):
1324 self.statused = result
1325
1326 self.statused = None
1327 d1 = self.connected.addCallback(strip(login))
1328 d1.addCallbacks(strip(status), self._ebGeneral)
1329 d1.addCallbacks(statused, self._ebGeneral)
1330 d1.addCallbacks(self._cbStopClient, self._ebGeneral)
1331 d2 = self.loopback()
1332 d = defer.gatherResults([d1, d2])
1333 d.addCallback(lambda _: self.assertEquals(
1334 self.statused,
1335 {'MESSAGES': 9, 'UIDNEXT': '10', 'UNSEEN': 4}
1336 ))
1337 return d
1338
1339 def testFailedStatus(self):
1340 def login():
1341 return self.client.login('testuser', 'password-test')
1342 def status():
1343 return self.client.status('root/nonexistent', 'MESSAGES', 'UIDNEXT', 'UNSEEN')
1344 def statused(result):
1345 self.statused = result
1346 def failed(failure):
1347 self.failure = failure
1348
1349 self.statused = self.failure = None
1350 d1 = self.connected.addCallback(strip(login))
1351 d1.addCallbacks(strip(status), self._ebGeneral)
1352 d1.addCallbacks(statused, failed)
1353 d1.addCallbacks(self._cbStopClient, self._ebGeneral)
1354 d2 = self.loopback()
1355 return defer.gatherResults([d1, d2]).addCallback(self._cbTestFailedStatu s)
1356
1357 def _cbTestFailedStatus(self, ignored):
1358 self.assertEquals(
1359 self.statused, None
1360 )
1361 self.assertEquals(
1362 self.failure.value.args,
1363 ('Could not open mailbox',)
1364 )
1365
1366 def testFullAppend(self):
1367 infile = util.sibpath(__file__, 'rfc822.message')
1368 message = open(infile)
1369 SimpleServer.theAccount.addMailbox('root/subthing')
1370 def login():
1371 return self.client.login('testuser', 'password-test')
1372 def append():
1373 return self.client.append(
1374 'root/subthing',
1375 message,
1376 ('\\SEEN', '\\DELETED'),
1377 'Tue, 17 Jun 2003 11:22:16 -0600 (MDT)',
1378 )
1379
1380 d1 = self.connected.addCallback(strip(login))
1381 d1.addCallbacks(strip(append), self._ebGeneral)
1382 d1.addCallbacks(self._cbStopClient, self._ebGeneral)
1383 d2 = self.loopback()
1384 d = defer.gatherResults([d1, d2])
1385 return d.addCallback(self._cbTestFullAppend, infile)
1386
1387 def _cbTestFullAppend(self, ignored, infile):
1388 mb = SimpleServer.theAccount.mailboxes['ROOT/SUBTHING']
1389 self.assertEquals(1, len(mb.messages))
1390 self.assertEquals(
1391 (['\\SEEN', '\\DELETED'], 'Tue, 17 Jun 2003 11:22:16 -0600 (MDT)', 0 ),
1392 mb.messages[0][1:]
1393 )
1394 self.assertEquals(open(infile).read(), mb.messages[0][0].getvalue())
1395
1396 def testPartialAppend(self):
1397 infile = util.sibpath(__file__, 'rfc822.message')
1398 message = open(infile)
1399 SimpleServer.theAccount.addMailbox('PARTIAL/SUBTHING')
1400 def login():
1401 return self.client.login('testuser', 'password-test')
1402 def append():
1403 message = file(infile)
1404 return self.client.sendCommand(
1405 imap4.Command(
1406 'APPEND',
1407 'PARTIAL/SUBTHING (\\SEEN) "Right now" {%d}' % os.path.getsi ze(infile),
1408 (), self.client._IMAP4Client__cbContinueAppend, message
1409 )
1410 )
1411 d1 = self.connected.addCallback(strip(login))
1412 d1.addCallbacks(strip(append), self._ebGeneral)
1413 d1.addCallbacks(self._cbStopClient, self._ebGeneral)
1414 d2 = self.loopback()
1415 d = defer.gatherResults([d1, d2])
1416 return d.addCallback(self._cbTestPartialAppend, infile)
1417
1418 def _cbTestPartialAppend(self, ignored, infile):
1419 mb = SimpleServer.theAccount.mailboxes['PARTIAL/SUBTHING']
1420 self.assertEquals(1, len(mb.messages))
1421 self.assertEquals(
1422 (['\\SEEN'], 'Right now', 0),
1423 mb.messages[0][1:]
1424 )
1425 self.assertEquals(open(infile).read(), mb.messages[0][0].getvalue())
1426
1427 def testCheck(self):
1428 SimpleServer.theAccount.addMailbox('root/subthing')
1429 def login():
1430 return self.client.login('testuser', 'password-test')
1431 def select():
1432 return self.client.select('root/subthing')
1433 def check():
1434 return self.client.check()
1435
1436 d = self.connected.addCallback(strip(login))
1437 d.addCallbacks(strip(select), self._ebGeneral)
1438 d.addCallbacks(strip(check), self._ebGeneral)
1439 d.addCallbacks(self._cbStopClient, self._ebGeneral)
1440 return self.loopback()
1441
1442 # Okay, that was fun
1443
1444 def testClose(self):
1445 m = SimpleMailbox()
1446 m.messages = [
1447 ('Message 1', ('\\Deleted', 'AnotherFlag'), None, 0),
1448 ('Message 2', ('AnotherFlag',), None, 1),
1449 ('Message 3', ('\\Deleted',), None, 2),
1450 ]
1451 SimpleServer.theAccount.addMailbox('mailbox', m)
1452 def login():
1453 return self.client.login('testuser', 'password-test')
1454 def select():
1455 return self.client.select('mailbox')
1456 def close():
1457 return self.client.close()
1458
1459 d = self.connected.addCallback(strip(login))
1460 d.addCallbacks(strip(select), self._ebGeneral)
1461 d.addCallbacks(strip(close), self._ebGeneral)
1462 d.addCallbacks(self._cbStopClient, self._ebGeneral)
1463 d2 = self.loopback()
1464 return defer.gatherResults([d, d2]).addCallback(self._cbTestClose, m)
1465
1466 def _cbTestClose(self, ignored, m):
1467 self.assertEquals(len(m.messages), 1)
1468 self.assertEquals(m.messages[0], ('Message 2', ('AnotherFlag',), None, 1 ))
1469 self.failUnless(m.closed)
1470
1471 def testExpunge(self):
1472 m = SimpleMailbox()
1473 m.messages = [
1474 ('Message 1', ('\\Deleted', 'AnotherFlag'), None, 0),
1475 ('Message 2', ('AnotherFlag',), None, 1),
1476 ('Message 3', ('\\Deleted',), None, 2),
1477 ]
1478 SimpleServer.theAccount.addMailbox('mailbox', m)
1479 def login():
1480 return self.client.login('testuser', 'password-test')
1481 def select():
1482 return self.client.select('mailbox')
1483 def expunge():
1484 return self.client.expunge()
1485 def expunged(results):
1486 self.failIf(self.server.mbox is None)
1487 self.results = results
1488
1489 self.results = None
1490 d1 = self.connected.addCallback(strip(login))
1491 d1.addCallbacks(strip(select), self._ebGeneral)
1492 d1.addCallbacks(strip(expunge), self._ebGeneral)
1493 d1.addCallbacks(expunged, self._ebGeneral)
1494 d1.addCallbacks(self._cbStopClient, self._ebGeneral)
1495 d2 = self.loopback()
1496 d = defer.gatherResults([d1, d2])
1497 return d.addCallback(self._cbTestExpunge, m)
1498
1499 def _cbTestExpunge(self, ignored, m):
1500 self.assertEquals(len(m.messages), 1)
1501 self.assertEquals(m.messages[0], ('Message 2', ('AnotherFlag',), None, 1 ))
1502
1503 self.assertEquals(self.results, [0, 2])
1504
1505 class TestRealm:
1506 theAccount = None
1507
1508 def requestAvatar(self, avatarId, mind, *interfaces):
1509 return imap4.IAccount, self.theAccount, lambda: None
1510
1511 class TestChecker:
1512 credentialInterfaces = (cred.credentials.IUsernameHashedPassword, cred.crede ntials.IUsernamePassword)
1513
1514 users = {
1515 'testuser': 'secret'
1516 }
1517
1518 def requestAvatarId(self, credentials):
1519 if credentials.username in self.users:
1520 return defer.maybeDeferred(
1521 credentials.checkPassword, self.users[credentials.username]
1522 ).addCallback(self._cbCheck, credentials.username)
1523
1524 def _cbCheck(self, result, username):
1525 if result:
1526 return username
1527 raise cred.error.UnauthorizedLogin()
1528
1529 class AuthenticatorTestCase(IMAP4HelperMixin, unittest.TestCase):
1530 def setUp(self):
1531 IMAP4HelperMixin.setUp(self)
1532
1533 realm = TestRealm()
1534 realm.theAccount = Account('testuser')
1535 portal = cred.portal.Portal(realm)
1536 portal.registerChecker(TestChecker())
1537 self.server.portal = portal
1538
1539 self.authenticated = 0
1540 self.account = realm.theAccount
1541
1542 def testCramMD5(self):
1543 self.server.challengers['CRAM-MD5'] = cred.credentials.CramMD5Credential s
1544 cAuth = imap4.CramMD5ClientAuthenticator('testuser')
1545 self.client.registerAuthenticator(cAuth)
1546
1547 def auth():
1548 return self.client.authenticate('secret')
1549 def authed():
1550 self.authenticated = 1
1551
1552 d1 = self.connected.addCallback(strip(auth))
1553 d1.addCallbacks(strip(authed), self._ebGeneral)
1554 d1.addCallbacks(self._cbStopClient, self._ebGeneral)
1555 d2 = self.loopback()
1556 d = defer.gatherResults([d1, d2])
1557 return d.addCallback(self._cbTestCramMD5)
1558
1559 def _cbTestCramMD5(self, ignored):
1560 self.assertEquals(self.authenticated, 1)
1561 self.assertEquals(self.server.account, self.account)
1562
1563 def testFailedCramMD5(self):
1564 self.server.challengers['CRAM-MD5'] = cred.credentials.CramMD5Credential s
1565 cAuth = imap4.CramMD5ClientAuthenticator('testuser')
1566 self.client.registerAuthenticator(cAuth)
1567
1568 def misauth():
1569 return self.client.authenticate('not the secret')
1570 def authed():
1571 self.authenticated = 1
1572 def misauthed():
1573 self.authenticated = -1
1574
1575 d1 = self.connected.addCallback(strip(misauth))
1576 d1.addCallbacks(strip(authed), strip(misauthed))
1577 d1.addCallbacks(self._cbStopClient, self._ebGeneral)
1578 d = defer.gatherResults([self.loopback(), d1])
1579 return d.addCallback(self._cbTestFailedCramMD5)
1580
1581 def _cbTestFailedCramMD5(self, ignored):
1582 self.assertEquals(self.authenticated, -1)
1583 self.assertEquals(self.server.account, None)
1584
1585 def testLOGIN(self):
1586 self.server.challengers['LOGIN'] = imap4.LOGINCredentials
1587 cAuth = imap4.LOGINAuthenticator('testuser')
1588 self.client.registerAuthenticator(cAuth)
1589
1590 def auth():
1591 return self.client.authenticate('secret')
1592 def authed():
1593 self.authenticated = 1
1594
1595 d1 = self.connected.addCallback(strip(auth))
1596 d1.addCallbacks(strip(authed), self._ebGeneral)
1597 d1.addCallbacks(self._cbStopClient, self._ebGeneral)
1598 d = defer.gatherResults([self.loopback(), d1])
1599 return d.addCallback(self._cbTestLOGIN)
1600
1601 def _cbTestLOGIN(self, ignored):
1602 self.assertEquals(self.authenticated, 1)
1603 self.assertEquals(self.server.account, self.account)
1604
1605 def testFailedLOGIN(self):
1606 self.server.challengers['LOGIN'] = imap4.LOGINCredentials
1607 cAuth = imap4.LOGINAuthenticator('testuser')
1608 self.client.registerAuthenticator(cAuth)
1609
1610 def misauth():
1611 return self.client.authenticate('not the secret')
1612 def authed():
1613 self.authenticated = 1
1614 def misauthed():
1615 self.authenticated = -1
1616
1617 d1 = self.connected.addCallback(strip(misauth))
1618 d1.addCallbacks(strip(authed), strip(misauthed))
1619 d1.addCallbacks(self._cbStopClient, self._ebGeneral)
1620 d = defer.gatherResults([self.loopback(), d1])
1621 return d.addCallback(self._cbTestFailedLOGIN)
1622
1623 def _cbTestFailedLOGIN(self, ignored):
1624 self.assertEquals(self.authenticated, -1)
1625 self.assertEquals(self.server.account, None)
1626
1627 def testPLAIN(self):
1628 self.server.challengers['PLAIN'] = imap4.PLAINCredentials
1629 cAuth = imap4.PLAINAuthenticator('testuser')
1630 self.client.registerAuthenticator(cAuth)
1631
1632 def auth():
1633 return self.client.authenticate('secret')
1634 def authed():
1635 self.authenticated = 1
1636
1637 d1 = self.connected.addCallback(strip(auth))
1638 d1.addCallbacks(strip(authed), self._ebGeneral)
1639 d1.addCallbacks(self._cbStopClient, self._ebGeneral)
1640 d = defer.gatherResults([self.loopback(), d1])
1641 return d.addCallback(self._cbTestPLAIN)
1642
1643 def _cbTestPLAIN(self, ignored):
1644 self.assertEquals(self.authenticated, 1)
1645 self.assertEquals(self.server.account, self.account)
1646
1647 def testFailedPLAIN(self):
1648 self.server.challengers['PLAIN'] = imap4.PLAINCredentials
1649 cAuth = imap4.PLAINAuthenticator('testuser')
1650 self.client.registerAuthenticator(cAuth)
1651
1652 def misauth():
1653 return self.client.authenticate('not the secret')
1654 def authed():
1655 self.authenticated = 1
1656 def misauthed():
1657 self.authenticated = -1
1658
1659 d1 = self.connected.addCallback(strip(misauth))
1660 d1.addCallbacks(strip(authed), strip(misauthed))
1661 d1.addCallbacks(self._cbStopClient, self._ebGeneral)
1662 d = defer.gatherResults([self.loopback(), d1])
1663 return d.addCallback(self._cbTestFailedPLAIN)
1664
1665 def _cbTestFailedPLAIN(self, ignored):
1666 self.assertEquals(self.authenticated, -1)
1667 self.assertEquals(self.server.account, None)
1668
1669
1670 class UnsolicitedResponseTestCase(IMAP4HelperMixin, unittest.TestCase):
1671 def testReadWrite(self):
1672 def login():
1673 return self.client.login('testuser', 'password-test')
1674 def loggedIn():
1675 self.server.modeChanged(1)
1676
1677 d1 = self.connected.addCallback(strip(login))
1678 d1.addCallback(strip(loggedIn)).addErrback(self._ebGeneral)
1679 d = defer.gatherResults([self.loopback(), d1])
1680 return d.addCallback(self._cbTestReadWrite)
1681
1682 def _cbTestReadWrite(self, ignored):
1683 E = self.client.events
1684 self.assertEquals(E, [['modeChanged', 1]])
1685
1686 def testReadOnly(self):
1687 def login():
1688 return self.client.login('testuser', 'password-test')
1689 def loggedIn():
1690 self.server.modeChanged(0)
1691
1692 d1 = self.connected.addCallback(strip(login))
1693 d1.addCallback(strip(loggedIn)).addErrback(self._ebGeneral)
1694 d = defer.gatherResults([self.loopback(), d1])
1695 return d.addCallback(self._cbTestReadOnly)
1696
1697 def _cbTestReadOnly(self, ignored):
1698 E = self.client.events
1699 self.assertEquals(E, [['modeChanged', 0]])
1700
1701 def testFlagChange(self):
1702 flags = {
1703 1: ['\\Answered', '\\Deleted'],
1704 5: [],
1705 10: ['\\Recent']
1706 }
1707 def login():
1708 return self.client.login('testuser', 'password-test')
1709 def loggedIn():
1710 self.server.flagsChanged(flags)
1711
1712 d1 = self.connected.addCallback(strip(login))
1713 d1.addCallback(strip(loggedIn)).addErrback(self._ebGeneral)
1714 d = defer.gatherResults([self.loopback(), d1])
1715 return d.addCallback(self._cbTestFlagChange, flags)
1716
1717 def _cbTestFlagChange(self, ignored, flags):
1718 E = self.client.events
1719 expect = [['flagsChanged', {x[0]: x[1]}] for x in flags.items()]
1720 E.sort()
1721 expect.sort()
1722 self.assertEquals(E, expect)
1723
1724 def testNewMessages(self):
1725 def login():
1726 return self.client.login('testuser', 'password-test')
1727 def loggedIn():
1728 self.server.newMessages(10, None)
1729
1730 d1 = self.connected.addCallback(strip(login))
1731 d1.addCallback(strip(loggedIn)).addErrback(self._ebGeneral)
1732 d = defer.gatherResults([self.loopback(), d1])
1733 return d.addCallback(self._cbTestNewMessages)
1734
1735 def _cbTestNewMessages(self, ignored):
1736 E = self.client.events
1737 self.assertEquals(E, [['newMessages', 10, None]])
1738
1739 def testNewRecentMessages(self):
1740 def login():
1741 return self.client.login('testuser', 'password-test')
1742 def loggedIn():
1743 self.server.newMessages(None, 10)
1744
1745 d1 = self.connected.addCallback(strip(login))
1746 d1.addCallback(strip(loggedIn)).addErrback(self._ebGeneral)
1747 d = defer.gatherResults([self.loopback(), d1])
1748 return d.addCallback(self._cbTestNewRecentMessages)
1749
1750 def _cbTestNewRecentMessages(self, ignored):
1751 E = self.client.events
1752 self.assertEquals(E, [['newMessages', None, 10]])
1753
1754 def testNewMessagesAndRecent(self):
1755 def login():
1756 return self.client.login('testuser', 'password-test')
1757 def loggedIn():
1758 self.server.newMessages(20, 10)
1759
1760 d1 = self.connected.addCallback(strip(login))
1761 d1.addCallback(strip(loggedIn)).addErrback(self._ebGeneral)
1762 d = defer.gatherResults([self.loopback(), d1])
1763 return d.addCallback(self._cbTestNewMessagesAndRecent)
1764
1765 def _cbTestNewMessagesAndRecent(self, ignored):
1766 E = self.client.events
1767 self.assertEquals(E, [['newMessages', 20, None], ['newMessages', None, 1 0]])
1768
1769
1770 class ClientCapabilityTests(unittest.TestCase):
1771 """
1772 Tests for issuance of the CAPABILITY command and handling of its response.
1773 """
1774 def setUp(self):
1775 """
1776 Create an L{imap4.IMAP4Client} connected to a L{StringTransport}.
1777 """
1778 self.transport = StringTransport()
1779 self.protocol = imap4.IMAP4Client()
1780 self.protocol.makeConnection(self.transport)
1781 self.protocol.dataReceived('* OK [IMAP4rev1]\r\n')
1782
1783
1784 def test_simpleAtoms(self):
1785 """
1786 A capability response consisting only of atoms without C{'='} in them
1787 should result in a dict mapping those atoms to C{None}.
1788 """
1789 capabilitiesResult = self.protocol.getCapabilities(useCache=False)
1790 self.protocol.dataReceived('* CAPABILITY IMAP4rev1 LOGINDISABLED\r\n')
1791 self.protocol.dataReceived('0001 OK Capability completed.\r\n')
1792 def gotCapabilities(capabilities):
1793 self.assertEqual(
1794 capabilities, {'IMAP4rev1': None, 'LOGINDISABLED': None})
1795 capabilitiesResult.addCallback(gotCapabilities)
1796 return capabilitiesResult
1797
1798
1799 def test_categoryAtoms(self):
1800 """
1801 A capability response consisting of atoms including C{'='} should have
1802 those atoms split on that byte and have capabilities in the same
1803 category aggregated into lists in the resulting dictionary.
1804
1805 (n.b. - I made up the word "category atom"; the protocol has no notion
1806 of structure here, but rather allows each capability to define the
1807 semantics of its entry in the capability response in a freeform manner.
1808 If I had realized this earlier, the API for capabilities would look
1809 different. As it is, we can hope that no one defines any crazy
1810 semantics which are incompatible with this API, or try to figure out a
1811 better API when someone does. -exarkun)
1812 """
1813 capabilitiesResult = self.protocol.getCapabilities(useCache=False)
1814 self.protocol.dataReceived('* CAPABILITY IMAP4rev1 AUTH=LOGIN AUTH=PLAIN \r\n')
1815 self.protocol.dataReceived('0001 OK Capability completed.\r\n')
1816 def gotCapabilities(capabilities):
1817 self.assertEqual(
1818 capabilities, {'IMAP4rev1': None, 'AUTH': ['LOGIN', 'PLAIN']})
1819 capabilitiesResult.addCallback(gotCapabilities)
1820 return capabilitiesResult
1821
1822
1823 def test_mixedAtoms(self):
1824 """
1825 A capability response consisting of both simple and category atoms of
1826 the same type should result in a list containing C{None} as well as the
1827 values for the category.
1828 """
1829 capabilitiesResult = self.protocol.getCapabilities(useCache=False)
1830 # Exercise codepath for both orderings of =-having and =-missing
1831 # capabilities.
1832 self.protocol.dataReceived(
1833 '* CAPABILITY IMAP4rev1 FOO FOO=BAR BAR=FOO BAR\r\n')
1834 self.protocol.dataReceived('0001 OK Capability completed.\r\n')
1835 def gotCapabilities(capabilities):
1836 self.assertEqual(capabilities, {'IMAP4rev1': None,
1837 'FOO': [None, 'BAR'],
1838 'BAR': ['FOO', None]})
1839 capabilitiesResult.addCallback(gotCapabilities)
1840 return capabilitiesResult
1841
1842
1843
1844
1845 class HandCraftedTestCase(IMAP4HelperMixin, unittest.TestCase):
1846 def testTrailingLiteral(self):
1847 transport = StringTransport()
1848 c = imap4.IMAP4Client()
1849 c.makeConnection(transport)
1850 c.lineReceived('* OK [IMAP4rev1]')
1851
1852 def cbSelect(ignored):
1853 d = c.fetchMessage('1')
1854 c.dataReceived('* 1 FETCH (RFC822 {10}\r\n0123456789\r\n RFC822.SIZE 10)\r\n')
1855 c.dataReceived('0003 OK FETCH\r\n')
1856 return d
1857
1858 def cbLogin(ignored):
1859 d = c.select('inbox')
1860 c.lineReceived('0002 OK SELECT')
1861 d.addCallback(cbSelect)
1862 return d
1863
1864 d = c.login('blah', 'blah')
1865 c.dataReceived('0001 OK LOGIN\r\n')
1866 d.addCallback(cbLogin)
1867 return d
1868
1869 def testPathelogicalScatteringOfLiterals(self):
1870 self.server.checker.addUser('testuser', 'password-test')
1871 transport = StringTransport()
1872 self.server.makeConnection(transport)
1873
1874 transport.clear()
1875 self.server.dataReceived("01 LOGIN {8}\r\n")
1876 self.assertEquals(transport.value(), "+ Ready for 8 octets of text\r\n")
1877
1878 transport.clear()
1879 self.server.dataReceived("testuser {13}\r\n")
1880 self.assertEquals(transport.value(), "+ Ready for 13 octets of text\r\n" )
1881
1882 transport.clear()
1883 self.server.dataReceived("password-test\r\n")
1884 self.assertEquals(transport.value(), "01 OK LOGIN succeeded\r\n")
1885 self.assertEquals(self.server.state, 'auth')
1886
1887 self.server.connectionLost(error.ConnectionDone("Connection done."))
1888
1889 def testUnsolicitedResponseMixedWithSolicitedResponse(self):
1890
1891 class StillSimplerClient(imap4.IMAP4Client):
1892 events = []
1893 def flagsChanged(self, newFlags):
1894 self.events.append(['flagsChanged', newFlags])
1895
1896 transport = StringTransport()
1897 c = StillSimplerClient()
1898 c.makeConnection(transport)
1899 c.lineReceived('* OK [IMAP4rev1]')
1900
1901 def login():
1902 d = c.login('blah', 'blah')
1903 c.dataReceived('0001 OK LOGIN\r\n')
1904 return d
1905 def select():
1906 d = c.select('inbox')
1907 c.lineReceived('0002 OK SELECT')
1908 return d
1909 def fetch():
1910 d = c.fetchSpecific('1:*',
1911 headerType='HEADER.FIELDS',
1912 headerArgs=['SUBJECT'])
1913 c.dataReceived('* 1 FETCH (BODY[HEADER.FIELDS ("SUBJECT")] {38}\r\n' )
1914 c.dataReceived('Subject: Suprise for your woman...\r\n')
1915 c.dataReceived('\r\n')
1916 c.dataReceived(')\r\n')
1917 c.dataReceived('* 1 FETCH (FLAGS (\Seen))\r\n')
1918 c.dataReceived('* 2 FETCH (BODY[HEADER.FIELDS ("SUBJECT")] {75}\r\n' )
1919 c.dataReceived('Subject: What you been doing. Order your meds here . ,. handcuff madsen\r\n')
1920 c.dataReceived('\r\n')
1921 c.dataReceived(')\r\n')
1922 c.dataReceived('0003 OK FETCH completed\r\n')
1923 return d
1924 def test(res):
1925 self.assertEquals(res, {
1926 1: [['BODY', ['HEADER.FIELDS', ['SUBJECT']],
1927 'Subject: Suprise for your woman...\r\n\r\n']],
1928 2: [['BODY', ['HEADER.FIELDS', ['SUBJECT']],
1929 'Subject: What you been doing. Order your meds here . ,. han dcuff madsen\r\n\r\n']]
1930 })
1931
1932 self.assertEquals(c.events, [['flagsChanged', {1: ['\\Seen']}]])
1933
1934 return login(
1935 ).addCallback(strip(select)
1936 ).addCallback(strip(fetch)
1937 ).addCallback(test)
1938
1939
1940 def test_literalWithoutPrecedingWhitespace(self):
1941 """
1942 Literals should be recognized even when they are not preceded by
1943 whitespace.
1944 """
1945 transport = StringTransport()
1946 protocol = imap4.IMAP4Client()
1947
1948 protocol.makeConnection(transport)
1949 protocol.lineReceived('* OK [IMAP4rev1]')
1950
1951 def login():
1952 d = protocol.login('blah', 'blah')
1953 protocol.dataReceived('0001 OK LOGIN\r\n')
1954 return d
1955 def select():
1956 d = protocol.select('inbox')
1957 protocol.lineReceived('0002 OK SELECT')
1958 return d
1959 def fetch():
1960 d = protocol.fetchSpecific('1:*',
1961 headerType='HEADER.FIELDS',
1962 headerArgs=['SUBJECT'])
1963 protocol.dataReceived(
1964 '* 1 FETCH (BODY[HEADER.FIELDS ({7}\r\nSUBJECT)] "Hello")\r\n')
1965 protocol.dataReceived('0003 OK FETCH completed\r\n')
1966 return d
1967 def test(result):
1968 self.assertEqual(
1969 result, {1: [['BODY', ['HEADER.FIELDS', ['SUBJECT']], 'Hello']] })
1970
1971 d = login()
1972 d.addCallback(strip(select))
1973 d.addCallback(strip(fetch))
1974 d.addCallback(test)
1975 return d
1976
1977
1978 def test_nonIntegerLiteralLength(self):
1979 """
1980 If the server sends a literal length which cannot be parsed as an
1981 integer, L{IMAP4Client.lineReceived} should cause the protocol to be
1982 disconnected by raising L{imap4.IllegalServerResponse}.
1983 """
1984 transport = StringTransport()
1985 protocol = imap4.IMAP4Client()
1986
1987 protocol.makeConnection(transport)
1988 protocol.lineReceived('* OK [IMAP4rev1]')
1989
1990 def login():
1991 d = protocol.login('blah', 'blah')
1992 protocol.dataReceived('0001 OK LOGIN\r\n')
1993 return d
1994 def select():
1995 d = protocol.select('inbox')
1996 protocol.lineReceived('0002 OK SELECT')
1997 return d
1998 def fetch():
1999 d = protocol.fetchSpecific('1:*',
2000 headerType='HEADER.FIELDS',
2001 headerArgs=['SUBJECT'])
2002 self.assertRaises(
2003 imap4.IllegalServerResponse,
2004 protocol.dataReceived,
2005 '* 1 FETCH {xyz}\r\n...')
2006 d = login()
2007 d.addCallback(strip(select))
2008 d.addCallback(strip(fetch))
2009 return d
2010
2011
2012
2013 class FakeyServer(imap4.IMAP4Server):
2014 state = 'select'
2015 timeout = None
2016
2017 def sendServerGreeting(self):
2018 pass
2019
2020 class FakeyMessage:
2021 implements(imap4.IMessage)
2022
2023 def __init__(self, headers, flags, date, body, uid, subpart):
2024 self.headers = headers
2025 self.flags = flags
2026 self.body = StringIO(body)
2027 self.size = len(body)
2028 self.date = date
2029 self.uid = uid
2030 self.subpart = subpart
2031
2032 def getHeaders(self, negate, *names):
2033 self.got_headers = negate, names
2034 return self.headers
2035
2036 def getFlags(self):
2037 return self.flags
2038
2039 def getInternalDate(self):
2040 return self.date
2041
2042 def getBodyFile(self):
2043 return self.body
2044
2045 def getSize(self):
2046 return self.size
2047
2048 def getUID(self):
2049 return self.uid
2050
2051 def isMultipart(self):
2052 return self.subpart is not None
2053
2054 def getSubPart(self, part):
2055 self.got_subpart = part
2056 return self.subpart[part]
2057
2058 class NewStoreTestCase(unittest.TestCase, IMAP4HelperMixin):
2059 result = None
2060 storeArgs = None
2061
2062 def setUp(self):
2063 self.received_messages = self.received_uid = None
2064
2065 self.server = imap4.IMAP4Server()
2066 self.server.state = 'select'
2067 self.server.mbox = self
2068 self.connected = defer.Deferred()
2069 self.client = SimpleClient(self.connected)
2070
2071 def addListener(self, x):
2072 pass
2073 def removeListener(self, x):
2074 pass
2075
2076 def store(self, *args, **kw):
2077 self.storeArgs = args, kw
2078 return self.response
2079
2080 def _storeWork(self):
2081 def connected():
2082 return self.function(self.messages, self.flags, self.silent, self.ui d)
2083 def result(R):
2084 self.result = R
2085
2086 self.connected.addCallback(strip(connected)
2087 ).addCallback(result
2088 ).addCallback(self._cbStopClient
2089 ).addErrback(self._ebGeneral)
2090
2091 def check(ignored):
2092 self.assertEquals(self.result, self.expected)
2093 self.assertEquals(self.storeArgs, self.expectedArgs)
2094 d = loopback.loopbackTCP(self.server, self.client, noisy=False)
2095 d.addCallback(check)
2096 return d
2097
2098 def testSetFlags(self, uid=0):
2099 self.function = self.client.setFlags
2100 self.messages = '1,5,9'
2101 self.flags = ['\\A', '\\B', 'C']
2102 self.silent = False
2103 self.uid = uid
2104 self.response = {
2105 1: ['\\A', '\\B', 'C'],
2106 5: ['\\A', '\\B', 'C'],
2107 9: ['\\A', '\\B', 'C'],
2108 }
2109 self.expected = {
2110 1: {'FLAGS': ['\\A', '\\B', 'C']},
2111 5: {'FLAGS': ['\\A', '\\B', 'C']},
2112 9: {'FLAGS': ['\\A', '\\B', 'C']},
2113 }
2114 msg = imap4.MessageSet()
2115 msg.add(1)
2116 msg.add(5)
2117 msg.add(9)
2118 self.expectedArgs = ((msg, ['\\A', '\\B', 'C'], 0), {'uid': 0})
2119 return self._storeWork()
2120
2121
2122 class NewFetchTestCase(unittest.TestCase, IMAP4HelperMixin):
2123 def setUp(self):
2124 self.received_messages = self.received_uid = None
2125 self.result = None
2126
2127 self.server = imap4.IMAP4Server()
2128 self.server.state = 'select'
2129 self.server.mbox = self
2130 self.connected = defer.Deferred()
2131 self.client = SimpleClient(self.connected)
2132
2133 def addListener(self, x):
2134 pass
2135 def removeListener(self, x):
2136 pass
2137
2138 def fetch(self, messages, uid):
2139 self.received_messages = messages
2140 self.received_uid = uid
2141 return iter(zip(range(len(self.msgObjs)), self.msgObjs))
2142
2143 def _fetchWork(self, uid):
2144 if uid:
2145 for (i, msg) in zip(range(len(self.msgObjs)), self.msgObjs):
2146 self.expected[i]['UID'] = str(msg.getUID())
2147
2148 def result(R):
2149 self.result = R
2150
2151 self.connected.addCallback(lambda _: self.function(self.messages, uid)
2152 ).addCallback(result
2153 ).addCallback(self._cbStopClient
2154 ).addErrback(self._ebGeneral)
2155
2156 d = loopback.loopbackTCP(self.server, self.client, noisy=False)
2157 d.addCallback(lambda x : self.assertEquals(self.result, self.expected))
2158 return d
2159
2160 def testFetchUID(self):
2161 self.function = lambda m, u: self.client.fetchUID(m)
2162
2163 self.messages = '7'
2164 self.msgObjs = [
2165 FakeyMessage({}, (), '', '', 12345, None),
2166 FakeyMessage({}, (), '', '', 999, None),
2167 FakeyMessage({}, (), '', '', 10101, None),
2168 ]
2169 self.expected = {
2170 0: {'UID': '12345'},
2171 1: {'UID': '999'},
2172 2: {'UID': '10101'},
2173 }
2174 return self._fetchWork(0)
2175
2176 def testFetchFlags(self, uid=0):
2177 self.function = self.client.fetchFlags
2178 self.messages = '9'
2179 self.msgObjs = [
2180 FakeyMessage({}, ['FlagA', 'FlagB', '\\FlagC'], '', '', 54321, None) ,
2181 FakeyMessage({}, ['\\FlagC', 'FlagA', 'FlagB'], '', '', 12345, None) ,
2182 ]
2183 self.expected = {
2184 0: {'FLAGS': ['FlagA', 'FlagB', '\\FlagC']},
2185 1: {'FLAGS': ['\\FlagC', 'FlagA', 'FlagB']},
2186 }
2187 return self._fetchWork(uid)
2188
2189 def testFetchFlagsUID(self):
2190 return self.testFetchFlags(1)
2191
2192 def testFetchInternalDate(self, uid=0):
2193 self.function = self.client.fetchInternalDate
2194 self.messages = '13'
2195 self.msgObjs = [
2196 FakeyMessage({}, (), 'Fri, 02 Nov 2003 21:25:10 GMT', '', 23232, Non e),
2197 FakeyMessage({}, (), 'Thu, 29 Dec 2013 11:31:52 EST', '', 101, None) ,
2198 FakeyMessage({}, (), 'Mon, 10 Mar 1992 02:44:30 CST', '', 202, None) ,
2199 FakeyMessage({}, (), 'Sat, 11 Jan 2000 14:40:24 PST', '', 303, None) ,
2200 ]
2201 self.expected = {
2202 0: {'INTERNALDATE': '02-Nov-2003 21:25:10 +0000'},
2203 1: {'INTERNALDATE': '29-Dec-2013 11:31:52 -0500'},
2204 2: {'INTERNALDATE': '10-Mar-1992 02:44:30 -0600'},
2205 3: {'INTERNALDATE': '11-Jan-2000 14:40:24 -0800'},
2206 }
2207 return self._fetchWork(uid)
2208
2209 def testFetchInternalDateUID(self):
2210 return self.testFetchInternalDate(1)
2211
2212 def testFetchEnvelope(self, uid=0):
2213 self.function = self.client.fetchEnvelope
2214 self.messages = '15'
2215 self.msgObjs = [
2216 FakeyMessage({
2217 'from': 'user@domain', 'to': 'resu@domain',
2218 'date': 'thursday', 'subject': 'it is a message',
2219 'message-id': 'id-id-id-yayaya'}, (), '', '', 65656,
2220 None),
2221 ]
2222 self.expected = {
2223 0: {'ENVELOPE':
2224 ['thursday', 'it is a message',
2225 [[None, None, 'user', 'domain']],
2226 [[None, None, 'user', 'domain']],
2227 [[None, None, 'user', 'domain']],
2228 [[None, None, 'resu', 'domain']],
2229 None, None, None, 'id-id-id-yayaya']
2230 }
2231 }
2232 return self._fetchWork(uid)
2233
2234 def testFetchEnvelopeUID(self):
2235 return self.testFetchEnvelope(1)
2236
2237 def testFetchBodyStructure(self, uid=0):
2238 self.function = self.client.fetchBodyStructure
2239 self.messages = '3:9,10:*'
2240 self.msgObjs = [FakeyMessage({
2241 'content-type': 'text/plain; name=thing; key="value"',
2242 'content-id': 'this-is-the-content-id',
2243 'content-description': 'describing-the-content-goes-here!',
2244 'content-transfer-encoding': '8BIT',
2245 }, (), '', 'Body\nText\nGoes\nHere\n', 919293, None)]
2246 self.expected = {0: {'BODYSTRUCTURE': [
2247 'text', 'plain', [['name', 'thing'], ['key', 'value']],
2248 'this-is-the-content-id', 'describing-the-content-goes-here!',
2249 '8BIT', '20', '4', None, None, None]}}
2250 return self._fetchWork(uid)
2251
2252 def testFetchBodyStructureUID(self):
2253 return self.testFetchBodyStructure(1)
2254
2255 def testFetchSimplifiedBody(self, uid=0):
2256 self.function = self.client.fetchSimplifiedBody
2257 self.messages = '21'
2258 self.msgObjs = [FakeyMessage({}, (), '', 'Yea whatever', 91825,
2259 [FakeyMessage({'content-type': 'image/jpg'}, (), '',
2260 'Body Body Body', None, None
2261 )]
2262 )]
2263 self.expected = {0:
2264 {'BODY':
2265 [None, None, [], None, None, None,
2266 '12'
2267 ]
2268 }
2269 }
2270
2271 return self._fetchWork(uid)
2272
2273 def testFetchSimplifiedBodyUID(self):
2274 return self.testFetchSimplifiedBody(1)
2275
2276 def testFetchSimplifiedBodyText(self, uid=0):
2277 self.function = self.client.fetchSimplifiedBody
2278 self.messages = '21'
2279 self.msgObjs = [FakeyMessage({'content-type': 'text/plain'},
2280 (), '', 'Yea whatever', 91825, None)]
2281 self.expected = {0:
2282 {'BODY':
2283 ['text', 'plain', [], None, None, None,
2284 '12', '1'
2285 ]
2286 }
2287 }
2288
2289 return self._fetchWork(uid)
2290
2291 def testFetchSimplifiedBodyTextUID(self):
2292 return self.testFetchSimplifiedBodyText(1)
2293
2294 def testFetchSimplifiedBodyRFC822(self, uid=0):
2295 self.function = self.client.fetchSimplifiedBody
2296 self.messages = '21'
2297 self.msgObjs = [FakeyMessage({'content-type': 'message/rfc822'},
2298 (), '', 'Yea whatever', 91825,
2299 [FakeyMessage({'content-type': 'image/jpg'}, (), '',
2300 'Body Body Body', None, None
2301 )]
2302 )]
2303 self.expected = {0:
2304 {'BODY':
2305 ['message', 'rfc822', [], None, None, None,
2306 '12', [None, None, [[None, None, None]],
2307 [[None, None, None]], None, None, None,
2308 None, None, None], ['image', 'jpg', [],
2309 None, None, None, '14'], '1'
2310 ]
2311 }
2312 }
2313
2314 return self._fetchWork(uid)
2315
2316 def testFetchSimplifiedBodyRFC822UID(self):
2317 return self.testFetchSimplifiedBodyRFC822(1)
2318
2319 def testFetchMessage(self, uid=0):
2320 self.function = self.client.fetchMessage
2321 self.messages = '1,3,7,10101'
2322 self.msgObjs = [
2323 FakeyMessage({'Header': 'Value'}, (), '', 'BODY TEXT\r\n', 91, None) ,
2324 ]
2325 self.expected = {
2326 0: {'RFC822': 'Header: Value\r\n\r\nBODY TEXT\r\n'}
2327 }
2328 return self._fetchWork(uid)
2329
2330 def testFetchMessageUID(self):
2331 return self.testFetchMessage(1)
2332
2333 def testFetchHeaders(self, uid=0):
2334 self.function = self.client.fetchHeaders
2335 self.messages = '9,6,2'
2336 self.msgObjs = [
2337 FakeyMessage({'H1': 'V1', 'H2': 'V2'}, (), '', '', 99, None),
2338 ]
2339 self.expected = {
2340 0: {'RFC822.HEADER': imap4._formatHeaders({'H1': 'V1', 'H2': 'V2'})} ,
2341 }
2342 return self._fetchWork(uid)
2343
2344 def testFetchHeadersUID(self):
2345 return self.testFetchHeaders(1)
2346
2347 def testFetchBody(self, uid=0):
2348 self.function = self.client.fetchBody
2349 self.messages = '1,2,3,4,5,6,7'
2350 self.msgObjs = [
2351 FakeyMessage({'Header': 'Value'}, (), '', 'Body goes here\r\n', 171, None),
2352 ]
2353 self.expected = {
2354 0: {'RFC822.TEXT': 'Body goes here\r\n'},
2355 }
2356 return self._fetchWork(uid)
2357
2358 def testFetchBodyUID(self):
2359 return self.testFetchBody(1)
2360
2361 def testFetchBodyParts(self):
2362 self.function = self.client.fetchBodyParts
2363 self.messages = '1'
2364 parts = [1, 2]
2365 outerBody = ''
2366 innerBody1 = 'Contained body message text. Squarge.'
2367 innerBody2 = 'Secondary <i>message</i> text of squarge body.'
2368 headers = util.OrderedDict()
2369 headers['from'] = 'sender@host'
2370 headers['to'] = 'recipient@domain'
2371 headers['subject'] = 'booga booga boo'
2372 headers['content-type'] = 'multipart/alternative; boundary="xyz"'
2373 innerHeaders = util.OrderedDict()
2374 innerHeaders['subject'] = 'this is subject text'
2375 innerHeaders['content-type'] = 'text/plain'
2376 innerHeaders2 = util.OrderedDict()
2377 innerHeaders2['subject'] = '<b>this is subject</b>'
2378 innerHeaders2['content-type'] = 'text/html'
2379 self.msgObjs = [FakeyMessage(
2380 headers, (), None, outerBody, 123,
2381 [FakeyMessage(innerHeaders, (), None, innerBody1, None, None),
2382 FakeyMessage(innerHeaders2, (), None, innerBody2, None, None)])]
2383 self.expected = {
2384 0: {'1': innerBody1, '2': innerBody2},
2385 }
2386
2387 def result(R):
2388 self.result = R
2389
2390 self.connected.addCallback(lambda _: self.function(self.messages, parts) )
2391 self.connected.addCallback(result)
2392 self.connected.addCallback(self._cbStopClient)
2393 self.connected.addErrback(self._ebGeneral)
2394
2395 d = loopback.loopbackTCP(self.server, self.client, noisy=False)
2396 d.addCallback(lambda ign: self.assertEquals(self.result, self.expected))
2397 return d
2398
2399
2400 def test_fetchBodyPartOfNonMultipart(self):
2401 """
2402 Single-part messages have an implicit first part which clients
2403 should be able to retrieve explicitly. Test that a client
2404 requesting part 1 of a text/plain message receives the body of the
2405 text/plain part.
2406 """
2407 self.function = self.client.fetchBodyParts
2408 self.messages = '1'
2409 parts = [1]
2410 outerBody = 'DA body'
2411 headers = util.OrderedDict()
2412 headers['from'] = 'sender@host'
2413 headers['to'] = 'recipient@domain'
2414 headers['subject'] = 'booga booga boo'
2415 headers['content-type'] = 'text/plain'
2416 self.msgObjs = [FakeyMessage(
2417 headers, (), None, outerBody, 123, None)]
2418
2419 self.expected = {
2420 0: {'1': outerBody},
2421 }
2422
2423 def result(R):
2424 self.result = R
2425
2426 self.connected.addCallback(lambda _: self.function(self.messages, parts) )
2427 self.connected.addCallback(result)
2428 self.connected.addCallback(self._cbStopClient)
2429 self.connected.addErrback(self._ebGeneral)
2430
2431 d = loopback.loopbackTCP(self.server, self.client, noisy=False)
2432 d.addCallback(lambda ign: self.assertEquals(self.result, self.expected))
2433 return d
2434
2435
2436 def testFetchSize(self, uid=0):
2437 self.function = self.client.fetchSize
2438 self.messages = '1:100,2:*'
2439 self.msgObjs = [
2440 FakeyMessage({}, (), '', 'x' * 20, 123, None),
2441 ]
2442 self.expected = {
2443 0: {'RFC822.SIZE': '20'},
2444 }
2445 return self._fetchWork(uid)
2446
2447 def testFetchSizeUID(self):
2448 return self.testFetchSize(1)
2449
2450 def testFetchFull(self, uid=0):
2451 self.function = self.client.fetchFull
2452 self.messages = '1,3'
2453 self.msgObjs = [
2454 FakeyMessage({}, ('\\XYZ', '\\YZX', 'Abc'),
2455 'Sun, 25 Jul 2010 06:20:30 -0400 (EDT)',
2456 'xyz' * 2, 654, None),
2457 FakeyMessage({}, ('\\One', '\\Two', 'Three'),
2458 'Mon, 14 Apr 2003 19:43:44 -0400',
2459 'abc' * 4, 555, None),
2460 ]
2461 self.expected = {
2462 0: {'FLAGS': ['\\XYZ', '\\YZX', 'Abc'],
2463 'INTERNALDATE': '25-Jul-2010 06:20:30 -0400',
2464 'RFC822.SIZE': '6',
2465 'ENVELOPE': [None, None, [[None, None, None]], [[None, None, Non e]], None, None, None, None, None, None],
2466 'BODY': [None, None, [], None, None, None, '6']},
2467 1: {'FLAGS': ['\\One', '\\Two', 'Three'],
2468 'INTERNALDATE': '14-Apr-2003 19:43:44 -0400',
2469 'RFC822.SIZE': '12',
2470 'ENVELOPE': [None, None, [[None, None, None]], [[None, None, Non e]], None, None, None, None, None, None],
2471 'BODY': [None, None, [], None, None, None, '12']},
2472 }
2473 return self._fetchWork(uid)
2474
2475 def testFetchFullUID(self):
2476 return self.testFetchFull(1)
2477
2478 def testFetchAll(self, uid=0):
2479 self.function = self.client.fetchAll
2480 self.messages = '1,2:3'
2481 self.msgObjs = [
2482 FakeyMessage({}, (), 'Mon, 14 Apr 2003 19:43:44 +0400',
2483 'Lalala', 10101, None),
2484 FakeyMessage({}, (), 'Tue, 15 Apr 2003 19:43:44 +0200',
2485 'Alalal', 20202, None),
2486 ]
2487 self.expected = {
2488 0: {'ENVELOPE': [None, None, [[None, None, None]], [[None, None, Non e]], None, None, None, None, None, None],
2489 'RFC822.SIZE': '6',
2490 'INTERNALDATE': '14-Apr-2003 19:43:44 +0400',
2491 'FLAGS': []},
2492 1: {'ENVELOPE': [None, None, [[None, None, None]], [[None, None, Non e]], None, None, None, None, None, None],
2493 'RFC822.SIZE': '6',
2494 'INTERNALDATE': '15-Apr-2003 19:43:44 +0200',
2495 'FLAGS': []},
2496 }
2497 return self._fetchWork(uid)
2498
2499 def testFetchAllUID(self):
2500 return self.testFetchAll(1)
2501
2502 def testFetchFast(self, uid=0):
2503 self.function = self.client.fetchFast
2504 self.messages = '1'
2505 self.msgObjs = [
2506 FakeyMessage({}, ('\\X',), '19 Mar 2003 19:22:21 -0500', '', 9, None ),
2507 ]
2508 self.expected = {
2509 0: {'FLAGS': ['\\X'],
2510 'INTERNALDATE': '19-Mar-2003 19:22:21 -0500',
2511 'RFC822.SIZE': '0'},
2512 }
2513 return self._fetchWork(uid)
2514
2515 def testFetchFastUID(self):
2516 return self.testFetchFast(1)
2517
2518
2519 class FetchSearchStoreTestCase(unittest.TestCase, IMAP4HelperMixin):
2520 implements(imap4.ISearchableMailbox)
2521
2522 def setUp(self):
2523 self.expected = self.result = None
2524 self.server_received_query = None
2525 self.server_received_uid = None
2526 self.server_received_parts = None
2527 self.server_received_messages = None
2528
2529 self.server = imap4.IMAP4Server()
2530 self.server.state = 'select'
2531 self.server.mbox = self
2532 self.connected = defer.Deferred()
2533 self.client = SimpleClient(self.connected)
2534
2535 def search(self, query, uid):
2536 self.server_received_query = query
2537 self.server_received_uid = uid
2538 return self.expected
2539
2540 def addListener(self, *a, **kw):
2541 pass
2542 removeListener = addListener
2543
2544 def _searchWork(self, uid):
2545 def search():
2546 return self.client.search(self.query, uid=uid)
2547 def result(R):
2548 self.result = R
2549
2550 self.connected.addCallback(strip(search)
2551 ).addCallback(result
2552 ).addCallback(self._cbStopClient
2553 ).addErrback(self._ebGeneral)
2554
2555 def check(ignored):
2556 # Ensure no short-circuiting wierdness is going on
2557 self.failIf(self.result is self.expected)
2558
2559 self.assertEquals(self.result, self.expected)
2560 self.assertEquals(self.uid, self.server_received_uid)
2561 self.assertEquals(
2562 imap4.parseNestedParens(self.query),
2563 self.server_received_query
2564 )
2565 d = loopback.loopbackTCP(self.server, self.client, noisy=False)
2566 d.addCallback(check)
2567 return d
2568
2569 def testSearch(self):
2570 self.query = imap4.Or(
2571 imap4.Query(header=('subject', 'substring')),
2572 imap4.Query(larger=1024, smaller=4096),
2573 )
2574 self.expected = [1, 4, 5, 7]
2575 self.uid = 0
2576 return self._searchWork(0)
2577
2578 def testUIDSearch(self):
2579 self.query = imap4.Or(
2580 imap4.Query(header=('subject', 'substring')),
2581 imap4.Query(larger=1024, smaller=4096),
2582 )
2583 self.uid = 1
2584 self.expected = [1, 2, 3]
2585 return self._searchWork(1)
2586
2587 def getUID(self, msg):
2588 try:
2589 return self.expected[msg]['UID']
2590 except (TypeError, IndexError):
2591 return self.expected[msg-1]
2592 except KeyError:
2593 return 42
2594
2595 def fetch(self, messages, uid):
2596 self.server_received_uid = uid
2597 self.server_received_messages = str(messages)
2598 return self.expected
2599
2600 def _fetchWork(self, fetch):
2601 def result(R):
2602 self.result = R
2603
2604 self.connected.addCallback(strip(fetch)
2605 ).addCallback(result
2606 ).addCallback(self._cbStopClient
2607 ).addErrback(self._ebGeneral)
2608
2609 def check(ignored):
2610 # Ensure no short-circuiting wierdness is going on
2611 self.failIf(self.result is self.expected)
2612
2613 self.parts and self.parts.sort()
2614 self.server_received_parts and self.server_received_parts.sort()
2615
2616 if self.uid:
2617 for (k, v) in self.expected.items():
2618 v['UID'] = str(k)
2619
2620 self.assertEquals(self.result, self.expected)
2621 self.assertEquals(self.uid, self.server_received_uid)
2622 self.assertEquals(self.parts, self.server_received_parts)
2623 self.assertEquals(imap4.parseIdList(self.messages),
2624 imap4.parseIdList(self.server_received_messages))
2625
2626 d = loopback.loopbackTCP(self.server, self.client, noisy=False)
2627 d.addCallback(check)
2628 return d
2629
2630 class FakeMailbox:
2631 def __init__(self):
2632 self.args = []
2633 def addMessage(self, body, flags, date):
2634 self.args.append((body, flags, date))
2635 return defer.succeed(None)
2636
2637 class FeaturefulMessage:
2638 implements(imap4.IMessageFile)
2639
2640 def getFlags(self):
2641 return 'flags'
2642
2643 def getInternalDate(self):
2644 return 'internaldate'
2645
2646 def open(self):
2647 return StringIO("open")
2648
2649 class MessageCopierMailbox:
2650 implements(imap4.IMessageCopier)
2651
2652 def __init__(self):
2653 self.msgs = []
2654
2655 def copy(self, msg):
2656 self.msgs.append(msg)
2657 return len(self.msgs)
2658
2659 class CopyWorkerTestCase(unittest.TestCase):
2660 def testFeaturefulMessage(self):
2661 s = imap4.IMAP4Server()
2662
2663 # Yes. I am grabbing this uber-non-public method to test it.
2664 # It is complex. It needs to be tested directly!
2665 # Perhaps it should be refactored, simplified, or split up into
2666 # not-so-private components, but that is a task for another day.
2667
2668 # Ha ha! Addendum! Soon it will be split up, and this test will
2669 # be re-written to just use the default adapter for IMailbox to
2670 # IMessageCopier and call .copy on that adapter.
2671 f = s._IMAP4Server__cbCopy
2672
2673 m = FakeMailbox()
2674 d = f([(i, FeaturefulMessage()) for i in range(1, 11)], 'tag', m)
2675
2676 def cbCopy(results):
2677 for a in m.args:
2678 self.assertEquals(a[0].read(), "open")
2679 self.assertEquals(a[1], "flags")
2680 self.assertEquals(a[2], "internaldate")
2681
2682 for (status, result) in results:
2683 self.failUnless(status)
2684 self.assertEquals(result, None)
2685
2686 return d.addCallback(cbCopy)
2687
2688
2689 def testUnfeaturefulMessage(self):
2690 s = imap4.IMAP4Server()
2691
2692 # See above comment
2693 f = s._IMAP4Server__cbCopy
2694
2695 m = FakeMailbox()
2696 msgs = [FakeyMessage({'Header-Counter': str(i)}, (), 'Date', 'Body %d' % (i,), i + 10, None) for i in range(1, 11)]
2697 d = f([im for im in zip(range(1, 11), msgs)], 'tag', m)
2698
2699 def cbCopy(results):
2700 seen = []
2701 for a in m.args:
2702 seen.append(a[0].read())
2703 self.assertEquals(a[1], ())
2704 self.assertEquals(a[2], "Date")
2705
2706 seen.sort()
2707 exp = ["Header-Counter: %d\r\n\r\nBody %d" % (i, i) for i in range(1 , 11)]
2708 exp.sort()
2709 self.assertEquals(seen, exp)
2710
2711 for (status, result) in results:
2712 self.failUnless(status)
2713 self.assertEquals(result, None)
2714
2715 return d.addCallback(cbCopy)
2716
2717 def testMessageCopier(self):
2718 s = imap4.IMAP4Server()
2719
2720 # See above comment
2721 f = s._IMAP4Server__cbCopy
2722
2723 m = MessageCopierMailbox()
2724 msgs = [object() for i in range(1, 11)]
2725 d = f([im for im in zip(range(1, 11), msgs)], 'tag', m)
2726
2727 def cbCopy(results):
2728 self.assertEquals(results, zip([1] * 10, range(1, 11)))
2729 for (orig, new) in zip(msgs, m.msgs):
2730 self.assertIdentical(orig, new)
2731
2732 return d.addCallback(cbCopy)
2733
2734
2735 class TLSTestCase(IMAP4HelperMixin, unittest.TestCase):
2736 serverCTX = ServerTLSContext and ServerTLSContext()
2737 clientCTX = ClientTLSContext and ClientTLSContext()
2738
2739 def loopback(self):
2740 return loopback.loopbackTCP(self.server, self.client, noisy=False)
2741
2742 def testAPileOfThings(self):
2743 SimpleServer.theAccount.addMailbox('inbox')
2744 called = []
2745 def login():
2746 called.append(None)
2747 return self.client.login('testuser', 'password-test')
2748 def list():
2749 called.append(None)
2750 return self.client.list('inbox', '%')
2751 def status():
2752 called.append(None)
2753 return self.client.status('inbox', 'UIDNEXT')
2754 def examine():
2755 called.append(None)
2756 return self.client.examine('inbox')
2757 def logout():
2758 called.append(None)
2759 return self.client.logout()
2760
2761 self.client.requireTransportSecurity = True
2762
2763 methods = [login, list, status, examine, logout]
2764 map(self.connected.addCallback, map(strip, methods))
2765 self.connected.addCallbacks(self._cbStopClient, self._ebGeneral)
2766 def check(ignored):
2767 self.assertEquals(self.server.startedTLS, True)
2768 self.assertEquals(self.client.startedTLS, True)
2769 self.assertEquals(len(called), len(methods))
2770 d = self.loopback()
2771 d.addCallback(check)
2772 return d
2773
2774 def testLoginLogin(self):
2775 self.server.checker.addUser('testuser', 'password-test')
2776 success = []
2777 self.client.registerAuthenticator(imap4.LOGINAuthenticator('testuser'))
2778 self.connected.addCallback(
2779 lambda _: self.client.authenticate('password-test')
2780 ).addCallback(
2781 lambda _: self.client.logout()
2782 ).addCallback(success.append
2783 ).addCallback(self._cbStopClient
2784 ).addErrback(self._ebGeneral)
2785
2786 d = self.loopback()
2787 d.addCallback(lambda x : self.assertEquals(len(success), 1))
2788 return d
2789
2790 def testStartTLS(self):
2791 success = []
2792 self.connected.addCallback(lambda _: self.client.startTLS())
2793 self.connected.addCallback(lambda _: self.assertNotEquals(-1, self.clien t.transport.__class__.__name__.find('TLS')))
2794 self.connected.addCallback(self._cbStopClient)
2795 self.connected.addCallback(success.append)
2796 self.connected.addErrback(self._ebGeneral)
2797
2798 d = self.loopback()
2799 d.addCallback(lambda x : self.failUnless(success))
2800 return d
2801
2802 def testFailedStartTLS(self):
2803 failure = []
2804 def breakServerTLS(ign):
2805 self.server.canStartTLS = False
2806
2807 self.connected.addCallback(breakServerTLS)
2808 self.connected.addCallback(lambda ign: self.client.startTLS())
2809 self.connected.addErrback(lambda err: failure.append(err.trap(imap4.IMAP 4Exception)))
2810 self.connected.addCallback(self._cbStopClient)
2811 self.connected.addErrback(self._ebGeneral)
2812
2813 def check(ignored):
2814 self.failUnless(failure)
2815 self.assertIdentical(failure[0], imap4.IMAP4Exception)
2816 return self.loopback().addCallback(check)
2817
2818
2819
2820 class SlowMailbox(SimpleMailbox):
2821 howSlow = 2
2822 callLater = None
2823 fetchDeferred = None
2824
2825 # Not a very nice implementation of fetch(), but it'll
2826 # do for the purposes of testing.
2827 def fetch(self, messages, uid):
2828 d = defer.Deferred()
2829 self.callLater(self.howSlow, d.callback, ())
2830 self.fetchDeferred.callback(None)
2831 return d
2832
2833 class Timeout(IMAP4HelperMixin, unittest.TestCase):
2834
2835 def test_serverTimeout(self):
2836 """
2837 The *client* has a timeout mechanism which will close connections that
2838 are inactive for a period.
2839 """
2840 c = Clock()
2841 self.server.timeoutTest = True
2842 self.client.timeout = 5 #seconds
2843 self.client.callLater = c.callLater
2844 self.selectedArgs = None
2845
2846 def login():
2847 d = self.client.login('testuser', 'password-test')
2848 c.advance(5)
2849 d.addErrback(timedOut)
2850 return d
2851
2852 def timedOut(failure):
2853 self._cbStopClient(None)
2854 failure.trap(error.TimeoutError)
2855
2856 d = self.connected.addCallback(strip(login))
2857 d.addErrback(self._ebGeneral)
2858 return defer.gatherResults([d, self.loopback()])
2859
2860
2861 def test_longFetchDoesntTimeout(self):
2862 """
2863 The connection timeout does not take effect during fetches.
2864 """
2865 c = Clock()
2866 SlowMailbox.callLater = c.callLater
2867 SlowMailbox.fetchDeferred = defer.Deferred()
2868 self.server.callLater = c.callLater
2869 SimpleServer.theAccount.mailboxFactory = SlowMailbox
2870 SimpleServer.theAccount.addMailbox('mailbox-test')
2871
2872 self.server.setTimeout(1)
2873
2874 def login():
2875 return self.client.login('testuser', 'password-test')
2876 def select():
2877 self.server.setTimeout(1)
2878 return self.client.select('mailbox-test')
2879 def fetch():
2880 return self.client.fetchUID('1:*')
2881 def stillConnected():
2882 self.assertNotEquals(self.server.state, 'timeout')
2883
2884 def cbAdvance(ignored):
2885 for i in xrange(4):
2886 c.advance(.5)
2887
2888 SlowMailbox.fetchDeferred.addCallback(cbAdvance)
2889
2890 d1 = self.connected.addCallback(strip(login))
2891 d1.addCallback(strip(select))
2892 d1.addCallback(strip(fetch))
2893 d1.addCallback(strip(stillConnected))
2894 d1.addCallback(self._cbStopClient)
2895 d1.addErrback(self._ebGeneral)
2896 d = defer.gatherResults([d1, self.loopback()])
2897 return d
2898
2899
2900 def test_idleClientDoesDisconnect(self):
2901 """
2902 The *server* has a timeout mechanism which will close connections that
2903 are inactive for a period.
2904 """
2905 c = Clock()
2906 # Hook up our server protocol
2907 transport = StringTransportWithDisconnection()
2908 transport.protocol = self.server
2909 self.server.callLater = c.callLater
2910 self.server.makeConnection(transport)
2911
2912 # Make sure we can notice when the connection goes away
2913 lost = []
2914 connLost = self.server.connectionLost
2915 self.server.connectionLost = lambda reason: (lost.append(None), connLost (reason))[1]
2916
2917 # 2/3rds of the idle timeout elapses...
2918 c.pump([0.0] + [self.server.timeOut / 3.0] * 2)
2919 self.failIf(lost, lost)
2920
2921 # Now some more
2922 c.pump([0.0, self.server.timeOut / 2.0])
2923 self.failUnless(lost)
2924
2925
2926
2927 class Disconnection(unittest.TestCase):
2928 def testClientDisconnectFailsDeferreds(self):
2929 c = imap4.IMAP4Client()
2930 t = StringTransportWithDisconnection()
2931 c.makeConnection(t)
2932 d = self.assertFailure(c.login('testuser', 'example.com'), error.Connect ionDone)
2933 c.connectionLost(error.ConnectionDone("Connection closed"))
2934 return d
2935
2936
2937
2938 class SynchronousMailbox(object):
2939 """
2940 Trivial, in-memory mailbox implementation which can produce a message
2941 synchronously.
2942 """
2943 def __init__(self, messages):
2944 self.messages = messages
2945
2946
2947 def fetch(self, msgset, uid):
2948 assert not uid, "Cannot handle uid requests."
2949 for msg in msgset:
2950 yield msg, self.messages[msg - 1]
2951
2952
2953
2954 class StringTransportConsumer(StringTransport):
2955 producer = None
2956 streaming = None
2957
2958 def registerProducer(self, producer, streaming):
2959 self.producer = producer
2960 self.streaming = streaming
2961
2962
2963
2964 class Pipelining(unittest.TestCase):
2965 """
2966 Tests for various aspects of the IMAP4 server's pipelining support.
2967 """
2968 messages = [
2969 FakeyMessage({}, [], '', '0', None, None),
2970 FakeyMessage({}, [], '', '1', None, None),
2971 FakeyMessage({}, [], '', '2', None, None),
2972 ]
2973
2974 def setUp(self):
2975 self.iterators = []
2976
2977 self.transport = StringTransportConsumer()
2978 self.server = imap4.IMAP4Server(None, None, self.iterateInReactor)
2979 self.server.makeConnection(self.transport)
2980
2981
2982 def iterateInReactor(self, iterator):
2983 d = defer.Deferred()
2984 self.iterators.append((iterator, d))
2985 return d
2986
2987
2988 def tearDown(self):
2989 self.server.connectionLost(failure.Failure(error.ConnectionDone()))
2990
2991
2992 def test_synchronousFetch(self):
2993 """
2994 Test that pipelined FETCH commands which can be responded to
2995 synchronously are responded to correctly.
2996 """
2997 mailbox = SynchronousMailbox(self.messages)
2998
2999 # Skip over authentication and folder selection
3000 self.server.state = 'select'
3001 self.server.mbox = mailbox
3002
3003 # Get rid of any greeting junk
3004 self.transport.clear()
3005
3006 # Here's some pipelined stuff
3007 self.server.dataReceived(
3008 '01 FETCH 1 BODY[]\r\n'
3009 '02 FETCH 2 BODY[]\r\n'
3010 '03 FETCH 3 BODY[]\r\n')
3011
3012 # Flush anything the server has scheduled to run
3013 while self.iterators:
3014 for e in self.iterators[0][0]:
3015 break
3016 else:
3017 self.iterators.pop(0)[1].callback(None)
3018
3019 # The bodies are empty because we aren't simulating a transport
3020 # exactly correctly (we have StringTransportConsumer but we never
3021 # call resumeProducing on its producer). It doesn't matter: just
3022 # make sure the surrounding structure is okay, and that no
3023 # exceptions occurred.
3024 self.assertEquals(
3025 self.transport.value(),
3026 '* 1 FETCH (BODY[] )\r\n'
3027 '01 OK FETCH completed\r\n'
3028 '* 2 FETCH (BODY[] )\r\n'
3029 '02 OK FETCH completed\r\n'
3030 '* 3 FETCH (BODY[] )\r\n'
3031 '03 OK FETCH completed\r\n')
3032
3033
3034
3035 if ClientTLSContext is None:
3036 for case in (TLSTestCase,):
3037 case.skip = "OpenSSL not present"
3038 elif interfaces.IReactorSSL(reactor, None) is None:
3039 for case in (TLSTestCase,):
3040 case.skip = "Reactor doesn't support SSL"
OLDNEW
« no previous file with comments | « third_party/twisted_8_1/twisted/mail/test/test_bounce.py ('k') | third_party/twisted_8_1/twisted/mail/test/test_mail.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698