Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(90)

Side by Side Diff: third_party/twisted_8_1/twisted/mail/relaymanager.py

Issue 12261012: Remove third_party/twisted_8_1 (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/tools/build
Patch Set: Created 7 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
(Empty)
1 # -*- test-case-name: twisted.mail.test.test_mail -*-
2 # Copyright (c) 2001-2008 Twisted Matrix Laboratories.
3 # See LICENSE for details.
4
5 """
6 Infrastructure for relaying mail through smart host
7
8 Today, internet e-mail has stopped being Peer-to-peer for many problems,
9 spam (unsolicited bulk mail) among them. Instead, most nodes on the
10 internet send all e-mail to a single computer, usually the ISP's though
11 sometimes other schemes, such as SMTP-after-POP, are used. This computer
12 is supposedly permanently up and traceable, and will do the work of
13 figuring out MXs and connecting to them. This kind of configuration
14 is usually termed "smart host", since the host we are connecting to
15 is "smart" (and will find MXs and connect to them) rather then just
16 accepting mail for a small set of domains.
17
18 The classes here are meant to facilitate support for such a configuration
19 for the twisted.mail SMTP server
20 """
21
22 import rfc822
23 import os
24 import time
25
26 try:
27 import cPickle as pickle
28 except ImportError:
29 import pickle
30
31 from twisted.python import log
32 from twisted.python.failure import Failure
33 from twisted.python.compat import set
34 from twisted.mail import relay
35 from twisted.mail import bounce
36 from twisted.internet import protocol
37 from twisted.internet.defer import Deferred, DeferredList
38 from twisted.internet.error import DNSLookupError
39 from twisted.mail import smtp
40 from twisted.application import internet
41
42 class ManagedRelayerMixin:
43 """SMTP Relayer which notifies a manager
44
45 Notify the manager about successful mail, failed mail
46 and broken connections
47 """
48
49 def __init__(self, manager):
50 self.manager = manager
51
52 def sentMail(self, code, resp, numOk, addresses, log):
53 """called when e-mail has been sent
54
55 we will always get 0 or 1 addresses.
56 """
57 message = self.names[0]
58 if code in smtp.SUCCESS:
59 self.manager.notifySuccess(self.factory, message)
60 else:
61 self.manager.notifyFailure(self.factory, message)
62 del self.messages[0]
63 del self.names[0]
64
65 def connectionLost(self, reason):
66 """called when connection is broken
67
68 notify manager we will try to send no more e-mail
69 """
70 self.manager.notifyDone(self.factory)
71
72 class SMTPManagedRelayer(ManagedRelayerMixin, relay.SMTPRelayer):
73 def __init__(self, messages, manager, *args, **kw):
74 """
75 @type messages: C{list} of C{str}
76 @param messages: Filenames of messages to relay
77
78 manager should support .notifySuccess, .notifyFailure
79 and .notifyDone
80 """
81 ManagedRelayerMixin.__init__(self, manager)
82 relay.SMTPRelayer.__init__(self, messages, *args, **kw)
83
84 class ESMTPManagedRelayer(ManagedRelayerMixin, relay.ESMTPRelayer):
85 def __init__(self, messages, manager, *args, **kw):
86 """
87 @type messages: C{list} of C{str}
88 @param messages: Filenames of messages to relay
89
90 manager should support .notifySuccess, .notifyFailure
91 and .notifyDone
92 """
93 ManagedRelayerMixin.__init__(self, manager)
94 relay.ESMTPRelayer.__init__(self, messages, *args, **kw)
95
96 class SMTPManagedRelayerFactory(protocol.ClientFactory):
97 protocol = SMTPManagedRelayer
98
99 def __init__(self, messages, manager, *args, **kw):
100 self.messages = messages
101 self.manager = manager
102 self.pArgs = args
103 self.pKwArgs = kw
104
105 def buildProtocol(self, addr):
106 protocol = self.protocol(self.messages, self.manager, *self.pArgs,
107 **self.pKwArgs)
108 protocol.factory = self
109 return protocol
110
111 def clientConnectionFailed(self, connector, reason):
112 """called when connection could not be made
113
114 our manager should be notified that this happened,
115 it might prefer some other host in that case"""
116 self.manager.notifyNoConnection(self)
117 self.manager.notifyDone(self)
118
119 class ESMTPManagedRelayerFactory(SMTPManagedRelayerFactory):
120 protocol = ESMTPManagedRelayer
121
122 def __init__(self, messages, manager, secret, contextFactory, *args, **kw):
123 self.secret = secret
124 self.contextFactory = contextFactory
125 SMTPManagedRelayerFactory.__init__(self, messages, manager, *args, **kw)
126
127 def buildProtocol(self, addr):
128 s = self.secret and self.secret(addr)
129 protocol = self.protocol(self.messages, self.manager, s,
130 self.contextFactory, *self.pArgs, **self.pKwArgs)
131 protocol.factory = self
132 return protocol
133
134 class Queue:
135 """A queue of ougoing emails."""
136
137 noisy = True
138
139 def __init__(self, directory):
140 self.directory = directory
141 self._init()
142
143 def _init(self):
144 self.n = 0
145 self.waiting = {}
146 self.relayed = {}
147 self.readDirectory()
148
149 def __getstate__(self):
150 """(internal) delete volatile state"""
151 return {'directory' : self.directory}
152
153 def __setstate__(self, state):
154 """(internal) restore volatile state"""
155 self.__dict__.update(state)
156 self._init()
157
158 def readDirectory(self):
159 """Read the messages directory.
160
161 look for new messages.
162 """
163 for message in os.listdir(self.directory):
164 # Skip non data files
165 if message[-2:]!='-D':
166 continue
167 self.addMessage(message[:-2])
168
169 def getWaiting(self):
170 return self.waiting.keys()
171
172 def hasWaiting(self):
173 return len(self.waiting) > 0
174
175 def getRelayed(self):
176 return self.relayed.keys()
177
178 def setRelaying(self, message):
179 del self.waiting[message]
180 self.relayed[message] = 1
181
182 def setWaiting(self, message):
183 del self.relayed[message]
184 self.waiting[message] = 1
185
186 def addMessage(self, message):
187 if message not in self.relayed:
188 self.waiting[message] = 1
189 if self.noisy:
190 log.msg('Set ' + message + ' waiting')
191
192 def done(self, message):
193 """Remove message to from queue."""
194 message = os.path.basename(message)
195 os.remove(self.getPath(message) + '-D')
196 os.remove(self.getPath(message) + '-H')
197 del self.relayed[message]
198
199 def getPath(self, message):
200 """Get the path in the filesystem of a message."""
201 return os.path.join(self.directory, message)
202
203 def getEnvelope(self, message):
204 return pickle.load(self.getEnvelopeFile(message))
205
206 def getEnvelopeFile(self, message):
207 return open(os.path.join(self.directory, message+'-H'), 'rb')
208
209 def createNewMessage(self):
210 """Create a new message in the queue.
211
212 Return a tuple - file-like object for headers, and ISMTPMessage.
213 """
214 fname = "%s_%s_%s_%s" % (os.getpid(), time.time(), self.n, id(self))
215 self.n = self.n + 1
216 headerFile = open(os.path.join(self.directory, fname+'-H'), 'wb')
217 tempFilename = os.path.join(self.directory, fname+'-C')
218 finalFilename = os.path.join(self.directory, fname+'-D')
219 messageFile = open(tempFilename, 'wb')
220
221 from twisted.mail.mail import FileMessage
222 return headerFile,FileMessage(messageFile, tempFilename, finalFilename)
223
224
225 class _AttemptManager(object):
226 """
227 Manage the state of a single attempt to flush the relay queue.
228 """
229 def __init__(self, manager):
230 self.manager = manager
231 self._completionDeferreds = []
232
233
234 def getCompletionDeferred(self):
235 self._completionDeferreds.append(Deferred())
236 return self._completionDeferreds[-1]
237
238
239 def _finish(self, relay, message):
240 self.manager.managed[relay].remove(os.path.basename(message))
241 self.manager.queue.done(message)
242
243
244 def notifySuccess(self, relay, message):
245 """a relay sent a message successfully
246
247 Mark it as sent in our lists
248 """
249 if self.manager.queue.noisy:
250 log.msg("success sending %s, removing from queue" % message)
251 self._finish(relay, message)
252
253
254 def notifyFailure(self, relay, message):
255 """Relaying the message has failed."""
256 if self.manager.queue.noisy:
257 log.msg("could not relay "+message)
258 # Moshe - Bounce E-mail here
259 # Be careful: if it's a bounced bounce, silently
260 # discard it
261 message = os.path.basename(message)
262 fp = self.manager.queue.getEnvelopeFile(message)
263 from_, to = pickle.load(fp)
264 fp.close()
265 from_, to, bounceMessage = bounce.generateBounce(open(self.manager.queue .getPath(message)+'-D'), from_, to)
266 fp, outgoingMessage = self.manager.queue.createNewMessage()
267 pickle.dump([from_, to], fp)
268 fp.close()
269 for line in bounceMessage.splitlines():
270 outgoingMessage.lineReceived(line)
271 outgoingMessage.eomReceived()
272 self._finish(relay, self.manager.queue.getPath(message))
273
274
275 def notifyDone(self, relay):
276 """A relaying SMTP client is disconnected.
277
278 unmark all pending messages under this relay's responsibility
279 as being relayed, and remove the relay.
280 """
281 for message in self.manager.managed.get(relay, ()):
282 if self.manager.queue.noisy:
283 log.msg("Setting " + message + " waiting")
284 self.manager.queue.setWaiting(message)
285 try:
286 del self.manager.managed[relay]
287 except KeyError:
288 pass
289 notifications = self._completionDeferreds
290 self._completionDeferreds = None
291 for d in notifications:
292 d.callback(None)
293
294
295 def notifyNoConnection(self, relay):
296 """Relaying SMTP client couldn't connect.
297
298 Useful because it tells us our upstream server is unavailable.
299 """
300 # Back off a bit
301 try:
302 msgs = self.manager.managed[relay]
303 except KeyError:
304 log.msg("notifyNoConnection passed unknown relay!")
305 return
306
307 if self.manager.queue.noisy:
308 log.msg("Backing off on delivery of " + str(msgs))
309 def setWaiting(queue, messages):
310 map(queue.setWaiting, messages)
311 from twisted.internet import reactor
312 reactor.callLater(30, setWaiting, self.manager.queue, msgs)
313 del self.manager.managed[relay]
314
315
316
317 class SmartHostSMTPRelayingManager:
318 """Manage SMTP Relayers
319
320 Manage SMTP relayers, keeping track of the existing connections,
321 each connection's responsibility in term of messages. Create
322 more relayers if the need arises.
323
324 Someone should press .checkState periodically
325
326 @ivar fArgs: Additional positional arguments used to instantiate
327 C{factory}.
328
329 @ivar fKwArgs: Additional keyword arguments used to instantiate
330 C{factory}.
331
332 @ivar factory: A callable which returns a ClientFactory suitable for
333 making SMTP connections.
334 """
335
336 factory = SMTPManagedRelayerFactory
337
338 PORT = 25
339
340 mxcalc = None
341
342 def __init__(self, queue, maxConnections=2, maxMessagesPerConnection=10):
343 """
344 @type queue: Any implementor of C{IQueue}
345 @param queue: The object used to queue messages on their way to
346 delivery.
347
348 @type maxConnections: C{int}
349 @param maxConnections: The maximum number of SMTP connections to
350 allow to be opened at any given time.
351
352 @type maxMessagesPerConnection: C{int}
353 @param maxMessagesPerConnection: The maximum number of messages a
354 relayer will be given responsibility for.
355
356 Default values are meant for a small box with 1-5 users.
357 """
358 self.maxConnections = maxConnections
359 self.maxMessagesPerConnection = maxMessagesPerConnection
360 self.managed = {} # SMTP clients we're managing
361 self.queue = queue
362 self.fArgs = ()
363 self.fKwArgs = {}
364
365 def __getstate__(self):
366 """(internal) delete volatile state"""
367 dct = self.__dict__.copy()
368 del dct['managed']
369 return dct
370
371 def __setstate__(self, state):
372 """(internal) restore volatile state"""
373 self.__dict__.update(state)
374 self.managed = {}
375
376 def checkState(self):
377 """
378 Synchronize with the state of the world, and maybe launch a new
379 relay.
380
381 Call me periodically to check I am still up to date.
382
383 @return: None or a Deferred which fires when all of the SMTP clients
384 started by this call have disconnected.
385 """
386 self.queue.readDirectory()
387 if (len(self.managed) >= self.maxConnections):
388 return
389 if not self.queue.hasWaiting():
390 return
391
392 return self._checkStateMX()
393
394 def _checkStateMX(self):
395 nextMessages = self.queue.getWaiting()
396 nextMessages.reverse()
397
398 exchanges = {}
399 for msg in nextMessages:
400 from_, to = self.queue.getEnvelope(msg)
401 name, addr = rfc822.parseaddr(to)
402 parts = addr.split('@', 1)
403 if len(parts) != 2:
404 log.err("Illegal message destination: " + to)
405 continue
406 domain = parts[1]
407
408 self.queue.setRelaying(msg)
409 exchanges.setdefault(domain, []).append(self.queue.getPath(msg))
410 if len(exchanges) >= (self.maxConnections - len(self.managed)):
411 break
412
413 if self.mxcalc is None:
414 self.mxcalc = MXCalculator()
415
416 relays = []
417 for (domain, msgs) in exchanges.iteritems():
418 manager = _AttemptManager(self)
419 factory = self.factory(msgs, manager, *self.fArgs, **self.fKwArgs)
420 self.managed[factory] = map(os.path.basename, msgs)
421 relayAttemptDeferred = manager.getCompletionDeferred()
422 connectSetupDeferred = self.mxcalc.getMX(domain)
423 connectSetupDeferred.addCallback(lambda mx: str(mx.name))
424 connectSetupDeferred.addCallback(self._cbExchange, self.PORT, factor y)
425 connectSetupDeferred.addErrback(lambda err: (relayAttemptDeferred.er rback(err), err)[1])
426 connectSetupDeferred.addErrback(self._ebExchange, factory, domain)
427 relays.append(relayAttemptDeferred)
428 return DeferredList(relays)
429
430
431 def _cbExchange(self, address, port, factory):
432 from twisted.internet import reactor
433 reactor.connectTCP(address, port, factory)
434
435 def _ebExchange(self, failure, factory, domain):
436 log.err('Error setting up managed relay factory for ' + domain)
437 log.err(failure)
438 def setWaiting(queue, messages):
439 map(queue.setWaiting, messages)
440 from twisted.internet import reactor
441 reactor.callLater(30, setWaiting, self.queue, self.managed[factory])
442 del self.managed[factory]
443
444 class SmartHostESMTPRelayingManager(SmartHostSMTPRelayingManager):
445 factory = ESMTPManagedRelayerFactory
446
447 def _checkState(manager):
448 manager.checkState()
449
450 def RelayStateHelper(manager, delay):
451 return internet.TimerService(delay, _checkState, manager)
452
453
454
455 class CanonicalNameLoop(Exception):
456 """
457 When trying to look up the MX record for a host, a set of CNAME records was
458 found which form a cycle and resolution was abandoned.
459 """
460
461
462 class CanonicalNameChainTooLong(Exception):
463 """
464 When trying to look up the MX record for a host, too many CNAME records
465 which point to other CNAME records were encountered and resolution was
466 abandoned.
467 """
468
469
470 class MXCalculator:
471 """
472 A utility for looking up mail exchange hosts and tracking whether they are
473 working or not.
474
475 @ivar clock: L{IReactorTime} provider which will be used to decide when to
476 retry mail exchanges which have not been working.
477 """
478 timeOutBadMX = 60 * 60 # One hour
479 fallbackToDomain = True
480
481 def __init__(self, resolver=None, clock=None):
482 self.badMXs = {}
483 if resolver is None:
484 from twisted.names.client import createResolver
485 resolver = createResolver()
486 self.resolver = resolver
487 if clock is None:
488 from twisted.internet import reactor as clock
489 self.clock = clock
490
491
492 def markBad(self, mx):
493 """Indicate a given mx host is not currently functioning.
494
495 @type mx: C{str}
496 @param mx: The hostname of the host which is down.
497 """
498 self.badMXs[str(mx)] = self.clock.seconds() + self.timeOutBadMX
499
500 def markGood(self, mx):
501 """Indicate a given mx host is back online.
502
503 @type mx: C{str}
504 @param mx: The hostname of the host which is up.
505 """
506 try:
507 del self.badMXs[mx]
508 except KeyError:
509 pass
510
511 def getMX(self, domain, maximumCanonicalChainLength=3):
512 """
513 Find an MX record for the given domain.
514
515 @type domain: C{str}
516 @param domain: The domain name for which to look up an MX record.
517
518 @type maximumCanonicalChainLength: C{int}
519 @param maximumCanonicalChainLength: The maximum number of unique CNAME
520 records to follow while looking up the MX record.
521
522 @return: A L{Deferred} which is called back with a string giving the
523 name in the found MX record or which is errbacked if no MX record
524 can be found.
525 """
526 mailExchangeDeferred = self.resolver.lookupMailExchange(domain)
527 mailExchangeDeferred.addCallback(self._filterRecords)
528 mailExchangeDeferred.addCallback(
529 self._cbMX, domain, maximumCanonicalChainLength)
530 mailExchangeDeferred.addErrback(self._ebMX, domain)
531 return mailExchangeDeferred
532
533
534 def _filterRecords(self, records):
535 """
536 Convert a DNS response (a three-tuple of lists of RRHeaders) into a
537 mapping from record names to lists of corresponding record payloads.
538 """
539 recordBag = {}
540 for answer in records[0]:
541 recordBag.setdefault(str(answer.name), []).append(answer.payload)
542 return recordBag
543
544
545 def _cbMX(self, answers, domain, cnamesLeft):
546 """
547 Try to find the MX host from the given DNS information.
548
549 This will attempt to resolve CNAME results. It can recognize loops
550 and will give up on non-cyclic chains after a specified number of
551 lookups.
552 """
553 # Do this import here so that relaymanager.py doesn't depend on
554 # twisted.names, only MXCalculator will.
555 from twisted.names import dns, error
556
557 seenAliases = set()
558 exchanges = []
559 # Examine the answers for the domain we asked about
560 pertinentRecords = answers.get(domain, [])
561 while pertinentRecords:
562 record = pertinentRecords.pop()
563
564 # If it's a CNAME, we'll need to do some more processing
565 if record.TYPE == dns.CNAME:
566
567 # Remember that this name was an alias.
568 seenAliases.add(domain)
569
570 canonicalName = str(record.name)
571 # See if we have some local records which might be relevant.
572 if canonicalName in answers:
573
574 # Make sure it isn't a loop contained entirely within the
575 # results we have here.
576 if canonicalName in seenAliases:
577 return Failure(CanonicalNameLoop(record))
578
579 pertinentRecords = answers[canonicalName]
580 exchanges = []
581 else:
582 if cnamesLeft:
583 # Request more information from the server.
584 return self.getMX(canonicalName, cnamesLeft - 1)
585 else:
586 # Give up.
587 return Failure(CanonicalNameChainTooLong(record))
588
589 # If it's an MX, collect it.
590 if record.TYPE == dns.MX:
591 exchanges.append((record.preference, record))
592
593 if exchanges:
594 exchanges.sort()
595 for (preference, record) in exchanges:
596 host = str(record.name)
597 if host not in self.badMXs:
598 return record
599 t = self.clock.seconds() - self.badMXs[host]
600 if t >= 0:
601 del self.badMXs[host]
602 return record
603 return exchanges[0][1]
604 else:
605 # Treat no answers the same as an error - jump to the errback to try
606 # to look up an A record. This provides behavior described as a
607 # special case in RFC 974 in the section headed I{Interpreting the
608 # List of MX RRs}.
609 return Failure(
610 error.DNSNameError("No MX records for %r" % (domain,)))
611
612
613 def _ebMX(self, failure, domain):
614 from twisted.names import error, dns
615
616 if self.fallbackToDomain:
617 failure.trap(error.DNSNameError)
618 log.msg("MX lookup failed; attempting to use hostname (%s) directly" % (domain,))
619
620 # Alright, I admit, this is a bit icky.
621 d = self.resolver.getHostByName(domain)
622 def cbResolved(addr):
623 return dns.Record_MX(name=addr)
624 def ebResolved(err):
625 err.trap(error.DNSNameError)
626 raise DNSLookupError()
627 d.addCallbacks(cbResolved, ebResolved)
628 return d
629 elif failure.check(error.DNSNameError):
630 raise IOError("No MX found for %r" % (domain,))
631 return failure
OLDNEW
« no previous file with comments | « third_party/twisted_8_1/twisted/mail/relay.py ('k') | third_party/twisted_8_1/twisted/mail/scripts/__init__.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698