| Index: tools/telemetry/third_party/gsutilz/third_party/boto/tests/unit/s3/test_uri.py
|
| diff --git a/tools/telemetry/third_party/gsutilz/third_party/boto/tests/unit/s3/test_uri.py b/tools/telemetry/third_party/gsutilz/third_party/boto/tests/unit/s3/test_uri.py
|
| deleted file mode 100755
|
| index 89149923640a899bc55d100c6685b390a87c7991..0000000000000000000000000000000000000000
|
| --- a/tools/telemetry/third_party/gsutilz/third_party/boto/tests/unit/s3/test_uri.py
|
| +++ /dev/null
|
| @@ -1,265 +0,0 @@
|
| -#!/usr/bin/env python
|
| -# Copyright (c) 2013 Google, Inc.
|
| -#
|
| -# Licensed under the Apache License, Version 2.0 (the "License");
|
| -# you may not use this file except in compliance with the License.
|
| -# You may obtain a copy of the License at
|
| -#
|
| -# http://www.apache.org/licenses/LICENSE-2.0
|
| -#
|
| -# Unless required by applicable law or agreed to in writing, software
|
| -# distributed under the License is distributed on an "AS IS" BASIS,
|
| -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
| -# See the License for the specific language governing permissions and
|
| -# limitations under the License.
|
| -
|
| -import boto
|
| -import tempfile
|
| -
|
| -from boto.exception import InvalidUriError
|
| -from boto import storage_uri
|
| -from boto.compat import urllib
|
| -from boto.s3.keyfile import KeyFile
|
| -from tests.integration.s3.mock_storage_service import MockBucket
|
| -from tests.integration.s3.mock_storage_service import MockBucketStorageUri
|
| -from tests.integration.s3.mock_storage_service import MockConnection
|
| -from tests.unit import unittest
|
| -
|
| -"""Unit tests for StorageUri interface."""
|
| -
|
| -class UriTest(unittest.TestCase):
|
| -
|
| - def test_provider_uri(self):
|
| - for prov in ('gs', 's3'):
|
| - uri_str = '%s://' % prov
|
| - uri = boto.storage_uri(uri_str, validate=False,
|
| - suppress_consec_slashes=False)
|
| - self.assertEqual(prov, uri.scheme)
|
| - self.assertEqual(uri_str, uri.uri)
|
| - self.assertFalse(hasattr(uri, 'versionless_uri'))
|
| - self.assertEqual('', uri.bucket_name)
|
| - self.assertEqual('', uri.object_name)
|
| - self.assertEqual(None, uri.version_id)
|
| - self.assertEqual(None, uri.generation)
|
| - self.assertEqual(uri.names_provider(), True)
|
| - self.assertEqual(uri.names_container(), True)
|
| - self.assertEqual(uri.names_bucket(), False)
|
| - self.assertEqual(uri.names_object(), False)
|
| - self.assertEqual(uri.names_directory(), False)
|
| - self.assertEqual(uri.names_file(), False)
|
| - self.assertEqual(uri.is_stream(), False)
|
| - self.assertEqual(uri.is_version_specific, False)
|
| -
|
| - def test_bucket_uri_no_trailing_slash(self):
|
| - for prov in ('gs', 's3'):
|
| - uri_str = '%s://bucket' % prov
|
| - uri = boto.storage_uri(uri_str, validate=False,
|
| - suppress_consec_slashes=False)
|
| - self.assertEqual(prov, uri.scheme)
|
| - self.assertEqual('%s/' % uri_str, uri.uri)
|
| - self.assertFalse(hasattr(uri, 'versionless_uri'))
|
| - self.assertEqual('bucket', uri.bucket_name)
|
| - self.assertEqual('', uri.object_name)
|
| - self.assertEqual(None, uri.version_id)
|
| - self.assertEqual(None, uri.generation)
|
| - self.assertEqual(uri.names_provider(), False)
|
| - self.assertEqual(uri.names_container(), True)
|
| - self.assertEqual(uri.names_bucket(), True)
|
| - self.assertEqual(uri.names_object(), False)
|
| - self.assertEqual(uri.names_directory(), False)
|
| - self.assertEqual(uri.names_file(), False)
|
| - self.assertEqual(uri.is_stream(), False)
|
| - self.assertEqual(uri.is_version_specific, False)
|
| -
|
| - def test_bucket_uri_with_trailing_slash(self):
|
| - for prov in ('gs', 's3'):
|
| - uri_str = '%s://bucket/' % prov
|
| - uri = boto.storage_uri(uri_str, validate=False,
|
| - suppress_consec_slashes=False)
|
| - self.assertEqual(prov, uri.scheme)
|
| - self.assertEqual(uri_str, uri.uri)
|
| - self.assertFalse(hasattr(uri, 'versionless_uri'))
|
| - self.assertEqual('bucket', uri.bucket_name)
|
| - self.assertEqual('', uri.object_name)
|
| - self.assertEqual(None, uri.version_id)
|
| - self.assertEqual(None, uri.generation)
|
| - self.assertEqual(uri.names_provider(), False)
|
| - self.assertEqual(uri.names_container(), True)
|
| - self.assertEqual(uri.names_bucket(), True)
|
| - self.assertEqual(uri.names_object(), False)
|
| - self.assertEqual(uri.names_directory(), False)
|
| - self.assertEqual(uri.names_file(), False)
|
| - self.assertEqual(uri.is_stream(), False)
|
| - self.assertEqual(uri.is_version_specific, False)
|
| -
|
| - def test_non_versioned_object_uri(self):
|
| - for prov in ('gs', 's3'):
|
| - uri_str = '%s://bucket/obj/a/b' % prov
|
| - uri = boto.storage_uri(uri_str, validate=False,
|
| - suppress_consec_slashes=False)
|
| - self.assertEqual(prov, uri.scheme)
|
| - self.assertEqual(uri_str, uri.uri)
|
| - self.assertEqual(uri_str, uri.versionless_uri)
|
| - self.assertEqual('bucket', uri.bucket_name)
|
| - self.assertEqual('obj/a/b', uri.object_name)
|
| - self.assertEqual(None, uri.version_id)
|
| - self.assertEqual(None, uri.generation)
|
| - self.assertEqual(uri.names_provider(), False)
|
| - self.assertEqual(uri.names_container(), False)
|
| - self.assertEqual(uri.names_bucket(), False)
|
| - self.assertEqual(uri.names_object(), True)
|
| - self.assertEqual(uri.names_directory(), False)
|
| - self.assertEqual(uri.names_file(), False)
|
| - self.assertEqual(uri.is_stream(), False)
|
| - self.assertEqual(uri.is_version_specific, False)
|
| -
|
| - def test_versioned_gs_object_uri(self):
|
| - uri_str = 'gs://bucket/obj/a/b#1359908801674000'
|
| - uri = boto.storage_uri(uri_str, validate=False,
|
| - suppress_consec_slashes=False)
|
| - self.assertEqual('gs', uri.scheme)
|
| - self.assertEqual(uri_str, uri.uri)
|
| - self.assertEqual('gs://bucket/obj/a/b', uri.versionless_uri)
|
| - self.assertEqual('bucket', uri.bucket_name)
|
| - self.assertEqual('obj/a/b', uri.object_name)
|
| - self.assertEqual(None, uri.version_id)
|
| - self.assertEqual(1359908801674000, uri.generation)
|
| - self.assertEqual(uri.names_provider(), False)
|
| - self.assertEqual(uri.names_container(), False)
|
| - self.assertEqual(uri.names_bucket(), False)
|
| - self.assertEqual(uri.names_object(), True)
|
| - self.assertEqual(uri.names_directory(), False)
|
| - self.assertEqual(uri.names_file(), False)
|
| - self.assertEqual(uri.is_stream(), False)
|
| - self.assertEqual(uri.is_version_specific, True)
|
| -
|
| - def test_versioned_gs_object_uri_with_legacy_generation_value(self):
|
| - uri_str = 'gs://bucket/obj/a/b#1'
|
| - uri = boto.storage_uri(uri_str, validate=False,
|
| - suppress_consec_slashes=False)
|
| - self.assertEqual('gs', uri.scheme)
|
| - self.assertEqual(uri_str, uri.uri)
|
| - self.assertEqual('gs://bucket/obj/a/b', uri.versionless_uri)
|
| - self.assertEqual('bucket', uri.bucket_name)
|
| - self.assertEqual('obj/a/b', uri.object_name)
|
| - self.assertEqual(None, uri.version_id)
|
| - self.assertEqual(1, uri.generation)
|
| - self.assertEqual(uri.names_provider(), False)
|
| - self.assertEqual(uri.names_container(), False)
|
| - self.assertEqual(uri.names_bucket(), False)
|
| - self.assertEqual(uri.names_object(), True)
|
| - self.assertEqual(uri.names_directory(), False)
|
| - self.assertEqual(uri.names_file(), False)
|
| - self.assertEqual(uri.is_stream(), False)
|
| - self.assertEqual(uri.is_version_specific, True)
|
| -
|
| - def test_roundtrip_versioned_gs_object_uri_parsed(self):
|
| - uri_str = 'gs://bucket/obj#1359908801674000'
|
| - uri = boto.storage_uri(uri_str, validate=False,
|
| - suppress_consec_slashes=False)
|
| - roundtrip_uri = boto.storage_uri(uri.uri, validate=False,
|
| - suppress_consec_slashes=False)
|
| - self.assertEqual(uri.uri, roundtrip_uri.uri)
|
| - self.assertEqual(uri.is_version_specific, True)
|
| -
|
| - def test_versioned_s3_object_uri(self):
|
| - uri_str = 's3://bucket/obj/a/b#eMuM0J15HkJ9QHlktfNP5MfA.oYR2q6S'
|
| - uri = boto.storage_uri(uri_str, validate=False,
|
| - suppress_consec_slashes=False)
|
| - self.assertEqual('s3', uri.scheme)
|
| - self.assertEqual(uri_str, uri.uri)
|
| - self.assertEqual('s3://bucket/obj/a/b', uri.versionless_uri)
|
| - self.assertEqual('bucket', uri.bucket_name)
|
| - self.assertEqual('obj/a/b', uri.object_name)
|
| - self.assertEqual('eMuM0J15HkJ9QHlktfNP5MfA.oYR2q6S', uri.version_id)
|
| - self.assertEqual(None, uri.generation)
|
| - self.assertEqual(uri.names_provider(), False)
|
| - self.assertEqual(uri.names_container(), False)
|
| - self.assertEqual(uri.names_bucket(), False)
|
| - self.assertEqual(uri.names_object(), True)
|
| - self.assertEqual(uri.names_directory(), False)
|
| - self.assertEqual(uri.names_file(), False)
|
| - self.assertEqual(uri.is_stream(), False)
|
| - self.assertEqual(uri.is_version_specific, True)
|
| -
|
| - def test_explicit_file_uri(self):
|
| - tmp_dir = tempfile.tempdir or ''
|
| - uri_str = 'file://%s' % urllib.request.pathname2url(tmp_dir)
|
| - uri = boto.storage_uri(uri_str, validate=False,
|
| - suppress_consec_slashes=False)
|
| - self.assertEqual('file', uri.scheme)
|
| - self.assertEqual(uri_str, uri.uri)
|
| - self.assertFalse(hasattr(uri, 'versionless_uri'))
|
| - self.assertEqual('', uri.bucket_name)
|
| - self.assertEqual(tmp_dir, uri.object_name)
|
| - self.assertFalse(hasattr(uri, 'version_id'))
|
| - self.assertFalse(hasattr(uri, 'generation'))
|
| - self.assertFalse(hasattr(uri, 'is_version_specific'))
|
| - self.assertEqual(uri.names_provider(), False)
|
| - self.assertEqual(uri.names_bucket(), False)
|
| - # Don't check uri.names_container(), uri.names_directory(),
|
| - # uri.names_file(), or uri.names_object(), because for file URIs these
|
| - # functions look at the file system and apparently unit tests run
|
| - # chroot'd.
|
| - self.assertEqual(uri.is_stream(), False)
|
| -
|
| - def test_implicit_file_uri(self):
|
| - tmp_dir = tempfile.tempdir or ''
|
| - uri_str = '%s' % urllib.request.pathname2url(tmp_dir)
|
| - uri = boto.storage_uri(uri_str, validate=False,
|
| - suppress_consec_slashes=False)
|
| - self.assertEqual('file', uri.scheme)
|
| - self.assertEqual('file://%s' % tmp_dir, uri.uri)
|
| - self.assertFalse(hasattr(uri, 'versionless_uri'))
|
| - self.assertEqual('', uri.bucket_name)
|
| - self.assertEqual(tmp_dir, uri.object_name)
|
| - self.assertFalse(hasattr(uri, 'version_id'))
|
| - self.assertFalse(hasattr(uri, 'generation'))
|
| - self.assertFalse(hasattr(uri, 'is_version_specific'))
|
| - self.assertEqual(uri.names_provider(), False)
|
| - self.assertEqual(uri.names_bucket(), False)
|
| - # Don't check uri.names_container(), uri.names_directory(),
|
| - # uri.names_file(), or uri.names_object(), because for file URIs these
|
| - # functions look at the file system and apparently unit tests run
|
| - # chroot'd.
|
| - self.assertEqual(uri.is_stream(), False)
|
| -
|
| - def test_gs_object_uri_contains_sharp_not_matching_version_syntax(self):
|
| - uri_str = 'gs://bucket/obj#13a990880167400'
|
| - uri = boto.storage_uri(uri_str, validate=False,
|
| - suppress_consec_slashes=False)
|
| - self.assertEqual('gs', uri.scheme)
|
| - self.assertEqual(uri_str, uri.uri)
|
| - self.assertEqual('gs://bucket/obj#13a990880167400',
|
| - uri.versionless_uri)
|
| - self.assertEqual('bucket', uri.bucket_name)
|
| - self.assertEqual('obj#13a990880167400', uri.object_name)
|
| - self.assertEqual(None, uri.version_id)
|
| - self.assertEqual(None, uri.generation)
|
| - self.assertEqual(uri.names_provider(), False)
|
| - self.assertEqual(uri.names_container(), False)
|
| - self.assertEqual(uri.names_bucket(), False)
|
| - self.assertEqual(uri.names_object(), True)
|
| - self.assertEqual(uri.names_directory(), False)
|
| - self.assertEqual(uri.names_file(), False)
|
| - self.assertEqual(uri.is_stream(), False)
|
| - self.assertEqual(uri.is_version_specific, False)
|
| -
|
| - def test_file_containing_colon(self):
|
| - uri_str = 'abc:def'
|
| - uri = boto.storage_uri(uri_str, validate=False,
|
| - suppress_consec_slashes=False)
|
| - self.assertEqual('file', uri.scheme)
|
| - self.assertEqual('file://%s' % uri_str, uri.uri)
|
| -
|
| - def test_invalid_scheme(self):
|
| - uri_str = 'mars://bucket/object'
|
| - try:
|
| - boto.storage_uri(uri_str, validate=False,
|
| - suppress_consec_slashes=False)
|
| - except InvalidUriError as e:
|
| - self.assertIn('Unrecognized scheme', e.message)
|
| -
|
| -
|
| -if __name__ == '__main__':
|
| - unittest.main()
|
|
|