OLD | NEW |
| (Empty) |
1 # -*- test-case-name: twisted.names.test.test_client -*- | |
2 # Copyright (c) 2001-2007 Twisted Matrix Laboratories. | |
3 # See LICENSE for details. | |
4 | |
5 """ | |
6 Test cases for twisted.names.client | |
7 """ | |
8 | |
9 from twisted.names import client, dns | |
10 from twisted.names.error import DNSQueryTimeoutError | |
11 from twisted.trial import unittest | |
12 from twisted.names.common import ResolverBase | |
13 from twisted.internet import defer | |
14 | |
15 class FakeResolver(ResolverBase): | |
16 | |
17 def _lookup(self, name, cls, qtype, timeout): | |
18 """ | |
19 The getHostByNameTest does a different type of query that requires it | |
20 return an A record from an ALL_RECORDS lookup, so we accomodate that | |
21 here. | |
22 """ | |
23 if name == 'getHostByNameTest': | |
24 rr = dns.RRHeader(name=name, type=dns.A, cls=cls, ttl=60, | |
25 payload=dns.Record_A(address='127.0.0.1', ttl=60)) | |
26 else: | |
27 rr = dns.RRHeader(name=name, type=qtype, cls=cls, ttl=60) | |
28 | |
29 results = [rr] | |
30 authority = [] | |
31 addtional = [] | |
32 return defer.succeed((results, authority, addtional)) | |
33 | |
34 | |
35 | |
36 class StubDNSDatagramProtocol(object): | |
37 """ | |
38 L{dns.DNSDatagramProtocol}-alike. | |
39 | |
40 @ivar queries: A C{list} of tuples giving the arguments passed to | |
41 C{query} along with the L{defer.Deferred} which was returned from | |
42 the call. | |
43 """ | |
44 def __init__(self): | |
45 self.queries = [] | |
46 | |
47 | |
48 def query(self, address, queries, timeout=10, id=None): | |
49 """ | |
50 Record the given arguments and return a Deferred which will not be | |
51 called back by this code. | |
52 """ | |
53 result = defer.Deferred() | |
54 self.queries.append((address, queries, timeout, id, result)) | |
55 return result | |
56 | |
57 | |
58 | |
59 class ResolverTests(unittest.TestCase): | |
60 """ | |
61 Tests for L{client.Resolver}. | |
62 """ | |
63 def test_datagramQueryServerOrder(self): | |
64 """ | |
65 L{client.Resolver.queryUDP} should issue queries to its | |
66 L{dns.DNSDatagramProtocol} with server addresses taken from its own | |
67 C{servers} and C{dynServers} lists, proceeding through them in order | |
68 as L{DNSQueryTimeoutError}s occur. | |
69 """ | |
70 protocol = StubDNSDatagramProtocol() | |
71 protocol.transport = object() | |
72 | |
73 servers = [object(), object()] | |
74 dynServers = [object(), object()] | |
75 resolver = client.Resolver(servers=servers) | |
76 resolver.dynServers = dynServers | |
77 resolver.protocol = protocol | |
78 | |
79 expectedResult = object() | |
80 queryResult = resolver.queryUDP(None) | |
81 queryResult.addCallback(self.assertEqual, expectedResult) | |
82 | |
83 self.assertEqual(len(protocol.queries), 1) | |
84 self.assertIdentical(protocol.queries[0][0], servers[0]) | |
85 protocol.queries[0][-1].errback(DNSQueryTimeoutError(0)) | |
86 self.assertEqual(len(protocol.queries), 2) | |
87 self.assertIdentical(protocol.queries[1][0], servers[1]) | |
88 protocol.queries[1][-1].errback(DNSQueryTimeoutError(1)) | |
89 self.assertEqual(len(protocol.queries), 3) | |
90 self.assertIdentical(protocol.queries[2][0], dynServers[0]) | |
91 protocol.queries[2][-1].errback(DNSQueryTimeoutError(2)) | |
92 self.assertEqual(len(protocol.queries), 4) | |
93 self.assertIdentical(protocol.queries[3][0], dynServers[1]) | |
94 protocol.queries[3][-1].callback(expectedResult) | |
95 | |
96 return queryResult | |
97 | |
98 | |
99 | |
100 class ClientTestCase(unittest.TestCase): | |
101 | |
102 def setUp(self): | |
103 """ | |
104 Replace the resolver with a FakeResolver | |
105 """ | |
106 client.theResolver = FakeResolver() | |
107 self.hostname = 'example.com' | |
108 self.ghbntest = 'getHostByNameTest' | |
109 | |
110 def tearDown(self): | |
111 """ | |
112 By setting the resolver to None, it will be recreated next time a name | |
113 lookup is done. | |
114 """ | |
115 client.theResolver = None | |
116 | |
117 def checkResult(self, (results, authority, additional), qtype): | |
118 """ | |
119 Verify that the result is the same query type as what is expected. | |
120 """ | |
121 result = results[0] | |
122 self.assertEquals(str(result.name), self.hostname) | |
123 self.assertEquals(result.type, qtype) | |
124 | |
125 def checkGetHostByName(self, result): | |
126 """ | |
127 Test that the getHostByName query returns the 127.0.0.1 address. | |
128 """ | |
129 self.assertEquals(result, '127.0.0.1') | |
130 | |
131 def test_getHostByName(self): | |
132 """ | |
133 do a getHostByName of a value that should return 127.0.0.1. | |
134 """ | |
135 d = client.getHostByName(self.ghbntest) | |
136 d.addCallback(self.checkGetHostByName) | |
137 return d | |
138 | |
139 def test_lookupAddress(self): | |
140 """ | |
141 Do a lookup and test that the resolver will issue the correct type of | |
142 query type. We do this by checking that FakeResolver returns a result | |
143 record with the same query type as what we issued. | |
144 """ | |
145 d = client.lookupAddress(self.hostname) | |
146 d.addCallback(self.checkResult, dns.A) | |
147 return d | |
148 | |
149 def test_lookupIPV6Address(self): | |
150 """ | |
151 See L{test_lookupAddress} | |
152 """ | |
153 d = client.lookupIPV6Address(self.hostname) | |
154 d.addCallback(self.checkResult, dns.AAAA) | |
155 return d | |
156 | |
157 def test_lookupAddress6(self): | |
158 """ | |
159 See L{test_lookupAddress} | |
160 """ | |
161 d = client.lookupAddress6(self.hostname) | |
162 d.addCallback(self.checkResult, dns.A6) | |
163 return d | |
164 | |
165 def test_lookupNameservers(self): | |
166 """ | |
167 See L{test_lookupAddress} | |
168 """ | |
169 d = client.lookupNameservers(self.hostname) | |
170 d.addCallback(self.checkResult, dns.NS) | |
171 return d | |
172 | |
173 def test_lookupCanonicalName(self): | |
174 """ | |
175 See L{test_lookupAddress} | |
176 """ | |
177 d = client.lookupCanonicalName(self.hostname) | |
178 d.addCallback(self.checkResult, dns.CNAME) | |
179 return d | |
180 | |
181 def test_lookupAuthority(self): | |
182 """ | |
183 See L{test_lookupAddress} | |
184 """ | |
185 d = client.lookupAuthority(self.hostname) | |
186 d.addCallback(self.checkResult, dns.SOA) | |
187 return d | |
188 | |
189 def test_lookupMailBox(self): | |
190 """ | |
191 See L{test_lookupAddress} | |
192 """ | |
193 d = client.lookupMailBox(self.hostname) | |
194 d.addCallback(self.checkResult, dns.MB) | |
195 return d | |
196 | |
197 def test_lookupMailGroup(self): | |
198 """ | |
199 See L{test_lookupAddress} | |
200 """ | |
201 d = client.lookupMailGroup(self.hostname) | |
202 d.addCallback(self.checkResult, dns.MG) | |
203 return d | |
204 | |
205 def test_lookupMailRename(self): | |
206 """ | |
207 See L{test_lookupAddress} | |
208 """ | |
209 d = client.lookupMailRename(self.hostname) | |
210 d.addCallback(self.checkResult, dns.MR) | |
211 return d | |
212 | |
213 def test_lookupNull(self): | |
214 """ | |
215 See L{test_lookupAddress} | |
216 """ | |
217 d = client.lookupNull(self.hostname) | |
218 d.addCallback(self.checkResult, dns.NULL) | |
219 return d | |
220 | |
221 def test_lookupWellKnownServices(self): | |
222 """ | |
223 See L{test_lookupAddress} | |
224 """ | |
225 d = client.lookupWellKnownServices(self.hostname) | |
226 d.addCallback(self.checkResult, dns.WKS) | |
227 return d | |
228 | |
229 def test_lookupPointer(self): | |
230 """ | |
231 See L{test_lookupAddress} | |
232 """ | |
233 d = client.lookupPointer(self.hostname) | |
234 d.addCallback(self.checkResult, dns.PTR) | |
235 return d | |
236 | |
237 def test_lookupHostInfo(self): | |
238 """ | |
239 See L{test_lookupAddress} | |
240 """ | |
241 d = client.lookupHostInfo(self.hostname) | |
242 d.addCallback(self.checkResult, dns.HINFO) | |
243 return d | |
244 | |
245 def test_lookupMailboxInfo(self): | |
246 """ | |
247 See L{test_lookupAddress} | |
248 """ | |
249 d = client.lookupMailboxInfo(self.hostname) | |
250 d.addCallback(self.checkResult, dns.MINFO) | |
251 return d | |
252 | |
253 def test_lookupMailExchange(self): | |
254 """ | |
255 See L{test_lookupAddress} | |
256 """ | |
257 d = client.lookupMailExchange(self.hostname) | |
258 d.addCallback(self.checkResult, dns.MX) | |
259 return d | |
260 | |
261 def test_lookupText(self): | |
262 """ | |
263 See L{test_lookupAddress} | |
264 """ | |
265 d = client.lookupText(self.hostname) | |
266 d.addCallback(self.checkResult, dns.TXT) | |
267 return d | |
268 | |
269 def test_lookupResponsibility(self): | |
270 """ | |
271 See L{test_lookupAddress} | |
272 """ | |
273 d = client.lookupResponsibility(self.hostname) | |
274 d.addCallback(self.checkResult, dns.RP) | |
275 return d | |
276 | |
277 def test_lookupAFSDatabase(self): | |
278 """ | |
279 See L{test_lookupAddress} | |
280 """ | |
281 d = client.lookupAFSDatabase(self.hostname) | |
282 d.addCallback(self.checkResult, dns.AFSDB) | |
283 return d | |
284 | |
285 def test_lookupService(self): | |
286 """ | |
287 See L{test_lookupAddress} | |
288 """ | |
289 d = client.lookupService(self.hostname) | |
290 d.addCallback(self.checkResult, dns.SRV) | |
291 return d | |
292 | |
293 def test_lookupZone(self): | |
294 """ | |
295 See L{test_lookupAddress} | |
296 """ | |
297 d = client.lookupZone(self.hostname) | |
298 d.addCallback(self.checkResult, dns.AXFR) | |
299 return d | |
300 | |
301 def test_lookupAllRecords(self): | |
302 """ | |
303 See L{test_lookupAddress} | |
304 """ | |
305 d = client.lookupAllRecords(self.hostname) | |
306 d.addCallback(self.checkResult, dns.ALL_RECORDS) | |
307 return d | |
OLD | NEW |