| Index: third_party/twisted_8_1/twisted/words/test/test_msn.py
|
| diff --git a/third_party/twisted_8_1/twisted/words/test/test_msn.py b/third_party/twisted_8_1/twisted/words/test/test_msn.py
|
| deleted file mode 100644
|
| index 97e486c5e4ef21ab3cddc3d3b9fd9bd111561403..0000000000000000000000000000000000000000
|
| --- a/third_party/twisted_8_1/twisted/words/test/test_msn.py
|
| +++ /dev/null
|
| @@ -1,379 +0,0 @@
|
| -# Copyright (c) 2001-2005 Twisted Matrix Laboratories.
|
| -# See LICENSE for details.
|
| -
|
| -"""
|
| -Test cases for twisted.words.protocols.msn
|
| -"""
|
| -
|
| -# System imports
|
| -import StringIO, sys
|
| -
|
| -# Twisted imports
|
| -
|
| -# t.w.p.msn requires an HTTP client
|
| -try:
|
| - # So try to get one - do it directly instead of catching an ImportError
|
| - # from t.w.p.msn so that other problems which cause that module to fail
|
| - # to import don't cause the tests to be skipped.
|
| - from twisted.web import client
|
| -except ImportError:
|
| - # If there isn't one, we're going to skip all the tests.
|
| - msn = None
|
| -else:
|
| - # Otherwise importing it should work, so do it.
|
| - from twisted.words.protocols import msn
|
| -
|
| -
|
| -from twisted.protocols import loopback
|
| -from twisted.internet.defer import Deferred
|
| -from twisted.trial import unittest
|
| -
|
| -def printError(f):
|
| - print f
|
| -
|
| -class StringIOWithoutClosing(StringIO.StringIO):
|
| - disconnecting = 0
|
| - def close(self): pass
|
| - def loseConnection(self): pass
|
| -
|
| -class PassportTests(unittest.TestCase):
|
| -
|
| - def setUp(self):
|
| - self.result = []
|
| - self.deferred = Deferred()
|
| - self.deferred.addCallback(lambda r: self.result.append(r))
|
| - self.deferred.addErrback(printError)
|
| -
|
| - def testNexus(self):
|
| - protocol = msn.PassportNexus(self.deferred, 'https://foobar.com/somepage.quux')
|
| - headers = {
|
| - 'Content-Length' : '0',
|
| - 'Content-Type' : 'text/html',
|
| - 'PassportURLs' : 'DARealm=Passport.Net,DALogin=login.myserver.com/,DAReg=reg.myserver.com'
|
| - }
|
| - transport = StringIOWithoutClosing()
|
| - protocol.makeConnection(transport)
|
| - protocol.dataReceived('HTTP/1.0 200 OK\r\n')
|
| - for (h,v) in headers.items(): protocol.dataReceived('%s: %s\r\n' % (h,v))
|
| - protocol.dataReceived('\r\n')
|
| - self.failUnless(self.result[0] == "https://login.myserver.com/")
|
| -
|
| - def _doLoginTest(self, response, headers):
|
| - protocol = msn.PassportLogin(self.deferred,'foo@foo.com','testpass','https://foo.com/', 'a')
|
| - protocol.makeConnection(StringIOWithoutClosing())
|
| - protocol.dataReceived(response)
|
| - for (h,v) in headers.items(): protocol.dataReceived('%s: %s\r\n' % (h,v))
|
| - protocol.dataReceived('\r\n')
|
| -
|
| - def testPassportLoginSuccess(self):
|
| - headers = {
|
| - 'Content-Length' : '0',
|
| - 'Content-Type' : 'text/html',
|
| - 'Authentication-Info' : "Passport1.4 da-status=success,tname=MSPAuth," +
|
| - "tname=MSPProf,tname=MSPSec,from-PP='somekey'," +
|
| - "ru=http://messenger.msn.com"
|
| - }
|
| - self._doLoginTest('HTTP/1.1 200 OK\r\n', headers)
|
| - self.failUnless(self.result[0] == (msn.LOGIN_SUCCESS, 'somekey'))
|
| -
|
| - def testPassportLoginFailure(self):
|
| - headers = {
|
| - 'Content-Type' : 'text/html',
|
| - 'WWW-Authenticate' : 'Passport1.4 da-status=failed,' +
|
| - 'srealm=Passport.NET,ts=-3,prompt,cburl=http://host.com,' +
|
| - 'cbtxt=the%20error%20message'
|
| - }
|
| - self._doLoginTest('HTTP/1.1 401 Unauthorized\r\n', headers)
|
| - self.failUnless(self.result[0] == (msn.LOGIN_FAILURE, 'the error message'))
|
| -
|
| - def testPassportLoginRedirect(self):
|
| - headers = {
|
| - 'Content-Type' : 'text/html',
|
| - 'Authentication-Info' : 'Passport1.4 da-status=redir',
|
| - 'Location' : 'https://newlogin.host.com/'
|
| - }
|
| - self._doLoginTest('HTTP/1.1 302 Found\r\n', headers)
|
| - self.failUnless(self.result[0] == (msn.LOGIN_REDIRECT, 'https://newlogin.host.com/', 'a'))
|
| -
|
| -
|
| -if msn is not None:
|
| - class DummySwitchboardClient(msn.SwitchboardClient):
|
| - def userTyping(self, message):
|
| - self.state = 'TYPING'
|
| -
|
| - def gotSendRequest(self, fileName, fileSize, cookie, message):
|
| - if fileName == 'foobar.ext' and fileSize == 31337 and cookie == 1234: self.state = 'INVITATION'
|
| -
|
| -
|
| - class DummyNotificationClient(msn.NotificationClient):
|
| - def loggedIn(self, userHandle, screenName, verified):
|
| - if userHandle == 'foo@bar.com' and screenName == 'Test Screen Name' and verified:
|
| - self.state = 'LOGIN'
|
| -
|
| - def gotProfile(self, message):
|
| - self.state = 'PROFILE'
|
| -
|
| - def gotContactStatus(self, code, userHandle, screenName):
|
| - if code == msn.STATUS_AWAY and userHandle == "foo@bar.com" and screenName == "Test Screen Name":
|
| - self.state = 'INITSTATUS'
|
| -
|
| - def contactStatusChanged(self, code, userHandle, screenName):
|
| - if code == msn.STATUS_LUNCH and userHandle == "foo@bar.com" and screenName == "Test Name":
|
| - self.state = 'NEWSTATUS'
|
| -
|
| - def contactOffline(self, userHandle):
|
| - if userHandle == "foo@bar.com": self.state = 'OFFLINE'
|
| -
|
| - def statusChanged(self, code):
|
| - if code == msn.STATUS_HIDDEN: self.state = 'MYSTATUS'
|
| -
|
| - def listSynchronized(self, *args):
|
| - self.state = 'GOTLIST'
|
| -
|
| - def gotPhoneNumber(self, listVersion, userHandle, phoneType, number):
|
| - msn.NotificationClient.gotPhoneNumber(self, listVersion, userHandle, phoneType, number)
|
| - self.state = 'GOTPHONE'
|
| -
|
| - def userRemovedMe(self, userHandle, listVersion):
|
| - msn.NotificationClient.userRemovedMe(self, userHandle, listVersion)
|
| - c = self.factory.contacts.getContact(userHandle)
|
| - if not c and self.factory.contacts.version == listVersion: self.state = 'USERREMOVEDME'
|
| -
|
| - def userAddedMe(self, userHandle, screenName, listVersion):
|
| - msn.NotificationClient.userAddedMe(self, userHandle, screenName, listVersion)
|
| - c = self.factory.contacts.getContact(userHandle)
|
| - if c and (c.lists | msn.REVERSE_LIST) and (self.factory.contacts.version == listVersion) and \
|
| - (screenName == 'Screen Name'):
|
| - self.state = 'USERADDEDME'
|
| -
|
| - def gotSwitchboardInvitation(self, sessionID, host, port, key, userHandle, screenName):
|
| - if sessionID == 1234 and \
|
| - host == '192.168.1.1' and \
|
| - port == 1863 and \
|
| - key == '123.456' and \
|
| - userHandle == 'foo@foo.com' and \
|
| - screenName == 'Screen Name':
|
| - self.state = 'SBINVITED'
|
| -
|
| -class NotificationTests(unittest.TestCase):
|
| - """ testing the various events in NotificationClient """
|
| -
|
| - def setUp(self):
|
| - self.client = DummyNotificationClient()
|
| - self.client.factory = msn.NotificationFactory()
|
| - self.client.state = 'START'
|
| -
|
| - def tearDown(self):
|
| - self.client = None
|
| -
|
| - def testLogin(self):
|
| - self.client.lineReceived('USR 1 OK foo@bar.com Test%20Screen%20Name 1 0')
|
| - self.failUnless((self.client.state == 'LOGIN'), msg='Failed to detect successful login')
|
| -
|
| - def testProfile(self):
|
| - m = 'MSG Hotmail Hotmail 353\r\nMIME-Version: 1.0\r\nContent-Type: text/x-msmsgsprofile; charset=UTF-8\r\n'
|
| - m += 'LoginTime: 1016941010\r\nEmailEnabled: 1\r\nMemberIdHigh: 40000\r\nMemberIdLow: -600000000\r\nlang_preference: 1033\r\n'
|
| - m += 'preferredEmail: foo@bar.com\r\ncountry: AU\r\nPostalCode: 90210\r\nGender: M\r\nKid: 0\r\nAge:\r\nsid: 400\r\n'
|
| - m += 'kv: 2\r\nMSPAuth: 2CACCBCCADMoV8ORoz64BVwmjtksIg!kmR!Rj5tBBqEaW9hc4YnPHSOQ$$\r\n\r\n'
|
| - map(self.client.lineReceived, m.split('\r\n')[:-1])
|
| - self.failUnless((self.client.state == 'PROFILE'), msg='Failed to detect initial profile')
|
| -
|
| - def testStatus(self):
|
| - t = [('ILN 1 AWY foo@bar.com Test%20Screen%20Name 0', 'INITSTATUS', 'Failed to detect initial status report'),
|
| - ('NLN LUN foo@bar.com Test%20Name 0', 'NEWSTATUS', 'Failed to detect contact status change'),
|
| - ('FLN foo@bar.com', 'OFFLINE', 'Failed to detect contact signing off'),
|
| - ('CHG 1 HDN 0', 'MYSTATUS', 'Failed to detect my status changing')]
|
| - for i in t:
|
| - self.client.lineReceived(i[0])
|
| - self.failUnless((self.client.state == i[1]), msg=i[2])
|
| -
|
| - def testListSync(self):
|
| - # currently this test does not take into account the fact
|
| - # that BPRs sent as part of the SYN reply may not be interpreted
|
| - # as such if they are for the last LST -- maybe I should
|
| - # factor this in later.
|
| - self.client.makeConnection(StringIOWithoutClosing())
|
| - msn.NotificationClient.loggedIn(self.client, 'foo@foo.com', 'foobar', 1)
|
| - lines = [
|
| - "SYN %s 100 1 1" % self.client.currentID,
|
| - "GTC A",
|
| - "BLP AL",
|
| - "LSG 0 Other%20Contacts 0",
|
| - "LST userHandle@email.com Some%20Name 11 0"
|
| - ]
|
| - map(self.client.lineReceived, lines)
|
| - contacts = self.client.factory.contacts
|
| - contact = contacts.getContact('userHandle@email.com')
|
| - self.failUnless(contacts.version == 100, "Invalid contact list version")
|
| - self.failUnless(contact.screenName == 'Some Name', "Invalid screen-name for user")
|
| - self.failUnless(contacts.groups == {0 : 'Other Contacts'}, "Did not get proper group list")
|
| - self.failUnless(contact.groups == [0] and contact.lists == 11, "Invalid contact list/group info")
|
| - self.failUnless(self.client.state == 'GOTLIST', "Failed to call list sync handler")
|
| -
|
| - def testAsyncPhoneChange(self):
|
| - c = msn.MSNContact(userHandle='userHandle@email.com')
|
| - self.client.factory.contacts = msn.MSNContactList()
|
| - self.client.factory.contacts.addContact(c)
|
| - self.client.makeConnection(StringIOWithoutClosing())
|
| - self.client.lineReceived("BPR 101 userHandle@email.com PHH 123%20456")
|
| - c = self.client.factory.contacts.getContact('userHandle@email.com')
|
| - self.failUnless(self.client.state == 'GOTPHONE', "Did not fire phone change callback")
|
| - self.failUnless(c.homePhone == '123 456', "Did not update the contact's phone number")
|
| - self.failUnless(self.client.factory.contacts.version == 101, "Did not update list version")
|
| -
|
| - def testLateBPR(self):
|
| - """
|
| - This test makes sure that if a BPR response that was meant
|
| - to be part of a SYN response (but came after the last LST)
|
| - is received, the correct contact is updated and all is well
|
| - """
|
| - self.client.makeConnection(StringIOWithoutClosing())
|
| - msn.NotificationClient.loggedIn(self.client, 'foo@foo.com', 'foo', 1)
|
| - lines = [
|
| - "SYN %s 100 1 1" % self.client.currentID,
|
| - "GTC A",
|
| - "BLP AL",
|
| - "LSG 0 Other%20Contacts 0",
|
| - "LST userHandle@email.com Some%20Name 11 0",
|
| - "BPR PHH 123%20456"
|
| - ]
|
| - map(self.client.lineReceived, lines)
|
| - contact = self.client.factory.contacts.getContact('userHandle@email.com')
|
| - self.failUnless(contact.homePhone == '123 456', "Did not update contact's phone number")
|
| -
|
| - def testUserRemovedMe(self):
|
| - self.client.factory.contacts = msn.MSNContactList()
|
| - contact = msn.MSNContact(userHandle='foo@foo.com')
|
| - contact.addToList(msn.REVERSE_LIST)
|
| - self.client.factory.contacts.addContact(contact)
|
| - self.client.lineReceived("REM 0 RL 100 foo@foo.com")
|
| - self.failUnless(self.client.state == 'USERREMOVEDME', "Failed to remove user from reverse list")
|
| -
|
| - def testUserAddedMe(self):
|
| - self.client.factory.contacts = msn.MSNContactList()
|
| - self.client.lineReceived("ADD 0 RL 100 foo@foo.com Screen%20Name")
|
| - self.failUnless(self.client.state == 'USERADDEDME', "Failed to add user to reverse lise")
|
| -
|
| - def testAsyncSwitchboardInvitation(self):
|
| - self.client.lineReceived("RNG 1234 192.168.1.1:1863 CKI 123.456 foo@foo.com Screen%20Name")
|
| - self.failUnless(self.client.state == "SBINVITED")
|
| -
|
| - def testCommandFailed(self):
|
| - """
|
| - Ensures that error responses from the server fires an errback with
|
| - MSNCommandFailed.
|
| - """
|
| - id, d = self.client._createIDMapping()
|
| - self.client.lineReceived("201 %s" % id)
|
| - d = self.assertFailure(d, msn.MSNCommandFailed)
|
| - def assertErrorCode(exception):
|
| - self.assertEqual(201, exception.errorCode)
|
| - return d.addCallback(assertErrorCode)
|
| -
|
| -
|
| -class MessageHandlingTests(unittest.TestCase):
|
| - """ testing various message handling methods from SwichboardClient """
|
| -
|
| - def setUp(self):
|
| - self.client = DummySwitchboardClient()
|
| - self.client.state = 'START'
|
| -
|
| - def tearDown(self):
|
| - self.client = None
|
| -
|
| - def testClientCapabilitiesCheck(self):
|
| - m = msn.MSNMessage()
|
| - m.setHeader('Content-Type', 'text/x-clientcaps')
|
| - self.assertEquals(self.client.checkMessage(m), 0, 'Failed to detect client capability message')
|
| -
|
| - def testTypingCheck(self):
|
| - m = msn.MSNMessage()
|
| - m.setHeader('Content-Type', 'text/x-msmsgscontrol')
|
| - m.setHeader('TypingUser', 'foo@bar')
|
| - self.client.checkMessage(m)
|
| - self.failUnless((self.client.state == 'TYPING'), msg='Failed to detect typing notification')
|
| -
|
| - def testFileInvitation(self, lazyClient=False):
|
| - m = msn.MSNMessage()
|
| - m.setHeader('Content-Type', 'text/x-msmsgsinvite; charset=UTF-8')
|
| - m.message += 'Application-Name: File Transfer\r\n'
|
| - if not lazyClient:
|
| - m.message += 'Application-GUID: {5D3E02AB-6190-11d3-BBBB-00C04F795683}\r\n'
|
| - m.message += 'Invitation-Command: Invite\r\n'
|
| - m.message += 'Invitation-Cookie: 1234\r\n'
|
| - m.message += 'Application-File: foobar.ext\r\n'
|
| - m.message += 'Application-FileSize: 31337\r\n\r\n'
|
| - self.client.checkMessage(m)
|
| - self.failUnless((self.client.state == 'INVITATION'), msg='Failed to detect file transfer invitation')
|
| -
|
| - def testFileInvitationMissingGUID(self):
|
| - return self.testFileInvitation(True)
|
| -
|
| - def testFileResponse(self):
|
| - d = Deferred()
|
| - d.addCallback(self.fileResponse)
|
| - self.client.cookies['iCookies'][1234] = (d, None)
|
| - m = msn.MSNMessage()
|
| - m.setHeader('Content-Type', 'text/x-msmsgsinvite; charset=UTF-8')
|
| - m.message += 'Invitation-Command: ACCEPT\r\n'
|
| - m.message += 'Invitation-Cookie: 1234\r\n\r\n'
|
| - self.client.checkMessage(m)
|
| - self.failUnless((self.client.state == 'RESPONSE'), msg='Failed to detect file transfer response')
|
| -
|
| - def testFileInfo(self):
|
| - d = Deferred()
|
| - d.addCallback(self.fileInfo)
|
| - self.client.cookies['external'][1234] = (d, None)
|
| - m = msn.MSNMessage()
|
| - m.setHeader('Content-Type', 'text/x-msmsgsinvite; charset=UTF-8')
|
| - m.message += 'Invitation-Command: ACCEPT\r\n'
|
| - m.message += 'Invitation-Cookie: 1234\r\n'
|
| - m.message += 'IP-Address: 192.168.0.1\r\n'
|
| - m.message += 'Port: 6891\r\n'
|
| - m.message += 'AuthCookie: 4321\r\n\r\n'
|
| - self.client.checkMessage(m)
|
| - self.failUnless((self.client.state == 'INFO'), msg='Failed to detect file transfer info')
|
| -
|
| - def fileResponse(self, (accept, cookie, info)):
|
| - if accept and cookie == 1234: self.client.state = 'RESPONSE'
|
| -
|
| - def fileInfo(self, (accept, ip, port, aCookie, info)):
|
| - if accept and ip == '192.168.0.1' and port == 6891 and aCookie == 4321: self.client.state = 'INFO'
|
| -
|
| -
|
| -class FileTransferTestCase(unittest.TestCase):
|
| - """ test FileSend against FileReceive """
|
| -
|
| - def setUp(self):
|
| - self.input = StringIOWithoutClosing()
|
| - self.input.writelines(['a'] * 7000)
|
| - self.input.seek(0)
|
| - self.output = StringIOWithoutClosing()
|
| -
|
| - def tearDown(self):
|
| - self.input = None
|
| - self.output = None
|
| -
|
| - def testFileTransfer(self):
|
| - auth = 1234
|
| - sender = msn.FileSend(self.input)
|
| - sender.auth = auth
|
| - sender.fileSize = 7000
|
| - client = msn.FileReceive(auth, "foo@bar.com", self.output)
|
| - client.fileSize = 7000
|
| - def check(ignored):
|
| - self.failUnless((client.completed and sender.completed),
|
| - msg="send failed to complete")
|
| - self.failUnless((self.input.getvalue() == self.output.getvalue()),
|
| - msg="saved file does not match original")
|
| - d = loopback.loopbackAsync(sender, client)
|
| - d.addCallback(check)
|
| - return d
|
| -
|
| -
|
| -if msn is None:
|
| - for testClass in [PassportTests, NotificationTests,
|
| - MessageHandlingTests, FileTransferTestCase]:
|
| - testClass.skip = (
|
| - "MSN requires an HTTP client but none is available, "
|
| - "skipping tests.")
|
|
|