Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(626)

Unified Diff: tools/telemetry/third_party/gsutilz/third_party/boto/tests/unit/s3/test_uri.py

Issue 1493973002: Remove telemetry/third_party/gsutilz (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@gsutil_changes
Patch Set: rebase Created 5 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: tools/telemetry/third_party/gsutilz/third_party/boto/tests/unit/s3/test_uri.py
diff --git a/tools/telemetry/third_party/gsutilz/third_party/boto/tests/unit/s3/test_uri.py b/tools/telemetry/third_party/gsutilz/third_party/boto/tests/unit/s3/test_uri.py
deleted file mode 100755
index 89149923640a899bc55d100c6685b390a87c7991..0000000000000000000000000000000000000000
--- a/tools/telemetry/third_party/gsutilz/third_party/boto/tests/unit/s3/test_uri.py
+++ /dev/null
@@ -1,265 +0,0 @@
-#!/usr/bin/env python
-# Copyright (c) 2013 Google, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import boto
-import tempfile
-
-from boto.exception import InvalidUriError
-from boto import storage_uri
-from boto.compat import urllib
-from boto.s3.keyfile import KeyFile
-from tests.integration.s3.mock_storage_service import MockBucket
-from tests.integration.s3.mock_storage_service import MockBucketStorageUri
-from tests.integration.s3.mock_storage_service import MockConnection
-from tests.unit import unittest
-
-"""Unit tests for StorageUri interface."""
-
-class UriTest(unittest.TestCase):
-
- def test_provider_uri(self):
- for prov in ('gs', 's3'):
- uri_str = '%s://' % prov
- uri = boto.storage_uri(uri_str, validate=False,
- suppress_consec_slashes=False)
- self.assertEqual(prov, uri.scheme)
- self.assertEqual(uri_str, uri.uri)
- self.assertFalse(hasattr(uri, 'versionless_uri'))
- self.assertEqual('', uri.bucket_name)
- self.assertEqual('', uri.object_name)
- self.assertEqual(None, uri.version_id)
- self.assertEqual(None, uri.generation)
- self.assertEqual(uri.names_provider(), True)
- self.assertEqual(uri.names_container(), True)
- self.assertEqual(uri.names_bucket(), False)
- self.assertEqual(uri.names_object(), False)
- self.assertEqual(uri.names_directory(), False)
- self.assertEqual(uri.names_file(), False)
- self.assertEqual(uri.is_stream(), False)
- self.assertEqual(uri.is_version_specific, False)
-
- def test_bucket_uri_no_trailing_slash(self):
- for prov in ('gs', 's3'):
- uri_str = '%s://bucket' % prov
- uri = boto.storage_uri(uri_str, validate=False,
- suppress_consec_slashes=False)
- self.assertEqual(prov, uri.scheme)
- self.assertEqual('%s/' % uri_str, uri.uri)
- self.assertFalse(hasattr(uri, 'versionless_uri'))
- self.assertEqual('bucket', uri.bucket_name)
- self.assertEqual('', uri.object_name)
- self.assertEqual(None, uri.version_id)
- self.assertEqual(None, uri.generation)
- self.assertEqual(uri.names_provider(), False)
- self.assertEqual(uri.names_container(), True)
- self.assertEqual(uri.names_bucket(), True)
- self.assertEqual(uri.names_object(), False)
- self.assertEqual(uri.names_directory(), False)
- self.assertEqual(uri.names_file(), False)
- self.assertEqual(uri.is_stream(), False)
- self.assertEqual(uri.is_version_specific, False)
-
- def test_bucket_uri_with_trailing_slash(self):
- for prov in ('gs', 's3'):
- uri_str = '%s://bucket/' % prov
- uri = boto.storage_uri(uri_str, validate=False,
- suppress_consec_slashes=False)
- self.assertEqual(prov, uri.scheme)
- self.assertEqual(uri_str, uri.uri)
- self.assertFalse(hasattr(uri, 'versionless_uri'))
- self.assertEqual('bucket', uri.bucket_name)
- self.assertEqual('', uri.object_name)
- self.assertEqual(None, uri.version_id)
- self.assertEqual(None, uri.generation)
- self.assertEqual(uri.names_provider(), False)
- self.assertEqual(uri.names_container(), True)
- self.assertEqual(uri.names_bucket(), True)
- self.assertEqual(uri.names_object(), False)
- self.assertEqual(uri.names_directory(), False)
- self.assertEqual(uri.names_file(), False)
- self.assertEqual(uri.is_stream(), False)
- self.assertEqual(uri.is_version_specific, False)
-
- def test_non_versioned_object_uri(self):
- for prov in ('gs', 's3'):
- uri_str = '%s://bucket/obj/a/b' % prov
- uri = boto.storage_uri(uri_str, validate=False,
- suppress_consec_slashes=False)
- self.assertEqual(prov, uri.scheme)
- self.assertEqual(uri_str, uri.uri)
- self.assertEqual(uri_str, uri.versionless_uri)
- self.assertEqual('bucket', uri.bucket_name)
- self.assertEqual('obj/a/b', uri.object_name)
- self.assertEqual(None, uri.version_id)
- self.assertEqual(None, uri.generation)
- self.assertEqual(uri.names_provider(), False)
- self.assertEqual(uri.names_container(), False)
- self.assertEqual(uri.names_bucket(), False)
- self.assertEqual(uri.names_object(), True)
- self.assertEqual(uri.names_directory(), False)
- self.assertEqual(uri.names_file(), False)
- self.assertEqual(uri.is_stream(), False)
- self.assertEqual(uri.is_version_specific, False)
-
- def test_versioned_gs_object_uri(self):
- uri_str = 'gs://bucket/obj/a/b#1359908801674000'
- uri = boto.storage_uri(uri_str, validate=False,
- suppress_consec_slashes=False)
- self.assertEqual('gs', uri.scheme)
- self.assertEqual(uri_str, uri.uri)
- self.assertEqual('gs://bucket/obj/a/b', uri.versionless_uri)
- self.assertEqual('bucket', uri.bucket_name)
- self.assertEqual('obj/a/b', uri.object_name)
- self.assertEqual(None, uri.version_id)
- self.assertEqual(1359908801674000, uri.generation)
- self.assertEqual(uri.names_provider(), False)
- self.assertEqual(uri.names_container(), False)
- self.assertEqual(uri.names_bucket(), False)
- self.assertEqual(uri.names_object(), True)
- self.assertEqual(uri.names_directory(), False)
- self.assertEqual(uri.names_file(), False)
- self.assertEqual(uri.is_stream(), False)
- self.assertEqual(uri.is_version_specific, True)
-
- def test_versioned_gs_object_uri_with_legacy_generation_value(self):
- uri_str = 'gs://bucket/obj/a/b#1'
- uri = boto.storage_uri(uri_str, validate=False,
- suppress_consec_slashes=False)
- self.assertEqual('gs', uri.scheme)
- self.assertEqual(uri_str, uri.uri)
- self.assertEqual('gs://bucket/obj/a/b', uri.versionless_uri)
- self.assertEqual('bucket', uri.bucket_name)
- self.assertEqual('obj/a/b', uri.object_name)
- self.assertEqual(None, uri.version_id)
- self.assertEqual(1, uri.generation)
- self.assertEqual(uri.names_provider(), False)
- self.assertEqual(uri.names_container(), False)
- self.assertEqual(uri.names_bucket(), False)
- self.assertEqual(uri.names_object(), True)
- self.assertEqual(uri.names_directory(), False)
- self.assertEqual(uri.names_file(), False)
- self.assertEqual(uri.is_stream(), False)
- self.assertEqual(uri.is_version_specific, True)
-
- def test_roundtrip_versioned_gs_object_uri_parsed(self):
- uri_str = 'gs://bucket/obj#1359908801674000'
- uri = boto.storage_uri(uri_str, validate=False,
- suppress_consec_slashes=False)
- roundtrip_uri = boto.storage_uri(uri.uri, validate=False,
- suppress_consec_slashes=False)
- self.assertEqual(uri.uri, roundtrip_uri.uri)
- self.assertEqual(uri.is_version_specific, True)
-
- def test_versioned_s3_object_uri(self):
- uri_str = 's3://bucket/obj/a/b#eMuM0J15HkJ9QHlktfNP5MfA.oYR2q6S'
- uri = boto.storage_uri(uri_str, validate=False,
- suppress_consec_slashes=False)
- self.assertEqual('s3', uri.scheme)
- self.assertEqual(uri_str, uri.uri)
- self.assertEqual('s3://bucket/obj/a/b', uri.versionless_uri)
- self.assertEqual('bucket', uri.bucket_name)
- self.assertEqual('obj/a/b', uri.object_name)
- self.assertEqual('eMuM0J15HkJ9QHlktfNP5MfA.oYR2q6S', uri.version_id)
- self.assertEqual(None, uri.generation)
- self.assertEqual(uri.names_provider(), False)
- self.assertEqual(uri.names_container(), False)
- self.assertEqual(uri.names_bucket(), False)
- self.assertEqual(uri.names_object(), True)
- self.assertEqual(uri.names_directory(), False)
- self.assertEqual(uri.names_file(), False)
- self.assertEqual(uri.is_stream(), False)
- self.assertEqual(uri.is_version_specific, True)
-
- def test_explicit_file_uri(self):
- tmp_dir = tempfile.tempdir or ''
- uri_str = 'file://%s' % urllib.request.pathname2url(tmp_dir)
- uri = boto.storage_uri(uri_str, validate=False,
- suppress_consec_slashes=False)
- self.assertEqual('file', uri.scheme)
- self.assertEqual(uri_str, uri.uri)
- self.assertFalse(hasattr(uri, 'versionless_uri'))
- self.assertEqual('', uri.bucket_name)
- self.assertEqual(tmp_dir, uri.object_name)
- self.assertFalse(hasattr(uri, 'version_id'))
- self.assertFalse(hasattr(uri, 'generation'))
- self.assertFalse(hasattr(uri, 'is_version_specific'))
- self.assertEqual(uri.names_provider(), False)
- self.assertEqual(uri.names_bucket(), False)
- # Don't check uri.names_container(), uri.names_directory(),
- # uri.names_file(), or uri.names_object(), because for file URIs these
- # functions look at the file system and apparently unit tests run
- # chroot'd.
- self.assertEqual(uri.is_stream(), False)
-
- def test_implicit_file_uri(self):
- tmp_dir = tempfile.tempdir or ''
- uri_str = '%s' % urllib.request.pathname2url(tmp_dir)
- uri = boto.storage_uri(uri_str, validate=False,
- suppress_consec_slashes=False)
- self.assertEqual('file', uri.scheme)
- self.assertEqual('file://%s' % tmp_dir, uri.uri)
- self.assertFalse(hasattr(uri, 'versionless_uri'))
- self.assertEqual('', uri.bucket_name)
- self.assertEqual(tmp_dir, uri.object_name)
- self.assertFalse(hasattr(uri, 'version_id'))
- self.assertFalse(hasattr(uri, 'generation'))
- self.assertFalse(hasattr(uri, 'is_version_specific'))
- self.assertEqual(uri.names_provider(), False)
- self.assertEqual(uri.names_bucket(), False)
- # Don't check uri.names_container(), uri.names_directory(),
- # uri.names_file(), or uri.names_object(), because for file URIs these
- # functions look at the file system and apparently unit tests run
- # chroot'd.
- self.assertEqual(uri.is_stream(), False)
-
- def test_gs_object_uri_contains_sharp_not_matching_version_syntax(self):
- uri_str = 'gs://bucket/obj#13a990880167400'
- uri = boto.storage_uri(uri_str, validate=False,
- suppress_consec_slashes=False)
- self.assertEqual('gs', uri.scheme)
- self.assertEqual(uri_str, uri.uri)
- self.assertEqual('gs://bucket/obj#13a990880167400',
- uri.versionless_uri)
- self.assertEqual('bucket', uri.bucket_name)
- self.assertEqual('obj#13a990880167400', uri.object_name)
- self.assertEqual(None, uri.version_id)
- self.assertEqual(None, uri.generation)
- self.assertEqual(uri.names_provider(), False)
- self.assertEqual(uri.names_container(), False)
- self.assertEqual(uri.names_bucket(), False)
- self.assertEqual(uri.names_object(), True)
- self.assertEqual(uri.names_directory(), False)
- self.assertEqual(uri.names_file(), False)
- self.assertEqual(uri.is_stream(), False)
- self.assertEqual(uri.is_version_specific, False)
-
- def test_file_containing_colon(self):
- uri_str = 'abc:def'
- uri = boto.storage_uri(uri_str, validate=False,
- suppress_consec_slashes=False)
- self.assertEqual('file', uri.scheme)
- self.assertEqual('file://%s' % uri_str, uri.uri)
-
- def test_invalid_scheme(self):
- uri_str = 'mars://bucket/object'
- try:
- boto.storage_uri(uri_str, validate=False,
- suppress_consec_slashes=False)
- except InvalidUriError as e:
- self.assertIn('Unrecognized scheme', e.message)
-
-
-if __name__ == '__main__':
- unittest.main()

Powered by Google App Engine
This is Rietveld 408576698