OLD | NEW |
| (Empty) |
1 # -*- test-case-name: twisted.mail.test.test_mail -*- | |
2 # Copyright (c) 2001-2008 Twisted Matrix Laboratories. | |
3 # See LICENSE for details. | |
4 | |
5 """ | |
6 Infrastructure for relaying mail through smart host | |
7 | |
8 Today, internet e-mail has stopped being peer-to-peer for many reasons, | |
9 spam (unsolicited bulk mail) among them. Instead, most nodes on the | |
10 internet send all e-mail to a single computer, usually the ISP's, though | |
11 sometimes other schemes, such as SMTP-after-POP, are used. This computer | |
12 is supposedly permanently up and traceable, and will do the work of | |
13 figuring out MXs and connecting to them. This kind of configuration | |
14 is usually termed "smart host", since the host we are connecting to | |
15 is "smart" (and will find MXs and connect to them) rather than just | |
16 accepting mail for a small set of domains. | |
17 | |
18 The classes here are meant to facilitate support for such a configuration | |
19 for the twisted.mail SMTP server | |
20 """ | |
21 | |
22 import rfc822 | |
23 import os | |
24 import time | |
25 | |
26 try: | |
27 import cPickle as pickle | |
28 except ImportError: | |
29 import pickle | |
30 | |
31 from twisted.python import log | |
32 from twisted.python.failure import Failure | |
33 from twisted.python.compat import set | |
34 from twisted.mail import relay | |
35 from twisted.mail import bounce | |
36 from twisted.internet import protocol | |
37 from twisted.internet.defer import Deferred, DeferredList | |
38 from twisted.internet.error import DNSLookupError | |
39 from twisted.mail import smtp | |
40 from twisted.application import internet | |
41 | |
class ManagedRelayerMixin:
    """
    Mixin for SMTP relayers which report their progress to a manager.

    The manager is told about every message that was accepted or rejected
    and about the connection going away.

    @ivar manager: The object which receives the C{notifySuccess},
        C{notifyFailure} and C{notifyDone} calls.
    """

    def __init__(self, manager):
        """
        @param manager: The object to notify about relaying events.
        """
        self.manager = manager

    def sentMail(self, code, resp, numOk, addresses, log):
        """
        Report the outcome for the message at the head of C{self.names}.

        The manager's C{notifySuccess} is used when C{code} is one of
        C{smtp.SUCCESS}, otherwise C{notifyFailure}.  Afterwards the head
        entries of C{self.messages} and C{self.names} are dropped.

        We will always get 0 or 1 addresses.
        """
        current = self.names[0]
        if code in smtp.SUCCESS:
            notify = self.manager.notifySuccess
        else:
            notify = self.manager.notifyFailure
        notify(self.factory, current)
        # Only reached when the notification did not raise.
        del self.messages[0]
        del self.names[0]

    def connectionLost(self, reason):
        """
        Tell the manager that this relayer will send no more e-mail.

        @param reason: Why the connection went away; not inspected here.
        """
        factory = self.factory
        self.manager.notifyDone(factory)
71 | |
class SMTPManagedRelayer(ManagedRelayerMixin, relay.SMTPRelayer):
    """
    An SMTP relayer which reports its progress to a manager.
    """

    def __init__(self, messages, manager, *args, **kw):
        """
        @type messages: C{list} of C{str}
        @param messages: Filenames of messages to relay

        @param manager: Object notified about relaying events; it should
            support C{notifySuccess}, C{notifyFailure} and C{notifyDone}.

        Any further positional and keyword arguments are handed to
        C{relay.SMTPRelayer.__init__} unchanged.
        """
        # The mixin is initialised first, then the relayer proper.
        ManagedRelayerMixin.__init__(self, manager)
        relay.SMTPRelayer.__init__(self, messages, *args, **kw)
83 | |
class ESMTPManagedRelayer(ManagedRelayerMixin, relay.ESMTPRelayer):
    """
    An ESMTP relayer which reports its progress to a manager.
    """

    def __init__(self, messages, manager, *args, **kw):
        """
        @type messages: C{list} of C{str}
        @param messages: Filenames of messages to relay

        @param manager: Object notified about relaying events; it should
            support C{notifySuccess}, C{notifyFailure} and C{notifyDone}.

        Any further positional and keyword arguments are handed to
        C{relay.ESMTPRelayer.__init__} unchanged.
        """
        # The mixin is initialised first, then the relayer proper.
        ManagedRelayerMixin.__init__(self, manager)
        relay.ESMTPRelayer.__init__(self, messages, *args, **kw)
95 | |
class SMTPManagedRelayerFactory(protocol.ClientFactory):
    """
    Client factory producing managed relayers for a fixed batch of messages.

    @ivar messages: The messages handed to every protocol that is built.
    @ivar manager: The object notified about relaying events.
    @ivar pArgs: Extra positional arguments for the protocol constructor.
    @ivar pKwArgs: Extra keyword arguments for the protocol constructor.
    """
    protocol = SMTPManagedRelayer

    def __init__(self, messages, manager, *args, **kw):
        """
        Remember everything needed to construct protocols later on.
        """
        self.messages, self.manager = messages, manager
        self.pArgs, self.pKwArgs = args, kw

    def buildProtocol(self, addr):
        """
        Create a relayer for the stored messages and attach this factory
        to it.

        @param addr: The address being connected to; not used here.
        """
        relayer = self.protocol(
            self.messages, self.manager, *self.pArgs, **self.pKwArgs)
        relayer.factory = self
        return relayer

    def clientConnectionFailed(self, connector, reason):
        """
        Called when the connection could not be made.

        The manager is told first that no connection was possible (it might
        prefer some other host in that case) and then that this relay is
        done.
        """
        manager = self.manager
        manager.notifyNoConnection(self)
        manager.notifyDone(self)
118 | |
class ESMTPManagedRelayerFactory(SMTPManagedRelayerFactory):
    """
    Client factory producing L{ESMTPManagedRelayer} protocols.

    @ivar secret: Either a false value or a callable which is given the
        peer address and returns the secret passed to the protocol.
    @ivar contextFactory: Passed through to every protocol that is built.
    """
    protocol = ESMTPManagedRelayer

    def __init__(self, messages, manager, secret, contextFactory, *args, **kw):
        """
        Store the ESMTP specific settings, then defer to the SMTP factory
        for the rest.
        """
        self.secret = secret
        self.contextFactory = contextFactory
        SMTPManagedRelayerFactory.__init__(self, messages, manager, *args, **kw)

    def buildProtocol(self, addr):
        """
        Create an ESMTP relayer for the stored messages and attach this
        factory to it.

        @param addr: The address being connected to; handed to C{secret}
            when that is set.
        """
        secret = self.secret
        if secret:
            # A truthy secret is a callable resolving the per-address value;
            # a false one is passed on unchanged.
            secret = secret(addr)
        relayer = self.protocol(
            self.messages, self.manager, secret, self.contextFactory,
            *self.pArgs, **self.pKwArgs)
        relayer.factory = self
        return relayer
133 | |
class Queue:
    """
    A queue of outgoing emails kept as files in a directory.

    Every message is identified by a base name.  The file C{<name>-H} holds
    the pickled envelope and C{<name>-D} marks a message whose data is
    available.

    @cvar noisy: When true, log every message registered as waiting.
    @ivar directory: Path of the directory holding the queue files.
    @ivar n: Counter mixed into the names made by L{createNewMessage}.
    @ivar waiting: C{dict} keyed by the names of messages waiting to be
        relayed.
    @ivar relayed: C{dict} keyed by the names of messages currently handed
        to a relayer.
    """

    noisy = True

    def __init__(self, directory):
        """
        @param directory: Path of the directory holding the queue files.
        """
        self.directory = directory
        self._init()

    def _init(self):
        """(internal) reset the volatile state and rescan the directory"""
        self.n = 0
        self.waiting = {}
        self.relayed = {}
        self.readDirectory()

    def __getstate__(self):
        """(internal) delete volatile state"""
        return {'directory' : self.directory}

    def __setstate__(self, state):
        """(internal) restore volatile state"""
        self.__dict__.update(state)
        self._init()

    def readDirectory(self):
        """Read the messages directory.

        look for new messages.
        """
        for message in os.listdir(self.directory):
            # Skip non data files
            if message[-2:] != '-D':
                continue
            self.addMessage(message[:-2])

    def getWaiting(self):
        """Return the names of the messages waiting to be relayed."""
        return self.waiting.keys()

    def hasWaiting(self):
        """Return whether at least one message is waiting."""
        return len(self.waiting) > 0

    def getRelayed(self):
        """Return the names of the messages currently being relayed."""
        return self.relayed.keys()

    def setRelaying(self, message):
        """
        Move C{message} from the waiting set to the relayed set.

        @raise KeyError: If C{message} is not waiting.
        """
        del self.waiting[message]
        self.relayed[message] = 1

    def setWaiting(self, message):
        """
        Move C{message} from the relayed set back to the waiting set.

        @raise KeyError: If C{message} is not being relayed.
        """
        del self.relayed[message]
        self.waiting[message] = 1

    def addMessage(self, message):
        """
        Register C{message} as waiting unless it is already being relayed.
        """
        if message not in self.relayed:
            self.waiting[message] = 1
            if self.noisy:
                log.msg('Set ' + message + ' waiting')

    def done(self, message):
        """
        Remove a message from the queue.

        Both files of the message are deleted and it is dropped from the
        relayed set.

        @param message: Name or path of the message; only the base name is
            used.
        """
        message = os.path.basename(message)
        os.remove(self.getPath(message) + '-D')
        os.remove(self.getPath(message) + '-H')
        del self.relayed[message]

    def getPath(self, message):
        """Get the path in the filesystem of a message."""
        return os.path.join(self.directory, message)

    def getEnvelope(self, message):
        """
        Load and return the pickled envelope of C{message}.

        The envelope file is closed before returning, whether or not
        unpickling succeeds.
        """
        # NOTE(review): this unpickles file contents, so the queue directory
        # must not be writable by untrusted parties.
        envelopeFile = self.getEnvelopeFile(message)
        try:
            return pickle.load(envelopeFile)
        finally:
            envelopeFile.close()

    def getEnvelopeFile(self, message):
        """
        Open the envelope file of C{message} for binary reading.

        The caller is responsible for closing the returned file.
        """
        return open(os.path.join(self.directory, message+'-H'), 'rb')

    def createNewMessage(self):
        """Create a new message in the queue.

        Return a tuple - file-like object for headers, and ISMTPMessage.

        The message data is written to a C{-C} file; the C{-D} name is
        handed to C{FileMessage} as the final filename (presumably it
        renames the file once the message is complete - confirm against
        C{twisted.mail.mail.FileMessage}).
        """
        fname = "%s_%s_%s_%s" % (os.getpid(), time.time(), self.n, id(self))
        self.n = self.n + 1
        headerFile = open(os.path.join(self.directory, fname+'-H'), 'wb')
        tempFilename = os.path.join(self.directory, fname+'-C')
        finalFilename = os.path.join(self.directory, fname+'-D')
        messageFile = open(tempFilename, 'wb')

        from twisted.mail.mail import FileMessage
        return headerFile,FileMessage(messageFile, tempFilename, finalFilename)
223 | |
224 | |
class _AttemptManager(object):
    """
    Manage the state of a single attempt to flush the relay queue.

    @ivar manager: The relaying manager whose C{managed} mapping and
        C{queue} are updated by the notifications received here.
    @ivar _completionDeferreds: C{list} of L{Deferred}s fired by
        L{notifyDone}; replaced by C{None} once L{notifyDone} has run.
    """
    def __init__(self, manager):
        self.manager = manager
        self._completionDeferreds = []


    def getCompletionDeferred(self):
        """
        Return a new L{Deferred} which is called back with C{None} when
        L{notifyDone} runs.
        """
        self._completionDeferreds.append(Deferred())
        return self._completionDeferreds[-1]


    def _finish(self, relay, message):
        """
        Drop C{message} from the messages C{relay} is responsible for and
        remove it from the queue.
        """
        self.manager.managed[relay].remove(os.path.basename(message))
        self.manager.queue.done(message)


    def notifySuccess(self, relay, message):
        """a relay sent a message successfully

        Mark it as sent in our lists
        """
        if self.manager.queue.noisy:
            log.msg("success sending %s, removing from queue" % message)
        self._finish(relay, message)


    def notifyFailure(self, relay, message):
        """
        Relaying the message has failed.

        A bounce is generated from the failed message and placed in the
        queue as a new message, then the failed message is removed.
        """
        queue = self.manager.queue
        if queue.noisy:
            log.msg("could not relay "+message)
        # Moshe - Bounce E-mail here
        # Be careful: if it's a bounced bounce, silently
        # discard it
        # NOTE(review): no check for a bounced bounce is visible in this
        # method.
        message = os.path.basename(message)

        envelopeFile = queue.getEnvelopeFile(message)
        try:
            from_, to = pickle.load(envelopeFile)
        finally:
            envelopeFile.close()

        # Close the data file before _finish() removes it from disk.
        dataFile = open(queue.getPath(message)+'-D')
        try:
            from_, to, bounceMessage = bounce.generateBounce(
                dataFile, from_, to)
        finally:
            dataFile.close()

        headerFile, outgoingMessage = queue.createNewMessage()
        try:
            pickle.dump([from_, to], headerFile)
        finally:
            headerFile.close()
        for line in bounceMessage.splitlines():
            outgoingMessage.lineReceived(line)
        outgoingMessage.eomReceived()
        self._finish(relay, queue.getPath(message))


    def notifyDone(self, relay):
        """A relaying SMTP client is disconnected.

        unmark all pending messages under this relay's responsibility
        as being relayed, and remove the relay.

        Every L{Deferred} handed out by L{getCompletionDeferred} is then
        called back with C{None}.
        """
        for message in self.manager.managed.get(relay, ()):
            if self.manager.queue.noisy:
                log.msg("Setting " + message + " waiting")
            self.manager.queue.setWaiting(message)
        try:
            del self.manager.managed[relay]
        except KeyError:
            pass
        # Detach the list before firing so that callbacks see the attempt
        # as already completed.
        notifications = self._completionDeferreds
        self._completionDeferreds = None
        for d in notifications:
            d.callback(None)


    def notifyNoConnection(self, relay):
        """Relaying SMTP client couldn't connect.

        Useful because it tells us our upstream server is unavailable.

        The messages of C{relay} are put back to waiting after a delay of
        30 seconds and the relay is forgotten immediately.
        """
        # Back off a bit
        try:
            msgs = self.manager.managed[relay]
        except KeyError:
            log.msg("notifyNoConnection passed unknown relay!")
            return

        if self.manager.queue.noisy:
            log.msg("Backing off on delivery of " + str(msgs))
        def setWaiting(queue, messages):
            for message in messages:
                queue.setWaiting(message)
        from twisted.internet import reactor
        reactor.callLater(30, setWaiting, self.manager.queue, msgs)
        del self.manager.managed[relay]
314 | |
315 | |
316 | |
class SmartHostSMTPRelayingManager:
    """Manage SMTP Relayers

    Manage SMTP relayers, keeping track of the existing connections,
    each connection's responsibility in term of messages. Create
    more relayers if the need arises.

    Someone should press .checkState periodically

    @ivar fArgs: Additional positional arguments used to instantiate
    C{factory}.

    @ivar fKwArgs: Additional keyword arguments used to instantiate
    C{factory}.

    @ivar factory: A callable which returns a ClientFactory suitable for
    making SMTP connections.

    @ivar managed: C{dict} mapping each active factory to the list of
    message base names it is responsible for.

    @ivar mxcalc: The object used to look up mail exchanges; an
    L{MXCalculator} is created on first use when this is C{None}.
    """

    factory = SMTPManagedRelayerFactory

    PORT = 25

    mxcalc = None

    def __init__(self, queue, maxConnections=2, maxMessagesPerConnection=10):
        """
        @type queue: Any implementor of C{IQueue}
        @param queue: The object used to queue messages on their way to
        delivery.

        @type maxConnections: C{int}
        @param maxConnections: The maximum number of SMTP connections to
        allow to be opened at any given time.

        @type maxMessagesPerConnection: C{int}
        @param maxMessagesPerConnection: The maximum number of messages a
        relayer will be given responsibility for.

        Default values are meant for a small box with 1-5 users.
        """
        self.maxConnections = maxConnections
        self.maxMessagesPerConnection = maxMessagesPerConnection
        self.managed = {} # SMTP clients we're managing
        self.queue = queue
        self.fArgs = ()
        self.fKwArgs = {}

    def __getstate__(self):
        """(internal) delete volatile state"""
        dct = self.__dict__.copy()
        del dct['managed']
        return dct

    def __setstate__(self, state):
        """(internal) restore volatile state"""
        self.__dict__.update(state)
        self.managed = {}

    def checkState(self):
        """
        Synchronize with the state of the world, and maybe launch a new
        relay.

        Call me periodically to check I am still up to date.

        @return: None or a Deferred which fires when all of the SMTP clients
        started by this call have disconnected.
        """
        self.queue.readDirectory()
        if (len(self.managed) >= self.maxConnections):
            return
        if not self.queue.hasWaiting():
            return

        return self._checkStateMX()

    def _checkStateMX(self):
        """
        Group waiting messages by destination domain and start one relay
        attempt per domain.

        @return: A L{DeferredList} over the completion Deferreds of the
        attempts started here.
        """
        nextMessages = list(self.queue.getWaiting())
        nextMessages.reverse()

        exchanges = {}
        for msg in nextMessages:
            from_, to = self.queue.getEnvelope(msg)
            name, addr = rfc822.parseaddr(to)
            parts = addr.split('@', 1)
            if len(parts) != 2:
                log.err("Illegal message destination: " + to)
                continue
            domain = parts[1]

            self.queue.setRelaying(msg)
            exchanges.setdefault(domain, []).append(self.queue.getPath(msg))
            # Stop once there are as many domains as free connection slots.
            if len(exchanges) >= (self.maxConnections - len(self.managed)):
                break

        if self.mxcalc is None:
            self.mxcalc = MXCalculator()

        relays = []
        for (domain, msgs) in exchanges.items():
            manager = _AttemptManager(self)
            factory = self.factory(msgs, manager, *self.fArgs, **self.fKwArgs)
            self.managed[factory] = [os.path.basename(m) for m in msgs]
            relayAttemptDeferred = manager.getCompletionDeferred()
            connectSetupDeferred = self.mxcalc.getMX(domain)
            connectSetupDeferred.addCallback(lambda mx: str(mx.name))
            connectSetupDeferred.addCallback(
                self._cbExchange, self.PORT, factory)
            # The Deferred is passed as an argument rather than captured by
            # a closure: the lookup may fail after this loop has moved on,
            # and a closure would then see a later iteration's Deferred.
            connectSetupDeferred.addErrback(
                self._ebRelayAttempt, relayAttemptDeferred)
            connectSetupDeferred.addErrback(self._ebExchange, factory, domain)
            relays.append(relayAttemptDeferred)
        return DeferredList(relays)


    def _ebRelayAttempt(self, failure, relayAttemptDeferred):
        """
        Fail the completion Deferred of one relay attempt and pass the
        failure on to the next errback.
        """
        relayAttemptDeferred.errback(failure)
        return failure

    def _cbExchange(self, address, port, factory):
        """
        Connect C{factory} to the mail exchange at C{address}:C{port}.
        """
        from twisted.internet import reactor
        reactor.connectTCP(address, port, factory)

    def _ebExchange(self, failure, factory, domain):
        """
        Log a failure to set up the relay for C{domain}, schedule its
        messages to be put back to waiting in 30 seconds and forget the
        factory.
        """
        log.err('Error setting up managed relay factory for ' + domain)
        log.err(failure)
        def setWaiting(queue, messages):
            for message in messages:
                queue.setWaiting(message)
        from twisted.internet import reactor
        reactor.callLater(30, setWaiting, self.queue, self.managed[factory])
        del self.managed[factory]
443 | |
class SmartHostESMTPRelayingManager(SmartHostSMTPRelayingManager):
    """
    A relaying manager whose connections are made with
    L{ESMTPManagedRelayerFactory} instead of the plain SMTP factory.
    """

    factory = ESMTPManagedRelayerFactory
446 | |
def _checkState(manager):
    """
    Ask C{manager} to check its state, discarding whatever it returns.

    @param manager: An object with a C{checkState} method.
    """
    manager.checkState()
449 | |
def RelayStateHelper(manager, delay):
    """
    Build a timer service which calls C{manager.checkState} periodically.

    @param manager: The relaying manager to check.
    @param delay: The interval handed to C{internet.TimerService}.

    @return: The C{internet.TimerService} instance.
    """
    service = internet.TimerService(delay, _checkState, manager)
    return service
452 | |
453 | |
454 | |
class CanonicalNameLoop(Exception):
    """
    Resolution of an MX record was abandoned because the CNAME records
    found for the host point at each other in a cycle.
    """
460 | |
461 | |
class CanonicalNameChainTooLong(Exception):
    """
    Resolution of an MX record was abandoned because too many CNAME
    records pointing at further CNAME records had to be followed.
    """
467 """ | |
468 | |
469 | |
class MXCalculator:
    """
    A utility for looking up mail exchange hosts and tracking whether they are
    working or not.

    @ivar clock: L{IReactorTime} provider which will be used to decide when to
        retry mail exchanges which have not been working.

    @ivar resolver: The object used for the DNS lookups.

    @ivar badMXs: C{dict} mapping the name of each host marked bad to the
        time (in C{clock} seconds) at which it may be tried again.

    @cvar timeOutBadMX: Number of seconds a host stays marked bad.

    @cvar fallbackToDomain: When true, a failed MX lookup falls back to
        looking up the domain name itself.
    """
    timeOutBadMX = 60 * 60 # One hour
    fallbackToDomain = True

    def __init__(self, resolver=None, clock=None):
        """
        @param resolver: The resolver to use; when C{None} one is made with
            C{twisted.names.client.createResolver}.

        @param clock: The time source to use; when C{None} the global
            reactor is used.
        """
        self.badMXs = {}
        if resolver is None:
            from twisted.names.client import createResolver
            resolver = createResolver()
        self.resolver = resolver
        if clock is None:
            from twisted.internet import reactor as clock
        self.clock = clock


    def markBad(self, mx):
        """Indicate a given mx host is not currently functioning.

        @type mx: C{str}
        @param mx: The hostname of the host which is down.
        """
        self.badMXs[str(mx)] = self.clock.seconds() + self.timeOutBadMX

    def markGood(self, mx):
        """Indicate a given mx host is back online.

        @type mx: C{str}
        @param mx: The hostname of the host which is up.
        """
        # Use the same key normalisation as markBad so that any object
        # accepted there can also be un-marked here.
        try:
            del self.badMXs[str(mx)]
        except KeyError:
            pass

    def getMX(self, domain, maximumCanonicalChainLength=3):
        """
        Find an MX record for the given domain.

        @type domain: C{str}
        @param domain: The domain name for which to look up an MX record.

        @type maximumCanonicalChainLength: C{int}
        @param maximumCanonicalChainLength: The maximum number of unique CNAME
            records to follow while looking up the MX record.

        @return: A L{Deferred} which is called back with the chosen MX
            record (its C{name} attribute gives the host) or which is
            errbacked if no MX record can be found.
        """
        mailExchangeDeferred = self.resolver.lookupMailExchange(domain)
        mailExchangeDeferred.addCallback(self._filterRecords)
        mailExchangeDeferred.addCallback(
            self._cbMX, domain, maximumCanonicalChainLength)
        mailExchangeDeferred.addErrback(self._ebMX, domain)
        return mailExchangeDeferred


    def _filterRecords(self, records):
        """
        Convert a DNS response (a three-tuple of lists of RRHeaders) into a
        mapping from record names to lists of corresponding record payloads.

        Only the first element of C{records} is examined.
        """
        recordBag = {}
        for answer in records[0]:
            recordBag.setdefault(str(answer.name), []).append(answer.payload)
        return recordBag


    def _cbMX(self, answers, domain, cnamesLeft):
        """
        Try to find the MX host from the given DNS information.

        This will attempt to resolve CNAME results.  It can recognize loops
        and will give up on non-cyclic chains after a specified number of
        lookups.

        @param answers: The mapping produced by L{_filterRecords}; the
            lists in it are consumed while they are examined.

        @param domain: The name which was asked about.

        @param cnamesLeft: How many more CNAME lookups may be issued.

        @return: The chosen MX record, a L{Deferred} for a further lookup,
            or a L{Failure}.
        """
        # Do this import here so that relaymanager.py doesn't depend on
        # twisted.names, only MXCalculator will.
        from twisted.names import dns, error

        seenAliases = set()
        exchanges = []
        # Examine the answers for the domain we asked about
        pertinentRecords = answers.get(domain, [])
        while pertinentRecords:
            record = pertinentRecords.pop()

            # If it's a CNAME, we'll need to do some more processing
            if record.TYPE == dns.CNAME:

                # Remember that this name was an alias.
                seenAliases.add(domain)

                canonicalName = str(record.name)
                # See if we have some local records which might be relevant.
                if canonicalName in answers:

                    # Make sure it isn't a loop contained entirely within the
                    # results we have here.
                    if canonicalName in seenAliases:
                        return Failure(CanonicalNameLoop(record))

                    # Continue with the records of the canonical name and
                    # forget any exchanges collected for the alias.
                    pertinentRecords = answers[canonicalName]
                    exchanges = []
                else:
                    if cnamesLeft:
                        # Request more information from the server.
                        return self.getMX(canonicalName, cnamesLeft - 1)
                    else:
                        # Give up.
                        return Failure(CanonicalNameChainTooLong(record))

            # If it's an MX, collect it.
            if record.TYPE == dns.MX:
                exchanges.append((record.preference, record))

        if exchanges:
            exchanges.sort()
            for (preference, record) in exchanges:
                host = str(record.name)
                if host not in self.badMXs:
                    return record
                # badMXs holds the time at which the host may be retried.
                t = self.clock.seconds() - self.badMXs[host]
                if t >= 0:
                    del self.badMXs[host]
                    return record
            # Every exchange is still marked bad; use the most preferred.
            return exchanges[0][1]
        else:
            # Treat no answers the same as an error - jump to the errback to try
            # to look up an A record.  This provides behavior described as a
            # special case in RFC 974 in the section headed I{Interpreting the
            # List of MX RRs}.
            return Failure(
                error.DNSNameError("No MX records for %r" % (domain,)))


    def _ebMX(self, failure, domain):
        """
        Handle a failed MX lookup for C{domain}.

        With C{fallbackToDomain} set, a C{DNSNameError} leads to a host
        name lookup of the domain itself whose result is wrapped in an MX
        record; a C{DNSNameError} from that lookup becomes a
        L{DNSLookupError}.  Without it, a C{DNSNameError} is turned into an
        C{IOError}.  Any other failure is passed on.
        """
        from twisted.names import error, dns

        if self.fallbackToDomain:
            failure.trap(error.DNSNameError)
            log.msg("MX lookup failed; attempting to use hostname (%s) directly" % (domain,))

            # Alright, I admit, this is a bit icky.
            d = self.resolver.getHostByName(domain)
            def cbResolved(addr):
                return dns.Record_MX(name=addr)
            def ebResolved(err):
                err.trap(error.DNSNameError)
                raise DNSLookupError()
            d.addCallbacks(cbResolved, ebResolved)
            return d
        elif failure.check(error.DNSNameError):
            raise IOError("No MX found for %r" % (domain,))
        return failure
OLD | NEW |