| OLD | NEW |
| (Empty) |
| 1 #!/usr/bin/env python | |
| 2 # -*- coding: utf-8 -*- | |
| 3 | |
| 4 """Tests for Requests.""" | |
| 5 | |
| 6 from __future__ import division | |
| 7 import json | |
| 8 import os | |
| 9 import pickle | |
| 10 import collections | |
| 11 import contextlib | |
| 12 | |
| 13 import io | |
| 14 import requests | |
| 15 import pytest | |
| 16 from requests.adapters import HTTPAdapter | |
| 17 from requests.auth import HTTPDigestAuth, _basic_auth_str | |
| 18 from requests.compat import ( | |
| 19 Morsel, cookielib, getproxies, str, urlparse, | |
| 20 builtin_str, OrderedDict) | |
| 21 from requests.cookies import cookiejar_from_dict, morsel_to_cookie | |
| 22 from requests.exceptions import ( | |
| 23 ConnectionError, ConnectTimeout, InvalidSchema, InvalidURL, | |
| 24 MissingSchema, ReadTimeout, Timeout, RetryError, TooManyRedirects, | |
| 25 ProxyError) | |
| 26 from requests.models import PreparedRequest | |
| 27 from requests.structures import CaseInsensitiveDict | |
| 28 from requests.sessions import SessionRedirectMixin | |
| 29 from requests.models import urlencode | |
| 30 from requests.hooks import default_hooks | |
| 31 | |
| 32 from .compat import StringIO, u | |
| 33 | |
| 34 # Requests to this URL should always fail with a connection timeout (nothing | |
| 35 # listening on that port) | |
| 36 TARPIT = 'http://10.255.255.1' | |
| 37 | |
| 38 | |
| 39 class TestRequests: | |
| 40 | |
| 41 def test_entry_points(self): | |
| 42 | |
| 43 requests.session | |
| 44 requests.session().get | |
| 45 requests.session().head | |
| 46 requests.get | |
| 47 requests.head | |
| 48 requests.put | |
| 49 requests.patch | |
| 50 requests.post | |
| 51 | |
| 52 @pytest.mark.parametrize( | |
| 53 'exception, url', ( | |
| 54 (MissingSchema, 'hiwpefhipowhefopw'), | |
| 55 (InvalidSchema, 'localhost:3128'), | |
| 56 (InvalidSchema, 'localhost.localdomain:3128/'), | |
| 57 (InvalidSchema, '10.122.1.1:3128/'), | |
| 58 (InvalidURL, 'http://'), | |
| 59 )) | |
| 60 def test_invalid_url(self, exception, url): | |
| 61 with pytest.raises(exception): | |
| 62 requests.get(url) | |
| 63 | |
| 64 def test_basic_building(self): | |
| 65 req = requests.Request() | |
| 66 req.url = 'http://kennethreitz.org/' | |
| 67 req.data = {'life': '42'} | |
| 68 | |
| 69 pr = req.prepare() | |
| 70 assert pr.url == req.url | |
| 71 assert pr.body == 'life=42' | |
| 72 | |
| 73 @pytest.mark.parametrize('method', ('GET', 'HEAD')) | |
| 74 def test_no_content_length(self, httpbin, method): | |
| 75 req = requests.Request(method, httpbin(method.lower())).prepare() | |
| 76 assert 'Content-Length' not in req.headers | |
| 77 | |
| 78 def test_override_content_length(self, httpbin): | |
| 79 headers = { | |
| 80 'Content-Length': 'not zero' | |
| 81 } | |
| 82 r = requests.Request('POST', httpbin('post'), headers=headers).prepare() | |
| 83 assert 'Content-Length' in r.headers | |
| 84 assert r.headers['Content-Length'] == 'not zero' | |
| 85 | |
| 86 def test_path_is_not_double_encoded(self): | |
| 87 request = requests.Request('GET', "http://0.0.0.0/get/test case").prepar
e() | |
| 88 | |
| 89 assert request.path_url == '/get/test%20case' | |
| 90 | |
| 91 @pytest.mark.parametrize( | |
| 92 'url, expected', ( | |
| 93 ('http://example.com/path#fragment', 'http://example.com/path?a=b#fr
agment'), | |
| 94 ('http://example.com/path?key=value#fragment', 'http://example.com/p
ath?key=value&a=b#fragment') | |
| 95 )) | |
| 96 def test_params_are_added_before_fragment(self, url, expected): | |
| 97 request = requests.Request('GET', url, params={"a": "b"}).prepare() | |
| 98 assert request.url == expected | |
| 99 | |
| 100 def test_params_original_order_is_preserved_by_default(self): | |
| 101 param_ordered_dict = OrderedDict((('z', 1), ('a', 1), ('k', 1), ('d', 1)
)) | |
| 102 session = requests.Session() | |
| 103 request = requests.Request('GET', 'http://example.com/', params=param_or
dered_dict) | |
| 104 prep = session.prepare_request(request) | |
| 105 assert prep.url == 'http://example.com/?z=1&a=1&k=1&d=1' | |
| 106 | |
| 107 def test_params_bytes_are_encoded(self): | |
| 108 request = requests.Request('GET', 'http://example.com', | |
| 109 params=b'test=foo').prepare() | |
| 110 assert request.url == 'http://example.com/?test=foo' | |
| 111 | |
| 112 def test_binary_put(self): | |
| 113 request = requests.Request('PUT', 'http://example.com', | |
| 114 data=u"ööö".encode("utf-8")).prepare() | |
| 115 assert isinstance(request.body, bytes) | |
| 116 | |
| 117 @pytest.mark.parametrize('scheme', ('http://', 'HTTP://', 'hTTp://', 'HttP:/
/')) | |
| 118 def test_mixed_case_scheme_acceptable(self, httpbin, scheme): | |
| 119 s = requests.Session() | |
| 120 s.proxies = getproxies() | |
| 121 parts = urlparse(httpbin('get')) | |
| 122 url = scheme + parts.netloc + parts.path | |
| 123 r = requests.Request('GET', url) | |
| 124 r = s.send(r.prepare()) | |
| 125 assert r.status_code == 200, 'failed for scheme {0}'.format(scheme) | |
| 126 | |
| 127 def test_HTTP_200_OK_GET_ALTERNATIVE(self, httpbin): | |
| 128 r = requests.Request('GET', httpbin('get')) | |
| 129 s = requests.Session() | |
| 130 s.proxies = getproxies() | |
| 131 | |
| 132 r = s.send(r.prepare()) | |
| 133 | |
| 134 assert r.status_code == 200 | |
| 135 | |
| 136 def test_HTTP_302_ALLOW_REDIRECT_GET(self, httpbin): | |
| 137 r = requests.get(httpbin('redirect', '1')) | |
| 138 assert r.status_code == 200 | |
| 139 assert r.history[0].status_code == 302 | |
| 140 assert r.history[0].is_redirect | |
| 141 | |
| 142 def test_HTTP_302_TOO_MANY_REDIRECTS(self, httpbin): | |
| 143 try: | |
| 144 requests.get(httpbin('relative-redirect', '50')) | |
| 145 except TooManyRedirects as e: | |
| 146 url = httpbin('relative-redirect', '20') | |
| 147 assert e.request.url == url | |
| 148 assert e.response.url == url | |
| 149 assert len(e.response.history) == 30 | |
| 150 else: | |
| 151 pytest.fail('Expected redirect to raise TooManyRedirects but it did
not') | |
| 152 | |
| 153 def test_HTTP_302_TOO_MANY_REDIRECTS_WITH_PARAMS(self, httpbin): | |
| 154 s = requests.session() | |
| 155 s.max_redirects = 5 | |
| 156 try: | |
| 157 s.get(httpbin('relative-redirect', '50')) | |
| 158 except TooManyRedirects as e: | |
| 159 url = httpbin('relative-redirect', '45') | |
| 160 assert e.request.url == url | |
| 161 assert e.response.url == url | |
| 162 assert len(e.response.history) == 5 | |
| 163 else: | |
| 164 pytest.fail('Expected custom max number of redirects to be respected
but was not') | |
| 165 | |
| 166 def test_http_301_changes_post_to_get(self, httpbin): | |
| 167 r = requests.post(httpbin('status', '301')) | |
| 168 assert r.status_code == 200 | |
| 169 assert r.request.method == 'GET' | |
| 170 assert r.history[0].status_code == 301 | |
| 171 assert r.history[0].is_redirect | |
| 172 | |
| 173 def test_http_301_doesnt_change_head_to_get(self, httpbin): | |
| 174 r = requests.head(httpbin('status', '301'), allow_redirects=True) | |
| 175 print(r.content) | |
| 176 assert r.status_code == 200 | |
| 177 assert r.request.method == 'HEAD' | |
| 178 assert r.history[0].status_code == 301 | |
| 179 assert r.history[0].is_redirect | |
| 180 | |
| 181 def test_http_302_changes_post_to_get(self, httpbin): | |
| 182 r = requests.post(httpbin('status', '302')) | |
| 183 assert r.status_code == 200 | |
| 184 assert r.request.method == 'GET' | |
| 185 assert r.history[0].status_code == 302 | |
| 186 assert r.history[0].is_redirect | |
| 187 | |
| 188 def test_http_302_doesnt_change_head_to_get(self, httpbin): | |
| 189 r = requests.head(httpbin('status', '302'), allow_redirects=True) | |
| 190 assert r.status_code == 200 | |
| 191 assert r.request.method == 'HEAD' | |
| 192 assert r.history[0].status_code == 302 | |
| 193 assert r.history[0].is_redirect | |
| 194 | |
| 195 def test_http_303_changes_post_to_get(self, httpbin): | |
| 196 r = requests.post(httpbin('status', '303')) | |
| 197 assert r.status_code == 200 | |
| 198 assert r.request.method == 'GET' | |
| 199 assert r.history[0].status_code == 303 | |
| 200 assert r.history[0].is_redirect | |
| 201 | |
| 202 def test_http_303_doesnt_change_head_to_get(self, httpbin): | |
| 203 r = requests.head(httpbin('status', '303'), allow_redirects=True) | |
| 204 assert r.status_code == 200 | |
| 205 assert r.request.method == 'HEAD' | |
| 206 assert r.history[0].status_code == 303 | |
| 207 assert r.history[0].is_redirect | |
| 208 | |
| 209 # def test_HTTP_302_ALLOW_REDIRECT_POST(self): | |
| 210 # r = requests.post(httpbin('status', '302'), data={'some': 'data'}) | |
| 211 # self.assertEqual(r.status_code, 200) | |
| 212 | |
| 213 def test_HTTP_200_OK_GET_WITH_PARAMS(self, httpbin): | |
| 214 heads = {'User-agent': 'Mozilla/5.0'} | |
| 215 | |
| 216 r = requests.get(httpbin('user-agent'), headers=heads) | |
| 217 | |
| 218 assert heads['User-agent'] in r.text | |
| 219 assert r.status_code == 200 | |
| 220 | |
| 221 def test_HTTP_200_OK_GET_WITH_MIXED_PARAMS(self, httpbin): | |
| 222 heads = {'User-agent': 'Mozilla/5.0'} | |
| 223 | |
| 224 r = requests.get(httpbin('get') + '?test=true', params={'q': 'test'}, he
aders=heads) | |
| 225 assert r.status_code == 200 | |
| 226 | |
| 227 def test_set_cookie_on_301(self, httpbin): | |
| 228 s = requests.session() | |
| 229 url = httpbin('cookies/set?foo=bar') | |
| 230 s.get(url) | |
| 231 assert s.cookies['foo'] == 'bar' | |
| 232 | |
| 233 def test_cookie_sent_on_redirect(self, httpbin): | |
| 234 s = requests.session() | |
| 235 s.get(httpbin('cookies/set?foo=bar')) | |
| 236 r = s.get(httpbin('redirect/1')) # redirects to httpbin('get') | |
| 237 assert 'Cookie' in r.json()['headers'] | |
| 238 | |
| 239 def test_cookie_removed_on_expire(self, httpbin): | |
| 240 s = requests.session() | |
| 241 s.get(httpbin('cookies/set?foo=bar')) | |
| 242 assert s.cookies['foo'] == 'bar' | |
| 243 s.get( | |
| 244 httpbin('response-headers'), | |
| 245 params={ | |
| 246 'Set-Cookie': | |
| 247 'foo=deleted; expires=Thu, 01-Jan-1970 00:00:01 GMT' | |
| 248 } | |
| 249 ) | |
| 250 assert 'foo' not in s.cookies | |
| 251 | |
| 252 def test_cookie_quote_wrapped(self, httpbin): | |
| 253 s = requests.session() | |
| 254 s.get(httpbin('cookies/set?foo="bar:baz"')) | |
| 255 assert s.cookies['foo'] == '"bar:baz"' | |
| 256 | |
| 257 def test_cookie_persists_via_api(self, httpbin): | |
| 258 s = requests.session() | |
| 259 r = s.get(httpbin('redirect/1'), cookies={'foo': 'bar'}) | |
| 260 assert 'foo' in r.request.headers['Cookie'] | |
| 261 assert 'foo' in r.history[0].request.headers['Cookie'] | |
| 262 | |
| 263 def test_request_cookie_overrides_session_cookie(self, httpbin): | |
| 264 s = requests.session() | |
| 265 s.cookies['foo'] = 'bar' | |
| 266 r = s.get(httpbin('cookies'), cookies={'foo': 'baz'}) | |
| 267 assert r.json()['cookies']['foo'] == 'baz' | |
| 268 # Session cookie should not be modified | |
| 269 assert s.cookies['foo'] == 'bar' | |
| 270 | |
| 271 def test_request_cookies_not_persisted(self, httpbin): | |
| 272 s = requests.session() | |
| 273 s.get(httpbin('cookies'), cookies={'foo': 'baz'}) | |
| 274 # Sending a request with cookies should not add cookies to the session | |
| 275 assert not s.cookies | |
| 276 | |
| 277 def test_generic_cookiejar_works(self, httpbin): | |
| 278 cj = cookielib.CookieJar() | |
| 279 cookiejar_from_dict({'foo': 'bar'}, cj) | |
| 280 s = requests.session() | |
| 281 s.cookies = cj | |
| 282 r = s.get(httpbin('cookies')) | |
| 283 # Make sure the cookie was sent | |
| 284 assert r.json()['cookies']['foo'] == 'bar' | |
| 285 # Make sure the session cj is still the custom one | |
| 286 assert s.cookies is cj | |
| 287 | |
| 288 def test_param_cookiejar_works(self, httpbin): | |
| 289 cj = cookielib.CookieJar() | |
| 290 cookiejar_from_dict({'foo': 'bar'}, cj) | |
| 291 s = requests.session() | |
| 292 r = s.get(httpbin('cookies'), cookies=cj) | |
| 293 # Make sure the cookie was sent | |
| 294 assert r.json()['cookies']['foo'] == 'bar' | |
| 295 | |
| 296 def test_requests_in_history_are_not_overridden(self, httpbin): | |
| 297 resp = requests.get(httpbin('redirect/3')) | |
| 298 urls = [r.url for r in resp.history] | |
| 299 req_urls = [r.request.url for r in resp.history] | |
| 300 assert urls == req_urls | |
| 301 | |
| 302 def test_history_is_always_a_list(self, httpbin): | |
| 303 """Show that even with redirects, Response.history is always a list.""" | |
| 304 resp = requests.get(httpbin('get')) | |
| 305 assert isinstance(resp.history, list) | |
| 306 resp = requests.get(httpbin('redirect/1')) | |
| 307 assert isinstance(resp.history, list) | |
| 308 assert not isinstance(resp.history, tuple) | |
| 309 | |
| 310 def test_headers_on_session_with_None_are_not_sent(self, httpbin): | |
| 311 """Do not send headers in Session.headers with None values.""" | |
| 312 ses = requests.Session() | |
| 313 ses.headers['Accept-Encoding'] = None | |
| 314 req = requests.Request('GET', httpbin('get')) | |
| 315 prep = ses.prepare_request(req) | |
| 316 assert 'Accept-Encoding' not in prep.headers | |
| 317 | |
| 318 def test_headers_preserve_order(self, httpbin): | |
| 319 """Preserve order when headers provided as OrderedDict.""" | |
| 320 ses = requests.Session() | |
| 321 ses.headers = OrderedDict() | |
| 322 ses.headers['Accept-Encoding'] = 'identity' | |
| 323 ses.headers['First'] = '1' | |
| 324 ses.headers['Second'] = '2' | |
| 325 headers = OrderedDict([('Third', '3'), ('Fourth', '4')]) | |
| 326 headers['Fifth'] = '5' | |
| 327 headers['Second'] = '222' | |
| 328 req = requests.Request('GET', httpbin('get'), headers=headers) | |
| 329 prep = ses.prepare_request(req) | |
| 330 items = list(prep.headers.items()) | |
| 331 assert items[0] == ('Accept-Encoding', 'identity') | |
| 332 assert items[1] == ('First', '1') | |
| 333 assert items[2] == ('Second', '222') | |
| 334 assert items[3] == ('Third', '3') | |
| 335 assert items[4] == ('Fourth', '4') | |
| 336 assert items[5] == ('Fifth', '5') | |
| 337 | |
| 338 @pytest.mark.parametrize('key', ('User-agent', 'user-agent')) | |
| 339 def test_user_agent_transfers(self, httpbin, key): | |
| 340 | |
| 341 heads = {key: 'Mozilla/5.0 (github.com/kennethreitz/requests)'} | |
| 342 | |
| 343 r = requests.get(httpbin('user-agent'), headers=heads) | |
| 344 assert heads[key] in r.text | |
| 345 | |
| 346 def test_HTTP_200_OK_HEAD(self, httpbin): | |
| 347 r = requests.head(httpbin('get')) | |
| 348 assert r.status_code == 200 | |
| 349 | |
| 350 def test_HTTP_200_OK_PUT(self, httpbin): | |
| 351 r = requests.put(httpbin('put')) | |
| 352 assert r.status_code == 200 | |
| 353 | |
| 354 def test_BASICAUTH_TUPLE_HTTP_200_OK_GET(self, httpbin): | |
| 355 auth = ('user', 'pass') | |
| 356 url = httpbin('basic-auth', 'user', 'pass') | |
| 357 | |
| 358 r = requests.get(url, auth=auth) | |
| 359 assert r.status_code == 200 | |
| 360 | |
| 361 r = requests.get(url) | |
| 362 assert r.status_code == 401 | |
| 363 | |
| 364 s = requests.session() | |
| 365 s.auth = auth | |
| 366 r = s.get(url) | |
| 367 assert r.status_code == 200 | |
| 368 | |
| 369 @pytest.mark.parametrize( | |
| 370 'url, exception', ( | |
| 371 # Connecting to an unknown domain should raise a ConnectionError | |
| 372 ('http://doesnotexist.google.com', ConnectionError), | |
| 373 # Connecting to an invalid port should raise a ConnectionError | |
| 374 ('http://localhost:1', ConnectionError), | |
| 375 # Inputing a URL that cannot be parsed should raise an InvalidURL er
ror | |
| 376 ('http://fe80::5054:ff:fe5a:fc0', InvalidURL) | |
| 377 )) | |
| 378 def test_errors(self, url, exception): | |
| 379 with pytest.raises(exception): | |
| 380 requests.get(url, timeout=1) | |
| 381 | |
| 382 def test_proxy_error(self): | |
| 383 # any proxy related error (address resolution, no route to host, etc) sh
ould result in a ProxyError | |
| 384 with pytest.raises(ProxyError): | |
| 385 requests.get('http://localhost:1', proxies={'http': 'non-resolvable-
address'}) | |
| 386 | |
| 387 def test_basicauth_with_netrc(self, httpbin): | |
| 388 auth = ('user', 'pass') | |
| 389 wrong_auth = ('wronguser', 'wrongpass') | |
| 390 url = httpbin('basic-auth', 'user', 'pass') | |
| 391 | |
| 392 old_auth = requests.sessions.get_netrc_auth | |
| 393 | |
| 394 try: | |
| 395 def get_netrc_auth_mock(url): | |
| 396 return auth | |
| 397 requests.sessions.get_netrc_auth = get_netrc_auth_mock | |
| 398 | |
| 399 # Should use netrc and work. | |
| 400 r = requests.get(url) | |
| 401 assert r.status_code == 200 | |
| 402 | |
| 403 # Given auth should override and fail. | |
| 404 r = requests.get(url, auth=wrong_auth) | |
| 405 assert r.status_code == 401 | |
| 406 | |
| 407 s = requests.session() | |
| 408 | |
| 409 # Should use netrc and work. | |
| 410 r = s.get(url) | |
| 411 assert r.status_code == 200 | |
| 412 | |
| 413 # Given auth should override and fail. | |
| 414 s.auth = wrong_auth | |
| 415 r = s.get(url) | |
| 416 assert r.status_code == 401 | |
| 417 finally: | |
| 418 requests.sessions.get_netrc_auth = old_auth | |
| 419 | |
| 420 def test_DIGEST_HTTP_200_OK_GET(self, httpbin): | |
| 421 | |
| 422 auth = HTTPDigestAuth('user', 'pass') | |
| 423 url = httpbin('digest-auth', 'auth', 'user', 'pass') | |
| 424 | |
| 425 r = requests.get(url, auth=auth) | |
| 426 assert r.status_code == 200 | |
| 427 | |
| 428 r = requests.get(url) | |
| 429 assert r.status_code == 401 | |
| 430 | |
| 431 s = requests.session() | |
| 432 s.auth = HTTPDigestAuth('user', 'pass') | |
| 433 r = s.get(url) | |
| 434 assert r.status_code == 200 | |
| 435 | |
| 436 def test_DIGEST_AUTH_RETURNS_COOKIE(self, httpbin): | |
| 437 url = httpbin('digest-auth', 'auth', 'user', 'pass') | |
| 438 auth = HTTPDigestAuth('user', 'pass') | |
| 439 r = requests.get(url) | |
| 440 assert r.cookies['fake'] == 'fake_value' | |
| 441 | |
| 442 r = requests.get(url, auth=auth) | |
| 443 assert r.status_code == 200 | |
| 444 | |
| 445 def test_DIGEST_AUTH_SETS_SESSION_COOKIES(self, httpbin): | |
| 446 url = httpbin('digest-auth', 'auth', 'user', 'pass') | |
| 447 auth = HTTPDigestAuth('user', 'pass') | |
| 448 s = requests.Session() | |
| 449 s.get(url, auth=auth) | |
| 450 assert s.cookies['fake'] == 'fake_value' | |
| 451 | |
| 452 def test_DIGEST_STREAM(self, httpbin): | |
| 453 | |
| 454 auth = HTTPDigestAuth('user', 'pass') | |
| 455 url = httpbin('digest-auth', 'auth', 'user', 'pass') | |
| 456 | |
| 457 r = requests.get(url, auth=auth, stream=True) | |
| 458 assert r.raw.read() != b'' | |
| 459 | |
| 460 r = requests.get(url, auth=auth, stream=False) | |
| 461 assert r.raw.read() == b'' | |
| 462 | |
| 463 def test_DIGESTAUTH_WRONG_HTTP_401_GET(self, httpbin): | |
| 464 | |
| 465 auth = HTTPDigestAuth('user', 'wrongpass') | |
| 466 url = httpbin('digest-auth', 'auth', 'user', 'pass') | |
| 467 | |
| 468 r = requests.get(url, auth=auth) | |
| 469 assert r.status_code == 401 | |
| 470 | |
| 471 r = requests.get(url) | |
| 472 assert r.status_code == 401 | |
| 473 | |
| 474 s = requests.session() | |
| 475 s.auth = auth | |
| 476 r = s.get(url) | |
| 477 assert r.status_code == 401 | |
| 478 | |
| 479 def test_DIGESTAUTH_QUOTES_QOP_VALUE(self, httpbin): | |
| 480 | |
| 481 auth = HTTPDigestAuth('user', 'pass') | |
| 482 url = httpbin('digest-auth', 'auth', 'user', 'pass') | |
| 483 | |
| 484 r = requests.get(url, auth=auth) | |
| 485 assert '"auth"' in r.request.headers['Authorization'] | |
| 486 | |
| 487 def test_POSTBIN_GET_POST_FILES(self, httpbin): | |
| 488 | |
| 489 url = httpbin('post') | |
| 490 requests.post(url).raise_for_status() | |
| 491 | |
| 492 post1 = requests.post(url, data={'some': 'data'}) | |
| 493 assert post1.status_code == 200 | |
| 494 | |
| 495 with open('requirements.txt') as f: | |
| 496 post2 = requests.post(url, files={'some': f}) | |
| 497 assert post2.status_code == 200 | |
| 498 | |
| 499 post4 = requests.post(url, data='[{"some": "json"}]') | |
| 500 assert post4.status_code == 200 | |
| 501 | |
| 502 with pytest.raises(ValueError): | |
| 503 requests.post(url, files=['bad file data']) | |
| 504 | |
| 505 def test_POSTBIN_SEEKED_OBJECT_WITH_NO_ITER(self, httpbin): | |
| 506 | |
| 507 class TestStream(object): | |
| 508 def __init__(self, data): | |
| 509 self.data = data.encode() | |
| 510 self.length = len(self.data) | |
| 511 self.index = 0 | |
| 512 | |
| 513 def __len__(self): | |
| 514 return self.length | |
| 515 | |
| 516 def read(self, size=None): | |
| 517 if size: | |
| 518 ret = self.data[self.index:self.index + size] | |
| 519 self.index += size | |
| 520 else: | |
| 521 ret = self.data[self.index:] | |
| 522 self.index = self.length | |
| 523 return ret | |
| 524 | |
| 525 def tell(self): | |
| 526 return self.index | |
| 527 | |
| 528 def seek(self, offset, where=0): | |
| 529 if where == 0: | |
| 530 self.index = offset | |
| 531 elif where == 1: | |
| 532 self.index += offset | |
| 533 elif where == 2: | |
| 534 self.index = self.length + offset | |
| 535 | |
| 536 test = TestStream('test') | |
| 537 post1 = requests.post(httpbin('post'), data=test) | |
| 538 assert post1.status_code == 200 | |
| 539 assert post1.json()['data'] == 'test' | |
| 540 | |
| 541 test = TestStream('test') | |
| 542 test.seek(2) | |
| 543 post2 = requests.post(httpbin('post'), data=test) | |
| 544 assert post2.status_code == 200 | |
| 545 assert post2.json()['data'] == 'st' | |
| 546 | |
| 547 def test_POSTBIN_GET_POST_FILES_WITH_DATA(self, httpbin): | |
| 548 | |
| 549 url = httpbin('post') | |
| 550 requests.post(url).raise_for_status() | |
| 551 | |
| 552 post1 = requests.post(url, data={'some': 'data'}) | |
| 553 assert post1.status_code == 200 | |
| 554 | |
| 555 with open('requirements.txt') as f: | |
| 556 post2 = requests.post(url, | |
| 557 data={'some': 'data'}, files={'some': f}) | |
| 558 assert post2.status_code == 200 | |
| 559 | |
| 560 post4 = requests.post(url, data='[{"some": "json"}]') | |
| 561 assert post4.status_code == 200 | |
| 562 | |
| 563 with pytest.raises(ValueError): | |
| 564 requests.post(url, files=['bad file data']) | |
| 565 | |
| 566 def test_conflicting_post_params(self, httpbin): | |
| 567 url = httpbin('post') | |
| 568 with open('requirements.txt') as f: | |
| 569 pytest.raises(ValueError, "requests.post(url, data='[{\"some\": \"da
ta\"}]', files={'some': f})") | |
| 570 pytest.raises(ValueError, "requests.post(url, data=u('[{\"some\": \"
data\"}]'), files={'some': f})") | |
| 571 | |
| 572 def test_request_ok_set(self, httpbin): | |
| 573 r = requests.get(httpbin('status', '404')) | |
| 574 assert not r.ok | |
| 575 | |
| 576 def test_status_raising(self, httpbin): | |
| 577 r = requests.get(httpbin('status', '404')) | |
| 578 with pytest.raises(requests.exceptions.HTTPError): | |
| 579 r.raise_for_status() | |
| 580 | |
| 581 r = requests.get(httpbin('status', '500')) | |
| 582 assert not r.ok | |
| 583 | |
| 584 def test_decompress_gzip(self, httpbin): | |
| 585 r = requests.get(httpbin('gzip')) | |
| 586 r.content.decode('ascii') | |
| 587 | |
| 588 @pytest.mark.parametrize( | |
| 589 'url, params', ( | |
| 590 ('/get', {'foo': 'føø'}), | |
| 591 ('/get', {'føø': 'føø'}), | |
| 592 ('/get', {'føø': 'føø'}), | |
| 593 ('/get', {'foo': 'foo'}), | |
| 594 ('ø', {'foo': 'foo'}), | |
| 595 )) | |
| 596 def test_unicode_get(self, httpbin, url, params): | |
| 597 requests.get(httpbin(url), params=params) | |
| 598 | |
| 599 def test_unicode_header_name(self, httpbin): | |
| 600 requests.put( | |
| 601 httpbin('put'), | |
| 602 headers={str('Content-Type'): 'application/octet-stream'}, | |
| 603 data='\xff') # compat.str is unicode. | |
| 604 | |
| 605 def test_pyopenssl_redirect(self, httpbin_secure, httpbin_ca_bundle): | |
| 606 requests.get(httpbin_secure('status', '301'), verify=httpbin_ca_bundle) | |
| 607 | |
| 608 def test_urlencoded_get_query_multivalued_param(self, httpbin): | |
| 609 | |
| 610 r = requests.get(httpbin('get'), params=dict(test=['foo', 'baz'])) | |
| 611 assert r.status_code == 200 | |
| 612 assert r.url == httpbin('get?test=foo&test=baz') | |
| 613 | |
| 614 def test_different_encodings_dont_break_post(self, httpbin): | |
| 615 r = requests.post(httpbin('post'), | |
| 616 data={'stuff': json.dumps({'a': 123})}, | |
| 617 params={'blah': 'asdf1234'}, | |
| 618 files={'file': ('test_requests.py', open(__file__, 'rb'))}) | |
| 619 assert r.status_code == 200 | |
| 620 | |
| 621 @pytest.mark.parametrize( | |
| 622 'data', ( | |
| 623 {'stuff': u('ëlïxr')}, | |
| 624 {'stuff': u('ëlïxr').encode('utf-8')}, | |
| 625 {'stuff': 'elixr'}, | |
| 626 {'stuff': 'elixr'.encode('utf-8')}, | |
| 627 )) | |
| 628 def test_unicode_multipart_post(self, httpbin, data): | |
| 629 r = requests.post(httpbin('post'), | |
| 630 data=data, | |
| 631 files={'file': ('test_requests.py', open(__file__, 'rb'))}) | |
| 632 assert r.status_code == 200 | |
| 633 | |
| 634 def test_unicode_multipart_post_fieldnames(self, httpbin): | |
| 635 filename = os.path.splitext(__file__)[0] + '.py' | |
| 636 r = requests.Request( | |
| 637 method='POST', url=httpbin('post'), | |
| 638 data={'stuff'.encode('utf-8'): 'elixr'}, | |
| 639 files={'file': ('test_requests.py', open(filename, 'rb'))}) | |
| 640 prep = r.prepare() | |
| 641 assert b'name="stuff"' in prep.body | |
| 642 assert b'name="b\'stuff\'"' not in prep.body | |
| 643 | |
| 644 def test_unicode_method_name(self, httpbin): | |
| 645 files = {'file': open(__file__, 'rb')} | |
| 646 r = requests.request( | |
| 647 method=u('POST'), url=httpbin('post'), files=files) | |
| 648 assert r.status_code == 200 | |
| 649 | |
| 650 def test_unicode_method_name_with_request_object(self, httpbin): | |
| 651 files = {'file': open(__file__, 'rb')} | |
| 652 s = requests.Session() | |
| 653 req = requests.Request(u('POST'), httpbin('post'), files=files) | |
| 654 prep = s.prepare_request(req) | |
| 655 assert isinstance(prep.method, builtin_str) | |
| 656 assert prep.method == 'POST' | |
| 657 | |
| 658 resp = s.send(prep) | |
| 659 assert resp.status_code == 200 | |
| 660 | |
| 661 def test_non_prepared_request_error(self): | |
| 662 s = requests.Session() | |
| 663 req = requests.Request(u('POST'), '/') | |
| 664 | |
| 665 with pytest.raises(ValueError) as e: | |
| 666 s.send(req) | |
| 667 assert str(e.value) == 'You can only send PreparedRequests.' | |
| 668 | |
| 669 def test_custom_content_type(self, httpbin): | |
| 670 r = requests.post( | |
| 671 httpbin('post'), | |
| 672 data={'stuff': json.dumps({'a': 123})}, | |
| 673 files={ | |
| 674 'file1': ('test_requests.py', open(__file__, 'rb')), | |
| 675 'file2': ('test_requests', open(__file__, 'rb'), | |
| 676 'text/py-content-type')}) | |
| 677 assert r.status_code == 200 | |
| 678 assert b"text/py-content-type" in r.request.body | |
| 679 | |
| 680 def test_hook_receives_request_arguments(self, httpbin): | |
| 681 def hook(resp, **kwargs): | |
| 682 assert resp is not None | |
| 683 assert kwargs != {} | |
| 684 | |
| 685 s = requests.Session() | |
| 686 r = requests.Request('GET', httpbin(), hooks={'response': hook}) | |
| 687 prep = s.prepare_request(r) | |
| 688 s.send(prep) | |
| 689 | |
| 690 def test_session_hooks_are_used_with_no_request_hooks(self, httpbin): | |
| 691 hook = lambda x, *args, **kwargs: x | |
| 692 s = requests.Session() | |
| 693 s.hooks['response'].append(hook) | |
| 694 r = requests.Request('GET', httpbin()) | |
| 695 prep = s.prepare_request(r) | |
| 696 assert prep.hooks['response'] != [] | |
| 697 assert prep.hooks['response'] == [hook] | |
| 698 | |
| 699 def test_session_hooks_are_overridden_by_request_hooks(self, httpbin): | |
| 700 hook1 = lambda x, *args, **kwargs: x | |
| 701 hook2 = lambda x, *args, **kwargs: x | |
| 702 assert hook1 is not hook2 | |
| 703 s = requests.Session() | |
| 704 s.hooks['response'].append(hook2) | |
| 705 r = requests.Request('GET', httpbin(), hooks={'response': [hook1]}) | |
| 706 prep = s.prepare_request(r) | |
| 707 assert prep.hooks['response'] == [hook1] | |
| 708 | |
| 709 def test_prepared_request_hook(self, httpbin): | |
| 710 def hook(resp, **kwargs): | |
| 711 resp.hook_working = True | |
| 712 return resp | |
| 713 | |
| 714 req = requests.Request('GET', httpbin(), hooks={'response': hook}) | |
| 715 prep = req.prepare() | |
| 716 | |
| 717 s = requests.Session() | |
| 718 s.proxies = getproxies() | |
| 719 resp = s.send(prep) | |
| 720 | |
| 721 assert hasattr(resp, 'hook_working') | |
| 722 | |
| 723 def test_prepared_from_session(self, httpbin): | |
| 724 class DummyAuth(requests.auth.AuthBase): | |
| 725 def __call__(self, r): | |
| 726 r.headers['Dummy-Auth-Test'] = 'dummy-auth-test-ok' | |
| 727 return r | |
| 728 | |
| 729 req = requests.Request('GET', httpbin('headers')) | |
| 730 assert not req.auth | |
| 731 | |
| 732 s = requests.Session() | |
| 733 s.auth = DummyAuth() | |
| 734 | |
| 735 prep = s.prepare_request(req) | |
| 736 resp = s.send(prep) | |
| 737 | |
| 738 assert resp.json()['headers'][ | |
| 739 'Dummy-Auth-Test'] == 'dummy-auth-test-ok' | |
| 740 | |
| 741 def test_prepare_request_with_bytestring_url(self): | |
| 742 req = requests.Request('GET', b'https://httpbin.org/') | |
| 743 s = requests.Session() | |
| 744 prep = s.prepare_request(req) | |
| 745 assert prep.url == "https://httpbin.org/" | |
| 746 | |
| 747 def test_links(self): | |
| 748 r = requests.Response() | |
| 749 r.headers = { | |
| 750 'cache-control': 'public, max-age=60, s-maxage=60', | |
| 751 'connection': 'keep-alive', | |
| 752 'content-encoding': 'gzip', | |
| 753 'content-type': 'application/json; charset=utf-8', | |
| 754 'date': 'Sat, 26 Jan 2013 16:47:56 GMT', | |
| 755 'etag': '"6ff6a73c0e446c1f61614769e3ceb778"', | |
| 756 'last-modified': 'Sat, 26 Jan 2013 16:22:39 GMT', | |
| 757 'link': ('<https://api.github.com/users/kennethreitz/repos?' | |
| 758 'page=2&per_page=10>; rel="next", <https://api.github.' | |
| 759 'com/users/kennethreitz/repos?page=7&per_page=10>; ' | |
| 760 ' rel="last"'), | |
| 761 'server': 'GitHub.com', | |
| 762 'status': '200 OK', | |
| 763 'vary': 'Accept', | |
| 764 'x-content-type-options': 'nosniff', | |
| 765 'x-github-media-type': 'github.beta', | |
| 766 'x-ratelimit-limit': '60', | |
| 767 'x-ratelimit-remaining': '57' | |
| 768 } | |
| 769 assert r.links['next']['rel'] == 'next' | |
| 770 | |
| 771 def test_cookie_parameters(self): | |
| 772 key = 'some_cookie' | |
| 773 value = 'some_value' | |
| 774 secure = True | |
| 775 domain = 'test.com' | |
| 776 rest = {'HttpOnly': True} | |
| 777 | |
| 778 jar = requests.cookies.RequestsCookieJar() | |
| 779 jar.set(key, value, secure=secure, domain=domain, rest=rest) | |
| 780 | |
| 781 assert len(jar) == 1 | |
| 782 assert 'some_cookie' in jar | |
| 783 | |
| 784 cookie = list(jar)[0] | |
| 785 assert cookie.secure == secure | |
| 786 assert cookie.domain == domain | |
| 787 assert cookie._rest['HttpOnly'] == rest['HttpOnly'] | |
| 788 | |
| 789 def test_cookie_as_dict_keeps_len(self): | |
| 790 key = 'some_cookie' | |
| 791 value = 'some_value' | |
| 792 | |
| 793 key1 = 'some_cookie1' | |
| 794 value1 = 'some_value1' | |
| 795 | |
| 796 jar = requests.cookies.RequestsCookieJar() | |
| 797 jar.set(key, value) | |
| 798 jar.set(key1, value1) | |
| 799 | |
| 800 d1 = dict(jar) | |
| 801 d2 = dict(jar.iteritems()) | |
| 802 d3 = dict(jar.items()) | |
| 803 | |
| 804 assert len(jar) == 2 | |
| 805 assert len(d1) == 2 | |
| 806 assert len(d2) == 2 | |
| 807 assert len(d3) == 2 | |
| 808 | |
| 809 def test_cookie_as_dict_keeps_items(self): | |
| 810 key = 'some_cookie' | |
| 811 value = 'some_value' | |
| 812 | |
| 813 key1 = 'some_cookie1' | |
| 814 value1 = 'some_value1' | |
| 815 | |
| 816 jar = requests.cookies.RequestsCookieJar() | |
| 817 jar.set(key, value) | |
| 818 jar.set(key1, value1) | |
| 819 | |
| 820 d1 = dict(jar) | |
| 821 d2 = dict(jar.iteritems()) | |
| 822 d3 = dict(jar.items()) | |
| 823 | |
| 824 assert d1['some_cookie'] == 'some_value' | |
| 825 assert d2['some_cookie'] == 'some_value' | |
| 826 assert d3['some_cookie1'] == 'some_value1' | |
| 827 | |
| 828 def test_cookie_as_dict_keys(self): | |
| 829 key = 'some_cookie' | |
| 830 value = 'some_value' | |
| 831 | |
| 832 key1 = 'some_cookie1' | |
| 833 value1 = 'some_value1' | |
| 834 | |
| 835 jar = requests.cookies.RequestsCookieJar() | |
| 836 jar.set(key, value) | |
| 837 jar.set(key1, value1) | |
| 838 | |
| 839 keys = jar.keys() | |
| 840 assert keys == list(keys) | |
| 841 # make sure one can use keys multiple times | |
| 842 assert list(keys) == list(keys) | |
| 843 | |
| 844 def test_cookie_as_dict_values(self): | |
| 845 key = 'some_cookie' | |
| 846 value = 'some_value' | |
| 847 | |
| 848 key1 = 'some_cookie1' | |
| 849 value1 = 'some_value1' | |
| 850 | |
| 851 jar = requests.cookies.RequestsCookieJar() | |
| 852 jar.set(key, value) | |
| 853 jar.set(key1, value1) | |
| 854 | |
| 855 values = jar.values() | |
| 856 assert values == list(values) | |
| 857 # make sure one can use values multiple times | |
| 858 assert list(values) == list(values) | |
| 859 | |
| 860 def test_cookie_as_dict_items(self): | |
| 861 key = 'some_cookie' | |
| 862 value = 'some_value' | |
| 863 | |
| 864 key1 = 'some_cookie1' | |
| 865 value1 = 'some_value1' | |
| 866 | |
| 867 jar = requests.cookies.RequestsCookieJar() | |
| 868 jar.set(key, value) | |
| 869 jar.set(key1, value1) | |
| 870 | |
| 871 items = jar.items() | |
| 872 assert items == list(items) | |
| 873 # make sure one can use items multiple times | |
| 874 assert list(items) == list(items) | |
| 875 | |
| 876 def test_cookie_duplicate_names_different_domains(self): | |
| 877 key = 'some_cookie' | |
| 878 value = 'some_value' | |
| 879 domain1 = 'test1.com' | |
| 880 domain2 = 'test2.com' | |
| 881 | |
| 882 jar = requests.cookies.RequestsCookieJar() | |
| 883 jar.set(key, value, domain=domain1) | |
| 884 jar.set(key, value, domain=domain2) | |
| 885 assert key in jar | |
| 886 items = jar.items() | |
| 887 assert len(items) == 2 | |
| 888 | |
| 889 # Verify that CookieConflictError is raised if domain is not specified | |
| 890 with pytest.raises(requests.cookies.CookieConflictError): | |
| 891 jar.get(key) | |
| 892 | |
| 893 # Verify that CookieConflictError is not raised if domain is specified | |
| 894 cookie = jar.get(key, domain=domain1) | |
| 895 assert cookie == value | |
| 896 | |
| 897 def test_cookie_duplicate_names_raises_cookie_conflict_error(self): | |
| 898 key = 'some_cookie' | |
| 899 value = 'some_value' | |
| 900 path = 'some_path' | |
| 901 | |
| 902 jar = requests.cookies.RequestsCookieJar() | |
| 903 jar.set(key, value, path=path) | |
| 904 jar.set(key, value) | |
| 905 with pytest.raises(requests.cookies.CookieConflictError): | |
| 906 jar.get(key) | |
| 907 | |
| 908 def test_time_elapsed_blank(self, httpbin): | |
| 909 r = requests.get(httpbin('get')) | |
| 910 td = r.elapsed | |
| 911 total_seconds = ((td.microseconds + (td.seconds + td.days * 24 * 3600) | |
| 912 * 10**6) / 10**6) | |
| 913 assert total_seconds > 0.0 | |
| 914 | |
| 915 def test_response_is_iterable(self): | |
| 916 r = requests.Response() | |
| 917 io = StringIO.StringIO('abc') | |
| 918 read_ = io.read | |
| 919 | |
| 920 def read_mock(amt, decode_content=None): | |
| 921 return read_(amt) | |
| 922 setattr(io, 'read', read_mock) | |
| 923 r.raw = io | |
| 924 assert next(iter(r)) | |
| 925 io.close() | |
| 926 | |
| 927 def test_response_decode_unicode(self): | |
| 928 """ | |
| 929 When called with decode_unicode, Response.iter_content should always | |
| 930 return unicode. | |
| 931 """ | |
| 932 r = requests.Response() | |
| 933 r._content_consumed = True | |
| 934 r._content = b'the content' | |
| 935 r.encoding = 'ascii' | |
| 936 | |
| 937 chunks = r.iter_content(decode_unicode=True) | |
| 938 assert all(isinstance(chunk, str) for chunk in chunks) | |
| 939 | |
| 940 # also for streaming | |
| 941 r = requests.Response() | |
| 942 r.raw = io.BytesIO(b'the content') | |
| 943 r.encoding = 'ascii' | |
| 944 chunks = r.iter_content(decode_unicode=True) | |
| 945 assert all(isinstance(chunk, str) for chunk in chunks) | |
| 946 | |
| 947 def test_request_and_response_are_pickleable(self, httpbin): | |
| 948 r = requests.get(httpbin('get')) | |
| 949 | |
| 950 # verify we can pickle the original request | |
| 951 assert pickle.loads(pickle.dumps(r.request)) | |
| 952 | |
| 953 # verify we can pickle the response and that we have access to | |
| 954 # the original request. | |
| 955 pr = pickle.loads(pickle.dumps(r)) | |
| 956 assert r.request.url == pr.request.url | |
| 957 assert r.request.headers == pr.request.headers | |
| 958 | |
| 959 def test_cannot_send_unprepared_requests(self, httpbin): | |
| 960 r = requests.Request(url=httpbin()) | |
| 961 with pytest.raises(ValueError): | |
| 962 requests.Session().send(r) | |
| 963 | |
| 964 def test_http_error(self): | |
| 965 error = requests.exceptions.HTTPError() | |
| 966 assert not error.response | |
| 967 response = requests.Response() | |
| 968 error = requests.exceptions.HTTPError(response=response) | |
| 969 assert error.response == response | |
| 970 error = requests.exceptions.HTTPError('message', response=response) | |
| 971 assert str(error) == 'message' | |
| 972 assert error.response == response | |
| 973 | |
| 974 def test_session_pickling(self, httpbin): | |
| 975 r = requests.Request('GET', httpbin('get')) | |
| 976 s = requests.Session() | |
| 977 | |
| 978 s = pickle.loads(pickle.dumps(s)) | |
| 979 s.proxies = getproxies() | |
| 980 | |
| 981 r = s.send(r.prepare()) | |
| 982 assert r.status_code == 200 | |
| 983 | |
| 984 def test_fixes_1329(self, httpbin): | |
| 985 """ | |
| 986 Ensure that header updates are done case-insensitively. | |
| 987 """ | |
| 988 s = requests.Session() | |
| 989 s.headers.update({'ACCEPT': 'BOGUS'}) | |
| 990 s.headers.update({'accept': 'application/json'}) | |
| 991 r = s.get(httpbin('get')) | |
| 992 headers = r.request.headers | |
| 993 assert headers['accept'] == 'application/json' | |
| 994 assert headers['Accept'] == 'application/json' | |
| 995 assert headers['ACCEPT'] == 'application/json' | |
| 996 | |
| 997 def test_uppercase_scheme_redirect(self, httpbin): | |
| 998 parts = urlparse(httpbin('html')) | |
| 999 url = "HTTP://" + parts.netloc + parts.path | |
| 1000 r = requests.get(httpbin('redirect-to'), params={'url': url}) | |
| 1001 assert r.status_code == 200 | |
| 1002 assert r.url.lower() == url.lower() | |
| 1003 | |
| 1004 def test_transport_adapter_ordering(self): | |
| 1005 s = requests.Session() | |
| 1006 order = ['https://', 'http://'] | |
| 1007 assert order == list(s.adapters) | |
| 1008 s.mount('http://git', HTTPAdapter()) | |
| 1009 s.mount('http://github', HTTPAdapter()) | |
| 1010 s.mount('http://github.com', HTTPAdapter()) | |
| 1011 s.mount('http://github.com/about/', HTTPAdapter()) | |
| 1012 order = [ | |
| 1013 'http://github.com/about/', | |
| 1014 'http://github.com', | |
| 1015 'http://github', | |
| 1016 'http://git', | |
| 1017 'https://', | |
| 1018 'http://', | |
| 1019 ] | |
| 1020 assert order == list(s.adapters) | |
| 1021 s.mount('http://gittip', HTTPAdapter()) | |
| 1022 s.mount('http://gittip.com', HTTPAdapter()) | |
| 1023 s.mount('http://gittip.com/about/', HTTPAdapter()) | |
| 1024 order = [ | |
| 1025 'http://github.com/about/', | |
| 1026 'http://gittip.com/about/', | |
| 1027 'http://github.com', | |
| 1028 'http://gittip.com', | |
| 1029 'http://github', | |
| 1030 'http://gittip', | |
| 1031 'http://git', | |
| 1032 'https://', | |
| 1033 'http://', | |
| 1034 ] | |
| 1035 assert order == list(s.adapters) | |
| 1036 s2 = requests.Session() | |
| 1037 s2.adapters = {'http://': HTTPAdapter()} | |
| 1038 s2.mount('https://', HTTPAdapter()) | |
| 1039 assert 'http://' in s2.adapters | |
| 1040 assert 'https://' in s2.adapters | |
| 1041 | |
| 1042 def test_header_remove_is_case_insensitive(self, httpbin): | |
| 1043 # From issue #1321 | |
| 1044 s = requests.Session() | |
| 1045 s.headers['foo'] = 'bar' | |
| 1046 r = s.get(httpbin('get'), headers={'FOO': None}) | |
| 1047 assert 'foo' not in r.request.headers | |
| 1048 | |
| 1049 def test_params_are_merged_case_sensitive(self, httpbin): | |
| 1050 s = requests.Session() | |
| 1051 s.params['foo'] = 'bar' | |
| 1052 r = s.get(httpbin('get'), params={'FOO': 'bar'}) | |
| 1053 assert r.json()['args'] == {'foo': 'bar', 'FOO': 'bar'} | |
| 1054 | |
| 1055 def test_long_authinfo_in_url(self): | |
| 1056 url = 'http://{0}:{1}@{2}:9000/path?query#frag'.format( | |
| 1057 'E8A3BE87-9E3F-4620-8858-95478E385B5B', | |
| 1058 'EA770032-DA4D-4D84-8CE9-29C6D910BF1E', | |
| 1059 'exactly-------------sixty-----------three------------characters', | |
| 1060 ) | |
| 1061 r = requests.Request('GET', url).prepare() | |
| 1062 assert r.url == url | |
| 1063 | |
| 1064 def test_header_keys_are_native(self, httpbin): | |
| 1065 headers = {u('unicode'): 'blah', 'byte'.encode('ascii'): 'blah'} | |
| 1066 r = requests.Request('GET', httpbin('get'), headers=headers) | |
| 1067 p = r.prepare() | |
| 1068 | |
| 1069 # This is testing that they are builtin strings. A bit weird, but there | |
| 1070 # we go. | |
| 1071 assert 'unicode' in p.headers.keys() | |
| 1072 assert 'byte' in p.headers.keys() | |
| 1073 | |
| 1074 @pytest.mark.parametrize('files', ('foo', b'foo', bytearray(b'foo'))) | |
| 1075 def test_can_send_objects_with_files(self, httpbin, files): | |
| 1076 data = {'a': 'this is a string'} | |
| 1077 files = {'b': files} | |
| 1078 r = requests.Request('POST', httpbin('post'), data=data, files=files) | |
| 1079 p = r.prepare() | |
| 1080 assert 'multipart/form-data' in p.headers['Content-Type'] | |
| 1081 | |
| 1082 def test_can_send_file_object_with_non_string_filename(self, httpbin): | |
| 1083 f = io.BytesIO() | |
| 1084 f.name = 2 | |
| 1085 r = requests.Request('POST', httpbin('post'), files={'f': f}) | |
| 1086 p = r.prepare() | |
| 1087 | |
| 1088 assert 'multipart/form-data' in p.headers['Content-Type'] | |
| 1089 | |
| 1090 def test_autoset_header_values_are_native(self, httpbin): | |
| 1091 data = 'this is a string' | |
| 1092 length = '16' | |
| 1093 req = requests.Request('POST', httpbin('post'), data=data) | |
| 1094 p = req.prepare() | |
| 1095 | |
| 1096 assert p.headers['Content-Length'] == length | |
| 1097 | |
| 1098 def test_nonhttp_schemes_dont_check_URLs(self): | |
| 1099 test_urls = ( | |
| 1100 'data:image/gif;base64,R0lGODlhAQABAHAAACH5BAUAAAAALAAAAAABAAEAAAICR
AEAOw==', | |
| 1101 'file:///etc/passwd', | |
| 1102 'magnet:?xt=urn:btih:be08f00302bc2d1d3cfa3af02024fa647a271431', | |
| 1103 ) | |
| 1104 for test_url in test_urls: | |
| 1105 req = requests.Request('GET', test_url) | |
| 1106 preq = req.prepare() | |
| 1107 assert test_url == preq.url | |
| 1108 | |
| 1109 def test_auth_is_stripped_on_redirect_off_host(self, httpbin): | |
| 1110 r = requests.get( | |
| 1111 httpbin('redirect-to'), | |
| 1112 params={'url': 'http://www.google.co.uk'}, | |
| 1113 auth=('user', 'pass'), | |
| 1114 ) | |
| 1115 assert r.history[0].request.headers['Authorization'] | |
| 1116 assert not r.request.headers.get('Authorization', '') | |
| 1117 | |
| 1118 def test_auth_is_retained_for_redirect_on_host(self, httpbin): | |
| 1119 r = requests.get(httpbin('redirect/1'), auth=('user', 'pass')) | |
| 1120 h1 = r.history[0].request.headers['Authorization'] | |
| 1121 h2 = r.request.headers['Authorization'] | |
| 1122 | |
| 1123 assert h1 == h2 | |
| 1124 | |
| 1125 def test_manual_redirect_with_partial_body_read(self, httpbin): | |
| 1126 s = requests.Session() | |
| 1127 r1 = s.get(httpbin('redirect/2'), allow_redirects=False, stream=True) | |
| 1128 assert r1.is_redirect | |
| 1129 rg = s.resolve_redirects(r1, r1.request, stream=True) | |
| 1130 | |
| 1131 # read only the first eight bytes of the response body, | |
| 1132 # then follow the redirect | |
| 1133 r1.iter_content(8) | |
| 1134 r2 = next(rg) | |
| 1135 assert r2.is_redirect | |
| 1136 | |
| 1137 # read all of the response via iter_content, | |
| 1138 # then follow the redirect | |
| 1139 for _ in r2.iter_content(): | |
| 1140 pass | |
| 1141 r3 = next(rg) | |
| 1142 assert not r3.is_redirect | |
| 1143 | |
| 1144 def _patch_adapter_gzipped_redirect(self, session, url): | |
| 1145 adapter = session.get_adapter(url=url) | |
| 1146 org_build_response = adapter.build_response | |
| 1147 self._patched_response = False | |
| 1148 | |
| 1149 def build_response(*args, **kwargs): | |
| 1150 resp = org_build_response(*args, **kwargs) | |
| 1151 if not self._patched_response: | |
| 1152 resp.raw.headers['content-encoding'] = 'gzip' | |
| 1153 self._patched_response = True | |
| 1154 return resp | |
| 1155 | |
| 1156 adapter.build_response = build_response | |
| 1157 | |
| 1158 def test_redirect_with_wrong_gzipped_header(self, httpbin): | |
| 1159 s = requests.Session() | |
| 1160 url = httpbin('redirect/1') | |
| 1161 self._patch_adapter_gzipped_redirect(s, url) | |
| 1162 s.get(url) | |
| 1163 | |
| 1164 def test_basic_auth_str_is_always_native(self): | |
| 1165 s = _basic_auth_str("test", "test") | |
| 1166 assert isinstance(s, builtin_str) | |
| 1167 assert s == "Basic dGVzdDp0ZXN0" | |
| 1168 | |
| 1169 def test_requests_history_is_saved(self, httpbin): | |
| 1170 r = requests.get(httpbin('redirect/5')) | |
| 1171 total = r.history[-1].history | |
| 1172 i = 0 | |
| 1173 for item in r.history: | |
| 1174 assert item.history == total[0:i] | |
| 1175 i += 1 | |
| 1176 | |
| 1177 def test_json_param_post_content_type_works(self, httpbin): | |
| 1178 r = requests.post( | |
| 1179 httpbin('post'), | |
| 1180 json={'life': 42} | |
| 1181 ) | |
| 1182 assert r.status_code == 200 | |
| 1183 assert 'application/json' in r.request.headers['Content-Type'] | |
| 1184 assert {'life': 42} == r.json()['json'] | |
| 1185 | |
| 1186 def test_json_param_post_should_not_override_data_param(self, httpbin): | |
| 1187 r = requests.Request(method='POST', url=httpbin('post'), | |
| 1188 data={'stuff': 'elixr'}, | |
| 1189 json={'music': 'flute'}) | |
| 1190 prep = r.prepare() | |
| 1191 assert 'stuff=elixr' == prep.body | |
| 1192 | |
| 1193 def test_response_iter_lines(self, httpbin): | |
| 1194 r = requests.get(httpbin('stream/4'), stream=True) | |
| 1195 assert r.status_code == 200 | |
| 1196 | |
| 1197 it = r.iter_lines() | |
| 1198 next(it) | |
| 1199 assert len(list(it)) == 3 | |
| 1200 | |
| 1201 def test_unconsumed_session_response_closes_connection(self, httpbin): | |
| 1202 s = requests.session() | |
| 1203 | |
| 1204 with contextlib.closing(s.get(httpbin('stream/4'), stream=True)) as resp
onse: | |
| 1205 pass | |
| 1206 | |
| 1207 assert response._content_consumed is False | |
| 1208 assert response.raw.closed | |
| 1209 | |
| 1210 @pytest.mark.xfail | |
| 1211 def test_response_iter_lines_reentrant(self, httpbin): | |
| 1212 """Response.iter_lines() is not reentrant safe""" | |
| 1213 r = requests.get(httpbin('stream/4'), stream=True) | |
| 1214 assert r.status_code == 200 | |
| 1215 | |
| 1216 next(r.iter_lines()) | |
| 1217 assert len(list(r.iter_lines())) == 3 | |
| 1218 | |
| 1219 def test_session_close_proxy_clear(self, mocker): | |
| 1220 proxies = { | |
| 1221 'one': mocker.Mock(), | |
| 1222 'two': mocker.Mock(), | |
| 1223 } | |
| 1224 session = requests.Session() | |
| 1225 mocker.patch.dict(session.adapters['http://'].proxy_manager, proxies) | |
| 1226 session.close() | |
| 1227 proxies['one'].clear.assert_called_once_with() | |
| 1228 proxies['two'].clear.assert_called_once_with() | |
| 1229 | |
| 1230 | |
| 1231 class TestCaseInsensitiveDict: | |
| 1232 | |
| 1233 @pytest.mark.parametrize( | |
| 1234 'cid', ( | |
| 1235 CaseInsensitiveDict({'Foo': 'foo', 'BAr': 'bar'}), | |
| 1236 CaseInsensitiveDict([('Foo', 'foo'), ('BAr', 'bar')]), | |
| 1237 CaseInsensitiveDict(FOO='foo', BAr='bar'), | |
| 1238 )) | |
| 1239 def test_init(self, cid): | |
| 1240 assert len(cid) == 2 | |
| 1241 assert 'foo' in cid | |
| 1242 assert 'bar' in cid | |
| 1243 | |
| 1244 def test_docstring_example(self): | |
| 1245 cid = CaseInsensitiveDict() | |
| 1246 cid['Accept'] = 'application/json' | |
| 1247 assert cid['aCCEPT'] == 'application/json' | |
| 1248 assert list(cid) == ['Accept'] | |
| 1249 | |
| 1250 def test_len(self): | |
| 1251 cid = CaseInsensitiveDict({'a': 'a', 'b': 'b'}) | |
| 1252 cid['A'] = 'a' | |
| 1253 assert len(cid) == 2 | |
| 1254 | |
| 1255 def test_getitem(self): | |
| 1256 cid = CaseInsensitiveDict({'Spam': 'blueval'}) | |
| 1257 assert cid['spam'] == 'blueval' | |
| 1258 assert cid['SPAM'] == 'blueval' | |
| 1259 | |
| 1260 def test_fixes_649(self): | |
| 1261 """__setitem__ should behave case-insensitively.""" | |
| 1262 cid = CaseInsensitiveDict() | |
| 1263 cid['spam'] = 'oneval' | |
| 1264 cid['Spam'] = 'twoval' | |
| 1265 cid['sPAM'] = 'redval' | |
| 1266 cid['SPAM'] = 'blueval' | |
| 1267 assert cid['spam'] == 'blueval' | |
| 1268 assert cid['SPAM'] == 'blueval' | |
| 1269 assert list(cid.keys()) == ['SPAM'] | |
| 1270 | |
| 1271 def test_delitem(self): | |
| 1272 cid = CaseInsensitiveDict() | |
| 1273 cid['Spam'] = 'someval' | |
| 1274 del cid['sPam'] | |
| 1275 assert 'spam' not in cid | |
| 1276 assert len(cid) == 0 | |
| 1277 | |
| 1278 def test_contains(self): | |
| 1279 cid = CaseInsensitiveDict() | |
| 1280 cid['Spam'] = 'someval' | |
| 1281 assert 'Spam' in cid | |
| 1282 assert 'spam' in cid | |
| 1283 assert 'SPAM' in cid | |
| 1284 assert 'sPam' in cid | |
| 1285 assert 'notspam' not in cid | |
| 1286 | |
| 1287 def test_get(self): | |
| 1288 cid = CaseInsensitiveDict() | |
| 1289 cid['spam'] = 'oneval' | |
| 1290 cid['SPAM'] = 'blueval' | |
| 1291 assert cid.get('spam') == 'blueval' | |
| 1292 assert cid.get('SPAM') == 'blueval' | |
| 1293 assert cid.get('sPam') == 'blueval' | |
| 1294 assert cid.get('notspam', 'default') == 'default' | |
| 1295 | |
| 1296 def test_update(self): | |
| 1297 cid = CaseInsensitiveDict() | |
| 1298 cid['spam'] = 'blueval' | |
| 1299 cid.update({'sPam': 'notblueval'}) | |
| 1300 assert cid['spam'] == 'notblueval' | |
| 1301 cid = CaseInsensitiveDict({'Foo': 'foo', 'BAr': 'bar'}) | |
| 1302 cid.update({'fOO': 'anotherfoo', 'bAR': 'anotherbar'}) | |
| 1303 assert len(cid) == 2 | |
| 1304 assert cid['foo'] == 'anotherfoo' | |
| 1305 assert cid['bar'] == 'anotherbar' | |
| 1306 | |
| 1307 def test_update_retains_unchanged(self): | |
| 1308 cid = CaseInsensitiveDict({'foo': 'foo', 'bar': 'bar'}) | |
| 1309 cid.update({'foo': 'newfoo'}) | |
| 1310 assert cid['bar'] == 'bar' | |
| 1311 | |
| 1312 def test_iter(self): | |
| 1313 cid = CaseInsensitiveDict({'Spam': 'spam', 'Eggs': 'eggs'}) | |
| 1314 keys = frozenset(['Spam', 'Eggs']) | |
| 1315 assert frozenset(iter(cid)) == keys | |
| 1316 | |
| 1317 def test_equality(self): | |
| 1318 cid = CaseInsensitiveDict({'SPAM': 'blueval', 'Eggs': 'redval'}) | |
| 1319 othercid = CaseInsensitiveDict({'spam': 'blueval', 'eggs': 'redval'}) | |
| 1320 assert cid == othercid | |
| 1321 del othercid['spam'] | |
| 1322 assert cid != othercid | |
| 1323 assert cid == {'spam': 'blueval', 'eggs': 'redval'} | |
| 1324 assert cid != object() | |
| 1325 | |
| 1326 def test_setdefault(self): | |
| 1327 cid = CaseInsensitiveDict({'Spam': 'blueval'}) | |
| 1328 assert cid.setdefault('spam', 'notblueval') == 'blueval' | |
| 1329 assert cid.setdefault('notspam', 'notblueval') == 'notblueval' | |
| 1330 | |
| 1331 def test_lower_items(self): | |
| 1332 cid = CaseInsensitiveDict({ | |
| 1333 'Accept': 'application/json', | |
| 1334 'user-Agent': 'requests', | |
| 1335 }) | |
| 1336 keyset = frozenset(lowerkey for lowerkey, v in cid.lower_items()) | |
| 1337 lowerkeyset = frozenset(['accept', 'user-agent']) | |
| 1338 assert keyset == lowerkeyset | |
| 1339 | |
| 1340 def test_preserve_key_case(self): | |
| 1341 cid = CaseInsensitiveDict({ | |
| 1342 'Accept': 'application/json', | |
| 1343 'user-Agent': 'requests', | |
| 1344 }) | |
| 1345 keyset = frozenset(['Accept', 'user-Agent']) | |
| 1346 assert frozenset(i[0] for i in cid.items()) == keyset | |
| 1347 assert frozenset(cid.keys()) == keyset | |
| 1348 assert frozenset(cid) == keyset | |
| 1349 | |
| 1350 def test_preserve_last_key_case(self): | |
| 1351 cid = CaseInsensitiveDict({ | |
| 1352 'Accept': 'application/json', | |
| 1353 'user-Agent': 'requests', | |
| 1354 }) | |
| 1355 cid.update({'ACCEPT': 'application/json'}) | |
| 1356 cid['USER-AGENT'] = 'requests' | |
| 1357 keyset = frozenset(['ACCEPT', 'USER-AGENT']) | |
| 1358 assert frozenset(i[0] for i in cid.items()) == keyset | |
| 1359 assert frozenset(cid.keys()) == keyset | |
| 1360 assert frozenset(cid) == keyset | |
| 1361 | |
| 1362 def test_copy(self): | |
| 1363 cid = CaseInsensitiveDict({ | |
| 1364 'Accept': 'application/json', | |
| 1365 'user-Agent': 'requests', | |
| 1366 }) | |
| 1367 cid_copy = cid.copy() | |
| 1368 assert cid == cid_copy | |
| 1369 cid['changed'] = True | |
| 1370 assert cid != cid_copy | |
| 1371 | |
| 1372 | |
| 1373 class TestMorselToCookieExpires: | |
| 1374 """Tests for morsel_to_cookie when morsel contains expires.""" | |
| 1375 | |
| 1376 def test_expires_valid_str(self): | |
| 1377 """Test case where we convert expires from string time.""" | |
| 1378 | |
| 1379 morsel = Morsel() | |
| 1380 morsel['expires'] = 'Thu, 01-Jan-1970 00:00:01 GMT' | |
| 1381 cookie = morsel_to_cookie(morsel) | |
| 1382 assert cookie.expires == 1 | |
| 1383 | |
| 1384 @pytest.mark.parametrize( | |
| 1385 'value, exception', ( | |
| 1386 (100, TypeError), | |
| 1387 ('woops', ValueError), | |
| 1388 )) | |
| 1389 def test_expires_invalid_int(self, value, exception): | |
| 1390 """Test case where an invalid type is passed for expires.""" | |
| 1391 morsel = Morsel() | |
| 1392 morsel['expires'] = value | |
| 1393 with pytest.raises(exception): | |
| 1394 morsel_to_cookie(morsel) | |
| 1395 | |
| 1396 def test_expires_none(self): | |
| 1397 """Test case where expires is None.""" | |
| 1398 | |
| 1399 morsel = Morsel() | |
| 1400 morsel['expires'] = None | |
| 1401 cookie = morsel_to_cookie(morsel) | |
| 1402 assert cookie.expires is None | |
| 1403 | |
| 1404 | |
| 1405 class TestMorselToCookieMaxAge: | |
| 1406 | |
| 1407 """Tests for morsel_to_cookie when morsel contains max-age.""" | |
| 1408 | |
| 1409 def test_max_age_valid_int(self): | |
| 1410 """Test case where a valid max age in seconds is passed.""" | |
| 1411 | |
| 1412 morsel = Morsel() | |
| 1413 morsel['max-age'] = 60 | |
| 1414 cookie = morsel_to_cookie(morsel) | |
| 1415 assert isinstance(cookie.expires, int) | |
| 1416 | |
| 1417 def test_max_age_invalid_str(self): | |
| 1418 """Test case where a invalid max age is passed.""" | |
| 1419 | |
| 1420 morsel = Morsel() | |
| 1421 morsel['max-age'] = 'woops' | |
| 1422 with pytest.raises(TypeError): | |
| 1423 morsel_to_cookie(morsel) | |
| 1424 | |
| 1425 | |
| 1426 class TestTimeout: | |
| 1427 | |
| 1428 def test_stream_timeout(self, httpbin): | |
| 1429 try: | |
| 1430 requests.get(httpbin('delay/10'), timeout=2.0) | |
| 1431 except requests.exceptions.Timeout as e: | |
| 1432 assert 'Read timed out' in e.args[0].args[0] | |
| 1433 | |
| 1434 @pytest.mark.parametrize( | |
| 1435 'timeout, error_text', ( | |
| 1436 ((3, 4, 5), '(connect, read)'), | |
| 1437 ('foo', 'must be an int or float'), | |
| 1438 )) | |
| 1439 def test_invalid_timeout(self, httpbin, timeout, error_text): | |
| 1440 with pytest.raises(ValueError) as e: | |
| 1441 requests.get(httpbin('get'), timeout=timeout) | |
| 1442 assert error_text in str(e) | |
| 1443 | |
| 1444 def test_none_timeout(self, httpbin): | |
| 1445 """ Check that you can set None as a valid timeout value. | |
| 1446 | |
| 1447 To actually test this behavior, we'd want to check that setting the | |
| 1448 timeout to None actually lets the request block past the system default | |
| 1449 timeout. However, this would make the test suite unbearably slow. | |
| 1450 Instead we verify that setting the timeout to None does not prevent the | |
| 1451 request from succeeding. | |
| 1452 """ | |
| 1453 r = requests.get(httpbin('get'), timeout=None) | |
| 1454 assert r.status_code == 200 | |
| 1455 | |
| 1456 def test_read_timeout(self, httpbin): | |
| 1457 try: | |
| 1458 requests.get(httpbin('delay/10'), timeout=(None, 0.1)) | |
| 1459 pytest.fail('The recv() request should time out.') | |
| 1460 except ReadTimeout: | |
| 1461 pass | |
| 1462 | |
| 1463 def test_connect_timeout(self): | |
| 1464 try: | |
| 1465 requests.get(TARPIT, timeout=(0.1, None)) | |
| 1466 pytest.fail('The connect() request should time out.') | |
| 1467 except ConnectTimeout as e: | |
| 1468 assert isinstance(e, ConnectionError) | |
| 1469 assert isinstance(e, Timeout) | |
| 1470 | |
| 1471 def test_total_timeout_connect(self): | |
| 1472 try: | |
| 1473 requests.get(TARPIT, timeout=(0.1, 0.1)) | |
| 1474 pytest.fail('The connect() request should time out.') | |
| 1475 except ConnectTimeout: | |
| 1476 pass | |
| 1477 | |
| 1478 def test_encoded_methods(self, httpbin): | |
| 1479 """See: https://github.com/kennethreitz/requests/issues/2316""" | |
| 1480 r = requests.request(b'GET', httpbin('get')) | |
| 1481 assert r.ok | |
| 1482 | |
| 1483 | |
| 1484 SendCall = collections.namedtuple('SendCall', ('args', 'kwargs')) | |
| 1485 | |
| 1486 | |
| 1487 class RedirectSession(SessionRedirectMixin): | |
| 1488 def __init__(self, order_of_redirects): | |
| 1489 self.redirects = order_of_redirects | |
| 1490 self.calls = [] | |
| 1491 self.max_redirects = 30 | |
| 1492 self.cookies = {} | |
| 1493 self.trust_env = False | |
| 1494 | |
| 1495 def send(self, *args, **kwargs): | |
| 1496 self.calls.append(SendCall(args, kwargs)) | |
| 1497 return self.build_response() | |
| 1498 | |
| 1499 def build_response(self): | |
| 1500 request = self.calls[-1].args[0] | |
| 1501 r = requests.Response() | |
| 1502 | |
| 1503 try: | |
| 1504 r.status_code = int(self.redirects.pop(0)) | |
| 1505 except IndexError: | |
| 1506 r.status_code = 200 | |
| 1507 | |
| 1508 r.headers = CaseInsensitiveDict({'Location': '/'}) | |
| 1509 r.raw = self._build_raw() | |
| 1510 r.request = request | |
| 1511 return r | |
| 1512 | |
| 1513 def _build_raw(self): | |
| 1514 string = StringIO.StringIO('') | |
| 1515 setattr(string, 'release_conn', lambda *args: args) | |
| 1516 return string | |
| 1517 | |
| 1518 | |
| 1519 def test_requests_are_updated_each_time(httpbin): | |
| 1520 session = RedirectSession([303, 307]) | |
| 1521 prep = requests.Request('POST', httpbin('post')).prepare() | |
| 1522 r0 = session.send(prep) | |
| 1523 assert r0.request.method == 'POST' | |
| 1524 assert session.calls[-1] == SendCall((r0.request,), {}) | |
| 1525 redirect_generator = session.resolve_redirects(r0, prep) | |
| 1526 default_keyword_args = { | |
| 1527 'stream': False, | |
| 1528 'verify': True, | |
| 1529 'cert': None, | |
| 1530 'timeout': None, | |
| 1531 'allow_redirects': False, | |
| 1532 'proxies': {}, | |
| 1533 } | |
| 1534 for response in redirect_generator: | |
| 1535 assert response.request.method == 'GET' | |
| 1536 send_call = SendCall((response.request,), default_keyword_args) | |
| 1537 assert session.calls[-1] == send_call | |
| 1538 | |
| 1539 | |
| 1540 @pytest.mark.parametrize( | |
| 1541 'data', ( | |
| 1542 (('a', 'b'), ('c', 'd')), | |
| 1543 (('c', 'd'), ('a', 'b')), | |
| 1544 (('a', 'b'), ('c', 'd'), ('e', 'f')), | |
| 1545 )) | |
| 1546 def test_data_argument_accepts_tuples(data): | |
| 1547 """Ensure that the data argument will accept tuples of strings | |
| 1548 and properly encode them. | |
| 1549 """ | |
| 1550 p = PreparedRequest() | |
| 1551 p.prepare( | |
| 1552 method='GET', | |
| 1553 url='http://www.example.com', | |
| 1554 data=data, | |
| 1555 hooks=default_hooks() | |
| 1556 ) | |
| 1557 assert p.body == urlencode(data) | |
| 1558 | |
| 1559 | |
| 1560 @pytest.mark.parametrize( | |
| 1561 'kwargs', ( | |
| 1562 None, | |
| 1563 { | |
| 1564 'method': 'GET', | |
| 1565 'url': 'http://www.example.com', | |
| 1566 'data': 'foo=bar', | |
| 1567 'hooks': default_hooks() | |
| 1568 }, | |
| 1569 { | |
| 1570 'method': 'GET', | |
| 1571 'url': 'http://www.example.com', | |
| 1572 'data': 'foo=bar', | |
| 1573 'hooks': default_hooks(), | |
| 1574 'cookies': {'foo': 'bar'} | |
| 1575 }, | |
| 1576 { | |
| 1577 'method': 'GET', | |
| 1578 'url': u('http://www.example.com/üniçø∂é') | |
| 1579 }, | |
| 1580 )) | |
| 1581 def test_prepared_copy(kwargs): | |
| 1582 p = PreparedRequest() | |
| 1583 if kwargs: | |
| 1584 p.prepare(**kwargs) | |
| 1585 copy = p.copy() | |
| 1586 for attr in ('method', 'url', 'headers', '_cookies', 'body', 'hooks'): | |
| 1587 assert getattr(p, attr) == getattr(copy, attr) | |
| 1588 | |
| 1589 | |
| 1590 def test_urllib3_retries(httpbin): | |
| 1591 from requests.packages.urllib3.util import Retry | |
| 1592 s = requests.Session() | |
| 1593 s.mount('http://', HTTPAdapter(max_retries=Retry( | |
| 1594 total=2, status_forcelist=[500] | |
| 1595 ))) | |
| 1596 | |
| 1597 with pytest.raises(RetryError): | |
| 1598 s.get(httpbin('status/500')) | |
| 1599 | |
| 1600 | |
| 1601 def test_urllib3_pool_connection_closed(httpbin): | |
| 1602 s = requests.Session() | |
| 1603 s.mount('http://', HTTPAdapter(pool_connections=0, pool_maxsize=0)) | |
| 1604 | |
| 1605 try: | |
| 1606 s.get(httpbin('status/200')) | |
| 1607 except ConnectionError as e: | |
| 1608 assert u"Pool is closed." in str(e) | |
| 1609 | |
| 1610 | |
| 1611 def test_vendor_aliases(): | |
| 1612 from requests.packages import urllib3 | |
| 1613 from requests.packages import chardet | |
| 1614 | |
| 1615 with pytest.raises(ImportError): | |
| 1616 from requests.packages import webbrowser | |
| OLD | NEW |