Index: third_party/twisted_8_1/twisted/conch/test/test_connection.py |
diff --git a/third_party/twisted_8_1/twisted/conch/test/test_connection.py b/third_party/twisted_8_1/twisted/conch/test/test_connection.py |
deleted file mode 100644 |
index b0551882a2e0b54fe3b87f48fdd71ec70869089c..0000000000000000000000000000000000000000 |
--- a/third_party/twisted_8_1/twisted/conch/test/test_connection.py |
+++ /dev/null |
@@ -1,615 +0,0 @@ |
-# Copyright (c) 2007 Twisted Matrix Laboratories. |
-# See LICENSE for details |
- |
-""" |
-This module tests twisted.conch.ssh.connection. |
-""" |
-import struct |
-from twisted.conch import error |
-from twisted.conch.ssh import channel, common, connection |
-from twisted.trial import unittest |
-from twisted.conch.test import test_userauth |
- |
-class TestChannel(channel.SSHChannel): |
- """ |
- A mocked-up version of twisted.conch.ssh.channel.SSHChannel. |
- |
- @ivar gotOpen: True if channelOpen has been called. |
- @type gotOpen: C{bool} |
- @ivar specificData: the specific channel open data passed to channelOpen. |
- @type specificData: C{str} |
- @ivar openFailureReason: the reason passed to openFailed. |
- @type openFailed: C{error.ConchError} |
- @ivar inBuffer: a C{list} of strings received by the channel. |
- @type inBuffer: C{list} |
- @ivar extBuffer: a C{list} of 2-tuples (type, extended data) of received by |
- the channel. |
- @type extBuffer: C{list} |
- @ivar numberRequests: the number of requests that have been made to this |
- channel. |
- @type numberRequests: C{int} |
- @ivar gotEOF: True if the other side sent EOF. |
- @type gotEOF: C{bool} |
- @ivar gotOneClose: True if the other side closed the connection. |
- @type gotOneClose: C{bool} |
- @ivar gotClosed: True if the channel is closed. |
- @type gotClosed: C{bool} |
- """ |
- name = "TestChannel" |
- gotOpen = False |
- |
- def logPrefix(self): |
- return "TestChannel %i" % self.id |
- |
- def channelOpen(self, specificData): |
- """ |
- The channel is open. Set up the instance variables. |
- """ |
- self.gotOpen = True |
- self.specificData = specificData |
- self.inBuffer = [] |
- self.extBuffer = [] |
- self.numberRequests = 0 |
- self.gotEOF = False |
- self.gotOneClose = False |
- self.gotClosed = False |
- |
- def openFailed(self, reason): |
- """ |
- Opening the channel failed. Store the reason why. |
- """ |
- self.openFailureReason = reason |
- |
- def request_test(self, data): |
- """ |
- A test request. Return True if data is 'data'. |
- |
- @type data: C{str} |
- """ |
- self.numberRequests += 1 |
- return data == 'data' |
- |
- def dataReceived(self, data): |
- """ |
- Data was received. Store it in the buffer. |
- """ |
- self.inBuffer.append(data) |
- |
- def extReceived(self, code, data): |
- """ |
- Extended data was received. Store it in the buffer. |
- """ |
- self.extBuffer.append((code, data)) |
- |
- def eofReceived(self): |
- """ |
- EOF was received. Remember it. |
- """ |
- self.gotEOF = True |
- |
- def closeReceived(self): |
- """ |
- Close was received. Remember it. |
- """ |
- self.gotOneClose = True |
- |
- def closed(self): |
- """ |
- The channel is closed. Rembember it. |
- """ |
- self.gotClosed = True |
- |
-class TestAvatar: |
- """ |
- A mocked-up version of twisted.conch.avatar.ConchUser |
- """ |
- |
- def lookupChannel(self, channelType, windowSize, maxPacket, data): |
- """ |
- The server wants us to return a channel. If the requested channel is |
- our TestChannel, return it, otherwise return None. |
- """ |
- if channelType == TestChannel.name: |
- return TestChannel(remoteWindow=windowSize, |
- remoteMaxPacket=maxPacket, |
- data=data, avatar=self) |
- |
- def gotGlobalRequest(self, requestType, data): |
- """ |
- The client has made a global request. If the global request is |
- 'TestGlobal', return True. If the global request is 'TestData', |
- return True and the request-specific data we received. Otherwise, |
- return False. |
- """ |
- if requestType == 'TestGlobal': |
- return True |
- elif requestType == 'TestData': |
- return True, data |
- else: |
- return False |
- |
-class TestConnection(connection.SSHConnection): |
- """ |
- A subclass of SSHConnection for testing. |
- |
- @ivar channel: the current channel. |
- @type channel. C{TestChannel} |
- """ |
- |
- def logPrefix(self): |
- return "TestConnection" |
- |
- def global_TestGlobal(self, data): |
- """ |
- The other side made the 'TestGlobal' global request. Return True. |
- """ |
- return True |
- |
- def global_Test_Data(self, data): |
- """ |
- The other side made the 'Test-Data' global request. Return True and |
- the data we received. |
- """ |
- return True, data |
- |
- def channel_TestChannel(self, windowSize, maxPacket, data): |
- """ |
- The other side is requesting the TestChannel. Create a C{TestChannel} |
- instance, store it, and return it. |
- """ |
- self.channel = TestChannel(remoteWindow=windowSize, |
- remoteMaxPacket=maxPacket, data=data) |
- return self.channel |
- |
- def channel_ErrorChannel(self, windowSize, maxPacket, data): |
- """ |
- The other side is requesting the ErrorChannel. Raise an exception. |
- """ |
- raise AssertionError('no such thing') |
- |
-class ConnectionTestCase(unittest.TestCase): |
- |
- def setUp(self): |
- self.transport = test_userauth.FakeTransport(None) |
- self.transport.avatar = TestAvatar() |
- self.conn = TestConnection() |
- self.conn.transport = self.transport |
- self.conn.serviceStarted() |
- |
- def _openChannel(self, channel): |
- """ |
- Open the channel with the default connection. |
- """ |
- self.conn.openChannel(channel) |
- self.transport.packets = self.transport.packets[:-1] |
- self.conn.ssh_CHANNEL_OPEN_CONFIRMATION(struct.pack('>2L', |
- channel.id, 255) + '\x00\x02\x00\x00\x00\x00\x80\x00') |
- |
- def tearDown(self): |
- self.conn.serviceStopped() |
- |
- def test_linkAvatar(self): |
- """ |
- Test that the connection links itself to the avatar in the |
- transport. |
- """ |
- self.assertIdentical(self.transport.avatar.conn, self.conn) |
- |
- def test_serviceStopped(self): |
- """ |
- Test that serviceStopped() closes any open channels. |
- """ |
- channel1 = TestChannel() |
- channel2 = TestChannel() |
- self.conn.openChannel(channel1) |
- self.conn.openChannel(channel2) |
- self.conn.ssh_CHANNEL_OPEN_CONFIRMATION('\x00\x00\x00\x00' * 4) |
- self.assertTrue(channel1.gotOpen) |
- self.assertFalse(channel2.gotOpen) |
- self.conn.serviceStopped() |
- self.assertTrue(channel1.gotClosed) |
- |
- def test_GLOBAL_REQUEST(self): |
- """ |
- Test that global request packets are dispatched to the global_* |
- methods and the return values are translated into success or failure |
- messages. |
- """ |
- self.conn.ssh_GLOBAL_REQUEST(common.NS('TestGlobal') + '\xff') |
- self.assertEquals(self.transport.packets, |
- [(connection.MSG_REQUEST_SUCCESS, '')]) |
- self.transport.packets = [] |
- self.conn.ssh_GLOBAL_REQUEST(common.NS('TestData') + '\xff' + |
- 'test data') |
- self.assertEquals(self.transport.packets, |
- [(connection.MSG_REQUEST_SUCCESS, 'test data')]) |
- self.transport.packets = [] |
- self.conn.ssh_GLOBAL_REQUEST(common.NS('TestBad') + '\xff') |
- self.assertEquals(self.transport.packets, |
- [(connection.MSG_REQUEST_FAILURE, '')]) |
- self.transport.packets = [] |
- self.conn.ssh_GLOBAL_REQUEST(common.NS('TestGlobal') + '\x00') |
- self.assertEquals(self.transport.packets, []) |
- |
- def test_REQUEST_SUCCESS(self): |
- """ |
- Test that global request success packets cause the Deferred to be |
- called back. |
- """ |
- d = self.conn.sendGlobalRequest('request', 'data', True) |
- self.conn.ssh_REQUEST_SUCCESS('data') |
- def check(data): |
- self.assertEquals(data, 'data') |
- d.addCallback(check) |
- d.addErrback(self.fail) |
- return d |
- |
- def test_REQUEST_FAILURE(self): |
- """ |
- Test that global request failure packets cause the Deferred to be |
- erred back. |
- """ |
- d = self.conn.sendGlobalRequest('request', 'data', True) |
- self.conn.ssh_REQUEST_FAILURE('data') |
- def check(f): |
- self.assertEquals(f.value.data, 'data') |
- d.addCallback(self.fail) |
- d.addErrback(check) |
- return d |
- |
- def test_CHANNEL_OPEN(self): |
- """ |
- Test that open channel packets cause a channel to be created and |
- opened or a failure message to be returned. |
- """ |
- del self.transport.avatar |
- self.conn.ssh_CHANNEL_OPEN(common.NS('TestChannel') + |
- '\x00\x00\x00\x01' * 4) |
- self.assertTrue(self.conn.channel.gotOpen) |
- self.assertEquals(self.conn.channel.conn, self.conn) |
- self.assertEquals(self.conn.channel.data, '\x00\x00\x00\x01') |
- self.assertEquals(self.conn.channel.specificData, '\x00\x00\x00\x01') |
- self.assertEquals(self.conn.channel.remoteWindowLeft, 1) |
- self.assertEquals(self.conn.channel.remoteMaxPacket, 1) |
- self.assertEquals(self.transport.packets, |
- [(connection.MSG_CHANNEL_OPEN_CONFIRMATION, |
- '\x00\x00\x00\x01\x00\x00\x00\x00\x00\x02\x00\x00' |
- '\x00\x00\x80\x00')]) |
- self.transport.packets = [] |
- self.conn.ssh_CHANNEL_OPEN(common.NS('BadChannel') + |
- '\x00\x00\x00\x02' * 4) |
- self.flushLoggedErrors() |
- self.assertEquals(self.transport.packets, |
- [(connection.MSG_CHANNEL_OPEN_FAILURE, |
- '\x00\x00\x00\x02\x00\x00\x00\x03' + common.NS( |
- 'unknown channel') + common.NS(''))]) |
- self.transport.packets = [] |
- self.conn.ssh_CHANNEL_OPEN(common.NS('ErrorChannel') + |
- '\x00\x00\x00\x02' * 4) |
- self.flushLoggedErrors() |
- self.assertEquals(self.transport.packets, |
- [(connection.MSG_CHANNEL_OPEN_FAILURE, |
- '\x00\x00\x00\x02\x00\x00\x00\x02' + common.NS( |
- 'unknown failure') + common.NS(''))]) |
- |
- def test_CHANNEL_OPEN_CONFIRMATION(self): |
- """ |
- Test that channel open confirmation packets cause the channel to be |
- notified that it's open. |
- """ |
- channel = TestChannel() |
- self.conn.openChannel(channel) |
- self.conn.ssh_CHANNEL_OPEN_CONFIRMATION('\x00\x00\x00\x00'*5) |
- self.assertEquals(channel.remoteWindowLeft, 0) |
- self.assertEquals(channel.remoteMaxPacket, 0) |
- self.assertEquals(channel.specificData, '\x00\x00\x00\x00') |
- self.assertEquals(self.conn.channelsToRemoteChannel[channel], |
- 0) |
- self.assertEquals(self.conn.localToRemoteChannel[0], 0) |
- |
- def test_CHANNEL_OPEN_FAILURE(self): |
- """ |
- Test that channel open failure packets cause the channel to be |
- notified that its opening failed. |
- """ |
- channel = TestChannel() |
- self.conn.openChannel(channel) |
- self.conn.ssh_CHANNEL_OPEN_FAILURE('\x00\x00\x00\x00\x00\x00\x00' |
- '\x01' + common.NS('failure!')) |
- self.assertEquals(channel.openFailureReason.args, ('failure!', 1)) |
- self.assertEquals(self.conn.channels.get(channel), None) |
- |
- |
- def test_CHANNEL_WINDOW_ADJUST(self): |
- """ |
- Test that channel window adjust messages add bytes to the channel |
- window. |
- """ |
- channel = TestChannel() |
- self._openChannel(channel) |
- oldWindowSize = channel.remoteWindowLeft |
- self.conn.ssh_CHANNEL_WINDOW_ADJUST('\x00\x00\x00\x00\x00\x00\x00' |
- '\x01') |
- self.assertEquals(channel.remoteWindowLeft, oldWindowSize + 1) |
- |
- def test_CHANNEL_DATA(self): |
- """ |
- Test that channel data messages are passed up to the channel, or |
- cause the channel to be closed if the data is too large. |
- """ |
- channel = TestChannel(localWindow=6, localMaxPacket=5) |
- self._openChannel(channel) |
- self.conn.ssh_CHANNEL_DATA('\x00\x00\x00\x00' + common.NS('data')) |
- self.assertEquals(channel.inBuffer, ['data']) |
- self.assertEquals(self.transport.packets, |
- [(connection.MSG_CHANNEL_WINDOW_ADJUST, '\x00\x00\x00\xff' |
- '\x00\x00\x00\x04')]) |
- self.transport.packets = [] |
- longData = 'a' * (channel.localWindowLeft + 1) |
- self.conn.ssh_CHANNEL_DATA('\x00\x00\x00\x00' + common.NS(longData)) |
- self.assertEquals(channel.inBuffer, ['data']) |
- self.assertEquals(self.transport.packets, |
- [(connection.MSG_CHANNEL_CLOSE, '\x00\x00\x00\xff')]) |
- channel = TestChannel() |
- self._openChannel(channel) |
- bigData = 'a' * (channel.localMaxPacket + 1) |
- self.transport.packets = [] |
- self.conn.ssh_CHANNEL_DATA('\x00\x00\x00\x01' + common.NS(bigData)) |
- self.assertEquals(channel.inBuffer, []) |
- self.assertEquals(self.transport.packets, |
- [(connection.MSG_CHANNEL_CLOSE, '\x00\x00\x00\xff')]) |
- |
- def test_CHANNEL_EXTENDED_DATA(self): |
- """ |
- Test that channel extended data messages are passed up to the channel, |
- or cause the channel to be closed if they're too big. |
- """ |
- channel = TestChannel(localWindow=6, localMaxPacket=5) |
- self._openChannel(channel) |
- self.conn.ssh_CHANNEL_EXTENDED_DATA('\x00\x00\x00\x00\x00\x00\x00' |
- '\x00' + common.NS('data')) |
- self.assertEquals(channel.extBuffer, [(0, 'data')]) |
- self.assertEquals(self.transport.packets, |
- [(connection.MSG_CHANNEL_WINDOW_ADJUST, '\x00\x00\x00\xff' |
- '\x00\x00\x00\x04')]) |
- self.transport.packets = [] |
- longData = 'a' * (channel.localWindowLeft + 1) |
- self.conn.ssh_CHANNEL_EXTENDED_DATA('\x00\x00\x00\x00\x00\x00\x00' |
- '\x00' + common.NS(longData)) |
- self.assertEquals(channel.extBuffer, [(0, 'data')]) |
- self.assertEquals(self.transport.packets, |
- [(connection.MSG_CHANNEL_CLOSE, '\x00\x00\x00\xff')]) |
- channel = TestChannel() |
- self._openChannel(channel) |
- bigData = 'a' * (channel.localMaxPacket + 1) |
- self.transport.packets = [] |
- self.conn.ssh_CHANNEL_EXTENDED_DATA('\x00\x00\x00\x01\x00\x00\x00' |
- '\x00' + common.NS(bigData)) |
- self.assertEquals(channel.extBuffer, []) |
- self.assertEquals(self.transport.packets, |
- [(connection.MSG_CHANNEL_CLOSE, '\x00\x00\x00\xff')]) |
- |
- def test_CHANNEL_EOF(self): |
- """ |
- Test that channel eof messages are passed up to the channel. |
- """ |
- channel = TestChannel() |
- self._openChannel(channel) |
- self.conn.ssh_CHANNEL_EOF('\x00\x00\x00\x00') |
- self.assertTrue(channel.gotEOF) |
- |
- def test_CHANNEL_CLOSE(self): |
- """ |
- Test that channel close messages are passed up to the channel. Also, |
- test that channel.close() is called if both sides are closed when this |
- message is received. |
- """ |
- channel = TestChannel() |
- self._openChannel(channel) |
- self.conn.sendClose(channel) |
- self.conn.ssh_CHANNEL_CLOSE('\x00\x00\x00\x00') |
- self.assertTrue(channel.gotOneClose) |
- self.assertTrue(channel.gotClosed) |
- |
- def test_CHANNEL_REQUEST_success(self): |
- """ |
- Test that channel requests that succeed send MSG_CHANNEL_SUCCESS. |
- """ |
- channel = TestChannel() |
- self._openChannel(channel) |
- self.conn.ssh_CHANNEL_REQUEST('\x00\x00\x00\x00' + common.NS('test') |
- + '\x00') |
- self.assertEquals(channel.numberRequests, 1) |
- d = self.conn.ssh_CHANNEL_REQUEST('\x00\x00\x00\x00' + common.NS( |
- 'test') + '\xff' + 'data') |
- def check(result): |
- self.assertEquals(self.transport.packets, |
- [(connection.MSG_CHANNEL_SUCCESS, '\x00\x00\x00\xff')]) |
- d.addCallback(check) |
- return d |
- |
- def test_CHANNEL_REQUEST_failure(self): |
- """ |
- Test that channel requests that fail send MSG_CHANNEL_FAILURE. |
- """ |
- channel = TestChannel() |
- self._openChannel(channel) |
- d = self.conn.ssh_CHANNEL_REQUEST('\x00\x00\x00\x00' + common.NS( |
- 'test') + '\xff') |
- def check(result): |
- self.assertEquals(self.transport.packets, |
- [(connection.MSG_CHANNEL_FAILURE, '\x00\x00\x00\xff' |
- )]) |
- d.addCallback(self.fail) |
- d.addErrback(check) |
- return d |
- |
- def test_CHANNEL_REQUEST_SUCCESS(self): |
- """ |
- Test that channel request success messages cause the Deferred to be |
- called back. |
- """ |
- channel = TestChannel() |
- self._openChannel(channel) |
- d = self.conn.sendRequest(channel, 'test', 'data', True) |
- self.conn.ssh_CHANNEL_SUCCESS('\x00\x00\x00\x00') |
- def check(result): |
- self.assertTrue(result) |
- return d |
- |
- def test_CHANNEL_REQUEST_FAILURE(self): |
- """ |
- Test that channel request failure messages cause the Deferred to be |
- erred back. |
- """ |
- channel = TestChannel() |
- self._openChannel(channel) |
- d = self.conn.sendRequest(channel, 'test', '', True) |
- self.conn.ssh_CHANNEL_FAILURE('\x00\x00\x00\x00') |
- def check(result): |
- self.assertEquals(result.value.value, 'channel request failed') |
- d.addCallback(self.fail) |
- d.addErrback(check) |
- return d |
- |
- def test_sendGlobalRequest(self): |
- """ |
- Test that global request messages are sent in the right format. |
- """ |
- d = self.conn.sendGlobalRequest('wantReply', 'data', True) |
- self.conn.sendGlobalRequest('noReply', '', False) |
- self.assertEquals(self.transport.packets, |
- [(connection.MSG_GLOBAL_REQUEST, common.NS('wantReply') + |
- '\xffdata'), |
- (connection.MSG_GLOBAL_REQUEST, common.NS('noReply') + |
- '\x00')]) |
- self.assertEquals(self.conn.deferreds, {'global':[d]}) |
- |
- def test_openChannel(self): |
- """ |
- Test that open channel messages are sent in the right format. |
- """ |
- channel = TestChannel() |
- self.conn.openChannel(channel, 'aaaa') |
- self.assertEquals(self.transport.packets, |
- [(connection.MSG_CHANNEL_OPEN, common.NS('TestChannel') + |
- '\x00\x00\x00\x00\x00\x02\x00\x00\x00\x00\x80\x00aaaa')]) |
- self.assertEquals(channel.id, 0) |
- self.assertEquals(self.conn.localChannelID, 1) |
- |
- def test_sendRequest(self): |
- """ |
- Test that channel request messages are sent in the right format. |
- """ |
- channel = TestChannel() |
- self._openChannel(channel) |
- d = self.conn.sendRequest(channel, 'test', 'test', True) |
- self.conn.sendRequest(channel, 'test2', '', False) |
- channel.localClosed = True # emulate sending a close message |
- self.conn.sendRequest(channel, 'test3', '', True) |
- self.assertEquals(self.transport.packets, |
- [(connection.MSG_CHANNEL_REQUEST, '\x00\x00\x00\xff' + |
- common.NS('test') + '\x01test'), |
- (connection.MSG_CHANNEL_REQUEST, '\x00\x00\x00\xff' + |
- common.NS('test2') + '\x00')]) |
- self.assertEquals(self.conn.deferreds, {0:[d]}) |
- |
- def test_adjustWindow(self): |
- """ |
- Test that channel window adjust messages cause bytes to be added |
- to the window. |
- """ |
- channel = TestChannel(localWindow=5) |
- self._openChannel(channel) |
- channel.localWindowLeft = 0 |
- self.conn.adjustWindow(channel, 1) |
- self.assertEquals(channel.localWindowLeft, 1) |
- channel.localClosed = True |
- self.conn.adjustWindow(channel, 2) |
- self.assertEquals(channel.localWindowLeft, 1) |
- self.assertEquals(self.transport.packets, |
- [(connection.MSG_CHANNEL_WINDOW_ADJUST, '\x00\x00\x00\xff' |
- '\x00\x00\x00\x01')]) |
- |
- def test_sendData(self): |
- """ |
- Test that channel data messages are sent in the right format. |
- """ |
- channel = TestChannel() |
- self._openChannel(channel) |
- self.conn.sendData(channel, 'a') |
- channel.localClosed = True |
- self.conn.sendData(channel, 'b') |
- self.assertEquals(self.transport.packets, |
- [(connection.MSG_CHANNEL_DATA, '\x00\x00\x00\xff' + |
- common.NS('a'))]) |
- |
- def test_sendExtendedData(self): |
- """ |
- Test that channel extended data messages are sent in the right format. |
- """ |
- channel = TestChannel() |
- self._openChannel(channel) |
- self.conn.sendExtendedData(channel, 1, 'test') |
- channel.localClosed = True |
- self.conn.sendExtendedData(channel, 2, 'test2') |
- self.assertEquals(self.transport.packets, |
- [(connection.MSG_CHANNEL_EXTENDED_DATA, '\x00\x00\x00\xff' + |
- '\x00\x00\x00\x01' + common.NS('test'))]) |
- |
- def test_sendEOF(self): |
- """ |
- Test that channel EOF messages are sent in the right format. |
- """ |
- channel = TestChannel() |
- self._openChannel(channel) |
- self.conn.sendEOF(channel) |
- self.assertEquals(self.transport.packets, |
- [(connection.MSG_CHANNEL_EOF, '\x00\x00\x00\xff')]) |
- channel.localClosed = True |
- self.conn.sendEOF(channel) |
- self.assertEquals(self.transport.packets, |
- [(connection.MSG_CHANNEL_EOF, '\x00\x00\x00\xff')]) |
- |
- def test_sendClose(self): |
- """ |
- Test that channel close messages are sent in the right format. |
- """ |
- channel = TestChannel() |
- self._openChannel(channel) |
- self.conn.sendClose(channel) |
- self.assertTrue(channel.localClosed) |
- self.assertEquals(self.transport.packets, |
- [(connection.MSG_CHANNEL_CLOSE, '\x00\x00\x00\xff')]) |
- self.conn.sendClose(channel) |
- self.assertEquals(self.transport.packets, |
- [(connection.MSG_CHANNEL_CLOSE, '\x00\x00\x00\xff')]) |
- |
- channel2 = TestChannel() |
- self._openChannel(channel2) |
- channel2.remoteClosed = True |
- self.conn.sendClose(channel2) |
- self.assertTrue(channel2.gotClosed) |
- |
- def test_getChannelWithAvatar(self): |
- """ |
- Test that getChannel dispatches to the avatar when an avatar is |
- present. Correct functioning without the avatar is verified in |
- test_CHANNEL_OPEN. |
- """ |
- channel = self.conn.getChannel('TestChannel', 50, 30, 'data') |
- self.assertEquals(channel.data, 'data') |
- self.assertEquals(channel.remoteWindowLeft, 50) |
- self.assertEquals(channel.remoteMaxPacket, 30) |
- self.assertRaises(error.ConchError, self.conn.getChannel, |
- 'BadChannel', 50, 30, 'data') |
- |
- def test_gotGlobalRequestWithoutAvatar(self): |
- """ |
- Test that gotGlobalRequests dispatches to global_* without an avatar. |
- """ |
- del self.transport.avatar |
- self.assertTrue(self.conn.gotGlobalRequest('TestGlobal', 'data')) |
- self.assertEquals(self.conn.gotGlobalRequest('Test-Data', 'data'), |
- (True, 'data')) |
- self.assertFalse(self.conn.gotGlobalRequest('BadGlobal', 'data')) |