| Index: third_party/gsutil/third_party/httplib2/python3/httplib2test.py
|
| diff --git a/third_party/gsutil/third_party/httplib2/python3/httplib2test.py b/third_party/gsutil/third_party/httplib2/python3/httplib2test.py
|
| old mode 100755
|
| new mode 100644
|
| index 8884a5a57a25943e02add3ce751be46ac9c1511f..480d28ec62a2d9f6d2dc845781af198f96496d89
|
| --- a/third_party/gsutil/third_party/httplib2/python3/httplib2test.py
|
| +++ b/third_party/gsutil/third_party/httplib2/python3/httplib2test.py
|
| @@ -1,1580 +1,1580 @@
|
| -#!/usr/bin/env python3
|
| -"""
|
| -httplib2test
|
| -
|
| -A set of unit tests for httplib2.py.
|
| -
|
| -Requires Python 3.0 or later
|
| -"""
|
| -
|
| -__author__ = "Joe Gregorio (joe@bitworking.org)"
|
| -__copyright__ = "Copyright 2006, Joe Gregorio"
|
| -__contributors__ = ["Mark Pilgrim"]
|
| -__license__ = "MIT"
|
| -__history__ = """ """
|
| -__version__ = "0.2 ($Rev: 118 $)"
|
| -
|
| -import base64
|
| -import http.client
|
| -import httplib2
|
| -import io
|
| -import os
|
| -import pickle
|
| -import socket
|
| -import ssl
|
| -import sys
|
| -import time
|
| -import unittest
|
| -import urllib.parse
|
| -
|
| -# The test resources base uri
|
| -base = 'http://bitworking.org/projects/httplib2/test/'
|
| -#base = 'http://localhost/projects/httplib2/test/'
|
| -cacheDirName = ".cache"
|
| -
|
| -
|
| -class CredentialsTest(unittest.TestCase):
|
| - def test(self):
|
| - c = httplib2.Credentials()
|
| - c.add("joe", "password")
|
| - self.assertEqual(("joe", "password"), list(c.iter("bitworking.org"))[0])
|
| - self.assertEqual(("joe", "password"), list(c.iter(""))[0])
|
| - c.add("fred", "password2", "wellformedweb.org")
|
| - self.assertEqual(("joe", "password"), list(c.iter("bitworking.org"))[0])
|
| - self.assertEqual(1, len(list(c.iter("bitworking.org"))))
|
| - self.assertEqual(2, len(list(c.iter("wellformedweb.org"))))
|
| - self.assertTrue(("fred", "password2") in list(c.iter("wellformedweb.org")))
|
| - c.clear()
|
| - self.assertEqual(0, len(list(c.iter("bitworking.org"))))
|
| - c.add("fred", "password2", "wellformedweb.org")
|
| - self.assertTrue(("fred", "password2") in list(c.iter("wellformedweb.org")))
|
| - self.assertEqual(0, len(list(c.iter("bitworking.org"))))
|
| - self.assertEqual(0, len(list(c.iter(""))))
|
| -
|
| -
|
| -class ParserTest(unittest.TestCase):
|
| - def testFromStd66(self):
|
| - self.assertEqual( ('http', 'example.com', '', None, None ), httplib2.parse_uri("http://example.com"))
|
| - self.assertEqual( ('https', 'example.com', '', None, None ), httplib2.parse_uri("https://example.com"))
|
| - self.assertEqual( ('https', 'example.com:8080', '', None, None ), httplib2.parse_uri("https://example.com:8080"))
|
| - self.assertEqual( ('http', 'example.com', '/', None, None ), httplib2.parse_uri("http://example.com/"))
|
| - self.assertEqual( ('http', 'example.com', '/path', None, None ), httplib2.parse_uri("http://example.com/path"))
|
| - self.assertEqual( ('http', 'example.com', '/path', 'a=1&b=2', None ), httplib2.parse_uri("http://example.com/path?a=1&b=2"))
|
| - self.assertEqual( ('http', 'example.com', '/path', 'a=1&b=2', 'fred' ), httplib2.parse_uri("http://example.com/path?a=1&b=2#fred"))
|
| - self.assertEqual( ('http', 'example.com', '/path', 'a=1&b=2', 'fred' ), httplib2.parse_uri("http://example.com/path?a=1&b=2#fred"))
|
| -
|
| -
|
| -class UrlNormTest(unittest.TestCase):
|
| - def test(self):
|
| - self.assertEqual( "http://example.org/", httplib2.urlnorm("http://example.org")[-1])
|
| - self.assertEqual( "http://example.org/", httplib2.urlnorm("http://EXAMple.org")[-1])
|
| - self.assertEqual( "http://example.org/?=b", httplib2.urlnorm("http://EXAMple.org?=b")[-1])
|
| - self.assertEqual( "http://example.org/mypath?a=b", httplib2.urlnorm("http://EXAMple.org/mypath?a=b")[-1])
|
| - self.assertEqual( "http://localhost:80/", httplib2.urlnorm("http://localhost:80")[-1])
|
| - self.assertEqual( httplib2.urlnorm("http://localhost:80/"), httplib2.urlnorm("HTTP://LOCALHOST:80"))
|
| - try:
|
| - httplib2.urlnorm("/")
|
| - self.fail("Non-absolute URIs should raise an exception")
|
| - except httplib2.RelativeURIError:
|
| - pass
|
| -
|
| -class UrlSafenameTest(unittest.TestCase):
|
| - def test(self):
|
| - # Test that different URIs end up generating different safe names
|
| - self.assertEqual( "example.org,fred,a=b,58489f63a7a83c3b7794a6a398ee8b1f", httplib2.safename("http://example.org/fred/?a=b"))
|
| - self.assertEqual( "example.org,fred,a=b,8c5946d56fec453071f43329ff0be46b", httplib2.safename("http://example.org/fred?/a=b"))
|
| - self.assertEqual( "www.example.org,fred,a=b,499c44b8d844a011b67ea2c015116968", httplib2.safename("http://www.example.org/fred?/a=b"))
|
| - self.assertEqual( httplib2.safename(httplib2.urlnorm("http://www")[-1]), httplib2.safename(httplib2.urlnorm("http://WWW")[-1]))
|
| - self.assertEqual( "www.example.org,fred,a=b,692e843a333484ce0095b070497ab45d", httplib2.safename("https://www.example.org/fred?/a=b"))
|
| - self.assertNotEqual( httplib2.safename("http://www"), httplib2.safename("https://www"))
|
| - # Test the max length limits
|
| - uri = "http://" + ("w" * 200) + ".org"
|
| - uri2 = "http://" + ("w" * 201) + ".org"
|
| - self.assertNotEqual( httplib2.safename(uri2), httplib2.safename(uri))
|
| - # Max length should be 200 + 1 (",") + 32
|
| - self.assertEqual(233, len(httplib2.safename(uri2)))
|
| - self.assertEqual(233, len(httplib2.safename(uri)))
|
| - # Unicode
|
| - if sys.version_info >= (2,3):
|
| - self.assertEqual( "xn--http,-4y1d.org,fred,a=b,579924c35db315e5a32e3d9963388193", httplib2.safename("http://\u2304.org/fred/?a=b"))
|
| -
|
| -class _MyResponse(io.BytesIO):
|
| - def __init__(self, body, **kwargs):
|
| - io.BytesIO.__init__(self, body)
|
| - self.headers = kwargs
|
| -
|
| - def items(self):
|
| - return self.headers.items()
|
| -
|
| - def iteritems(self):
|
| - return iter(self.headers.items())
|
| -
|
| -
|
| -class _MyHTTPConnection(object):
|
| - "This class is just a mock of httplib.HTTPConnection used for testing"
|
| -
|
| - def __init__(self, host, port=None, key_file=None, cert_file=None,
|
| - strict=None, timeout=None, proxy_info=None):
|
| - self.host = host
|
| - self.port = port
|
| - self.timeout = timeout
|
| - self.log = ""
|
| - self.sock = None
|
| -
|
| - def set_debuglevel(self, level):
|
| - pass
|
| -
|
| - def connect(self):
|
| - "Connect to a host on a given port."
|
| - pass
|
| -
|
| - def close(self):
|
| - pass
|
| -
|
| - def request(self, method, request_uri, body, headers):
|
| - pass
|
| -
|
| - def getresponse(self):
|
| - return _MyResponse(b"the body", status="200")
|
| -
|
| -
|
| -class HttpTest(unittest.TestCase):
|
| - def setUp(self):
|
| - if os.path.exists(cacheDirName):
|
| - [os.remove(os.path.join(cacheDirName, file)) for file in os.listdir(cacheDirName)]
|
| - self.http = httplib2.Http(cacheDirName)
|
| - self.http.clear_credentials()
|
| -
|
| - def testIPv6NoSSL(self):
|
| - try:
|
| - self.http.request("http://[::1]/")
|
| - except socket.gaierror:
|
| - self.fail("should get the address family right for IPv6")
|
| - except socket.error:
|
| - # Even if IPv6 isn't installed on a machine it should just raise socket.error
|
| - pass
|
| -
|
| - def testIPv6SSL(self):
|
| - try:
|
| - self.http.request("https://[::1]/")
|
| - except socket.gaierror:
|
| - self.fail("should get the address family right for IPv6")
|
| - except socket.error:
|
| - # Even if IPv6 isn't installed on a machine it should just raise socket.error
|
| - pass
|
| -
|
| - def testConnectionType(self):
|
| - self.http.force_exception_to_status_code = False
|
| - response, content = self.http.request("http://bitworking.org", connection_type=_MyHTTPConnection)
|
| - self.assertEqual(response['content-location'], "http://bitworking.org")
|
| - self.assertEqual(content, b"the body")
|
| -
|
| - def testGetUnknownServer(self):
|
| - self.http.force_exception_to_status_code = False
|
| - try:
|
| - self.http.request("http://fred.bitworking.org/")
|
| - self.fail("An httplib2.ServerNotFoundError Exception must be thrown on an unresolvable server.")
|
| - except httplib2.ServerNotFoundError:
|
| - pass
|
| -
|
| - # Now test with exceptions turned off
|
| - self.http.force_exception_to_status_code = True
|
| -
|
| - (response, content) = self.http.request("http://fred.bitworking.org/")
|
| - self.assertEqual(response['content-type'], 'text/plain')
|
| - self.assertTrue(content.startswith(b"Unable to find"))
|
| - self.assertEqual(response.status, 400)
|
| -
|
| - def testGetConnectionRefused(self):
|
| - self.http.force_exception_to_status_code = False
|
| - try:
|
| - self.http.request("http://localhost:7777/")
|
| - self.fail("An socket.error exception must be thrown on Connection Refused.")
|
| - except socket.error:
|
| - pass
|
| -
|
| - # Now test with exceptions turned off
|
| - self.http.force_exception_to_status_code = True
|
| -
|
| - (response, content) = self.http.request("http://localhost:7777/")
|
| - self.assertEqual(response['content-type'], 'text/plain')
|
| - self.assertTrue(b"Connection refused" in content)
|
| - self.assertEqual(response.status, 400)
|
| -
|
| - def testGetIRI(self):
|
| - if sys.version_info >= (2,3):
|
| - uri = urllib.parse.urljoin(base, "reflector/reflector.cgi?d=\N{CYRILLIC CAPITAL LETTER DJE}")
|
| - (response, content) = self.http.request(uri, "GET")
|
| - d = self.reflector(content)
|
| - self.assertTrue('QUERY_STRING' in d)
|
| - self.assertTrue(d['QUERY_STRING'].find('%D0%82') > 0)
|
| -
|
| - def testGetIsDefaultMethod(self):
|
| - # Test that GET is the default method
|
| - uri = urllib.parse.urljoin(base, "methods/method_reflector.cgi")
|
| - (response, content) = self.http.request(uri)
|
| - self.assertEqual(response['x-method'], "GET")
|
| -
|
| - def testDifferentMethods(self):
|
| - # Test that all methods can be used
|
| - uri = urllib.parse.urljoin(base, "methods/method_reflector.cgi")
|
| - for method in ["GET", "PUT", "DELETE", "POST"]:
|
| - (response, content) = self.http.request(uri, method, body=b" ")
|
| - self.assertEqual(response['x-method'], method)
|
| -
|
| - def testHeadRead(self):
|
| - # Test that we don't try to read the response of a HEAD request
|
| - # since httplib blocks response.read() for HEAD requests.
|
| - # Oddly enough this doesn't appear as a problem when doing HEAD requests
|
| - # against Apache servers.
|
| - uri = "http://www.google.com/"
|
| - (response, content) = self.http.request(uri, "HEAD")
|
| - self.assertEqual(response.status, 200)
|
| - self.assertEqual(content, b"")
|
| -
|
| - def testGetNoCache(self):
|
| - # Test that can do a GET w/o the cache turned on.
|
| - http = httplib2.Http()
|
| - uri = urllib.parse.urljoin(base, "304/test_etag.txt")
|
| - (response, content) = http.request(uri, "GET")
|
| - self.assertEqual(response.status, 200)
|
| - self.assertEqual(response.previous, None)
|
| -
|
| - def testGetOnlyIfCachedCacheHit(self):
|
| - # Test that can do a GET with cache and 'only-if-cached'
|
| - uri = urllib.parse.urljoin(base, "304/test_etag.txt")
|
| - (response, content) = self.http.request(uri, "GET")
|
| - (response, content) = self.http.request(uri, "GET", headers={'cache-control': 'only-if-cached'})
|
| - self.assertEqual(response.fromcache, True)
|
| - self.assertEqual(response.status, 200)
|
| -
|
| - def testGetOnlyIfCachedCacheMiss(self):
|
| - # Test that can do a GET with no cache with 'only-if-cached'
|
| - uri = urllib.parse.urljoin(base, "304/test_etag.txt")
|
| - (response, content) = self.http.request(uri, "GET", headers={'cache-control': 'only-if-cached'})
|
| - self.assertEqual(response.fromcache, False)
|
| - self.assertEqual(response.status, 504)
|
| -
|
| - def testGetOnlyIfCachedNoCacheAtAll(self):
|
| - # Test that can do a GET with no cache with 'only-if-cached'
|
| - # Of course, there might be an intermediary beyond us
|
| - # that responds to the 'only-if-cached', so this
|
| - # test can't really be guaranteed to pass.
|
| - http = httplib2.Http()
|
| - uri = urllib.parse.urljoin(base, "304/test_etag.txt")
|
| - (response, content) = http.request(uri, "GET", headers={'cache-control': 'only-if-cached'})
|
| - self.assertEqual(response.fromcache, False)
|
| - self.assertEqual(response.status, 504)
|
| -
|
| - def testUserAgent(self):
|
| - # Test that we provide a default user-agent
|
| - uri = urllib.parse.urljoin(base, "user-agent/test.cgi")
|
| - (response, content) = self.http.request(uri, "GET")
|
| - self.assertEqual(response.status, 200)
|
| - self.assertTrue(content.startswith(b"Python-httplib2/"))
|
| -
|
| - def testUserAgentNonDefault(self):
|
| - # Test that the default user-agent can be over-ridden
|
| -
|
| - uri = urllib.parse.urljoin(base, "user-agent/test.cgi")
|
| - (response, content) = self.http.request(uri, "GET", headers={'User-Agent': 'fred/1.0'})
|
| - self.assertEqual(response.status, 200)
|
| - self.assertTrue(content.startswith(b"fred/1.0"))
|
| -
|
| - def testGet300WithLocation(self):
|
| - # Test the we automatically follow 300 redirects if a Location: header is provided
|
| - uri = urllib.parse.urljoin(base, "300/with-location-header.asis")
|
| - (response, content) = self.http.request(uri, "GET")
|
| - self.assertEqual(response.status, 200)
|
| - self.assertEqual(content, b"This is the final destination.\n")
|
| - self.assertEqual(response.previous.status, 300)
|
| - self.assertEqual(response.previous.fromcache, False)
|
| -
|
| - # Confirm that the intermediate 300 is not cached
|
| - (response, content) = self.http.request(uri, "GET")
|
| - self.assertEqual(response.status, 200)
|
| - self.assertEqual(content, b"This is the final destination.\n")
|
| - self.assertEqual(response.previous.status, 300)
|
| - self.assertEqual(response.previous.fromcache, False)
|
| -
|
| - def testGet300WithLocationNoRedirect(self):
|
| - # Test the we automatically follow 300 redirects if a Location: header is provided
|
| - self.http.follow_redirects = False
|
| - uri = urllib.parse.urljoin(base, "300/with-location-header.asis")
|
| - (response, content) = self.http.request(uri, "GET")
|
| - self.assertEqual(response.status, 300)
|
| -
|
| - def testGet300WithoutLocation(self):
|
| - # Not giving a Location: header in a 300 response is acceptable
|
| - # In which case we just return the 300 response
|
| - uri = urllib.parse.urljoin(base, "300/without-location-header.asis")
|
| - (response, content) = self.http.request(uri, "GET")
|
| - self.assertEqual(response.status, 300)
|
| - self.assertTrue(response['content-type'].startswith("text/html"))
|
| - self.assertEqual(response.previous, None)
|
| -
|
| - def testGet301(self):
|
| - # Test that we automatically follow 301 redirects
|
| - # and that we cache the 301 response
|
| - uri = urllib.parse.urljoin(base, "301/onestep.asis")
|
| - destination = urllib.parse.urljoin(base, "302/final-destination.txt")
|
| - (response, content) = self.http.request(uri, "GET")
|
| - self.assertEqual(response.status, 200)
|
| - self.assertTrue('content-location' in response)
|
| - self.assertEqual(response['content-location'], destination)
|
| - self.assertEqual(content, b"This is the final destination.\n")
|
| - self.assertEqual(response.previous.status, 301)
|
| - self.assertEqual(response.previous.fromcache, False)
|
| -
|
| - (response, content) = self.http.request(uri, "GET")
|
| - self.assertEqual(response.status, 200)
|
| - self.assertEqual(response['content-location'], destination)
|
| - self.assertEqual(content, b"This is the final destination.\n")
|
| - self.assertEqual(response.previous.status, 301)
|
| - self.assertEqual(response.previous.fromcache, True)
|
| -
|
| - def testHead301(self):
|
| - # Test that we automatically follow 301 redirects
|
| - uri = urllib.parse.urljoin(base, "301/onestep.asis")
|
| - (response, content) = self.http.request(uri, "HEAD")
|
| - self.assertEqual(response.status, 200)
|
| - self.assertEqual(response.previous.status, 301)
|
| - self.assertEqual(response.previous.fromcache, False)
|
| -
|
| - def testGet301NoRedirect(self):
|
| - # Test that we automatically follow 301 redirects
|
| - # and that we cache the 301 response
|
| - self.http.follow_redirects = False
|
| - uri = urllib.parse.urljoin(base, "301/onestep.asis")
|
| - destination = urllib.parse.urljoin(base, "302/final-destination.txt")
|
| - (response, content) = self.http.request(uri, "GET")
|
| - self.assertEqual(response.status, 301)
|
| -
|
| -
|
| - def testGet302(self):
|
| - # Test that we automatically follow 302 redirects
|
| - # and that we DO NOT cache the 302 response
|
| - uri = urllib.parse.urljoin(base, "302/onestep.asis")
|
| - destination = urllib.parse.urljoin(base, "302/final-destination.txt")
|
| - (response, content) = self.http.request(uri, "GET")
|
| - self.assertEqual(response.status, 200)
|
| - self.assertEqual(response['content-location'], destination)
|
| - self.assertEqual(content, b"This is the final destination.\n")
|
| - self.assertEqual(response.previous.status, 302)
|
| - self.assertEqual(response.previous.fromcache, False)
|
| -
|
| - uri = urllib.parse.urljoin(base, "302/onestep.asis")
|
| - (response, content) = self.http.request(uri, "GET")
|
| - self.assertEqual(response.status, 200)
|
| - self.assertEqual(response.fromcache, True)
|
| - self.assertEqual(response['content-location'], destination)
|
| - self.assertEqual(content, b"This is the final destination.\n")
|
| - self.assertEqual(response.previous.status, 302)
|
| - self.assertEqual(response.previous.fromcache, False)
|
| - self.assertEqual(response.previous['content-location'], uri)
|
| -
|
| - uri = urllib.parse.urljoin(base, "302/twostep.asis")
|
| -
|
| - (response, content) = self.http.request(uri, "GET")
|
| - self.assertEqual(response.status, 200)
|
| - self.assertEqual(response.fromcache, True)
|
| - self.assertEqual(content, b"This is the final destination.\n")
|
| - self.assertEqual(response.previous.status, 302)
|
| - self.assertEqual(response.previous.fromcache, False)
|
| -
|
| - def testGet302RedirectionLimit(self):
|
| - # Test that we can set a lower redirection limit
|
| - # and that we raise an exception when we exceed
|
| - # that limit.
|
| - self.http.force_exception_to_status_code = False
|
| -
|
| - uri = urllib.parse.urljoin(base, "302/twostep.asis")
|
| - try:
|
| - (response, content) = self.http.request(uri, "GET", redirections = 1)
|
| - self.fail("This should not happen")
|
| - except httplib2.RedirectLimit:
|
| - pass
|
| - except Exception as e:
|
| - self.fail("Threw wrong kind of exception ")
|
| -
|
| - # Re-run the test with out the exceptions
|
| - self.http.force_exception_to_status_code = True
|
| -
|
| - (response, content) = self.http.request(uri, "GET", redirections = 1)
|
| - self.assertEqual(response.status, 500)
|
| - self.assertTrue(response.reason.startswith("Redirected more"))
|
| - self.assertEqual("302", response['status'])
|
| - self.assertTrue(content.startswith(b"<html>"))
|
| - self.assertTrue(response.previous != None)
|
| -
|
| - def testGet302NoLocation(self):
|
| - # Test that we throw an exception when we get
|
| - # a 302 with no Location: header.
|
| - self.http.force_exception_to_status_code = False
|
| - uri = urllib.parse.urljoin(base, "302/no-location.asis")
|
| - try:
|
| - (response, content) = self.http.request(uri, "GET")
|
| - self.fail("Should never reach here")
|
| - except httplib2.RedirectMissingLocation:
|
| - pass
|
| - except Exception as e:
|
| - self.fail("Threw wrong kind of exception ")
|
| -
|
| - # Re-run the test with out the exceptions
|
| - self.http.force_exception_to_status_code = True
|
| -
|
| - (response, content) = self.http.request(uri, "GET")
|
| - self.assertEqual(response.status, 500)
|
| - self.assertTrue(response.reason.startswith("Redirected but"))
|
| - self.assertEqual("302", response['status'])
|
| - self.assertTrue(content.startswith(b"This is content"))
|
| -
|
| - def testGet301ViaHttps(self):
|
| - # Google always redirects to http://google.com
|
| - (response, content) = self.http.request("https://code.google.com/apis/", "GET")
|
| - self.assertEqual(200, response.status)
|
| - self.assertEqual(301, response.previous.status)
|
| -
|
| - def testGetViaHttps(self):
|
| - # Test that we can handle HTTPS
|
| - (response, content) = self.http.request("https://google.com/adsense/", "GET")
|
| - self.assertEqual(200, response.status)
|
| -
|
| - def testGetViaHttpsSpecViolationOnLocation(self):
|
| - # Test that we follow redirects through HTTPS
|
| - # even if they violate the spec by including
|
| - # a relative Location: header instead of an
|
| - # absolute one.
|
| - (response, content) = self.http.request("https://google.com/adsense", "GET")
|
| - self.assertEqual(200, response.status)
|
| - self.assertNotEqual(None, response.previous)
|
| -
|
| -
|
| - def testGetViaHttpsKeyCert(self):
|
| - # At this point I can only test
|
| - # that the key and cert files are passed in
|
| - # correctly to httplib. It would be nice to have
|
| - # a real https endpoint to test against.
|
| - http = httplib2.Http(timeout=2)
|
| -
|
| - http.add_certificate("akeyfile", "acertfile", "bitworking.org")
|
| - try:
|
| - (response, content) = http.request("https://bitworking.org", "GET")
|
| - except AttributeError:
|
| - self.assertEqual(http.connections["https:bitworking.org"].key_file, "akeyfile")
|
| - self.assertEqual(http.connections["https:bitworking.org"].cert_file, "acertfile")
|
| - except IOError:
|
| - # Skip on 3.2
|
| - pass
|
| -
|
| - try:
|
| - (response, content) = http.request("https://notthere.bitworking.org", "GET")
|
| - except httplib2.ServerNotFoundError:
|
| - self.assertEqual(http.connections["https:notthere.bitworking.org"].key_file, None)
|
| - self.assertEqual(http.connections["https:notthere.bitworking.org"].cert_file, None)
|
| - except IOError:
|
| - # Skip on 3.2
|
| - pass
|
| -
|
| - def testSslCertValidation(self):
|
| - # Test that we get an ssl.SSLError when specifying a non-existent CA
|
| - # certs file.
|
| - http = httplib2.Http(ca_certs='/nosuchfile')
|
| - self.assertRaises(IOError,
|
| - http.request, "https://www.google.com/", "GET")
|
| -
|
| - # Test that we get a SSLHandshakeError if we try to access
|
| - # https://www.google.com, using a CA cert file that doesn't contain
|
| - # the CA Gogole uses (i.e., simulating a cert that's not signed by a
|
| - # trusted CA).
|
| - other_ca_certs = os.path.join(
|
| - os.path.dirname(os.path.abspath(httplib2.__file__ )),
|
| - "test", "other_cacerts.txt")
|
| - http = httplib2.Http(ca_certs=other_ca_certs)
|
| - self.assertRaises(ssl.SSLError,
|
| - http.request,"https://www.google.com/", "GET")
|
| -
|
| - def testSniHostnameValidation(self):
|
| - self.http.request("https://google.com/", method="GET")
|
| -
|
| - def testGet303(self):
|
| - # Do a follow-up GET on a Location: header
|
| - # returned from a POST that gave a 303.
|
| - uri = urllib.parse.urljoin(base, "303/303.cgi")
|
| - (response, content) = self.http.request(uri, "POST", " ")
|
| - self.assertEqual(response.status, 200)
|
| - self.assertEqual(content, b"This is the final destination.\n")
|
| - self.assertEqual(response.previous.status, 303)
|
| -
|
| - def testGet303NoRedirect(self):
|
| - # Do a follow-up GET on a Location: header
|
| - # returned from a POST that gave a 303.
|
| - self.http.follow_redirects = False
|
| - uri = urllib.parse.urljoin(base, "303/303.cgi")
|
| - (response, content) = self.http.request(uri, "POST", " ")
|
| - self.assertEqual(response.status, 303)
|
| -
|
| - def test303ForDifferentMethods(self):
|
| - # Test that all methods can be used
|
| - uri = urllib.parse.urljoin(base, "303/redirect-to-reflector.cgi")
|
| - for (method, method_on_303) in [("PUT", "GET"), ("DELETE", "GET"), ("POST", "GET"), ("GET", "GET"), ("HEAD", "GET")]:
|
| - (response, content) = self.http.request(uri, method, body=b" ")
|
| - self.assertEqual(response['x-method'], method_on_303)
|
| -
|
| - def testGet304(self):
|
| - # Test that we use ETags properly to validate our cache
|
| - uri = urllib.parse.urljoin(base, "304/test_etag.txt")
|
| - (response, content) = self.http.request(uri, "GET")
|
| - self.assertNotEqual(response['etag'], "")
|
| -
|
| - (response, content) = self.http.request(uri, "GET")
|
| - (response, content) = self.http.request(uri, "GET", headers = {'cache-control': 'must-revalidate'})
|
| - self.assertEqual(response.status, 200)
|
| - self.assertEqual(response.fromcache, True)
|
| -
|
| - cache_file_name = os.path.join(cacheDirName, httplib2.safename(httplib2.urlnorm(uri)[-1]))
|
| - f = open(cache_file_name, "r")
|
| - status_line = f.readline()
|
| - f.close()
|
| -
|
| - self.assertTrue(status_line.startswith("status:"))
|
| -
|
| - (response, content) = self.http.request(uri, "HEAD")
|
| - self.assertEqual(response.status, 200)
|
| - self.assertEqual(response.fromcache, True)
|
| -
|
| - (response, content) = self.http.request(uri, "GET", headers = {'range': 'bytes=0-0'})
|
| - self.assertEqual(response.status, 206)
|
| - self.assertEqual(response.fromcache, False)
|
| -
|
| - def testGetIgnoreEtag(self):
|
| - # Test that we can forcibly ignore ETags
|
| - uri = urllib.parse.urljoin(base, "reflector/reflector.cgi")
|
| - (response, content) = self.http.request(uri, "GET")
|
| - self.assertNotEqual(response['etag'], "")
|
| -
|
| - (response, content) = self.http.request(uri, "GET", headers = {'cache-control': 'max-age=0'})
|
| - d = self.reflector(content)
|
| - self.assertTrue('HTTP_IF_NONE_MATCH' in d)
|
| -
|
| - self.http.ignore_etag = True
|
| - (response, content) = self.http.request(uri, "GET", headers = {'cache-control': 'max-age=0'})
|
| - d = self.reflector(content)
|
| - self.assertEqual(response.fromcache, False)
|
| - self.assertFalse('HTTP_IF_NONE_MATCH' in d)
|
| -
|
| - def testOverrideEtag(self):
|
| - # Test that we can forcibly ignore ETags
|
| - uri = urllib.parse.urljoin(base, "reflector/reflector.cgi")
|
| - (response, content) = self.http.request(uri, "GET")
|
| - self.assertNotEqual(response['etag'], "")
|
| -
|
| - (response, content) = self.http.request(uri, "GET", headers = {'cache-control': 'max-age=0'})
|
| - d = self.reflector(content)
|
| - self.assertTrue('HTTP_IF_NONE_MATCH' in d)
|
| - self.assertNotEqual(d['HTTP_IF_NONE_MATCH'], "fred")
|
| -
|
| - (response, content) = self.http.request(uri, "GET", headers = {'cache-control': 'max-age=0', 'if-none-match': 'fred'})
|
| - d = self.reflector(content)
|
| - self.assertTrue('HTTP_IF_NONE_MATCH' in d)
|
| - self.assertEqual(d['HTTP_IF_NONE_MATCH'], "fred")
|
| -
|
| -#MAP-commented this out because it consistently fails
|
| -# def testGet304EndToEnd(self):
|
| -# # Test that end to end headers get overwritten in the cache
|
| -# uri = urllib.parse.urljoin(base, "304/end2end.cgi")
|
| -# (response, content) = self.http.request(uri, "GET")
|
| -# self.assertNotEqual(response['etag'], "")
|
| -# old_date = response['date']
|
| -# time.sleep(2)
|
| -#
|
| -# (response, content) = self.http.request(uri, "GET", headers = {'Cache-Control': 'max-age=0'})
|
| -# # The response should be from the cache, but the Date: header should be updated.
|
| -# new_date = response['date']
|
| -# self.assertNotEqual(new_date, old_date)
|
| -# self.assertEqual(response.status, 200)
|
| -# self.assertEqual(response.fromcache, True)
|
| -
|
| - def testGet304LastModified(self):
|
| - # Test that we can still handle a 304
|
| - # by only using the last-modified cache validator.
|
| - uri = urllib.parse.urljoin(base, "304/last-modified-only/last-modified-only.txt")
|
| - (response, content) = self.http.request(uri, "GET")
|
| -
|
| - self.assertNotEqual(response['last-modified'], "")
|
| - (response, content) = self.http.request(uri, "GET")
|
| - (response, content) = self.http.request(uri, "GET")
|
| - self.assertEqual(response.status, 200)
|
| - self.assertEqual(response.fromcache, True)
|
| -
|
| - def testGet307(self):
|
| - # Test that we do follow 307 redirects but
|
| - # do not cache the 307
|
| - uri = urllib.parse.urljoin(base, "307/onestep.asis")
|
| - (response, content) = self.http.request(uri, "GET")
|
| - self.assertEqual(response.status, 200)
|
| - self.assertEqual(content, b"This is the final destination.\n")
|
| - self.assertEqual(response.previous.status, 307)
|
| - self.assertEqual(response.previous.fromcache, False)
|
| -
|
| - (response, content) = self.http.request(uri, "GET")
|
| - self.assertEqual(response.status, 200)
|
| - self.assertEqual(response.fromcache, True)
|
| - self.assertEqual(content, b"This is the final destination.\n")
|
| - self.assertEqual(response.previous.status, 307)
|
| - self.assertEqual(response.previous.fromcache, False)
|
| -
|
| - def testGet410(self):
|
| - # Test that we pass 410's through
|
| - uri = urllib.parse.urljoin(base, "410/410.asis")
|
| - (response, content) = self.http.request(uri, "GET")
|
| - self.assertEqual(response.status, 410)
|
| -
|
| - def testVaryHeaderSimple(self):
|
| - """
|
| - RFC 2616 13.6
|
| - When the cache receives a subsequent request whose Request-URI
|
| - specifies one or more cache entries including a Vary header field,
|
| - the cache MUST NOT use such a cache entry to construct a response
|
| - to the new request unless all of the selecting request-headers
|
| - present in the new request match the corresponding stored
|
| - request-headers in the original request.
|
| - """
|
| - # test that the vary header is sent
|
| - uri = urllib.parse.urljoin(base, "vary/accept.asis")
|
| - (response, content) = self.http.request(uri, "GET", headers={'Accept': 'text/plain'})
|
| - self.assertEqual(response.status, 200)
|
| - self.assertTrue('vary' in response)
|
| -
|
| - # get the resource again, from the cache since accept header in this
|
| - # request is the same as the request
|
| - (response, content) = self.http.request(uri, "GET", headers={'Accept': 'text/plain'})
|
| - self.assertEqual(response.status, 200)
|
| - self.assertEqual(response.fromcache, True, msg="Should be from cache")
|
| -
|
| - # get the resource again, not from cache since Accept headers does not match
|
| - (response, content) = self.http.request(uri, "GET", headers={'Accept': 'text/html'})
|
| - self.assertEqual(response.status, 200)
|
| - self.assertEqual(response.fromcache, False, msg="Should not be from cache")
|
| -
|
| - # get the resource again, without any Accept header, so again no match
|
| - (response, content) = self.http.request(uri, "GET")
|
| - self.assertEqual(response.status, 200)
|
| - self.assertEqual(response.fromcache, False, msg="Should not be from cache")
|
| -
|
| - def testNoVary(self):
|
| - pass
|
| - # when there is no vary, a different Accept header (e.g.) should not
|
| - # impact if the cache is used
|
| - # test that the vary header is not sent
|
| - # uri = urllib.parse.urljoin(base, "vary/no-vary.asis")
|
| - # (response, content) = self.http.request(uri, "GET", headers={'Accept': 'text/plain'})
|
| - # self.assertEqual(response.status, 200)
|
| - # self.assertFalse('vary' in response)
|
| - #
|
| - # (response, content) = self.http.request(uri, "GET", headers={'Accept': 'text/plain'})
|
| - # self.assertEqual(response.status, 200)
|
| - # self.assertEqual(response.fromcache, True, msg="Should be from cache")
|
| - #
|
| - # (response, content) = self.http.request(uri, "GET", headers={'Accept': 'text/html'})
|
| - # self.assertEqual(response.status, 200)
|
| - # self.assertEqual(response.fromcache, True, msg="Should be from cache")
|
| -
|
| - def testVaryHeaderDouble(self):
|
| - uri = urllib.parse.urljoin(base, "vary/accept-double.asis")
|
| - (response, content) = self.http.request(uri, "GET", headers={
|
| - 'Accept': 'text/plain', 'Accept-Language': 'da, en-gb;q=0.8, en;q=0.7'})
|
| - self.assertEqual(response.status, 200)
|
| - self.assertTrue('vary' in response)
|
| -
|
| - # we are from cache
|
| - (response, content) = self.http.request(uri, "GET", headers={
|
| - 'Accept': 'text/plain', 'Accept-Language': 'da, en-gb;q=0.8, en;q=0.7'})
|
| - self.assertEqual(response.fromcache, True, msg="Should be from cache")
|
| -
|
| - (response, content) = self.http.request(uri, "GET", headers={'Accept': 'text/plain'})
|
| - self.assertEqual(response.status, 200)
|
| - self.assertEqual(response.fromcache, False)
|
| -
|
| - # get the resource again, not from cache, varied headers don't match exact
|
| - (response, content) = self.http.request(uri, "GET", headers={'Accept-Language': 'da'})
|
| - self.assertEqual(response.status, 200)
|
| - self.assertEqual(response.fromcache, False, msg="Should not be from cache")
|
| -
|
| - def testVaryUnusedHeader(self):
|
| - # A header's value is not considered to vary if it's not used at all.
|
| - uri = urllib.parse.urljoin(base, "vary/unused-header.asis")
|
| - (response, content) = self.http.request(uri, "GET", headers={
|
| - 'Accept': 'text/plain'})
|
| - self.assertEqual(response.status, 200)
|
| - self.assertTrue('vary' in response)
|
| -
|
| - # we are from cache
|
| - (response, content) = self.http.request(uri, "GET", headers={
|
| - 'Accept': 'text/plain',})
|
| - self.assertEqual(response.fromcache, True, msg="Should be from cache")
|
| -
|
| - def testHeadGZip(self):
|
| - # Test that we don't try to decompress a HEAD response
|
| - uri = urllib.parse.urljoin(base, "gzip/final-destination.txt")
|
| - (response, content) = self.http.request(uri, "HEAD")
|
| - self.assertEqual(response.status, 200)
|
| - self.assertNotEqual(int(response['content-length']), 0)
|
| - self.assertEqual(content, b"")
|
| -
|
| - def testGetGZip(self):
|
| - # Test that we support gzip compression
|
| - uri = urllib.parse.urljoin(base, "gzip/final-destination.txt")
|
| - (response, content) = self.http.request(uri, "GET")
|
| - self.assertEqual(response.status, 200)
|
| - self.assertFalse('content-encoding' in response)
|
| - self.assertTrue('-content-encoding' in response)
|
| - self.assertEqual(int(response['content-length']), len(b"This is the final destination.\n"))
|
| - self.assertEqual(content, b"This is the final destination.\n")
|
| -
|
| - def testPostAndGZipResponse(self):
|
| - uri = urllib.parse.urljoin(base, "gzip/post.cgi")
|
| - (response, content) = self.http.request(uri, "POST", body=" ")
|
| - self.assertEqual(response.status, 200)
|
| - self.assertFalse('content-encoding' in response)
|
| - self.assertTrue('-content-encoding' in response)
|
| -
|
| - def testGetGZipFailure(self):
|
| - # Test that we raise a good exception when the gzip fails
|
| - self.http.force_exception_to_status_code = False
|
| - uri = urllib.parse.urljoin(base, "gzip/failed-compression.asis")
|
| - try:
|
| - (response, content) = self.http.request(uri, "GET")
|
| - self.fail("Should never reach here")
|
| - except httplib2.FailedToDecompressContent:
|
| - pass
|
| - except Exception:
|
| - self.fail("Threw wrong kind of exception")
|
| -
|
| - # Re-run the test with out the exceptions
|
| - self.http.force_exception_to_status_code = True
|
| -
|
| - (response, content) = self.http.request(uri, "GET")
|
| - self.assertEqual(response.status, 500)
|
| - self.assertTrue(response.reason.startswith("Content purported"))
|
| -
|
| - def testIndividualTimeout(self):
|
| - uri = urllib.parse.urljoin(base, "timeout/timeout.cgi")
|
| - http = httplib2.Http(timeout=1)
|
| - http.force_exception_to_status_code = True
|
| -
|
| - (response, content) = http.request(uri)
|
| - self.assertEqual(response.status, 408)
|
| - self.assertTrue(response.reason.startswith("Request Timeout"))
|
| - self.assertTrue(content.startswith(b"Request Timeout"))
|
| -
|
| -
|
| - def testGetDeflate(self):
|
| - # Test that we support deflate compression
|
| - uri = urllib.parse.urljoin(base, "deflate/deflated.asis")
|
| - (response, content) = self.http.request(uri, "GET")
|
| - self.assertEqual(response.status, 200)
|
| - self.assertFalse('content-encoding' in response)
|
| - self.assertEqual(int(response['content-length']), len("This is the final destination."))
|
| - self.assertEqual(content, b"This is the final destination.")
|
| -
|
| - def testGetDeflateFailure(self):
|
| - # Test that we raise a good exception when the deflate fails
|
| - self.http.force_exception_to_status_code = False
|
| -
|
| - uri = urllib.parse.urljoin(base, "deflate/failed-compression.asis")
|
| - try:
|
| - (response, content) = self.http.request(uri, "GET")
|
| - self.fail("Should never reach here")
|
| - except httplib2.FailedToDecompressContent:
|
| - pass
|
| - except Exception:
|
| - self.fail("Threw wrong kind of exception")
|
| -
|
| - # Re-run the test with out the exceptions
|
| - self.http.force_exception_to_status_code = True
|
| -
|
| - (response, content) = self.http.request(uri, "GET")
|
| - self.assertEqual(response.status, 500)
|
| - self.assertTrue(response.reason.startswith("Content purported"))
|
| -
|
| - def testGetDuplicateHeaders(self):
|
| - # Test that duplicate headers get concatenated via ','
|
| - uri = urllib.parse.urljoin(base, "duplicate-headers/multilink.asis")
|
| - (response, content) = self.http.request(uri, "GET")
|
| - self.assertEqual(response.status, 200)
|
| - self.assertEqual(content, b"This is content\n")
|
| - self.assertEqual(response['link'].split(",")[0], '<http://bitworking.org>; rel="home"; title="BitWorking"')
|
| -
|
| - def testGetCacheControlNoCache(self):
|
| - # Test Cache-Control: no-cache on requests
|
| - uri = urllib.parse.urljoin(base, "304/test_etag.txt")
|
| - (response, content) = self.http.request(uri, "GET")
|
| - self.assertNotEqual(response['etag'], "")
|
| - (response, content) = self.http.request(uri, "GET")
|
| - self.assertEqual(response.status, 200)
|
| - self.assertEqual(response.fromcache, True)
|
| -
|
| - (response, content) = self.http.request(uri, "GET", headers={'Cache-Control': 'no-cache'})
|
| - self.assertEqual(response.status, 200)
|
| - self.assertEqual(response.fromcache, False)
|
| -
|
| - def testGetCacheControlPragmaNoCache(self):
|
| - # Test Pragma: no-cache on requests
|
| - uri = urllib.parse.urljoin(base, "304/test_etag.txt")
|
| - (response, content) = self.http.request(uri, "GET")
|
| - self.assertNotEqual(response['etag'], "")
|
| - (response, content) = self.http.request(uri, "GET")
|
| - self.assertEqual(response.status, 200)
|
| - self.assertEqual(response.fromcache, True)
|
| -
|
| - (response, content) = self.http.request(uri, "GET", headers={'Pragma': 'no-cache'})
|
| - self.assertEqual(response.status, 200)
|
| - self.assertEqual(response.fromcache, False)
|
| -
|
| - def testGetCacheControlNoStoreRequest(self):
|
| - # A no-store request means that the response should not be stored.
|
| - uri = urllib.parse.urljoin(base, "304/test_etag.txt")
|
| -
|
| - (response, content) = self.http.request(uri, "GET", headers={'Cache-Control': 'no-store'})
|
| - self.assertEqual(response.status, 200)
|
| - self.assertEqual(response.fromcache, False)
|
| -
|
| - (response, content) = self.http.request(uri, "GET", headers={'Cache-Control': 'no-store'})
|
| - self.assertEqual(response.status, 200)
|
| - self.assertEqual(response.fromcache, False)
|
| -
|
| - def testGetCacheControlNoStoreResponse(self):
|
| - # A no-store response means that the response should not be stored.
|
| - uri = urllib.parse.urljoin(base, "no-store/no-store.asis")
|
| -
|
| - (response, content) = self.http.request(uri, "GET")
|
| - self.assertEqual(response.status, 200)
|
| - self.assertEqual(response.fromcache, False)
|
| -
|
| - (response, content) = self.http.request(uri, "GET")
|
| - self.assertEqual(response.status, 200)
|
| - self.assertEqual(response.fromcache, False)
|
| -
|
| - def testGetCacheControlNoCacheNoStoreRequest(self):
|
| - # Test that a no-store, no-cache clears the entry from the cache
|
| - # even if it was cached previously.
|
| - uri = urllib.parse.urljoin(base, "304/test_etag.txt")
|
| -
|
| - (response, content) = self.http.request(uri, "GET")
|
| - (response, content) = self.http.request(uri, "GET")
|
| - self.assertEqual(response.fromcache, True)
|
| - (response, content) = self.http.request(uri, "GET", headers={'Cache-Control': 'no-store, no-cache'})
|
| - (response, content) = self.http.request(uri, "GET", headers={'Cache-Control': 'no-store, no-cache'})
|
| - self.assertEqual(response.status, 200)
|
| - self.assertEqual(response.fromcache, False)
|
| -
|
| - def testUpdateInvalidatesCache(self):
|
| - # Test that calling PUT or DELETE on a
|
| - # URI that is cache invalidates that cache.
|
| - uri = urllib.parse.urljoin(base, "304/test_etag.txt")
|
| -
|
| - (response, content) = self.http.request(uri, "GET")
|
| - (response, content) = self.http.request(uri, "GET")
|
| - self.assertEqual(response.fromcache, True)
|
| - (response, content) = self.http.request(uri, "DELETE")
|
| - self.assertEqual(response.status, 405)
|
| -
|
| - (response, content) = self.http.request(uri, "GET")
|
| - self.assertEqual(response.fromcache, False)
|
| -
|
| - def testUpdateUsesCachedETag(self):
|
| - # Test that we natively support http://www.w3.org/1999/04/Editing/
|
| - uri = urllib.parse.urljoin(base, "conditional-updates/test.cgi")
|
| -
|
| - (response, content) = self.http.request(uri, "GET")
|
| - self.assertEqual(response.status, 200)
|
| - self.assertEqual(response.fromcache, False)
|
| - (response, content) = self.http.request(uri, "GET")
|
| - self.assertEqual(response.status, 200)
|
| - self.assertEqual(response.fromcache, True)
|
| - (response, content) = self.http.request(uri, "PUT", body="foo")
|
| - self.assertEqual(response.status, 200)
|
| - (response, content) = self.http.request(uri, "PUT", body="foo")
|
| - self.assertEqual(response.status, 412)
|
| -
|
| -
|
| - def testUpdatePatchUsesCachedETag(self):
|
| - # Test that we natively support http://www.w3.org/1999/04/Editing/
|
| - uri = urllib.parse.urljoin(base, "conditional-updates/test.cgi")
|
| -
|
| - (response, content) = self.http.request(uri, "GET")
|
| - self.assertEqual(response.status, 200)
|
| - self.assertEqual(response.fromcache, False)
|
| - (response, content) = self.http.request(uri, "GET")
|
| - self.assertEqual(response.status, 200)
|
| - self.assertEqual(response.fromcache, True)
|
| - (response, content) = self.http.request(uri, "PATCH", body="foo")
|
| - self.assertEqual(response.status, 200)
|
| - (response, content) = self.http.request(uri, "PATCH", body="foo")
|
| - self.assertEqual(response.status, 412)
|
| -
|
| - def testUpdateUsesCachedETagAndOCMethod(self):
|
| - # Test that we natively support http://www.w3.org/1999/04/Editing/
|
| - uri = urllib.parse.urljoin(base, "conditional-updates/test.cgi")
|
| -
|
| - (response, content) = self.http.request(uri, "GET")
|
| - self.assertEqual(response.status, 200)
|
| - self.assertEqual(response.fromcache, False)
|
| - (response, content) = self.http.request(uri, "GET")
|
| - self.assertEqual(response.status, 200)
|
| - self.assertEqual(response.fromcache, True)
|
| - self.http.optimistic_concurrency_methods.append("DELETE")
|
| - (response, content) = self.http.request(uri, "DELETE")
|
| - self.assertEqual(response.status, 200)
|
| -
|
| -
|
| - def testUpdateUsesCachedETagOverridden(self):
|
| - # Test that we natively support http://www.w3.org/1999/04/Editing/
|
| - uri = urllib.parse.urljoin(base, "conditional-updates/test.cgi")
|
| -
|
| - (response, content) = self.http.request(uri, "GET")
|
| - self.assertEqual(response.status, 200)
|
| - self.assertEqual(response.fromcache, False)
|
| - (response, content) = self.http.request(uri, "GET")
|
| - self.assertEqual(response.status, 200)
|
| - self.assertEqual(response.fromcache, True)
|
| - (response, content) = self.http.request(uri, "PUT", body="foo", headers={'if-match': 'fred'})
|
| - self.assertEqual(response.status, 412)
|
| -
|
| - def testBasicAuth(self):
|
| - # Test Basic Authentication
|
| - uri = urllib.parse.urljoin(base, "basic/file.txt")
|
| - (response, content) = self.http.request(uri, "GET")
|
| - self.assertEqual(response.status, 401)
|
| -
|
| - uri = urllib.parse.urljoin(base, "basic/")
|
| - (response, content) = self.http.request(uri, "GET")
|
| - self.assertEqual(response.status, 401)
|
| -
|
| - self.http.add_credentials('joe', 'password')
|
| - (response, content) = self.http.request(uri, "GET")
|
| - self.assertEqual(response.status, 200)
|
| -
|
| - uri = urllib.parse.urljoin(base, "basic/file.txt")
|
| - (response, content) = self.http.request(uri, "GET")
|
| - self.assertEqual(response.status, 200)
|
| -
|
| - def testBasicAuthWithDomain(self):
|
| - # Test Basic Authentication
|
| - uri = urllib.parse.urljoin(base, "basic/file.txt")
|
| - (response, content) = self.http.request(uri, "GET")
|
| - self.assertEqual(response.status, 401)
|
| -
|
| - uri = urllib.parse.urljoin(base, "basic/")
|
| - (response, content) = self.http.request(uri, "GET")
|
| - self.assertEqual(response.status, 401)
|
| -
|
| - self.http.add_credentials('joe', 'password', "example.org")
|
| - (response, content) = self.http.request(uri, "GET")
|
| - self.assertEqual(response.status, 401)
|
| -
|
| - uri = urllib.parse.urljoin(base, "basic/file.txt")
|
| - (response, content) = self.http.request(uri, "GET")
|
| - self.assertEqual(response.status, 401)
|
| -
|
| - domain = urllib.parse.urlparse(base)[1]
|
| - self.http.add_credentials('joe', 'password', domain)
|
| - (response, content) = self.http.request(uri, "GET")
|
| - self.assertEqual(response.status, 200)
|
| -
|
| - uri = urllib.parse.urljoin(base, "basic/file.txt")
|
| - (response, content) = self.http.request(uri, "GET")
|
| - self.assertEqual(response.status, 200)
|
| -
|
| -
|
| -
|
| -
|
| -
|
| -
|
| - def testBasicAuthTwoDifferentCredentials(self):
|
| - # Test Basic Authentication with multiple sets of credentials
|
| - uri = urllib.parse.urljoin(base, "basic2/file.txt")
|
| - (response, content) = self.http.request(uri, "GET")
|
| - self.assertEqual(response.status, 401)
|
| -
|
| - uri = urllib.parse.urljoin(base, "basic2/")
|
| - (response, content) = self.http.request(uri, "GET")
|
| - self.assertEqual(response.status, 401)
|
| -
|
| - self.http.add_credentials('fred', 'barney')
|
| - (response, content) = self.http.request(uri, "GET")
|
| - self.assertEqual(response.status, 200)
|
| -
|
| - uri = urllib.parse.urljoin(base, "basic2/file.txt")
|
| - (response, content) = self.http.request(uri, "GET")
|
| - self.assertEqual(response.status, 200)
|
| -
|
| - def testBasicAuthNested(self):
|
| - # Test Basic Authentication with resources
|
| - # that are nested
|
| - uri = urllib.parse.urljoin(base, "basic-nested/")
|
| - (response, content) = self.http.request(uri, "GET")
|
| - self.assertEqual(response.status, 401)
|
| -
|
| - uri = urllib.parse.urljoin(base, "basic-nested/subdir")
|
| - (response, content) = self.http.request(uri, "GET")
|
| - self.assertEqual(response.status, 401)
|
| -
|
| - # Now add in credentials one at a time and test.
|
| - self.http.add_credentials('joe', 'password')
|
| -
|
| - uri = urllib.parse.urljoin(base, "basic-nested/")
|
| - (response, content) = self.http.request(uri, "GET")
|
| - self.assertEqual(response.status, 200)
|
| -
|
| - uri = urllib.parse.urljoin(base, "basic-nested/subdir")
|
| - (response, content) = self.http.request(uri, "GET")
|
| - self.assertEqual(response.status, 401)
|
| -
|
| - self.http.add_credentials('fred', 'barney')
|
| -
|
| - uri = urllib.parse.urljoin(base, "basic-nested/")
|
| - (response, content) = self.http.request(uri, "GET")
|
| - self.assertEqual(response.status, 200)
|
| -
|
| - uri = urllib.parse.urljoin(base, "basic-nested/subdir")
|
| - (response, content) = self.http.request(uri, "GET")
|
| - self.assertEqual(response.status, 200)
|
| -
|
| - def testDigestAuth(self):
|
| - # Test that we support Digest Authentication
|
| - uri = urllib.parse.urljoin(base, "digest/")
|
| - (response, content) = self.http.request(uri, "GET")
|
| - self.assertEqual(response.status, 401)
|
| -
|
| - self.http.add_credentials('joe', 'password')
|
| - (response, content) = self.http.request(uri, "GET")
|
| - self.assertEqual(response.status, 200)
|
| -
|
| - uri = urllib.parse.urljoin(base, "digest/file.txt")
|
| - (response, content) = self.http.request(uri, "GET")
|
| -
|
| - def testDigestAuthNextNonceAndNC(self):
|
| - # Test that if the server sets nextnonce that we reset
|
| - # the nonce count back to 1
|
| - uri = urllib.parse.urljoin(base, "digest/file.txt")
|
| - self.http.add_credentials('joe', 'password')
|
| - (response, content) = self.http.request(uri, "GET", headers = {"cache-control":"no-cache"})
|
| - info = httplib2._parse_www_authenticate(response, 'authentication-info')
|
| - self.assertEqual(response.status, 200)
|
| - (response, content) = self.http.request(uri, "GET", headers = {"cache-control":"no-cache"})
|
| - info2 = httplib2._parse_www_authenticate(response, 'authentication-info')
|
| - self.assertEqual(response.status, 200)
|
| -
|
| - if 'nextnonce' in info:
|
| - self.assertEqual(info2['nc'], 1)
|
| -
|
| - def testDigestAuthStale(self):
|
| - # Test that we can handle a nonce becoming stale
|
| - uri = urllib.parse.urljoin(base, "digest-expire/file.txt")
|
| - self.http.add_credentials('joe', 'password')
|
| - (response, content) = self.http.request(uri, "GET", headers = {"cache-control":"no-cache"})
|
| - info = httplib2._parse_www_authenticate(response, 'authentication-info')
|
| - self.assertEqual(response.status, 200)
|
| -
|
| - time.sleep(3)
|
| - # Sleep long enough that the nonce becomes stale
|
| -
|
| - (response, content) = self.http.request(uri, "GET", headers = {"cache-control":"no-cache"})
|
| - self.assertFalse(response.fromcache)
|
| - self.assertTrue(response._stale_digest)
|
| - info3 = httplib2._parse_www_authenticate(response, 'authentication-info')
|
| - self.assertEqual(response.status, 200)
|
| -
|
| - def reflector(self, content):
|
| - return dict( [tuple(x.split("=", 1)) for x in content.decode('utf-8').strip().split("\n")] )
|
| -
|
| - def testReflector(self):
|
| - uri = urllib.parse.urljoin(base, "reflector/reflector.cgi")
|
| - (response, content) = self.http.request(uri, "GET")
|
| - d = self.reflector(content)
|
| - self.assertTrue('HTTP_USER_AGENT' in d)
|
| -
|
| -
|
| - def testConnectionClose(self):
|
| - uri = "http://www.google.com/"
|
| - (response, content) = self.http.request(uri, "GET")
|
| - for c in self.http.connections.values():
|
| - self.assertNotEqual(None, c.sock)
|
| - (response, content) = self.http.request(uri, "GET", headers={"connection": "close"})
|
| - for c in self.http.connections.values():
|
| - self.assertEqual(None, c.sock)
|
| -
|
| - def testPickleHttp(self):
|
| - pickled_http = pickle.dumps(self.http)
|
| - new_http = pickle.loads(pickled_http)
|
| -
|
| - self.assertEqual(sorted(new_http.__dict__.keys()),
|
| - sorted(self.http.__dict__.keys()))
|
| - for key in new_http.__dict__:
|
| - if key in ('certificates', 'credentials'):
|
| - self.assertEqual(new_http.__dict__[key].credentials,
|
| - self.http.__dict__[key].credentials)
|
| - elif key == 'cache':
|
| - self.assertEqual(new_http.__dict__[key].cache,
|
| - self.http.__dict__[key].cache)
|
| - else:
|
| - self.assertEqual(new_http.__dict__[key],
|
| - self.http.__dict__[key])
|
| -
|
| - def testPickleHttpWithConnection(self):
|
| - self.http.request('http://bitworking.org',
|
| - connection_type=_MyHTTPConnection)
|
| - pickled_http = pickle.dumps(self.http)
|
| - new_http = pickle.loads(pickled_http)
|
| -
|
| - self.assertEqual(list(self.http.connections.keys()),
|
| - ['http:bitworking.org'])
|
| - self.assertEqual(new_http.connections, {})
|
| -
|
| - def testPickleCustomRequestHttp(self):
|
| - def dummy_request(*args, **kwargs):
|
| - return new_request(*args, **kwargs)
|
| - dummy_request.dummy_attr = 'dummy_value'
|
| -
|
| - self.http.request = dummy_request
|
| - pickled_http = pickle.dumps(self.http)
|
| - self.assertFalse(b"S'request'" in pickled_http)
|
| -
|
| -try:
|
| - import memcache
|
| - class HttpTestMemCached(HttpTest):
|
| - def setUp(self):
|
| - self.cache = memcache.Client(['127.0.0.1:11211'], debug=0)
|
| - #self.cache = memcache.Client(['10.0.0.4:11211'], debug=1)
|
| - self.http = httplib2.Http(self.cache)
|
| - self.cache.flush_all()
|
| - # Not exactly sure why the sleep is needed here, but
|
| - # if not present then some unit tests that rely on caching
|
| - # fail. Memcached seems to lose some sets immediately
|
| - # after a flush_all if the set is to a value that
|
| - # was previously cached. (Maybe the flush is handled async?)
|
| - time.sleep(1)
|
| - self.http.clear_credentials()
|
| -except:
|
| - pass
|
| -
|
| -
|
| -
|
| -# ------------------------------------------------------------------------
|
| -
|
| -class HttpPrivateTest(unittest.TestCase):
|
| -
|
| - def testParseCacheControl(self):
|
| - # Test that we can parse the Cache-Control header
|
| - self.assertEqual({}, httplib2._parse_cache_control({}))
|
| - self.assertEqual({'no-cache': 1}, httplib2._parse_cache_control({'cache-control': ' no-cache'}))
|
| - cc = httplib2._parse_cache_control({'cache-control': ' no-cache, max-age = 7200'})
|
| - self.assertEqual(cc['no-cache'], 1)
|
| - self.assertEqual(cc['max-age'], '7200')
|
| - cc = httplib2._parse_cache_control({'cache-control': ' , '})
|
| - self.assertEqual(cc[''], 1)
|
| -
|
| - try:
|
| - cc = httplib2._parse_cache_control({'cache-control': 'Max-age=3600;post-check=1800,pre-check=3600'})
|
| - self.assertTrue("max-age" in cc)
|
| - except:
|
| - self.fail("Should not throw exception")
|
| -
|
| -
|
| -
|
| -
|
| - def testNormalizeHeaders(self):
|
| - # Test that we normalize headers to lowercase
|
| - h = httplib2._normalize_headers({'Cache-Control': 'no-cache', 'Other': 'Stuff'})
|
| - self.assertTrue('cache-control' in h)
|
| - self.assertTrue('other' in h)
|
| - self.assertEqual('Stuff', h['other'])
|
| -
|
| - def testExpirationModelTransparent(self):
|
| - # Test that no-cache makes our request TRANSPARENT
|
| - response_headers = {
|
| - 'cache-control': 'max-age=7200'
|
| - }
|
| - request_headers = {
|
| - 'cache-control': 'no-cache'
|
| - }
|
| - self.assertEqual("TRANSPARENT", httplib2._entry_disposition(response_headers, request_headers))
|
| -
|
| - def testMaxAgeNonNumeric(self):
|
| - # Test that no-cache makes our request TRANSPARENT
|
| - response_headers = {
|
| - 'cache-control': 'max-age=fred, min-fresh=barney'
|
| - }
|
| - request_headers = {
|
| - }
|
| - self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))
|
| -
|
| -
|
| - def testExpirationModelNoCacheResponse(self):
|
| - # The date and expires point to an entry that should be
|
| - # FRESH, but the no-cache over-rides that.
|
| - now = time.time()
|
| - response_headers = {
|
| - 'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)),
|
| - 'expires': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now+4)),
|
| - 'cache-control': 'no-cache'
|
| - }
|
| - request_headers = {
|
| - }
|
| - self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))
|
| -
|
| - def testExpirationModelStaleRequestMustReval(self):
|
| - # must-revalidate forces STALE
|
| - self.assertEqual("STALE", httplib2._entry_disposition({}, {'cache-control': 'must-revalidate'}))
|
| -
|
| - def testExpirationModelStaleResponseMustReval(self):
|
| - # must-revalidate forces STALE
|
| - self.assertEqual("STALE", httplib2._entry_disposition({'cache-control': 'must-revalidate'}, {}))
|
| -
|
| - def testExpirationModelFresh(self):
|
| - response_headers = {
|
| - 'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime()),
|
| - 'cache-control': 'max-age=2'
|
| - }
|
| - request_headers = {
|
| - }
|
| - self.assertEqual("FRESH", httplib2._entry_disposition(response_headers, request_headers))
|
| - time.sleep(3)
|
| - self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))
|
| -
|
| - def testExpirationMaxAge0(self):
|
| - response_headers = {
|
| - 'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime()),
|
| - 'cache-control': 'max-age=0'
|
| - }
|
| - request_headers = {
|
| - }
|
| - self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))
|
| -
|
| - def testExpirationModelDateAndExpires(self):
|
| - now = time.time()
|
| - response_headers = {
|
| - 'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)),
|
| - 'expires': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now+2)),
|
| - }
|
| - request_headers = {
|
| - }
|
| - self.assertEqual("FRESH", httplib2._entry_disposition(response_headers, request_headers))
|
| - time.sleep(3)
|
| - self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))
|
| -
|
| - def testExpiresZero(self):
|
| - now = time.time()
|
| - response_headers = {
|
| - 'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)),
|
| - 'expires': "0",
|
| - }
|
| - request_headers = {
|
| - }
|
| - self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))
|
| -
|
| - def testExpirationModelDateOnly(self):
|
| - now = time.time()
|
| - response_headers = {
|
| - 'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now+3)),
|
| - }
|
| - request_headers = {
|
| - }
|
| - self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))
|
| -
|
| - def testExpirationModelOnlyIfCached(self):
|
| - response_headers = {
|
| - }
|
| - request_headers = {
|
| - 'cache-control': 'only-if-cached',
|
| - }
|
| - self.assertEqual("FRESH", httplib2._entry_disposition(response_headers, request_headers))
|
| -
|
| - def testExpirationModelMaxAgeBoth(self):
|
| - now = time.time()
|
| - response_headers = {
|
| - 'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)),
|
| - 'cache-control': 'max-age=2'
|
| - }
|
| - request_headers = {
|
| - 'cache-control': 'max-age=0'
|
| - }
|
| - self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))
|
| -
|
| - def testExpirationModelDateAndExpiresMinFresh1(self):
|
| - now = time.time()
|
| - response_headers = {
|
| - 'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)),
|
| - 'expires': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now+2)),
|
| - }
|
| - request_headers = {
|
| - 'cache-control': 'min-fresh=2'
|
| - }
|
| - self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))
|
| -
|
| - def testExpirationModelDateAndExpiresMinFresh2(self):
|
| - now = time.time()
|
| - response_headers = {
|
| - 'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)),
|
| - 'expires': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now+4)),
|
| - }
|
| - request_headers = {
|
| - 'cache-control': 'min-fresh=2'
|
| - }
|
| - self.assertEqual("FRESH", httplib2._entry_disposition(response_headers, request_headers))
|
| -
|
| - def testParseWWWAuthenticateEmpty(self):
|
| - res = httplib2._parse_www_authenticate({})
|
| - self.assertEqual(len(list(res.keys())), 0)
|
| -
|
| - def testParseWWWAuthenticate(self):
|
| - # different uses of spaces around commas
|
| - res = httplib2._parse_www_authenticate({ 'www-authenticate': 'Test realm="test realm" , foo=foo ,bar="bar", baz=baz,qux=qux'})
|
| - self.assertEqual(len(list(res.keys())), 1)
|
| - self.assertEqual(len(list(res['test'].keys())), 5)
|
| -
|
| - # tokens with non-alphanum
|
| - res = httplib2._parse_www_authenticate({ 'www-authenticate': 'T*!%#st realm=to*!%#en, to*!%#en="quoted string"'})
|
| - self.assertEqual(len(list(res.keys())), 1)
|
| - self.assertEqual(len(list(res['t*!%#st'].keys())), 2)
|
| -
|
| - # quoted string with quoted pairs
|
| - res = httplib2._parse_www_authenticate({ 'www-authenticate': 'Test realm="a \\"test\\" realm"'})
|
| - self.assertEqual(len(list(res.keys())), 1)
|
| - self.assertEqual(res['test']['realm'], 'a "test" realm')
|
| -
|
| - def testParseWWWAuthenticateStrict(self):
|
| - httplib2.USE_WWW_AUTH_STRICT_PARSING = 1;
|
| - self.testParseWWWAuthenticate();
|
| - httplib2.USE_WWW_AUTH_STRICT_PARSING = 0;
|
| -
|
| - def testParseWWWAuthenticateBasic(self):
|
| - res = httplib2._parse_www_authenticate({ 'www-authenticate': 'Basic realm="me"'})
|
| - basic = res['basic']
|
| - self.assertEqual('me', basic['realm'])
|
| -
|
| - res = httplib2._parse_www_authenticate({ 'www-authenticate': 'Basic realm="me", algorithm="MD5"'})
|
| - basic = res['basic']
|
| - self.assertEqual('me', basic['realm'])
|
| - self.assertEqual('MD5', basic['algorithm'])
|
| -
|
| - res = httplib2._parse_www_authenticate({ 'www-authenticate': 'Basic realm="me", algorithm=MD5'})
|
| - basic = res['basic']
|
| - self.assertEqual('me', basic['realm'])
|
| - self.assertEqual('MD5', basic['algorithm'])
|
| -
|
| - def testParseWWWAuthenticateBasic2(self):
|
| - res = httplib2._parse_www_authenticate({ 'www-authenticate': 'Basic realm="me",other="fred" '})
|
| - basic = res['basic']
|
| - self.assertEqual('me', basic['realm'])
|
| - self.assertEqual('fred', basic['other'])
|
| -
|
| - def testParseWWWAuthenticateBasic3(self):
|
| - res = httplib2._parse_www_authenticate({ 'www-authenticate': 'Basic REAlm="me" '})
|
| - basic = res['basic']
|
| - self.assertEqual('me', basic['realm'])
|
| -
|
| -
|
| - def testParseWWWAuthenticateDigest(self):
|
| - res = httplib2._parse_www_authenticate({ 'www-authenticate':
|
| - 'Digest realm="testrealm@host.com", qop="auth,auth-int", nonce="dcd98b7102dd2f0e8b11d0f600bfb0c093", opaque="5ccc069c403ebaf9f0171e9517f40e41"'})
|
| - digest = res['digest']
|
| - self.assertEqual('testrealm@host.com', digest['realm'])
|
| - self.assertEqual('auth,auth-int', digest['qop'])
|
| -
|
| -
|
| - def testParseWWWAuthenticateMultiple(self):
|
| - res = httplib2._parse_www_authenticate({ 'www-authenticate':
|
| - 'Digest realm="testrealm@host.com", qop="auth,auth-int", nonce="dcd98b7102dd2f0e8b11d0f600bfb0c093", opaque="5ccc069c403ebaf9f0171e9517f40e41" Basic REAlm="me" '})
|
| - digest = res['digest']
|
| - self.assertEqual('testrealm@host.com', digest['realm'])
|
| - self.assertEqual('auth,auth-int', digest['qop'])
|
| - self.assertEqual('dcd98b7102dd2f0e8b11d0f600bfb0c093', digest['nonce'])
|
| - self.assertEqual('5ccc069c403ebaf9f0171e9517f40e41', digest['opaque'])
|
| - basic = res['basic']
|
| - self.assertEqual('me', basic['realm'])
|
| -
|
| - def testParseWWWAuthenticateMultiple2(self):
|
| - # Handle an added comma between challenges, which might get thrown in if the challenges were
|
| - # originally sent in separate www-authenticate headers.
|
| - res = httplib2._parse_www_authenticate({ 'www-authenticate':
|
| - 'Digest realm="testrealm@host.com", qop="auth,auth-int", nonce="dcd98b7102dd2f0e8b11d0f600bfb0c093", opaque="5ccc069c403ebaf9f0171e9517f40e41", Basic REAlm="me" '})
|
| - digest = res['digest']
|
| - self.assertEqual('testrealm@host.com', digest['realm'])
|
| - self.assertEqual('auth,auth-int', digest['qop'])
|
| - self.assertEqual('dcd98b7102dd2f0e8b11d0f600bfb0c093', digest['nonce'])
|
| - self.assertEqual('5ccc069c403ebaf9f0171e9517f40e41', digest['opaque'])
|
| - basic = res['basic']
|
| - self.assertEqual('me', basic['realm'])
|
| -
|
| - def testParseWWWAuthenticateMultiple3(self):
|
| - # Handle an added comma between challenges, which might get thrown in if the challenges were
|
| - # originally sent in separate www-authenticate headers.
|
| - res = httplib2._parse_www_authenticate({ 'www-authenticate':
|
| - 'Digest realm="testrealm@host.com", qop="auth,auth-int", nonce="dcd98b7102dd2f0e8b11d0f600bfb0c093", opaque="5ccc069c403ebaf9f0171e9517f40e41", Basic REAlm="me", WSSE realm="foo", profile="UsernameToken"'})
|
| - digest = res['digest']
|
| - self.assertEqual('testrealm@host.com', digest['realm'])
|
| - self.assertEqual('auth,auth-int', digest['qop'])
|
| - self.assertEqual('dcd98b7102dd2f0e8b11d0f600bfb0c093', digest['nonce'])
|
| - self.assertEqual('5ccc069c403ebaf9f0171e9517f40e41', digest['opaque'])
|
| - basic = res['basic']
|
| - self.assertEqual('me', basic['realm'])
|
| - wsse = res['wsse']
|
| - self.assertEqual('foo', wsse['realm'])
|
| - self.assertEqual('UsernameToken', wsse['profile'])
|
| -
|
| - def testParseWWWAuthenticateMultiple4(self):
|
| - res = httplib2._parse_www_authenticate({ 'www-authenticate':
|
| - 'Digest realm="test-real.m@host.com", qop \t=\t"\tauth,auth-int", nonce="(*)&^&$%#",opaque="5ccc069c403ebaf9f0171e9517f40e41", Basic REAlm="me", WSSE realm="foo", profile="UsernameToken"'})
|
| - digest = res['digest']
|
| - self.assertEqual('test-real.m@host.com', digest['realm'])
|
| - self.assertEqual('\tauth,auth-int', digest['qop'])
|
| - self.assertEqual('(*)&^&$%#', digest['nonce'])
|
| -
|
| - def testParseWWWAuthenticateMoreQuoteCombos(self):
|
| - res = httplib2._parse_www_authenticate({'www-authenticate':'Digest realm="myrealm", nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306", algorithm=MD5, qop="auth", stale=true'})
|
| - digest = res['digest']
|
| - self.assertEqual('myrealm', digest['realm'])
|
| -
|
| - def testParseWWWAuthenticateMalformed(self):
|
| - try:
|
| - res = httplib2._parse_www_authenticate({'www-authenticate':'OAuth "Facebook Platform" "invalid_token" "Invalid OAuth access token."'})
|
| - self.fail("should raise an exception")
|
| - except httplib2.MalformedHeader:
|
| - pass
|
| -
|
| - def testDigestObject(self):
|
| - credentials = ('joe', 'password')
|
| - host = None
|
| - request_uri = '/projects/httplib2/test/digest/'
|
| - headers = {}
|
| - response = {
|
| - 'www-authenticate': 'Digest realm="myrealm", nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306", algorithm=MD5, qop="auth"'
|
| - }
|
| - content = b""
|
| -
|
| - d = httplib2.DigestAuthentication(credentials, host, request_uri, headers, response, content, None)
|
| - d.request("GET", request_uri, headers, content, cnonce="33033375ec278a46")
|
| - our_request = "authorization: %s" % headers['authorization']
|
| - working_request = 'authorization: Digest username="joe", realm="myrealm", nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306", uri="/projects/httplib2/test/digest/", algorithm=MD5, response="97ed129401f7cdc60e5db58a80f3ea8b", qop=auth, nc=00000001, cnonce="33033375ec278a46"'
|
| - self.assertEqual(our_request, working_request)
|
| -
|
| - def testDigestObjectWithOpaque(self):
|
| - credentials = ('joe', 'password')
|
| - host = None
|
| - request_uri = '/projects/httplib2/test/digest/'
|
| - headers = {}
|
| - response = {
|
| - 'www-authenticate': 'Digest realm="myrealm", nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306", algorithm=MD5, qop="auth", opaque="atestopaque"'
|
| - }
|
| - content = ""
|
| -
|
| - d = httplib2.DigestAuthentication(credentials, host, request_uri, headers, response, content, None)
|
| - d.request("GET", request_uri, headers, content, cnonce="33033375ec278a46")
|
| - our_request = "authorization: %s" % headers['authorization']
|
| - working_request = 'authorization: Digest username="joe", realm="myrealm", nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306", uri="/projects/httplib2/test/digest/", algorithm=MD5, response="97ed129401f7cdc60e5db58a80f3ea8b", qop=auth, nc=00000001, cnonce="33033375ec278a46", opaque="atestopaque"'
|
| - self.assertEqual(our_request, working_request)
|
| -
|
| - def testDigestObjectStale(self):
|
| - credentials = ('joe', 'password')
|
| - host = None
|
| - request_uri = '/projects/httplib2/test/digest/'
|
| - headers = {}
|
| - response = httplib2.Response({ })
|
| - response['www-authenticate'] = 'Digest realm="myrealm", nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306", algorithm=MD5, qop="auth", stale=true'
|
| - response.status = 401
|
| - content = b""
|
| - d = httplib2.DigestAuthentication(credentials, host, request_uri, headers, response, content, None)
|
| - # Returns true to force a retry
|
| - self.assertTrue( d.response(response, content) )
|
| -
|
| - def testDigestObjectAuthInfo(self):
|
| - credentials = ('joe', 'password')
|
| - host = None
|
| - request_uri = '/projects/httplib2/test/digest/'
|
| - headers = {}
|
| - response = httplib2.Response({ })
|
| - response['www-authenticate'] = 'Digest realm="myrealm", nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306", algorithm=MD5, qop="auth", stale=true'
|
| - response['authentication-info'] = 'nextnonce="fred"'
|
| - content = b""
|
| - d = httplib2.DigestAuthentication(credentials, host, request_uri, headers, response, content, None)
|
| - # Returns true to force a retry
|
| - self.assertFalse( d.response(response, content) )
|
| - self.assertEqual('fred', d.challenge['nonce'])
|
| - self.assertEqual(1, d.challenge['nc'])
|
| -
|
| - def testWsseAlgorithm(self):
|
| - digest = httplib2._wsse_username_token("d36e316282959a9ed4c89851497a717f", "2003-12-15T14:43:07Z", "taadtaadpstcsm")
|
| - expected = b"quR/EWLAV4xLf9Zqyw4pDmfV9OY="
|
| - self.assertEqual(expected, digest)
|
| -
|
| - def testEnd2End(self):
|
| - # one end to end header
|
| - response = {'content-type': 'application/atom+xml', 'te': 'deflate'}
|
| - end2end = httplib2._get_end2end_headers(response)
|
| - self.assertTrue('content-type' in end2end)
|
| - self.assertTrue('te' not in end2end)
|
| - self.assertTrue('connection' not in end2end)
|
| -
|
| - # one end to end header that gets eliminated
|
| - response = {'connection': 'content-type', 'content-type': 'application/atom+xml', 'te': 'deflate'}
|
| - end2end = httplib2._get_end2end_headers(response)
|
| - self.assertTrue('content-type' not in end2end)
|
| - self.assertTrue('te' not in end2end)
|
| - self.assertTrue('connection' not in end2end)
|
| -
|
| - # Degenerate case of no headers
|
| - response = {}
|
| - end2end = httplib2._get_end2end_headers(response)
|
| - self.assertEqual(0, len(end2end))
|
| -
|
| - # Degenerate case of connection referrring to a header not passed in
|
| - response = {'connection': 'content-type'}
|
| - end2end = httplib2._get_end2end_headers(response)
|
| - self.assertEqual(0, len(end2end))
|
| -
|
| -
|
| -class TestProxyInfo(unittest.TestCase):
|
| - def setUp(self):
|
| - self.orig_env = dict(os.environ)
|
| -
|
| - def tearDown(self):
|
| - os.environ.clear()
|
| - os.environ.update(self.orig_env)
|
| -
|
| - def test_from_url(self):
|
| - pi = httplib2.proxy_info_from_url('http://myproxy.example.com')
|
| - self.assertEqual(pi.proxy_host, 'myproxy.example.com')
|
| - self.assertEqual(pi.proxy_port, 80)
|
| - self.assertEqual(pi.proxy_user, None)
|
| -
|
| - def test_from_url_ident(self):
|
| - pi = httplib2.proxy_info_from_url('http://zoidberg:fish@someproxy:99')
|
| - self.assertEqual(pi.proxy_host, 'someproxy')
|
| - self.assertEqual(pi.proxy_port, 99)
|
| - self.assertEqual(pi.proxy_user, 'zoidberg')
|
| - self.assertEqual(pi.proxy_pass, 'fish')
|
| -
|
| - def test_from_env(self):
|
| - os.environ['http_proxy'] = 'http://myproxy.example.com:8080'
|
| - pi = httplib2.proxy_info_from_environment()
|
| - self.assertEqual(pi.proxy_host, 'myproxy.example.com')
|
| - self.assertEqual(pi.proxy_port, 8080)
|
| -
|
| - def test_from_env_no_proxy(self):
|
| - os.environ['http_proxy'] = 'http://myproxy.example.com:80'
|
| - os.environ['https_proxy'] = 'http://myproxy.example.com:81'
|
| - pi = httplib2.proxy_info_from_environment('https')
|
| - self.assertEqual(pi.proxy_host, 'myproxy.example.com')
|
| - self.assertEqual(pi.proxy_port, 81)
|
| -
|
| - def test_from_env_none(self):
|
| - os.environ.clear()
|
| - pi = httplib2.proxy_info_from_environment()
|
| - self.assertEqual(pi, None)
|
| -
|
| -
|
| -if __name__ == '__main__':
|
| - unittest.main()
|
| +#!/usr/bin/env python3
|
| +"""
|
| +httplib2test
|
| +
|
| +A set of unit tests for httplib2.py.
|
| +
|
| +Requires Python 3.0 or later
|
| +"""
|
| +
|
| +__author__ = "Joe Gregorio (joe@bitworking.org)"
|
| +__copyright__ = "Copyright 2006, Joe Gregorio"
|
| +__contributors__ = ["Mark Pilgrim"]
|
| +__license__ = "MIT"
|
| +__history__ = """ """
|
| +__version__ = "0.2 ($Rev: 118 $)"
|
| +
|
| +import base64
|
| +import http.client
|
| +import httplib2
|
| +import io
|
| +import os
|
| +import pickle
|
| +import socket
|
| +import ssl
|
| +import sys
|
| +import time
|
| +import unittest
|
| +import urllib.parse
|
| +
|
| +# The test resources base uri
|
| +base = 'http://bitworking.org/projects/httplib2/test/'
|
| +#base = 'http://localhost/projects/httplib2/test/'
|
| +cacheDirName = ".cache"
|
| +
|
| +
|
| +class CredentialsTest(unittest.TestCase):
|
| + def test(self):
|
| + c = httplib2.Credentials()
|
| + c.add("joe", "password")
|
| + self.assertEqual(("joe", "password"), list(c.iter("bitworking.org"))[0])
|
| + self.assertEqual(("joe", "password"), list(c.iter(""))[0])
|
| + c.add("fred", "password2", "wellformedweb.org")
|
| + self.assertEqual(("joe", "password"), list(c.iter("bitworking.org"))[0])
|
| + self.assertEqual(1, len(list(c.iter("bitworking.org"))))
|
| + self.assertEqual(2, len(list(c.iter("wellformedweb.org"))))
|
| + self.assertTrue(("fred", "password2") in list(c.iter("wellformedweb.org")))
|
| + c.clear()
|
| + self.assertEqual(0, len(list(c.iter("bitworking.org"))))
|
| + c.add("fred", "password2", "wellformedweb.org")
|
| + self.assertTrue(("fred", "password2") in list(c.iter("wellformedweb.org")))
|
| + self.assertEqual(0, len(list(c.iter("bitworking.org"))))
|
| + self.assertEqual(0, len(list(c.iter(""))))
|
| +
|
| +
|
| +class ParserTest(unittest.TestCase):
|
| + def testFromStd66(self):
|
| + self.assertEqual( ('http', 'example.com', '', None, None ), httplib2.parse_uri("http://example.com"))
|
| + self.assertEqual( ('https', 'example.com', '', None, None ), httplib2.parse_uri("https://example.com"))
|
| + self.assertEqual( ('https', 'example.com:8080', '', None, None ), httplib2.parse_uri("https://example.com:8080"))
|
| + self.assertEqual( ('http', 'example.com', '/', None, None ), httplib2.parse_uri("http://example.com/"))
|
| + self.assertEqual( ('http', 'example.com', '/path', None, None ), httplib2.parse_uri("http://example.com/path"))
|
| + self.assertEqual( ('http', 'example.com', '/path', 'a=1&b=2', None ), httplib2.parse_uri("http://example.com/path?a=1&b=2"))
|
| + self.assertEqual( ('http', 'example.com', '/path', 'a=1&b=2', 'fred' ), httplib2.parse_uri("http://example.com/path?a=1&b=2#fred"))
|
| + self.assertEqual( ('http', 'example.com', '/path', 'a=1&b=2', 'fred' ), httplib2.parse_uri("http://example.com/path?a=1&b=2#fred"))
|
| +
|
| +
|
| +class UrlNormTest(unittest.TestCase):
|
| + def test(self):
|
| + self.assertEqual( "http://example.org/", httplib2.urlnorm("http://example.org")[-1])
|
| + self.assertEqual( "http://example.org/", httplib2.urlnorm("http://EXAMple.org")[-1])
|
| + self.assertEqual( "http://example.org/?=b", httplib2.urlnorm("http://EXAMple.org?=b")[-1])
|
| + self.assertEqual( "http://example.org/mypath?a=b", httplib2.urlnorm("http://EXAMple.org/mypath?a=b")[-1])
|
| + self.assertEqual( "http://localhost:80/", httplib2.urlnorm("http://localhost:80")[-1])
|
| + self.assertEqual( httplib2.urlnorm("http://localhost:80/"), httplib2.urlnorm("HTTP://LOCALHOST:80"))
|
| + try:
|
| + httplib2.urlnorm("/")
|
| + self.fail("Non-absolute URIs should raise an exception")
|
| + except httplib2.RelativeURIError:
|
| + pass
|
| +
|
| +class UrlSafenameTest(unittest.TestCase):
|
| + def test(self):
|
| + # Test that different URIs end up generating different safe names
|
| + self.assertEqual( "example.org,fred,a=b,58489f63a7a83c3b7794a6a398ee8b1f", httplib2.safename("http://example.org/fred/?a=b"))
|
| + self.assertEqual( "example.org,fred,a=b,8c5946d56fec453071f43329ff0be46b", httplib2.safename("http://example.org/fred?/a=b"))
|
| + self.assertEqual( "www.example.org,fred,a=b,499c44b8d844a011b67ea2c015116968", httplib2.safename("http://www.example.org/fred?/a=b"))
|
| + self.assertEqual( httplib2.safename(httplib2.urlnorm("http://www")[-1]), httplib2.safename(httplib2.urlnorm("http://WWW")[-1]))
|
| + self.assertEqual( "www.example.org,fred,a=b,692e843a333484ce0095b070497ab45d", httplib2.safename("https://www.example.org/fred?/a=b"))
|
| + self.assertNotEqual( httplib2.safename("http://www"), httplib2.safename("https://www"))
|
| + # Test the max length limits
|
| + uri = "http://" + ("w" * 200) + ".org"
|
| + uri2 = "http://" + ("w" * 201) + ".org"
|
| + self.assertNotEqual( httplib2.safename(uri2), httplib2.safename(uri))
|
| + # Max length should be 200 + 1 (",") + 32
|
| + self.assertEqual(233, len(httplib2.safename(uri2)))
|
| + self.assertEqual(233, len(httplib2.safename(uri)))
|
| + # Unicode
|
| + if sys.version_info >= (2,3):
|
| + self.assertEqual( "xn--http,-4y1d.org,fred,a=b,579924c35db315e5a32e3d9963388193", httplib2.safename("http://\u2304.org/fred/?a=b"))
|
| +
|
| +class _MyResponse(io.BytesIO):
|
| + def __init__(self, body, **kwargs):
|
| + io.BytesIO.__init__(self, body)
|
| + self.headers = kwargs
|
| +
|
| + def items(self):
|
| + return self.headers.items()
|
| +
|
| + def iteritems(self):
|
| + return iter(self.headers.items())
|
| +
|
| +
|
| +class _MyHTTPConnection(object):
|
| + "This class is just a mock of httplib.HTTPConnection used for testing"
|
| +
|
| + def __init__(self, host, port=None, key_file=None, cert_file=None,
|
| + strict=None, timeout=None, proxy_info=None):
|
| + self.host = host
|
| + self.port = port
|
| + self.timeout = timeout
|
| + self.log = ""
|
| + self.sock = None
|
| +
|
| + def set_debuglevel(self, level):
|
| + pass
|
| +
|
| + def connect(self):
|
| + "Connect to a host on a given port."
|
| + pass
|
| +
|
| + def close(self):
|
| + pass
|
| +
|
| + def request(self, method, request_uri, body, headers):
|
| + pass
|
| +
|
| + def getresponse(self):
|
| + return _MyResponse(b"the body", status="200")
|
| +
|
| +
|
| +class HttpTest(unittest.TestCase):
|
| + def setUp(self):
|
| + if os.path.exists(cacheDirName):
|
| + [os.remove(os.path.join(cacheDirName, file)) for file in os.listdir(cacheDirName)]
|
| + self.http = httplib2.Http(cacheDirName)
|
| + self.http.clear_credentials()
|
| +
|
| + def testIPv6NoSSL(self):
|
| + try:
|
| + self.http.request("http://[::1]/")
|
| + except socket.gaierror:
|
| + self.fail("should get the address family right for IPv6")
|
| + except socket.error:
|
| + # Even if IPv6 isn't installed on a machine it should just raise socket.error
|
| + pass
|
| +
|
| + def testIPv6SSL(self):
|
| + try:
|
| + self.http.request("https://[::1]/")
|
| + except socket.gaierror:
|
| + self.fail("should get the address family right for IPv6")
|
| + except socket.error:
|
| + # Even if IPv6 isn't installed on a machine it should just raise socket.error
|
| + pass
|
| +
|
| + def testConnectionType(self):
|
| + self.http.force_exception_to_status_code = False
|
| + response, content = self.http.request("http://bitworking.org", connection_type=_MyHTTPConnection)
|
| + self.assertEqual(response['content-location'], "http://bitworking.org")
|
| + self.assertEqual(content, b"the body")
|
| +
|
| + def testGetUnknownServer(self):
|
| + self.http.force_exception_to_status_code = False
|
| + try:
|
| + self.http.request("http://fred.bitworking.org/")
|
| + self.fail("An httplib2.ServerNotFoundError Exception must be thrown on an unresolvable server.")
|
| + except httplib2.ServerNotFoundError:
|
| + pass
|
| +
|
| + # Now test with exceptions turned off
|
| + self.http.force_exception_to_status_code = True
|
| +
|
| + (response, content) = self.http.request("http://fred.bitworking.org/")
|
| + self.assertEqual(response['content-type'], 'text/plain')
|
| + self.assertTrue(content.startswith(b"Unable to find"))
|
| + self.assertEqual(response.status, 400)
|
| +
|
| + def testGetConnectionRefused(self):
|
| + self.http.force_exception_to_status_code = False
|
| + try:
|
| + self.http.request("http://localhost:7777/")
|
| + self.fail("An socket.error exception must be thrown on Connection Refused.")
|
| + except socket.error:
|
| + pass
|
| +
|
| + # Now test with exceptions turned off
|
| + self.http.force_exception_to_status_code = True
|
| +
|
| + (response, content) = self.http.request("http://localhost:7777/")
|
| + self.assertEqual(response['content-type'], 'text/plain')
|
| + self.assertTrue(b"Connection refused" in content)
|
| + self.assertEqual(response.status, 400)
|
| +
|
| + def testGetIRI(self):
|
| + if sys.version_info >= (2,3):
|
| + uri = urllib.parse.urljoin(base, "reflector/reflector.cgi?d=\N{CYRILLIC CAPITAL LETTER DJE}")
|
| + (response, content) = self.http.request(uri, "GET")
|
| + d = self.reflector(content)
|
| + self.assertTrue('QUERY_STRING' in d)
|
| + self.assertTrue(d['QUERY_STRING'].find('%D0%82') > 0)
|
| +
|
| + def testGetIsDefaultMethod(self):
|
| + # Test that GET is the default method
|
| + uri = urllib.parse.urljoin(base, "methods/method_reflector.cgi")
|
| + (response, content) = self.http.request(uri)
|
| + self.assertEqual(response['x-method'], "GET")
|
| +
|
| + def testDifferentMethods(self):
|
| + # Test that all methods can be used
|
| + uri = urllib.parse.urljoin(base, "methods/method_reflector.cgi")
|
| + for method in ["GET", "PUT", "DELETE", "POST"]:
|
| + (response, content) = self.http.request(uri, method, body=b" ")
|
| + self.assertEqual(response['x-method'], method)
|
| +
|
| + def testHeadRead(self):
|
| + # Test that we don't try to read the response of a HEAD request
|
| + # since httplib blocks response.read() for HEAD requests.
|
| + # Oddly enough this doesn't appear as a problem when doing HEAD requests
|
| + # against Apache servers.
|
| + uri = "http://www.google.com/"
|
| + (response, content) = self.http.request(uri, "HEAD")
|
| + self.assertEqual(response.status, 200)
|
| + self.assertEqual(content, b"")
|
| +
|
| + def testGetNoCache(self):
|
| + # Test that can do a GET w/o the cache turned on.
|
| + http = httplib2.Http()
|
| + uri = urllib.parse.urljoin(base, "304/test_etag.txt")
|
| + (response, content) = http.request(uri, "GET")
|
| + self.assertEqual(response.status, 200)
|
| + self.assertEqual(response.previous, None)
|
| +
|
| + def testGetOnlyIfCachedCacheHit(self):
|
| + # Test that can do a GET with cache and 'only-if-cached'
|
| + uri = urllib.parse.urljoin(base, "304/test_etag.txt")
|
| + (response, content) = self.http.request(uri, "GET")
|
| + (response, content) = self.http.request(uri, "GET", headers={'cache-control': 'only-if-cached'})
|
| + self.assertEqual(response.fromcache, True)
|
| + self.assertEqual(response.status, 200)
|
| +
|
| + def testGetOnlyIfCachedCacheMiss(self):
|
| + # Test that can do a GET with no cache with 'only-if-cached'
|
| + uri = urllib.parse.urljoin(base, "304/test_etag.txt")
|
| + (response, content) = self.http.request(uri, "GET", headers={'cache-control': 'only-if-cached'})
|
| + self.assertEqual(response.fromcache, False)
|
| + self.assertEqual(response.status, 504)
|
| +
|
| + def testGetOnlyIfCachedNoCacheAtAll(self):
|
| + # Test that can do a GET with no cache with 'only-if-cached'
|
| + # Of course, there might be an intermediary beyond us
|
| + # that responds to the 'only-if-cached', so this
|
| + # test can't really be guaranteed to pass.
|
| + http = httplib2.Http()
|
| + uri = urllib.parse.urljoin(base, "304/test_etag.txt")
|
| + (response, content) = http.request(uri, "GET", headers={'cache-control': 'only-if-cached'})
|
| + self.assertEqual(response.fromcache, False)
|
| + self.assertEqual(response.status, 504)
|
| +
|
| + def testUserAgent(self):
|
| + # Test that we provide a default user-agent
|
| + uri = urllib.parse.urljoin(base, "user-agent/test.cgi")
|
| + (response, content) = self.http.request(uri, "GET")
|
| + self.assertEqual(response.status, 200)
|
| + self.assertTrue(content.startswith(b"Python-httplib2/"))
|
| +
|
| + def testUserAgentNonDefault(self):
|
| + # Test that the default user-agent can be over-ridden
|
| +
|
| + uri = urllib.parse.urljoin(base, "user-agent/test.cgi")
|
| + (response, content) = self.http.request(uri, "GET", headers={'User-Agent': 'fred/1.0'})
|
| + self.assertEqual(response.status, 200)
|
| + self.assertTrue(content.startswith(b"fred/1.0"))
|
| +
|
| + def testGet300WithLocation(self):
|
| + # Test the we automatically follow 300 redirects if a Location: header is provided
|
| + uri = urllib.parse.urljoin(base, "300/with-location-header.asis")
|
| + (response, content) = self.http.request(uri, "GET")
|
| + self.assertEqual(response.status, 200)
|
| + self.assertEqual(content, b"This is the final destination.\n")
|
| + self.assertEqual(response.previous.status, 300)
|
| + self.assertEqual(response.previous.fromcache, False)
|
| +
|
| + # Confirm that the intermediate 300 is not cached
|
| + (response, content) = self.http.request(uri, "GET")
|
| + self.assertEqual(response.status, 200)
|
| + self.assertEqual(content, b"This is the final destination.\n")
|
| + self.assertEqual(response.previous.status, 300)
|
| + self.assertEqual(response.previous.fromcache, False)
|
| +
|
| + def testGet300WithLocationNoRedirect(self):
|
| + # Test the we automatically follow 300 redirects if a Location: header is provided
|
| + self.http.follow_redirects = False
|
| + uri = urllib.parse.urljoin(base, "300/with-location-header.asis")
|
| + (response, content) = self.http.request(uri, "GET")
|
| + self.assertEqual(response.status, 300)
|
| +
|
| + def testGet300WithoutLocation(self):
|
| + # Not giving a Location: header in a 300 response is acceptable
|
| + # In which case we just return the 300 response
|
| + uri = urllib.parse.urljoin(base, "300/without-location-header.asis")
|
| + (response, content) = self.http.request(uri, "GET")
|
| + self.assertEqual(response.status, 300)
|
| + self.assertTrue(response['content-type'].startswith("text/html"))
|
| + self.assertEqual(response.previous, None)
|
| +
|
| + def testGet301(self):
|
| + # Test that we automatically follow 301 redirects
|
| + # and that we cache the 301 response
|
| + uri = urllib.parse.urljoin(base, "301/onestep.asis")
|
| + destination = urllib.parse.urljoin(base, "302/final-destination.txt")
|
| + (response, content) = self.http.request(uri, "GET")
|
| + self.assertEqual(response.status, 200)
|
| + self.assertTrue('content-location' in response)
|
| + self.assertEqual(response['content-location'], destination)
|
| + self.assertEqual(content, b"This is the final destination.\n")
|
| + self.assertEqual(response.previous.status, 301)
|
| + self.assertEqual(response.previous.fromcache, False)
|
| +
|
| + (response, content) = self.http.request(uri, "GET")
|
| + self.assertEqual(response.status, 200)
|
| + self.assertEqual(response['content-location'], destination)
|
| + self.assertEqual(content, b"This is the final destination.\n")
|
| + self.assertEqual(response.previous.status, 301)
|
| + self.assertEqual(response.previous.fromcache, True)
|
| +
|
| + def testHead301(self):
|
| + # Test that we automatically follow 301 redirects
|
| + uri = urllib.parse.urljoin(base, "301/onestep.asis")
|
| + (response, content) = self.http.request(uri, "HEAD")
|
| + self.assertEqual(response.status, 200)
|
| + self.assertEqual(response.previous.status, 301)
|
| + self.assertEqual(response.previous.fromcache, False)
|
| +
|
| + def testGet301NoRedirect(self):
|
| + # Test that we automatically follow 301 redirects
|
| + # and that we cache the 301 response
|
| + self.http.follow_redirects = False
|
| + uri = urllib.parse.urljoin(base, "301/onestep.asis")
|
| + destination = urllib.parse.urljoin(base, "302/final-destination.txt")
|
| + (response, content) = self.http.request(uri, "GET")
|
| + self.assertEqual(response.status, 301)
|
| +
|
| +
|
| + def testGet302(self):
|
| + # Test that we automatically follow 302 redirects
|
| + # and that we DO NOT cache the 302 response
|
| + uri = urllib.parse.urljoin(base, "302/onestep.asis")
|
| + destination = urllib.parse.urljoin(base, "302/final-destination.txt")
|
| + (response, content) = self.http.request(uri, "GET")
|
| + self.assertEqual(response.status, 200)
|
| + self.assertEqual(response['content-location'], destination)
|
| + self.assertEqual(content, b"This is the final destination.\n")
|
| + self.assertEqual(response.previous.status, 302)
|
| + self.assertEqual(response.previous.fromcache, False)
|
| +
|
| + uri = urllib.parse.urljoin(base, "302/onestep.asis")
|
| + (response, content) = self.http.request(uri, "GET")
|
| + self.assertEqual(response.status, 200)
|
| + self.assertEqual(response.fromcache, True)
|
| + self.assertEqual(response['content-location'], destination)
|
| + self.assertEqual(content, b"This is the final destination.\n")
|
| + self.assertEqual(response.previous.status, 302)
|
| + self.assertEqual(response.previous.fromcache, False)
|
| + self.assertEqual(response.previous['content-location'], uri)
|
| +
|
| + uri = urllib.parse.urljoin(base, "302/twostep.asis")
|
| +
|
| + (response, content) = self.http.request(uri, "GET")
|
| + self.assertEqual(response.status, 200)
|
| + self.assertEqual(response.fromcache, True)
|
| + self.assertEqual(content, b"This is the final destination.\n")
|
| + self.assertEqual(response.previous.status, 302)
|
| + self.assertEqual(response.previous.fromcache, False)
|
| +
|
| + def testGet302RedirectionLimit(self):
|
| + # Test that we can set a lower redirection limit
|
| + # and that we raise an exception when we exceed
|
| + # that limit.
|
| + self.http.force_exception_to_status_code = False
|
| +
|
| + uri = urllib.parse.urljoin(base, "302/twostep.asis")
|
| + try:
|
| + (response, content) = self.http.request(uri, "GET", redirections = 1)
|
| + self.fail("This should not happen")
|
| + except httplib2.RedirectLimit:
|
| + pass
|
| + except Exception as e:
|
| + self.fail("Threw wrong kind of exception ")
|
| +
|
| + # Re-run the test with out the exceptions
|
| + self.http.force_exception_to_status_code = True
|
| +
|
| + (response, content) = self.http.request(uri, "GET", redirections = 1)
|
| + self.assertEqual(response.status, 500)
|
| + self.assertTrue(response.reason.startswith("Redirected more"))
|
| + self.assertEqual("302", response['status'])
|
| + self.assertTrue(content.startswith(b"<html>"))
|
| + self.assertTrue(response.previous != None)
|
| +
|
| + def testGet302NoLocation(self):
|
| + # Test that we throw an exception when we get
|
| + # a 302 with no Location: header.
|
| + self.http.force_exception_to_status_code = False
|
| + uri = urllib.parse.urljoin(base, "302/no-location.asis")
|
| + try:
|
| + (response, content) = self.http.request(uri, "GET")
|
| + self.fail("Should never reach here")
|
| + except httplib2.RedirectMissingLocation:
|
| + pass
|
| + except Exception as e:
|
| + self.fail("Threw wrong kind of exception ")
|
| +
|
| + # Re-run the test with out the exceptions
|
| + self.http.force_exception_to_status_code = True
|
| +
|
| + (response, content) = self.http.request(uri, "GET")
|
| + self.assertEqual(response.status, 500)
|
| + self.assertTrue(response.reason.startswith("Redirected but"))
|
| + self.assertEqual("302", response['status'])
|
| + self.assertTrue(content.startswith(b"This is content"))
|
| +
|
| + def testGet301ViaHttps(self):
|
| + # Google always redirects to http://google.com
|
| + (response, content) = self.http.request("https://code.google.com/apis/", "GET")
|
| + self.assertEqual(200, response.status)
|
| + self.assertEqual(301, response.previous.status)
|
| +
|
| + def testGetViaHttps(self):
|
| + # Test that we can handle HTTPS
|
| + (response, content) = self.http.request("https://google.com/adsense/", "GET")
|
| + self.assertEqual(200, response.status)
|
| +
|
| + def testGetViaHttpsSpecViolationOnLocation(self):
|
| + # Test that we follow redirects through HTTPS
|
| + # even if they violate the spec by including
|
| + # a relative Location: header instead of an
|
| + # absolute one.
|
| + (response, content) = self.http.request("https://google.com/adsense", "GET")
|
| + self.assertEqual(200, response.status)
|
| + self.assertNotEqual(None, response.previous)
|
| +
|
| +
|
| + def testGetViaHttpsKeyCert(self):
|
| + # At this point I can only test
|
| + # that the key and cert files are passed in
|
| + # correctly to httplib. It would be nice to have
|
| + # a real https endpoint to test against.
|
| + http = httplib2.Http(timeout=2)
|
| +
|
| + http.add_certificate("akeyfile", "acertfile", "bitworking.org")
|
| + try:
|
| + (response, content) = http.request("https://bitworking.org", "GET")
|
| + except AttributeError:
|
| + self.assertEqual(http.connections["https:bitworking.org"].key_file, "akeyfile")
|
| + self.assertEqual(http.connections["https:bitworking.org"].cert_file, "acertfile")
|
| + except IOError:
|
| + # Skip on 3.2
|
| + pass
|
| +
|
| + try:
|
| + (response, content) = http.request("https://notthere.bitworking.org", "GET")
|
| + except httplib2.ServerNotFoundError:
|
| + self.assertEqual(http.connections["https:notthere.bitworking.org"].key_file, None)
|
| + self.assertEqual(http.connections["https:notthere.bitworking.org"].cert_file, None)
|
| + except IOError:
|
| + # Skip on 3.2
|
| + pass
|
| +
|
| + def testSslCertValidation(self):
|
| + # Test that we get an ssl.SSLError when specifying a non-existent CA
|
| + # certs file.
|
| + http = httplib2.Http(ca_certs='/nosuchfile')
|
| + self.assertRaises(IOError,
|
| + http.request, "https://www.google.com/", "GET")
|
| +
|
| + # Test that we get a SSLHandshakeError if we try to access
|
| + # https://www.google.com, using a CA cert file that doesn't contain
|
| + # the CA Gogole uses (i.e., simulating a cert that's not signed by a
|
| + # trusted CA).
|
| + other_ca_certs = os.path.join(
|
| + os.path.dirname(os.path.abspath(httplib2.__file__ )),
|
| + "test", "other_cacerts.txt")
|
| + http = httplib2.Http(ca_certs=other_ca_certs)
|
| + self.assertRaises(ssl.SSLError,
|
| + http.request,"https://www.google.com/", "GET")
|
| +
|
| + def testSniHostnameValidation(self):
|
| + self.http.request("https://google.com/", method="GET")
|
| +
|
| + def testGet303(self):
|
| + # Do a follow-up GET on a Location: header
|
| + # returned from a POST that gave a 303.
|
| + uri = urllib.parse.urljoin(base, "303/303.cgi")
|
| + (response, content) = self.http.request(uri, "POST", " ")
|
| + self.assertEqual(response.status, 200)
|
| + self.assertEqual(content, b"This is the final destination.\n")
|
| + self.assertEqual(response.previous.status, 303)
|
| +
|
| + def testGet303NoRedirect(self):
|
| + # Do a follow-up GET on a Location: header
|
| + # returned from a POST that gave a 303.
|
| + self.http.follow_redirects = False
|
| + uri = urllib.parse.urljoin(base, "303/303.cgi")
|
| + (response, content) = self.http.request(uri, "POST", " ")
|
| + self.assertEqual(response.status, 303)
|
| +
|
| + def test303ForDifferentMethods(self):
|
| + # Test that all methods can be used
|
| + uri = urllib.parse.urljoin(base, "303/redirect-to-reflector.cgi")
|
| + for (method, method_on_303) in [("PUT", "GET"), ("DELETE", "GET"), ("POST", "GET"), ("GET", "GET"), ("HEAD", "GET")]:
|
| + (response, content) = self.http.request(uri, method, body=b" ")
|
| + self.assertEqual(response['x-method'], method_on_303)
|
| +
|
| + def testGet304(self):
|
| + # Test that we use ETags properly to validate our cache
|
| + uri = urllib.parse.urljoin(base, "304/test_etag.txt")
|
| + (response, content) = self.http.request(uri, "GET")
|
| + self.assertNotEqual(response['etag'], "")
|
| +
|
| + (response, content) = self.http.request(uri, "GET")
|
| + (response, content) = self.http.request(uri, "GET", headers = {'cache-control': 'must-revalidate'})
|
| + self.assertEqual(response.status, 200)
|
| + self.assertEqual(response.fromcache, True)
|
| +
|
| + cache_file_name = os.path.join(cacheDirName, httplib2.safename(httplib2.urlnorm(uri)[-1]))
|
| + f = open(cache_file_name, "r")
|
| + status_line = f.readline()
|
| + f.close()
|
| +
|
| + self.assertTrue(status_line.startswith("status:"))
|
| +
|
| + (response, content) = self.http.request(uri, "HEAD")
|
| + self.assertEqual(response.status, 200)
|
| + self.assertEqual(response.fromcache, True)
|
| +
|
| + (response, content) = self.http.request(uri, "GET", headers = {'range': 'bytes=0-0'})
|
| + self.assertEqual(response.status, 206)
|
| + self.assertEqual(response.fromcache, False)
|
| +
|
| + def testGetIgnoreEtag(self):
|
| + # Test that we can forcibly ignore ETags
|
| + uri = urllib.parse.urljoin(base, "reflector/reflector.cgi")
|
| + (response, content) = self.http.request(uri, "GET")
|
| + self.assertNotEqual(response['etag'], "")
|
| +
|
| + (response, content) = self.http.request(uri, "GET", headers = {'cache-control': 'max-age=0'})
|
| + d = self.reflector(content)
|
| + self.assertTrue('HTTP_IF_NONE_MATCH' in d)
|
| +
|
| + self.http.ignore_etag = True
|
| + (response, content) = self.http.request(uri, "GET", headers = {'cache-control': 'max-age=0'})
|
| + d = self.reflector(content)
|
| + self.assertEqual(response.fromcache, False)
|
| + self.assertFalse('HTTP_IF_NONE_MATCH' in d)
|
| +
|
| + def testOverrideEtag(self):
|
| + # Test that we can forcibly ignore ETags
|
| + uri = urllib.parse.urljoin(base, "reflector/reflector.cgi")
|
| + (response, content) = self.http.request(uri, "GET")
|
| + self.assertNotEqual(response['etag'], "")
|
| +
|
| + (response, content) = self.http.request(uri, "GET", headers = {'cache-control': 'max-age=0'})
|
| + d = self.reflector(content)
|
| + self.assertTrue('HTTP_IF_NONE_MATCH' in d)
|
| + self.assertNotEqual(d['HTTP_IF_NONE_MATCH'], "fred")
|
| +
|
| + (response, content) = self.http.request(uri, "GET", headers = {'cache-control': 'max-age=0', 'if-none-match': 'fred'})
|
| + d = self.reflector(content)
|
| + self.assertTrue('HTTP_IF_NONE_MATCH' in d)
|
| + self.assertEqual(d['HTTP_IF_NONE_MATCH'], "fred")
|
| +
|
| +#MAP-commented this out because it consistently fails
|
| +# def testGet304EndToEnd(self):
|
| +# # Test that end to end headers get overwritten in the cache
|
| +# uri = urllib.parse.urljoin(base, "304/end2end.cgi")
|
| +# (response, content) = self.http.request(uri, "GET")
|
| +# self.assertNotEqual(response['etag'], "")
|
| +# old_date = response['date']
|
| +# time.sleep(2)
|
| +#
|
| +# (response, content) = self.http.request(uri, "GET", headers = {'Cache-Control': 'max-age=0'})
|
| +# # The response should be from the cache, but the Date: header should be updated.
|
| +# new_date = response['date']
|
| +# self.assertNotEqual(new_date, old_date)
|
| +# self.assertEqual(response.status, 200)
|
| +# self.assertEqual(response.fromcache, True)
|
| +
|
| + def testGet304LastModified(self):
|
| + # Test that we can still handle a 304
|
| + # by only using the last-modified cache validator.
|
| + uri = urllib.parse.urljoin(base, "304/last-modified-only/last-modified-only.txt")
|
| + (response, content) = self.http.request(uri, "GET")
|
| +
|
| + self.assertNotEqual(response['last-modified'], "")
|
| + (response, content) = self.http.request(uri, "GET")
|
| + (response, content) = self.http.request(uri, "GET")
|
| + self.assertEqual(response.status, 200)
|
| + self.assertEqual(response.fromcache, True)
|
| +
|
| + def testGet307(self):
|
| + # Test that we do follow 307 redirects but
|
| + # do not cache the 307
|
| + uri = urllib.parse.urljoin(base, "307/onestep.asis")
|
| + (response, content) = self.http.request(uri, "GET")
|
| + self.assertEqual(response.status, 200)
|
| + self.assertEqual(content, b"This is the final destination.\n")
|
| + self.assertEqual(response.previous.status, 307)
|
| + self.assertEqual(response.previous.fromcache, False)
|
| +
|
| + (response, content) = self.http.request(uri, "GET")
|
| + self.assertEqual(response.status, 200)
|
| + self.assertEqual(response.fromcache, True)
|
| + self.assertEqual(content, b"This is the final destination.\n")
|
| + self.assertEqual(response.previous.status, 307)
|
| + self.assertEqual(response.previous.fromcache, False)
|
| +
|
| + def testGet410(self):
|
| + # Test that we pass 410's through
|
| + uri = urllib.parse.urljoin(base, "410/410.asis")
|
| + (response, content) = self.http.request(uri, "GET")
|
| + self.assertEqual(response.status, 410)
|
| +
|
| + def testVaryHeaderSimple(self):
|
| + """
|
| + RFC 2616 13.6
|
| + When the cache receives a subsequent request whose Request-URI
|
| + specifies one or more cache entries including a Vary header field,
|
| + the cache MUST NOT use such a cache entry to construct a response
|
| + to the new request unless all of the selecting request-headers
|
| + present in the new request match the corresponding stored
|
| + request-headers in the original request.
|
| + """
|
| + # test that the vary header is sent
|
| + uri = urllib.parse.urljoin(base, "vary/accept.asis")
|
| + (response, content) = self.http.request(uri, "GET", headers={'Accept': 'text/plain'})
|
| + self.assertEqual(response.status, 200)
|
| + self.assertTrue('vary' in response)
|
| +
|
| + # get the resource again, from the cache since accept header in this
|
| + # request is the same as the request
|
| + (response, content) = self.http.request(uri, "GET", headers={'Accept': 'text/plain'})
|
| + self.assertEqual(response.status, 200)
|
| + self.assertEqual(response.fromcache, True, msg="Should be from cache")
|
| +
|
| + # get the resource again, not from cache since Accept headers does not match
|
| + (response, content) = self.http.request(uri, "GET", headers={'Accept': 'text/html'})
|
| + self.assertEqual(response.status, 200)
|
| + self.assertEqual(response.fromcache, False, msg="Should not be from cache")
|
| +
|
| + # get the resource again, without any Accept header, so again no match
|
| + (response, content) = self.http.request(uri, "GET")
|
| + self.assertEqual(response.status, 200)
|
| + self.assertEqual(response.fromcache, False, msg="Should not be from cache")
|
| +
|
| + def testNoVary(self):
|
| + pass
|
| + # when there is no vary, a different Accept header (e.g.) should not
|
| + # impact if the cache is used
|
| + # test that the vary header is not sent
|
| + # uri = urllib.parse.urljoin(base, "vary/no-vary.asis")
|
| + # (response, content) = self.http.request(uri, "GET", headers={'Accept': 'text/plain'})
|
| + # self.assertEqual(response.status, 200)
|
| + # self.assertFalse('vary' in response)
|
| + #
|
| + # (response, content) = self.http.request(uri, "GET", headers={'Accept': 'text/plain'})
|
| + # self.assertEqual(response.status, 200)
|
| + # self.assertEqual(response.fromcache, True, msg="Should be from cache")
|
| + #
|
| + # (response, content) = self.http.request(uri, "GET", headers={'Accept': 'text/html'})
|
| + # self.assertEqual(response.status, 200)
|
| + # self.assertEqual(response.fromcache, True, msg="Should be from cache")
|
| +
|
| + def testVaryHeaderDouble(self):
|
| + uri = urllib.parse.urljoin(base, "vary/accept-double.asis")
|
| + (response, content) = self.http.request(uri, "GET", headers={
|
| + 'Accept': 'text/plain', 'Accept-Language': 'da, en-gb;q=0.8, en;q=0.7'})
|
| + self.assertEqual(response.status, 200)
|
| + self.assertTrue('vary' in response)
|
| +
|
| + # we are from cache
|
| + (response, content) = self.http.request(uri, "GET", headers={
|
| + 'Accept': 'text/plain', 'Accept-Language': 'da, en-gb;q=0.8, en;q=0.7'})
|
| + self.assertEqual(response.fromcache, True, msg="Should be from cache")
|
| +
|
| + (response, content) = self.http.request(uri, "GET", headers={'Accept': 'text/plain'})
|
| + self.assertEqual(response.status, 200)
|
| + self.assertEqual(response.fromcache, False)
|
| +
|
| + # get the resource again, not from cache, varied headers don't match exact
|
| + (response, content) = self.http.request(uri, "GET", headers={'Accept-Language': 'da'})
|
| + self.assertEqual(response.status, 200)
|
| + self.assertEqual(response.fromcache, False, msg="Should not be from cache")
|
| +
|
| + def testVaryUnusedHeader(self):
|
| + # A header's value is not considered to vary if it's not used at all.
|
| + uri = urllib.parse.urljoin(base, "vary/unused-header.asis")
|
| + (response, content) = self.http.request(uri, "GET", headers={
|
| + 'Accept': 'text/plain'})
|
| + self.assertEqual(response.status, 200)
|
| + self.assertTrue('vary' in response)
|
| +
|
| + # we are from cache
|
| + (response, content) = self.http.request(uri, "GET", headers={
|
| + 'Accept': 'text/plain',})
|
| + self.assertEqual(response.fromcache, True, msg="Should be from cache")
|
| +
|
| + def testHeadGZip(self):
|
| + # Test that we don't try to decompress a HEAD response
|
| + uri = urllib.parse.urljoin(base, "gzip/final-destination.txt")
|
| + (response, content) = self.http.request(uri, "HEAD")
|
| + self.assertEqual(response.status, 200)
|
| + self.assertNotEqual(int(response['content-length']), 0)
|
| + self.assertEqual(content, b"")
|
| +
|
| + def testGetGZip(self):
|
| + # Test that we support gzip compression
|
| + uri = urllib.parse.urljoin(base, "gzip/final-destination.txt")
|
| + (response, content) = self.http.request(uri, "GET")
|
| + self.assertEqual(response.status, 200)
|
| + self.assertFalse('content-encoding' in response)
|
| + self.assertTrue('-content-encoding' in response)
|
| + self.assertEqual(int(response['content-length']), len(b"This is the final destination.\n"))
|
| + self.assertEqual(content, b"This is the final destination.\n")
|
| +
|
| + def testPostAndGZipResponse(self):
|
| + uri = urllib.parse.urljoin(base, "gzip/post.cgi")
|
| + (response, content) = self.http.request(uri, "POST", body=" ")
|
| + self.assertEqual(response.status, 200)
|
| + self.assertFalse('content-encoding' in response)
|
| + self.assertTrue('-content-encoding' in response)
|
| +
|
| + def testGetGZipFailure(self):
|
| + # Test that we raise a good exception when the gzip fails
|
| + self.http.force_exception_to_status_code = False
|
| + uri = urllib.parse.urljoin(base, "gzip/failed-compression.asis")
|
| + try:
|
| + (response, content) = self.http.request(uri, "GET")
|
| + self.fail("Should never reach here")
|
| + except httplib2.FailedToDecompressContent:
|
| + pass
|
| + except Exception:
|
| + self.fail("Threw wrong kind of exception")
|
| +
|
| + # Re-run the test with out the exceptions
|
| + self.http.force_exception_to_status_code = True
|
| +
|
| + (response, content) = self.http.request(uri, "GET")
|
| + self.assertEqual(response.status, 500)
|
| + self.assertTrue(response.reason.startswith("Content purported"))
|
| +
|
| + def testIndividualTimeout(self):
|
| + uri = urllib.parse.urljoin(base, "timeout/timeout.cgi")
|
| + http = httplib2.Http(timeout=1)
|
| + http.force_exception_to_status_code = True
|
| +
|
| + (response, content) = http.request(uri)
|
| + self.assertEqual(response.status, 408)
|
| + self.assertTrue(response.reason.startswith("Request Timeout"))
|
| + self.assertTrue(content.startswith(b"Request Timeout"))
|
| +
|
| +
|
| + def testGetDeflate(self):
|
| + # Test that we support deflate compression
|
| + uri = urllib.parse.urljoin(base, "deflate/deflated.asis")
|
| + (response, content) = self.http.request(uri, "GET")
|
| + self.assertEqual(response.status, 200)
|
| + self.assertFalse('content-encoding' in response)
|
| + self.assertEqual(int(response['content-length']), len("This is the final destination."))
|
| + self.assertEqual(content, b"This is the final destination.")
|
| +
|
| + def testGetDeflateFailure(self):
|
| + # Test that we raise a good exception when the deflate fails
|
| + self.http.force_exception_to_status_code = False
|
| +
|
| + uri = urllib.parse.urljoin(base, "deflate/failed-compression.asis")
|
| + try:
|
| + (response, content) = self.http.request(uri, "GET")
|
| + self.fail("Should never reach here")
|
| + except httplib2.FailedToDecompressContent:
|
| + pass
|
| + except Exception:
|
| + self.fail("Threw wrong kind of exception")
|
| +
|
| + # Re-run the test with out the exceptions
|
| + self.http.force_exception_to_status_code = True
|
| +
|
| + (response, content) = self.http.request(uri, "GET")
|
| + self.assertEqual(response.status, 500)
|
| + self.assertTrue(response.reason.startswith("Content purported"))
|
| +
|
| + def testGetDuplicateHeaders(self):
|
| + # Test that duplicate headers get concatenated via ','
|
| + uri = urllib.parse.urljoin(base, "duplicate-headers/multilink.asis")
|
| + (response, content) = self.http.request(uri, "GET")
|
| + self.assertEqual(response.status, 200)
|
| + self.assertEqual(content, b"This is content\n")
|
| + self.assertEqual(response['link'].split(",")[0], '<http://bitworking.org>; rel="home"; title="BitWorking"')
|
| +
|
| + def testGetCacheControlNoCache(self):
|
| + # Test Cache-Control: no-cache on requests
|
| + uri = urllib.parse.urljoin(base, "304/test_etag.txt")
|
| + (response, content) = self.http.request(uri, "GET")
|
| + self.assertNotEqual(response['etag'], "")
|
| + (response, content) = self.http.request(uri, "GET")
|
| + self.assertEqual(response.status, 200)
|
| + self.assertEqual(response.fromcache, True)
|
| +
|
| + (response, content) = self.http.request(uri, "GET", headers={'Cache-Control': 'no-cache'})
|
| + self.assertEqual(response.status, 200)
|
| + self.assertEqual(response.fromcache, False)
|
| +
|
| + def testGetCacheControlPragmaNoCache(self):
|
| + # Test Pragma: no-cache on requests
|
| + uri = urllib.parse.urljoin(base, "304/test_etag.txt")
|
| + (response, content) = self.http.request(uri, "GET")
|
| + self.assertNotEqual(response['etag'], "")
|
| + (response, content) = self.http.request(uri, "GET")
|
| + self.assertEqual(response.status, 200)
|
| + self.assertEqual(response.fromcache, True)
|
| +
|
| + (response, content) = self.http.request(uri, "GET", headers={'Pragma': 'no-cache'})
|
| + self.assertEqual(response.status, 200)
|
| + self.assertEqual(response.fromcache, False)
|
| +
|
| + def testGetCacheControlNoStoreRequest(self):
|
| + # A no-store request means that the response should not be stored.
|
| + uri = urllib.parse.urljoin(base, "304/test_etag.txt")
|
| +
|
| + (response, content) = self.http.request(uri, "GET", headers={'Cache-Control': 'no-store'})
|
| + self.assertEqual(response.status, 200)
|
| + self.assertEqual(response.fromcache, False)
|
| +
|
| + (response, content) = self.http.request(uri, "GET", headers={'Cache-Control': 'no-store'})
|
| + self.assertEqual(response.status, 200)
|
| + self.assertEqual(response.fromcache, False)
|
| +
|
| + def testGetCacheControlNoStoreResponse(self):
|
| + # A no-store response means that the response should not be stored.
|
| + uri = urllib.parse.urljoin(base, "no-store/no-store.asis")
|
| +
|
| + (response, content) = self.http.request(uri, "GET")
|
| + self.assertEqual(response.status, 200)
|
| + self.assertEqual(response.fromcache, False)
|
| +
|
| + (response, content) = self.http.request(uri, "GET")
|
| + self.assertEqual(response.status, 200)
|
| + self.assertEqual(response.fromcache, False)
|
| +
|
| + def testGetCacheControlNoCacheNoStoreRequest(self):
|
| + # Test that a no-store, no-cache clears the entry from the cache
|
| + # even if it was cached previously.
|
| + uri = urllib.parse.urljoin(base, "304/test_etag.txt")
|
| +
|
| + (response, content) = self.http.request(uri, "GET")
|
| + (response, content) = self.http.request(uri, "GET")
|
| + self.assertEqual(response.fromcache, True)
|
| + (response, content) = self.http.request(uri, "GET", headers={'Cache-Control': 'no-store, no-cache'})
|
| + (response, content) = self.http.request(uri, "GET", headers={'Cache-Control': 'no-store, no-cache'})
|
| + self.assertEqual(response.status, 200)
|
| + self.assertEqual(response.fromcache, False)
|
| +
|
| + def testUpdateInvalidatesCache(self):
|
| + # Test that calling PUT or DELETE on a
|
| + # URI that is cache invalidates that cache.
|
| + uri = urllib.parse.urljoin(base, "304/test_etag.txt")
|
| +
|
| + (response, content) = self.http.request(uri, "GET")
|
| + (response, content) = self.http.request(uri, "GET")
|
| + self.assertEqual(response.fromcache, True)
|
| + (response, content) = self.http.request(uri, "DELETE")
|
| + self.assertEqual(response.status, 405)
|
| +
|
| + (response, content) = self.http.request(uri, "GET")
|
| + self.assertEqual(response.fromcache, False)
|
| +
|
| + def testUpdateUsesCachedETag(self):
|
| + # Test that we natively support http://www.w3.org/1999/04/Editing/
|
| + uri = urllib.parse.urljoin(base, "conditional-updates/test.cgi")
|
| +
|
| + (response, content) = self.http.request(uri, "GET")
|
| + self.assertEqual(response.status, 200)
|
| + self.assertEqual(response.fromcache, False)
|
| + (response, content) = self.http.request(uri, "GET")
|
| + self.assertEqual(response.status, 200)
|
| + self.assertEqual(response.fromcache, True)
|
| + (response, content) = self.http.request(uri, "PUT", body="foo")
|
| + self.assertEqual(response.status, 200)
|
| + (response, content) = self.http.request(uri, "PUT", body="foo")
|
| + self.assertEqual(response.status, 412)
|
| +
|
| +
|
| + def testUpdatePatchUsesCachedETag(self):
|
| + # Test that we natively support http://www.w3.org/1999/04/Editing/
|
| + uri = urllib.parse.urljoin(base, "conditional-updates/test.cgi")
|
| +
|
| + (response, content) = self.http.request(uri, "GET")
|
| + self.assertEqual(response.status, 200)
|
| + self.assertEqual(response.fromcache, False)
|
| + (response, content) = self.http.request(uri, "GET")
|
| + self.assertEqual(response.status, 200)
|
| + self.assertEqual(response.fromcache, True)
|
| + (response, content) = self.http.request(uri, "PATCH", body="foo")
|
| + self.assertEqual(response.status, 200)
|
| + (response, content) = self.http.request(uri, "PATCH", body="foo")
|
| + self.assertEqual(response.status, 412)
|
| +
|
| + def testUpdateUsesCachedETagAndOCMethod(self):
|
| + # Test that we natively support http://www.w3.org/1999/04/Editing/
|
| + uri = urllib.parse.urljoin(base, "conditional-updates/test.cgi")
|
| +
|
| + (response, content) = self.http.request(uri, "GET")
|
| + self.assertEqual(response.status, 200)
|
| + self.assertEqual(response.fromcache, False)
|
| + (response, content) = self.http.request(uri, "GET")
|
| + self.assertEqual(response.status, 200)
|
| + self.assertEqual(response.fromcache, True)
|
| + self.http.optimistic_concurrency_methods.append("DELETE")
|
| + (response, content) = self.http.request(uri, "DELETE")
|
| + self.assertEqual(response.status, 200)
|
| +
|
| +
|
| + def testUpdateUsesCachedETagOverridden(self):
|
| + # Test that we natively support http://www.w3.org/1999/04/Editing/
|
| + uri = urllib.parse.urljoin(base, "conditional-updates/test.cgi")
|
| +
|
| + (response, content) = self.http.request(uri, "GET")
|
| + self.assertEqual(response.status, 200)
|
| + self.assertEqual(response.fromcache, False)
|
| + (response, content) = self.http.request(uri, "GET")
|
| + self.assertEqual(response.status, 200)
|
| + self.assertEqual(response.fromcache, True)
|
| + (response, content) = self.http.request(uri, "PUT", body="foo", headers={'if-match': 'fred'})
|
| + self.assertEqual(response.status, 412)
|
| +
|
| + def testBasicAuth(self):
|
| + # Test Basic Authentication
|
| + uri = urllib.parse.urljoin(base, "basic/file.txt")
|
| + (response, content) = self.http.request(uri, "GET")
|
| + self.assertEqual(response.status, 401)
|
| +
|
| + uri = urllib.parse.urljoin(base, "basic/")
|
| + (response, content) = self.http.request(uri, "GET")
|
| + self.assertEqual(response.status, 401)
|
| +
|
| + self.http.add_credentials('joe', 'password')
|
| + (response, content) = self.http.request(uri, "GET")
|
| + self.assertEqual(response.status, 200)
|
| +
|
| + uri = urllib.parse.urljoin(base, "basic/file.txt")
|
| + (response, content) = self.http.request(uri, "GET")
|
| + self.assertEqual(response.status, 200)
|
| +
|
| + def testBasicAuthWithDomain(self):
|
| + # Test Basic Authentication
|
| + uri = urllib.parse.urljoin(base, "basic/file.txt")
|
| + (response, content) = self.http.request(uri, "GET")
|
| + self.assertEqual(response.status, 401)
|
| +
|
| + uri = urllib.parse.urljoin(base, "basic/")
|
| + (response, content) = self.http.request(uri, "GET")
|
| + self.assertEqual(response.status, 401)
|
| +
|
| + self.http.add_credentials('joe', 'password', "example.org")
|
| + (response, content) = self.http.request(uri, "GET")
|
| + self.assertEqual(response.status, 401)
|
| +
|
| + uri = urllib.parse.urljoin(base, "basic/file.txt")
|
| + (response, content) = self.http.request(uri, "GET")
|
| + self.assertEqual(response.status, 401)
|
| +
|
| + domain = urllib.parse.urlparse(base)[1]
|
| + self.http.add_credentials('joe', 'password', domain)
|
| + (response, content) = self.http.request(uri, "GET")
|
| + self.assertEqual(response.status, 200)
|
| +
|
| + uri = urllib.parse.urljoin(base, "basic/file.txt")
|
| + (response, content) = self.http.request(uri, "GET")
|
| + self.assertEqual(response.status, 200)
|
| +
|
| +
|
| +
|
| +
|
| +
|
| +
|
| + def testBasicAuthTwoDifferentCredentials(self):
|
| + # Test Basic Authentication with multiple sets of credentials
|
| + uri = urllib.parse.urljoin(base, "basic2/file.txt")
|
| + (response, content) = self.http.request(uri, "GET")
|
| + self.assertEqual(response.status, 401)
|
| +
|
| + uri = urllib.parse.urljoin(base, "basic2/")
|
| + (response, content) = self.http.request(uri, "GET")
|
| + self.assertEqual(response.status, 401)
|
| +
|
| + self.http.add_credentials('fred', 'barney')
|
| + (response, content) = self.http.request(uri, "GET")
|
| + self.assertEqual(response.status, 200)
|
| +
|
| + uri = urllib.parse.urljoin(base, "basic2/file.txt")
|
| + (response, content) = self.http.request(uri, "GET")
|
| + self.assertEqual(response.status, 200)
|
| +
|
| + def testBasicAuthNested(self):
|
| + # Test Basic Authentication with resources
|
| + # that are nested
|
| + uri = urllib.parse.urljoin(base, "basic-nested/")
|
| + (response, content) = self.http.request(uri, "GET")
|
| + self.assertEqual(response.status, 401)
|
| +
|
| + uri = urllib.parse.urljoin(base, "basic-nested/subdir")
|
| + (response, content) = self.http.request(uri, "GET")
|
| + self.assertEqual(response.status, 401)
|
| +
|
| + # Now add in credentials one at a time and test.
|
| + self.http.add_credentials('joe', 'password')
|
| +
|
| + uri = urllib.parse.urljoin(base, "basic-nested/")
|
| + (response, content) = self.http.request(uri, "GET")
|
| + self.assertEqual(response.status, 200)
|
| +
|
| + uri = urllib.parse.urljoin(base, "basic-nested/subdir")
|
| + (response, content) = self.http.request(uri, "GET")
|
| + self.assertEqual(response.status, 401)
|
| +
|
| + self.http.add_credentials('fred', 'barney')
|
| +
|
| + uri = urllib.parse.urljoin(base, "basic-nested/")
|
| + (response, content) = self.http.request(uri, "GET")
|
| + self.assertEqual(response.status, 200)
|
| +
|
| + uri = urllib.parse.urljoin(base, "basic-nested/subdir")
|
| + (response, content) = self.http.request(uri, "GET")
|
| + self.assertEqual(response.status, 200)
|
| +
|
| + def testDigestAuth(self):
|
| + # Test that we support Digest Authentication
|
| + uri = urllib.parse.urljoin(base, "digest/")
|
| + (response, content) = self.http.request(uri, "GET")
|
| + self.assertEqual(response.status, 401)
|
| +
|
| + self.http.add_credentials('joe', 'password')
|
| + (response, content) = self.http.request(uri, "GET")
|
| + self.assertEqual(response.status, 200)
|
| +
|
| + uri = urllib.parse.urljoin(base, "digest/file.txt")
|
| + (response, content) = self.http.request(uri, "GET")
|
| +
|
| + def testDigestAuthNextNonceAndNC(self):
|
| + # Test that if the server sets nextnonce that we reset
|
| + # the nonce count back to 1
|
| + uri = urllib.parse.urljoin(base, "digest/file.txt")
|
| + self.http.add_credentials('joe', 'password')
|
| + (response, content) = self.http.request(uri, "GET", headers = {"cache-control":"no-cache"})
|
| + info = httplib2._parse_www_authenticate(response, 'authentication-info')
|
| + self.assertEqual(response.status, 200)
|
| + (response, content) = self.http.request(uri, "GET", headers = {"cache-control":"no-cache"})
|
| + info2 = httplib2._parse_www_authenticate(response, 'authentication-info')
|
| + self.assertEqual(response.status, 200)
|
| +
|
| + if 'nextnonce' in info:
|
| + self.assertEqual(info2['nc'], 1)
|
| +
|
| + def testDigestAuthStale(self):
|
| + # Test that we can handle a nonce becoming stale
|
| + uri = urllib.parse.urljoin(base, "digest-expire/file.txt")
|
| + self.http.add_credentials('joe', 'password')
|
| + (response, content) = self.http.request(uri, "GET", headers = {"cache-control":"no-cache"})
|
| + info = httplib2._parse_www_authenticate(response, 'authentication-info')
|
| + self.assertEqual(response.status, 200)
|
| +
|
| + time.sleep(3)
|
| + # Sleep long enough that the nonce becomes stale
|
| +
|
| + (response, content) = self.http.request(uri, "GET", headers = {"cache-control":"no-cache"})
|
| + self.assertFalse(response.fromcache)
|
| + self.assertTrue(response._stale_digest)
|
| + info3 = httplib2._parse_www_authenticate(response, 'authentication-info')
|
| + self.assertEqual(response.status, 200)
|
| +
|
| + def reflector(self, content):
|
| + return dict( [tuple(x.split("=", 1)) for x in content.decode('utf-8').strip().split("\n")] )
|
| +
|
| + def testReflector(self):
|
| + uri = urllib.parse.urljoin(base, "reflector/reflector.cgi")
|
| + (response, content) = self.http.request(uri, "GET")
|
| + d = self.reflector(content)
|
| + self.assertTrue('HTTP_USER_AGENT' in d)
|
| +
|
| +
|
| + def testConnectionClose(self):
|
| + uri = "http://www.google.com/"
|
| + (response, content) = self.http.request(uri, "GET")
|
| + for c in self.http.connections.values():
|
| + self.assertNotEqual(None, c.sock)
|
| + (response, content) = self.http.request(uri, "GET", headers={"connection": "close"})
|
| + for c in self.http.connections.values():
|
| + self.assertEqual(None, c.sock)
|
| +
|
| + def testPickleHttp(self):
|
| + pickled_http = pickle.dumps(self.http)
|
| + new_http = pickle.loads(pickled_http)
|
| +
|
| + self.assertEqual(sorted(new_http.__dict__.keys()),
|
| + sorted(self.http.__dict__.keys()))
|
| + for key in new_http.__dict__:
|
| + if key in ('certificates', 'credentials'):
|
| + self.assertEqual(new_http.__dict__[key].credentials,
|
| + self.http.__dict__[key].credentials)
|
| + elif key == 'cache':
|
| + self.assertEqual(new_http.__dict__[key].cache,
|
| + self.http.__dict__[key].cache)
|
| + else:
|
| + self.assertEqual(new_http.__dict__[key],
|
| + self.http.__dict__[key])
|
| +
|
| + def testPickleHttpWithConnection(self):
|
| + self.http.request('http://bitworking.org',
|
| + connection_type=_MyHTTPConnection)
|
| + pickled_http = pickle.dumps(self.http)
|
| + new_http = pickle.loads(pickled_http)
|
| +
|
| + self.assertEqual(list(self.http.connections.keys()),
|
| + ['http:bitworking.org'])
|
| + self.assertEqual(new_http.connections, {})
|
| +
|
| + def testPickleCustomRequestHttp(self):
|
| + def dummy_request(*args, **kwargs):
|
| + return new_request(*args, **kwargs)
|
| + dummy_request.dummy_attr = 'dummy_value'
|
| +
|
| + self.http.request = dummy_request
|
| + pickled_http = pickle.dumps(self.http)
|
| + self.assertFalse(b"S'request'" in pickled_http)
|
| +
|
| +try:
|
| + import memcache
|
| + class HttpTestMemCached(HttpTest):
|
| + def setUp(self):
|
| + self.cache = memcache.Client(['127.0.0.1:11211'], debug=0)
|
| + #self.cache = memcache.Client(['10.0.0.4:11211'], debug=1)
|
| + self.http = httplib2.Http(self.cache)
|
| + self.cache.flush_all()
|
| + # Not exactly sure why the sleep is needed here, but
|
| + # if not present then some unit tests that rely on caching
|
| + # fail. Memcached seems to lose some sets immediately
|
| + # after a flush_all if the set is to a value that
|
| + # was previously cached. (Maybe the flush is handled async?)
|
| + time.sleep(1)
|
| + self.http.clear_credentials()
|
| +except:
|
| + pass
|
| +
|
| +
|
| +
|
| +# ------------------------------------------------------------------------
|
| +
|
| +class HttpPrivateTest(unittest.TestCase):
|
| +
|
| + def testParseCacheControl(self):
|
| + # Test that we can parse the Cache-Control header
|
| + self.assertEqual({}, httplib2._parse_cache_control({}))
|
| + self.assertEqual({'no-cache': 1}, httplib2._parse_cache_control({'cache-control': ' no-cache'}))
|
| + cc = httplib2._parse_cache_control({'cache-control': ' no-cache, max-age = 7200'})
|
| + self.assertEqual(cc['no-cache'], 1)
|
| + self.assertEqual(cc['max-age'], '7200')
|
| + cc = httplib2._parse_cache_control({'cache-control': ' , '})
|
| + self.assertEqual(cc[''], 1)
|
| +
|
| + try:
|
| + cc = httplib2._parse_cache_control({'cache-control': 'Max-age=3600;post-check=1800,pre-check=3600'})
|
| + self.assertTrue("max-age" in cc)
|
| + except:
|
| + self.fail("Should not throw exception")
|
| +
|
| +
|
| +
|
| +
|
| + def testNormalizeHeaders(self):
|
| + # Test that we normalize headers to lowercase
|
| + h = httplib2._normalize_headers({'Cache-Control': 'no-cache', 'Other': 'Stuff'})
|
| + self.assertTrue('cache-control' in h)
|
| + self.assertTrue('other' in h)
|
| + self.assertEqual('Stuff', h['other'])
|
| +
|
| + def testExpirationModelTransparent(self):
|
| + # Test that no-cache makes our request TRANSPARENT
|
| + response_headers = {
|
| + 'cache-control': 'max-age=7200'
|
| + }
|
| + request_headers = {
|
| + 'cache-control': 'no-cache'
|
| + }
|
| + self.assertEqual("TRANSPARENT", httplib2._entry_disposition(response_headers, request_headers))
|
| +
|
| + def testMaxAgeNonNumeric(self):
|
| + # Test that no-cache makes our request TRANSPARENT
|
| + response_headers = {
|
| + 'cache-control': 'max-age=fred, min-fresh=barney'
|
| + }
|
| + request_headers = {
|
| + }
|
| + self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))
|
| +
|
| +
|
| + def testExpirationModelNoCacheResponse(self):
|
| + # The date and expires point to an entry that should be
|
| + # FRESH, but the no-cache over-rides that.
|
| + now = time.time()
|
| + response_headers = {
|
| + 'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)),
|
| + 'expires': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now+4)),
|
| + 'cache-control': 'no-cache'
|
| + }
|
| + request_headers = {
|
| + }
|
| + self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))
|
| +
|
| + def testExpirationModelStaleRequestMustReval(self):
|
| + # must-revalidate forces STALE
|
| + self.assertEqual("STALE", httplib2._entry_disposition({}, {'cache-control': 'must-revalidate'}))
|
| +
|
| + def testExpirationModelStaleResponseMustReval(self):
|
| + # must-revalidate forces STALE
|
| + self.assertEqual("STALE", httplib2._entry_disposition({'cache-control': 'must-revalidate'}, {}))
|
| +
|
| + def testExpirationModelFresh(self):
|
| + response_headers = {
|
| + 'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime()),
|
| + 'cache-control': 'max-age=2'
|
| + }
|
| + request_headers = {
|
| + }
|
| + self.assertEqual("FRESH", httplib2._entry_disposition(response_headers, request_headers))
|
| + time.sleep(3)
|
| + self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))
|
| +
|
| + def testExpirationMaxAge0(self):
|
| + response_headers = {
|
| + 'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime()),
|
| + 'cache-control': 'max-age=0'
|
| + }
|
| + request_headers = {
|
| + }
|
| + self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))
|
| +
|
| + def testExpirationModelDateAndExpires(self):
|
| + now = time.time()
|
| + response_headers = {
|
| + 'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)),
|
| + 'expires': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now+2)),
|
| + }
|
| + request_headers = {
|
| + }
|
| + self.assertEqual("FRESH", httplib2._entry_disposition(response_headers, request_headers))
|
| + time.sleep(3)
|
| + self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))
|
| +
|
| + def testExpiresZero(self):
|
| + now = time.time()
|
| + response_headers = {
|
| + 'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)),
|
| + 'expires': "0",
|
| + }
|
| + request_headers = {
|
| + }
|
| + self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))
|
| +
|
| + def testExpirationModelDateOnly(self):
|
| + now = time.time()
|
| + response_headers = {
|
| + 'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now+3)),
|
| + }
|
| + request_headers = {
|
| + }
|
| + self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))
|
| +
|
| + def testExpirationModelOnlyIfCached(self):
|
| + response_headers = {
|
| + }
|
| + request_headers = {
|
| + 'cache-control': 'only-if-cached',
|
| + }
|
| + self.assertEqual("FRESH", httplib2._entry_disposition(response_headers, request_headers))
|
| +
|
| + def testExpirationModelMaxAgeBoth(self):
|
| + now = time.time()
|
| + response_headers = {
|
| + 'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)),
|
| + 'cache-control': 'max-age=2'
|
| + }
|
| + request_headers = {
|
| + 'cache-control': 'max-age=0'
|
| + }
|
| + self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))
|
| +
|
| + def testExpirationModelDateAndExpiresMinFresh1(self):
|
| + now = time.time()
|
| + response_headers = {
|
| + 'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)),
|
| + 'expires': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now+2)),
|
| + }
|
| + request_headers = {
|
| + 'cache-control': 'min-fresh=2'
|
| + }
|
| + self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))
|
| +
|
| + def testExpirationModelDateAndExpiresMinFresh2(self):
|
| + now = time.time()
|
| + response_headers = {
|
| + 'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)),
|
| + 'expires': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now+4)),
|
| + }
|
| + request_headers = {
|
| + 'cache-control': 'min-fresh=2'
|
| + }
|
| + self.assertEqual("FRESH", httplib2._entry_disposition(response_headers, request_headers))
|
| +
|
| + def testParseWWWAuthenticateEmpty(self):
|
| + res = httplib2._parse_www_authenticate({})
|
| + self.assertEqual(len(list(res.keys())), 0)
|
| +
|
| + def testParseWWWAuthenticate(self):
|
| + # different uses of spaces around commas
|
| + res = httplib2._parse_www_authenticate({ 'www-authenticate': 'Test realm="test realm" , foo=foo ,bar="bar", baz=baz,qux=qux'})
|
| + self.assertEqual(len(list(res.keys())), 1)
|
| + self.assertEqual(len(list(res['test'].keys())), 5)
|
| +
|
| + # tokens with non-alphanum
|
| + res = httplib2._parse_www_authenticate({ 'www-authenticate': 'T*!%#st realm=to*!%#en, to*!%#en="quoted string"'})
|
| + self.assertEqual(len(list(res.keys())), 1)
|
| + self.assertEqual(len(list(res['t*!%#st'].keys())), 2)
|
| +
|
| + # quoted string with quoted pairs
|
| + res = httplib2._parse_www_authenticate({ 'www-authenticate': 'Test realm="a \\"test\\" realm"'})
|
| + self.assertEqual(len(list(res.keys())), 1)
|
| + self.assertEqual(res['test']['realm'], 'a "test" realm')
|
| +
|
| + def testParseWWWAuthenticateStrict(self):
|
| + httplib2.USE_WWW_AUTH_STRICT_PARSING = 1;
|
| + self.testParseWWWAuthenticate();
|
| + httplib2.USE_WWW_AUTH_STRICT_PARSING = 0;
|
| +
|
| + def testParseWWWAuthenticateBasic(self):
|
| + res = httplib2._parse_www_authenticate({ 'www-authenticate': 'Basic realm="me"'})
|
| + basic = res['basic']
|
| + self.assertEqual('me', basic['realm'])
|
| +
|
| + res = httplib2._parse_www_authenticate({ 'www-authenticate': 'Basic realm="me", algorithm="MD5"'})
|
| + basic = res['basic']
|
| + self.assertEqual('me', basic['realm'])
|
| + self.assertEqual('MD5', basic['algorithm'])
|
| +
|
| + res = httplib2._parse_www_authenticate({ 'www-authenticate': 'Basic realm="me", algorithm=MD5'})
|
| + basic = res['basic']
|
| + self.assertEqual('me', basic['realm'])
|
| + self.assertEqual('MD5', basic['algorithm'])
|
| +
|
| + def testParseWWWAuthenticateBasic2(self):
|
| + res = httplib2._parse_www_authenticate({ 'www-authenticate': 'Basic realm="me",other="fred" '})
|
| + basic = res['basic']
|
| + self.assertEqual('me', basic['realm'])
|
| + self.assertEqual('fred', basic['other'])
|
| +
|
| + def testParseWWWAuthenticateBasic3(self):
|
| + res = httplib2._parse_www_authenticate({ 'www-authenticate': 'Basic REAlm="me" '})
|
| + basic = res['basic']
|
| + self.assertEqual('me', basic['realm'])
|
| +
|
| +
|
| + def testParseWWWAuthenticateDigest(self):
|
| + res = httplib2._parse_www_authenticate({ 'www-authenticate':
|
| + 'Digest realm="testrealm@host.com", qop="auth,auth-int", nonce="dcd98b7102dd2f0e8b11d0f600bfb0c093", opaque="5ccc069c403ebaf9f0171e9517f40e41"'})
|
| + digest = res['digest']
|
| + self.assertEqual('testrealm@host.com', digest['realm'])
|
| + self.assertEqual('auth,auth-int', digest['qop'])
|
| +
|
| +
|
| + def testParseWWWAuthenticateMultiple(self):
|
| + res = httplib2._parse_www_authenticate({ 'www-authenticate':
|
| + 'Digest realm="testrealm@host.com", qop="auth,auth-int", nonce="dcd98b7102dd2f0e8b11d0f600bfb0c093", opaque="5ccc069c403ebaf9f0171e9517f40e41" Basic REAlm="me" '})
|
| + digest = res['digest']
|
| + self.assertEqual('testrealm@host.com', digest['realm'])
|
| + self.assertEqual('auth,auth-int', digest['qop'])
|
| + self.assertEqual('dcd98b7102dd2f0e8b11d0f600bfb0c093', digest['nonce'])
|
| + self.assertEqual('5ccc069c403ebaf9f0171e9517f40e41', digest['opaque'])
|
| + basic = res['basic']
|
| + self.assertEqual('me', basic['realm'])
|
| +
|
| + def testParseWWWAuthenticateMultiple2(self):
|
| + # Handle an added comma between challenges, which might get thrown in if the challenges were
|
| + # originally sent in separate www-authenticate headers.
|
| + res = httplib2._parse_www_authenticate({ 'www-authenticate':
|
| + 'Digest realm="testrealm@host.com", qop="auth,auth-int", nonce="dcd98b7102dd2f0e8b11d0f600bfb0c093", opaque="5ccc069c403ebaf9f0171e9517f40e41", Basic REAlm="me" '})
|
| + digest = res['digest']
|
| + self.assertEqual('testrealm@host.com', digest['realm'])
|
| + self.assertEqual('auth,auth-int', digest['qop'])
|
| + self.assertEqual('dcd98b7102dd2f0e8b11d0f600bfb0c093', digest['nonce'])
|
| + self.assertEqual('5ccc069c403ebaf9f0171e9517f40e41', digest['opaque'])
|
| + basic = res['basic']
|
| + self.assertEqual('me', basic['realm'])
|
| +
|
| + def testParseWWWAuthenticateMultiple3(self):
|
| + # Handle an added comma between challenges, which might get thrown in if the challenges were
|
| + # originally sent in separate www-authenticate headers.
|
| + res = httplib2._parse_www_authenticate({ 'www-authenticate':
|
| + 'Digest realm="testrealm@host.com", qop="auth,auth-int", nonce="dcd98b7102dd2f0e8b11d0f600bfb0c093", opaque="5ccc069c403ebaf9f0171e9517f40e41", Basic REAlm="me", WSSE realm="foo", profile="UsernameToken"'})
|
| + digest = res['digest']
|
| + self.assertEqual('testrealm@host.com', digest['realm'])
|
| + self.assertEqual('auth,auth-int', digest['qop'])
|
| + self.assertEqual('dcd98b7102dd2f0e8b11d0f600bfb0c093', digest['nonce'])
|
| + self.assertEqual('5ccc069c403ebaf9f0171e9517f40e41', digest['opaque'])
|
| + basic = res['basic']
|
| + self.assertEqual('me', basic['realm'])
|
| + wsse = res['wsse']
|
| + self.assertEqual('foo', wsse['realm'])
|
| + self.assertEqual('UsernameToken', wsse['profile'])
|
| +
|
| + def testParseWWWAuthenticateMultiple4(self):
|
| + res = httplib2._parse_www_authenticate({ 'www-authenticate':
|
| + 'Digest realm="test-real.m@host.com", qop \t=\t"\tauth,auth-int", nonce="(*)&^&$%#",opaque="5ccc069c403ebaf9f0171e9517f40e41", Basic REAlm="me", WSSE realm="foo", profile="UsernameToken"'})
|
| + digest = res['digest']
|
| + self.assertEqual('test-real.m@host.com', digest['realm'])
|
| + self.assertEqual('\tauth,auth-int', digest['qop'])
|
| + self.assertEqual('(*)&^&$%#', digest['nonce'])
|
| +
|
| + def testParseWWWAuthenticateMoreQuoteCombos(self):
|
| + res = httplib2._parse_www_authenticate({'www-authenticate':'Digest realm="myrealm", nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306", algorithm=MD5, qop="auth", stale=true'})
|
| + digest = res['digest']
|
| + self.assertEqual('myrealm', digest['realm'])
|
| +
|
| + def testParseWWWAuthenticateMalformed(self):
|
| + try:
|
| + res = httplib2._parse_www_authenticate({'www-authenticate':'OAuth "Facebook Platform" "invalid_token" "Invalid OAuth access token."'})
|
| + self.fail("should raise an exception")
|
| + except httplib2.MalformedHeader:
|
| + pass
|
| +
|
| + def testDigestObject(self):
|
| + credentials = ('joe', 'password')
|
| + host = None
|
| + request_uri = '/projects/httplib2/test/digest/'
|
| + headers = {}
|
| + response = {
|
| + 'www-authenticate': 'Digest realm="myrealm", nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306", algorithm=MD5, qop="auth"'
|
| + }
|
| + content = b""
|
| +
|
| + d = httplib2.DigestAuthentication(credentials, host, request_uri, headers, response, content, None)
|
| + d.request("GET", request_uri, headers, content, cnonce="33033375ec278a46")
|
| + our_request = "authorization: %s" % headers['authorization']
|
| + working_request = 'authorization: Digest username="joe", realm="myrealm", nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306", uri="/projects/httplib2/test/digest/", algorithm=MD5, response="97ed129401f7cdc60e5db58a80f3ea8b", qop=auth, nc=00000001, cnonce="33033375ec278a46"'
|
| + self.assertEqual(our_request, working_request)
|
| +
|
| + def testDigestObjectWithOpaque(self):
|
| + credentials = ('joe', 'password')
|
| + host = None
|
| + request_uri = '/projects/httplib2/test/digest/'
|
| + headers = {}
|
| + response = {
|
| + 'www-authenticate': 'Digest realm="myrealm", nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306", algorithm=MD5, qop="auth", opaque="atestopaque"'
|
| + }
|
| + content = ""
|
| +
|
| + d = httplib2.DigestAuthentication(credentials, host, request_uri, headers, response, content, None)
|
| + d.request("GET", request_uri, headers, content, cnonce="33033375ec278a46")
|
| + our_request = "authorization: %s" % headers['authorization']
|
| + working_request = 'authorization: Digest username="joe", realm="myrealm", nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306", uri="/projects/httplib2/test/digest/", algorithm=MD5, response="97ed129401f7cdc60e5db58a80f3ea8b", qop=auth, nc=00000001, cnonce="33033375ec278a46", opaque="atestopaque"'
|
| + self.assertEqual(our_request, working_request)
|
| +
|
| + def testDigestObjectStale(self):
|
| + credentials = ('joe', 'password')
|
| + host = None
|
| + request_uri = '/projects/httplib2/test/digest/'
|
| + headers = {}
|
| + response = httplib2.Response({ })
|
| + response['www-authenticate'] = 'Digest realm="myrealm", nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306", algorithm=MD5, qop="auth", stale=true'
|
| + response.status = 401
|
| + content = b""
|
| + d = httplib2.DigestAuthentication(credentials, host, request_uri, headers, response, content, None)
|
| + # Returns true to force a retry
|
| + self.assertTrue( d.response(response, content) )
|
| +
|
| + def testDigestObjectAuthInfo(self):
|
| + credentials = ('joe', 'password')
|
| + host = None
|
| + request_uri = '/projects/httplib2/test/digest/'
|
| + headers = {}
|
| + response = httplib2.Response({ })
|
| + response['www-authenticate'] = 'Digest realm="myrealm", nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306", algorithm=MD5, qop="auth", stale=true'
|
| + response['authentication-info'] = 'nextnonce="fred"'
|
| + content = b""
|
| + d = httplib2.DigestAuthentication(credentials, host, request_uri, headers, response, content, None)
|
| + # Returns true to force a retry
|
| + self.assertFalse( d.response(response, content) )
|
| + self.assertEqual('fred', d.challenge['nonce'])
|
| + self.assertEqual(1, d.challenge['nc'])
|
| +
|
| + def testWsseAlgorithm(self):
|
| + digest = httplib2._wsse_username_token("d36e316282959a9ed4c89851497a717f", "2003-12-15T14:43:07Z", "taadtaadpstcsm")
|
| + expected = b"quR/EWLAV4xLf9Zqyw4pDmfV9OY="
|
| + self.assertEqual(expected, digest)
|
| +
|
| + def testEnd2End(self):
|
| + # one end to end header
|
| + response = {'content-type': 'application/atom+xml', 'te': 'deflate'}
|
| + end2end = httplib2._get_end2end_headers(response)
|
| + self.assertTrue('content-type' in end2end)
|
| + self.assertTrue('te' not in end2end)
|
| + self.assertTrue('connection' not in end2end)
|
| +
|
| + # one end to end header that gets eliminated
|
| + response = {'connection': 'content-type', 'content-type': 'application/atom+xml', 'te': 'deflate'}
|
| + end2end = httplib2._get_end2end_headers(response)
|
| + self.assertTrue('content-type' not in end2end)
|
| + self.assertTrue('te' not in end2end)
|
| + self.assertTrue('connection' not in end2end)
|
| +
|
| + # Degenerate case of no headers
|
| + response = {}
|
| + end2end = httplib2._get_end2end_headers(response)
|
| + self.assertEqual(0, len(end2end))
|
| +
|
| + # Degenerate case of connection referrring to a header not passed in
|
| + response = {'connection': 'content-type'}
|
| + end2end = httplib2._get_end2end_headers(response)
|
| + self.assertEqual(0, len(end2end))
|
| +
|
| +
|
| +class TestProxyInfo(unittest.TestCase):
|
| + def setUp(self):
|
| + self.orig_env = dict(os.environ)
|
| +
|
| + def tearDown(self):
|
| + os.environ.clear()
|
| + os.environ.update(self.orig_env)
|
| +
|
| + def test_from_url(self):
|
| + pi = httplib2.proxy_info_from_url('http://myproxy.example.com')
|
| + self.assertEqual(pi.proxy_host, 'myproxy.example.com')
|
| + self.assertEqual(pi.proxy_port, 80)
|
| + self.assertEqual(pi.proxy_user, None)
|
| +
|
| + def test_from_url_ident(self):
|
| + pi = httplib2.proxy_info_from_url('http://zoidberg:fish@someproxy:99')
|
| + self.assertEqual(pi.proxy_host, 'someproxy')
|
| + self.assertEqual(pi.proxy_port, 99)
|
| + self.assertEqual(pi.proxy_user, 'zoidberg')
|
| + self.assertEqual(pi.proxy_pass, 'fish')
|
| +
|
| + def test_from_env(self):
|
| + os.environ['http_proxy'] = 'http://myproxy.example.com:8080'
|
| + pi = httplib2.proxy_info_from_environment()
|
| + self.assertEqual(pi.proxy_host, 'myproxy.example.com')
|
| + self.assertEqual(pi.proxy_port, 8080)
|
| +
|
| + def test_from_env_no_proxy(self):
|
| + os.environ['http_proxy'] = 'http://myproxy.example.com:80'
|
| + os.environ['https_proxy'] = 'http://myproxy.example.com:81'
|
| + pi = httplib2.proxy_info_from_environment('https')
|
| + self.assertEqual(pi.proxy_host, 'myproxy.example.com')
|
| + self.assertEqual(pi.proxy_port, 81)
|
| +
|
| + def test_from_env_none(self):
|
| + os.environ.clear()
|
| + pi = httplib2.proxy_info_from_environment()
|
| + self.assertEqual(pi, None)
|
| +
|
| +
|
| +if __name__ == '__main__':
|
| + unittest.main()
|
|
|