Index: third_party/gsutil/third_party/apitools/apitools/base/py/transfer_test.py |
diff --git a/third_party/gsutil/third_party/apitools/apitools/base/py/transfer_test.py b/third_party/gsutil/third_party/apitools/apitools/base/py/transfer_test.py |
new file mode 100644 |
index 0000000000000000000000000000000000000000..9d58b1ea629b99105cdd4348dc18ce452ea66042 |
--- /dev/null |
+++ b/third_party/gsutil/third_party/apitools/apitools/base/py/transfer_test.py |
@@ -0,0 +1,205 @@ |
+"""Tests for transfer.py.""" |
+import string |
+ |
+import mock |
+import six |
+from six.moves import http_client |
+import unittest2 |
+ |
+from apitools.base.py import base_api |
+from apitools.base.py import http_wrapper |
+from apitools.base.py import transfer |
+ |
+ |
+class TransferTest(unittest2.TestCase): |
+ |
+ def assertRangeAndContentRangeCompatible(self, request, response): |
+ request_prefix = 'bytes=' |
+ self.assertIn('range', request.headers) |
+ self.assertTrue(request.headers['range'].startswith(request_prefix)) |
+ request_range = request.headers['range'][len(request_prefix):] |
+ |
+ response_prefix = 'bytes ' |
+ self.assertIn('content-range', response.info) |
+ response_header = response.info['content-range'] |
+ self.assertTrue(response_header.startswith(response_prefix)) |
+ response_range = ( |
+ response_header[len(response_prefix):].partition('/')[0]) |
+ |
+ msg = ('Request range ({0}) not a prefix of ' |
+ 'response_range ({1})').format( |
+ request_range, response_range) |
+ self.assertTrue(response_range.startswith(request_range), msg=msg) |
+ |
+ def testComputeEndByte(self): |
+ total_size = 100 |
+ chunksize = 10 |
+ download = transfer.Download.FromStream( |
+ six.StringIO(), chunksize=chunksize, total_size=total_size) |
+ self.assertEqual(chunksize - 1, |
+ download._Download__ComputeEndByte(0, end=50)) |
+ |
+ def testComputeEndByteReturnNone(self): |
+ download = transfer.Download.FromStream(six.StringIO()) |
+ self.assertIsNone( |
+ download._Download__ComputeEndByte(0, use_chunks=False)) |
+ |
+ def testComputeEndByteNoChunks(self): |
+ total_size = 100 |
+ download = transfer.Download.FromStream( |
+ six.StringIO(), chunksize=10, total_size=total_size) |
+ for end in (None, 1000): |
+ self.assertEqual( |
+ total_size - 1, |
+ download._Download__ComputeEndByte(0, end=end, |
+ use_chunks=False), |
+ msg='Failed on end={0}'.format(end)) |
+ |
+ def testComputeEndByteNoTotal(self): |
+ download = transfer.Download.FromStream(six.StringIO()) |
+ default_chunksize = download.chunksize |
+ for chunksize in (100, default_chunksize): |
+ download.chunksize = chunksize |
+ for start in (0, 10): |
+ self.assertEqual( |
+ download.chunksize + start - 1, |
+ download._Download__ComputeEndByte(start), |
+ msg='Failed on start={0}, chunksize={1}'.format( |
+ start, chunksize)) |
+ |
+ def testComputeEndByteSmallTotal(self): |
+ total_size = 100 |
+ download = transfer.Download.FromStream(six.StringIO(), |
+ total_size=total_size) |
+ for start in (0, 10): |
+ self.assertEqual(total_size - 1, |
+ download._Download__ComputeEndByte(start), |
+ msg='Failed on start={0}'.format(start)) |
+ |
+ def testNonChunkedDownload(self): |
+ bytes_http = object() |
+ http = object() |
+ download_stream = six.StringIO() |
+ download = transfer.Download.FromStream(download_stream, total_size=52) |
+ download.bytes_http = bytes_http |
+ base_url = 'https://part.one/' |
+ |
+ with mock.patch.object(http_wrapper, 'MakeRequest', |
+ autospec=True) as make_request: |
+ make_request.return_value = http_wrapper.Response( |
+ info={ |
+ 'content-range': 'bytes 0-51/52', |
+ 'status': http_client.OK, |
+ }, |
+ content=string.ascii_lowercase * 2, |
+ request_url=base_url, |
+ ) |
+ request = http_wrapper.Request(url='https://part.one/') |
+ download.InitializeDownload(request, http=http) |
+ self.assertEqual(1, make_request.call_count) |
+ received_request = make_request.call_args[0][1] |
+ self.assertEqual(base_url, received_request.url) |
+ self.assertRangeAndContentRangeCompatible( |
+ received_request, make_request.return_value) |
+ download_stream.seek(0) |
+ self.assertEqual(string.ascii_lowercase * 2, |
+ download_stream.getvalue()) |
+ |
+ def testChunkedDownload(self): |
+ bytes_http = object() |
+ http = object() |
+ download_stream = six.StringIO() |
+ download = transfer.Download.FromStream( |
+ download_stream, chunksize=26, total_size=52) |
+ download.bytes_http = bytes_http |
+ |
+ # Setting autospec on a mock with an iterable side_effect is |
+ # currently broken (http://bugs.python.org/issue17826), so |
+ # instead we write a little function. |
+ def _ReturnBytes(unused_http, http_request, |
+ *unused_args, **unused_kwds): |
+ url = http_request.url |
+ if url == 'https://part.one/': |
+ return http_wrapper.Response( |
+ info={ |
+ 'content-location': 'https://part.two/', |
+ 'content-range': 'bytes 0-25/52', |
+ 'status': http_client.PARTIAL_CONTENT, |
+ }, |
+ content=string.ascii_lowercase, |
+ request_url='https://part.one/', |
+ ) |
+ elif url == 'https://part.two/': |
+ return http_wrapper.Response( |
+ info={ |
+ 'content-range': 'bytes 26-51/52', |
+ 'status': http_client.OK, |
+ }, |
+ content=string.ascii_uppercase, |
+ request_url='https://part.two/', |
+ ) |
+ else: |
+ self.fail('Unknown URL requested: %s' % url) |
+ |
+ with mock.patch.object(http_wrapper, 'MakeRequest', |
+ autospec=True) as make_request: |
+ make_request.side_effect = _ReturnBytes |
+ request = http_wrapper.Request(url='https://part.one/') |
+ download.InitializeDownload(request, http=http) |
+ self.assertEqual(2, make_request.call_count) |
+ for call in make_request.call_args_list: |
+ self.assertRangeAndContentRangeCompatible( |
+ call[0][1], _ReturnBytes(*call[0])) |
+ download_stream.seek(0) |
+ self.assertEqual(string.ascii_lowercase + string.ascii_uppercase, |
+ download_stream.getvalue()) |
+ |
+ def testFromEncoding(self): |
+ # Test a specific corner case in multipart encoding. |
+ |
+ # Python's mime module by default encodes lines that start with |
+ # "From " as ">From ", which we need to make sure we don't run afoul |
+ # of when sending content that isn't intended to be so encoded. This |
+ # test calls out that we get this right. We test for both the |
+ # multipart and non-multipart case. |
+ multipart_body = '{"body_field_one": 7}' |
+ upload_contents = 'line one\nFrom \nline two' |
+ upload_config = base_api.ApiUploadInfo( |
+ accept=['*/*'], |
+ max_size=None, |
+ resumable_multipart=True, |
+ resumable_path=u'/resumable/upload', |
+ simple_multipart=True, |
+ simple_path=u'/upload', |
+ ) |
+ url_builder = base_api._UrlBuilder('http://www.uploads.com') |
+ |
+ # Test multipart: having a body argument in http_request forces |
+ # multipart here. |
+ upload = transfer.Upload.FromStream( |
+ six.StringIO(upload_contents), |
+ 'text/plain', |
+ total_size=len(upload_contents)) |
+ http_request = http_wrapper.Request( |
+ 'http://www.uploads.com', |
+ headers={'content-type': 'text/plain'}, |
+ body=multipart_body) |
+ upload.ConfigureRequest(upload_config, http_request, url_builder) |
+ self.assertEqual(url_builder.query_params['uploadType'], 'multipart') |
+ rewritten_upload_contents = '\n'.join( |
+ http_request.body.split('--')[2].splitlines()[1:]) |
+ self.assertTrue(rewritten_upload_contents.endswith(upload_contents)) |
+ |
+ # Test non-multipart (aka media): no body argument means this is |
+ # sent as media. |
+ upload = transfer.Upload.FromStream( |
+ six.StringIO(upload_contents), |
+ 'text/plain', |
+ total_size=len(upload_contents)) |
+ http_request = http_wrapper.Request( |
+ 'http://www.uploads.com', |
+ headers={'content-type': 'text/plain'}) |
+ upload.ConfigureRequest(upload_config, http_request, url_builder) |
+ self.assertEqual(url_builder.query_params['uploadType'], 'media') |
+ rewritten_upload_contents = http_request.body |
+ self.assertTrue(rewritten_upload_contents.endswith(upload_contents)) |