| Index: third_party/google-endpoints/apitools/base/py/transfer_test.py
|
| diff --git a/third_party/google-endpoints/apitools/base/py/transfer_test.py b/third_party/google-endpoints/apitools/base/py/transfer_test.py
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..53906fd9ba7646ace7b36d3cecf57a48588ccc55
|
| --- /dev/null
|
| +++ b/third_party/google-endpoints/apitools/base/py/transfer_test.py
|
| @@ -0,0 +1,250 @@
|
| +#
|
| +# Copyright 2015 Google Inc.
|
| +#
|
| +# Licensed under the Apache License, Version 2.0 (the "License");
|
| +# you may not use this file except in compliance with the License.
|
| +# You may obtain a copy of the License at
|
| +#
|
| +# http://www.apache.org/licenses/LICENSE-2.0
|
| +#
|
| +# Unless required by applicable law or agreed to in writing, software
|
| +# distributed under the License is distributed on an "AS IS" BASIS,
|
| +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
| +# See the License for the specific language governing permissions and
|
| +# limitations under the License.
|
| +
|
| +"""Tests for transfer.py."""
|
| +import string
|
| +
|
| +import mock
|
| +import six
|
| +from six.moves import http_client
|
| +import unittest2
|
| +
|
| +from apitools.base.py import base_api
|
| +from apitools.base.py import http_wrapper
|
| +from apitools.base.py import transfer
|
| +
|
| +
|
| +class TransferTest(unittest2.TestCase):
|
| +
|
| + def assertRangeAndContentRangeCompatible(self, request, response):
|
| + request_prefix = 'bytes='
|
| + self.assertIn('range', request.headers)
|
| + self.assertTrue(request.headers['range'].startswith(request_prefix))
|
| + request_range = request.headers['range'][len(request_prefix):]
|
| +
|
| + response_prefix = 'bytes '
|
| + self.assertIn('content-range', response.info)
|
| + response_header = response.info['content-range']
|
| + self.assertTrue(response_header.startswith(response_prefix))
|
| + response_range = (
|
| + response_header[len(response_prefix):].partition('/')[0])
|
| +
|
| + msg = ('Request range ({0}) not a prefix of '
|
| + 'response_range ({1})').format(
|
| + request_range, response_range)
|
| + self.assertTrue(response_range.startswith(request_range), msg=msg)
|
| +
|
| + def testComputeEndByte(self):
|
| + total_size = 100
|
| + chunksize = 10
|
| + download = transfer.Download.FromStream(
|
| + six.StringIO(), chunksize=chunksize, total_size=total_size)
|
| + self.assertEqual(chunksize - 1,
|
| + download._Download__ComputeEndByte(0, end=50))
|
| +
|
| + def testComputeEndByteReturnNone(self):
|
| + download = transfer.Download.FromStream(six.StringIO())
|
| + self.assertIsNone(
|
| + download._Download__ComputeEndByte(0, use_chunks=False))
|
| +
|
| + def testComputeEndByteNoChunks(self):
|
| + total_size = 100
|
| + download = transfer.Download.FromStream(
|
| + six.StringIO(), chunksize=10, total_size=total_size)
|
| + for end in (None, 1000):
|
| + self.assertEqual(
|
| + total_size - 1,
|
| + download._Download__ComputeEndByte(0, end=end,
|
| + use_chunks=False),
|
| + msg='Failed on end={0}'.format(end))
|
| +
|
| + def testComputeEndByteNoTotal(self):
|
| + download = transfer.Download.FromStream(six.StringIO())
|
| + default_chunksize = download.chunksize
|
| + for chunksize in (100, default_chunksize):
|
| + download.chunksize = chunksize
|
| + for start in (0, 10):
|
| + self.assertEqual(
|
| + download.chunksize + start - 1,
|
| + download._Download__ComputeEndByte(start),
|
| + msg='Failed on start={0}, chunksize={1}'.format(
|
| + start, chunksize))
|
| +
|
| + def testComputeEndByteSmallTotal(self):
|
| + total_size = 100
|
| + download = transfer.Download.FromStream(six.StringIO(),
|
| + total_size=total_size)
|
| + for start in (0, 10):
|
| + self.assertEqual(total_size - 1,
|
| + download._Download__ComputeEndByte(start),
|
| + msg='Failed on start={0}'.format(start))
|
| +
|
| + def testGetRange(self):
|
| + for (start_byte, end_byte) in [(0, 25), (5, 15), (0, 0), (25, 25)]:
|
| + bytes_http = object()
|
| + http = object()
|
| + download_stream = six.StringIO()
|
| + download = transfer.Download.FromStream(download_stream,
|
| + total_size=26,
|
| + auto_transfer=False)
|
| + download.bytes_http = bytes_http
|
| + base_url = 'https://part.one/'
|
| + with mock.patch.object(http_wrapper, 'MakeRequest',
|
| + autospec=True) as make_request:
|
| + make_request.return_value = http_wrapper.Response(
|
| + info={
|
| + 'content-range': 'bytes %d-%d/26' %
|
| + (start_byte, end_byte),
|
| + 'status': http_client.OK,
|
| + },
|
| + content=string.ascii_lowercase[start_byte:end_byte+1],
|
| + request_url=base_url,
|
| + )
|
| + request = http_wrapper.Request(url='https://part.one/')
|
| + download.InitializeDownload(request, http=http)
|
| + download.GetRange(start_byte, end_byte)
|
| + self.assertEqual(1, make_request.call_count)
|
| + received_request = make_request.call_args[0][1]
|
| + self.assertEqual(base_url, received_request.url)
|
| + self.assertRangeAndContentRangeCompatible(
|
| + received_request, make_request.return_value)
|
| +
|
| + def testNonChunkedDownload(self):
|
| + bytes_http = object()
|
| + http = object()
|
| + download_stream = six.StringIO()
|
| + download = transfer.Download.FromStream(download_stream, total_size=52)
|
| + download.bytes_http = bytes_http
|
| + base_url = 'https://part.one/'
|
| +
|
| + with mock.patch.object(http_wrapper, 'MakeRequest',
|
| + autospec=True) as make_request:
|
| + make_request.return_value = http_wrapper.Response(
|
| + info={
|
| + 'content-range': 'bytes 0-51/52',
|
| + 'status': http_client.OK,
|
| + },
|
| + content=string.ascii_lowercase * 2,
|
| + request_url=base_url,
|
| + )
|
| + request = http_wrapper.Request(url='https://part.one/')
|
| + download.InitializeDownload(request, http=http)
|
| + self.assertEqual(1, make_request.call_count)
|
| + received_request = make_request.call_args[0][1]
|
| + self.assertEqual(base_url, received_request.url)
|
| + self.assertRangeAndContentRangeCompatible(
|
| + received_request, make_request.return_value)
|
| + download_stream.seek(0)
|
| + self.assertEqual(string.ascii_lowercase * 2,
|
| + download_stream.getvalue())
|
| +
|
| + def testChunkedDownload(self):
|
| + bytes_http = object()
|
| + http = object()
|
| + download_stream = six.StringIO()
|
| + download = transfer.Download.FromStream(
|
| + download_stream, chunksize=26, total_size=52)
|
| + download.bytes_http = bytes_http
|
| +
|
| + # Setting autospec on a mock with an iterable side_effect is
|
| + # currently broken (http://bugs.python.org/issue17826), so
|
| + # instead we write a little function.
|
| + def _ReturnBytes(unused_http, http_request,
|
| + *unused_args, **unused_kwds):
|
| + url = http_request.url
|
| + if url == 'https://part.one/':
|
| + return http_wrapper.Response(
|
| + info={
|
| + 'content-location': 'https://part.two/',
|
| + 'content-range': 'bytes 0-25/52',
|
| + 'status': http_client.PARTIAL_CONTENT,
|
| + },
|
| + content=string.ascii_lowercase,
|
| + request_url='https://part.one/',
|
| + )
|
| + elif url == 'https://part.two/':
|
| + return http_wrapper.Response(
|
| + info={
|
| + 'content-range': 'bytes 26-51/52',
|
| + 'status': http_client.OK,
|
| + },
|
| + content=string.ascii_uppercase,
|
| + request_url='https://part.two/',
|
| + )
|
| + else:
|
| + self.fail('Unknown URL requested: %s' % url)
|
| +
|
| + with mock.patch.object(http_wrapper, 'MakeRequest',
|
| + autospec=True) as make_request:
|
| + make_request.side_effect = _ReturnBytes
|
| + request = http_wrapper.Request(url='https://part.one/')
|
| + download.InitializeDownload(request, http=http)
|
| + self.assertEqual(2, make_request.call_count)
|
| + for call in make_request.call_args_list:
|
| + self.assertRangeAndContentRangeCompatible(
|
| + call[0][1], _ReturnBytes(*call[0]))
|
| + download_stream.seek(0)
|
| + self.assertEqual(string.ascii_lowercase + string.ascii_uppercase,
|
| + download_stream.getvalue())
|
| +
|
| + def testFromEncoding(self):
|
| + # Test a specific corner case in multipart encoding.
|
| +
|
| + # Python's mime module by default encodes lines that start with
|
| + # "From " as ">From ", which we need to make sure we don't run afoul
|
| + # of when sending content that isn't intended to be so encoded. This
|
| + # test calls out that we get this right. We test for both the
|
| + # multipart and non-multipart case.
|
| + multipart_body = '{"body_field_one": 7}'
|
| + upload_contents = 'line one\nFrom \nline two'
|
| + upload_config = base_api.ApiUploadInfo(
|
| + accept=['*/*'],
|
| + max_size=None,
|
| + resumable_multipart=True,
|
| + resumable_path=u'/resumable/upload',
|
| + simple_multipart=True,
|
| + simple_path=u'/upload',
|
| + )
|
| + url_builder = base_api._UrlBuilder('http://www.uploads.com')
|
| +
|
| + # Test multipart: having a body argument in http_request forces
|
| + # multipart here.
|
| + upload = transfer.Upload.FromStream(
|
| + six.StringIO(upload_contents),
|
| + 'text/plain',
|
| + total_size=len(upload_contents))
|
| + http_request = http_wrapper.Request(
|
| + 'http://www.uploads.com',
|
| + headers={'content-type': 'text/plain'},
|
| + body=multipart_body)
|
| + upload.ConfigureRequest(upload_config, http_request, url_builder)
|
| + self.assertEqual(url_builder.query_params['uploadType'], 'multipart')
|
| + rewritten_upload_contents = '\n'.join(
|
| + http_request.body.split('--')[2].splitlines()[1:])
|
| + self.assertTrue(rewritten_upload_contents.endswith(upload_contents))
|
| +
|
| + # Test non-multipart (aka media): no body argument means this is
|
| + # sent as media.
|
| + upload = transfer.Upload.FromStream(
|
| + six.StringIO(upload_contents),
|
| + 'text/plain',
|
| + total_size=len(upload_contents))
|
| + http_request = http_wrapper.Request(
|
| + 'http://www.uploads.com',
|
| + headers={'content-type': 'text/plain'})
|
| + upload.ConfigureRequest(upload_config, http_request, url_builder)
|
| + self.assertEqual(url_builder.query_params['uploadType'], 'media')
|
| + rewritten_upload_contents = http_request.body
|
| + self.assertTrue(rewritten_upload_contents.endswith(upload_contents))
|
|
|