| Index: third_party/twisted_8_1/twisted/test/test_ftp.py
|
| diff --git a/third_party/twisted_8_1/twisted/test/test_ftp.py b/third_party/twisted_8_1/twisted/test/test_ftp.py
|
| deleted file mode 100644
|
| index 92c36ed056d9ac6390e958f1abecf3b6842f9f3d..0000000000000000000000000000000000000000
|
| --- a/third_party/twisted_8_1/twisted/test/test_ftp.py
|
| +++ /dev/null
|
| @@ -1,2253 +0,0 @@
|
| -# Copyright (c) 2001-2007 Twisted Matrix Laboratories.
|
| -# See LICENSE for details.
|
| -
|
| -
|
| -"""
|
| -FTP tests.
|
| -
|
| -Maintainer: U{Andrew Bennetts<mailto:spiv@twistedmatrix.com>}
|
| -"""
|
| -
|
| -import os.path
|
| -from StringIO import StringIO
|
| -import shutil
|
| -import errno
|
| -
|
| -from zope.interface import implements
|
| -
|
| -from twisted.trial import unittest
|
| -from twisted.protocols import basic
|
| -from twisted.internet import reactor, protocol, defer, error
|
| -from twisted.internet.interfaces import IConsumer
|
| -from twisted.cred import portal, checkers, credentials
|
| -from twisted.python import failure, filepath
|
| -from twisted.test import proto_helpers
|
| -
|
| -from twisted.protocols import ftp, loopback
|
| -
|
| -class NonClosingStringIO(StringIO):
|
| - def close(self):
|
| - pass
|
| -
|
| -StringIOWithoutClosing = NonClosingStringIO
|
| -
|
| -
|
| -
|
| -class Dummy(basic.LineReceiver):
|
| - logname = None
|
| - def __init__(self):
|
| - self.lines = []
|
| - self.rawData = []
|
| - def connectionMade(self):
|
| - self.f = self.factory # to save typing in pdb :-)
|
| - def lineReceived(self,line):
|
| - self.lines.append(line)
|
| - def rawDataReceived(self, data):
|
| - self.rawData.append(data)
|
| - def lineLengthExceeded(self, line):
|
| - pass
|
| -
|
| -
|
| -class _BufferingProtocol(protocol.Protocol):
|
| - def connectionMade(self):
|
| - self.buffer = ''
|
| - self.d = defer.Deferred()
|
| - def dataReceived(self, data):
|
| - self.buffer += data
|
| - def connectionLost(self, reason):
|
| - self.d.callback(self)
|
| -
|
| -
|
| -class FTPServerTestCase(unittest.TestCase):
|
| - """
|
| - Simple tests for an FTP server with the default settings.
|
| -
|
| - @ivar clientFactory: class used as ftp client.
|
| - """
|
| - clientFactory = ftp.FTPClientBasic
|
| -
|
| - def setUp(self):
|
| - # Create a directory
|
| - self.directory = self.mktemp()
|
| - os.mkdir(self.directory)
|
| -
|
| - # Start the server
|
| - p = portal.Portal(ftp.FTPRealm(self.directory))
|
| - p.registerChecker(checkers.AllowAnonymousAccess(),
|
| - credentials.IAnonymous)
|
| - self.factory = ftp.FTPFactory(portal=p)
|
| - self.port = reactor.listenTCP(0, self.factory, interface="127.0.0.1")
|
| -
|
| - # Hook the server's buildProtocol to make the protocol instance
|
| - # accessible to tests.
|
| - buildProtocol = self.factory.buildProtocol
|
| - d1 = defer.Deferred()
|
| - def _rememberProtocolInstance(addr):
|
| - protocol = buildProtocol(addr)
|
| - self.serverProtocol = protocol.wrappedProtocol
|
| - d1.callback(None)
|
| - return protocol
|
| - self.factory.buildProtocol = _rememberProtocolInstance
|
| -
|
| - # Connect a client to it
|
| - portNum = self.port.getHost().port
|
| - clientCreator = protocol.ClientCreator(reactor, self.clientFactory)
|
| - d2 = clientCreator.connectTCP("127.0.0.1", portNum)
|
| - def gotClient(client):
|
| - self.client = client
|
| - d2.addCallback(gotClient)
|
| - return defer.gatherResults([d1, d2])
|
| -
|
| - def tearDown(self):
|
| - # Clean up sockets
|
| - self.client.transport.loseConnection()
|
| - d = defer.maybeDeferred(self.port.stopListening)
|
| - d.addCallback(self.ebTearDown)
|
| - return d
|
| -
|
| - def ebTearDown(self, ignore):
|
| - del self.serverProtocol
|
| - # Clean up temporary directory
|
| - shutil.rmtree(self.directory)
|
| -
|
| - def assertCommandResponse(self, command, expectedResponseLines,
|
| - chainDeferred=None):
|
| - """Asserts that a sending an FTP command receives the expected
|
| - response.
|
| -
|
| - Returns a Deferred. Optionally accepts a deferred to chain its actions
|
| - to.
|
| - """
|
| - if chainDeferred is None:
|
| - chainDeferred = defer.succeed(None)
|
| -
|
| - def queueCommand(ignored):
|
| - d = self.client.queueStringCommand(command)
|
| - def gotResponse(responseLines):
|
| - self.assertEquals(expectedResponseLines, responseLines)
|
| - return d.addCallback(gotResponse)
|
| - return chainDeferred.addCallback(queueCommand)
|
| -
|
| - def assertCommandFailed(self, command, expectedResponse=None,
|
| - chainDeferred=None):
|
| - if chainDeferred is None:
|
| - chainDeferred = defer.succeed(None)
|
| -
|
| - def queueCommand(ignored):
|
| - return self.client.queueStringCommand(command)
|
| - chainDeferred.addCallback(queueCommand)
|
| - self.assertFailure(chainDeferred, ftp.CommandFailed)
|
| - def failed(exception):
|
| - if expectedResponse is not None:
|
| - self.failUnlessEqual(
|
| - expectedResponse, exception.args[0])
|
| - return chainDeferred.addCallback(failed)
|
| -
|
| - def _anonymousLogin(self):
|
| - d = self.assertCommandResponse(
|
| - 'USER anonymous',
|
| - ['331 Guest login ok, type your email address as password.'])
|
| - return self.assertCommandResponse(
|
| - 'PASS test@twistedmatrix.com',
|
| - ['230 Anonymous login ok, access restrictions apply.'],
|
| - chainDeferred=d)
|
| -
|
| -
|
| -class BasicFTPServerTestCase(FTPServerTestCase):
|
| - def testNotLoggedInReply(self):
|
| - """When not logged in, all commands other than USER and PASS should
|
| - get NOT_LOGGED_IN errors.
|
| - """
|
| - commandList = ['CDUP', 'CWD', 'LIST', 'MODE', 'PASV',
|
| - 'PWD', 'RETR', 'STRU', 'SYST', 'TYPE']
|
| -
|
| - # Issue commands, check responses
|
| - def checkResponse(exception):
|
| - failureResponseLines = exception.args[0]
|
| - self.failUnless(failureResponseLines[-1].startswith("530"),
|
| - "Response didn't start with 530: %r"
|
| - % (failureResponseLines[-1],))
|
| - deferreds = []
|
| - for command in commandList:
|
| - deferred = self.client.queueStringCommand(command)
|
| - self.assertFailure(deferred, ftp.CommandFailed)
|
| - deferred.addCallback(checkResponse)
|
| - deferreds.append(deferred)
|
| - return defer.DeferredList(deferreds, fireOnOneErrback=True)
|
| -
|
| - def testPASSBeforeUSER(self):
|
| - """Issuing PASS before USER should give an error."""
|
| - return self.assertCommandFailed(
|
| - 'PASS foo',
|
| - ["503 Incorrect sequence of commands: "
|
| - "USER required before PASS"])
|
| -
|
| - def testNoParamsForUSER(self):
|
| - """Issuing USER without a username is a syntax error."""
|
| - return self.assertCommandFailed(
|
| - 'USER',
|
| - ['500 Syntax error: USER requires an argument.'])
|
| -
|
| - def testNoParamsForPASS(self):
|
| - """Issuing PASS without a password is a syntax error."""
|
| - d = self.client.queueStringCommand('USER foo')
|
| - return self.assertCommandFailed(
|
| - 'PASS',
|
| - ['500 Syntax error: PASS requires an argument.'],
|
| - chainDeferred=d)
|
| -
|
| - def testAnonymousLogin(self):
|
| - return self._anonymousLogin()
|
| -
|
| - def testQuit(self):
|
| - """Issuing QUIT should return a 221 message."""
|
| - d = self._anonymousLogin()
|
| - return self.assertCommandResponse(
|
| - 'QUIT',
|
| - ['221 Goodbye.'],
|
| - chainDeferred=d)
|
| -
|
| - def testAnonymousLoginDenied(self):
|
| - # Reconfigure the server to disallow anonymous access, and to have an
|
| - # IUsernamePassword checker that always rejects.
|
| - self.factory.allowAnonymous = False
|
| - denyAlwaysChecker = checkers.InMemoryUsernamePasswordDatabaseDontUse()
|
| - self.factory.portal.registerChecker(denyAlwaysChecker,
|
| - credentials.IUsernamePassword)
|
| -
|
| - # Same response code as allowAnonymous=True, but different text.
|
| - d = self.assertCommandResponse(
|
| - 'USER anonymous',
|
| - ['331 Password required for anonymous.'])
|
| -
|
| - # It will be denied. No-one can login.
|
| - d = self.assertCommandFailed(
|
| - 'PASS test@twistedmatrix.com',
|
| - ['530 Sorry, Authentication failed.'],
|
| - chainDeferred=d)
|
| -
|
| - # It's not just saying that. You aren't logged in.
|
| - d = self.assertCommandFailed(
|
| - 'PWD',
|
| - ['530 Please login with USER and PASS.'],
|
| - chainDeferred=d)
|
| - return d
|
| -
|
| - def testUnknownCommand(self):
|
| - d = self._anonymousLogin()
|
| - return self.assertCommandFailed(
|
| - 'GIBBERISH',
|
| - ["502 Command 'GIBBERISH' not implemented"],
|
| - chainDeferred=d)
|
| -
|
| - def testRETRBeforePORT(self):
|
| - d = self._anonymousLogin()
|
| - return self.assertCommandFailed(
|
| - 'RETR foo',
|
| - ["503 Incorrect sequence of commands: "
|
| - "PORT or PASV required before RETR"],
|
| - chainDeferred=d)
|
| -
|
| - def testSTORBeforePORT(self):
|
| - d = self._anonymousLogin()
|
| - return self.assertCommandFailed(
|
| - 'STOR foo',
|
| - ["503 Incorrect sequence of commands: "
|
| - "PORT or PASV required before STOR"],
|
| - chainDeferred=d)
|
| -
|
| - def testBadCommandArgs(self):
|
| - d = self._anonymousLogin()
|
| - self.assertCommandFailed(
|
| - 'MODE z',
|
| - ["504 Not implemented for parameter 'z'."],
|
| - chainDeferred=d)
|
| - self.assertCommandFailed(
|
| - 'STRU I',
|
| - ["504 Not implemented for parameter 'I'."],
|
| - chainDeferred=d)
|
| - return d
|
| -
|
| - def testDecodeHostPort(self):
|
| - self.assertEquals(ftp.decodeHostPort('25,234,129,22,100,23'),
|
| - ('25.234.129.22', 25623))
|
| - nums = range(6)
|
| - for i in range(6):
|
| - badValue = list(nums)
|
| - badValue[i] = 256
|
| - s = ','.join(map(str, badValue))
|
| - self.assertRaises(ValueError, ftp.decodeHostPort, s)
|
| -
|
| - def testPASV(self):
|
| - # Login
|
| - wfd = defer.waitForDeferred(self._anonymousLogin())
|
| - yield wfd
|
| - wfd.getResult()
|
| -
|
| - # Issue a PASV command, and extract the host and port from the response
|
| - pasvCmd = defer.waitForDeferred(self.client.queueStringCommand('PASV'))
|
| - yield pasvCmd
|
| - responseLines = pasvCmd.getResult()
|
| - host, port = ftp.decodeHostPort(responseLines[-1][4:])
|
| -
|
| - # Make sure the server is listening on the port it claims to be
|
| - self.assertEqual(port, self.serverProtocol.dtpPort.getHost().port)
|
| -
|
| - # Semi-reasonable way to force cleanup
|
| - self.serverProtocol.transport.loseConnection()
|
| - testPASV = defer.deferredGenerator(testPASV)
|
| -
|
| - def testSYST(self):
|
| - d = self._anonymousLogin()
|
| - self.assertCommandResponse('SYST', ["215 UNIX Type: L8"],
|
| - chainDeferred=d)
|
| - return d
|
| -
|
| -
|
| - def test_portRangeForwardError(self):
|
| - """
|
| - Exceptions other than L{error.CannotListenError} which are raised by
|
| - C{listenFactory} should be raised to the caller of L{FTP.getDTPPort}.
|
| - """
|
| - def listenFactory(portNumber, factory):
|
| - raise RuntimeError()
|
| - self.serverProtocol.listenFactory = listenFactory
|
| -
|
| - self.assertRaises(RuntimeError, self.serverProtocol.getDTPPort,
|
| - protocol.Factory())
|
| -
|
| -
|
| - def test_portRange(self):
|
| - """
|
| - L{FTP.passivePortRange} should determine the ports which
|
| - L{FTP.getDTPPort} attempts to bind. If no port from that iterator can
|
| - be bound, L{error.CannotListenError} should be raised, otherwise the
|
| - first successful result from L{FTP.listenFactory} should be returned.
|
| - """
|
| - def listenFactory(portNumber, factory):
|
| - if portNumber in (22032, 22033, 22034):
|
| - raise error.CannotListenError('localhost', portNumber, 'error')
|
| - return portNumber
|
| - self.serverProtocol.listenFactory = listenFactory
|
| -
|
| - port = self.serverProtocol.getDTPPort(protocol.Factory())
|
| - self.assertEquals(port, 0)
|
| -
|
| - self.serverProtocol.passivePortRange = xrange(22032, 65536)
|
| - port = self.serverProtocol.getDTPPort(protocol.Factory())
|
| - self.assertEquals(port, 22035)
|
| -
|
| - self.serverProtocol.passivePortRange = xrange(22032, 22035)
|
| - self.assertRaises(error.CannotListenError,
|
| - self.serverProtocol.getDTPPort,
|
| - protocol.Factory())
|
| -
|
| -
|
| -
|
| -class FTPServerTestCaseAdvancedClient(FTPServerTestCase):
|
| - """
|
| - Test FTP server with the L{ftp.FTPClient} class.
|
| - """
|
| - clientFactory = ftp.FTPClient
|
| -
|
| - def test_anonymousSTOR(self):
|
| - """
|
| - Try to make an STOR as anonymous, and check that we got a permission
|
| - denied error.
|
| - """
|
| - def eb(res):
|
| - res.trap(ftp.CommandFailed)
|
| - self.assertEquals(res.value.args[0][0],
|
| - '550 foo: Permission denied.')
|
| - d1, d2 = self.client.storeFile('foo')
|
| - d2.addErrback(eb)
|
| - return defer.gatherResults([d1, d2])
|
| -
|
| -
|
| -class FTPServerPasvDataConnectionTestCase(FTPServerTestCase):
|
| - def _makeDataConnection(self, ignored=None):
|
| - # Establish a passive data connection (i.e. client connecting to
|
| - # server).
|
| - d = self.client.queueStringCommand('PASV')
|
| - def gotPASV(responseLines):
|
| - host, port = ftp.decodeHostPort(responseLines[-1][4:])
|
| - cc = protocol.ClientCreator(reactor, _BufferingProtocol)
|
| - return cc.connectTCP('127.0.0.1', port)
|
| - return d.addCallback(gotPASV)
|
| -
|
| - def _download(self, command, chainDeferred=None):
|
| - if chainDeferred is None:
|
| - chainDeferred = defer.succeed(None)
|
| -
|
| - chainDeferred.addCallback(self._makeDataConnection)
|
| - def queueCommand(downloader):
|
| - # wait for the command to return, and the download connection to be
|
| - # closed.
|
| - d1 = self.client.queueStringCommand(command)
|
| - d2 = downloader.d
|
| - return defer.gatherResults([d1, d2])
|
| - chainDeferred.addCallback(queueCommand)
|
| -
|
| - def downloadDone((ignored, downloader)):
|
| - return downloader.buffer
|
| - return chainDeferred.addCallback(downloadDone)
|
| -
|
| - def testEmptyLIST(self):
|
| - # Login
|
| - d = self._anonymousLogin()
|
| -
|
| - # No files, so the file listing should be empty
|
| - self._download('LIST', chainDeferred=d)
|
| - def checkEmpty(result):
|
| - self.assertEqual('', result)
|
| - return d.addCallback(checkEmpty)
|
| -
|
| - def testTwoDirLIST(self):
|
| - # Make some directories
|
| - os.mkdir(os.path.join(self.directory, 'foo'))
|
| - os.mkdir(os.path.join(self.directory, 'bar'))
|
| -
|
| - # Login
|
| - d = self._anonymousLogin()
|
| -
|
| - # We expect 2 lines because there are two files.
|
| - self._download('LIST', chainDeferred=d)
|
| - def checkDownload(download):
|
| - self.assertEqual(2, len(download[:-2].split('\r\n')))
|
| - d.addCallback(checkDownload)
|
| -
|
| - # Download a names-only listing.
|
| - self._download('NLST ', chainDeferred=d)
|
| - def checkDownload(download):
|
| - filenames = download[:-2].split('\r\n')
|
| - filenames.sort()
|
| - self.assertEqual(['bar', 'foo'], filenames)
|
| - d.addCallback(checkDownload)
|
| -
|
| - # Download a listing of the 'foo' subdirectory. 'foo' has no files, so
|
| - # the file listing should be empty.
|
| - self._download('LIST foo', chainDeferred=d)
|
| - def checkDownload(download):
|
| - self.assertEqual('', download)
|
| - d.addCallback(checkDownload)
|
| -
|
| - # Change the current working directory to 'foo'.
|
| - def chdir(ignored):
|
| - return self.client.queueStringCommand('CWD foo')
|
| - d.addCallback(chdir)
|
| -
|
| - # Download a listing from within 'foo', and again it should be empty,
|
| - # because LIST uses the working directory by default.
|
| - self._download('LIST', chainDeferred=d)
|
| - def checkDownload(download):
|
| - self.assertEqual('', download)
|
| - return d.addCallback(checkDownload)
|
| -
|
| - def testManyLargeDownloads(self):
|
| - # Login
|
| - d = self._anonymousLogin()
|
| -
|
| - # Download a range of different size files
|
| - for size in range(100000, 110000, 500):
|
| - fObj = file(os.path.join(self.directory, '%d.txt' % (size,)), 'wb')
|
| - fObj.write('x' * size)
|
| - fObj.close()
|
| -
|
| - self._download('RETR %d.txt' % (size,), chainDeferred=d)
|
| - def checkDownload(download, size=size):
|
| - self.assertEqual('x' * size, download)
|
| - d.addCallback(checkDownload)
|
| - return d
|
| -
|
| -
|
| -class FTPServerPortDataConnectionTestCase(FTPServerPasvDataConnectionTestCase):
|
| - def setUp(self):
|
| - self.dataPorts = []
|
| - return FTPServerPasvDataConnectionTestCase.setUp(self)
|
| -
|
| - def _makeDataConnection(self, ignored=None):
|
| - # Establish an active data connection (i.e. server connecting to
|
| - # client).
|
| - deferred = defer.Deferred()
|
| - class DataFactory(protocol.ServerFactory):
|
| - protocol = _BufferingProtocol
|
| - def buildProtocol(self, addr):
|
| - p = protocol.ServerFactory.buildProtocol(self, addr)
|
| - reactor.callLater(0, deferred.callback, p)
|
| - return p
|
| - dataPort = reactor.listenTCP(0, DataFactory(), interface='127.0.0.1')
|
| - self.dataPorts.append(dataPort)
|
| - cmd = 'PORT ' + ftp.encodeHostPort('127.0.0.1', dataPort.getHost().port)
|
| - self.client.queueStringCommand(cmd)
|
| - return deferred
|
| -
|
| - def tearDown(self):
|
| - l = [defer.maybeDeferred(port.stopListening) for port in self.dataPorts]
|
| - d = defer.maybeDeferred(
|
| - FTPServerPasvDataConnectionTestCase.tearDown, self)
|
| - l.append(d)
|
| - return defer.DeferredList(l, fireOnOneErrback=True)
|
| -
|
| - def testPORTCannotConnect(self):
|
| - # Login
|
| - d = self._anonymousLogin()
|
| -
|
| - # Listen on a port, and immediately stop listening as a way to find a
|
| - # port number that is definitely closed.
|
| - def loggedIn(ignored):
|
| - port = reactor.listenTCP(0, protocol.Factory(),
|
| - interface='127.0.0.1')
|
| - portNum = port.getHost().port
|
| - d = port.stopListening()
|
| - d.addCallback(lambda _: portNum)
|
| - return d
|
| - d.addCallback(loggedIn)
|
| -
|
| - # Tell the server to connect to that port with a PORT command, and
|
| - # verify that it fails with the right error.
|
| - def gotPortNum(portNum):
|
| - return self.assertCommandFailed(
|
| - 'PORT ' + ftp.encodeHostPort('127.0.0.1', portNum),
|
| - ["425 Can't open data connection."])
|
| - return d.addCallback(gotPortNum)
|
| -
|
| -
|
| -# -- Client Tests -----------------------------------------------------------
|
| -
|
| -class PrintLines(protocol.Protocol):
|
| - """Helper class used by FTPFileListingTests."""
|
| -
|
| - def __init__(self, lines):
|
| - self._lines = lines
|
| -
|
| - def connectionMade(self):
|
| - for line in self._lines:
|
| - self.transport.write(line + "\r\n")
|
| - self.transport.loseConnection()
|
| -
|
| -
|
| -class MyFTPFileListProtocol(ftp.FTPFileListProtocol):
|
| - def __init__(self):
|
| - self.other = []
|
| - ftp.FTPFileListProtocol.__init__(self)
|
| -
|
| - def unknownLine(self, line):
|
| - self.other.append(line)
|
| -
|
| -
|
| -class FTPFileListingTests(unittest.TestCase):
|
| - def getFilesForLines(self, lines):
|
| - fileList = MyFTPFileListProtocol()
|
| - d = loopback.loopbackAsync(PrintLines(lines), fileList)
|
| - d.addCallback(lambda _: (fileList.files, fileList.other))
|
| - return d
|
| -
|
| - def testOneLine(self):
|
| - # This example line taken from the docstring for FTPFileListProtocol
|
| - line = '-rw-r--r-- 1 root other 531 Jan 29 03:26 README'
|
| - def check(((file,), other)):
|
| - self.failIf(other, 'unexpect unparsable lines: %s' % repr(other))
|
| - self.failUnless(file['filetype'] == '-', 'misparsed fileitem')
|
| - self.failUnless(file['perms'] == 'rw-r--r--', 'misparsed perms')
|
| - self.failUnless(file['owner'] == 'root', 'misparsed fileitem')
|
| - self.failUnless(file['group'] == 'other', 'misparsed fileitem')
|
| - self.failUnless(file['size'] == 531, 'misparsed fileitem')
|
| - self.failUnless(file['date'] == 'Jan 29 03:26', 'misparsed fileitem')
|
| - self.failUnless(file['filename'] == 'README', 'misparsed fileitem')
|
| - self.failUnless(file['nlinks'] == 1, 'misparsed nlinks')
|
| - self.failIf(file['linktarget'], 'misparsed linktarget')
|
| - return self.getFilesForLines([line]).addCallback(check)
|
| -
|
| - def testVariantLines(self):
|
| - line1 = 'drw-r--r-- 2 root other 531 Jan 9 2003 A'
|
| - line2 = 'lrw-r--r-- 1 root other 1 Jan 29 03:26 B -> A'
|
| - line3 = 'woohoo! '
|
| - def check(((file1, file2), (other,))):
|
| - self.failUnless(other == 'woohoo! \r', 'incorrect other line')
|
| - # file 1
|
| - self.failUnless(file1['filetype'] == 'd', 'misparsed fileitem')
|
| - self.failUnless(file1['perms'] == 'rw-r--r--', 'misparsed perms')
|
| - self.failUnless(file1['owner'] == 'root', 'misparsed owner')
|
| - self.failUnless(file1['group'] == 'other', 'misparsed group')
|
| - self.failUnless(file1['size'] == 531, 'misparsed size')
|
| - self.failUnless(file1['date'] == 'Jan 9 2003', 'misparsed date')
|
| - self.failUnless(file1['filename'] == 'A', 'misparsed filename')
|
| - self.failUnless(file1['nlinks'] == 2, 'misparsed nlinks')
|
| - self.failIf(file1['linktarget'], 'misparsed linktarget')
|
| - # file 2
|
| - self.failUnless(file2['filetype'] == 'l', 'misparsed fileitem')
|
| - self.failUnless(file2['perms'] == 'rw-r--r--', 'misparsed perms')
|
| - self.failUnless(file2['owner'] == 'root', 'misparsed owner')
|
| - self.failUnless(file2['group'] == 'other', 'misparsed group')
|
| - self.failUnless(file2['size'] == 1, 'misparsed size')
|
| - self.failUnless(file2['date'] == 'Jan 29 03:26', 'misparsed date')
|
| - self.failUnless(file2['filename'] == 'B', 'misparsed filename')
|
| - self.failUnless(file2['nlinks'] == 1, 'misparsed nlinks')
|
| - self.failUnless(file2['linktarget'] == 'A', 'misparsed linktarget')
|
| - return self.getFilesForLines([line1, line2, line3]).addCallback(check)
|
| -
|
| - def testUnknownLine(self):
|
| - def check((files, others)):
|
| - self.failIf(files, 'unexpected file entries')
|
| - self.failUnless(others == ['ABC\r', 'not a file\r'],
|
| - 'incorrect unparsable lines: %s' % repr(others))
|
| - return self.getFilesForLines(['ABC', 'not a file']).addCallback(check)
|
| -
|
| - def testYear(self):
|
| - # This example derived from bug description in issue 514.
|
| - fileList = ftp.FTPFileListProtocol()
|
| - exampleLine = (
|
| - '-rw-r--r-- 1 root other 531 Jan 29 2003 README\n')
|
| - class PrintLine(protocol.Protocol):
|
| - def connectionMade(self):
|
| - self.transport.write(exampleLine)
|
| - self.transport.loseConnection()
|
| -
|
| - def check(ignored):
|
| - file = fileList.files[0]
|
| - self.failUnless(file['size'] == 531, 'misparsed fileitem')
|
| - self.failUnless(file['date'] == 'Jan 29 2003', 'misparsed fileitem')
|
| - self.failUnless(file['filename'] == 'README', 'misparsed fileitem')
|
| -
|
| - d = loopback.loopbackAsync(PrintLine(), fileList)
|
| - return d.addCallback(check)
|
| -
|
| -
|
| -class FTPClientTests(unittest.TestCase):
|
| - def tearDown(self):
|
| - # Clean up self.port, if any.
|
| - port = getattr(self, 'port', None)
|
| - if port is not None:
|
| - return port.stopListening()
|
| -
|
| - def testFailedRETR(self):
|
| - f = protocol.Factory()
|
| - f.noisy = 0
|
| - self.port = reactor.listenTCP(0, f, interface="127.0.0.1")
|
| - portNum = self.port.getHost().port
|
| - # This test data derived from a bug report by ranty on #twisted
|
| - responses = ['220 ready, dude (vsFTPd 1.0.0: beat me, break me)',
|
| - # USER anonymous
|
| - '331 Please specify the password.',
|
| - # PASS twisted@twistedmatrix.com
|
| - '230 Login successful. Have fun.',
|
| - # TYPE I
|
| - '200 Binary it is, then.',
|
| - # PASV
|
| - '227 Entering Passive Mode (127,0,0,1,%d,%d)' %
|
| - (portNum >> 8, portNum & 0xff),
|
| - # RETR /file/that/doesnt/exist
|
| - '550 Failed to open file.']
|
| - f.buildProtocol = lambda addr: PrintLines(responses)
|
| -
|
| - client = ftp.FTPClient(passive=1)
|
| - cc = protocol.ClientCreator(reactor, ftp.FTPClient, passive=1)
|
| - d = cc.connectTCP('127.0.0.1', portNum)
|
| - def gotClient(client):
|
| - p = protocol.Protocol()
|
| - return client.retrieveFile('/file/that/doesnt/exist', p)
|
| - d.addCallback(gotClient)
|
| - return self.assertFailure(d, ftp.CommandFailed)
|
| -
|
| - def test_errbacksUponDisconnect(self):
|
| - """
|
| - Test the ftp command errbacks when a connection lost happens during
|
| - the operation.
|
| - """
|
| - ftpClient = ftp.FTPClient()
|
| - tr = proto_helpers.StringTransportWithDisconnection()
|
| - ftpClient.makeConnection(tr)
|
| - tr.protocol = ftpClient
|
| - d = ftpClient.list('some path', Dummy())
|
| - m = []
|
| - def _eb(failure):
|
| - m.append(failure)
|
| - return None
|
| - d.addErrback(_eb)
|
| - from twisted.internet.main import CONNECTION_LOST
|
| - ftpClient.connectionLost(failure.Failure(CONNECTION_LOST))
|
| - self.failUnless(m, m)
|
| - return d
|
| -
|
| -
|
| -
|
| -class FTPClientTestCase(unittest.TestCase):
|
| - """
|
| - Test advanced FTP client commands.
|
| - """
|
| - def setUp(self):
|
| - """
|
| - Create a FTP client and connect it to fake transport.
|
| - """
|
| - self.client = ftp.FTPClient()
|
| - self.transport = proto_helpers.StringTransportWithDisconnection()
|
| - self.client.makeConnection(self.transport)
|
| - self.transport.protocol = self.client
|
| -
|
| -
|
| - def tearDown(self):
|
| - """
|
| - Deliver disconnection notification to the client so that it can
|
| - perform any cleanup which may be required.
|
| - """
|
| - self.client.connectionLost(error.ConnectionLost())
|
| -
|
| -
|
| - def _testLogin(self):
|
| - """
|
| - Test the login part.
|
| - """
|
| - self.assertEquals(self.transport.value(), '')
|
| - self.client.lineReceived(
|
| - '331 Guest login ok, type your email address as password.')
|
| - self.assertEquals(self.transport.value(), 'USER anonymous\r\n')
|
| - self.transport.clear()
|
| - self.client.lineReceived(
|
| - '230 Anonymous login ok, access restrictions apply.')
|
| - self.assertEquals(self.transport.value(), 'TYPE I\r\n')
|
| - self.transport.clear()
|
| - self.client.lineReceived('200 Type set to I.')
|
| -
|
| -
|
| - def test_CDUP(self):
|
| - """
|
| - Test the CDUP command.
|
| -
|
| - L{ftp.FTPClient.cdup} should return a Deferred which fires with a
|
| - sequence of one element which is the string the server sent
|
| - indicating that the command was executed successfully.
|
| -
|
| - (XXX - This is a bad API)
|
| - """
|
| - def cbCdup(res):
|
| - self.assertEquals(res[0], '250 Requested File Action Completed OK')
|
| -
|
| - self._testLogin()
|
| - d = self.client.cdup().addCallback(cbCdup)
|
| - self.assertEquals(self.transport.value(), 'CDUP\r\n')
|
| - self.transport.clear()
|
| - self.client.lineReceived('250 Requested File Action Completed OK')
|
| - return d
|
| -
|
| -
|
| - def test_failedCDUP(self):
|
| - """
|
| - Test L{ftp.FTPClient.cdup}'s handling of a failed CDUP command.
|
| -
|
| - When the CDUP command fails, the returned Deferred should errback
|
| - with L{ftp.CommandFailed}.
|
| - """
|
| - self._testLogin()
|
| - d = self.client.cdup()
|
| - self.assertFailure(d, ftp.CommandFailed)
|
| - self.assertEquals(self.transport.value(), 'CDUP\r\n')
|
| - self.transport.clear()
|
| - self.client.lineReceived('550 ..: No such file or directory')
|
| - return d
|
| -
|
| -
|
| - def test_PWD(self):
|
| - """
|
| - Test the PWD command.
|
| -
|
| - L{ftp.FTPClient.pwd} should return a Deferred which fires with a
|
| - sequence of one element which is a string representing the current
|
| - working directory on the server.
|
| -
|
| - (XXX - This is a bad API)
|
| - """
|
| - def cbPwd(res):
|
| - self.assertEquals(ftp.parsePWDResponse(res[0]), "/bar/baz")
|
| -
|
| - self._testLogin()
|
| - d = self.client.pwd().addCallback(cbPwd)
|
| - self.assertEquals(self.transport.value(), 'PWD\r\n')
|
| - self.client.lineReceived('257 "/bar/baz"')
|
| - return d
|
| -
|
| -
|
| - def test_failedPWD(self):
|
| - """
|
| - Test a failure in PWD command.
|
| -
|
| - When the PWD command fails, the returned Deferred should errback
|
| - with L{ftp.CommandFailed}.
|
| - """
|
| - self._testLogin()
|
| - d = self.client.pwd()
|
| - self.assertFailure(d, ftp.CommandFailed)
|
| - self.assertEquals(self.transport.value(), 'PWD\r\n')
|
| - self.client.lineReceived('550 /bar/baz: No such file or directory')
|
| - return d
|
| -
|
| -
|
| - def test_CWD(self):
|
| - """
|
| - Test the CWD command.
|
| -
|
| - L{ftp.FTPClient.cwd} should return a Deferred which fires with a
|
| - sequence of one element which is the string the server sent
|
| - indicating that the command was executed successfully.
|
| -
|
| - (XXX - This is a bad API)
|
| - """
|
| - def cbCwd(res):
|
| - self.assertEquals(res[0], '250 Requested File Action Completed OK')
|
| -
|
| - self._testLogin()
|
| - d = self.client.cwd("bar/foo").addCallback(cbCwd)
|
| - self.assertEquals(self.transport.value(), 'CWD bar/foo\r\n')
|
| - self.client.lineReceived('250 Requested File Action Completed OK')
|
| - return d
|
| -
|
| -
|
| - def test_failedCWD(self):
|
| - """
|
| - Test a failure in CWD command.
|
| -
|
| - When the PWD command fails, the returned Deferred should errback
|
| - with L{ftp.CommandFailed}.
|
| - """
|
| - self._testLogin()
|
| - d = self.client.cwd("bar/foo")
|
| - self.assertFailure(d, ftp.CommandFailed)
|
| - self.assertEquals(self.transport.value(), 'CWD bar/foo\r\n')
|
| - self.client.lineReceived('550 bar/foo: No such file or directory')
|
| - return d
|
| -
|
| -
|
| - def test_passiveRETR(self):
|
| - """
|
| - Test the RETR command in passive mode: get a file and verify its
|
| - content.
|
| -
|
| - L{ftp.FTPClient.retrieveFile} should return a Deferred which fires
|
| - with the protocol instance passed to it after the download has
|
| - completed.
|
| -
|
| - (XXX - This API should be based on producers and consumers)
|
| - """
|
| - def cbRetr(res, proto):
|
| - self.assertEquals(proto.buffer, 'x' * 1000)
|
| -
|
| - def cbConnect(host, port, factory):
|
| - self.assertEquals(host, '127.0.0.1')
|
| - self.assertEquals(port, 12345)
|
| - proto = factory.buildProtocol((host, port))
|
| - proto.makeConnection(proto_helpers.StringTransport())
|
| - self.client.lineReceived(
|
| - '150 File status okay; about to open data connection.')
|
| - proto.dataReceived("x" * 1000)
|
| - proto.connectionLost(failure.Failure(error.ConnectionDone("")))
|
| -
|
| - self.client.connectFactory = cbConnect
|
| - self._testLogin()
|
| - proto = _BufferingProtocol()
|
| - d = self.client.retrieveFile("spam", proto)
|
| - d.addCallback(cbRetr, proto)
|
| - self.assertEquals(self.transport.value(), 'PASV\r\n')
|
| - self.transport.clear()
|
| - self.client.lineReceived('227 Entering Passive Mode (%s).' %
|
| - (ftp.encodeHostPort('127.0.0.1', 12345),))
|
| - self.assertEquals(self.transport.value(), 'RETR spam\r\n')
|
| - self.transport.clear()
|
| - self.client.lineReceived('226 Transfer Complete.')
|
| - return d
|
| -
|
| -
|
| - def test_RETR(self):
|
| - """
|
| - Test the RETR command in non-passive mode.
|
| -
|
| - Like L{test_passiveRETR} but in the configuration where the server
|
| - establishes the data connection to the client, rather than the other
|
| - way around.
|
| - """
|
| - self.client.passive = False
|
| -
|
| - def generatePort(portCmd):
|
| - portCmd.text = 'PORT %s' % (ftp.encodeHostPort('127.0.0.1', 9876),)
|
| - portCmd.protocol.makeConnection(proto_helpers.StringTransport())
|
| - portCmd.protocol.dataReceived("x" * 1000)
|
| - portCmd.protocol.connectionLost(
|
| - failure.Failure(error.ConnectionDone("")))
|
| -
|
| - def cbRetr(res, proto):
|
| - self.assertEquals(proto.buffer, 'x' * 1000)
|
| -
|
| - self.client.generatePortCommand = generatePort
|
| - self._testLogin()
|
| - proto = _BufferingProtocol()
|
| - d = self.client.retrieveFile("spam", proto)
|
| - d.addCallback(cbRetr, proto)
|
| - self.assertEquals(self.transport.value(), 'PORT %s\r\n' %
|
| - (ftp.encodeHostPort('127.0.0.1', 9876),))
|
| - self.transport.clear()
|
| - self.client.lineReceived('200 PORT OK')
|
| - self.assertEquals(self.transport.value(), 'RETR spam\r\n')
|
| - self.transport.clear()
|
| - self.client.lineReceived('226 Transfer Complete.')
|
| - return d
|
| -
|
| -
|
| - def test_failedRETR(self):
|
| - """
|
| - Try to RETR an unexisting file.
|
| -
|
| - L{ftp.FTPClient.retrieveFile} should return a Deferred which
|
| - errbacks with L{ftp.CommandFailed} if the server indicates the file
|
| - cannot be transferred for some reason.
|
| - """
|
| - def cbConnect(host, port, factory):
|
| - self.assertEquals(host, '127.0.0.1')
|
| - self.assertEquals(port, 12345)
|
| - proto = factory.buildProtocol((host, port))
|
| - proto.makeConnection(proto_helpers.StringTransport())
|
| - self.client.lineReceived(
|
| - '150 File status okay; about to open data connection.')
|
| - proto.connectionLost(failure.Failure(error.ConnectionDone("")))
|
| -
|
| - self.client.connectFactory = cbConnect
|
| - self._testLogin()
|
| - proto = _BufferingProtocol()
|
| - d = self.client.retrieveFile("spam", proto)
|
| - self.assertFailure(d, ftp.CommandFailed)
|
| - self.assertEquals(self.transport.value(), 'PASV\r\n')
|
| - self.transport.clear()
|
| - self.client.lineReceived('227 Entering Passive Mode (%s).' %
|
| - (ftp.encodeHostPort('127.0.0.1', 12345),))
|
| - self.assertEquals(self.transport.value(), 'RETR spam\r\n')
|
| - self.transport.clear()
|
| - self.client.lineReceived('550 spam: No such file or directory')
|
| - return d
|
| -
|
| -
|
| - def test_lostRETR(self):
|
| - """
|
| - Try a RETR, but disconnect during the transfer.
|
| - L{ftp.FTPClient.retrieveFile} should return a Deferred which
|
| - errbacks with L{ftp.ConnectionLost)
|
| - """
|
| - self.client.passive = False
|
| -
|
| - l = []
|
| - def generatePort(portCmd):
|
| - portCmd.text = 'PORT %s' % (ftp.encodeHostPort('127.0.0.1', 9876),)
|
| - tr = proto_helpers.StringTransportWithDisconnection()
|
| - portCmd.protocol.makeConnection(tr)
|
| - tr.protocol = portCmd.protocol
|
| - portCmd.protocol.dataReceived("x" * 500)
|
| - l.append(tr)
|
| -
|
| - self.client.generatePortCommand = generatePort
|
| - self._testLogin()
|
| - proto = _BufferingProtocol()
|
| - d = self.client.retrieveFile("spam", proto)
|
| - self.assertEquals(self.transport.value(), 'PORT %s\r\n' %
|
| - (ftp.encodeHostPort('127.0.0.1', 9876),))
|
| - self.transport.clear()
|
| - self.client.lineReceived('200 PORT OK')
|
| - self.assertEquals(self.transport.value(), 'RETR spam\r\n')
|
| -
|
| - self.assert_(l)
|
| - l[0].loseConnection()
|
| - self.transport.loseConnection()
|
| - self.assertFailure(d, ftp.ConnectionLost)
|
| - return d
|
| -
|
| -
|
| - def test_passiveSTOR(self):
|
| - """
|
| - Test the STOR command: send a file and verify its content.
|
| -
|
| - L{ftp.FTPClient.storeFile} should return a two-tuple of Deferreds.
|
| - The first of which should fire with a protocol instance when the
|
| - data connection has been established and is responsible for sending
|
| - the contents of the file. The second of which should fire when the
|
| - upload has completed, the data connection has been closed, and the
|
| - server has acknowledged receipt of the file.
|
| -
|
| - (XXX - storeFile should take a producer as an argument, instead, and
|
| - only return a Deferred which fires when the upload has succeeded or
|
| - failed).
|
| - """
|
| - tr = proto_helpers.StringTransport()
|
| - def cbStore(sender):
|
| - self.client.lineReceived(
|
| - '150 File status okay; about to open data connection.')
|
| - sender.transport.write("x" * 1000)
|
| - sender.finish()
|
| - sender.connectionLost(failure.Failure(error.ConnectionDone("")))
|
| -
|
| - def cbFinish(ign):
|
| - self.assertEquals(tr.value(), "x" * 1000)
|
| -
|
| - def cbConnect(host, port, factory):
|
| - self.assertEquals(host, '127.0.0.1')
|
| - self.assertEquals(port, 12345)
|
| - proto = factory.buildProtocol((host, port))
|
| - proto.makeConnection(tr)
|
| -
|
| - self.client.connectFactory = cbConnect
|
| - self._testLogin()
|
| - d1, d2 = self.client.storeFile("spam")
|
| - d1.addCallback(cbStore)
|
| - d2.addCallback(cbFinish)
|
| - self.assertEquals(self.transport.value(), 'PASV\r\n')
|
| - self.transport.clear()
|
| - self.client.lineReceived('227 Entering Passive Mode (%s).' %
|
| - (ftp.encodeHostPort('127.0.0.1', 12345),))
|
| - self.assertEquals(self.transport.value(), 'STOR spam\r\n')
|
| - self.transport.clear()
|
| - self.client.lineReceived('226 Transfer Complete.')
|
| - return defer.gatherResults([d1, d2])
|
| -
|
| -
|
| - def test_failedSTOR(self):
|
| - """
|
| - Test a failure in the STOR command.
|
| -
|
| - If the server does not acknowledge successful receipt of the
|
| - uploaded file, the second Deferred returned by
|
| - L{ftp.FTPClient.storeFile} should errback with L{ftp.CommandFailed}.
|
| - """
|
| - tr = proto_helpers.StringTransport()
|
| - def cbStore(sender):
|
| - self.client.lineReceived(
|
| - '150 File status okay; about to open data connection.')
|
| - sender.transport.write("x" * 1000)
|
| - sender.finish()
|
| - sender.connectionLost(failure.Failure(error.ConnectionDone("")))
|
| -
|
| - def cbConnect(host, port, factory):
|
| - self.assertEquals(host, '127.0.0.1')
|
| - self.assertEquals(port, 12345)
|
| - proto = factory.buildProtocol((host, port))
|
| - proto.makeConnection(tr)
|
| -
|
| - self.client.connectFactory = cbConnect
|
| - self._testLogin()
|
| - d1, d2 = self.client.storeFile("spam")
|
| - d1.addCallback(cbStore)
|
| - self.assertFailure(d2, ftp.CommandFailed)
|
| - self.assertEquals(self.transport.value(), 'PASV\r\n')
|
| - self.transport.clear()
|
| - self.client.lineReceived('227 Entering Passive Mode (%s).' %
|
| - (ftp.encodeHostPort('127.0.0.1', 12345),))
|
| - self.assertEquals(self.transport.value(), 'STOR spam\r\n')
|
| - self.transport.clear()
|
| - self.client.lineReceived(
|
| - '426 Transfer aborted. Data connection closed.')
|
| - return defer.gatherResults([d1, d2])
|
| -
|
| -
|
| - def test_STOR(self):
|
| - """
|
| - Test the STOR command in non-passive mode.
|
| -
|
| - Like L{test_passiveSTOR} but in the configuration where the server
|
| - establishes the data connection to the client, rather than the other
|
| - way around.
|
| - """
|
| - tr = proto_helpers.StringTransport()
|
| - self.client.passive = False
|
| - def generatePort(portCmd):
|
| - portCmd.text = 'PORT %s' % ftp.encodeHostPort('127.0.0.1', 9876)
|
| - portCmd.protocol.makeConnection(tr)
|
| -
|
| - def cbStore(sender):
|
| - self.assertEquals(self.transport.value(), 'PORT %s\r\n' %
|
| - (ftp.encodeHostPort('127.0.0.1', 9876),))
|
| - self.transport.clear()
|
| - self.client.lineReceived('200 PORT OK')
|
| - self.assertEquals(self.transport.value(), 'STOR spam\r\n')
|
| - self.transport.clear()
|
| - self.client.lineReceived(
|
| - '150 File status okay; about to open data connection.')
|
| - sender.transport.write("x" * 1000)
|
| - sender.finish()
|
| - sender.connectionLost(failure.Failure(error.ConnectionDone("")))
|
| - self.client.lineReceived('226 Transfer Complete.')
|
| -
|
| - def cbFinish(ign):
|
| - self.assertEquals(tr.value(), "x" * 1000)
|
| -
|
| - self.client.generatePortCommand = generatePort
|
| - self._testLogin()
|
| - d1, d2 = self.client.storeFile("spam")
|
| - d1.addCallback(cbStore)
|
| - d2.addCallback(cbFinish)
|
| - return defer.gatherResults([d1, d2])
|
| -
|
| -
|
| - def test_passiveLIST(self):
|
| - """
|
| - Test the LIST command.
|
| -
|
| - L{ftp.FTPClient.list} should return a Deferred which fires with a
|
| - protocol instance which was passed to list after the command has
|
| - succeeded.
|
| -
|
| - (XXX - This is a very unfortunate API; if my understanding is
|
| - correct, the results are always at least line-oriented, so allowing
|
| - a per-line parser function to be specified would make this simpler,
|
| - but a default implementation should really be provided which knows
|
| - how to deal with all the formats used in real servers, so
|
| - application developers never have to care about this insanity. It
|
| - would also be nice to either get back a Deferred of a list of
|
| - filenames or to be able to consume the files as they are received
|
| - (which the current API does allow, but in a somewhat inconvenient
|
| - fashion) -exarkun)
|
| - """
|
| - def cbList(res, fileList):
|
| - fls = [f["filename"] for f in fileList.files]
|
| - expected = ["foo", "bar", "baz"]
|
| - expected.sort()
|
| - fls.sort()
|
| - self.assertEquals(fls, expected)
|
| -
|
| - def cbConnect(host, port, factory):
|
| - self.assertEquals(host, '127.0.0.1')
|
| - self.assertEquals(port, 12345)
|
| - proto = factory.buildProtocol((host, port))
|
| - proto.makeConnection(proto_helpers.StringTransport())
|
| - self.client.lineReceived(
|
| - '150 File status okay; about to open data connection.')
|
| - sending = [
|
| - '-rw-r--r-- 0 spam egg 100 Oct 10 2006 foo\r\n',
|
| - '-rw-r--r-- 3 spam egg 100 Oct 10 2006 bar\r\n',
|
| - '-rw-r--r-- 4 spam egg 100 Oct 10 2006 baz\r\n',
|
| - ]
|
| - for i in sending:
|
| - proto.dataReceived(i)
|
| - proto.connectionLost(failure.Failure(error.ConnectionDone("")))
|
| -
|
| - self.client.connectFactory = cbConnect
|
| - self._testLogin()
|
| - fileList = ftp.FTPFileListProtocol()
|
| - d = self.client.list('foo/bar', fileList).addCallback(cbList, fileList)
|
| - self.assertEquals(self.transport.value(), 'PASV\r\n')
|
| - self.transport.clear()
|
| - self.client.lineReceived('227 Entering Passive Mode (%s).' %
|
| - (ftp.encodeHostPort('127.0.0.1', 12345),))
|
| - self.assertEquals(self.transport.value(), 'LIST foo/bar\r\n')
|
| - self.client.lineReceived('226 Transfer Complete.')
|
| - return d
|
| -
|
| -
|
| - def test_LIST(self):
|
| - """
|
| - Test the LIST command in non-passive mode.
|
| -
|
| - Like L{test_passiveLIST} but in the configuration where the server
|
| - establishes the data connection to the client, rather than the other
|
| - way around.
|
| - """
|
| - self.client.passive = False
|
| - def generatePort(portCmd):
|
| - portCmd.text = 'PORT %s' % (ftp.encodeHostPort('127.0.0.1', 9876),)
|
| - portCmd.protocol.makeConnection(proto_helpers.StringTransport())
|
| - self.client.lineReceived(
|
| - '150 File status okay; about to open data connection.')
|
| - sending = [
|
| - '-rw-r--r-- 0 spam egg 100 Oct 10 2006 foo\r\n',
|
| - '-rw-r--r-- 3 spam egg 100 Oct 10 2006 bar\r\n',
|
| - '-rw-r--r-- 4 spam egg 100 Oct 10 2006 baz\r\n',
|
| - ]
|
| - for i in sending:
|
| - portCmd.protocol.dataReceived(i)
|
| - portCmd.protocol.connectionLost(
|
| - failure.Failure(error.ConnectionDone("")))
|
| -
|
| - def cbList(res, fileList):
|
| - fls = [f["filename"] for f in fileList.files]
|
| - expected = ["foo", "bar", "baz"]
|
| - expected.sort()
|
| - fls.sort()
|
| - self.assertEquals(fls, expected)
|
| -
|
| - self.client.generatePortCommand = generatePort
|
| - self._testLogin()
|
| - fileList = ftp.FTPFileListProtocol()
|
| - d = self.client.list('foo/bar', fileList).addCallback(cbList, fileList)
|
| - self.assertEquals(self.transport.value(), 'PORT %s\r\n' %
|
| - (ftp.encodeHostPort('127.0.0.1', 9876),))
|
| - self.transport.clear()
|
| - self.client.lineReceived('200 PORT OK')
|
| - self.assertEquals(self.transport.value(), 'LIST foo/bar\r\n')
|
| - self.transport.clear()
|
| - self.client.lineReceived('226 Transfer Complete.')
|
| - return d
|
| -
|
| -
|
| - def test_failedLIST(self):
|
| - """
|
| - Test a failure in LIST command.
|
| -
|
| - L{ftp.FTPClient.list} should return a Deferred which fails with
|
| - L{ftp.CommandFailed} if the server indicates the indicated path is
|
| - invalid for some reason.
|
| - """
|
| - def cbConnect(host, port, factory):
|
| - self.assertEquals(host, '127.0.0.1')
|
| - self.assertEquals(port, 12345)
|
| - proto = factory.buildProtocol((host, port))
|
| - proto.makeConnection(proto_helpers.StringTransport())
|
| - self.client.lineReceived(
|
| - '150 File status okay; about to open data connection.')
|
| - proto.connectionLost(failure.Failure(error.ConnectionDone("")))
|
| -
|
| - self.client.connectFactory = cbConnect
|
| - self._testLogin()
|
| - fileList = ftp.FTPFileListProtocol()
|
| - d = self.client.list('foo/bar', fileList)
|
| - self.assertFailure(d, ftp.CommandFailed)
|
| - self.assertEquals(self.transport.value(), 'PASV\r\n')
|
| - self.transport.clear()
|
| - self.client.lineReceived('227 Entering Passive Mode (%s).' %
|
| - (ftp.encodeHostPort('127.0.0.1', 12345),))
|
| - self.assertEquals(self.transport.value(), 'LIST foo/bar\r\n')
|
| - self.client.lineReceived('550 foo/bar: No such file or directory')
|
| - return d
|
| -
|
| -
|
| - def test_NLST(self):
|
| - """
|
| - Test the NLST command in non-passive mode.
|
| -
|
| - L{ftp.FTPClient.nlst} should return a Deferred which fires with a
|
| - list of filenames when the list command has completed.
|
| - """
|
| - self.client.passive = False
|
| - def generatePort(portCmd):
|
| - portCmd.text = 'PORT %s' % (ftp.encodeHostPort('127.0.0.1', 9876),)
|
| - portCmd.protocol.makeConnection(proto_helpers.StringTransport())
|
| - self.client.lineReceived(
|
| - '150 File status okay; about to open data connection.')
|
| - portCmd.protocol.dataReceived('foo\r\n')
|
| - portCmd.protocol.dataReceived('bar\r\n')
|
| - portCmd.protocol.dataReceived('baz\r\n')
|
| - portCmd.protocol.connectionLost(
|
| - failure.Failure(error.ConnectionDone("")))
|
| -
|
| - def cbList(res, proto):
|
| - fls = proto.buffer.splitlines()
|
| - expected = ["foo", "bar", "baz"]
|
| - expected.sort()
|
| - fls.sort()
|
| - self.assertEquals(fls, expected)
|
| -
|
| - self.client.generatePortCommand = generatePort
|
| - self._testLogin()
|
| - lstproto = _BufferingProtocol()
|
| - d = self.client.nlst('foo/bar', lstproto).addCallback(cbList, lstproto)
|
| - self.assertEquals(self.transport.value(), 'PORT %s\r\n' %
|
| - (ftp.encodeHostPort('127.0.0.1', 9876),))
|
| - self.transport.clear()
|
| - self.client.lineReceived('200 PORT OK')
|
| - self.assertEquals(self.transport.value(), 'NLST foo/bar\r\n')
|
| - self.client.lineReceived('226 Transfer Complete.')
|
| - return d
|
| -
|
| -
|
| - def test_passiveNLST(self):
|
| - """
|
| - Test the NLST command.
|
| -
|
| - Like L{test_passiveNLST} but in the configuration where the server
|
| - establishes the data connection to the client, rather than the other
|
| - way around.
|
| - """
|
| - def cbList(res, proto):
|
| - fls = proto.buffer.splitlines()
|
| - expected = ["foo", "bar", "baz"]
|
| - expected.sort()
|
| - fls.sort()
|
| - self.assertEquals(fls, expected)
|
| -
|
| - def cbConnect(host, port, factory):
|
| - self.assertEquals(host, '127.0.0.1')
|
| - self.assertEquals(port, 12345)
|
| - proto = factory.buildProtocol((host, port))
|
| - proto.makeConnection(proto_helpers.StringTransport())
|
| - self.client.lineReceived(
|
| - '150 File status okay; about to open data connection.')
|
| - proto.dataReceived('foo\r\n')
|
| - proto.dataReceived('bar\r\n')
|
| - proto.dataReceived('baz\r\n')
|
| - proto.connectionLost(failure.Failure(error.ConnectionDone("")))
|
| -
|
| - self.client.connectFactory = cbConnect
|
| - self._testLogin()
|
| - lstproto = _BufferingProtocol()
|
| - d = self.client.nlst('foo/bar', lstproto).addCallback(cbList, lstproto)
|
| - self.assertEquals(self.transport.value(), 'PASV\r\n')
|
| - self.transport.clear()
|
| - self.client.lineReceived('227 Entering Passive Mode (%s).' %
|
| - (ftp.encodeHostPort('127.0.0.1', 12345),))
|
| - self.assertEquals(self.transport.value(), 'NLST foo/bar\r\n')
|
| - self.client.lineReceived('226 Transfer Complete.')
|
| - return d
|
| -
|
| -
|
| - def test_failedNLST(self):
|
| - """
|
| - Test a failure in NLST command.
|
| -
|
| - L{ftp.FTPClient.nlst} should return a Deferred which fails with
|
| - L{ftp.CommandFailed} if the server indicates the indicated path is
|
| - invalid for some reason.
|
| - """
|
| - tr = proto_helpers.StringTransport()
|
| - def cbConnect(host, port, factory):
|
| - self.assertEquals(host, '127.0.0.1')
|
| - self.assertEquals(port, 12345)
|
| - proto = factory.buildProtocol((host, port))
|
| - proto.makeConnection(tr)
|
| - self.client.lineReceived(
|
| - '150 File status okay; about to open data connection.')
|
| - proto.connectionLost(failure.Failure(error.ConnectionDone("")))
|
| -
|
| - self.client.connectFactory = cbConnect
|
| - self._testLogin()
|
| - lstproto = _BufferingProtocol()
|
| - d = self.client.nlst('foo/bar', lstproto)
|
| - self.assertFailure(d, ftp.CommandFailed)
|
| - self.assertEquals(self.transport.value(), 'PASV\r\n')
|
| - self.transport.clear()
|
| - self.client.lineReceived('227 Entering Passive Mode (%s).' %
|
| - (ftp.encodeHostPort('127.0.0.1', 12345),))
|
| - self.assertEquals(self.transport.value(), 'NLST foo/bar\r\n')
|
| - self.client.lineReceived('550 foo/bar: No such file or directory')
|
| - return d
|
| -
|
| -
|
| - def test_changeDirectory(self):
|
| - """
|
| - Test the changeDirectory method.
|
| -
|
| - L{ftp.FTPClient.changeDirectory} should return a Deferred which fires
|
| - with True if succeeded.
|
| - """
|
| - def cbCd(res):
|
| - self.assertEquals(res, True)
|
| -
|
| - self._testLogin()
|
| - d = self.client.changeDirectory("bar/foo").addCallback(cbCd)
|
| - self.assertEquals(self.transport.value(), 'CWD bar/foo\r\n')
|
| - self.client.lineReceived('250 Requested File Action Completed OK')
|
| - return d
|
| -
|
| -
|
| - def test_failedChangeDirectory(self):
|
| - """
|
| - Test a failure in the changeDirectory method.
|
| -
|
| - The behaviour here is the same as a failed CWD.
|
| - """
|
| - self._testLogin()
|
| - d = self.client.changeDirectory("bar/foo")
|
| - self.assertFailure(d, ftp.CommandFailed)
|
| - self.assertEquals(self.transport.value(), 'CWD bar/foo\r\n')
|
| - self.client.lineReceived('550 bar/foo: No such file or directory')
|
| - return d
|
| -
|
| -
|
| - def test_strangeFailedChangeDirectory(self):
|
| - """
|
| - Test a strange failure in changeDirectory method.
|
| -
|
| - L{ftp.FTPClient.changeDirectory} is stricter than CWD as it checks
|
| - code 250 for success.
|
| - """
|
| - self._testLogin()
|
| - d = self.client.changeDirectory("bar/foo")
|
| - self.assertFailure(d, ftp.CommandFailed)
|
| - self.assertEquals(self.transport.value(), 'CWD bar/foo\r\n')
|
| - self.client.lineReceived('252 I do what I want !')
|
| - return d
|
| -
|
| -
|
| - def test_getDirectory(self):
|
| - """
|
| - Test the getDirectory method.
|
| -
|
| - L{ftp.FTPClient.getDirectory} should return a Deferred which fires with
|
| - the current directory on the server. It wraps PWD command.
|
| - """
|
| - def cbGet(res):
|
| - self.assertEquals(res, "/bar/baz")
|
| -
|
| - self._testLogin()
|
| - d = self.client.getDirectory().addCallback(cbGet)
|
| - self.assertEquals(self.transport.value(), 'PWD\r\n')
|
| - self.client.lineReceived('257 "/bar/baz"')
|
| - return d
|
| -
|
| -
|
| - def test_failedGetDirectory(self):
|
| - """
|
| - Test a failure in getDirectory method.
|
| -
|
| - The behaviour should be the same as PWD.
|
| - """
|
| - self._testLogin()
|
| - d = self.client.getDirectory()
|
| - self.assertFailure(d, ftp.CommandFailed)
|
| - self.assertEquals(self.transport.value(), 'PWD\r\n')
|
| - self.client.lineReceived('550 /bar/baz: No such file or directory')
|
| - return d
|
| -
|
| -
|
| - def test_anotherFailedGetDirectory(self):
|
| - """
|
| - Test a different failure in getDirectory method.
|
| -
|
| - The response should be quoted to be parsed, so it returns an error
|
| - otherwise.
|
| - """
|
| - self._testLogin()
|
| - d = self.client.getDirectory()
|
| - self.assertFailure(d, ftp.CommandFailed)
|
| - self.assertEquals(self.transport.value(), 'PWD\r\n')
|
| - self.client.lineReceived('257 /bar/baz')
|
| - return d
|
| -
|
| -
|
| -
|
| -class DummyTransport:
|
| - def write(self, bytes):
|
| - pass
|
| -
|
| -class BufferingTransport:
|
| - buffer = ''
|
| - def write(self, bytes):
|
| - self.buffer += bytes
|
| -
|
| -
|
| -class FTPClientBasicTests(unittest.TestCase):
|
| -
|
| - def testGreeting(self):
|
| - # The first response is captured as a greeting.
|
| - ftpClient = ftp.FTPClientBasic()
|
| - ftpClient.lineReceived('220 Imaginary FTP.')
|
| - self.failUnlessEqual(['220 Imaginary FTP.'], ftpClient.greeting)
|
| -
|
| - def testResponseWithNoMessage(self):
|
| - # Responses with no message are still valid, i.e. three digits followed
|
| - # by a space is complete response.
|
| - ftpClient = ftp.FTPClientBasic()
|
| - ftpClient.lineReceived('220 ')
|
| - self.failUnlessEqual(['220 '], ftpClient.greeting)
|
| -
|
| - def testMultilineResponse(self):
|
| - ftpClient = ftp.FTPClientBasic()
|
| - ftpClient.transport = DummyTransport()
|
| - ftpClient.lineReceived('220 Imaginary FTP.')
|
| -
|
| - # Queue (and send) a dummy command, and set up a callback to capture the
|
| - # result
|
| - deferred = ftpClient.queueStringCommand('BLAH')
|
| - result = []
|
| - deferred.addCallback(result.append)
|
| - deferred.addErrback(self.fail)
|
| -
|
| - # Send the first line of a multiline response.
|
| - ftpClient.lineReceived('210-First line.')
|
| - self.failUnlessEqual([], result)
|
| -
|
| - # Send a second line, again prefixed with "nnn-".
|
| - ftpClient.lineReceived('123-Second line.')
|
| - self.failUnlessEqual([], result)
|
| -
|
| - # Send a plain line of text, no prefix.
|
| - ftpClient.lineReceived('Just some text.')
|
| - self.failUnlessEqual([], result)
|
| -
|
| - # Now send a short (less than 4 chars) line.
|
| - ftpClient.lineReceived('Hi')
|
| - self.failUnlessEqual([], result)
|
| -
|
| - # Now send an empty line.
|
| - ftpClient.lineReceived('')
|
| - self.failUnlessEqual([], result)
|
| -
|
| - # And a line with 3 digits in it, and nothing else.
|
| - ftpClient.lineReceived('321')
|
| - self.failUnlessEqual([], result)
|
| -
|
| - # Now finish it.
|
| - ftpClient.lineReceived('210 Done.')
|
| - self.failUnlessEqual(
|
| - ['210-First line.',
|
| - '123-Second line.',
|
| - 'Just some text.',
|
| - 'Hi',
|
| - '',
|
| - '321',
|
| - '210 Done.'], result[0])
|
| -
|
| - def testNoPasswordGiven(self):
|
| - """Passing None as the password avoids sending the PASS command."""
|
| - # Create a client, and give it a greeting.
|
| - ftpClient = ftp.FTPClientBasic()
|
| - ftpClient.transport = BufferingTransport()
|
| - ftpClient.lineReceived('220 Welcome to Imaginary FTP.')
|
| -
|
| - # Queue a login with no password
|
| - ftpClient.queueLogin('bob', None)
|
| - self.failUnlessEqual('USER bob\r\n', ftpClient.transport.buffer)
|
| -
|
| - # Clear the test buffer, acknowledge the USER command.
|
| - ftpClient.transport.buffer = ''
|
| - ftpClient.lineReceived('200 Hello bob.')
|
| -
|
| - # The client shouldn't have sent anything more (i.e. it shouldn't have
|
| - # sent a PASS command).
|
| - self.failUnlessEqual('', ftpClient.transport.buffer)
|
| -
|
| - def testNoPasswordNeeded(self):
|
| - """Receiving a 230 response to USER prevents PASS from being sent."""
|
| - # Create a client, and give it a greeting.
|
| - ftpClient = ftp.FTPClientBasic()
|
| - ftpClient.transport = BufferingTransport()
|
| - ftpClient.lineReceived('220 Welcome to Imaginary FTP.')
|
| -
|
| - # Queue a login with no password
|
| - ftpClient.queueLogin('bob', 'secret')
|
| - self.failUnlessEqual('USER bob\r\n', ftpClient.transport.buffer)
|
| -
|
| - # Clear the test buffer, acknowledge the USER command with a 230
|
| - # response code.
|
| - ftpClient.transport.buffer = ''
|
| - ftpClient.lineReceived('230 Hello bob. No password needed.')
|
| -
|
| - # The client shouldn't have sent anything more (i.e. it shouldn't have
|
| - # sent a PASS command).
|
| - self.failUnlessEqual('', ftpClient.transport.buffer)
|
| -
|
| -
|
| -class PathHandling(unittest.TestCase):
|
| - def testNormalizer(self):
|
| - for inp, outp in [('a', ['a']),
|
| - ('/a', ['a']),
|
| - ('/', []),
|
| - ('a/b/c', ['a', 'b', 'c']),
|
| - ('/a/b/c', ['a', 'b', 'c']),
|
| - ('/a/', ['a']),
|
| - ('a/', ['a'])]:
|
| - self.assertEquals(ftp.toSegments([], inp), outp)
|
| -
|
| - for inp, outp in [('b', ['a', 'b']),
|
| - ('b/', ['a', 'b']),
|
| - ('/b', ['b']),
|
| - ('/b/', ['b']),
|
| - ('b/c', ['a', 'b', 'c']),
|
| - ('b/c/', ['a', 'b', 'c']),
|
| - ('/b/c', ['b', 'c']),
|
| - ('/b/c/', ['b', 'c'])]:
|
| - self.assertEquals(ftp.toSegments(['a'], inp), outp)
|
| -
|
| - for inp, outp in [('//', []),
|
| - ('//a', ['a']),
|
| - ('a//', ['a']),
|
| - ('a//b', ['a', 'b'])]:
|
| - self.assertEquals(ftp.toSegments([], inp), outp)
|
| -
|
| - for inp, outp in [('//', []),
|
| - ('//b', ['b']),
|
| - ('b//c', ['a', 'b', 'c'])]:
|
| - self.assertEquals(ftp.toSegments(['a'], inp), outp)
|
| -
|
| - for inp, outp in [('..', []),
|
| - ('../', []),
|
| - ('a/..', ['x']),
|
| - ('/a/..', []),
|
| - ('/a/b/..', ['a']),
|
| - ('/a/b/../', ['a']),
|
| - ('/a/b/../c', ['a', 'c']),
|
| - ('/a/b/../c/', ['a', 'c']),
|
| - ('/a/b/../../c', ['c']),
|
| - ('/a/b/../../c/', ['c']),
|
| - ('/a/b/../../c/..', []),
|
| - ('/a/b/../../c/../', [])]:
|
| - self.assertEquals(ftp.toSegments(['x'], inp), outp)
|
| -
|
| - for inp in ['..', '../', 'a/../..', 'a/../../',
|
| - '/..', '/../', '/a/../..', '/a/../../',
|
| - '/a/b/../../..']:
|
| - self.assertRaises(ftp.InvalidPath, ftp.toSegments, [], inp)
|
| -
|
| - for inp in ['../..', '../../', '../a/../..']:
|
| - self.assertRaises(ftp.InvalidPath, ftp.toSegments, ['x'], inp)
|
| -
|
| -
|
| -
|
| -class ErrnoToFailureTestCase(unittest.TestCase):
|
| - """
|
| - Tests for L{ftp.errnoToFailure} errno checking.
|
| - """
|
| -
|
| - def test_notFound(self):
|
| - """
|
| - C{errno.ENOENT} should be translated to L{ftp.FileNotFoundError}.
|
| - """
|
| - d = ftp.errnoToFailure(errno.ENOENT, "foo")
|
| - return self.assertFailure(d, ftp.FileNotFoundError)
|
| -
|
| -
|
| - def test_permissionDenied(self):
|
| - """
|
| - C{errno.EPERM} should be translated to L{ftp.PermissionDeniedError}.
|
| - """
|
| - d = ftp.errnoToFailure(errno.EPERM, "foo")
|
| - return self.assertFailure(d, ftp.PermissionDeniedError)
|
| -
|
| -
|
| - def test_accessDenied(self):
|
| - """
|
| - C{errno.EACCES} should be translated to L{ftp.PermissionDeniedError}.
|
| - """
|
| - d = ftp.errnoToFailure(errno.EACCES, "foo")
|
| - return self.assertFailure(d, ftp.PermissionDeniedError)
|
| -
|
| -
|
| - def test_notDirectory(self):
|
| - """
|
| - C{errno.ENOTDIR} should be translated to L{ftp.IsNotADirectoryError}.
|
| - """
|
| - d = ftp.errnoToFailure(errno.ENOTDIR, "foo")
|
| - return self.assertFailure(d, ftp.IsNotADirectoryError)
|
| -
|
| -
|
| - def test_fileExists(self):
|
| - """
|
| - C{errno.EEXIST} should be translated to L{ftp.FileExistsError}.
|
| - """
|
| - d = ftp.errnoToFailure(errno.EEXIST, "foo")
|
| - return self.assertFailure(d, ftp.FileExistsError)
|
| -
|
| -
|
| - def test_isDirectory(self):
|
| - """
|
| - C{errno.EISDIR} should be translated to L{ftp.IsADirectoryError}.
|
| - """
|
| - d = ftp.errnoToFailure(errno.EISDIR, "foo")
|
| - return self.assertFailure(d, ftp.IsADirectoryError)
|
| -
|
| -
|
| - def test_passThrough(self):
|
| - """
|
| - If an unknown errno is passed to L{ftp.errnoToFailure}, it should let
|
| - the originating exception pass through.
|
| - """
|
| - try:
|
| - raise RuntimeError("bar")
|
| - except:
|
| - d = ftp.errnoToFailure(-1, "foo")
|
| - return self.assertFailure(d, RuntimeError)
|
| -
|
| -
|
| -
|
| -class AnonymousFTPShellTestCase(unittest.TestCase):
|
| - """
|
| - Test anynomous shell properties.
|
| - """
|
| -
|
| - def test_anonymousWrite(self):
|
| - """
|
| - Check that L{ftp.FTPAnonymousShell} returns an error when trying to
|
| - open it in write mode.
|
| - """
|
| - shell = ftp.FTPAnonymousShell('')
|
| - d = shell.openForWriting(('foo',))
|
| - self.assertFailure(d, ftp.PermissionDeniedError)
|
| - return d
|
| -
|
| -
|
| -
|
| -class IFTPShellTestsMixin:
|
| - """
|
| - Generic tests for the C{IFTPShell} interface.
|
| - """
|
| -
|
| - def directoryExists(self, path):
|
| - """
|
| - Test if the directory exists at C{path}.
|
| -
|
| - @param path: the relative path to check.
|
| - @type path: C{str}.
|
| -
|
| - @return: C{True} if C{path} exists and is a directory, C{False} if
|
| - it's not the case
|
| - @rtype: C{bool}
|
| - """
|
| - raise NotImplementedError()
|
| -
|
| -
|
| - def createDirectory(self, path):
|
| - """
|
| - Create a directory in C{path}.
|
| -
|
| - @param path: the relative path of the directory to create, with one
|
| - segment.
|
| - @type path: C{str}
|
| - """
|
| - raise NotImplementedError()
|
| -
|
| -
|
| - def fileExists(self, path):
|
| - """
|
| - Test if the file exists at C{path}.
|
| -
|
| - @param path: the relative path to check.
|
| - @type path: C{str}.
|
| -
|
| - @return: C{True} if C{path} exists and is a file, C{False} if it's not
|
| - the case.
|
| - @rtype: C{bool}
|
| - """
|
| - raise NotImplementedError()
|
| -
|
| -
|
| - def createFile(self, path, fileContent=''):
|
| - """
|
| - Create a file named C{path} with some content.
|
| -
|
| - @param path: the relative path of the file to create, without
|
| - directory.
|
| - @type path: C{str}
|
| -
|
| - @param fileContent: the content of the file.
|
| - @type fileContent: C{str}
|
| - """
|
| - raise NotImplementedError()
|
| -
|
| -
|
| - def test_createDirectory(self):
|
| - """
|
| - C{directoryExists} should report correctly about directory existence,
|
| - and C{createDirectory} should create a directory detectable by
|
| - C{directoryExists}.
|
| - """
|
| - self.assertFalse(self.directoryExists('bar'))
|
| - self.createDirectory('bar')
|
| - self.assertTrue(self.directoryExists('bar'))
|
| -
|
| -
|
| - def test_createFile(self):
|
| - """
|
| - C{fileExists} should report correctly about file existence, and
|
| - C{createFile} should create a file detectable by C{fileExists}.
|
| - """
|
| - self.assertFalse(self.fileExists('file.txt'))
|
| - self.createFile('file.txt')
|
| - self.assertTrue(self.fileExists('file.txt'))
|
| -
|
| -
|
| - def test_makeDirectory(self):
|
| - """
|
| - Create a directory and check it ends in the filesystem.
|
| - """
|
| - d = self.shell.makeDirectory(('foo',))
|
| - def cb(result):
|
| - self.assertTrue(self.directoryExists('foo'))
|
| - return d.addCallback(cb)
|
| -
|
| -
|
| - def test_makeDirectoryError(self):
|
| - """
|
| - Creating a directory that already exists should fail with a
|
| - C{ftp.FileExistsError}.
|
| - """
|
| - self.createDirectory('foo')
|
| - d = self.shell.makeDirectory(('foo',))
|
| - return self.assertFailure(d, ftp.FileExistsError)
|
| -
|
| -
|
| - def test_removeDirectory(self):
|
| - """
|
| - Try to remove a directory and check it's removed from the filesystem.
|
| - """
|
| - self.createDirectory('bar')
|
| - d = self.shell.removeDirectory(('bar',))
|
| - def cb(result):
|
| - self.assertFalse(self.directoryExists('bar'))
|
| - return d.addCallback(cb)
|
| -
|
| -
|
| - def test_removeDirectoryOnFile(self):
|
| - """
|
| - removeDirectory should not work in file and fail with a
|
| - C{ftp.IsNotADirectoryError}.
|
| - """
|
| - self.createFile('file.txt')
|
| - d = self.shell.removeDirectory(('file.txt',))
|
| - return self.assertFailure(d, ftp.IsNotADirectoryError)
|
| -
|
| -
|
| - def test_removeNotExistingDirectory(self):
|
| - """
|
| - Removing directory that doesn't exist should fail with a
|
| - C{ftp.FileNotFoundError}.
|
| - """
|
| - d = self.shell.removeDirectory(('bar',))
|
| - return self.assertFailure(d, ftp.FileNotFoundError)
|
| -
|
| -
|
| - def test_removeFile(self):
|
| - """
|
| - Try to remove a file and check it's removed from the filesystem.
|
| - """
|
| - self.createFile('file.txt')
|
| - d = self.shell.removeFile(('file.txt',))
|
| - def cb(res):
|
| - self.assertFalse(self.fileExists('file.txt'))
|
| - d.addCallback(cb)
|
| - return d
|
| -
|
| -
|
| - def test_removeFileOnDirectory(self):
|
| - """
|
| - removeFile should not work on directory.
|
| - """
|
| - self.createDirectory('ned')
|
| - d = self.shell.removeFile(('ned',))
|
| - return self.assertFailure(d, ftp.IsADirectoryError)
|
| -
|
| -
|
| - def test_removeNotExistingFile(self):
|
| - """
|
| - Try to remove a non existent file, and check it raises a
|
| - L{ivfs.NotFoundError}.
|
| - """
|
| - d = self.shell.removeFile(('foo',))
|
| - return self.assertFailure(d, ftp.FileNotFoundError)
|
| -
|
| -
|
| - def test_list(self):
|
| - """
|
| - Check the output of the list method.
|
| - """
|
| - self.createDirectory('ned')
|
| - self.createFile('file.txt')
|
| - d = self.shell.list(('.',))
|
| - def cb(l):
|
| - l.sort()
|
| - self.assertEquals(l,
|
| - [('file.txt', []), ('ned', [])])
|
| - return d.addCallback(cb)
|
| -
|
| -
|
| - def test_listWithStat(self):
|
| - """
|
| - Check the output of list with asked stats.
|
| - """
|
| - self.createDirectory('ned')
|
| - self.createFile('file.txt')
|
| - d = self.shell.list(('.',), ('size', 'permissions',))
|
| - def cb(l):
|
| - l.sort()
|
| - self.assertEquals(len(l), 2)
|
| - self.assertEquals(l[0][0], 'file.txt')
|
| - self.assertEquals(l[1][0], 'ned')
|
| - # Size and permissions are reported differently between platforms
|
| - # so just check they are present
|
| - self.assertEquals(len(l[0][1]), 2)
|
| - self.assertEquals(len(l[1][1]), 2)
|
| - return d.addCallback(cb)
|
| -
|
| -
|
| - def test_listWithInvalidStat(self):
|
| - """
|
| - Querying an invalid stat should result to a C{AttributeError}.
|
| - """
|
| - self.createDirectory('ned')
|
| - d = self.shell.list(('.',), ('size', 'whateverstat',))
|
| - return self.assertFailure(d, AttributeError)
|
| -
|
| -
|
| - def test_listFile(self):
|
| - """
|
| - Check the output of the list method on a file.
|
| - """
|
| - self.createFile('file.txt')
|
| - d = self.shell.list(('file.txt',))
|
| - def cb(l):
|
| - l.sort()
|
| - self.assertEquals(l,
|
| - [('file.txt', [])])
|
| - return d.addCallback(cb)
|
| -
|
| -
|
| - def test_listNotExistingDirectory(self):
|
| - """
|
| - list on a directory that doesn't exist should fail with a
|
| - L{ftp.FileNotFoundError}.
|
| - """
|
| - d = self.shell.list(('foo',))
|
| - return self.assertFailure(d, ftp.FileNotFoundError)
|
| -
|
| -
|
| - def test_access(self):
|
| - """
|
| - Try to access a resource.
|
| - """
|
| - self.createDirectory('ned')
|
| - d = self.shell.access(('ned',))
|
| - return d
|
| -
|
| -
|
| - def test_accessNotFound(self):
|
| - """
|
| - access should fail on a resource that doesn't exist.
|
| - """
|
| - d = self.shell.access(('foo',))
|
| - return self.assertFailure(d, ftp.FileNotFoundError)
|
| -
|
| -
|
| - def test_openForReading(self):
|
| - """
|
| - Check that openForReading returns an object providing C{ftp.IReadFile}.
|
| - """
|
| - self.createFile('file.txt')
|
| - d = self.shell.openForReading(('file.txt',))
|
| - def cb(res):
|
| - self.assertTrue(ftp.IReadFile.providedBy(res))
|
| - d.addCallback(cb)
|
| - return d
|
| -
|
| -
|
| - def test_openForReadingNotFound(self):
|
| - """
|
| - openForReading should fail with a C{ftp.FileNotFoundError} on a file
|
| - that doesn't exist.
|
| - """
|
| - d = self.shell.openForReading(('ned',))
|
| - return self.assertFailure(d, ftp.FileNotFoundError)
|
| -
|
| -
|
| - def test_openForReadingOnDirectory(self):
|
| - """
|
| - openForReading should not work on directory.
|
| - """
|
| - self.createDirectory('ned')
|
| - d = self.shell.openForReading(('ned',))
|
| - return self.assertFailure(d, ftp.IsADirectoryError)
|
| -
|
| -
|
| - def test_openForWriting(self):
|
| - """
|
| - Check that openForWriting returns an object providing C{ftp.IWriteFile}.
|
| - """
|
| - d = self.shell.openForWriting(('foo',))
|
| - def cb1(res):
|
| - self.assertTrue(ftp.IWriteFile.providedBy(res))
|
| - return res.receive().addCallback(cb2)
|
| - def cb2(res):
|
| - self.assertTrue(IConsumer.providedBy(res))
|
| - d.addCallback(cb1)
|
| - return d
|
| -
|
| -
|
| - def test_openForWritingExistingDirectory(self):
|
| - """
|
| - openForWriting should not be able to open a directory that already
|
| - exists.
|
| - """
|
| - self.createDirectory('ned')
|
| - d = self.shell.openForWriting(('ned',))
|
| - return self.assertFailure(d, ftp.IsADirectoryError)
|
| -
|
| -
|
| - def test_openForWritingInNotExistingDirectory(self):
|
| - """
|
| - openForWring should fail with a L{ftp.FileNotFoundError} if you specify
|
| - a file in a directory that doesn't exist.
|
| - """
|
| - self.createDirectory('ned')
|
| - d = self.shell.openForWriting(('ned', 'idonotexist', 'foo'))
|
| - return self.assertFailure(d, ftp.FileNotFoundError)
|
| -
|
| -
|
| - def test_statFile(self):
|
| - """
|
| - Check the output of the stat method on a file.
|
| - """
|
| - fileContent = 'wobble\n'
|
| - self.createFile('file.txt', fileContent)
|
| - d = self.shell.stat(('file.txt',), ('size', 'directory'))
|
| - def cb(res):
|
| - self.assertEquals(res[0], len(fileContent))
|
| - self.assertFalse(res[1])
|
| - d.addCallback(cb)
|
| - return d
|
| -
|
| -
|
| - def test_statDirectory(self):
|
| - """
|
| - Check the output of the stat method on a directory.
|
| - """
|
| - self.createDirectory('ned')
|
| - d = self.shell.stat(('ned',), ('size', 'directory'))
|
| - def cb(res):
|
| - self.assertTrue(res[1])
|
| - d.addCallback(cb)
|
| - return d
|
| -
|
| -
|
| - def test_statOwnerGroup(self):
|
| - """
|
| - Check the owner and groups stats.
|
| - """
|
| - self.createDirectory('ned')
|
| - d = self.shell.stat(('ned',), ('owner', 'group'))
|
| - def cb(res):
|
| - self.assertEquals(len(res), 2)
|
| - d.addCallback(cb)
|
| - return d
|
| -
|
| -
|
| - def test_statNotExisting(self):
|
| - """
|
| - stat should fail with L{ftp.FileNotFoundError} on a file that doesn't
|
| - exist.
|
| - """
|
| - d = self.shell.stat(('foo',), ('size', 'directory'))
|
| - return self.assertFailure(d, ftp.FileNotFoundError)
|
| -
|
| -
|
| - def test_invalidStat(self):
|
| - """
|
| - Querying an invalid stat should result to a C{AttributeError}.
|
| - """
|
| - self.createDirectory('ned')
|
| - d = self.shell.stat(('ned',), ('size', 'whateverstat'))
|
| - return self.assertFailure(d, AttributeError)
|
| -
|
| -
|
| - def test_rename(self):
|
| - """
|
| - Try to rename a directory.
|
| - """
|
| - self.createDirectory('ned')
|
| - d = self.shell.rename(('ned',), ('foo',))
|
| - def cb(res):
|
| - self.assertTrue(self.directoryExists('foo'))
|
| - self.assertFalse(self.directoryExists('ned'))
|
| - return d.addCallback(cb)
|
| -
|
| -
|
| - def test_renameNotExisting(self):
|
| - """
|
| - Renaming a directory that doesn't exist should fail with
|
| - L{ftp.FileNotFoundError}.
|
| - """
|
| - d = self.shell.rename(('foo',), ('bar',))
|
| - return self.assertFailure(d, ftp.FileNotFoundError)
|
| -
|
| -
|
| -
|
| -class FTPShellTestCase(unittest.TestCase, IFTPShellTestsMixin):
|
| - """
|
| - Tests for the C{ftp.FTPShell} object.
|
| - """
|
| -
|
| - def setUp(self):
|
| - """
|
| - Create a root directory and instantiate a shell.
|
| - """
|
| - self.root = filepath.FilePath(self.mktemp())
|
| - self.root.createDirectory()
|
| - self.shell = ftp.FTPShell(self.root)
|
| -
|
| -
|
| - def directoryExists(self, path):
|
| - """
|
| - Test if the directory exists at C{path}.
|
| - """
|
| - return self.root.child(path).isdir()
|
| -
|
| -
|
| - def createDirectory(self, path):
|
| - """
|
| - Create a directory in C{path}.
|
| - """
|
| - return self.root.child(path).createDirectory()
|
| -
|
| -
|
| - def fileExists(self, path):
|
| - """
|
| - Test if the file exists at C{path}.
|
| - """
|
| - return self.root.child(path).isfile()
|
| -
|
| -
|
| - def createFile(self, path, fileContent=''):
|
| - """
|
| - Create a file named C{path} with some content.
|
| - """
|
| - return self.root.child(path).setContent(fileContent)
|
| -
|
| -
|
| -
|
| -class TestConsumer(object):
|
| - """
|
| - A simple consumer for tests. It only works with non-streaming producers.
|
| -
|
| - @ivar producer: an object providing
|
| - L{twisted.internet.interfaces.IPullProducer}.
|
| - """
|
| -
|
| - implements(IConsumer)
|
| - producer = None
|
| -
|
| - def registerProducer(self, producer, streaming):
|
| - """
|
| - Simple register of producer, checks that no register has happened
|
| - before.
|
| - """
|
| - assert self.producer is None
|
| - self.buffer = []
|
| - self.producer = producer
|
| - self.producer.resumeProducing()
|
| -
|
| -
|
| - def unregisterProducer(self):
|
| - """
|
| - Unregister the producer, it should be done after a register.
|
| - """
|
| - assert self.producer is not None
|
| - self.producer = None
|
| -
|
| -
|
| - def write(self, data):
|
| - """
|
| - Save the data received.
|
| - """
|
| - self.buffer.append(data)
|
| - self.producer.resumeProducing()
|
| -
|
| -
|
| -
|
| -class TestProducer(object):
|
| - """
|
| - A dumb producer.
|
| - """
|
| -
|
| - def __init__(self, toProduce, consumer):
|
| - """
|
| - @param toProduce: data to write
|
| - @type toProduce: C{str}
|
| - @param consumer: the consumer of data.
|
| - @type consumer: C{IConsumer}
|
| - """
|
| - self.toProduce = toProduce
|
| - self.consumer = consumer
|
| -
|
| -
|
| - def start(self):
|
| - """
|
| - Send the data to consume.
|
| - """
|
| - self.consumer.write(self.toProduce)
|
| -
|
| -
|
| -
|
| -class IReadWriteTestsMixin:
|
| - """
|
| - Generic tests for the C{IReadFile} and C{IWriteFile} interfaces.
|
| - """
|
| -
|
| - def getFileReader(self, content):
|
| - """
|
| - Return an object providing C{IReadFile}, ready to send data C{content}.
|
| - """
|
| - raise NotImplementedError()
|
| -
|
| -
|
| - def getFileWriter(self):
|
| - """
|
| - Return an object providing C{IWriteFile}, ready to receive data.
|
| - """
|
| - raise NotImplementedError()
|
| -
|
| -
|
| - def getFileContent(self):
|
| - """
|
| - Return the content of the file used.
|
| - """
|
| - raise NotImplementedError()
|
| -
|
| -
|
| - def test_read(self):
|
| - """
|
| - Test L{ftp.IReadFile}: the implementation should have a send method
|
| - returning a C{Deferred} which fires when all the data has been sent
|
| - to the consumer, and the data should be correctly send to the consumer.
|
| - """
|
| - content = 'wobble\n'
|
| - consumer = TestConsumer()
|
| - def cbGet(reader):
|
| - return reader.send(consumer).addCallback(cbSend)
|
| - def cbSend(res):
|
| - self.assertEquals("".join(consumer.buffer), content)
|
| - return self.getFileReader(content).addCallback(cbGet)
|
| -
|
| -
|
| - def test_write(self):
|
| - """
|
| - Test L{ftp.IWriteFile}: the implementation should have a receive method
|
| - returning a C{Deferred} with fires with a consumer ready to receive
|
| - data to be written.
|
| - """
|
| - content = 'elbbow\n'
|
| - def cbGet(writer):
|
| - return writer.receive().addCallback(cbReceive)
|
| - def cbReceive(consumer):
|
| - producer = TestProducer(content, consumer)
|
| - consumer.registerProducer(None, True)
|
| - producer.start()
|
| - consumer.unregisterProducer()
|
| - self.assertEquals(self.getFileContent(), content)
|
| - return self.getFileWriter().addCallback(cbGet)
|
| -
|
| -
|
| -
|
| -class FTPReadWriteTestCase(unittest.TestCase, IReadWriteTestsMixin):
|
| - """
|
| - Tests for C{ftp._FileReader} and C{ftp._FileWriter}, the objects returned
|
| - by the shell in C{openForReading}/C{openForWriting}.
|
| - """
|
| -
|
| - def setUp(self):
|
| - """
|
| - Create a temporary file used later.
|
| - """
|
| - self.root = filepath.FilePath(self.mktemp())
|
| - self.root.createDirectory()
|
| - self.shell = ftp.FTPShell(self.root)
|
| - self.filename = "file.txt"
|
| -
|
| -
|
| - def getFileReader(self, content):
|
| - """
|
| - Return a C{ftp._FileReader} instance with a file opened for reading.
|
| - """
|
| - self.root.child(self.filename).setContent(content)
|
| - return self.shell.openForReading((self.filename,))
|
| -
|
| -
|
| - def getFileWriter(self):
|
| - """
|
| - Return a C{ftp._FileWriter} instance with a file opened for writing.
|
| - """
|
| - return self.shell.openForWriting((self.filename,))
|
| -
|
| -
|
| - def getFileContent(self):
|
| - """
|
| - Return the content of the temporary file.
|
| - """
|
| - return self.root.child(self.filename).getContent()
|
| -
|
|
|