Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(162)

Side by Side Diff: recipe_engine/third_party/requests/tests/test_requests.py

Issue 2164713003: Vendor requests. (Closed) Base URL: https://chromium.googlesource.com/external/github.com/luci/recipes-py@master
Patch Set: Fix deps.pyl Created 4 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 """Tests for Requests."""
5
6 from __future__ import division
7 import json
8 import os
9 import pickle
10 import collections
11 import contextlib
12
13 import io
14 import requests
15 import pytest
16 from requests.adapters import HTTPAdapter
17 from requests.auth import HTTPDigestAuth, _basic_auth_str
18 from requests.compat import (
19 Morsel, cookielib, getproxies, str, urlparse,
20 builtin_str, OrderedDict)
21 from requests.cookies import cookiejar_from_dict, morsel_to_cookie
22 from requests.exceptions import (
23 ConnectionError, ConnectTimeout, InvalidSchema, InvalidURL,
24 MissingSchema, ReadTimeout, Timeout, RetryError, TooManyRedirects,
25 ProxyError)
26 from requests.models import PreparedRequest
27 from requests.structures import CaseInsensitiveDict
28 from requests.sessions import SessionRedirectMixin
29 from requests.models import urlencode
30 from requests.hooks import default_hooks
31
32 from .compat import StringIO, u
33
34 # Requests to this URL should always fail with a connection timeout (nothing
35 # listening on that port)
36 TARPIT = 'http://10.255.255.1'
37
38
39 class TestRequests:
40
41 def test_entry_points(self):
42
43 requests.session
44 requests.session().get
45 requests.session().head
46 requests.get
47 requests.head
48 requests.put
49 requests.patch
50 requests.post
51
52 @pytest.mark.parametrize(
53 'exception, url', (
54 (MissingSchema, 'hiwpefhipowhefopw'),
55 (InvalidSchema, 'localhost:3128'),
56 (InvalidSchema, 'localhost.localdomain:3128/'),
57 (InvalidSchema, '10.122.1.1:3128/'),
58 (InvalidURL, 'http://'),
59 ))
60 def test_invalid_url(self, exception, url):
61 with pytest.raises(exception):
62 requests.get(url)
63
64 def test_basic_building(self):
65 req = requests.Request()
66 req.url = 'http://kennethreitz.org/'
67 req.data = {'life': '42'}
68
69 pr = req.prepare()
70 assert pr.url == req.url
71 assert pr.body == 'life=42'
72
73 @pytest.mark.parametrize('method', ('GET', 'HEAD'))
74 def test_no_content_length(self, httpbin, method):
75 req = requests.Request(method, httpbin(method.lower())).prepare()
76 assert 'Content-Length' not in req.headers
77
78 def test_override_content_length(self, httpbin):
79 headers = {
80 'Content-Length': 'not zero'
81 }
82 r = requests.Request('POST', httpbin('post'), headers=headers).prepare()
83 assert 'Content-Length' in r.headers
84 assert r.headers['Content-Length'] == 'not zero'
85
86 def test_path_is_not_double_encoded(self):
87 request = requests.Request('GET', "http://0.0.0.0/get/test case").prepar e()
88
89 assert request.path_url == '/get/test%20case'
90
91 @pytest.mark.parametrize(
92 'url, expected', (
93 ('http://example.com/path#fragment', 'http://example.com/path?a=b#fr agment'),
94 ('http://example.com/path?key=value#fragment', 'http://example.com/p ath?key=value&a=b#fragment')
95 ))
96 def test_params_are_added_before_fragment(self, url, expected):
97 request = requests.Request('GET', url, params={"a": "b"}).prepare()
98 assert request.url == expected
99
100 def test_params_original_order_is_preserved_by_default(self):
101 param_ordered_dict = OrderedDict((('z', 1), ('a', 1), ('k', 1), ('d', 1) ))
102 session = requests.Session()
103 request = requests.Request('GET', 'http://example.com/', params=param_or dered_dict)
104 prep = session.prepare_request(request)
105 assert prep.url == 'http://example.com/?z=1&a=1&k=1&d=1'
106
107 def test_params_bytes_are_encoded(self):
108 request = requests.Request('GET', 'http://example.com',
109 params=b'test=foo').prepare()
110 assert request.url == 'http://example.com/?test=foo'
111
112 def test_binary_put(self):
113 request = requests.Request('PUT', 'http://example.com',
114 data=u"ööö".encode("utf-8")).prepare()
115 assert isinstance(request.body, bytes)
116
117 @pytest.mark.parametrize('scheme', ('http://', 'HTTP://', 'hTTp://', 'HttP:/ /'))
118 def test_mixed_case_scheme_acceptable(self, httpbin, scheme):
119 s = requests.Session()
120 s.proxies = getproxies()
121 parts = urlparse(httpbin('get'))
122 url = scheme + parts.netloc + parts.path
123 r = requests.Request('GET', url)
124 r = s.send(r.prepare())
125 assert r.status_code == 200, 'failed for scheme {0}'.format(scheme)
126
127 def test_HTTP_200_OK_GET_ALTERNATIVE(self, httpbin):
128 r = requests.Request('GET', httpbin('get'))
129 s = requests.Session()
130 s.proxies = getproxies()
131
132 r = s.send(r.prepare())
133
134 assert r.status_code == 200
135
136 def test_HTTP_302_ALLOW_REDIRECT_GET(self, httpbin):
137 r = requests.get(httpbin('redirect', '1'))
138 assert r.status_code == 200
139 assert r.history[0].status_code == 302
140 assert r.history[0].is_redirect
141
142 def test_HTTP_302_TOO_MANY_REDIRECTS(self, httpbin):
143 try:
144 requests.get(httpbin('relative-redirect', '50'))
145 except TooManyRedirects as e:
146 url = httpbin('relative-redirect', '20')
147 assert e.request.url == url
148 assert e.response.url == url
149 assert len(e.response.history) == 30
150 else:
151 pytest.fail('Expected redirect to raise TooManyRedirects but it did not')
152
153 def test_HTTP_302_TOO_MANY_REDIRECTS_WITH_PARAMS(self, httpbin):
154 s = requests.session()
155 s.max_redirects = 5
156 try:
157 s.get(httpbin('relative-redirect', '50'))
158 except TooManyRedirects as e:
159 url = httpbin('relative-redirect', '45')
160 assert e.request.url == url
161 assert e.response.url == url
162 assert len(e.response.history) == 5
163 else:
164 pytest.fail('Expected custom max number of redirects to be respected but was not')
165
166 def test_http_301_changes_post_to_get(self, httpbin):
167 r = requests.post(httpbin('status', '301'))
168 assert r.status_code == 200
169 assert r.request.method == 'GET'
170 assert r.history[0].status_code == 301
171 assert r.history[0].is_redirect
172
173 def test_http_301_doesnt_change_head_to_get(self, httpbin):
174 r = requests.head(httpbin('status', '301'), allow_redirects=True)
175 print(r.content)
176 assert r.status_code == 200
177 assert r.request.method == 'HEAD'
178 assert r.history[0].status_code == 301
179 assert r.history[0].is_redirect
180
181 def test_http_302_changes_post_to_get(self, httpbin):
182 r = requests.post(httpbin('status', '302'))
183 assert r.status_code == 200
184 assert r.request.method == 'GET'
185 assert r.history[0].status_code == 302
186 assert r.history[0].is_redirect
187
188 def test_http_302_doesnt_change_head_to_get(self, httpbin):
189 r = requests.head(httpbin('status', '302'), allow_redirects=True)
190 assert r.status_code == 200
191 assert r.request.method == 'HEAD'
192 assert r.history[0].status_code == 302
193 assert r.history[0].is_redirect
194
195 def test_http_303_changes_post_to_get(self, httpbin):
196 r = requests.post(httpbin('status', '303'))
197 assert r.status_code == 200
198 assert r.request.method == 'GET'
199 assert r.history[0].status_code == 303
200 assert r.history[0].is_redirect
201
202 def test_http_303_doesnt_change_head_to_get(self, httpbin):
203 r = requests.head(httpbin('status', '303'), allow_redirects=True)
204 assert r.status_code == 200
205 assert r.request.method == 'HEAD'
206 assert r.history[0].status_code == 303
207 assert r.history[0].is_redirect
208
209 # def test_HTTP_302_ALLOW_REDIRECT_POST(self):
210 # r = requests.post(httpbin('status', '302'), data={'some': 'data'})
211 # self.assertEqual(r.status_code, 200)
212
213 def test_HTTP_200_OK_GET_WITH_PARAMS(self, httpbin):
214 heads = {'User-agent': 'Mozilla/5.0'}
215
216 r = requests.get(httpbin('user-agent'), headers=heads)
217
218 assert heads['User-agent'] in r.text
219 assert r.status_code == 200
220
221 def test_HTTP_200_OK_GET_WITH_MIXED_PARAMS(self, httpbin):
222 heads = {'User-agent': 'Mozilla/5.0'}
223
224 r = requests.get(httpbin('get') + '?test=true', params={'q': 'test'}, he aders=heads)
225 assert r.status_code == 200
226
227 def test_set_cookie_on_301(self, httpbin):
228 s = requests.session()
229 url = httpbin('cookies/set?foo=bar')
230 s.get(url)
231 assert s.cookies['foo'] == 'bar'
232
233 def test_cookie_sent_on_redirect(self, httpbin):
234 s = requests.session()
235 s.get(httpbin('cookies/set?foo=bar'))
236 r = s.get(httpbin('redirect/1')) # redirects to httpbin('get')
237 assert 'Cookie' in r.json()['headers']
238
239 def test_cookie_removed_on_expire(self, httpbin):
240 s = requests.session()
241 s.get(httpbin('cookies/set?foo=bar'))
242 assert s.cookies['foo'] == 'bar'
243 s.get(
244 httpbin('response-headers'),
245 params={
246 'Set-Cookie':
247 'foo=deleted; expires=Thu, 01-Jan-1970 00:00:01 GMT'
248 }
249 )
250 assert 'foo' not in s.cookies
251
252 def test_cookie_quote_wrapped(self, httpbin):
253 s = requests.session()
254 s.get(httpbin('cookies/set?foo="bar:baz"'))
255 assert s.cookies['foo'] == '"bar:baz"'
256
257 def test_cookie_persists_via_api(self, httpbin):
258 s = requests.session()
259 r = s.get(httpbin('redirect/1'), cookies={'foo': 'bar'})
260 assert 'foo' in r.request.headers['Cookie']
261 assert 'foo' in r.history[0].request.headers['Cookie']
262
263 def test_request_cookie_overrides_session_cookie(self, httpbin):
264 s = requests.session()
265 s.cookies['foo'] = 'bar'
266 r = s.get(httpbin('cookies'), cookies={'foo': 'baz'})
267 assert r.json()['cookies']['foo'] == 'baz'
268 # Session cookie should not be modified
269 assert s.cookies['foo'] == 'bar'
270
271 def test_request_cookies_not_persisted(self, httpbin):
272 s = requests.session()
273 s.get(httpbin('cookies'), cookies={'foo': 'baz'})
274 # Sending a request with cookies should not add cookies to the session
275 assert not s.cookies
276
277 def test_generic_cookiejar_works(self, httpbin):
278 cj = cookielib.CookieJar()
279 cookiejar_from_dict({'foo': 'bar'}, cj)
280 s = requests.session()
281 s.cookies = cj
282 r = s.get(httpbin('cookies'))
283 # Make sure the cookie was sent
284 assert r.json()['cookies']['foo'] == 'bar'
285 # Make sure the session cj is still the custom one
286 assert s.cookies is cj
287
288 def test_param_cookiejar_works(self, httpbin):
289 cj = cookielib.CookieJar()
290 cookiejar_from_dict({'foo': 'bar'}, cj)
291 s = requests.session()
292 r = s.get(httpbin('cookies'), cookies=cj)
293 # Make sure the cookie was sent
294 assert r.json()['cookies']['foo'] == 'bar'
295
296 def test_requests_in_history_are_not_overridden(self, httpbin):
297 resp = requests.get(httpbin('redirect/3'))
298 urls = [r.url for r in resp.history]
299 req_urls = [r.request.url for r in resp.history]
300 assert urls == req_urls
301
302 def test_history_is_always_a_list(self, httpbin):
303 """Show that even with redirects, Response.history is always a list."""
304 resp = requests.get(httpbin('get'))
305 assert isinstance(resp.history, list)
306 resp = requests.get(httpbin('redirect/1'))
307 assert isinstance(resp.history, list)
308 assert not isinstance(resp.history, tuple)
309
310 def test_headers_on_session_with_None_are_not_sent(self, httpbin):
311 """Do not send headers in Session.headers with None values."""
312 ses = requests.Session()
313 ses.headers['Accept-Encoding'] = None
314 req = requests.Request('GET', httpbin('get'))
315 prep = ses.prepare_request(req)
316 assert 'Accept-Encoding' not in prep.headers
317
318 def test_headers_preserve_order(self, httpbin):
319 """Preserve order when headers provided as OrderedDict."""
320 ses = requests.Session()
321 ses.headers = OrderedDict()
322 ses.headers['Accept-Encoding'] = 'identity'
323 ses.headers['First'] = '1'
324 ses.headers['Second'] = '2'
325 headers = OrderedDict([('Third', '3'), ('Fourth', '4')])
326 headers['Fifth'] = '5'
327 headers['Second'] = '222'
328 req = requests.Request('GET', httpbin('get'), headers=headers)
329 prep = ses.prepare_request(req)
330 items = list(prep.headers.items())
331 assert items[0] == ('Accept-Encoding', 'identity')
332 assert items[1] == ('First', '1')
333 assert items[2] == ('Second', '222')
334 assert items[3] == ('Third', '3')
335 assert items[4] == ('Fourth', '4')
336 assert items[5] == ('Fifth', '5')
337
338 @pytest.mark.parametrize('key', ('User-agent', 'user-agent'))
339 def test_user_agent_transfers(self, httpbin, key):
340
341 heads = {key: 'Mozilla/5.0 (github.com/kennethreitz/requests)'}
342
343 r = requests.get(httpbin('user-agent'), headers=heads)
344 assert heads[key] in r.text
345
346 def test_HTTP_200_OK_HEAD(self, httpbin):
347 r = requests.head(httpbin('get'))
348 assert r.status_code == 200
349
350 def test_HTTP_200_OK_PUT(self, httpbin):
351 r = requests.put(httpbin('put'))
352 assert r.status_code == 200
353
354 def test_BASICAUTH_TUPLE_HTTP_200_OK_GET(self, httpbin):
355 auth = ('user', 'pass')
356 url = httpbin('basic-auth', 'user', 'pass')
357
358 r = requests.get(url, auth=auth)
359 assert r.status_code == 200
360
361 r = requests.get(url)
362 assert r.status_code == 401
363
364 s = requests.session()
365 s.auth = auth
366 r = s.get(url)
367 assert r.status_code == 200
368
369 @pytest.mark.parametrize(
370 'url, exception', (
371 # Connecting to an unknown domain should raise a ConnectionError
372 ('http://doesnotexist.google.com', ConnectionError),
373 # Connecting to an invalid port should raise a ConnectionError
374 ('http://localhost:1', ConnectionError),
375 # Inputing a URL that cannot be parsed should raise an InvalidURL er ror
376 ('http://fe80::5054:ff:fe5a:fc0', InvalidURL)
377 ))
378 def test_errors(self, url, exception):
379 with pytest.raises(exception):
380 requests.get(url, timeout=1)
381
382 def test_proxy_error(self):
383 # any proxy related error (address resolution, no route to host, etc) sh ould result in a ProxyError
384 with pytest.raises(ProxyError):
385 requests.get('http://localhost:1', proxies={'http': 'non-resolvable- address'})
386
387 def test_basicauth_with_netrc(self, httpbin):
388 auth = ('user', 'pass')
389 wrong_auth = ('wronguser', 'wrongpass')
390 url = httpbin('basic-auth', 'user', 'pass')
391
392 old_auth = requests.sessions.get_netrc_auth
393
394 try:
395 def get_netrc_auth_mock(url):
396 return auth
397 requests.sessions.get_netrc_auth = get_netrc_auth_mock
398
399 # Should use netrc and work.
400 r = requests.get(url)
401 assert r.status_code == 200
402
403 # Given auth should override and fail.
404 r = requests.get(url, auth=wrong_auth)
405 assert r.status_code == 401
406
407 s = requests.session()
408
409 # Should use netrc and work.
410 r = s.get(url)
411 assert r.status_code == 200
412
413 # Given auth should override and fail.
414 s.auth = wrong_auth
415 r = s.get(url)
416 assert r.status_code == 401
417 finally:
418 requests.sessions.get_netrc_auth = old_auth
419
420 def test_DIGEST_HTTP_200_OK_GET(self, httpbin):
421
422 auth = HTTPDigestAuth('user', 'pass')
423 url = httpbin('digest-auth', 'auth', 'user', 'pass')
424
425 r = requests.get(url, auth=auth)
426 assert r.status_code == 200
427
428 r = requests.get(url)
429 assert r.status_code == 401
430
431 s = requests.session()
432 s.auth = HTTPDigestAuth('user', 'pass')
433 r = s.get(url)
434 assert r.status_code == 200
435
436 def test_DIGEST_AUTH_RETURNS_COOKIE(self, httpbin):
437 url = httpbin('digest-auth', 'auth', 'user', 'pass')
438 auth = HTTPDigestAuth('user', 'pass')
439 r = requests.get(url)
440 assert r.cookies['fake'] == 'fake_value'
441
442 r = requests.get(url, auth=auth)
443 assert r.status_code == 200
444
445 def test_DIGEST_AUTH_SETS_SESSION_COOKIES(self, httpbin):
446 url = httpbin('digest-auth', 'auth', 'user', 'pass')
447 auth = HTTPDigestAuth('user', 'pass')
448 s = requests.Session()
449 s.get(url, auth=auth)
450 assert s.cookies['fake'] == 'fake_value'
451
452 def test_DIGEST_STREAM(self, httpbin):
453
454 auth = HTTPDigestAuth('user', 'pass')
455 url = httpbin('digest-auth', 'auth', 'user', 'pass')
456
457 r = requests.get(url, auth=auth, stream=True)
458 assert r.raw.read() != b''
459
460 r = requests.get(url, auth=auth, stream=False)
461 assert r.raw.read() == b''
462
463 def test_DIGESTAUTH_WRONG_HTTP_401_GET(self, httpbin):
464
465 auth = HTTPDigestAuth('user', 'wrongpass')
466 url = httpbin('digest-auth', 'auth', 'user', 'pass')
467
468 r = requests.get(url, auth=auth)
469 assert r.status_code == 401
470
471 r = requests.get(url)
472 assert r.status_code == 401
473
474 s = requests.session()
475 s.auth = auth
476 r = s.get(url)
477 assert r.status_code == 401
478
479 def test_DIGESTAUTH_QUOTES_QOP_VALUE(self, httpbin):
480
481 auth = HTTPDigestAuth('user', 'pass')
482 url = httpbin('digest-auth', 'auth', 'user', 'pass')
483
484 r = requests.get(url, auth=auth)
485 assert '"auth"' in r.request.headers['Authorization']
486
487 def test_POSTBIN_GET_POST_FILES(self, httpbin):
488
489 url = httpbin('post')
490 requests.post(url).raise_for_status()
491
492 post1 = requests.post(url, data={'some': 'data'})
493 assert post1.status_code == 200
494
495 with open('requirements.txt') as f:
496 post2 = requests.post(url, files={'some': f})
497 assert post2.status_code == 200
498
499 post4 = requests.post(url, data='[{"some": "json"}]')
500 assert post4.status_code == 200
501
502 with pytest.raises(ValueError):
503 requests.post(url, files=['bad file data'])
504
505 def test_POSTBIN_SEEKED_OBJECT_WITH_NO_ITER(self, httpbin):
506
507 class TestStream(object):
508 def __init__(self, data):
509 self.data = data.encode()
510 self.length = len(self.data)
511 self.index = 0
512
513 def __len__(self):
514 return self.length
515
516 def read(self, size=None):
517 if size:
518 ret = self.data[self.index:self.index + size]
519 self.index += size
520 else:
521 ret = self.data[self.index:]
522 self.index = self.length
523 return ret
524
525 def tell(self):
526 return self.index
527
528 def seek(self, offset, where=0):
529 if where == 0:
530 self.index = offset
531 elif where == 1:
532 self.index += offset
533 elif where == 2:
534 self.index = self.length + offset
535
536 test = TestStream('test')
537 post1 = requests.post(httpbin('post'), data=test)
538 assert post1.status_code == 200
539 assert post1.json()['data'] == 'test'
540
541 test = TestStream('test')
542 test.seek(2)
543 post2 = requests.post(httpbin('post'), data=test)
544 assert post2.status_code == 200
545 assert post2.json()['data'] == 'st'
546
547 def test_POSTBIN_GET_POST_FILES_WITH_DATA(self, httpbin):
548
549 url = httpbin('post')
550 requests.post(url).raise_for_status()
551
552 post1 = requests.post(url, data={'some': 'data'})
553 assert post1.status_code == 200
554
555 with open('requirements.txt') as f:
556 post2 = requests.post(url,
557 data={'some': 'data'}, files={'some': f})
558 assert post2.status_code == 200
559
560 post4 = requests.post(url, data='[{"some": "json"}]')
561 assert post4.status_code == 200
562
563 with pytest.raises(ValueError):
564 requests.post(url, files=['bad file data'])
565
566 def test_conflicting_post_params(self, httpbin):
567 url = httpbin('post')
568 with open('requirements.txt') as f:
569 pytest.raises(ValueError, "requests.post(url, data='[{\"some\": \"da ta\"}]', files={'some': f})")
570 pytest.raises(ValueError, "requests.post(url, data=u('[{\"some\": \" data\"}]'), files={'some': f})")
571
572 def test_request_ok_set(self, httpbin):
573 r = requests.get(httpbin('status', '404'))
574 assert not r.ok
575
576 def test_status_raising(self, httpbin):
577 r = requests.get(httpbin('status', '404'))
578 with pytest.raises(requests.exceptions.HTTPError):
579 r.raise_for_status()
580
581 r = requests.get(httpbin('status', '500'))
582 assert not r.ok
583
584 def test_decompress_gzip(self, httpbin):
585 r = requests.get(httpbin('gzip'))
586 r.content.decode('ascii')
587
588 @pytest.mark.parametrize(
589 'url, params', (
590 ('/get', {'foo': 'føø'}),
591 ('/get', {'føø': 'føø'}),
592 ('/get', {'føø': 'føø'}),
593 ('/get', {'foo': 'foo'}),
594 ('ø', {'foo': 'foo'}),
595 ))
596 def test_unicode_get(self, httpbin, url, params):
597 requests.get(httpbin(url), params=params)
598
599 def test_unicode_header_name(self, httpbin):
600 requests.put(
601 httpbin('put'),
602 headers={str('Content-Type'): 'application/octet-stream'},
603 data='\xff') # compat.str is unicode.
604
605 def test_pyopenssl_redirect(self, httpbin_secure, httpbin_ca_bundle):
606 requests.get(httpbin_secure('status', '301'), verify=httpbin_ca_bundle)
607
608 def test_urlencoded_get_query_multivalued_param(self, httpbin):
609
610 r = requests.get(httpbin('get'), params=dict(test=['foo', 'baz']))
611 assert r.status_code == 200
612 assert r.url == httpbin('get?test=foo&test=baz')
613
614 def test_different_encodings_dont_break_post(self, httpbin):
615 r = requests.post(httpbin('post'),
616 data={'stuff': json.dumps({'a': 123})},
617 params={'blah': 'asdf1234'},
618 files={'file': ('test_requests.py', open(__file__, 'rb'))})
619 assert r.status_code == 200
620
621 @pytest.mark.parametrize(
622 'data', (
623 {'stuff': u('ëlïxr')},
624 {'stuff': u('ëlïxr').encode('utf-8')},
625 {'stuff': 'elixr'},
626 {'stuff': 'elixr'.encode('utf-8')},
627 ))
628 def test_unicode_multipart_post(self, httpbin, data):
629 r = requests.post(httpbin('post'),
630 data=data,
631 files={'file': ('test_requests.py', open(__file__, 'rb'))})
632 assert r.status_code == 200
633
634 def test_unicode_multipart_post_fieldnames(self, httpbin):
635 filename = os.path.splitext(__file__)[0] + '.py'
636 r = requests.Request(
637 method='POST', url=httpbin('post'),
638 data={'stuff'.encode('utf-8'): 'elixr'},
639 files={'file': ('test_requests.py', open(filename, 'rb'))})
640 prep = r.prepare()
641 assert b'name="stuff"' in prep.body
642 assert b'name="b\'stuff\'"' not in prep.body
643
644 def test_unicode_method_name(self, httpbin):
645 files = {'file': open(__file__, 'rb')}
646 r = requests.request(
647 method=u('POST'), url=httpbin('post'), files=files)
648 assert r.status_code == 200
649
650 def test_unicode_method_name_with_request_object(self, httpbin):
651 files = {'file': open(__file__, 'rb')}
652 s = requests.Session()
653 req = requests.Request(u('POST'), httpbin('post'), files=files)
654 prep = s.prepare_request(req)
655 assert isinstance(prep.method, builtin_str)
656 assert prep.method == 'POST'
657
658 resp = s.send(prep)
659 assert resp.status_code == 200
660
661 def test_non_prepared_request_error(self):
662 s = requests.Session()
663 req = requests.Request(u('POST'), '/')
664
665 with pytest.raises(ValueError) as e:
666 s.send(req)
667 assert str(e.value) == 'You can only send PreparedRequests.'
668
669 def test_custom_content_type(self, httpbin):
670 r = requests.post(
671 httpbin('post'),
672 data={'stuff': json.dumps({'a': 123})},
673 files={
674 'file1': ('test_requests.py', open(__file__, 'rb')),
675 'file2': ('test_requests', open(__file__, 'rb'),
676 'text/py-content-type')})
677 assert r.status_code == 200
678 assert b"text/py-content-type" in r.request.body
679
680 def test_hook_receives_request_arguments(self, httpbin):
681 def hook(resp, **kwargs):
682 assert resp is not None
683 assert kwargs != {}
684
685 s = requests.Session()
686 r = requests.Request('GET', httpbin(), hooks={'response': hook})
687 prep = s.prepare_request(r)
688 s.send(prep)
689
690 def test_session_hooks_are_used_with_no_request_hooks(self, httpbin):
691 hook = lambda x, *args, **kwargs: x
692 s = requests.Session()
693 s.hooks['response'].append(hook)
694 r = requests.Request('GET', httpbin())
695 prep = s.prepare_request(r)
696 assert prep.hooks['response'] != []
697 assert prep.hooks['response'] == [hook]
698
699 def test_session_hooks_are_overridden_by_request_hooks(self, httpbin):
700 hook1 = lambda x, *args, **kwargs: x
701 hook2 = lambda x, *args, **kwargs: x
702 assert hook1 is not hook2
703 s = requests.Session()
704 s.hooks['response'].append(hook2)
705 r = requests.Request('GET', httpbin(), hooks={'response': [hook1]})
706 prep = s.prepare_request(r)
707 assert prep.hooks['response'] == [hook1]
708
709 def test_prepared_request_hook(self, httpbin):
710 def hook(resp, **kwargs):
711 resp.hook_working = True
712 return resp
713
714 req = requests.Request('GET', httpbin(), hooks={'response': hook})
715 prep = req.prepare()
716
717 s = requests.Session()
718 s.proxies = getproxies()
719 resp = s.send(prep)
720
721 assert hasattr(resp, 'hook_working')
722
723 def test_prepared_from_session(self, httpbin):
724 class DummyAuth(requests.auth.AuthBase):
725 def __call__(self, r):
726 r.headers['Dummy-Auth-Test'] = 'dummy-auth-test-ok'
727 return r
728
729 req = requests.Request('GET', httpbin('headers'))
730 assert not req.auth
731
732 s = requests.Session()
733 s.auth = DummyAuth()
734
735 prep = s.prepare_request(req)
736 resp = s.send(prep)
737
738 assert resp.json()['headers'][
739 'Dummy-Auth-Test'] == 'dummy-auth-test-ok'
740
741 def test_prepare_request_with_bytestring_url(self):
742 req = requests.Request('GET', b'https://httpbin.org/')
743 s = requests.Session()
744 prep = s.prepare_request(req)
745 assert prep.url == "https://httpbin.org/"
746
747 def test_links(self):
748 r = requests.Response()
749 r.headers = {
750 'cache-control': 'public, max-age=60, s-maxage=60',
751 'connection': 'keep-alive',
752 'content-encoding': 'gzip',
753 'content-type': 'application/json; charset=utf-8',
754 'date': 'Sat, 26 Jan 2013 16:47:56 GMT',
755 'etag': '"6ff6a73c0e446c1f61614769e3ceb778"',
756 'last-modified': 'Sat, 26 Jan 2013 16:22:39 GMT',
757 'link': ('<https://api.github.com/users/kennethreitz/repos?'
758 'page=2&per_page=10>; rel="next", <https://api.github.'
759 'com/users/kennethreitz/repos?page=7&per_page=10>; '
760 ' rel="last"'),
761 'server': 'GitHub.com',
762 'status': '200 OK',
763 'vary': 'Accept',
764 'x-content-type-options': 'nosniff',
765 'x-github-media-type': 'github.beta',
766 'x-ratelimit-limit': '60',
767 'x-ratelimit-remaining': '57'
768 }
769 assert r.links['next']['rel'] == 'next'
770
771 def test_cookie_parameters(self):
772 key = 'some_cookie'
773 value = 'some_value'
774 secure = True
775 domain = 'test.com'
776 rest = {'HttpOnly': True}
777
778 jar = requests.cookies.RequestsCookieJar()
779 jar.set(key, value, secure=secure, domain=domain, rest=rest)
780
781 assert len(jar) == 1
782 assert 'some_cookie' in jar
783
784 cookie = list(jar)[0]
785 assert cookie.secure == secure
786 assert cookie.domain == domain
787 assert cookie._rest['HttpOnly'] == rest['HttpOnly']
788
789 def test_cookie_as_dict_keeps_len(self):
790 key = 'some_cookie'
791 value = 'some_value'
792
793 key1 = 'some_cookie1'
794 value1 = 'some_value1'
795
796 jar = requests.cookies.RequestsCookieJar()
797 jar.set(key, value)
798 jar.set(key1, value1)
799
800 d1 = dict(jar)
801 d2 = dict(jar.iteritems())
802 d3 = dict(jar.items())
803
804 assert len(jar) == 2
805 assert len(d1) == 2
806 assert len(d2) == 2
807 assert len(d3) == 2
808
809 def test_cookie_as_dict_keeps_items(self):
810 key = 'some_cookie'
811 value = 'some_value'
812
813 key1 = 'some_cookie1'
814 value1 = 'some_value1'
815
816 jar = requests.cookies.RequestsCookieJar()
817 jar.set(key, value)
818 jar.set(key1, value1)
819
820 d1 = dict(jar)
821 d2 = dict(jar.iteritems())
822 d3 = dict(jar.items())
823
824 assert d1['some_cookie'] == 'some_value'
825 assert d2['some_cookie'] == 'some_value'
826 assert d3['some_cookie1'] == 'some_value1'
827
828 def test_cookie_as_dict_keys(self):
829 key = 'some_cookie'
830 value = 'some_value'
831
832 key1 = 'some_cookie1'
833 value1 = 'some_value1'
834
835 jar = requests.cookies.RequestsCookieJar()
836 jar.set(key, value)
837 jar.set(key1, value1)
838
839 keys = jar.keys()
840 assert keys == list(keys)
841 # make sure one can use keys multiple times
842 assert list(keys) == list(keys)
843
844 def test_cookie_as_dict_values(self):
845 key = 'some_cookie'
846 value = 'some_value'
847
848 key1 = 'some_cookie1'
849 value1 = 'some_value1'
850
851 jar = requests.cookies.RequestsCookieJar()
852 jar.set(key, value)
853 jar.set(key1, value1)
854
855 values = jar.values()
856 assert values == list(values)
857 # make sure one can use values multiple times
858 assert list(values) == list(values)
859
860 def test_cookie_as_dict_items(self):
861 key = 'some_cookie'
862 value = 'some_value'
863
864 key1 = 'some_cookie1'
865 value1 = 'some_value1'
866
867 jar = requests.cookies.RequestsCookieJar()
868 jar.set(key, value)
869 jar.set(key1, value1)
870
871 items = jar.items()
872 assert items == list(items)
873 # make sure one can use items multiple times
874 assert list(items) == list(items)
875
876 def test_cookie_duplicate_names_different_domains(self):
877 key = 'some_cookie'
878 value = 'some_value'
879 domain1 = 'test1.com'
880 domain2 = 'test2.com'
881
882 jar = requests.cookies.RequestsCookieJar()
883 jar.set(key, value, domain=domain1)
884 jar.set(key, value, domain=domain2)
885 assert key in jar
886 items = jar.items()
887 assert len(items) == 2
888
889 # Verify that CookieConflictError is raised if domain is not specified
890 with pytest.raises(requests.cookies.CookieConflictError):
891 jar.get(key)
892
893 # Verify that CookieConflictError is not raised if domain is specified
894 cookie = jar.get(key, domain=domain1)
895 assert cookie == value
896
897 def test_cookie_duplicate_names_raises_cookie_conflict_error(self):
898 key = 'some_cookie'
899 value = 'some_value'
900 path = 'some_path'
901
902 jar = requests.cookies.RequestsCookieJar()
903 jar.set(key, value, path=path)
904 jar.set(key, value)
905 with pytest.raises(requests.cookies.CookieConflictError):
906 jar.get(key)
907
908 def test_time_elapsed_blank(self, httpbin):
909 r = requests.get(httpbin('get'))
910 td = r.elapsed
911 total_seconds = ((td.microseconds + (td.seconds + td.days * 24 * 3600)
912 * 10**6) / 10**6)
913 assert total_seconds > 0.0
914
915 def test_response_is_iterable(self):
916 r = requests.Response()
917 io = StringIO.StringIO('abc')
918 read_ = io.read
919
920 def read_mock(amt, decode_content=None):
921 return read_(amt)
922 setattr(io, 'read', read_mock)
923 r.raw = io
924 assert next(iter(r))
925 io.close()
926
927 def test_response_decode_unicode(self):
928 """
929 When called with decode_unicode, Response.iter_content should always
930 return unicode.
931 """
932 r = requests.Response()
933 r._content_consumed = True
934 r._content = b'the content'
935 r.encoding = 'ascii'
936
937 chunks = r.iter_content(decode_unicode=True)
938 assert all(isinstance(chunk, str) for chunk in chunks)
939
940 # also for streaming
941 r = requests.Response()
942 r.raw = io.BytesIO(b'the content')
943 r.encoding = 'ascii'
944 chunks = r.iter_content(decode_unicode=True)
945 assert all(isinstance(chunk, str) for chunk in chunks)
946
947 def test_request_and_response_are_pickleable(self, httpbin):
948 r = requests.get(httpbin('get'))
949
950 # verify we can pickle the original request
951 assert pickle.loads(pickle.dumps(r.request))
952
953 # verify we can pickle the response and that we have access to
954 # the original request.
955 pr = pickle.loads(pickle.dumps(r))
956 assert r.request.url == pr.request.url
957 assert r.request.headers == pr.request.headers
958
959 def test_cannot_send_unprepared_requests(self, httpbin):
960 r = requests.Request(url=httpbin())
961 with pytest.raises(ValueError):
962 requests.Session().send(r)
963
964 def test_http_error(self):
965 error = requests.exceptions.HTTPError()
966 assert not error.response
967 response = requests.Response()
968 error = requests.exceptions.HTTPError(response=response)
969 assert error.response == response
970 error = requests.exceptions.HTTPError('message', response=response)
971 assert str(error) == 'message'
972 assert error.response == response
973
974 def test_session_pickling(self, httpbin):
975 r = requests.Request('GET', httpbin('get'))
976 s = requests.Session()
977
978 s = pickle.loads(pickle.dumps(s))
979 s.proxies = getproxies()
980
981 r = s.send(r.prepare())
982 assert r.status_code == 200
983
984 def test_fixes_1329(self, httpbin):
985 """
986 Ensure that header updates are done case-insensitively.
987 """
988 s = requests.Session()
989 s.headers.update({'ACCEPT': 'BOGUS'})
990 s.headers.update({'accept': 'application/json'})
991 r = s.get(httpbin('get'))
992 headers = r.request.headers
993 assert headers['accept'] == 'application/json'
994 assert headers['Accept'] == 'application/json'
995 assert headers['ACCEPT'] == 'application/json'
996
997 def test_uppercase_scheme_redirect(self, httpbin):
998 parts = urlparse(httpbin('html'))
999 url = "HTTP://" + parts.netloc + parts.path
1000 r = requests.get(httpbin('redirect-to'), params={'url': url})
1001 assert r.status_code == 200
1002 assert r.url.lower() == url.lower()
1003
1004 def test_transport_adapter_ordering(self):
1005 s = requests.Session()
1006 order = ['https://', 'http://']
1007 assert order == list(s.adapters)
1008 s.mount('http://git', HTTPAdapter())
1009 s.mount('http://github', HTTPAdapter())
1010 s.mount('http://github.com', HTTPAdapter())
1011 s.mount('http://github.com/about/', HTTPAdapter())
1012 order = [
1013 'http://github.com/about/',
1014 'http://github.com',
1015 'http://github',
1016 'http://git',
1017 'https://',
1018 'http://',
1019 ]
1020 assert order == list(s.adapters)
1021 s.mount('http://gittip', HTTPAdapter())
1022 s.mount('http://gittip.com', HTTPAdapter())
1023 s.mount('http://gittip.com/about/', HTTPAdapter())
1024 order = [
1025 'http://github.com/about/',
1026 'http://gittip.com/about/',
1027 'http://github.com',
1028 'http://gittip.com',
1029 'http://github',
1030 'http://gittip',
1031 'http://git',
1032 'https://',
1033 'http://',
1034 ]
1035 assert order == list(s.adapters)
1036 s2 = requests.Session()
1037 s2.adapters = {'http://': HTTPAdapter()}
1038 s2.mount('https://', HTTPAdapter())
1039 assert 'http://' in s2.adapters
1040 assert 'https://' in s2.adapters
1041
1042 def test_header_remove_is_case_insensitive(self, httpbin):
1043 # From issue #1321
1044 s = requests.Session()
1045 s.headers['foo'] = 'bar'
1046 r = s.get(httpbin('get'), headers={'FOO': None})
1047 assert 'foo' not in r.request.headers
1048
1049 def test_params_are_merged_case_sensitive(self, httpbin):
1050 s = requests.Session()
1051 s.params['foo'] = 'bar'
1052 r = s.get(httpbin('get'), params={'FOO': 'bar'})
1053 assert r.json()['args'] == {'foo': 'bar', 'FOO': 'bar'}
1054
1055 def test_long_authinfo_in_url(self):
1056 url = 'http://{0}:{1}@{2}:9000/path?query#frag'.format(
1057 'E8A3BE87-9E3F-4620-8858-95478E385B5B',
1058 'EA770032-DA4D-4D84-8CE9-29C6D910BF1E',
1059 'exactly-------------sixty-----------three------------characters',
1060 )
1061 r = requests.Request('GET', url).prepare()
1062 assert r.url == url
1063
1064 def test_header_keys_are_native(self, httpbin):
1065 headers = {u('unicode'): 'blah', 'byte'.encode('ascii'): 'blah'}
1066 r = requests.Request('GET', httpbin('get'), headers=headers)
1067 p = r.prepare()
1068
1069 # This is testing that they are builtin strings. A bit weird, but there
1070 # we go.
1071 assert 'unicode' in p.headers.keys()
1072 assert 'byte' in p.headers.keys()
1073
1074 @pytest.mark.parametrize('files', ('foo', b'foo', bytearray(b'foo')))
1075 def test_can_send_objects_with_files(self, httpbin, files):
1076 data = {'a': 'this is a string'}
1077 files = {'b': files}
1078 r = requests.Request('POST', httpbin('post'), data=data, files=files)
1079 p = r.prepare()
1080 assert 'multipart/form-data' in p.headers['Content-Type']
1081
1082 def test_can_send_file_object_with_non_string_filename(self, httpbin):
1083 f = io.BytesIO()
1084 f.name = 2
1085 r = requests.Request('POST', httpbin('post'), files={'f': f})
1086 p = r.prepare()
1087
1088 assert 'multipart/form-data' in p.headers['Content-Type']
1089
1090 def test_autoset_header_values_are_native(self, httpbin):
1091 data = 'this is a string'
1092 length = '16'
1093 req = requests.Request('POST', httpbin('post'), data=data)
1094 p = req.prepare()
1095
1096 assert p.headers['Content-Length'] == length
1097
1098 def test_nonhttp_schemes_dont_check_URLs(self):
1099 test_urls = (
1100 'data:image/gif;base64,R0lGODlhAQABAHAAACH5BAUAAAAALAAAAAABAAEAAAICR AEAOw==',
1101 'file:///etc/passwd',
1102 'magnet:?xt=urn:btih:be08f00302bc2d1d3cfa3af02024fa647a271431',
1103 )
1104 for test_url in test_urls:
1105 req = requests.Request('GET', test_url)
1106 preq = req.prepare()
1107 assert test_url == preq.url
1108
1109 def test_auth_is_stripped_on_redirect_off_host(self, httpbin):
1110 r = requests.get(
1111 httpbin('redirect-to'),
1112 params={'url': 'http://www.google.co.uk'},
1113 auth=('user', 'pass'),
1114 )
1115 assert r.history[0].request.headers['Authorization']
1116 assert not r.request.headers.get('Authorization', '')
1117
1118 def test_auth_is_retained_for_redirect_on_host(self, httpbin):
1119 r = requests.get(httpbin('redirect/1'), auth=('user', 'pass'))
1120 h1 = r.history[0].request.headers['Authorization']
1121 h2 = r.request.headers['Authorization']
1122
1123 assert h1 == h2
1124
1125 def test_manual_redirect_with_partial_body_read(self, httpbin):
1126 s = requests.Session()
1127 r1 = s.get(httpbin('redirect/2'), allow_redirects=False, stream=True)
1128 assert r1.is_redirect
1129 rg = s.resolve_redirects(r1, r1.request, stream=True)
1130
1131 # read only the first eight bytes of the response body,
1132 # then follow the redirect
1133 r1.iter_content(8)
1134 r2 = next(rg)
1135 assert r2.is_redirect
1136
1137 # read all of the response via iter_content,
1138 # then follow the redirect
1139 for _ in r2.iter_content():
1140 pass
1141 r3 = next(rg)
1142 assert not r3.is_redirect
1143
1144 def _patch_adapter_gzipped_redirect(self, session, url):
1145 adapter = session.get_adapter(url=url)
1146 org_build_response = adapter.build_response
1147 self._patched_response = False
1148
1149 def build_response(*args, **kwargs):
1150 resp = org_build_response(*args, **kwargs)
1151 if not self._patched_response:
1152 resp.raw.headers['content-encoding'] = 'gzip'
1153 self._patched_response = True
1154 return resp
1155
1156 adapter.build_response = build_response
1157
1158 def test_redirect_with_wrong_gzipped_header(self, httpbin):
1159 s = requests.Session()
1160 url = httpbin('redirect/1')
1161 self._patch_adapter_gzipped_redirect(s, url)
1162 s.get(url)
1163
1164 def test_basic_auth_str_is_always_native(self):
1165 s = _basic_auth_str("test", "test")
1166 assert isinstance(s, builtin_str)
1167 assert s == "Basic dGVzdDp0ZXN0"
1168
1169 def test_requests_history_is_saved(self, httpbin):
1170 r = requests.get(httpbin('redirect/5'))
1171 total = r.history[-1].history
1172 i = 0
1173 for item in r.history:
1174 assert item.history == total[0:i]
1175 i += 1
1176
1177 def test_json_param_post_content_type_works(self, httpbin):
1178 r = requests.post(
1179 httpbin('post'),
1180 json={'life': 42}
1181 )
1182 assert r.status_code == 200
1183 assert 'application/json' in r.request.headers['Content-Type']
1184 assert {'life': 42} == r.json()['json']
1185
1186 def test_json_param_post_should_not_override_data_param(self, httpbin):
1187 r = requests.Request(method='POST', url=httpbin('post'),
1188 data={'stuff': 'elixr'},
1189 json={'music': 'flute'})
1190 prep = r.prepare()
1191 assert 'stuff=elixr' == prep.body
1192
1193 def test_response_iter_lines(self, httpbin):
1194 r = requests.get(httpbin('stream/4'), stream=True)
1195 assert r.status_code == 200
1196
1197 it = r.iter_lines()
1198 next(it)
1199 assert len(list(it)) == 3
1200
1201 def test_unconsumed_session_response_closes_connection(self, httpbin):
1202 s = requests.session()
1203
1204 with contextlib.closing(s.get(httpbin('stream/4'), stream=True)) as resp onse:
1205 pass
1206
1207 assert response._content_consumed is False
1208 assert response.raw.closed
1209
1210 @pytest.mark.xfail
1211 def test_response_iter_lines_reentrant(self, httpbin):
1212 """Response.iter_lines() is not reentrant safe"""
1213 r = requests.get(httpbin('stream/4'), stream=True)
1214 assert r.status_code == 200
1215
1216 next(r.iter_lines())
1217 assert len(list(r.iter_lines())) == 3
1218
1219 def test_session_close_proxy_clear(self, mocker):
1220 proxies = {
1221 'one': mocker.Mock(),
1222 'two': mocker.Mock(),
1223 }
1224 session = requests.Session()
1225 mocker.patch.dict(session.adapters['http://'].proxy_manager, proxies)
1226 session.close()
1227 proxies['one'].clear.assert_called_once_with()
1228 proxies['two'].clear.assert_called_once_with()
1229
1230
1231 class TestCaseInsensitiveDict:
1232
1233 @pytest.mark.parametrize(
1234 'cid', (
1235 CaseInsensitiveDict({'Foo': 'foo', 'BAr': 'bar'}),
1236 CaseInsensitiveDict([('Foo', 'foo'), ('BAr', 'bar')]),
1237 CaseInsensitiveDict(FOO='foo', BAr='bar'),
1238 ))
1239 def test_init(self, cid):
1240 assert len(cid) == 2
1241 assert 'foo' in cid
1242 assert 'bar' in cid
1243
1244 def test_docstring_example(self):
1245 cid = CaseInsensitiveDict()
1246 cid['Accept'] = 'application/json'
1247 assert cid['aCCEPT'] == 'application/json'
1248 assert list(cid) == ['Accept']
1249
1250 def test_len(self):
1251 cid = CaseInsensitiveDict({'a': 'a', 'b': 'b'})
1252 cid['A'] = 'a'
1253 assert len(cid) == 2
1254
1255 def test_getitem(self):
1256 cid = CaseInsensitiveDict({'Spam': 'blueval'})
1257 assert cid['spam'] == 'blueval'
1258 assert cid['SPAM'] == 'blueval'
1259
1260 def test_fixes_649(self):
1261 """__setitem__ should behave case-insensitively."""
1262 cid = CaseInsensitiveDict()
1263 cid['spam'] = 'oneval'
1264 cid['Spam'] = 'twoval'
1265 cid['sPAM'] = 'redval'
1266 cid['SPAM'] = 'blueval'
1267 assert cid['spam'] == 'blueval'
1268 assert cid['SPAM'] == 'blueval'
1269 assert list(cid.keys()) == ['SPAM']
1270
1271 def test_delitem(self):
1272 cid = CaseInsensitiveDict()
1273 cid['Spam'] = 'someval'
1274 del cid['sPam']
1275 assert 'spam' not in cid
1276 assert len(cid) == 0
1277
1278 def test_contains(self):
1279 cid = CaseInsensitiveDict()
1280 cid['Spam'] = 'someval'
1281 assert 'Spam' in cid
1282 assert 'spam' in cid
1283 assert 'SPAM' in cid
1284 assert 'sPam' in cid
1285 assert 'notspam' not in cid
1286
1287 def test_get(self):
1288 cid = CaseInsensitiveDict()
1289 cid['spam'] = 'oneval'
1290 cid['SPAM'] = 'blueval'
1291 assert cid.get('spam') == 'blueval'
1292 assert cid.get('SPAM') == 'blueval'
1293 assert cid.get('sPam') == 'blueval'
1294 assert cid.get('notspam', 'default') == 'default'
1295
1296 def test_update(self):
1297 cid = CaseInsensitiveDict()
1298 cid['spam'] = 'blueval'
1299 cid.update({'sPam': 'notblueval'})
1300 assert cid['spam'] == 'notblueval'
1301 cid = CaseInsensitiveDict({'Foo': 'foo', 'BAr': 'bar'})
1302 cid.update({'fOO': 'anotherfoo', 'bAR': 'anotherbar'})
1303 assert len(cid) == 2
1304 assert cid['foo'] == 'anotherfoo'
1305 assert cid['bar'] == 'anotherbar'
1306
1307 def test_update_retains_unchanged(self):
1308 cid = CaseInsensitiveDict({'foo': 'foo', 'bar': 'bar'})
1309 cid.update({'foo': 'newfoo'})
1310 assert cid['bar'] == 'bar'
1311
1312 def test_iter(self):
1313 cid = CaseInsensitiveDict({'Spam': 'spam', 'Eggs': 'eggs'})
1314 keys = frozenset(['Spam', 'Eggs'])
1315 assert frozenset(iter(cid)) == keys
1316
1317 def test_equality(self):
1318 cid = CaseInsensitiveDict({'SPAM': 'blueval', 'Eggs': 'redval'})
1319 othercid = CaseInsensitiveDict({'spam': 'blueval', 'eggs': 'redval'})
1320 assert cid == othercid
1321 del othercid['spam']
1322 assert cid != othercid
1323 assert cid == {'spam': 'blueval', 'eggs': 'redval'}
1324 assert cid != object()
1325
1326 def test_setdefault(self):
1327 cid = CaseInsensitiveDict({'Spam': 'blueval'})
1328 assert cid.setdefault('spam', 'notblueval') == 'blueval'
1329 assert cid.setdefault('notspam', 'notblueval') == 'notblueval'
1330
1331 def test_lower_items(self):
1332 cid = CaseInsensitiveDict({
1333 'Accept': 'application/json',
1334 'user-Agent': 'requests',
1335 })
1336 keyset = frozenset(lowerkey for lowerkey, v in cid.lower_items())
1337 lowerkeyset = frozenset(['accept', 'user-agent'])
1338 assert keyset == lowerkeyset
1339
1340 def test_preserve_key_case(self):
1341 cid = CaseInsensitiveDict({
1342 'Accept': 'application/json',
1343 'user-Agent': 'requests',
1344 })
1345 keyset = frozenset(['Accept', 'user-Agent'])
1346 assert frozenset(i[0] for i in cid.items()) == keyset
1347 assert frozenset(cid.keys()) == keyset
1348 assert frozenset(cid) == keyset
1349
1350 def test_preserve_last_key_case(self):
1351 cid = CaseInsensitiveDict({
1352 'Accept': 'application/json',
1353 'user-Agent': 'requests',
1354 })
1355 cid.update({'ACCEPT': 'application/json'})
1356 cid['USER-AGENT'] = 'requests'
1357 keyset = frozenset(['ACCEPT', 'USER-AGENT'])
1358 assert frozenset(i[0] for i in cid.items()) == keyset
1359 assert frozenset(cid.keys()) == keyset
1360 assert frozenset(cid) == keyset
1361
1362 def test_copy(self):
1363 cid = CaseInsensitiveDict({
1364 'Accept': 'application/json',
1365 'user-Agent': 'requests',
1366 })
1367 cid_copy = cid.copy()
1368 assert cid == cid_copy
1369 cid['changed'] = True
1370 assert cid != cid_copy
1371
1372
1373 class TestMorselToCookieExpires:
1374 """Tests for morsel_to_cookie when morsel contains expires."""
1375
1376 def test_expires_valid_str(self):
1377 """Test case where we convert expires from string time."""
1378
1379 morsel = Morsel()
1380 morsel['expires'] = 'Thu, 01-Jan-1970 00:00:01 GMT'
1381 cookie = morsel_to_cookie(morsel)
1382 assert cookie.expires == 1
1383
1384 @pytest.mark.parametrize(
1385 'value, exception', (
1386 (100, TypeError),
1387 ('woops', ValueError),
1388 ))
1389 def test_expires_invalid_int(self, value, exception):
1390 """Test case where an invalid type is passed for expires."""
1391 morsel = Morsel()
1392 morsel['expires'] = value
1393 with pytest.raises(exception):
1394 morsel_to_cookie(morsel)
1395
1396 def test_expires_none(self):
1397 """Test case where expires is None."""
1398
1399 morsel = Morsel()
1400 morsel['expires'] = None
1401 cookie = morsel_to_cookie(morsel)
1402 assert cookie.expires is None
1403
1404
1405 class TestMorselToCookieMaxAge:
1406
1407 """Tests for morsel_to_cookie when morsel contains max-age."""
1408
1409 def test_max_age_valid_int(self):
1410 """Test case where a valid max age in seconds is passed."""
1411
1412 morsel = Morsel()
1413 morsel['max-age'] = 60
1414 cookie = morsel_to_cookie(morsel)
1415 assert isinstance(cookie.expires, int)
1416
1417 def test_max_age_invalid_str(self):
1418 """Test case where a invalid max age is passed."""
1419
1420 morsel = Morsel()
1421 morsel['max-age'] = 'woops'
1422 with pytest.raises(TypeError):
1423 morsel_to_cookie(morsel)
1424
1425
1426 class TestTimeout:
1427
1428 def test_stream_timeout(self, httpbin):
1429 try:
1430 requests.get(httpbin('delay/10'), timeout=2.0)
1431 except requests.exceptions.Timeout as e:
1432 assert 'Read timed out' in e.args[0].args[0]
1433
1434 @pytest.mark.parametrize(
1435 'timeout, error_text', (
1436 ((3, 4, 5), '(connect, read)'),
1437 ('foo', 'must be an int or float'),
1438 ))
1439 def test_invalid_timeout(self, httpbin, timeout, error_text):
1440 with pytest.raises(ValueError) as e:
1441 requests.get(httpbin('get'), timeout=timeout)
1442 assert error_text in str(e)
1443
1444 def test_none_timeout(self, httpbin):
1445 """ Check that you can set None as a valid timeout value.
1446
1447 To actually test this behavior, we'd want to check that setting the
1448 timeout to None actually lets the request block past the system default
1449 timeout. However, this would make the test suite unbearably slow.
1450 Instead we verify that setting the timeout to None does not prevent the
1451 request from succeeding.
1452 """
1453 r = requests.get(httpbin('get'), timeout=None)
1454 assert r.status_code == 200
1455
1456 def test_read_timeout(self, httpbin):
1457 try:
1458 requests.get(httpbin('delay/10'), timeout=(None, 0.1))
1459 pytest.fail('The recv() request should time out.')
1460 except ReadTimeout:
1461 pass
1462
1463 def test_connect_timeout(self):
1464 try:
1465 requests.get(TARPIT, timeout=(0.1, None))
1466 pytest.fail('The connect() request should time out.')
1467 except ConnectTimeout as e:
1468 assert isinstance(e, ConnectionError)
1469 assert isinstance(e, Timeout)
1470
1471 def test_total_timeout_connect(self):
1472 try:
1473 requests.get(TARPIT, timeout=(0.1, 0.1))
1474 pytest.fail('The connect() request should time out.')
1475 except ConnectTimeout:
1476 pass
1477
1478 def test_encoded_methods(self, httpbin):
1479 """See: https://github.com/kennethreitz/requests/issues/2316"""
1480 r = requests.request(b'GET', httpbin('get'))
1481 assert r.ok
1482
1483
1484 SendCall = collections.namedtuple('SendCall', ('args', 'kwargs'))
1485
1486
1487 class RedirectSession(SessionRedirectMixin):
1488 def __init__(self, order_of_redirects):
1489 self.redirects = order_of_redirects
1490 self.calls = []
1491 self.max_redirects = 30
1492 self.cookies = {}
1493 self.trust_env = False
1494
1495 def send(self, *args, **kwargs):
1496 self.calls.append(SendCall(args, kwargs))
1497 return self.build_response()
1498
1499 def build_response(self):
1500 request = self.calls[-1].args[0]
1501 r = requests.Response()
1502
1503 try:
1504 r.status_code = int(self.redirects.pop(0))
1505 except IndexError:
1506 r.status_code = 200
1507
1508 r.headers = CaseInsensitiveDict({'Location': '/'})
1509 r.raw = self._build_raw()
1510 r.request = request
1511 return r
1512
1513 def _build_raw(self):
1514 string = StringIO.StringIO('')
1515 setattr(string, 'release_conn', lambda *args: args)
1516 return string
1517
1518
1519 def test_requests_are_updated_each_time(httpbin):
1520 session = RedirectSession([303, 307])
1521 prep = requests.Request('POST', httpbin('post')).prepare()
1522 r0 = session.send(prep)
1523 assert r0.request.method == 'POST'
1524 assert session.calls[-1] == SendCall((r0.request,), {})
1525 redirect_generator = session.resolve_redirects(r0, prep)
1526 default_keyword_args = {
1527 'stream': False,
1528 'verify': True,
1529 'cert': None,
1530 'timeout': None,
1531 'allow_redirects': False,
1532 'proxies': {},
1533 }
1534 for response in redirect_generator:
1535 assert response.request.method == 'GET'
1536 send_call = SendCall((response.request,), default_keyword_args)
1537 assert session.calls[-1] == send_call
1538
1539
1540 @pytest.mark.parametrize(
1541 'data', (
1542 (('a', 'b'), ('c', 'd')),
1543 (('c', 'd'), ('a', 'b')),
1544 (('a', 'b'), ('c', 'd'), ('e', 'f')),
1545 ))
1546 def test_data_argument_accepts_tuples(data):
1547 """Ensure that the data argument will accept tuples of strings
1548 and properly encode them.
1549 """
1550 p = PreparedRequest()
1551 p.prepare(
1552 method='GET',
1553 url='http://www.example.com',
1554 data=data,
1555 hooks=default_hooks()
1556 )
1557 assert p.body == urlencode(data)
1558
1559
1560 @pytest.mark.parametrize(
1561 'kwargs', (
1562 None,
1563 {
1564 'method': 'GET',
1565 'url': 'http://www.example.com',
1566 'data': 'foo=bar',
1567 'hooks': default_hooks()
1568 },
1569 {
1570 'method': 'GET',
1571 'url': 'http://www.example.com',
1572 'data': 'foo=bar',
1573 'hooks': default_hooks(),
1574 'cookies': {'foo': 'bar'}
1575 },
1576 {
1577 'method': 'GET',
1578 'url': u('http://www.example.com/üniçø∂é')
1579 },
1580 ))
1581 def test_prepared_copy(kwargs):
1582 p = PreparedRequest()
1583 if kwargs:
1584 p.prepare(**kwargs)
1585 copy = p.copy()
1586 for attr in ('method', 'url', 'headers', '_cookies', 'body', 'hooks'):
1587 assert getattr(p, attr) == getattr(copy, attr)
1588
1589
1590 def test_urllib3_retries(httpbin):
1591 from requests.packages.urllib3.util import Retry
1592 s = requests.Session()
1593 s.mount('http://', HTTPAdapter(max_retries=Retry(
1594 total=2, status_forcelist=[500]
1595 )))
1596
1597 with pytest.raises(RetryError):
1598 s.get(httpbin('status/500'))
1599
1600
1601 def test_urllib3_pool_connection_closed(httpbin):
1602 s = requests.Session()
1603 s.mount('http://', HTTPAdapter(pool_connections=0, pool_maxsize=0))
1604
1605 try:
1606 s.get(httpbin('status/200'))
1607 except ConnectionError as e:
1608 assert u"Pool is closed." in str(e)
1609
1610
1611 def test_vendor_aliases():
1612 from requests.packages import urllib3
1613 from requests.packages import chardet
1614
1615 with pytest.raises(ImportError):
1616 from requests.packages import webbrowser
OLDNEW
« no previous file with comments | « recipe_engine/third_party/requests/tests/test_lowlevel.py ('k') | recipe_engine/third_party/requests/tests/test_structures.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698