Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(492)

Unified Diff: recipe_engine/third_party/requests/tests/test_requests.py

Issue 2164713003: Vendor requests. (Closed) Base URL: https://chromium.googlesource.com/external/github.com/luci/recipes-py@master
Patch Set: Fix deps.pyl Created 4 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: recipe_engine/third_party/requests/tests/test_requests.py
diff --git a/recipe_engine/third_party/requests/tests/test_requests.py b/recipe_engine/third_party/requests/tests/test_requests.py
deleted file mode 100755
index 0a87b52cdaad377122d606202b581546372652ad..0000000000000000000000000000000000000000
--- a/recipe_engine/third_party/requests/tests/test_requests.py
+++ /dev/null
@@ -1,1616 +0,0 @@
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-
-"""Tests for Requests."""
-
-from __future__ import division
-import json
-import os
-import pickle
-import collections
-import contextlib
-
-import io
-import requests
-import pytest
-from requests.adapters import HTTPAdapter
-from requests.auth import HTTPDigestAuth, _basic_auth_str
-from requests.compat import (
- Morsel, cookielib, getproxies, str, urlparse,
- builtin_str, OrderedDict)
-from requests.cookies import cookiejar_from_dict, morsel_to_cookie
-from requests.exceptions import (
- ConnectionError, ConnectTimeout, InvalidSchema, InvalidURL,
- MissingSchema, ReadTimeout, Timeout, RetryError, TooManyRedirects,
- ProxyError)
-from requests.models import PreparedRequest
-from requests.structures import CaseInsensitiveDict
-from requests.sessions import SessionRedirectMixin
-from requests.models import urlencode
-from requests.hooks import default_hooks
-
-from .compat import StringIO, u
-
-# Requests to this URL should always fail with a connection timeout (nothing
-# listening on that port)
-TARPIT = 'http://10.255.255.1'
-
-
-class TestRequests:
-
- def test_entry_points(self):
-
- requests.session
- requests.session().get
- requests.session().head
- requests.get
- requests.head
- requests.put
- requests.patch
- requests.post
-
- @pytest.mark.parametrize(
- 'exception, url', (
- (MissingSchema, 'hiwpefhipowhefopw'),
- (InvalidSchema, 'localhost:3128'),
- (InvalidSchema, 'localhost.localdomain:3128/'),
- (InvalidSchema, '10.122.1.1:3128/'),
- (InvalidURL, 'http://'),
- ))
- def test_invalid_url(self, exception, url):
- with pytest.raises(exception):
- requests.get(url)
-
- def test_basic_building(self):
- req = requests.Request()
- req.url = 'http://kennethreitz.org/'
- req.data = {'life': '42'}
-
- pr = req.prepare()
- assert pr.url == req.url
- assert pr.body == 'life=42'
-
- @pytest.mark.parametrize('method', ('GET', 'HEAD'))
- def test_no_content_length(self, httpbin, method):
- req = requests.Request(method, httpbin(method.lower())).prepare()
- assert 'Content-Length' not in req.headers
-
- def test_override_content_length(self, httpbin):
- headers = {
- 'Content-Length': 'not zero'
- }
- r = requests.Request('POST', httpbin('post'), headers=headers).prepare()
- assert 'Content-Length' in r.headers
- assert r.headers['Content-Length'] == 'not zero'
-
- def test_path_is_not_double_encoded(self):
- request = requests.Request('GET', "http://0.0.0.0/get/test case").prepare()
-
- assert request.path_url == '/get/test%20case'
-
- @pytest.mark.parametrize(
- 'url, expected', (
- ('http://example.com/path#fragment', 'http://example.com/path?a=b#fragment'),
- ('http://example.com/path?key=value#fragment', 'http://example.com/path?key=value&a=b#fragment')
- ))
- def test_params_are_added_before_fragment(self, url, expected):
- request = requests.Request('GET', url, params={"a": "b"}).prepare()
- assert request.url == expected
-
- def test_params_original_order_is_preserved_by_default(self):
- param_ordered_dict = OrderedDict((('z', 1), ('a', 1), ('k', 1), ('d', 1)))
- session = requests.Session()
- request = requests.Request('GET', 'http://example.com/', params=param_ordered_dict)
- prep = session.prepare_request(request)
- assert prep.url == 'http://example.com/?z=1&a=1&k=1&d=1'
-
- def test_params_bytes_are_encoded(self):
- request = requests.Request('GET', 'http://example.com',
- params=b'test=foo').prepare()
- assert request.url == 'http://example.com/?test=foo'
-
- def test_binary_put(self):
- request = requests.Request('PUT', 'http://example.com',
- data=u"ööö".encode("utf-8")).prepare()
- assert isinstance(request.body, bytes)
-
- @pytest.mark.parametrize('scheme', ('http://', 'HTTP://', 'hTTp://', 'HttP://'))
- def test_mixed_case_scheme_acceptable(self, httpbin, scheme):
- s = requests.Session()
- s.proxies = getproxies()
- parts = urlparse(httpbin('get'))
- url = scheme + parts.netloc + parts.path
- r = requests.Request('GET', url)
- r = s.send(r.prepare())
- assert r.status_code == 200, 'failed for scheme {0}'.format(scheme)
-
- def test_HTTP_200_OK_GET_ALTERNATIVE(self, httpbin):
- r = requests.Request('GET', httpbin('get'))
- s = requests.Session()
- s.proxies = getproxies()
-
- r = s.send(r.prepare())
-
- assert r.status_code == 200
-
- def test_HTTP_302_ALLOW_REDIRECT_GET(self, httpbin):
- r = requests.get(httpbin('redirect', '1'))
- assert r.status_code == 200
- assert r.history[0].status_code == 302
- assert r.history[0].is_redirect
-
- def test_HTTP_302_TOO_MANY_REDIRECTS(self, httpbin):
- try:
- requests.get(httpbin('relative-redirect', '50'))
- except TooManyRedirects as e:
- url = httpbin('relative-redirect', '20')
- assert e.request.url == url
- assert e.response.url == url
- assert len(e.response.history) == 30
- else:
- pytest.fail('Expected redirect to raise TooManyRedirects but it did not')
-
- def test_HTTP_302_TOO_MANY_REDIRECTS_WITH_PARAMS(self, httpbin):
- s = requests.session()
- s.max_redirects = 5
- try:
- s.get(httpbin('relative-redirect', '50'))
- except TooManyRedirects as e:
- url = httpbin('relative-redirect', '45')
- assert e.request.url == url
- assert e.response.url == url
- assert len(e.response.history) == 5
- else:
- pytest.fail('Expected custom max number of redirects to be respected but was not')
-
- def test_http_301_changes_post_to_get(self, httpbin):
- r = requests.post(httpbin('status', '301'))
- assert r.status_code == 200
- assert r.request.method == 'GET'
- assert r.history[0].status_code == 301
- assert r.history[0].is_redirect
-
- def test_http_301_doesnt_change_head_to_get(self, httpbin):
- r = requests.head(httpbin('status', '301'), allow_redirects=True)
- print(r.content)
- assert r.status_code == 200
- assert r.request.method == 'HEAD'
- assert r.history[0].status_code == 301
- assert r.history[0].is_redirect
-
- def test_http_302_changes_post_to_get(self, httpbin):
- r = requests.post(httpbin('status', '302'))
- assert r.status_code == 200
- assert r.request.method == 'GET'
- assert r.history[0].status_code == 302
- assert r.history[0].is_redirect
-
- def test_http_302_doesnt_change_head_to_get(self, httpbin):
- r = requests.head(httpbin('status', '302'), allow_redirects=True)
- assert r.status_code == 200
- assert r.request.method == 'HEAD'
- assert r.history[0].status_code == 302
- assert r.history[0].is_redirect
-
- def test_http_303_changes_post_to_get(self, httpbin):
- r = requests.post(httpbin('status', '303'))
- assert r.status_code == 200
- assert r.request.method == 'GET'
- assert r.history[0].status_code == 303
- assert r.history[0].is_redirect
-
- def test_http_303_doesnt_change_head_to_get(self, httpbin):
- r = requests.head(httpbin('status', '303'), allow_redirects=True)
- assert r.status_code == 200
- assert r.request.method == 'HEAD'
- assert r.history[0].status_code == 303
- assert r.history[0].is_redirect
-
- # def test_HTTP_302_ALLOW_REDIRECT_POST(self):
- # r = requests.post(httpbin('status', '302'), data={'some': 'data'})
- # self.assertEqual(r.status_code, 200)
-
- def test_HTTP_200_OK_GET_WITH_PARAMS(self, httpbin):
- heads = {'User-agent': 'Mozilla/5.0'}
-
- r = requests.get(httpbin('user-agent'), headers=heads)
-
- assert heads['User-agent'] in r.text
- assert r.status_code == 200
-
- def test_HTTP_200_OK_GET_WITH_MIXED_PARAMS(self, httpbin):
- heads = {'User-agent': 'Mozilla/5.0'}
-
- r = requests.get(httpbin('get') + '?test=true', params={'q': 'test'}, headers=heads)
- assert r.status_code == 200
-
- def test_set_cookie_on_301(self, httpbin):
- s = requests.session()
- url = httpbin('cookies/set?foo=bar')
- s.get(url)
- assert s.cookies['foo'] == 'bar'
-
- def test_cookie_sent_on_redirect(self, httpbin):
- s = requests.session()
- s.get(httpbin('cookies/set?foo=bar'))
- r = s.get(httpbin('redirect/1')) # redirects to httpbin('get')
- assert 'Cookie' in r.json()['headers']
-
- def test_cookie_removed_on_expire(self, httpbin):
- s = requests.session()
- s.get(httpbin('cookies/set?foo=bar'))
- assert s.cookies['foo'] == 'bar'
- s.get(
- httpbin('response-headers'),
- params={
- 'Set-Cookie':
- 'foo=deleted; expires=Thu, 01-Jan-1970 00:00:01 GMT'
- }
- )
- assert 'foo' not in s.cookies
-
- def test_cookie_quote_wrapped(self, httpbin):
- s = requests.session()
- s.get(httpbin('cookies/set?foo="bar:baz"'))
- assert s.cookies['foo'] == '"bar:baz"'
-
- def test_cookie_persists_via_api(self, httpbin):
- s = requests.session()
- r = s.get(httpbin('redirect/1'), cookies={'foo': 'bar'})
- assert 'foo' in r.request.headers['Cookie']
- assert 'foo' in r.history[0].request.headers['Cookie']
-
- def test_request_cookie_overrides_session_cookie(self, httpbin):
- s = requests.session()
- s.cookies['foo'] = 'bar'
- r = s.get(httpbin('cookies'), cookies={'foo': 'baz'})
- assert r.json()['cookies']['foo'] == 'baz'
- # Session cookie should not be modified
- assert s.cookies['foo'] == 'bar'
-
- def test_request_cookies_not_persisted(self, httpbin):
- s = requests.session()
- s.get(httpbin('cookies'), cookies={'foo': 'baz'})
- # Sending a request with cookies should not add cookies to the session
- assert not s.cookies
-
- def test_generic_cookiejar_works(self, httpbin):
- cj = cookielib.CookieJar()
- cookiejar_from_dict({'foo': 'bar'}, cj)
- s = requests.session()
- s.cookies = cj
- r = s.get(httpbin('cookies'))
- # Make sure the cookie was sent
- assert r.json()['cookies']['foo'] == 'bar'
- # Make sure the session cj is still the custom one
- assert s.cookies is cj
-
- def test_param_cookiejar_works(self, httpbin):
- cj = cookielib.CookieJar()
- cookiejar_from_dict({'foo': 'bar'}, cj)
- s = requests.session()
- r = s.get(httpbin('cookies'), cookies=cj)
- # Make sure the cookie was sent
- assert r.json()['cookies']['foo'] == 'bar'
-
- def test_requests_in_history_are_not_overridden(self, httpbin):
- resp = requests.get(httpbin('redirect/3'))
- urls = [r.url for r in resp.history]
- req_urls = [r.request.url for r in resp.history]
- assert urls == req_urls
-
- def test_history_is_always_a_list(self, httpbin):
- """Show that even with redirects, Response.history is always a list."""
- resp = requests.get(httpbin('get'))
- assert isinstance(resp.history, list)
- resp = requests.get(httpbin('redirect/1'))
- assert isinstance(resp.history, list)
- assert not isinstance(resp.history, tuple)
-
- def test_headers_on_session_with_None_are_not_sent(self, httpbin):
- """Do not send headers in Session.headers with None values."""
- ses = requests.Session()
- ses.headers['Accept-Encoding'] = None
- req = requests.Request('GET', httpbin('get'))
- prep = ses.prepare_request(req)
- assert 'Accept-Encoding' not in prep.headers
-
- def test_headers_preserve_order(self, httpbin):
- """Preserve order when headers provided as OrderedDict."""
- ses = requests.Session()
- ses.headers = OrderedDict()
- ses.headers['Accept-Encoding'] = 'identity'
- ses.headers['First'] = '1'
- ses.headers['Second'] = '2'
- headers = OrderedDict([('Third', '3'), ('Fourth', '4')])
- headers['Fifth'] = '5'
- headers['Second'] = '222'
- req = requests.Request('GET', httpbin('get'), headers=headers)
- prep = ses.prepare_request(req)
- items = list(prep.headers.items())
- assert items[0] == ('Accept-Encoding', 'identity')
- assert items[1] == ('First', '1')
- assert items[2] == ('Second', '222')
- assert items[3] == ('Third', '3')
- assert items[4] == ('Fourth', '4')
- assert items[5] == ('Fifth', '5')
-
- @pytest.mark.parametrize('key', ('User-agent', 'user-agent'))
- def test_user_agent_transfers(self, httpbin, key):
-
- heads = {key: 'Mozilla/5.0 (github.com/kennethreitz/requests)'}
-
- r = requests.get(httpbin('user-agent'), headers=heads)
- assert heads[key] in r.text
-
- def test_HTTP_200_OK_HEAD(self, httpbin):
- r = requests.head(httpbin('get'))
- assert r.status_code == 200
-
- def test_HTTP_200_OK_PUT(self, httpbin):
- r = requests.put(httpbin('put'))
- assert r.status_code == 200
-
- def test_BASICAUTH_TUPLE_HTTP_200_OK_GET(self, httpbin):
- auth = ('user', 'pass')
- url = httpbin('basic-auth', 'user', 'pass')
-
- r = requests.get(url, auth=auth)
- assert r.status_code == 200
-
- r = requests.get(url)
- assert r.status_code == 401
-
- s = requests.session()
- s.auth = auth
- r = s.get(url)
- assert r.status_code == 200
-
- @pytest.mark.parametrize(
- 'url, exception', (
- # Connecting to an unknown domain should raise a ConnectionError
- ('http://doesnotexist.google.com', ConnectionError),
- # Connecting to an invalid port should raise a ConnectionError
- ('http://localhost:1', ConnectionError),
- # Inputing a URL that cannot be parsed should raise an InvalidURL error
- ('http://fe80::5054:ff:fe5a:fc0', InvalidURL)
- ))
- def test_errors(self, url, exception):
- with pytest.raises(exception):
- requests.get(url, timeout=1)
-
- def test_proxy_error(self):
- # any proxy related error (address resolution, no route to host, etc) should result in a ProxyError
- with pytest.raises(ProxyError):
- requests.get('http://localhost:1', proxies={'http': 'non-resolvable-address'})
-
- def test_basicauth_with_netrc(self, httpbin):
- auth = ('user', 'pass')
- wrong_auth = ('wronguser', 'wrongpass')
- url = httpbin('basic-auth', 'user', 'pass')
-
- old_auth = requests.sessions.get_netrc_auth
-
- try:
- def get_netrc_auth_mock(url):
- return auth
- requests.sessions.get_netrc_auth = get_netrc_auth_mock
-
- # Should use netrc and work.
- r = requests.get(url)
- assert r.status_code == 200
-
- # Given auth should override and fail.
- r = requests.get(url, auth=wrong_auth)
- assert r.status_code == 401
-
- s = requests.session()
-
- # Should use netrc and work.
- r = s.get(url)
- assert r.status_code == 200
-
- # Given auth should override and fail.
- s.auth = wrong_auth
- r = s.get(url)
- assert r.status_code == 401
- finally:
- requests.sessions.get_netrc_auth = old_auth
-
- def test_DIGEST_HTTP_200_OK_GET(self, httpbin):
-
- auth = HTTPDigestAuth('user', 'pass')
- url = httpbin('digest-auth', 'auth', 'user', 'pass')
-
- r = requests.get(url, auth=auth)
- assert r.status_code == 200
-
- r = requests.get(url)
- assert r.status_code == 401
-
- s = requests.session()
- s.auth = HTTPDigestAuth('user', 'pass')
- r = s.get(url)
- assert r.status_code == 200
-
- def test_DIGEST_AUTH_RETURNS_COOKIE(self, httpbin):
- url = httpbin('digest-auth', 'auth', 'user', 'pass')
- auth = HTTPDigestAuth('user', 'pass')
- r = requests.get(url)
- assert r.cookies['fake'] == 'fake_value'
-
- r = requests.get(url, auth=auth)
- assert r.status_code == 200
-
- def test_DIGEST_AUTH_SETS_SESSION_COOKIES(self, httpbin):
- url = httpbin('digest-auth', 'auth', 'user', 'pass')
- auth = HTTPDigestAuth('user', 'pass')
- s = requests.Session()
- s.get(url, auth=auth)
- assert s.cookies['fake'] == 'fake_value'
-
- def test_DIGEST_STREAM(self, httpbin):
-
- auth = HTTPDigestAuth('user', 'pass')
- url = httpbin('digest-auth', 'auth', 'user', 'pass')
-
- r = requests.get(url, auth=auth, stream=True)
- assert r.raw.read() != b''
-
- r = requests.get(url, auth=auth, stream=False)
- assert r.raw.read() == b''
-
- def test_DIGESTAUTH_WRONG_HTTP_401_GET(self, httpbin):
-
- auth = HTTPDigestAuth('user', 'wrongpass')
- url = httpbin('digest-auth', 'auth', 'user', 'pass')
-
- r = requests.get(url, auth=auth)
- assert r.status_code == 401
-
- r = requests.get(url)
- assert r.status_code == 401
-
- s = requests.session()
- s.auth = auth
- r = s.get(url)
- assert r.status_code == 401
-
- def test_DIGESTAUTH_QUOTES_QOP_VALUE(self, httpbin):
-
- auth = HTTPDigestAuth('user', 'pass')
- url = httpbin('digest-auth', 'auth', 'user', 'pass')
-
- r = requests.get(url, auth=auth)
- assert '"auth"' in r.request.headers['Authorization']
-
- def test_POSTBIN_GET_POST_FILES(self, httpbin):
-
- url = httpbin('post')
- requests.post(url).raise_for_status()
-
- post1 = requests.post(url, data={'some': 'data'})
- assert post1.status_code == 200
-
- with open('requirements.txt') as f:
- post2 = requests.post(url, files={'some': f})
- assert post2.status_code == 200
-
- post4 = requests.post(url, data='[{"some": "json"}]')
- assert post4.status_code == 200
-
- with pytest.raises(ValueError):
- requests.post(url, files=['bad file data'])
-
- def test_POSTBIN_SEEKED_OBJECT_WITH_NO_ITER(self, httpbin):
-
- class TestStream(object):
- def __init__(self, data):
- self.data = data.encode()
- self.length = len(self.data)
- self.index = 0
-
- def __len__(self):
- return self.length
-
- def read(self, size=None):
- if size:
- ret = self.data[self.index:self.index + size]
- self.index += size
- else:
- ret = self.data[self.index:]
- self.index = self.length
- return ret
-
- def tell(self):
- return self.index
-
- def seek(self, offset, where=0):
- if where == 0:
- self.index = offset
- elif where == 1:
- self.index += offset
- elif where == 2:
- self.index = self.length + offset
-
- test = TestStream('test')
- post1 = requests.post(httpbin('post'), data=test)
- assert post1.status_code == 200
- assert post1.json()['data'] == 'test'
-
- test = TestStream('test')
- test.seek(2)
- post2 = requests.post(httpbin('post'), data=test)
- assert post2.status_code == 200
- assert post2.json()['data'] == 'st'
-
- def test_POSTBIN_GET_POST_FILES_WITH_DATA(self, httpbin):
-
- url = httpbin('post')
- requests.post(url).raise_for_status()
-
- post1 = requests.post(url, data={'some': 'data'})
- assert post1.status_code == 200
-
- with open('requirements.txt') as f:
- post2 = requests.post(url,
- data={'some': 'data'}, files={'some': f})
- assert post2.status_code == 200
-
- post4 = requests.post(url, data='[{"some": "json"}]')
- assert post4.status_code == 200
-
- with pytest.raises(ValueError):
- requests.post(url, files=['bad file data'])
-
- def test_conflicting_post_params(self, httpbin):
- url = httpbin('post')
- with open('requirements.txt') as f:
- pytest.raises(ValueError, "requests.post(url, data='[{\"some\": \"data\"}]', files={'some': f})")
- pytest.raises(ValueError, "requests.post(url, data=u('[{\"some\": \"data\"}]'), files={'some': f})")
-
- def test_request_ok_set(self, httpbin):
- r = requests.get(httpbin('status', '404'))
- assert not r.ok
-
- def test_status_raising(self, httpbin):
- r = requests.get(httpbin('status', '404'))
- with pytest.raises(requests.exceptions.HTTPError):
- r.raise_for_status()
-
- r = requests.get(httpbin('status', '500'))
- assert not r.ok
-
- def test_decompress_gzip(self, httpbin):
- r = requests.get(httpbin('gzip'))
- r.content.decode('ascii')
-
- @pytest.mark.parametrize(
- 'url, params', (
- ('/get', {'foo': 'føø'}),
- ('/get', {'føø': 'føø'}),
- ('/get', {'føø': 'føø'}),
- ('/get', {'foo': 'foo'}),
- ('ø', {'foo': 'foo'}),
- ))
- def test_unicode_get(self, httpbin, url, params):
- requests.get(httpbin(url), params=params)
-
- def test_unicode_header_name(self, httpbin):
- requests.put(
- httpbin('put'),
- headers={str('Content-Type'): 'application/octet-stream'},
- data='\xff') # compat.str is unicode.
-
- def test_pyopenssl_redirect(self, httpbin_secure, httpbin_ca_bundle):
- requests.get(httpbin_secure('status', '301'), verify=httpbin_ca_bundle)
-
- def test_urlencoded_get_query_multivalued_param(self, httpbin):
-
- r = requests.get(httpbin('get'), params=dict(test=['foo', 'baz']))
- assert r.status_code == 200
- assert r.url == httpbin('get?test=foo&test=baz')
-
- def test_different_encodings_dont_break_post(self, httpbin):
- r = requests.post(httpbin('post'),
- data={'stuff': json.dumps({'a': 123})},
- params={'blah': 'asdf1234'},
- files={'file': ('test_requests.py', open(__file__, 'rb'))})
- assert r.status_code == 200
-
- @pytest.mark.parametrize(
- 'data', (
- {'stuff': u('ëlïxr')},
- {'stuff': u('ëlïxr').encode('utf-8')},
- {'stuff': 'elixr'},
- {'stuff': 'elixr'.encode('utf-8')},
- ))
- def test_unicode_multipart_post(self, httpbin, data):
- r = requests.post(httpbin('post'),
- data=data,
- files={'file': ('test_requests.py', open(__file__, 'rb'))})
- assert r.status_code == 200
-
- def test_unicode_multipart_post_fieldnames(self, httpbin):
- filename = os.path.splitext(__file__)[0] + '.py'
- r = requests.Request(
- method='POST', url=httpbin('post'),
- data={'stuff'.encode('utf-8'): 'elixr'},
- files={'file': ('test_requests.py', open(filename, 'rb'))})
- prep = r.prepare()
- assert b'name="stuff"' in prep.body
- assert b'name="b\'stuff\'"' not in prep.body
-
- def test_unicode_method_name(self, httpbin):
- files = {'file': open(__file__, 'rb')}
- r = requests.request(
- method=u('POST'), url=httpbin('post'), files=files)
- assert r.status_code == 200
-
- def test_unicode_method_name_with_request_object(self, httpbin):
- files = {'file': open(__file__, 'rb')}
- s = requests.Session()
- req = requests.Request(u('POST'), httpbin('post'), files=files)
- prep = s.prepare_request(req)
- assert isinstance(prep.method, builtin_str)
- assert prep.method == 'POST'
-
- resp = s.send(prep)
- assert resp.status_code == 200
-
- def test_non_prepared_request_error(self):
- s = requests.Session()
- req = requests.Request(u('POST'), '/')
-
- with pytest.raises(ValueError) as e:
- s.send(req)
- assert str(e.value) == 'You can only send PreparedRequests.'
-
- def test_custom_content_type(self, httpbin):
- r = requests.post(
- httpbin('post'),
- data={'stuff': json.dumps({'a': 123})},
- files={
- 'file1': ('test_requests.py', open(__file__, 'rb')),
- 'file2': ('test_requests', open(__file__, 'rb'),
- 'text/py-content-type')})
- assert r.status_code == 200
- assert b"text/py-content-type" in r.request.body
-
- def test_hook_receives_request_arguments(self, httpbin):
- def hook(resp, **kwargs):
- assert resp is not None
- assert kwargs != {}
-
- s = requests.Session()
- r = requests.Request('GET', httpbin(), hooks={'response': hook})
- prep = s.prepare_request(r)
- s.send(prep)
-
- def test_session_hooks_are_used_with_no_request_hooks(self, httpbin):
- hook = lambda x, *args, **kwargs: x
- s = requests.Session()
- s.hooks['response'].append(hook)
- r = requests.Request('GET', httpbin())
- prep = s.prepare_request(r)
- assert prep.hooks['response'] != []
- assert prep.hooks['response'] == [hook]
-
- def test_session_hooks_are_overridden_by_request_hooks(self, httpbin):
- hook1 = lambda x, *args, **kwargs: x
- hook2 = lambda x, *args, **kwargs: x
- assert hook1 is not hook2
- s = requests.Session()
- s.hooks['response'].append(hook2)
- r = requests.Request('GET', httpbin(), hooks={'response': [hook1]})
- prep = s.prepare_request(r)
- assert prep.hooks['response'] == [hook1]
-
- def test_prepared_request_hook(self, httpbin):
- def hook(resp, **kwargs):
- resp.hook_working = True
- return resp
-
- req = requests.Request('GET', httpbin(), hooks={'response': hook})
- prep = req.prepare()
-
- s = requests.Session()
- s.proxies = getproxies()
- resp = s.send(prep)
-
- assert hasattr(resp, 'hook_working')
-
- def test_prepared_from_session(self, httpbin):
- class DummyAuth(requests.auth.AuthBase):
- def __call__(self, r):
- r.headers['Dummy-Auth-Test'] = 'dummy-auth-test-ok'
- return r
-
- req = requests.Request('GET', httpbin('headers'))
- assert not req.auth
-
- s = requests.Session()
- s.auth = DummyAuth()
-
- prep = s.prepare_request(req)
- resp = s.send(prep)
-
- assert resp.json()['headers'][
- 'Dummy-Auth-Test'] == 'dummy-auth-test-ok'
-
- def test_prepare_request_with_bytestring_url(self):
- req = requests.Request('GET', b'https://httpbin.org/')
- s = requests.Session()
- prep = s.prepare_request(req)
- assert prep.url == "https://httpbin.org/"
-
- def test_links(self):
- r = requests.Response()
- r.headers = {
- 'cache-control': 'public, max-age=60, s-maxage=60',
- 'connection': 'keep-alive',
- 'content-encoding': 'gzip',
- 'content-type': 'application/json; charset=utf-8',
- 'date': 'Sat, 26 Jan 2013 16:47:56 GMT',
- 'etag': '"6ff6a73c0e446c1f61614769e3ceb778"',
- 'last-modified': 'Sat, 26 Jan 2013 16:22:39 GMT',
- 'link': ('<https://api.github.com/users/kennethreitz/repos?'
- 'page=2&per_page=10>; rel="next", <https://api.github.'
- 'com/users/kennethreitz/repos?page=7&per_page=10>; '
- ' rel="last"'),
- 'server': 'GitHub.com',
- 'status': '200 OK',
- 'vary': 'Accept',
- 'x-content-type-options': 'nosniff',
- 'x-github-media-type': 'github.beta',
- 'x-ratelimit-limit': '60',
- 'x-ratelimit-remaining': '57'
- }
- assert r.links['next']['rel'] == 'next'
-
- def test_cookie_parameters(self):
- key = 'some_cookie'
- value = 'some_value'
- secure = True
- domain = 'test.com'
- rest = {'HttpOnly': True}
-
- jar = requests.cookies.RequestsCookieJar()
- jar.set(key, value, secure=secure, domain=domain, rest=rest)
-
- assert len(jar) == 1
- assert 'some_cookie' in jar
-
- cookie = list(jar)[0]
- assert cookie.secure == secure
- assert cookie.domain == domain
- assert cookie._rest['HttpOnly'] == rest['HttpOnly']
-
- def test_cookie_as_dict_keeps_len(self):
- key = 'some_cookie'
- value = 'some_value'
-
- key1 = 'some_cookie1'
- value1 = 'some_value1'
-
- jar = requests.cookies.RequestsCookieJar()
- jar.set(key, value)
- jar.set(key1, value1)
-
- d1 = dict(jar)
- d2 = dict(jar.iteritems())
- d3 = dict(jar.items())
-
- assert len(jar) == 2
- assert len(d1) == 2
- assert len(d2) == 2
- assert len(d3) == 2
-
- def test_cookie_as_dict_keeps_items(self):
- key = 'some_cookie'
- value = 'some_value'
-
- key1 = 'some_cookie1'
- value1 = 'some_value1'
-
- jar = requests.cookies.RequestsCookieJar()
- jar.set(key, value)
- jar.set(key1, value1)
-
- d1 = dict(jar)
- d2 = dict(jar.iteritems())
- d3 = dict(jar.items())
-
- assert d1['some_cookie'] == 'some_value'
- assert d2['some_cookie'] == 'some_value'
- assert d3['some_cookie1'] == 'some_value1'
-
- def test_cookie_as_dict_keys(self):
- key = 'some_cookie'
- value = 'some_value'
-
- key1 = 'some_cookie1'
- value1 = 'some_value1'
-
- jar = requests.cookies.RequestsCookieJar()
- jar.set(key, value)
- jar.set(key1, value1)
-
- keys = jar.keys()
- assert keys == list(keys)
- # make sure one can use keys multiple times
- assert list(keys) == list(keys)
-
- def test_cookie_as_dict_values(self):
- key = 'some_cookie'
- value = 'some_value'
-
- key1 = 'some_cookie1'
- value1 = 'some_value1'
-
- jar = requests.cookies.RequestsCookieJar()
- jar.set(key, value)
- jar.set(key1, value1)
-
- values = jar.values()
- assert values == list(values)
- # make sure one can use values multiple times
- assert list(values) == list(values)
-
- def test_cookie_as_dict_items(self):
- key = 'some_cookie'
- value = 'some_value'
-
- key1 = 'some_cookie1'
- value1 = 'some_value1'
-
- jar = requests.cookies.RequestsCookieJar()
- jar.set(key, value)
- jar.set(key1, value1)
-
- items = jar.items()
- assert items == list(items)
- # make sure one can use items multiple times
- assert list(items) == list(items)
-
- def test_cookie_duplicate_names_different_domains(self):
- key = 'some_cookie'
- value = 'some_value'
- domain1 = 'test1.com'
- domain2 = 'test2.com'
-
- jar = requests.cookies.RequestsCookieJar()
- jar.set(key, value, domain=domain1)
- jar.set(key, value, domain=domain2)
- assert key in jar
- items = jar.items()
- assert len(items) == 2
-
- # Verify that CookieConflictError is raised if domain is not specified
- with pytest.raises(requests.cookies.CookieConflictError):
- jar.get(key)
-
- # Verify that CookieConflictError is not raised if domain is specified
- cookie = jar.get(key, domain=domain1)
- assert cookie == value
-
- def test_cookie_duplicate_names_raises_cookie_conflict_error(self):
- key = 'some_cookie'
- value = 'some_value'
- path = 'some_path'
-
- jar = requests.cookies.RequestsCookieJar()
- jar.set(key, value, path=path)
- jar.set(key, value)
- with pytest.raises(requests.cookies.CookieConflictError):
- jar.get(key)
-
- def test_time_elapsed_blank(self, httpbin):
- r = requests.get(httpbin('get'))
- td = r.elapsed
- total_seconds = ((td.microseconds + (td.seconds + td.days * 24 * 3600)
- * 10**6) / 10**6)
- assert total_seconds > 0.0
-
- def test_response_is_iterable(self):
- r = requests.Response()
- io = StringIO.StringIO('abc')
- read_ = io.read
-
- def read_mock(amt, decode_content=None):
- return read_(amt)
- setattr(io, 'read', read_mock)
- r.raw = io
- assert next(iter(r))
- io.close()
-
- def test_response_decode_unicode(self):
- """
- When called with decode_unicode, Response.iter_content should always
- return unicode.
- """
- r = requests.Response()
- r._content_consumed = True
- r._content = b'the content'
- r.encoding = 'ascii'
-
- chunks = r.iter_content(decode_unicode=True)
- assert all(isinstance(chunk, str) for chunk in chunks)
-
- # also for streaming
- r = requests.Response()
- r.raw = io.BytesIO(b'the content')
- r.encoding = 'ascii'
- chunks = r.iter_content(decode_unicode=True)
- assert all(isinstance(chunk, str) for chunk in chunks)
-
- def test_request_and_response_are_pickleable(self, httpbin):
- r = requests.get(httpbin('get'))
-
- # verify we can pickle the original request
- assert pickle.loads(pickle.dumps(r.request))
-
- # verify we can pickle the response and that we have access to
- # the original request.
- pr = pickle.loads(pickle.dumps(r))
- assert r.request.url == pr.request.url
- assert r.request.headers == pr.request.headers
-
- def test_cannot_send_unprepared_requests(self, httpbin):
- r = requests.Request(url=httpbin())
- with pytest.raises(ValueError):
- requests.Session().send(r)
-
- def test_http_error(self):
- error = requests.exceptions.HTTPError()
- assert not error.response
- response = requests.Response()
- error = requests.exceptions.HTTPError(response=response)
- assert error.response == response
- error = requests.exceptions.HTTPError('message', response=response)
- assert str(error) == 'message'
- assert error.response == response
-
- def test_session_pickling(self, httpbin):
- r = requests.Request('GET', httpbin('get'))
- s = requests.Session()
-
- s = pickle.loads(pickle.dumps(s))
- s.proxies = getproxies()
-
- r = s.send(r.prepare())
- assert r.status_code == 200
-
- def test_fixes_1329(self, httpbin):
- """
- Ensure that header updates are done case-insensitively.
- """
- s = requests.Session()
- s.headers.update({'ACCEPT': 'BOGUS'})
- s.headers.update({'accept': 'application/json'})
- r = s.get(httpbin('get'))
- headers = r.request.headers
- assert headers['accept'] == 'application/json'
- assert headers['Accept'] == 'application/json'
- assert headers['ACCEPT'] == 'application/json'
-
- def test_uppercase_scheme_redirect(self, httpbin):
- parts = urlparse(httpbin('html'))
- url = "HTTP://" + parts.netloc + parts.path
- r = requests.get(httpbin('redirect-to'), params={'url': url})
- assert r.status_code == 200
- assert r.url.lower() == url.lower()
-
- def test_transport_adapter_ordering(self):
- s = requests.Session()
- order = ['https://', 'http://']
- assert order == list(s.adapters)
- s.mount('http://git', HTTPAdapter())
- s.mount('http://github', HTTPAdapter())
- s.mount('http://github.com', HTTPAdapter())
- s.mount('http://github.com/about/', HTTPAdapter())
- order = [
- 'http://github.com/about/',
- 'http://github.com',
- 'http://github',
- 'http://git',
- 'https://',
- 'http://',
- ]
- assert order == list(s.adapters)
- s.mount('http://gittip', HTTPAdapter())
- s.mount('http://gittip.com', HTTPAdapter())
- s.mount('http://gittip.com/about/', HTTPAdapter())
- order = [
- 'http://github.com/about/',
- 'http://gittip.com/about/',
- 'http://github.com',
- 'http://gittip.com',
- 'http://github',
- 'http://gittip',
- 'http://git',
- 'https://',
- 'http://',
- ]
- assert order == list(s.adapters)
- s2 = requests.Session()
- s2.adapters = {'http://': HTTPAdapter()}
- s2.mount('https://', HTTPAdapter())
- assert 'http://' in s2.adapters
- assert 'https://' in s2.adapters
-
- def test_header_remove_is_case_insensitive(self, httpbin):
- # From issue #1321
- s = requests.Session()
- s.headers['foo'] = 'bar'
- r = s.get(httpbin('get'), headers={'FOO': None})
- assert 'foo' not in r.request.headers
-
- def test_params_are_merged_case_sensitive(self, httpbin):
- s = requests.Session()
- s.params['foo'] = 'bar'
- r = s.get(httpbin('get'), params={'FOO': 'bar'})
- assert r.json()['args'] == {'foo': 'bar', 'FOO': 'bar'}
-
- def test_long_authinfo_in_url(self):
- url = 'http://{0}:{1}@{2}:9000/path?query#frag'.format(
- 'E8A3BE87-9E3F-4620-8858-95478E385B5B',
- 'EA770032-DA4D-4D84-8CE9-29C6D910BF1E',
- 'exactly-------------sixty-----------three------------characters',
- )
- r = requests.Request('GET', url).prepare()
- assert r.url == url
-
- def test_header_keys_are_native(self, httpbin):
- headers = {u('unicode'): 'blah', 'byte'.encode('ascii'): 'blah'}
- r = requests.Request('GET', httpbin('get'), headers=headers)
- p = r.prepare()
-
- # This is testing that they are builtin strings. A bit weird, but there
- # we go.
- assert 'unicode' in p.headers.keys()
- assert 'byte' in p.headers.keys()
-
- @pytest.mark.parametrize('files', ('foo', b'foo', bytearray(b'foo')))
- def test_can_send_objects_with_files(self, httpbin, files):
- data = {'a': 'this is a string'}
- files = {'b': files}
- r = requests.Request('POST', httpbin('post'), data=data, files=files)
- p = r.prepare()
- assert 'multipart/form-data' in p.headers['Content-Type']
-
- def test_can_send_file_object_with_non_string_filename(self, httpbin):
- f = io.BytesIO()
- f.name = 2
- r = requests.Request('POST', httpbin('post'), files={'f': f})
- p = r.prepare()
-
- assert 'multipart/form-data' in p.headers['Content-Type']
-
- def test_autoset_header_values_are_native(self, httpbin):
- data = 'this is a string'
- length = '16'
- req = requests.Request('POST', httpbin('post'), data=data)
- p = req.prepare()
-
- assert p.headers['Content-Length'] == length
-
- def test_nonhttp_schemes_dont_check_URLs(self):
- test_urls = (
- 'data:image/gif;base64,R0lGODlhAQABAHAAACH5BAUAAAAALAAAAAABAAEAAAICRAEAOw==',
- 'file:///etc/passwd',
- 'magnet:?xt=urn:btih:be08f00302bc2d1d3cfa3af02024fa647a271431',
- )
- for test_url in test_urls:
- req = requests.Request('GET', test_url)
- preq = req.prepare()
- assert test_url == preq.url
-
- def test_auth_is_stripped_on_redirect_off_host(self, httpbin):
- r = requests.get(
- httpbin('redirect-to'),
- params={'url': 'http://www.google.co.uk'},
- auth=('user', 'pass'),
- )
- assert r.history[0].request.headers['Authorization']
- assert not r.request.headers.get('Authorization', '')
-
- def test_auth_is_retained_for_redirect_on_host(self, httpbin):
- r = requests.get(httpbin('redirect/1'), auth=('user', 'pass'))
- h1 = r.history[0].request.headers['Authorization']
- h2 = r.request.headers['Authorization']
-
- assert h1 == h2
-
- def test_manual_redirect_with_partial_body_read(self, httpbin):
- s = requests.Session()
- r1 = s.get(httpbin('redirect/2'), allow_redirects=False, stream=True)
- assert r1.is_redirect
- rg = s.resolve_redirects(r1, r1.request, stream=True)
-
- # read only the first eight bytes of the response body,
- # then follow the redirect
- r1.iter_content(8)
- r2 = next(rg)
- assert r2.is_redirect
-
- # read all of the response via iter_content,
- # then follow the redirect
- for _ in r2.iter_content():
- pass
- r3 = next(rg)
- assert not r3.is_redirect
-
- def _patch_adapter_gzipped_redirect(self, session, url):
- adapter = session.get_adapter(url=url)
- org_build_response = adapter.build_response
- self._patched_response = False
-
- def build_response(*args, **kwargs):
- resp = org_build_response(*args, **kwargs)
- if not self._patched_response:
- resp.raw.headers['content-encoding'] = 'gzip'
- self._patched_response = True
- return resp
-
- adapter.build_response = build_response
-
- def test_redirect_with_wrong_gzipped_header(self, httpbin):
- s = requests.Session()
- url = httpbin('redirect/1')
- self._patch_adapter_gzipped_redirect(s, url)
- s.get(url)
-
- def test_basic_auth_str_is_always_native(self):
- s = _basic_auth_str("test", "test")
- assert isinstance(s, builtin_str)
- assert s == "Basic dGVzdDp0ZXN0"
-
- def test_requests_history_is_saved(self, httpbin):
- r = requests.get(httpbin('redirect/5'))
- total = r.history[-1].history
- i = 0
- for item in r.history:
- assert item.history == total[0:i]
- i += 1
-
- def test_json_param_post_content_type_works(self, httpbin):
- r = requests.post(
- httpbin('post'),
- json={'life': 42}
- )
- assert r.status_code == 200
- assert 'application/json' in r.request.headers['Content-Type']
- assert {'life': 42} == r.json()['json']
-
- def test_json_param_post_should_not_override_data_param(self, httpbin):
- r = requests.Request(method='POST', url=httpbin('post'),
- data={'stuff': 'elixr'},
- json={'music': 'flute'})
- prep = r.prepare()
- assert 'stuff=elixr' == prep.body
-
- def test_response_iter_lines(self, httpbin):
- r = requests.get(httpbin('stream/4'), stream=True)
- assert r.status_code == 200
-
- it = r.iter_lines()
- next(it)
- assert len(list(it)) == 3
-
- def test_unconsumed_session_response_closes_connection(self, httpbin):
- s = requests.session()
-
- with contextlib.closing(s.get(httpbin('stream/4'), stream=True)) as response:
- pass
-
- assert response._content_consumed is False
- assert response.raw.closed
-
- @pytest.mark.xfail
- def test_response_iter_lines_reentrant(self, httpbin):
- """Response.iter_lines() is not reentrant safe"""
- r = requests.get(httpbin('stream/4'), stream=True)
- assert r.status_code == 200
-
- next(r.iter_lines())
- assert len(list(r.iter_lines())) == 3
-
- def test_session_close_proxy_clear(self, mocker):
- proxies = {
- 'one': mocker.Mock(),
- 'two': mocker.Mock(),
- }
- session = requests.Session()
- mocker.patch.dict(session.adapters['http://'].proxy_manager, proxies)
- session.close()
- proxies['one'].clear.assert_called_once_with()
- proxies['two'].clear.assert_called_once_with()
-
-
-class TestCaseInsensitiveDict:
-
- @pytest.mark.parametrize(
- 'cid', (
- CaseInsensitiveDict({'Foo': 'foo', 'BAr': 'bar'}),
- CaseInsensitiveDict([('Foo', 'foo'), ('BAr', 'bar')]),
- CaseInsensitiveDict(FOO='foo', BAr='bar'),
- ))
- def test_init(self, cid):
- assert len(cid) == 2
- assert 'foo' in cid
- assert 'bar' in cid
-
- def test_docstring_example(self):
- cid = CaseInsensitiveDict()
- cid['Accept'] = 'application/json'
- assert cid['aCCEPT'] == 'application/json'
- assert list(cid) == ['Accept']
-
- def test_len(self):
- cid = CaseInsensitiveDict({'a': 'a', 'b': 'b'})
- cid['A'] = 'a'
- assert len(cid) == 2
-
- def test_getitem(self):
- cid = CaseInsensitiveDict({'Spam': 'blueval'})
- assert cid['spam'] == 'blueval'
- assert cid['SPAM'] == 'blueval'
-
- def test_fixes_649(self):
- """__setitem__ should behave case-insensitively."""
- cid = CaseInsensitiveDict()
- cid['spam'] = 'oneval'
- cid['Spam'] = 'twoval'
- cid['sPAM'] = 'redval'
- cid['SPAM'] = 'blueval'
- assert cid['spam'] == 'blueval'
- assert cid['SPAM'] == 'blueval'
- assert list(cid.keys()) == ['SPAM']
-
- def test_delitem(self):
- cid = CaseInsensitiveDict()
- cid['Spam'] = 'someval'
- del cid['sPam']
- assert 'spam' not in cid
- assert len(cid) == 0
-
- def test_contains(self):
- cid = CaseInsensitiveDict()
- cid['Spam'] = 'someval'
- assert 'Spam' in cid
- assert 'spam' in cid
- assert 'SPAM' in cid
- assert 'sPam' in cid
- assert 'notspam' not in cid
-
- def test_get(self):
- cid = CaseInsensitiveDict()
- cid['spam'] = 'oneval'
- cid['SPAM'] = 'blueval'
- assert cid.get('spam') == 'blueval'
- assert cid.get('SPAM') == 'blueval'
- assert cid.get('sPam') == 'blueval'
- assert cid.get('notspam', 'default') == 'default'
-
- def test_update(self):
- cid = CaseInsensitiveDict()
- cid['spam'] = 'blueval'
- cid.update({'sPam': 'notblueval'})
- assert cid['spam'] == 'notblueval'
- cid = CaseInsensitiveDict({'Foo': 'foo', 'BAr': 'bar'})
- cid.update({'fOO': 'anotherfoo', 'bAR': 'anotherbar'})
- assert len(cid) == 2
- assert cid['foo'] == 'anotherfoo'
- assert cid['bar'] == 'anotherbar'
-
- def test_update_retains_unchanged(self):
- cid = CaseInsensitiveDict({'foo': 'foo', 'bar': 'bar'})
- cid.update({'foo': 'newfoo'})
- assert cid['bar'] == 'bar'
-
- def test_iter(self):
- cid = CaseInsensitiveDict({'Spam': 'spam', 'Eggs': 'eggs'})
- keys = frozenset(['Spam', 'Eggs'])
- assert frozenset(iter(cid)) == keys
-
- def test_equality(self):
- cid = CaseInsensitiveDict({'SPAM': 'blueval', 'Eggs': 'redval'})
- othercid = CaseInsensitiveDict({'spam': 'blueval', 'eggs': 'redval'})
- assert cid == othercid
- del othercid['spam']
- assert cid != othercid
- assert cid == {'spam': 'blueval', 'eggs': 'redval'}
- assert cid != object()
-
- def test_setdefault(self):
- cid = CaseInsensitiveDict({'Spam': 'blueval'})
- assert cid.setdefault('spam', 'notblueval') == 'blueval'
- assert cid.setdefault('notspam', 'notblueval') == 'notblueval'
-
- def test_lower_items(self):
- cid = CaseInsensitiveDict({
- 'Accept': 'application/json',
- 'user-Agent': 'requests',
- })
- keyset = frozenset(lowerkey for lowerkey, v in cid.lower_items())
- lowerkeyset = frozenset(['accept', 'user-agent'])
- assert keyset == lowerkeyset
-
- def test_preserve_key_case(self):
- cid = CaseInsensitiveDict({
- 'Accept': 'application/json',
- 'user-Agent': 'requests',
- })
- keyset = frozenset(['Accept', 'user-Agent'])
- assert frozenset(i[0] for i in cid.items()) == keyset
- assert frozenset(cid.keys()) == keyset
- assert frozenset(cid) == keyset
-
- def test_preserve_last_key_case(self):
- cid = CaseInsensitiveDict({
- 'Accept': 'application/json',
- 'user-Agent': 'requests',
- })
- cid.update({'ACCEPT': 'application/json'})
- cid['USER-AGENT'] = 'requests'
- keyset = frozenset(['ACCEPT', 'USER-AGENT'])
- assert frozenset(i[0] for i in cid.items()) == keyset
- assert frozenset(cid.keys()) == keyset
- assert frozenset(cid) == keyset
-
- def test_copy(self):
- cid = CaseInsensitiveDict({
- 'Accept': 'application/json',
- 'user-Agent': 'requests',
- })
- cid_copy = cid.copy()
- assert cid == cid_copy
- cid['changed'] = True
- assert cid != cid_copy
-
-
-class TestMorselToCookieExpires:
- """Tests for morsel_to_cookie when morsel contains expires."""
-
- def test_expires_valid_str(self):
- """Test case where we convert expires from string time."""
-
- morsel = Morsel()
- morsel['expires'] = 'Thu, 01-Jan-1970 00:00:01 GMT'
- cookie = morsel_to_cookie(morsel)
- assert cookie.expires == 1
-
- @pytest.mark.parametrize(
- 'value, exception', (
- (100, TypeError),
- ('woops', ValueError),
- ))
- def test_expires_invalid_int(self, value, exception):
- """Test case where an invalid type is passed for expires."""
- morsel = Morsel()
- morsel['expires'] = value
- with pytest.raises(exception):
- morsel_to_cookie(morsel)
-
- def test_expires_none(self):
- """Test case where expires is None."""
-
- morsel = Morsel()
- morsel['expires'] = None
- cookie = morsel_to_cookie(morsel)
- assert cookie.expires is None
-
-
-class TestMorselToCookieMaxAge:
-
- """Tests for morsel_to_cookie when morsel contains max-age."""
-
- def test_max_age_valid_int(self):
- """Test case where a valid max age in seconds is passed."""
-
- morsel = Morsel()
- morsel['max-age'] = 60
- cookie = morsel_to_cookie(morsel)
- assert isinstance(cookie.expires, int)
-
- def test_max_age_invalid_str(self):
- """Test case where a invalid max age is passed."""
-
- morsel = Morsel()
- morsel['max-age'] = 'woops'
- with pytest.raises(TypeError):
- morsel_to_cookie(morsel)
-
-
-class TestTimeout:
-
- def test_stream_timeout(self, httpbin):
- try:
- requests.get(httpbin('delay/10'), timeout=2.0)
- except requests.exceptions.Timeout as e:
- assert 'Read timed out' in e.args[0].args[0]
-
- @pytest.mark.parametrize(
- 'timeout, error_text', (
- ((3, 4, 5), '(connect, read)'),
- ('foo', 'must be an int or float'),
- ))
- def test_invalid_timeout(self, httpbin, timeout, error_text):
- with pytest.raises(ValueError) as e:
- requests.get(httpbin('get'), timeout=timeout)
- assert error_text in str(e)
-
- def test_none_timeout(self, httpbin):
- """ Check that you can set None as a valid timeout value.
-
- To actually test this behavior, we'd want to check that setting the
- timeout to None actually lets the request block past the system default
- timeout. However, this would make the test suite unbearably slow.
- Instead we verify that setting the timeout to None does not prevent the
- request from succeeding.
- """
- r = requests.get(httpbin('get'), timeout=None)
- assert r.status_code == 200
-
- def test_read_timeout(self, httpbin):
- try:
- requests.get(httpbin('delay/10'), timeout=(None, 0.1))
- pytest.fail('The recv() request should time out.')
- except ReadTimeout:
- pass
-
- def test_connect_timeout(self):
- try:
- requests.get(TARPIT, timeout=(0.1, None))
- pytest.fail('The connect() request should time out.')
- except ConnectTimeout as e:
- assert isinstance(e, ConnectionError)
- assert isinstance(e, Timeout)
-
- def test_total_timeout_connect(self):
- try:
- requests.get(TARPIT, timeout=(0.1, 0.1))
- pytest.fail('The connect() request should time out.')
- except ConnectTimeout:
- pass
-
- def test_encoded_methods(self, httpbin):
- """See: https://github.com/kennethreitz/requests/issues/2316"""
- r = requests.request(b'GET', httpbin('get'))
- assert r.ok
-
-
-SendCall = collections.namedtuple('SendCall', ('args', 'kwargs'))
-
-
-class RedirectSession(SessionRedirectMixin):
- def __init__(self, order_of_redirects):
- self.redirects = order_of_redirects
- self.calls = []
- self.max_redirects = 30
- self.cookies = {}
- self.trust_env = False
-
- def send(self, *args, **kwargs):
- self.calls.append(SendCall(args, kwargs))
- return self.build_response()
-
- def build_response(self):
- request = self.calls[-1].args[0]
- r = requests.Response()
-
- try:
- r.status_code = int(self.redirects.pop(0))
- except IndexError:
- r.status_code = 200
-
- r.headers = CaseInsensitiveDict({'Location': '/'})
- r.raw = self._build_raw()
- r.request = request
- return r
-
- def _build_raw(self):
- string = StringIO.StringIO('')
- setattr(string, 'release_conn', lambda *args: args)
- return string
-
-
-def test_requests_are_updated_each_time(httpbin):
- session = RedirectSession([303, 307])
- prep = requests.Request('POST', httpbin('post')).prepare()
- r0 = session.send(prep)
- assert r0.request.method == 'POST'
- assert session.calls[-1] == SendCall((r0.request,), {})
- redirect_generator = session.resolve_redirects(r0, prep)
- default_keyword_args = {
- 'stream': False,
- 'verify': True,
- 'cert': None,
- 'timeout': None,
- 'allow_redirects': False,
- 'proxies': {},
- }
- for response in redirect_generator:
- assert response.request.method == 'GET'
- send_call = SendCall((response.request,), default_keyword_args)
- assert session.calls[-1] == send_call
-
-
-@pytest.mark.parametrize(
- 'data', (
- (('a', 'b'), ('c', 'd')),
- (('c', 'd'), ('a', 'b')),
- (('a', 'b'), ('c', 'd'), ('e', 'f')),
- ))
-def test_data_argument_accepts_tuples(data):
- """Ensure that the data argument will accept tuples of strings
- and properly encode them.
- """
- p = PreparedRequest()
- p.prepare(
- method='GET',
- url='http://www.example.com',
- data=data,
- hooks=default_hooks()
- )
- assert p.body == urlencode(data)
-
-
-@pytest.mark.parametrize(
- 'kwargs', (
- None,
- {
- 'method': 'GET',
- 'url': 'http://www.example.com',
- 'data': 'foo=bar',
- 'hooks': default_hooks()
- },
- {
- 'method': 'GET',
- 'url': 'http://www.example.com',
- 'data': 'foo=bar',
- 'hooks': default_hooks(),
- 'cookies': {'foo': 'bar'}
- },
- {
- 'method': 'GET',
- 'url': u('http://www.example.com/üniçø∂é')
- },
- ))
-def test_prepared_copy(kwargs):
- p = PreparedRequest()
- if kwargs:
- p.prepare(**kwargs)
- copy = p.copy()
- for attr in ('method', 'url', 'headers', '_cookies', 'body', 'hooks'):
- assert getattr(p, attr) == getattr(copy, attr)
-
-
-def test_urllib3_retries(httpbin):
- from requests.packages.urllib3.util import Retry
- s = requests.Session()
- s.mount('http://', HTTPAdapter(max_retries=Retry(
- total=2, status_forcelist=[500]
- )))
-
- with pytest.raises(RetryError):
- s.get(httpbin('status/500'))
-
-
-def test_urllib3_pool_connection_closed(httpbin):
- s = requests.Session()
- s.mount('http://', HTTPAdapter(pool_connections=0, pool_maxsize=0))
-
- try:
- s.get(httpbin('status/200'))
- except ConnectionError as e:
- assert u"Pool is closed." in str(e)
-
-
-def test_vendor_aliases():
- from requests.packages import urllib3
- from requests.packages import chardet
-
- with pytest.raises(ImportError):
- from requests.packages import webbrowser
« no previous file with comments | « recipe_engine/third_party/requests/tests/test_lowlevel.py ('k') | recipe_engine/third_party/requests/tests/test_structures.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698