Index: third_party/twisted_8_1/twisted/conch/test/test_filetransfer.py |
diff --git a/third_party/twisted_8_1/twisted/conch/test/test_filetransfer.py b/third_party/twisted_8_1/twisted/conch/test/test_filetransfer.py |
deleted file mode 100644 |
index a779169023cd9cbeb4a409bad5b3c684d2691eaf..0000000000000000000000000000000000000000 |
--- a/third_party/twisted_8_1/twisted/conch/test/test_filetransfer.py |
+++ /dev/null |
@@ -1,513 +0,0 @@ |
-# -*- test-case-name: twisted.conch.test.test_filetransfer -*- |
-# Copyright (c) 2001-2008 Twisted Matrix Laboratories. |
-# See LICENSE file for details. |
- |
- |
-import os |
-import struct |
-import sys |
- |
-from twisted.trial import unittest |
-try: |
- from twisted.conch import unix |
- unix # shut up pyflakes |
-except ImportError: |
- unix = None |
- try: |
- del sys.modules['twisted.conch.unix'] # remove the bad import |
- except KeyError: |
- # In Python 2.4, the bad import has already been cleaned up for us. |
- # Hooray. |
- pass |
- |
-from twisted.conch import avatar |
-from twisted.conch.ssh import common, connection, filetransfer, session |
-from twisted.internet import defer |
-from twisted.protocols import loopback |
-from twisted.python import components |
- |
- |
-class TestAvatar(avatar.ConchUser): |
- def __init__(self): |
- avatar.ConchUser.__init__(self) |
- self.channelLookup['session'] = session.SSHSession |
- self.subsystemLookup['sftp'] = filetransfer.FileTransferServer |
- |
- def _runAsUser(self, f, *args, **kw): |
- try: |
- f = iter(f) |
- except TypeError: |
- f = [(f, args, kw)] |
- for i in f: |
- func = i[0] |
- args = len(i)>1 and i[1] or () |
- kw = len(i)>2 and i[2] or {} |
- r = func(*args, **kw) |
- return r |
- |
- |
-class FileTransferTestAvatar(TestAvatar): |
- |
- def __init__(self, homeDir): |
- TestAvatar.__init__(self) |
- self.homeDir = homeDir |
- |
- def getHomeDir(self): |
- return os.path.join(os.getcwd(), self.homeDir) |
- |
- |
-class ConchSessionForTestAvatar: |
- |
- def __init__(self, avatar): |
- self.avatar = avatar |
- |
-if unix: |
- if not hasattr(unix, 'SFTPServerForUnixConchUser'): |
- # unix should either be a fully working module, or None. I'm not sure |
- # how this happens, but on win32 it does. Try to cope. --spiv. |
- import warnings |
- warnings.warn(("twisted.conch.unix imported %r, " |
- "but doesn't define SFTPServerForUnixConchUser'") |
- % (unix,)) |
- unix = None |
- else: |
- class FileTransferForTestAvatar(unix.SFTPServerForUnixConchUser): |
- |
- def gotVersion(self, version, otherExt): |
- return {'conchTest' : 'ext data'} |
- |
- def extendedRequest(self, extName, extData): |
- if extName == 'testExtendedRequest': |
- return 'bar' |
- raise NotImplementedError |
- |
- components.registerAdapter(FileTransferForTestAvatar, |
- TestAvatar, |
- filetransfer.ISFTPServer) |
- |
-class SFTPTestBase(unittest.TestCase): |
- |
- def setUp(self): |
- self.testDir = self.mktemp() |
- # Give the testDir another level so we can safely "cd .." from it in |
- # tests. |
- self.testDir = os.path.join(self.testDir, 'extra') |
- os.makedirs(os.path.join(self.testDir, 'testDirectory')) |
- |
- f = file(os.path.join(self.testDir, 'testfile1'),'w') |
- f.write('a'*10+'b'*10) |
- f.write(file('/dev/urandom').read(1024*64)) # random data |
- os.chmod(os.path.join(self.testDir, 'testfile1'), 0644) |
- file(os.path.join(self.testDir, 'testRemoveFile'), 'w').write('a') |
- file(os.path.join(self.testDir, 'testRenameFile'), 'w').write('a') |
- file(os.path.join(self.testDir, '.testHiddenFile'), 'w').write('a') |
- |
- |
-class TestOurServerOurClient(SFTPTestBase): |
- |
- if not unix: |
- skip = "can't run on non-posix computers" |
- |
- def setUp(self): |
- SFTPTestBase.setUp(self) |
- |
- self.avatar = FileTransferTestAvatar(self.testDir) |
- self.server = filetransfer.FileTransferServer(avatar=self.avatar) |
- clientTransport = loopback.LoopbackRelay(self.server) |
- |
- self.client = filetransfer.FileTransferClient() |
- self._serverVersion = None |
- self._extData = None |
- def _(serverVersion, extData): |
- self._serverVersion = serverVersion |
- self._extData = extData |
- self.client.gotServerVersion = _ |
- serverTransport = loopback.LoopbackRelay(self.client) |
- self.client.makeConnection(clientTransport) |
- self.server.makeConnection(serverTransport) |
- |
- self.clientTransport = clientTransport |
- self.serverTransport = serverTransport |
- |
- self._emptyBuffers() |
- |
- |
- def _emptyBuffers(self): |
- while self.serverTransport.buffer or self.clientTransport.buffer: |
- self.serverTransport.clearBuffer() |
- self.clientTransport.clearBuffer() |
- |
- |
- def testServerVersion(self): |
- self.failUnlessEqual(self._serverVersion, 3) |
- self.failUnlessEqual(self._extData, {'conchTest' : 'ext data'}) |
- |
- def testOpenFileIO(self): |
- d = self.client.openFile("testfile1", filetransfer.FXF_READ | |
- filetransfer.FXF_WRITE, {}) |
- self._emptyBuffers() |
- |
- def _fileOpened(openFile): |
- self.failUnlessEqual(openFile, filetransfer.ISFTPFile(openFile)) |
- d = _readChunk(openFile) |
- d.addCallback(_writeChunk, openFile) |
- return d |
- |
- def _readChunk(openFile): |
- d = openFile.readChunk(0, 20) |
- self._emptyBuffers() |
- d.addCallback(self.failUnlessEqual, 'a'*10 + 'b'*10) |
- return d |
- |
- def _writeChunk(_, openFile): |
- d = openFile.writeChunk(20, 'c'*10) |
- self._emptyBuffers() |
- d.addCallback(_readChunk2, openFile) |
- return d |
- |
- def _readChunk2(_, openFile): |
- d = openFile.readChunk(0, 30) |
- self._emptyBuffers() |
- d.addCallback(self.failUnlessEqual, 'a'*10 + 'b'*10 + 'c'*10) |
- return d |
- |
- d.addCallback(_fileOpened) |
- return d |
- |
- def testClosedFileGetAttrs(self): |
- d = self.client.openFile("testfile1", filetransfer.FXF_READ | |
- filetransfer.FXF_WRITE, {}) |
- self._emptyBuffers() |
- |
- def _getAttrs(_, openFile): |
- d = openFile.getAttrs() |
- self._emptyBuffers() |
- return d |
- |
- def _err(f): |
- self.flushLoggedErrors() |
- return f |
- |
- def _close(openFile): |
- d = openFile.close() |
- self._emptyBuffers() |
- d.addCallback(_getAttrs, openFile) |
- d.addErrback(_err) |
- return self.assertFailure(d, filetransfer.SFTPError) |
- |
- d.addCallback(_close) |
- return d |
- |
- def testOpenFileAttributes(self): |
- d = self.client.openFile("testfile1", filetransfer.FXF_READ | |
- filetransfer.FXF_WRITE, {}) |
- self._emptyBuffers() |
- |
- def _getAttrs(openFile): |
- d = openFile.getAttrs() |
- self._emptyBuffers() |
- d.addCallback(_getAttrs2) |
- return d |
- |
- def _getAttrs2(attrs1): |
- d = self.client.getAttrs('testfile1') |
- self._emptyBuffers() |
- d.addCallback(self.failUnlessEqual, attrs1) |
- return d |
- |
- return d.addCallback(_getAttrs) |
- |
- |
- def testOpenFileSetAttrs(self): |
- # XXX test setAttrs |
- # Ok, how about this for a start? It caught a bug :) -- spiv. |
- d = self.client.openFile("testfile1", filetransfer.FXF_READ | |
- filetransfer.FXF_WRITE, {}) |
- self._emptyBuffers() |
- |
- def _getAttrs(openFile): |
- d = openFile.getAttrs() |
- self._emptyBuffers() |
- d.addCallback(_setAttrs) |
- return d |
- |
- def _setAttrs(attrs): |
- attrs['atime'] = 0 |
- d = self.client.setAttrs('testfile1', attrs) |
- self._emptyBuffers() |
- d.addCallback(_getAttrs2) |
- d.addCallback(self.failUnlessEqual, attrs) |
- return d |
- |
- def _getAttrs2(_): |
- d = self.client.getAttrs('testfile1') |
- self._emptyBuffers() |
- return d |
- |
- d.addCallback(_getAttrs) |
- return d |
- |
- |
- def test_openFileExtendedAttributes(self): |
- """ |
- Check that L{filetransfer.FileTransferClient.openFile} can send |
- extended attributes, that should be extracted server side. By default, |
- they are ignored, so we just verify they are correctly parsed. |
- """ |
- savedAttributes = {} |
- def openFile(filename, flags, attrs): |
- savedAttributes.update(attrs) |
- self.server.client.openFile = openFile |
- |
- d = self.client.openFile("testfile1", filetransfer.FXF_READ | |
- filetransfer.FXF_WRITE, {"ext_foo": "bar"}) |
- self._emptyBuffers() |
- |
- def check(ign): |
- self.assertEquals(savedAttributes, {"ext_foo": "bar"}) |
- |
- return d.addCallback(check) |
- |
- |
- def testRemoveFile(self): |
- d = self.client.getAttrs("testRemoveFile") |
- self._emptyBuffers() |
- def _removeFile(ignored): |
- d = self.client.removeFile("testRemoveFile") |
- self._emptyBuffers() |
- return d |
- d.addCallback(_removeFile) |
- d.addCallback(_removeFile) |
- return self.assertFailure(d, filetransfer.SFTPError) |
- |
- def testRenameFile(self): |
- d = self.client.getAttrs("testRenameFile") |
- self._emptyBuffers() |
- def _rename(attrs): |
- d = self.client.renameFile("testRenameFile", "testRenamedFile") |
- self._emptyBuffers() |
- d.addCallback(_testRenamed, attrs) |
- return d |
- def _testRenamed(_, attrs): |
- d = self.client.getAttrs("testRenamedFile") |
- self._emptyBuffers() |
- d.addCallback(self.failUnlessEqual, attrs) |
- return d.addCallback(_rename) |
- |
- def testDirectoryBad(self): |
- d = self.client.getAttrs("testMakeDirectory") |
- self._emptyBuffers() |
- return self.assertFailure(d, filetransfer.SFTPError) |
- |
- def testDirectoryCreation(self): |
- d = self.client.makeDirectory("testMakeDirectory", {}) |
- self._emptyBuffers() |
- |
- def _getAttrs(_): |
- d = self.client.getAttrs("testMakeDirectory") |
- self._emptyBuffers() |
- return d |
- |
- # XXX not until version 4/5 |
- # self.failUnlessEqual(filetransfer.FILEXFER_TYPE_DIRECTORY&attrs['type'], |
- # filetransfer.FILEXFER_TYPE_DIRECTORY) |
- |
- def _removeDirectory(_): |
- d = self.client.removeDirectory("testMakeDirectory") |
- self._emptyBuffers() |
- return d |
- |
- d.addCallback(_getAttrs) |
- d.addCallback(_removeDirectory) |
- d.addCallback(_getAttrs) |
- return self.assertFailure(d, filetransfer.SFTPError) |
- |
- def testOpenDirectory(self): |
- d = self.client.openDirectory('') |
- self._emptyBuffers() |
- files = [] |
- |
- def _getFiles(openDir): |
- def append(f): |
- files.append(f) |
- return openDir |
- d = defer.maybeDeferred(openDir.next) |
- self._emptyBuffers() |
- d.addCallback(append) |
- d.addCallback(_getFiles) |
- d.addErrback(_close, openDir) |
- return d |
- |
- def _checkFiles(ignored): |
- fs = list(zip(*files)[0]) |
- fs.sort() |
- self.failUnlessEqual(fs, |
- ['.testHiddenFile', 'testDirectory', |
- 'testRemoveFile', 'testRenameFile', |
- 'testfile1']) |
- |
- def _close(_, openDir): |
- d = openDir.close() |
- self._emptyBuffers() |
- return d |
- |
- d.addCallback(_getFiles) |
- d.addCallback(_checkFiles) |
- return d |
- |
- def testLinkDoesntExist(self): |
- d = self.client.getAttrs('testLink') |
- self._emptyBuffers() |
- return self.assertFailure(d, filetransfer.SFTPError) |
- |
- def testLinkSharesAttrs(self): |
- d = self.client.makeLink('testLink', 'testfile1') |
- self._emptyBuffers() |
- def _getFirstAttrs(_): |
- d = self.client.getAttrs('testLink', 1) |
- self._emptyBuffers() |
- return d |
- def _getSecondAttrs(firstAttrs): |
- d = self.client.getAttrs('testfile1') |
- self._emptyBuffers() |
- d.addCallback(self.assertEqual, firstAttrs) |
- return d |
- d.addCallback(_getFirstAttrs) |
- return d.addCallback(_getSecondAttrs) |
- |
- def testLinkPath(self): |
- d = self.client.makeLink('testLink', 'testfile1') |
- self._emptyBuffers() |
- def _readLink(_): |
- d = self.client.readLink('testLink') |
- self._emptyBuffers() |
- d.addCallback(self.failUnlessEqual, |
- os.path.join(os.getcwd(), self.testDir, 'testfile1')) |
- return d |
- def _realPath(_): |
- d = self.client.realPath('testLink') |
- self._emptyBuffers() |
- d.addCallback(self.failUnlessEqual, |
- os.path.join(os.getcwd(), self.testDir, 'testfile1')) |
- return d |
- d.addCallback(_readLink) |
- d.addCallback(_realPath) |
- return d |
- |
- def testExtendedRequest(self): |
- d = self.client.extendedRequest('testExtendedRequest', 'foo') |
- self._emptyBuffers() |
- d.addCallback(self.failUnlessEqual, 'bar') |
- d.addCallback(self._cbTestExtendedRequest) |
- return d |
- |
- def _cbTestExtendedRequest(self, ignored): |
- d = self.client.extendedRequest('testBadRequest', '') |
- self._emptyBuffers() |
- return self.assertFailure(d, NotImplementedError) |
- |
- |
-class FakeConn: |
- def sendClose(self, channel): |
- pass |
- |
- |
-class TestFileTransferClose(unittest.TestCase): |
- |
- if not unix: |
- skip = "can't run on non-posix computers" |
- |
- def setUp(self): |
- self.avatar = TestAvatar() |
- |
- def buildServerConnection(self): |
- # make a server connection |
- conn = connection.SSHConnection() |
- # server connections have a 'self.transport.avatar'. |
- class DummyTransport: |
- def __init__(self): |
- self.transport = self |
- def sendPacket(self, kind, data): |
- pass |
- def logPrefix(self): |
- return 'dummy transport' |
- conn.transport = DummyTransport() |
- conn.transport.avatar = self.avatar |
- return conn |
- |
- def interceptConnectionLost(self, sftpServer): |
- self.connectionLostFired = False |
- origConnectionLost = sftpServer.connectionLost |
- def connectionLost(reason): |
- self.connectionLostFired = True |
- origConnectionLost(reason) |
- sftpServer.connectionLost = connectionLost |
- |
- def assertSFTPConnectionLost(self): |
- self.assertTrue(self.connectionLostFired, |
- "sftpServer's connectionLost was not called") |
- |
- def test_sessionClose(self): |
- """ |
- Closing a session should notify an SFTP subsystem launched by that |
- session. |
- """ |
- # make a session |
- testSession = session.SSHSession(conn=FakeConn(), avatar=self.avatar) |
- |
- # start an SFTP subsystem on the session |
- testSession.request_subsystem(common.NS('sftp')) |
- sftpServer = testSession.client.transport.proto |
- |
- # intercept connectionLost so we can check that it's called |
- self.interceptConnectionLost(sftpServer) |
- |
- # close session |
- testSession.closeReceived() |
- |
- self.assertSFTPConnectionLost() |
- |
- def test_clientClosesChannelOnConnnection(self): |
- """ |
- A client sending CHANNEL_CLOSE should trigger closeReceived on the |
- associated channel instance. |
- """ |
- conn = self.buildServerConnection() |
- |
- # somehow get a session |
- packet = common.NS('session') + struct.pack('>L', 0) * 3 |
- conn.ssh_CHANNEL_OPEN(packet) |
- sessionChannel = conn.channels[0] |
- |
- sessionChannel.request_subsystem(common.NS('sftp')) |
- sftpServer = sessionChannel.client.transport.proto |
- self.interceptConnectionLost(sftpServer) |
- |
- # intercept closeReceived |
- self.interceptConnectionLost(sftpServer) |
- |
- # close the connection |
- conn.ssh_CHANNEL_CLOSE(struct.pack('>L', 0)) |
- |
- self.assertSFTPConnectionLost() |
- |
- |
- def test_stopConnectionServiceClosesChannel(self): |
- """ |
- Closing an SSH connection should close all sessions within it. |
- """ |
- conn = self.buildServerConnection() |
- |
- # somehow get a session |
- packet = common.NS('session') + struct.pack('>L', 0) * 3 |
- conn.ssh_CHANNEL_OPEN(packet) |
- sessionChannel = conn.channels[0] |
- |
- sessionChannel.request_subsystem(common.NS('sftp')) |
- sftpServer = sessionChannel.client.transport.proto |
- self.interceptConnectionLost(sftpServer) |
- |
- # close the connection |
- conn.serviceStopped() |
- |
- self.assertSFTPConnectionLost() |