Index: third_party/twisted_8_1/twisted/names/test/test_names.py |
diff --git a/third_party/twisted_8_1/twisted/names/test/test_names.py b/third_party/twisted_8_1/twisted/names/test/test_names.py |
deleted file mode 100644 |
index 6446ebfa7676f3b87e964934bb5f85d5724283e6..0000000000000000000000000000000000000000 |
--- a/third_party/twisted_8_1/twisted/names/test/test_names.py |
+++ /dev/null |
@@ -1,712 +0,0 @@ |
-# -*- test-case-name: twisted.names.test.test_names -*- |
-# Copyright (c) 2001-2004 Twisted Matrix Laboratories. |
-# See LICENSE for details. |
- |
-""" |
-Test cases for twisted.names. |
-""" |
-from __future__ import nested_scopes |
- |
-import socket, operator, copy |
- |
-from twisted.trial import unittest |
- |
-from twisted.internet import reactor, defer, error |
-from twisted.internet.defer import succeed |
-from twisted.names import client, server, common, authority, hosts, dns |
-from twisted.python import failure |
-from twisted.names.error import DNSFormatError, DNSServerError, DNSNameError |
-from twisted.names.error import DNSNotImplementedError, DNSQueryRefusedError |
-from twisted.names.error import DNSUnknownError |
-from twisted.names.dns import EFORMAT, ESERVER, ENAME, ENOTIMP, EREFUSED |
-from twisted.names.dns import Message |
-from twisted.names.client import Resolver |
- |
- |
-def justPayload(results): |
- return [r.payload for r in results[0]] |
- |
-class NoFileAuthority(authority.FileAuthority): |
- def __init__(self, soa, records): |
- # Yes, skip FileAuthority |
- common.ResolverBase.__init__(self) |
- self.soa, self.records = soa, records |
- |
- |
-soa_record = dns.Record_SOA( |
- mname = 'test-domain.com', |
- rname = 'root.test-domain.com', |
- serial = 100, |
- refresh = 1234, |
- minimum = 7654, |
- expire = 19283784, |
- retry = 15, |
- ttl=1 |
- ) |
- |
-reverse_soa = dns.Record_SOA( |
- mname = '93.84.28.in-addr.arpa', |
- rname = '93.84.28.in-addr.arpa', |
- serial = 120, |
- refresh = 54321, |
- minimum = 382, |
- expire = 11193983, |
- retry = 30, |
- ttl=3 |
- ) |
- |
-my_soa = dns.Record_SOA( |
- mname = 'my-domain.com', |
- rname = 'postmaster.test-domain.com', |
- serial = 130, |
- refresh = 12345, |
- minimum = 1, |
- expire = 999999, |
- retry = 100, |
- ) |
- |
-test_domain_com = NoFileAuthority( |
- soa = ('test-domain.com', soa_record), |
- records = { |
- 'test-domain.com': [ |
- soa_record, |
- dns.Record_A('127.0.0.1'), |
- dns.Record_NS('39.28.189.39'), |
- dns.Record_MX(10, 'host.test-domain.com'), |
- dns.Record_HINFO(os='Linux', cpu='A Fast One, Dontcha know'), |
- dns.Record_CNAME('canonical.name.com'), |
- dns.Record_MB('mailbox.test-domain.com'), |
- dns.Record_MG('mail.group.someplace'), |
- dns.Record_TXT('A First piece of Text', 'a SecoNd piece'), |
- dns.Record_A6(0, 'ABCD::4321', ''), |
- dns.Record_A6(12, '0:0069::0', 'some.network.tld'), |
- dns.Record_A6(8, '0:5634:1294:AFCB:56AC:48EF:34C3:01FF', 'tra.la.la.net'), |
- dns.Record_TXT('Some more text, haha! Yes. \0 Still here?'), |
- dns.Record_MR('mail.redirect.or.whatever'), |
- dns.Record_MINFO(rmailbx='r mail box', emailbx='e mail box'), |
- dns.Record_AFSDB(subtype=1, hostname='afsdb.test-domain.com'), |
- dns.Record_RP(mbox='whatever.i.dunno', txt='some.more.text'), |
- dns.Record_WKS('12.54.78.12', socket.IPPROTO_TCP, '\x12\x01\x16\xfe\xc1\x00\x01'), |
- dns.Record_AAAA('AF43:5634:1294:AFCB:56AC:48EF:34C3:01FF')], |
- 'http.tcp.test-domain.com': [ |
- dns.Record_SRV(257, 16383, 43690, 'some.other.place.fool') |
- ], |
- 'host.test-domain.com': [ |
- dns.Record_A('123.242.1.5'), |
- dns.Record_A('0.255.0.255'), |
- ], |
- 'host-two.test-domain.com': [ |
-# |
-# Python bug |
-# dns.Record_A('255.255.255.255'), |
-# |
- dns.Record_A('255.255.255.254'), |
- dns.Record_A('0.0.0.0') |
- ], |
- 'cname.test-domain.com': [ |
- dns.Record_CNAME('test-domain.com') |
- ], |
- 'anothertest-domain.com': [ |
- dns.Record_A('1.2.3.4')], |
- } |
-) |
- |
-reverse_domain = NoFileAuthority( |
- soa = ('93.84.28.in-addr.arpa', reverse_soa), |
- records = { |
- '123.93.84.28.in-addr.arpa': [ |
- dns.Record_PTR('test.host-reverse.lookup.com'), |
- reverse_soa |
- ] |
- } |
-) |
- |
- |
-my_domain_com = NoFileAuthority( |
- soa = ('my-domain.com', my_soa), |
- records = { |
- 'my-domain.com': [ |
- my_soa, |
- dns.Record_A('1.2.3.4', ttl='1S'), |
- dns.Record_NS('ns1.domain', ttl='2M'), |
- dns.Record_NS('ns2.domain', ttl='3H'), |
- dns.Record_SRV(257, 16383, 43690, 'some.other.place.fool', ttl='4D') |
- ] |
- } |
- ) |
- |
- |
-class ServerDNSTestCase(unittest.TestCase): |
- """ |
- Test cases for DNS server and client. |
- """ |
- |
- def setUp(self): |
- self.factory = server.DNSServerFactory([ |
- test_domain_com, reverse_domain, my_domain_com |
- ], verbose=2) |
- |
- p = dns.DNSDatagramProtocol(self.factory) |
- |
- while 1: |
- self.listenerTCP = reactor.listenTCP(0, self.factory, interface="127.0.0.1") |
- port = self.listenerTCP.getHost().port |
- |
- try: |
- self.listenerUDP = reactor.listenUDP(port, p, interface="127.0.0.1") |
- except error.CannotListenError: |
- self.listenerTCP.stopListening() |
- else: |
- break |
- |
- self.resolver = client.Resolver(servers=[('127.0.0.1', port)]) |
- |
- |
- def tearDown(self): |
- """ |
- Asynchronously disconnect listenerTCP, listenerUDP and resolver. |
- """ |
- d1 = self.listenerTCP.loseConnection() |
- d2 = defer.maybeDeferred(self.listenerUDP.stopListening) |
- d = defer.gatherResults([d1, d2]) |
- def disconnectTransport(ignored): |
- if getattr(self.resolver.protocol, 'transport', None) is not None: |
- return self.resolver.protocol.transport.stopListening() |
- d.addCallback(disconnectTransport) |
- d.addCallback(lambda x : self.failUnless( |
- self.listenerUDP.disconnected |
- and self.listenerTCP.disconnected)) |
- return d |
- |
- |
- def namesTest(self, d, r): |
- self.response = None |
- def setDone(response): |
- self.response = response |
- |
- def checkResults(ignored): |
- if isinstance(self.response, failure.Failure): |
- raise self.response |
- results = justPayload(self.response) |
- assert len(results) == len(r), "%s != %s" % (map(str, results), map(str, r)) |
- for rec in results: |
- assert rec in r, "%s not in %s" % (rec, map(str, r)) |
- |
- d.addBoth(setDone) |
- d.addCallback(checkResults) |
- return d |
- |
- def testAddressRecord1(self): |
- """Test simple DNS 'A' record queries""" |
- return self.namesTest( |
- self.resolver.lookupAddress('test-domain.com'), |
- [dns.Record_A('127.0.0.1', ttl=19283784)] |
- ) |
- |
- |
- def testAddressRecord2(self): |
- """Test DNS 'A' record queries with multiple answers""" |
- return self.namesTest( |
- self.resolver.lookupAddress('host.test-domain.com'), |
- [dns.Record_A('123.242.1.5', ttl=19283784), dns.Record_A('0.255.0.255', ttl=19283784)] |
- ) |
- |
- |
- def testAdressRecord3(self): |
- """Test DNS 'A' record queries with edge cases""" |
- return self.namesTest( |
- self.resolver.lookupAddress('host-two.test-domain.com'), |
- [dns.Record_A('255.255.255.254', ttl=19283784), dns.Record_A('0.0.0.0', ttl=19283784)] |
- ) |
- |
- def testAuthority(self): |
- """Test DNS 'SOA' record queries""" |
- return self.namesTest( |
- self.resolver.lookupAuthority('test-domain.com'), |
- [soa_record] |
- ) |
- |
- |
- def testMailExchangeRecord(self): |
- """Test DNS 'MX' record queries""" |
- return self.namesTest( |
- self.resolver.lookupMailExchange('test-domain.com'), |
- [dns.Record_MX(10, 'host.test-domain.com', ttl=19283784)] |
- ) |
- |
- |
- def testNameserver(self): |
- """Test DNS 'NS' record queries""" |
- return self.namesTest( |
- self.resolver.lookupNameservers('test-domain.com'), |
- [dns.Record_NS('39.28.189.39', ttl=19283784)] |
- ) |
- |
- |
- def testHINFO(self): |
- """Test DNS 'HINFO' record queries""" |
- return self.namesTest( |
- self.resolver.lookupHostInfo('test-domain.com'), |
- [dns.Record_HINFO(os='Linux', cpu='A Fast One, Dontcha know', ttl=19283784)] |
- ) |
- |
- def testPTR(self): |
- """Test DNS 'PTR' record queries""" |
- return self.namesTest( |
- self.resolver.lookupPointer('123.93.84.28.in-addr.arpa'), |
- [dns.Record_PTR('test.host-reverse.lookup.com', ttl=11193983)] |
- ) |
- |
- |
- def testCNAME(self): |
- """Test DNS 'CNAME' record queries""" |
- return self.namesTest( |
- self.resolver.lookupCanonicalName('test-domain.com'), |
- [dns.Record_CNAME('canonical.name.com', ttl=19283784)] |
- ) |
- |
- def testCNAMEAdditional(self): |
- """Test additional processing for CNAME records""" |
- return self.namesTest( |
- self.resolver.lookupAddress('cname.test-domain.com'), |
- [dns.Record_CNAME('test-domain.com', ttl=19283784), dns.Record_A('127.0.0.1', ttl=19283784)] |
- ) |
- |
- def testMB(self): |
- """Test DNS 'MB' record queries""" |
- return self.namesTest( |
- self.resolver.lookupMailBox('test-domain.com'), |
- [dns.Record_MB('mailbox.test-domain.com', ttl=19283784)] |
- ) |
- |
- |
- def testMG(self): |
- """Test DNS 'MG' record queries""" |
- return self.namesTest( |
- self.resolver.lookupMailGroup('test-domain.com'), |
- [dns.Record_MG('mail.group.someplace', ttl=19283784)] |
- ) |
- |
- |
- def testMR(self): |
- """Test DNS 'MR' record queries""" |
- return self.namesTest( |
- self.resolver.lookupMailRename('test-domain.com'), |
- [dns.Record_MR('mail.redirect.or.whatever', ttl=19283784)] |
- ) |
- |
- |
- def testMINFO(self): |
- """Test DNS 'MINFO' record queries""" |
- return self.namesTest( |
- self.resolver.lookupMailboxInfo('test-domain.com'), |
- [dns.Record_MINFO(rmailbx='r mail box', emailbx='e mail box', ttl=19283784)] |
- ) |
- |
- |
- def testSRV(self): |
- """Test DNS 'SRV' record queries""" |
- return self.namesTest( |
- self.resolver.lookupService('http.tcp.test-domain.com'), |
- [dns.Record_SRV(257, 16383, 43690, 'some.other.place.fool', ttl=19283784)] |
- ) |
- |
- def testAFSDB(self): |
- """Test DNS 'AFSDB' record queries""" |
- return self.namesTest( |
- self.resolver.lookupAFSDatabase('test-domain.com'), |
- [dns.Record_AFSDB(subtype=1, hostname='afsdb.test-domain.com', ttl=19283784)] |
- ) |
- |
- |
- def testRP(self): |
- """Test DNS 'RP' record queries""" |
- return self.namesTest( |
- self.resolver.lookupResponsibility('test-domain.com'), |
- [dns.Record_RP(mbox='whatever.i.dunno', txt='some.more.text', ttl=19283784)] |
- ) |
- |
- |
- def testTXT(self): |
- """Test DNS 'TXT' record queries""" |
- return self.namesTest( |
- self.resolver.lookupText('test-domain.com'), |
- [dns.Record_TXT('A First piece of Text', 'a SecoNd piece', ttl=19283784), |
- dns.Record_TXT('Some more text, haha! Yes. \0 Still here?', ttl=19283784)] |
- ) |
- |
- |
- def testWKS(self): |
- """Test DNS 'WKS' record queries""" |
- return self.namesTest( |
- self.resolver.lookupWellKnownServices('test-domain.com'), |
- [dns.Record_WKS('12.54.78.12', socket.IPPROTO_TCP, '\x12\x01\x16\xfe\xc1\x00\x01', ttl=19283784)] |
- ) |
- |
- |
- def testSomeRecordsWithTTLs(self): |
- result_soa = copy.copy(my_soa) |
- result_soa.ttl = my_soa.expire |
- return self.namesTest( |
- self.resolver.lookupAllRecords('my-domain.com'), |
- [result_soa, |
- dns.Record_A('1.2.3.4', ttl='1S'), |
- dns.Record_NS('ns1.domain', ttl='2M'), |
- dns.Record_NS('ns2.domain', ttl='3H'), |
- dns.Record_SRV(257, 16383, 43690, 'some.other.place.fool', ttl='4D')] |
- ) |
- |
- |
- def testAAAA(self): |
- """Test DNS 'AAAA' record queries (IPv6)""" |
- return self.namesTest( |
- self.resolver.lookupIPV6Address('test-domain.com'), |
- [dns.Record_AAAA('AF43:5634:1294:AFCB:56AC:48EF:34C3:01FF', ttl=19283784)] |
- ) |
- |
- def testA6(self): |
- """Test DNS 'A6' record queries (IPv6)""" |
- return self.namesTest( |
- self.resolver.lookupAddress6('test-domain.com'), |
- [dns.Record_A6(0, 'ABCD::4321', '', ttl=19283784), |
- dns.Record_A6(12, '0:0069::0', 'some.network.tld', ttl=19283784), |
- dns.Record_A6(8, '0:5634:1294:AFCB:56AC:48EF:34C3:01FF', 'tra.la.la.net', ttl=19283784)] |
- ) |
- |
- |
- def testZoneTransfer(self): |
- """Test DNS 'AXFR' queries (Zone transfer)""" |
- default_ttl = soa_record.expire |
- results = [copy.copy(r) for r in reduce(operator.add, test_domain_com.records.values())] |
- for r in results: |
- if r.ttl is None: |
- r.ttl = default_ttl |
- return self.namesTest( |
- self.resolver.lookupZone('test-domain.com').addCallback(lambda r: (r[0][:-1],)), |
- results |
- ) |
- |
- def testSimilarZonesDontInterfere(self): |
- """Tests that unrelated zones don't mess with each other.""" |
- return self.namesTest( |
- self.resolver.lookupAddress("anothertest-domain.com"), |
- [dns.Record_A('1.2.3.4', ttl=19283784)] |
- ) |
- |
- |
- |
-class DNSServerFactoryTests(unittest.TestCase): |
- """ |
- Tests for L{server.DNSServerFactory}. |
- """ |
- def _messageReceivedTest(self, methodName, message): |
- """ |
- Assert that the named method is called with the given message when |
- it is passed to L{DNSServerFactory.messageReceived}. |
- """ |
- # Make it appear to have some queries so that |
- # DNSServerFactory.allowQuery allows it. |
- message.queries = [None] |
- |
- receivedMessages = [] |
- def fakeHandler(message, protocol, address): |
- receivedMessages.append((message, protocol, address)) |
- |
- class FakeProtocol(object): |
- def writeMessage(self, message): |
- pass |
- |
- protocol = FakeProtocol() |
- factory = server.DNSServerFactory(None) |
- setattr(factory, methodName, fakeHandler) |
- factory.messageReceived(message, protocol) |
- self.assertEqual(receivedMessages, [(message, protocol, None)]) |
- |
- |
- def test_notifyMessageReceived(self): |
- """ |
- L{DNSServerFactory.messageReceived} passes messages with an opcode |
- of C{OP_NOTIFY} on to L{DNSServerFactory.handleNotify}. |
- """ |
- # RFC 1996, section 4.5 |
- opCode = 4 |
- self._messageReceivedTest('handleNotify', Message(opCode=opCode)) |
- |
- |
- def test_updateMessageReceived(self): |
- """ |
- L{DNSServerFactory.messageReceived} passes messages with an opcode |
- of C{OP_UPDATE} on to L{DNSServerFactory.handleOther}. |
- |
- This may change if the implementation ever covers update messages. |
- """ |
- # RFC 2136, section 1.3 |
- opCode = 5 |
- self._messageReceivedTest('handleOther', Message(opCode=opCode)) |
- |
- |
- |
-class HelperTestCase(unittest.TestCase): |
- def testSerialGenerator(self): |
- f = self.mktemp() |
- a = authority.getSerial(f) |
- for i in range(20): |
- b = authority.getSerial(f) |
- self.failUnless(a < b) |
- a = b |
- |
- |
-class AXFRTest(unittest.TestCase): |
- def setUp(self): |
- self.results = None |
- self.d = defer.Deferred() |
- self.d.addCallback(self._gotResults) |
- self.controller = client.AXFRController('fooby.com', self.d) |
- |
- self.soa = dns.RRHeader(name='fooby.com', type=dns.SOA, cls=dns.IN, ttl=86400, auth=False, |
- payload=dns.Record_SOA(mname='fooby.com', |
- rname='hooj.fooby.com', |
- serial=100, |
- refresh=200, |
- retry=300, |
- expire=400, |
- minimum=500, |
- ttl=600)) |
- |
- self.records = [ |
- self.soa, |
- dns.RRHeader(name='fooby.com', type=dns.NS, cls=dns.IN, ttl=700, auth=False, |
- payload=dns.Record_NS(name='ns.twistedmatrix.com', ttl=700)), |
- |
- dns.RRHeader(name='fooby.com', type=dns.MX, cls=dns.IN, ttl=700, auth=False, |
- payload=dns.Record_MX(preference=10, exchange='mail.mv3d.com', ttl=700)), |
- |
- dns.RRHeader(name='fooby.com', type=dns.A, cls=dns.IN, ttl=700, auth=False, |
- payload=dns.Record_A(address='64.123.27.105', ttl=700)), |
- self.soa |
- ] |
- |
- def _makeMessage(self): |
- # hooray they all have the same message format |
- return dns.Message(id=999, answer=1, opCode=0, recDes=0, recAv=1, auth=1, rCode=0, trunc=0, maxSize=0) |
- |
- def testBindAndTNamesStyle(self): |
- # Bind style = One big single message |
- m = self._makeMessage() |
- m.queries = [dns.Query('fooby.com', dns.AXFR, dns.IN)] |
- m.answers = self.records |
- self.controller.messageReceived(m, None) |
- self.assertEquals(self.results, self.records) |
- |
- def _gotResults(self, result): |
- self.results = result |
- |
- def testDJBStyle(self): |
- # DJB style = message per record |
- records = self.records[:] |
- while records: |
- m = self._makeMessage() |
- m.queries = [] # DJB *doesn't* specify any queries.. hmm.. |
- m.answers = [records.pop(0)] |
- self.controller.messageReceived(m, None) |
- self.assertEquals(self.results, self.records) |
- |
-class HostsTestCase(unittest.TestCase): |
- def setUp(self): |
- f = open('EtcHosts', 'w') |
- f.write(''' |
-1.1.1.1 EXAMPLE EXAMPLE.EXAMPLETHING |
-1.1.1.2 HOOJY |
-::1 ip6thingy |
-''') |
- f.close() |
- self.resolver = hosts.Resolver('EtcHosts') |
- |
- def testGetHostByName(self): |
- data = [('EXAMPLE', '1.1.1.1'), |
- ('EXAMPLE.EXAMPLETHING', '1.1.1.1'), |
- ('HOOJY', '1.1.1.2'), |
- ] |
- ds = [self.resolver.getHostByName(n).addCallback(self.assertEqual, ip) |
- for n, ip in data] |
- return defer.gatherResults(ds) |
- |
- def testLookupAddress(self): |
- d = self.resolver.lookupAddress('HOOJY') |
- d.addCallback(lambda x: self.assertEqual(x[0][0].payload.dottedQuad(), |
- '1.1.1.2')) |
- return d |
- |
- def testIPv6(self): |
- d = self.resolver.lookupIPV6Address('ip6thingy') |
- d.addCallback(self.assertEqual, '::1') |
- return d |
- |
- testIPv6.skip = 'IPv6 support is not in our hosts resolver yet' |
- |
- def testNotImplemented(self): |
- return self.assertFailure(self.resolver.lookupMailExchange('EXAMPLE'), |
- NotImplementedError) |
- |
- def testQuery(self): |
- d = self.resolver.query(dns.Query('EXAMPLE')) |
- d.addCallback(lambda x: self.assertEqual(x[0][0].payload.dottedQuad(), |
- '1.1.1.1')) |
- return d |
- |
- def testNotFound(self): |
- return self.assertFailure(self.resolver.lookupAddress('foueoa'), |
- dns.DomainError) |
- |
- |
-class FakeDNSDatagramProtocol(object): |
- transport = object() |
- |
- def __init__(self): |
- self.queries = [] |
- |
- def query(self, address, queries, timeout=10, id=None): |
- self.queries.append((address, queries, timeout, id)) |
- return defer.fail(dns.DNSQueryTimeoutError(queries)) |
- |
- def removeResend(self, id): |
- # Ignore this for the time being. |
- pass |
- |
-class RetryLogic(unittest.TestCase): |
- testServers = [ |
- '1.2.3.4', |
- '4.3.2.1', |
- 'a.b.c.d', |
- 'z.y.x.w'] |
- |
- def testRoundRobinBackoff(self): |
- addrs = [(x, 53) for x in self.testServers] |
- r = client.Resolver(resolv=None, servers=addrs) |
- r.protocol = proto = FakeDNSDatagramProtocol() |
- return r.lookupAddress("foo.example.com" |
- ).addCallback(self._cbRoundRobinBackoff |
- ).addErrback(self._ebRoundRobinBackoff, proto |
- ) |
- |
- def _cbRoundRobinBackoff(self, result): |
- raise unittest.FailTest("Lookup address succeeded, should have timed out") |
- |
- def _ebRoundRobinBackoff(self, failure, fakeProto): |
- failure.trap(defer.TimeoutError) |
- |
- # Assert that each server is tried with a particular timeout |
- # before the timeout is increased and the attempts are repeated. |
- |
- for t in (1, 3, 11, 45): |
- tries = fakeProto.queries[:len(self.testServers)] |
- del fakeProto.queries[:len(self.testServers)] |
- |
- tries.sort() |
- expected = list(self.testServers) |
- expected.sort() |
- |
- for ((addr, query, timeout, id), expectedAddr) in zip(tries, expected): |
- self.assertEquals(addr, (expectedAddr, 53)) |
- self.assertEquals(timeout, t) |
- |
- self.failIf(fakeProto.queries) |
- |
-class ResolvConfHandling(unittest.TestCase): |
- def testMissing(self): |
- resolvConf = self.mktemp() |
- r = client.Resolver(resolv=resolvConf) |
- self.assertEquals(r.dynServers, [('127.0.0.1', 53)]) |
- r._parseCall.cancel() |
- |
- def testEmpty(self): |
- resolvConf = self.mktemp() |
- fObj = file(resolvConf, 'w') |
- fObj.close() |
- r = client.Resolver(resolv=resolvConf) |
- self.assertEquals(r.dynServers, [('127.0.0.1', 53)]) |
- r._parseCall.cancel() |
- |
- |
- |
-class FilterAnswersTests(unittest.TestCase): |
- """ |
- Test L{twisted.names.client.Resolver.filterAnswers}'s handling of various |
- error conditions it might encounter. |
- """ |
- def setUp(self): |
- # Create a resolver pointed at an invalid server - we won't be hitting |
- # the network in any of these tests. |
- self.resolver = Resolver(servers=[('0.0.0.0', 0)]) |
- |
- |
- def test_truncatedMessage(self): |
- """ |
- Test that a truncated message results in an equivalent request made via |
- TCP. |
- """ |
- m = Message(trunc=True) |
- m.addQuery('example.com') |
- |
- def queryTCP(queries): |
- self.assertEqual(queries, m.queries) |
- response = Message() |
- response.answers = ['answer'] |
- response.authority = ['authority'] |
- response.additional = ['additional'] |
- return succeed(response) |
- self.resolver.queryTCP = queryTCP |
- d = self.resolver.filterAnswers(m) |
- d.addCallback( |
- self.assertEqual, (['answer'], ['authority'], ['additional'])) |
- return d |
- |
- |
- def _rcodeTest(self, rcode, exc): |
- m = Message(rCode=rcode) |
- err = self.resolver.filterAnswers(m) |
- err.trap(exc) |
- |
- |
- def test_formatError(self): |
- """ |
- Test that a message with a result code of C{EFORMAT} results in a |
- failure wrapped around L{DNSFormatError}. |
- """ |
- return self._rcodeTest(EFORMAT, DNSFormatError) |
- |
- |
- def test_serverError(self): |
- """ |
- Like L{test_formatError} but for C{ESERVER}/L{DNSServerError}. |
- """ |
- return self._rcodeTest(ESERVER, DNSServerError) |
- |
- |
- def test_nameError(self): |
- """ |
- Like L{test_formatError} but for C{ENAME}/L{DNSNameError}. |
- """ |
- return self._rcodeTest(ENAME, DNSNameError) |
- |
- |
- def test_notImplementedError(self): |
- """ |
- Like L{test_formatError} but for C{ENOTIMP}/L{DNSNotImplementedError}. |
- """ |
- return self._rcodeTest(ENOTIMP, DNSNotImplementedError) |
- |
- |
- def test_refusedError(self): |
- """ |
- Like L{test_formatError} but for C{EREFUSED}/L{DNSQueryRefusedError}. |
- """ |
- return self._rcodeTest(EREFUSED, DNSQueryRefusedError) |
- |
- |
- def test_refusedErrorUnknown(self): |
- """ |
- Like L{test_formatError} but for an unrecognized error code and |
- L{DNSUnknownError}. |
- """ |
- return self._rcodeTest(EREFUSED + 1, DNSUnknownError) |