Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1)

Unified Diff: third_party/twisted_8_1/twisted/mail/relaymanager.py

Issue 12261012: Remove third_party/twisted_8_1 (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/tools/build
Patch Set: Created 7 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: third_party/twisted_8_1/twisted/mail/relaymanager.py
diff --git a/third_party/twisted_8_1/twisted/mail/relaymanager.py b/third_party/twisted_8_1/twisted/mail/relaymanager.py
deleted file mode 100644
index 8cf4edace7eb5f370974a5debe387c1639b5daf2..0000000000000000000000000000000000000000
--- a/third_party/twisted_8_1/twisted/mail/relaymanager.py
+++ /dev/null
@@ -1,631 +0,0 @@
-# -*- test-case-name: twisted.mail.test.test_mail -*-
-# Copyright (c) 2001-2008 Twisted Matrix Laboratories.
-# See LICENSE for details.
-
-"""
-Infrastructure for relaying mail through smart host
-
-Today, internet e-mail has stopped being Peer-to-peer for many problems,
-spam (unsolicited bulk mail) among them. Instead, most nodes on the
-internet send all e-mail to a single computer, usually the ISP's though
-sometimes other schemes, such as SMTP-after-POP, are used. This computer
-is supposedly permanently up and traceable, and will do the work of
-figuring out MXs and connecting to them. This kind of configuration
-is usually termed "smart host", since the host we are connecting to
-is "smart" (and will find MXs and connect to them) rather then just
-accepting mail for a small set of domains.
-
-The classes here are meant to facilitate support for such a configuration
-for the twisted.mail SMTP server
-"""
-
-import rfc822
-import os
-import time
-
-try:
- import cPickle as pickle
-except ImportError:
- import pickle
-
-from twisted.python import log
-from twisted.python.failure import Failure
-from twisted.python.compat import set
-from twisted.mail import relay
-from twisted.mail import bounce
-from twisted.internet import protocol
-from twisted.internet.defer import Deferred, DeferredList
-from twisted.internet.error import DNSLookupError
-from twisted.mail import smtp
-from twisted.application import internet
-
-class ManagedRelayerMixin:
- """SMTP Relayer which notifies a manager
-
- Notify the manager about successful mail, failed mail
- and broken connections
- """
-
- def __init__(self, manager):
- self.manager = manager
-
- def sentMail(self, code, resp, numOk, addresses, log):
- """called when e-mail has been sent
-
- we will always get 0 or 1 addresses.
- """
- message = self.names[0]
- if code in smtp.SUCCESS:
- self.manager.notifySuccess(self.factory, message)
- else:
- self.manager.notifyFailure(self.factory, message)
- del self.messages[0]
- del self.names[0]
-
- def connectionLost(self, reason):
- """called when connection is broken
-
- notify manager we will try to send no more e-mail
- """
- self.manager.notifyDone(self.factory)
-
-class SMTPManagedRelayer(ManagedRelayerMixin, relay.SMTPRelayer):
- def __init__(self, messages, manager, *args, **kw):
- """
- @type messages: C{list} of C{str}
- @param messages: Filenames of messages to relay
-
- manager should support .notifySuccess, .notifyFailure
- and .notifyDone
- """
- ManagedRelayerMixin.__init__(self, manager)
- relay.SMTPRelayer.__init__(self, messages, *args, **kw)
-
-class ESMTPManagedRelayer(ManagedRelayerMixin, relay.ESMTPRelayer):
- def __init__(self, messages, manager, *args, **kw):
- """
- @type messages: C{list} of C{str}
- @param messages: Filenames of messages to relay
-
- manager should support .notifySuccess, .notifyFailure
- and .notifyDone
- """
- ManagedRelayerMixin.__init__(self, manager)
- relay.ESMTPRelayer.__init__(self, messages, *args, **kw)
-
-class SMTPManagedRelayerFactory(protocol.ClientFactory):
- protocol = SMTPManagedRelayer
-
- def __init__(self, messages, manager, *args, **kw):
- self.messages = messages
- self.manager = manager
- self.pArgs = args
- self.pKwArgs = kw
-
- def buildProtocol(self, addr):
- protocol = self.protocol(self.messages, self.manager, *self.pArgs,
- **self.pKwArgs)
- protocol.factory = self
- return protocol
-
- def clientConnectionFailed(self, connector, reason):
- """called when connection could not be made
-
- our manager should be notified that this happened,
- it might prefer some other host in that case"""
- self.manager.notifyNoConnection(self)
- self.manager.notifyDone(self)
-
-class ESMTPManagedRelayerFactory(SMTPManagedRelayerFactory):
- protocol = ESMTPManagedRelayer
-
- def __init__(self, messages, manager, secret, contextFactory, *args, **kw):
- self.secret = secret
- self.contextFactory = contextFactory
- SMTPManagedRelayerFactory.__init__(self, messages, manager, *args, **kw)
-
- def buildProtocol(self, addr):
- s = self.secret and self.secret(addr)
- protocol = self.protocol(self.messages, self.manager, s,
- self.contextFactory, *self.pArgs, **self.pKwArgs)
- protocol.factory = self
- return protocol
-
-class Queue:
- """A queue of ougoing emails."""
-
- noisy = True
-
- def __init__(self, directory):
- self.directory = directory
- self._init()
-
- def _init(self):
- self.n = 0
- self.waiting = {}
- self.relayed = {}
- self.readDirectory()
-
- def __getstate__(self):
- """(internal) delete volatile state"""
- return {'directory' : self.directory}
-
- def __setstate__(self, state):
- """(internal) restore volatile state"""
- self.__dict__.update(state)
- self._init()
-
- def readDirectory(self):
- """Read the messages directory.
-
- look for new messages.
- """
- for message in os.listdir(self.directory):
- # Skip non data files
- if message[-2:]!='-D':
- continue
- self.addMessage(message[:-2])
-
- def getWaiting(self):
- return self.waiting.keys()
-
- def hasWaiting(self):
- return len(self.waiting) > 0
-
- def getRelayed(self):
- return self.relayed.keys()
-
- def setRelaying(self, message):
- del self.waiting[message]
- self.relayed[message] = 1
-
- def setWaiting(self, message):
- del self.relayed[message]
- self.waiting[message] = 1
-
- def addMessage(self, message):
- if message not in self.relayed:
- self.waiting[message] = 1
- if self.noisy:
- log.msg('Set ' + message + ' waiting')
-
- def done(self, message):
- """Remove message to from queue."""
- message = os.path.basename(message)
- os.remove(self.getPath(message) + '-D')
- os.remove(self.getPath(message) + '-H')
- del self.relayed[message]
-
- def getPath(self, message):
- """Get the path in the filesystem of a message."""
- return os.path.join(self.directory, message)
-
- def getEnvelope(self, message):
- return pickle.load(self.getEnvelopeFile(message))
-
- def getEnvelopeFile(self, message):
- return open(os.path.join(self.directory, message+'-H'), 'rb')
-
- def createNewMessage(self):
- """Create a new message in the queue.
-
- Return a tuple - file-like object for headers, and ISMTPMessage.
- """
- fname = "%s_%s_%s_%s" % (os.getpid(), time.time(), self.n, id(self))
- self.n = self.n + 1
- headerFile = open(os.path.join(self.directory, fname+'-H'), 'wb')
- tempFilename = os.path.join(self.directory, fname+'-C')
- finalFilename = os.path.join(self.directory, fname+'-D')
- messageFile = open(tempFilename, 'wb')
-
- from twisted.mail.mail import FileMessage
- return headerFile,FileMessage(messageFile, tempFilename, finalFilename)
-
-
-class _AttemptManager(object):
- """
- Manage the state of a single attempt to flush the relay queue.
- """
- def __init__(self, manager):
- self.manager = manager
- self._completionDeferreds = []
-
-
- def getCompletionDeferred(self):
- self._completionDeferreds.append(Deferred())
- return self._completionDeferreds[-1]
-
-
- def _finish(self, relay, message):
- self.manager.managed[relay].remove(os.path.basename(message))
- self.manager.queue.done(message)
-
-
- def notifySuccess(self, relay, message):
- """a relay sent a message successfully
-
- Mark it as sent in our lists
- """
- if self.manager.queue.noisy:
- log.msg("success sending %s, removing from queue" % message)
- self._finish(relay, message)
-
-
- def notifyFailure(self, relay, message):
- """Relaying the message has failed."""
- if self.manager.queue.noisy:
- log.msg("could not relay "+message)
- # Moshe - Bounce E-mail here
- # Be careful: if it's a bounced bounce, silently
- # discard it
- message = os.path.basename(message)
- fp = self.manager.queue.getEnvelopeFile(message)
- from_, to = pickle.load(fp)
- fp.close()
- from_, to, bounceMessage = bounce.generateBounce(open(self.manager.queue.getPath(message)+'-D'), from_, to)
- fp, outgoingMessage = self.manager.queue.createNewMessage()
- pickle.dump([from_, to], fp)
- fp.close()
- for line in bounceMessage.splitlines():
- outgoingMessage.lineReceived(line)
- outgoingMessage.eomReceived()
- self._finish(relay, self.manager.queue.getPath(message))
-
-
- def notifyDone(self, relay):
- """A relaying SMTP client is disconnected.
-
- unmark all pending messages under this relay's responsibility
- as being relayed, and remove the relay.
- """
- for message in self.manager.managed.get(relay, ()):
- if self.manager.queue.noisy:
- log.msg("Setting " + message + " waiting")
- self.manager.queue.setWaiting(message)
- try:
- del self.manager.managed[relay]
- except KeyError:
- pass
- notifications = self._completionDeferreds
- self._completionDeferreds = None
- for d in notifications:
- d.callback(None)
-
-
- def notifyNoConnection(self, relay):
- """Relaying SMTP client couldn't connect.
-
- Useful because it tells us our upstream server is unavailable.
- """
- # Back off a bit
- try:
- msgs = self.manager.managed[relay]
- except KeyError:
- log.msg("notifyNoConnection passed unknown relay!")
- return
-
- if self.manager.queue.noisy:
- log.msg("Backing off on delivery of " + str(msgs))
- def setWaiting(queue, messages):
- map(queue.setWaiting, messages)
- from twisted.internet import reactor
- reactor.callLater(30, setWaiting, self.manager.queue, msgs)
- del self.manager.managed[relay]
-
-
-
-class SmartHostSMTPRelayingManager:
- """Manage SMTP Relayers
-
- Manage SMTP relayers, keeping track of the existing connections,
- each connection's responsibility in term of messages. Create
- more relayers if the need arises.
-
- Someone should press .checkState periodically
-
- @ivar fArgs: Additional positional arguments used to instantiate
- C{factory}.
-
- @ivar fKwArgs: Additional keyword arguments used to instantiate
- C{factory}.
-
- @ivar factory: A callable which returns a ClientFactory suitable for
- making SMTP connections.
- """
-
- factory = SMTPManagedRelayerFactory
-
- PORT = 25
-
- mxcalc = None
-
- def __init__(self, queue, maxConnections=2, maxMessagesPerConnection=10):
- """
- @type queue: Any implementor of C{IQueue}
- @param queue: The object used to queue messages on their way to
- delivery.
-
- @type maxConnections: C{int}
- @param maxConnections: The maximum number of SMTP connections to
- allow to be opened at any given time.
-
- @type maxMessagesPerConnection: C{int}
- @param maxMessagesPerConnection: The maximum number of messages a
- relayer will be given responsibility for.
-
- Default values are meant for a small box with 1-5 users.
- """
- self.maxConnections = maxConnections
- self.maxMessagesPerConnection = maxMessagesPerConnection
- self.managed = {} # SMTP clients we're managing
- self.queue = queue
- self.fArgs = ()
- self.fKwArgs = {}
-
- def __getstate__(self):
- """(internal) delete volatile state"""
- dct = self.__dict__.copy()
- del dct['managed']
- return dct
-
- def __setstate__(self, state):
- """(internal) restore volatile state"""
- self.__dict__.update(state)
- self.managed = {}
-
- def checkState(self):
- """
- Synchronize with the state of the world, and maybe launch a new
- relay.
-
- Call me periodically to check I am still up to date.
-
- @return: None or a Deferred which fires when all of the SMTP clients
- started by this call have disconnected.
- """
- self.queue.readDirectory()
- if (len(self.managed) >= self.maxConnections):
- return
- if not self.queue.hasWaiting():
- return
-
- return self._checkStateMX()
-
- def _checkStateMX(self):
- nextMessages = self.queue.getWaiting()
- nextMessages.reverse()
-
- exchanges = {}
- for msg in nextMessages:
- from_, to = self.queue.getEnvelope(msg)
- name, addr = rfc822.parseaddr(to)
- parts = addr.split('@', 1)
- if len(parts) != 2:
- log.err("Illegal message destination: " + to)
- continue
- domain = parts[1]
-
- self.queue.setRelaying(msg)
- exchanges.setdefault(domain, []).append(self.queue.getPath(msg))
- if len(exchanges) >= (self.maxConnections - len(self.managed)):
- break
-
- if self.mxcalc is None:
- self.mxcalc = MXCalculator()
-
- relays = []
- for (domain, msgs) in exchanges.iteritems():
- manager = _AttemptManager(self)
- factory = self.factory(msgs, manager, *self.fArgs, **self.fKwArgs)
- self.managed[factory] = map(os.path.basename, msgs)
- relayAttemptDeferred = manager.getCompletionDeferred()
- connectSetupDeferred = self.mxcalc.getMX(domain)
- connectSetupDeferred.addCallback(lambda mx: str(mx.name))
- connectSetupDeferred.addCallback(self._cbExchange, self.PORT, factory)
- connectSetupDeferred.addErrback(lambda err: (relayAttemptDeferred.errback(err), err)[1])
- connectSetupDeferred.addErrback(self._ebExchange, factory, domain)
- relays.append(relayAttemptDeferred)
- return DeferredList(relays)
-
-
- def _cbExchange(self, address, port, factory):
- from twisted.internet import reactor
- reactor.connectTCP(address, port, factory)
-
- def _ebExchange(self, failure, factory, domain):
- log.err('Error setting up managed relay factory for ' + domain)
- log.err(failure)
- def setWaiting(queue, messages):
- map(queue.setWaiting, messages)
- from twisted.internet import reactor
- reactor.callLater(30, setWaiting, self.queue, self.managed[factory])
- del self.managed[factory]
-
-class SmartHostESMTPRelayingManager(SmartHostSMTPRelayingManager):
- factory = ESMTPManagedRelayerFactory
-
-def _checkState(manager):
- manager.checkState()
-
-def RelayStateHelper(manager, delay):
- return internet.TimerService(delay, _checkState, manager)
-
-
-
-class CanonicalNameLoop(Exception):
- """
- When trying to look up the MX record for a host, a set of CNAME records was
- found which form a cycle and resolution was abandoned.
- """
-
-
-class CanonicalNameChainTooLong(Exception):
- """
- When trying to look up the MX record for a host, too many CNAME records
- which point to other CNAME records were encountered and resolution was
- abandoned.
- """
-
-
-class MXCalculator:
- """
- A utility for looking up mail exchange hosts and tracking whether they are
- working or not.
-
- @ivar clock: L{IReactorTime} provider which will be used to decide when to
- retry mail exchanges which have not been working.
- """
- timeOutBadMX = 60 * 60 # One hour
- fallbackToDomain = True
-
- def __init__(self, resolver=None, clock=None):
- self.badMXs = {}
- if resolver is None:
- from twisted.names.client import createResolver
- resolver = createResolver()
- self.resolver = resolver
- if clock is None:
- from twisted.internet import reactor as clock
- self.clock = clock
-
-
- def markBad(self, mx):
- """Indicate a given mx host is not currently functioning.
-
- @type mx: C{str}
- @param mx: The hostname of the host which is down.
- """
- self.badMXs[str(mx)] = self.clock.seconds() + self.timeOutBadMX
-
- def markGood(self, mx):
- """Indicate a given mx host is back online.
-
- @type mx: C{str}
- @param mx: The hostname of the host which is up.
- """
- try:
- del self.badMXs[mx]
- except KeyError:
- pass
-
- def getMX(self, domain, maximumCanonicalChainLength=3):
- """
- Find an MX record for the given domain.
-
- @type domain: C{str}
- @param domain: The domain name for which to look up an MX record.
-
- @type maximumCanonicalChainLength: C{int}
- @param maximumCanonicalChainLength: The maximum number of unique CNAME
- records to follow while looking up the MX record.
-
- @return: A L{Deferred} which is called back with a string giving the
- name in the found MX record or which is errbacked if no MX record
- can be found.
- """
- mailExchangeDeferred = self.resolver.lookupMailExchange(domain)
- mailExchangeDeferred.addCallback(self._filterRecords)
- mailExchangeDeferred.addCallback(
- self._cbMX, domain, maximumCanonicalChainLength)
- mailExchangeDeferred.addErrback(self._ebMX, domain)
- return mailExchangeDeferred
-
-
- def _filterRecords(self, records):
- """
- Convert a DNS response (a three-tuple of lists of RRHeaders) into a
- mapping from record names to lists of corresponding record payloads.
- """
- recordBag = {}
- for answer in records[0]:
- recordBag.setdefault(str(answer.name), []).append(answer.payload)
- return recordBag
-
-
- def _cbMX(self, answers, domain, cnamesLeft):
- """
- Try to find the MX host from the given DNS information.
-
- This will attempt to resolve CNAME results. It can recognize loops
- and will give up on non-cyclic chains after a specified number of
- lookups.
- """
- # Do this import here so that relaymanager.py doesn't depend on
- # twisted.names, only MXCalculator will.
- from twisted.names import dns, error
-
- seenAliases = set()
- exchanges = []
- # Examine the answers for the domain we asked about
- pertinentRecords = answers.get(domain, [])
- while pertinentRecords:
- record = pertinentRecords.pop()
-
- # If it's a CNAME, we'll need to do some more processing
- if record.TYPE == dns.CNAME:
-
- # Remember that this name was an alias.
- seenAliases.add(domain)
-
- canonicalName = str(record.name)
- # See if we have some local records which might be relevant.
- if canonicalName in answers:
-
- # Make sure it isn't a loop contained entirely within the
- # results we have here.
- if canonicalName in seenAliases:
- return Failure(CanonicalNameLoop(record))
-
- pertinentRecords = answers[canonicalName]
- exchanges = []
- else:
- if cnamesLeft:
- # Request more information from the server.
- return self.getMX(canonicalName, cnamesLeft - 1)
- else:
- # Give up.
- return Failure(CanonicalNameChainTooLong(record))
-
- # If it's an MX, collect it.
- if record.TYPE == dns.MX:
- exchanges.append((record.preference, record))
-
- if exchanges:
- exchanges.sort()
- for (preference, record) in exchanges:
- host = str(record.name)
- if host not in self.badMXs:
- return record
- t = self.clock.seconds() - self.badMXs[host]
- if t >= 0:
- del self.badMXs[host]
- return record
- return exchanges[0][1]
- else:
- # Treat no answers the same as an error - jump to the errback to try
- # to look up an A record. This provides behavior described as a
- # special case in RFC 974 in the section headed I{Interpreting the
- # List of MX RRs}.
- return Failure(
- error.DNSNameError("No MX records for %r" % (domain,)))
-
-
- def _ebMX(self, failure, domain):
- from twisted.names import error, dns
-
- if self.fallbackToDomain:
- failure.trap(error.DNSNameError)
- log.msg("MX lookup failed; attempting to use hostname (%s) directly" % (domain,))
-
- # Alright, I admit, this is a bit icky.
- d = self.resolver.getHostByName(domain)
- def cbResolved(addr):
- return dns.Record_MX(name=addr)
- def ebResolved(err):
- err.trap(error.DNSNameError)
- raise DNSLookupError()
- d.addCallbacks(cbResolved, ebResolved)
- return d
- elif failure.check(error.DNSNameError):
- raise IOError("No MX found for %r" % (domain,))
- return failure
« no previous file with comments | « third_party/twisted_8_1/twisted/mail/relay.py ('k') | third_party/twisted_8_1/twisted/mail/scripts/__init__.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698