Chromium Code Reviews

Unified Diff: tools/telemetry/third_party/gsutil/gslib/tests/test_cp.py

Issue 1260493004: Revert "Add gsutil 4.13 to telemetry/third_party" (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 5 years, 5 months ago
Index: tools/telemetry/third_party/gsutil/gslib/tests/test_cp.py
diff --git a/tools/telemetry/third_party/gsutil/gslib/tests/test_cp.py b/tools/telemetry/third_party/gsutil/gslib/tests/test_cp.py
deleted file mode 100644
index 7c44c366a614f71eb3c020660c5bf6876e0764b9..0000000000000000000000000000000000000000
--- a/tools/telemetry/third_party/gsutil/gslib/tests/test_cp.py
+++ /dev/null
@@ -1,2021 +0,0 @@
-# -*- coding: utf-8 -*-
-# Copyright 2013 Google Inc. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-"""Integration tests for cp command."""
-
-from __future__ import absolute_import
-
-import base64
-import binascii
-import datetime
-import httplib
-import logging
-import os
-import pickle
-import pkgutil
-import random
-import re
-import string
-import sys
-
-from apitools.base.py import exceptions as apitools_exceptions
-import boto
-from boto import storage_uri
-from boto.exception import ResumableTransferDisposition
-from boto.exception import ResumableUploadException
-from boto.exception import StorageResponseError
-from boto.storage_uri import BucketStorageUri
-
-from gslib.cloud_api import ResumableDownloadException
-from gslib.cloud_api import ResumableUploadException
-from gslib.cloud_api import ResumableUploadStartOverException
-from gslib.copy_helper import GetTrackerFilePath
-from gslib.copy_helper import TrackerFileType
-from gslib.cs_api_map import ApiSelector
-from gslib.gcs_json_api import GcsJsonApi
-from gslib.hashing_helper import CalculateMd5FromContents
-from gslib.storage_url import StorageUrlFromString
-import gslib.tests.testcase as testcase
-from gslib.tests.testcase.base import NotParallelizable
-from gslib.tests.testcase.integration_testcase import SkipForS3
-from gslib.tests.util import GenerationFromURI as urigen
-from gslib.tests.util import HAS_S3_CREDS
-from gslib.tests.util import ObjectToURI as suri
-from gslib.tests.util import PerformsFileToObjectUpload
-from gslib.tests.util import SetBotoConfigForTest
-from gslib.tests.util import unittest
-from gslib.third_party.storage_apitools import storage_v1_messages as apitools_messages
-from gslib.tracker_file import DeleteTrackerFile
-from gslib.tracker_file import GetRewriteTrackerFilePath
-from gslib.util import EIGHT_MIB
-from gslib.util import IS_WINDOWS
-from gslib.util import MakeHumanReadable
-from gslib.util import ONE_KIB
-from gslib.util import ONE_MIB
-from gslib.util import Retry
-from gslib.util import START_CALLBACK_PER_BYTES
-from gslib.util import UTF8
-
-
-# Custom test callbacks must be pickleable, and therefore at global scope.
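-# Each handler instance is pickled to a temp file and passed to gsutil via the
-# --testcallbackfile flag in the tests below.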
-class _HaltingCopyCallbackHandler(object):
- """Test callback handler for intentionally stopping a resumable transfer."""
-
- def __init__(self, is_upload, halt_at_byte):
- self._is_upload = is_upload
- self._halt_at_byte = halt_at_byte
-
- # pylint: disable=invalid-name
- def call(self, total_bytes_transferred, total_size):
- """Forcibly exits if the transfer has passed the halting point."""
- if total_bytes_transferred >= self._halt_at_byte:
- sys.stderr.write(
- 'Halting transfer after byte %s. %s/%s transferred.\r\n' % (
- self._halt_at_byte, MakeHumanReadable(total_bytes_transferred),
- MakeHumanReadable(total_size)))
- if self._is_upload:
- raise ResumableUploadException('Artifically halting upload.')
- else:
- raise ResumableDownloadException('Artifically halting download.')
-
-
-class _JSONForceHTTPErrorCopyCallbackHandler(object):
- """Test callback handler that raises an arbitrary HTTP error exception."""
-
- def __init__(self, startover_at_byte, http_error_num):
- self._startover_at_byte = startover_at_byte
- self._http_error_num = http_error_num
- self.started_over_once = False
-
- # pylint: disable=invalid-name
- def call(self, total_bytes_transferred, total_size):
- """Forcibly exits if the transfer has passed the halting point."""
- if (total_bytes_transferred >= self._startover_at_byte
- and not self.started_over_once):
- sys.stderr.write(
- 'Forcing HTTP error %s after byte %s. '
- '%s/%s transferred.\r\n' % (
- self._http_error_num,
- self._startover_at_byte,
- MakeHumanReadable(total_bytes_transferred),
- MakeHumanReadable(total_size)))
- self.started_over_once = True
- raise apitools_exceptions.HttpError(
- {'status': self._http_error_num}, None, None)
-
-
-class _XMLResumableUploadStartOverCopyCallbackHandler(object):
- """Test callback handler that raises start-over exception during upload."""
-
- def __init__(self, startover_at_byte):
- self._startover_at_byte = startover_at_byte
- self.started_over_once = False
-
- # pylint: disable=invalid-name
- def call(self, total_bytes_transferred, total_size):
- """Forcibly exits if the transfer has passed the halting point."""
- if (total_bytes_transferred >= self._startover_at_byte
- and not self.started_over_once):
- sys.stderr.write(
- 'Forcing ResumableUpload start over error after byte %s. '
- '%s/%s transferred.\r\n' % (
- self._startover_at_byte,
- MakeHumanReadable(total_bytes_transferred),
- MakeHumanReadable(total_size)))
- self.started_over_once = True
- raise boto.exception.ResumableUploadException(
- 'Forcing upload start over',
- ResumableTransferDisposition.START_OVER)
-
-
-class _DeleteBucketThenStartOverCopyCallbackHandler(object):
- """Test callback handler that deletes bucket then raises start-over."""
-
- def __init__(self, startover_at_byte, bucket_uri):
- self._startover_at_byte = startover_at_byte
- self._bucket_uri = bucket_uri
- self.started_over_once = False
-
- # pylint: disable=invalid-name
- def call(self, total_bytes_transferred, total_size):
- """Forcibly exits if the transfer has passed the halting point."""
- if (total_bytes_transferred >= self._startover_at_byte
- and not self.started_over_once):
- sys.stderr.write('Deleting bucket (%s)' %(self._bucket_uri.bucket_name))
-
- @Retry(StorageResponseError, tries=5, timeout_secs=1)
- def DeleteBucket():
- bucket_list = list(self._bucket_uri.list_bucket(all_versions=True))
- for k in bucket_list:
- self._bucket_uri.get_bucket().delete_key(k.name,
- version_id=k.version_id)
- self._bucket_uri.delete_bucket()
-
- DeleteBucket()
- sys.stderr.write(
- 'Forcing ResumableUpload start over error after byte %s. '
- '%s/%s transferred.\r\n' % (
- self._startover_at_byte,
- MakeHumanReadable(total_bytes_transferred),
- MakeHumanReadable(total_size)))
- self.started_over_once = True
- raise ResumableUploadStartOverException(
- 'Artificially forcing start-over')
-
-
-class _RewriteHaltException(Exception):
- pass
-
-
-class _HaltingRewriteCallbackHandler(object):
- """Test callback handler for intentionally stopping a rewrite operation."""
-
- def __init__(self, halt_at_byte):
- self._halt_at_byte = halt_at_byte
-
- # pylint: disable=invalid-name
- def call(self, total_bytes_rewritten, unused_total_size):
- """Forcibly exits if the operation has passed the halting point."""
- if total_bytes_rewritten >= self._halt_at_byte:
- raise _RewriteHaltException('Artificially halting rewrite')
-
-
-class _EnsureRewriteResumeCallbackHandler(object):
- """Test callback handler for ensuring a rewrite operation resumed."""
-
- def __init__(self, required_byte):
- self._required_byte = required_byte
-
- # pylint: disable=invalid-name
- def call(self, total_bytes_rewritten, unused_total_size):
- """Forcibly exits if the operation has passed the halting point."""
- if total_bytes_rewritten <= self._required_byte:
- raise _RewriteHaltException(
- 'Rewrite did not resume; %s bytes written, but %s bytes should '
- 'have already been written.' % (total_bytes_rewritten,
- self._required_byte))
-
-
-class _ResumableUploadRetryHandler(object):
- """Test callback handler for causing retries during a resumable transfer."""
-
- def __init__(self, retry_at_byte, exception_to_raise, exc_args,
- num_retries=1):
- self._retry_at_byte = retry_at_byte
- self._exception_to_raise = exception_to_raise
- self._exception_args = exc_args
- self._num_retries = num_retries
-
- self._retries_made = 0
-
- # pylint: disable=invalid-name
- def call(self, total_bytes_transferred, unused_total_size):
- """Cause a single retry at the retry point."""
- if (total_bytes_transferred >= self._retry_at_byte
- and self._retries_made < self._num_retries):
- self._retries_made += 1
- raise self._exception_to_raise(*self._exception_args)
-
-
-class TestCp(testcase.GsUtilIntegrationTestCase):
- """Integration tests for cp command."""
-
- # For tests that artificially halt, we need to ensure at least one callback
- # occurs.
- halt_size = START_CALLBACK_PER_BYTES * 2
-
- def _get_test_file(self, name):
- contents = pkgutil.get_data('gslib', 'tests/test_data/%s' % name)
- return self.CreateTempFile(file_name=name, contents=contents)
-
- @PerformsFileToObjectUpload
- def test_noclobber(self):
- key_uri = self.CreateObject(contents='foo')
- fpath = self.CreateTempFile(contents='bar')
- stderr = self.RunGsUtil(['cp', '-n', fpath, suri(key_uri)],
- return_stderr=True)
- self.assertIn('Skipping existing item: %s' % suri(key_uri), stderr)
- self.assertEqual(key_uri.get_contents_as_string(), 'foo')
- stderr = self.RunGsUtil(['cp', '-n', suri(key_uri), fpath],
- return_stderr=True)
- with open(fpath, 'r') as f:
- self.assertIn('Skipping existing item: %s' % suri(f), stderr)
- self.assertEqual(f.read(), 'bar')
-
- def test_dest_bucket_not_exist(self):
- fpath = self.CreateTempFile(contents='foo')
- invalid_bucket_uri = (
- '%s://%s' % (self.default_provider, self.nonexistent_bucket_name))
- stderr = self.RunGsUtil(['cp', fpath, invalid_bucket_uri],
- expected_status=1, return_stderr=True)
- self.assertIn('does not exist.', stderr)
-
- def test_copy_in_cloud_noclobber(self):
- bucket1_uri = self.CreateBucket()
- bucket2_uri = self.CreateBucket()
- key_uri = self.CreateObject(bucket_uri=bucket1_uri, contents='foo')
- stderr = self.RunGsUtil(['cp', suri(key_uri), suri(bucket2_uri)],
- return_stderr=True)
- # Rewrite API may output an additional 'Copying' progress notification.
- self.assertGreaterEqual(stderr.count('Copying'), 1)
- self.assertLessEqual(stderr.count('Copying'), 2)
- stderr = self.RunGsUtil(['cp', '-n', suri(key_uri), suri(bucket2_uri)],
- return_stderr=True)
- self.assertIn('Skipping existing item: %s' %
- suri(bucket2_uri, key_uri.object_name), stderr)
-
- @PerformsFileToObjectUpload
- def test_streaming(self):
- bucket_uri = self.CreateBucket()
- stderr = self.RunGsUtil(['cp', '-', '%s' % suri(bucket_uri, 'foo')],
- stdin='bar', return_stderr=True)
- self.assertIn('Copying from <STDIN>', stderr)
- key_uri = bucket_uri.clone_replace_name('foo')
- self.assertEqual(key_uri.get_contents_as_string(), 'bar')
-
- def test_streaming_multiple_arguments(self):
- bucket_uri = self.CreateBucket()
- stderr = self.RunGsUtil(['cp', '-', '-', suri(bucket_uri)],
- stdin='bar', return_stderr=True, expected_status=1)
- self.assertIn('Multiple URL strings are not supported with streaming',
- stderr)
-
- # TODO: Implement a way to test both with and without using magic file.
-
- @PerformsFileToObjectUpload
- def test_detect_content_type(self):
- """Tests local detection of content type."""
- bucket_uri = self.CreateBucket()
- dsturi = suri(bucket_uri, 'foo')
-
- self.RunGsUtil(['cp', self._get_test_file('test.mp3'), dsturi])
-
- # Use @Retry as hedge against bucket listing eventual consistency.
- @Retry(AssertionError, tries=3, timeout_secs=1)
- def _Check1():
- stdout = self.RunGsUtil(['ls', '-L', dsturi], return_stdout=True)
- if IS_WINDOWS:
- self.assertTrue(
- re.search(r'Content-Type:\s+audio/x-mpg', stdout) or
- re.search(r'Content-Type:\s+audio/mpeg', stdout))
- else:
- self.assertRegexpMatches(stdout, r'Content-Type:\s+audio/mpeg')
- _Check1()
-
- self.RunGsUtil(['cp', self._get_test_file('test.gif'), dsturi])
-
- # Use @Retry as hedge against bucket listing eventual consistency.
- @Retry(AssertionError, tries=3, timeout_secs=1)
- def _Check2():
- stdout = self.RunGsUtil(['ls', '-L', dsturi], return_stdout=True)
- self.assertRegexpMatches(stdout, r'Content-Type:\s+image/gif')
- _Check2()
-
- def test_content_type_override_default(self):
- """Tests overriding content type with the default value."""
- bucket_uri = self.CreateBucket()
- dsturi = suri(bucket_uri, 'foo')
-
- self.RunGsUtil(['-h', 'Content-Type:', 'cp',
- self._get_test_file('test.mp3'), dsturi])
-
- # Use @Retry as hedge against bucket listing eventual consistency.
- @Retry(AssertionError, tries=3, timeout_secs=1)
- def _Check1():
- stdout = self.RunGsUtil(['ls', '-L', dsturi], return_stdout=True)
- self.assertRegexpMatches(stdout,
- r'Content-Type:\s+application/octet-stream')
- _Check1()
-
- self.RunGsUtil(['-h', 'Content-Type:', 'cp',
- self._get_test_file('test.gif'), dsturi])
-
- # Use @Retry as hedge against bucket listing eventual consistency.
- @Retry(AssertionError, tries=3, timeout_secs=1)
- def _Check2():
- stdout = self.RunGsUtil(['ls', '-L', dsturi], return_stdout=True)
- self.assertRegexpMatches(stdout,
- r'Content-Type:\s+application/octet-stream')
- _Check2()
-
- def test_content_type_override(self):
- """Tests overriding content type with a value."""
- bucket_uri = self.CreateBucket()
- dsturi = suri(bucket_uri, 'foo')
-
- self.RunGsUtil(['-h', 'Content-Type:text/plain', 'cp',
- self._get_test_file('test.mp3'), dsturi])
-
- # Use @Retry as hedge against bucket listing eventual consistency.
- @Retry(AssertionError, tries=3, timeout_secs=1)
- def _Check1():
- stdout = self.RunGsUtil(['ls', '-L', dsturi], return_stdout=True)
- self.assertRegexpMatches(stdout, r'Content-Type:\s+text/plain')
- _Check1()
-
- self.RunGsUtil(['-h', 'Content-Type:text/plain', 'cp',
- self._get_test_file('test.gif'), dsturi])
-
- # Use @Retry as hedge against bucket listing eventual consistency.
- @Retry(AssertionError, tries=3, timeout_secs=1)
- def _Check2():
- stdout = self.RunGsUtil(['ls', '-L', dsturi], return_stdout=True)
- self.assertRegexpMatches(stdout, r'Content-Type:\s+text/plain')
- _Check2()
-
- @unittest.skipIf(IS_WINDOWS, 'magicfile is not available on Windows.')
- @PerformsFileToObjectUpload
- def test_magicfile_override(self):
- """Tests content type override with magicfile value."""
- bucket_uri = self.CreateBucket()
- dsturi = suri(bucket_uri, 'foo')
- fpath = self.CreateTempFile(contents='foo/bar\n')
- self.RunGsUtil(['cp', fpath, dsturi])
-
- # Use @Retry as hedge against bucket listing eventual consistency.
- @Retry(AssertionError, tries=3, timeout_secs=1)
- def _Check1():
- stdout = self.RunGsUtil(['ls', '-L', dsturi], return_stdout=True)
- use_magicfile = boto.config.getbool('GSUtil', 'use_magicfile', False)
- content_type = ('text/plain' if use_magicfile
- else 'application/octet-stream')
- self.assertRegexpMatches(stdout, r'Content-Type:\s+%s' % content_type)
- _Check1()
-
- @PerformsFileToObjectUpload
- def test_content_type_mismatches(self):
- """Tests overriding content type when it does not match the file type."""
- bucket_uri = self.CreateBucket()
- dsturi = suri(bucket_uri, 'foo')
- fpath = self.CreateTempFile(contents='foo/bar\n')
-
- self.RunGsUtil(['-h', 'Content-Type:image/gif', 'cp',
- self._get_test_file('test.mp3'), dsturi])
-
- # Use @Retry as hedge against bucket listing eventual consistency.
- @Retry(AssertionError, tries=3, timeout_secs=1)
- def _Check1():
- stdout = self.RunGsUtil(['ls', '-L', dsturi], return_stdout=True)
- self.assertRegexpMatches(stdout, r'Content-Type:\s+image/gif')
- _Check1()
-
- self.RunGsUtil(['-h', 'Content-Type:image/gif', 'cp',
- self._get_test_file('test.gif'), dsturi])
-
- # Use @Retry as hedge against bucket listing eventual consistency.
- @Retry(AssertionError, tries=3, timeout_secs=1)
- def _Check2():
- stdout = self.RunGsUtil(['ls', '-L', dsturi], return_stdout=True)
- self.assertRegexpMatches(stdout, r'Content-Type:\s+image/gif')
- _Check2()
-
- self.RunGsUtil(['-h', 'Content-Type:image/gif', 'cp', fpath, dsturi])
-
- # Use @Retry as hedge against bucket listing eventual consistency.
- @Retry(AssertionError, tries=3, timeout_secs=1)
- def _Check3():
- stdout = self.RunGsUtil(['ls', '-L', dsturi], return_stdout=True)
- self.assertRegexpMatches(stdout, r'Content-Type:\s+image/gif')
- _Check3()
-
- @PerformsFileToObjectUpload
- def test_content_type_header_case_insensitive(self):
- """Tests that content type header is treated with case insensitivity."""
- bucket_uri = self.CreateBucket()
- dsturi = suri(bucket_uri, 'foo')
- fpath = self._get_test_file('test.gif')
-
- self.RunGsUtil(['-h', 'content-Type:text/plain', 'cp',
- fpath, dsturi])
-
- # Use @Retry as hedge against bucket listing eventual consistency.
- @Retry(AssertionError, tries=3, timeout_secs=1)
- def _Check1():
- stdout = self.RunGsUtil(['ls', '-L', dsturi], return_stdout=True)
- self.assertRegexpMatches(stdout, r'Content-Type:\s+text/plain')
- self.assertNotRegexpMatches(stdout, r'image/gif')
- _Check1()
-
- self.RunGsUtil(['-h', 'CONTENT-TYPE:image/gif',
- '-h', 'content-type:image/gif',
- 'cp', fpath, dsturi])
-
- # Use @Retry as hedge against bucket listing eventual consistency.
- @Retry(AssertionError, tries=3, timeout_secs=1)
- def _Check2():
- stdout = self.RunGsUtil(['ls', '-L', dsturi], return_stdout=True)
- self.assertRegexpMatches(stdout, r'Content-Type:\s+image/gif')
- self.assertNotRegexpMatches(stdout, r'image/gif,\s*image/gif')
- _Check2()
-
- @PerformsFileToObjectUpload
- def test_other_headers(self):
- """Tests that non-content-type headers are applied successfully on copy."""
- bucket_uri = self.CreateBucket()
- dst_uri = suri(bucket_uri, 'foo')
- fpath = self._get_test_file('test.gif')
-
- self.RunGsUtil(['-h', 'Cache-Control:public,max-age=12',
- '-h', 'x-%s-meta-1:abcd' % self.provider_custom_meta, 'cp',
- fpath, dst_uri])
-
- stdout = self.RunGsUtil(['ls', '-L', dst_uri], return_stdout=True)
- self.assertRegexpMatches(stdout, r'Cache-Control\s*:\s*public,max-age=12')
- self.assertRegexpMatches(stdout, r'Metadata:\s*1:\s*abcd')
-
- dst_uri2 = suri(bucket_uri, 'bar')
- self.RunGsUtil(['cp', dst_uri, dst_uri2])
- # Ensure metadata was preserved across copy.
- stdout = self.RunGsUtil(['ls', '-L', dst_uri2], return_stdout=True)
- self.assertRegexpMatches(stdout, r'Cache-Control\s*:\s*public,max-age=12')
- self.assertRegexpMatches(stdout, r'Metadata:\s*1:\s*abcd')
-
- @PerformsFileToObjectUpload
- def test_versioning(self):
- """Tests copy with versioning."""
- bucket_uri = self.CreateVersionedBucket()
- k1_uri = self.CreateObject(bucket_uri=bucket_uri, contents='data2')
- k2_uri = self.CreateObject(bucket_uri=bucket_uri, contents='data1')
- g1 = urigen(k2_uri)
- self.RunGsUtil(['cp', suri(k1_uri), suri(k2_uri)])
- k2_uri = bucket_uri.clone_replace_name(k2_uri.object_name)
- k2_uri = bucket_uri.clone_replace_key(k2_uri.get_key())
- g2 = urigen(k2_uri)
- k2_uri.set_contents_from_string('data3')
- g3 = urigen(k2_uri)
-
- fpath = self.CreateTempFile()
- # Check to make sure current version is data3.
- self.RunGsUtil(['cp', k2_uri.versionless_uri, fpath])
- with open(fpath, 'r') as f:
- self.assertEqual(f.read(), 'data3')
-
- # Check contents of all three versions
- self.RunGsUtil(['cp', '%s#%s' % (k2_uri.versionless_uri, g1), fpath])
- with open(fpath, 'r') as f:
- self.assertEqual(f.read(), 'data1')
- self.RunGsUtil(['cp', '%s#%s' % (k2_uri.versionless_uri, g2), fpath])
- with open(fpath, 'r') as f:
- self.assertEqual(f.read(), 'data2')
- self.RunGsUtil(['cp', '%s#%s' % (k2_uri.versionless_uri, g3), fpath])
- with open(fpath, 'r') as f:
- self.assertEqual(f.read(), 'data3')
-
- # Copy first version to current and verify.
- self.RunGsUtil(['cp', '%s#%s' % (k2_uri.versionless_uri, g1),
- k2_uri.versionless_uri])
- self.RunGsUtil(['cp', k2_uri.versionless_uri, fpath])
- with open(fpath, 'r') as f:
- self.assertEqual(f.read(), 'data1')
-
- # Attempt to specify a version-specific URI for destination.
- stderr = self.RunGsUtil(['cp', fpath, k2_uri.uri], return_stderr=True,
- expected_status=1)
- self.assertIn('cannot be the destination for gsutil cp', stderr)
-
- @SkipForS3('S3 lists versioned objects in reverse timestamp order.')
- def test_recursive_copying_versioned_bucket(self):
- """Tests that cp -R with versioned buckets copies all versions in order."""
- bucket1_uri = self.CreateVersionedBucket()
- bucket2_uri = self.CreateVersionedBucket()
-
- # Write two versions of an object to bucket1.
- self.CreateObject(bucket_uri=bucket1_uri, object_name='k', contents='data0')
- self.CreateObject(bucket_uri=bucket1_uri, object_name='k',
- contents='longer_data1')
-
- self.AssertNObjectsInBucket(bucket1_uri, 2, versioned=True)
- self.AssertNObjectsInBucket(bucket2_uri, 0, versioned=True)
-
- # Recursively copy to second versioned bucket.
- self.RunGsUtil(['cp', '-R', suri(bucket1_uri, '*'), suri(bucket2_uri)])
-
- # Use @Retry as hedge against bucket listing eventual consistency.
- @Retry(AssertionError, tries=3, timeout_secs=1)
- def _Check2():
- """Validates the results of the cp -R."""
- listing1 = self.RunGsUtil(['ls', '-la', suri(bucket1_uri)],
- return_stdout=True).split('\n')
- listing2 = self.RunGsUtil(['ls', '-la', suri(bucket2_uri)],
- return_stdout=True).split('\n')
- # 2 lines of listing output, 1 summary line, 1 empty line from \n split.
- self.assertEquals(len(listing1), 4)
- self.assertEquals(len(listing2), 4)
-
- # First object in each bucket should match in size and version-less name.
- size1, _, uri_str1, _ = listing1[0].split()
- self.assertEquals(size1, str(len('data0')))
- self.assertEquals(storage_uri(uri_str1).object_name, 'k')
- size2, _, uri_str2, _ = listing2[0].split()
- self.assertEquals(size2, str(len('data0')))
- self.assertEquals(storage_uri(uri_str2).object_name, 'k')
-
- # Similarly for second object in each bucket.
- size1, _, uri_str1, _ = listing1[1].split()
- self.assertEquals(size1, str(len('longer_data1')))
- self.assertEquals(storage_uri(uri_str1).object_name, 'k')
- size2, _, uri_str2, _ = listing2[1].split()
- self.assertEquals(size2, str(len('longer_data1')))
- self.assertEquals(storage_uri(uri_str2).object_name, 'k')
- _Check2()
-
- @PerformsFileToObjectUpload
- @SkipForS3('Preconditions not supported for S3.')
- def test_cp_generation_zero_match(self):
- """Tests that cp handles an object-not-exists precondition header."""
- bucket_uri = self.CreateBucket()
- fpath1 = self.CreateTempFile(contents='data1')
- # Match 0 means only write the object if it doesn't already exist.
- gen_match_header = 'x-goog-if-generation-match:0'
-
- # First copy should succeed.
- # TODO: This can fail (rarely) if the server returns a 5xx but actually
- # commits the bytes. If we add restarts on small uploads, handle this
- # case.
- self.RunGsUtil(['-h', gen_match_header, 'cp', fpath1, suri(bucket_uri)])
-
- # Second copy should fail with a precondition error.
- stderr = self.RunGsUtil(['-h', gen_match_header, 'cp', fpath1,
- suri(bucket_uri)],
- return_stderr=True, expected_status=1)
- self.assertIn('PreconditionException', stderr)
-
- @PerformsFileToObjectUpload
- @SkipForS3('Preconditions not supported for S3.')
- def test_cp_v_generation_match(self):
- """Tests that cp -v option handles the if-generation-match header."""
- bucket_uri = self.CreateVersionedBucket()
- k1_uri = self.CreateObject(bucket_uri=bucket_uri, contents='data1')
- g1 = k1_uri.generation
-
- tmpdir = self.CreateTempDir()
- fpath1 = self.CreateTempFile(tmpdir=tmpdir, contents='data2')
-
- gen_match_header = 'x-goog-if-generation-match:%s' % g1
- # First copy should succeed.
- self.RunGsUtil(['-h', gen_match_header, 'cp', fpath1, suri(k1_uri)])
-
- # Second copy should fail the precondition.
- stderr = self.RunGsUtil(['-h', gen_match_header, 'cp', fpath1,
- suri(k1_uri)],
- return_stderr=True, expected_status=1)
-
- self.assertIn('PreconditionException', stderr)
-
- # Specifying a generation with -n should fail before the request hits the
- # server.
- stderr = self.RunGsUtil(['-h', gen_match_header, 'cp', '-n', fpath1,
- suri(k1_uri)],
- return_stderr=True, expected_status=1)
-
- self.assertIn('ArgumentException', stderr)
- self.assertIn('Specifying x-goog-if-generation-match is not supported '
- 'with cp -n', stderr)
-
- @PerformsFileToObjectUpload
- def test_cp_nv(self):
- """Tests that cp -nv works when skipping existing file."""
- bucket_uri = self.CreateVersionedBucket()
- k1_uri = self.CreateObject(bucket_uri=bucket_uri, contents='data1')
-
- tmpdir = self.CreateTempDir()
- fpath1 = self.CreateTempFile(tmpdir=tmpdir, contents='data2')
-
- # First copy should succeed.
- self.RunGsUtil(['cp', '-nv', fpath1, suri(k1_uri)])
-
- # Second copy should skip copying.
- stderr = self.RunGsUtil(['cp', '-nv', fpath1, suri(k1_uri)],
- return_stderr=True)
- self.assertIn('Skipping existing item:', stderr)
-
- @PerformsFileToObjectUpload
- @SkipForS3('S3 lists versioned objects in reverse timestamp order.')
- def test_cp_v_option(self):
- """"Tests that cp -v returns the created object's version-specific URI."""
- bucket_uri = self.CreateVersionedBucket()
- k1_uri = self.CreateObject(bucket_uri=bucket_uri, contents='data1')
- k2_uri = self.CreateObject(bucket_uri=bucket_uri, contents='data2')
-
- # Case 1: Upload file to object using one-shot PUT.
- tmpdir = self.CreateTempDir()
- fpath1 = self.CreateTempFile(tmpdir=tmpdir, contents='data1')
- self._run_cp_minus_v_test('-v', fpath1, k2_uri.uri)
-
- # Case 2: Upload file to object using resumable upload.
- size_threshold = ONE_KIB
- boto_config_for_test = ('GSUtil', 'resumable_threshold',
- str(size_threshold))
- with SetBotoConfigForTest([boto_config_for_test]):
- file_as_string = os.urandom(size_threshold)
- tmpdir = self.CreateTempDir()
- fpath1 = self.CreateTempFile(tmpdir=tmpdir, contents=file_as_string)
- self._run_cp_minus_v_test('-v', fpath1, k2_uri.uri)
-
- # Case 3: Upload stream to object.
- self._run_cp_minus_v_test('-v', '-', k2_uri.uri)
-
- # Case 4: Download object to file. For this case we just expect output of
- # gsutil cp -v to be the URI of the file.
- tmpdir = self.CreateTempDir()
- fpath1 = self.CreateTempFile(tmpdir=tmpdir)
- dst_uri = storage_uri(fpath1)
- stderr = self.RunGsUtil(['cp', '-v', suri(k1_uri), suri(dst_uri)],
- return_stderr=True)
- self.assertIn('Created: %s' % dst_uri.uri, stderr.split('\n')[-2])
-
- # Case 5: Daisy-chain from object to object.
- self._run_cp_minus_v_test('-Dv', k1_uri.uri, k2_uri.uri)
-
- # Case 6: Copy object to object in-the-cloud.
- self._run_cp_minus_v_test('-v', k1_uri.uri, k2_uri.uri)
-
- def _run_cp_minus_v_test(self, opt, src_str, dst_str):
- """Runs cp -v with the options and validates the results."""
- stderr = self.RunGsUtil(['cp', opt, src_str, dst_str], return_stderr=True)
- match = re.search(r'Created: (.*)\n', stderr)
- self.assertIsNotNone(match)
- created_uri = match.group(1)
-
- # Use @Retry as hedge against bucket listing eventual consistency.
- @Retry(AssertionError, tries=3, timeout_secs=1)
- def _Check1():
- stdout = self.RunGsUtil(['ls', '-a', dst_str], return_stdout=True)
- lines = stdout.split('\n')
- # Final (most recent) object should match the "Created:" URI. This is
- # in second-to-last line (last line is '\n').
- self.assertGreater(len(lines), 2)
- self.assertEqual(created_uri, lines[-2])
- _Check1()
-
- @PerformsFileToObjectUpload
- def test_stdin_args(self):
- """Tests cp with the -I option."""
- tmpdir = self.CreateTempDir()
- fpath1 = self.CreateTempFile(tmpdir=tmpdir, contents='data1')
- fpath2 = self.CreateTempFile(tmpdir=tmpdir, contents='data2')
- bucket_uri = self.CreateBucket()
- self.RunGsUtil(['cp', '-I', suri(bucket_uri)],
- stdin='\n'.join((fpath1, fpath2)))
-
- # Use @Retry as hedge against bucket listing eventual consistency.
- @Retry(AssertionError, tries=3, timeout_secs=1)
- def _Check1():
- stdout = self.RunGsUtil(['ls', suri(bucket_uri)], return_stdout=True)
- self.assertIn(os.path.basename(fpath1), stdout)
- self.assertIn(os.path.basename(fpath2), stdout)
- self.assertNumLines(stdout, 2)
- _Check1()
-
- def test_cross_storage_class_cloud_cp(self):
- bucket1_uri = self.CreateBucket(storage_class='STANDARD')
- bucket2_uri = self.CreateBucket(
- storage_class='DURABLE_REDUCED_AVAILABILITY')
- key_uri = self.CreateObject(bucket_uri=bucket1_uri, contents='foo')
- # Server now allows copy-in-the-cloud across storage classes.
- self.RunGsUtil(['cp', suri(key_uri), suri(bucket2_uri)])
-
- @unittest.skipUnless(HAS_S3_CREDS, 'Test requires both S3 and GS credentials')
- def test_cross_provider_cp(self):
- s3_bucket = self.CreateBucket(provider='s3')
- gs_bucket = self.CreateBucket(provider='gs')
- s3_key = self.CreateObject(bucket_uri=s3_bucket, contents='foo')
- gs_key = self.CreateObject(bucket_uri=gs_bucket, contents='bar')
- self.RunGsUtil(['cp', suri(s3_key), suri(gs_bucket)])
- self.RunGsUtil(['cp', suri(gs_key), suri(s3_bucket)])
-
- @unittest.skipUnless(HAS_S3_CREDS, 'Test requires both S3 and GS credentials')
- @unittest.skip('This test performs a large copy but remains here for '
- 'debugging purposes.')
- def test_cross_provider_large_cp(self):
- s3_bucket = self.CreateBucket(provider='s3')
- gs_bucket = self.CreateBucket(provider='gs')
- s3_key = self.CreateObject(bucket_uri=s3_bucket, contents='f'*1024*1024)
- gs_key = self.CreateObject(bucket_uri=gs_bucket, contents='b'*1024*1024)
- self.RunGsUtil(['cp', suri(s3_key), suri(gs_bucket)])
- self.RunGsUtil(['cp', suri(gs_key), suri(s3_bucket)])
- with SetBotoConfigForTest([
- ('GSUtil', 'resumable_threshold', str(ONE_KIB)),
- ('GSUtil', 'json_resumable_chunk_size', str(ONE_KIB * 256))]):
- # Ensure copy also works across json upload chunk boundaries.
- self.RunGsUtil(['cp', suri(s3_key), suri(gs_bucket)])
-
- @unittest.skip('This test is slow due to creating many objects, '
- 'but remains here for debugging purposes.')
- def test_daisy_chain_cp_file_sizes(self):
- """Ensure daisy chain cp works with a wide of file sizes."""
- bucket_uri = self.CreateBucket()
- bucket2_uri = self.CreateBucket()
- exponent_cap = 28 # Up to 256 MiB in size.
- for i in range(exponent_cap):
- one_byte_smaller = 2**i - 1
- normal = 2**i
- one_byte_larger = 2**i + 1
- self.CreateObject(bucket_uri=bucket_uri, contents='a'*one_byte_smaller)
- self.CreateObject(bucket_uri=bucket_uri, contents='b'*normal)
- self.CreateObject(bucket_uri=bucket_uri, contents='c'*one_byte_larger)
-
- self.AssertNObjectsInBucket(bucket_uri, exponent_cap*3)
- self.RunGsUtil(['-m', 'cp', '-D', suri(bucket_uri, '**'),
- suri(bucket2_uri)])
-
- self.AssertNObjectsInBucket(bucket2_uri, exponent_cap*3)
-
- def test_daisy_chain_cp(self):
- """Tests cp with the -D option."""
- bucket1_uri = self.CreateBucket(storage_class='STANDARD')
- bucket2_uri = self.CreateBucket(
- storage_class='DURABLE_REDUCED_AVAILABILITY')
- key_uri = self.CreateObject(bucket_uri=bucket1_uri, contents='foo')
- # Set some headers on source object so we can verify that headers are
- # preserved by daisy-chain copy.
- self.RunGsUtil(['setmeta', '-h', 'Cache-Control:public,max-age=12',
- '-h', 'Content-Type:image/gif',
- '-h', 'x-%s-meta-1:abcd' % self.provider_custom_meta,
- suri(key_uri)])
- # Set public-read (non-default) ACL so we can verify that cp -D -p works.
- self.RunGsUtil(['acl', 'set', 'public-read', suri(key_uri)])
- acl_json = self.RunGsUtil(['acl', 'get', suri(key_uri)], return_stdout=True)
- # Perform daisy-chain copy and verify that source object headers and ACL
- # were preserved. Also specify -n option to test that gsutil correctly
- # removes the x-goog-if-generation-match:0 header that was set at uploading
- # time when updating the ACL.
- stderr = self.RunGsUtil(['cp', '-Dpn', suri(key_uri), suri(bucket2_uri)],
- return_stderr=True)
- self.assertNotIn('Copy-in-the-cloud disallowed', stderr)
-
- @Retry(AssertionError, tries=3, timeout_secs=1)
- def _Check():
- uri = suri(bucket2_uri, key_uri.object_name)
- stdout = self.RunGsUtil(['ls', '-L', uri], return_stdout=True)
- self.assertRegexpMatches(stdout, r'Cache-Control:\s+public,max-age=12')
- self.assertRegexpMatches(stdout, r'Content-Type:\s+image/gif')
- self.assertRegexpMatches(stdout, r'Metadata:\s+1:\s+abcd')
- new_acl_json = self.RunGsUtil(['acl', 'get', uri], return_stdout=True)
- self.assertEqual(acl_json, new_acl_json)
- _Check()
-
- def test_daisy_chain_cp_download_failure(self):
- """Tests cp with the -D option when the download thread dies."""
- bucket1_uri = self.CreateBucket()
- bucket2_uri = self.CreateBucket()
- key_uri = self.CreateObject(bucket_uri=bucket1_uri,
- contents='a' * self.halt_size)
- boto_config_for_test = ('GSUtil', 'resumable_threshold', str(ONE_KIB))
- test_callback_file = self.CreateTempFile(
- contents=pickle.dumps(_HaltingCopyCallbackHandler(False, 5)))
- with SetBotoConfigForTest([boto_config_for_test]):
- stderr = self.RunGsUtil(['cp', '--testcallbackfile', test_callback_file,
- '-D', suri(key_uri), suri(bucket2_uri)],
- expected_status=1, return_stderr=True)
- # Should have two exception traces; one from the download thread and
- # one from the upload thread.
- self.assertEqual(stderr.count(
- 'ResumableDownloadException: Artifically halting download'), 2)
-
- def test_canned_acl_cp(self):
- """Tests copying with a canned ACL."""
- bucket1_uri = self.CreateBucket()
- bucket2_uri = self.CreateBucket()
- key_uri = self.CreateObject(bucket_uri=bucket1_uri, contents='foo')
- self.RunGsUtil(['cp', '-a', 'public-read', suri(key_uri),
- suri(bucket2_uri)])
- # Set public-read on the original key after the copy so we can compare
- # the ACLs.
- self.RunGsUtil(['acl', 'set', 'public-read', suri(key_uri)])
- public_read_acl = self.RunGsUtil(['acl', 'get', suri(key_uri)],
- return_stdout=True)
-
- @Retry(AssertionError, tries=3, timeout_secs=1)
- def _Check():
- uri = suri(bucket2_uri, key_uri.object_name)
- new_acl_json = self.RunGsUtil(['acl', 'get', uri], return_stdout=True)
- self.assertEqual(public_read_acl, new_acl_json)
- _Check()
-
- @PerformsFileToObjectUpload
- def test_canned_acl_upload(self):
- """Tests uploading a file with a canned ACL."""
- bucket1_uri = self.CreateBucket()
- key_uri = self.CreateObject(bucket_uri=bucket1_uri, contents='foo')
- # Set public-read on the object so we can compare the ACLs.
- self.RunGsUtil(['acl', 'set', 'public-read', suri(key_uri)])
- public_read_acl = self.RunGsUtil(['acl', 'get', suri(key_uri)],
- return_stdout=True)
-
- file_name = 'bar'
- fpath = self.CreateTempFile(file_name=file_name, contents='foo')
- self.RunGsUtil(['cp', '-a', 'public-read', fpath, suri(bucket1_uri)])
- new_acl_json = self.RunGsUtil(['acl', 'get', suri(bucket1_uri, file_name)],
- return_stdout=True)
- self.assertEqual(public_read_acl, new_acl_json)
-
- resumable_size = ONE_KIB
- boto_config_for_test = ('GSUtil', 'resumable_threshold',
- str(resumable_size))
- with SetBotoConfigForTest([boto_config_for_test]):
- resumable_file_name = 'resumable_bar'
- resumable_contents = os.urandom(resumable_size)
- resumable_fpath = self.CreateTempFile(
- file_name=resumable_file_name, contents=resumable_contents)
- self.RunGsUtil(['cp', '-a', 'public-read', resumable_fpath,
- suri(bucket1_uri)])
- new_resumable_acl_json = self.RunGsUtil(
- ['acl', 'get', suri(bucket1_uri, resumable_file_name)],
- return_stdout=True)
- self.assertEqual(public_read_acl, new_resumable_acl_json)
-
- def test_cp_key_to_local_stream(self):
- bucket_uri = self.CreateBucket()
- contents = 'foo'
- key_uri = self.CreateObject(bucket_uri=bucket_uri, contents=contents)
- stdout = self.RunGsUtil(['cp', suri(key_uri), '-'], return_stdout=True)
- self.assertIn(contents, stdout)
-
- def test_cp_local_file_to_local_stream(self):
- contents = 'content'
- fpath = self.CreateTempFile(contents=contents)
- stdout = self.RunGsUtil(['cp', fpath, '-'], return_stdout=True)
- self.assertIn(contents, stdout)
-
- @PerformsFileToObjectUpload
- def test_cp_zero_byte_file(self):
- dst_bucket_uri = self.CreateBucket()
- src_dir = self.CreateTempDir()
- fpath = os.path.join(src_dir, 'zero_byte')
- with open(fpath, 'w') as unused_out_file:
- pass # Write a zero byte file
- self.RunGsUtil(['cp', fpath, suri(dst_bucket_uri)])
-
- @Retry(AssertionError, tries=3, timeout_secs=1)
- def _Check1():
- stdout = self.RunGsUtil(['ls', suri(dst_bucket_uri)], return_stdout=True)
- self.assertIn(os.path.basename(fpath), stdout)
- _Check1()
-
- download_path = os.path.join(src_dir, 'zero_byte_download')
- self.RunGsUtil(['cp', suri(dst_bucket_uri, 'zero_byte'), download_path])
- self.assertTrue(os.stat(download_path))
-
- def test_copy_bucket_to_bucket(self):
- """Tests that recursively copying from bucket to bucket.
-
- This should produce identically named objects (and not, in particular,
- destination objects named by the version-specific URI from source objects).
- """
- src_bucket_uri = self.CreateVersionedBucket()
- dst_bucket_uri = self.CreateVersionedBucket()
- self.CreateObject(bucket_uri=src_bucket_uri, object_name='obj0',
- contents='abc')
- self.CreateObject(bucket_uri=src_bucket_uri, object_name='obj1',
- contents='def')
-
- # Use @Retry as hedge against bucket listing eventual consistency.
- @Retry(AssertionError, tries=3, timeout_secs=1)
- def _CopyAndCheck():
- self.RunGsUtil(['cp', '-R', suri(src_bucket_uri),
- suri(dst_bucket_uri)])
- stdout = self.RunGsUtil(['ls', '-R', dst_bucket_uri.uri],
- return_stdout=True)
- self.assertIn('%s%s/obj0\n' % (dst_bucket_uri,
- src_bucket_uri.bucket_name), stdout)
- self.assertIn('%s%s/obj1\n' % (dst_bucket_uri,
- src_bucket_uri.bucket_name), stdout)
- _CopyAndCheck()
-
- def test_copy_bucket_to_dir(self):
- """Tests recursively copying from bucket to a directory.
-
- This should produce identically named objects (and not, in particular,
- destination objects named by the version-specific URI from source objects).
- """
- src_bucket_uri = self.CreateBucket()
- dst_dir = self.CreateTempDir()
- self.CreateObject(bucket_uri=src_bucket_uri, object_name='obj0',
- contents='abc')
- self.CreateObject(bucket_uri=src_bucket_uri, object_name='obj1',
- contents='def')
-
- # Use @Retry as hedge against bucket listing eventual consistency.
- @Retry(AssertionError, tries=3, timeout_secs=1)
- def _CopyAndCheck():
- """Copies the bucket recursively and validates the results."""
- self.RunGsUtil(['cp', '-R', suri(src_bucket_uri), dst_dir])
- dir_list = []
- for dirname, _, filenames in os.walk(dst_dir):
- for filename in filenames:
- dir_list.append(os.path.join(dirname, filename))
- dir_list = sorted(dir_list)
- self.assertEqual(len(dir_list), 2)
- self.assertEqual(os.path.join(dst_dir, src_bucket_uri.bucket_name,
- 'obj0'), dir_list[0])
- self.assertEqual(os.path.join(dst_dir, src_bucket_uri.bucket_name,
- 'obj1'), dir_list[1])
- _CopyAndCheck()
-
- def test_recursive_download_with_leftover_dir_placeholder(self):
- """Tests that we correctly handle leftover dir placeholders."""
- src_bucket_uri = self.CreateBucket()
- dst_dir = self.CreateTempDir()
- self.CreateObject(bucket_uri=src_bucket_uri, object_name='obj0',
- contents='abc')
- self.CreateObject(bucket_uri=src_bucket_uri, object_name='obj1',
- contents='def')
-
- # Create a placeholder like what can be left over by web GUI tools.
- key_uri = src_bucket_uri.clone_replace_name('/')
- key_uri.set_contents_from_string('')
- self.AssertNObjectsInBucket(src_bucket_uri, 3)
-
- stderr = self.RunGsUtil(['cp', '-R', suri(src_bucket_uri), dst_dir],
- return_stderr=True)
- self.assertIn('Skipping cloud sub-directory placeholder object', stderr)
- dir_list = []
- for dirname, _, filenames in os.walk(dst_dir):
- for filename in filenames:
- dir_list.append(os.path.join(dirname, filename))
- dir_list = sorted(dir_list)
- self.assertEqual(len(dir_list), 2)
- self.assertEqual(os.path.join(dst_dir, src_bucket_uri.bucket_name,
- 'obj0'), dir_list[0])
- self.assertEqual(os.path.join(dst_dir, src_bucket_uri.bucket_name,
- 'obj1'), dir_list[1])
-
- def test_copy_quiet(self):
- bucket_uri = self.CreateBucket()
- key_uri = self.CreateObject(bucket_uri=bucket_uri, contents='foo')
- stderr = self.RunGsUtil(['-q', 'cp', suri(key_uri),
- suri(bucket_uri.clone_replace_name('o2'))],
- return_stderr=True)
- self.assertEqual(stderr.count('Copying '), 0)
-
- def test_cp_md5_match(self):
- """Tests that the uploaded object has the expected MD5.
-
- Note that while this does perform a file to object upload, MD5s are
- not supported for composite objects, so we don't use the decorator in
- this case.
- """
- bucket_uri = self.CreateBucket()
- fpath = self.CreateTempFile(contents='bar')
- with open(fpath, 'r') as f_in:
- file_md5 = base64.encodestring(binascii.unhexlify(
- CalculateMd5FromContents(f_in))).rstrip('\n')
- self.RunGsUtil(['cp', fpath, suri(bucket_uri)])
-
- # Use @Retry as hedge against bucket listing eventual consistency.
- @Retry(AssertionError, tries=3, timeout_secs=1)
- def _Check1():
- stdout = self.RunGsUtil(['ls', '-L', suri(bucket_uri)],
- return_stdout=True)
- self.assertRegexpMatches(stdout,
- r'Hash\s+\(md5\):\s+%s' % re.escape(file_md5))
- _Check1()
-
- @unittest.skipIf(IS_WINDOWS,
- 'Unicode handling on Windows requires mods to site-packages')
- @PerformsFileToObjectUpload
- def test_cp_manifest_upload_unicode(self):
- return self._ManifestUpload('foo-unicöde', 'bar-unicöde',
- 'manifest-unicöde')
-
- @PerformsFileToObjectUpload
- def test_cp_manifest_upload(self):
- """Tests uploading with a mnifest file."""
- return self._ManifestUpload('foo', 'bar', 'manifest')
-
- def _ManifestUpload(self, file_name, object_name, manifest_name):
- """Tests uploading with a manifest file."""
- bucket_uri = self.CreateBucket()
- dsturi = suri(bucket_uri, object_name)
-
- fpath = self.CreateTempFile(file_name=file_name, contents='bar')
- logpath = self.CreateTempFile(file_name=manifest_name, contents='')
- # Ensure the file is empty.
- open(logpath, 'w').close()
- self.RunGsUtil(['cp', '-L', logpath, fpath, dsturi])
- with open(logpath, 'r') as f:
- lines = f.readlines()
- self.assertEqual(len(lines), 2)
-
- expected_headers = ['Source', 'Destination', 'Start', 'End', 'Md5',
- 'UploadId', 'Source Size', 'Bytes Transferred',
- 'Result', 'Description']
- self.assertEqual(expected_headers, lines[0].strip().split(','))
- results = lines[1].strip().split(',')
- self.assertEqual(results[0][:7], 'file://') # source
- self.assertEqual(results[1][:5], '%s://' %
- self.default_provider) # destination
- date_format = '%Y-%m-%dT%H:%M:%S.%fZ'
- start_date = datetime.datetime.strptime(results[2], date_format)
- end_date = datetime.datetime.strptime(results[3], date_format)
- self.assertEqual(end_date > start_date, True)
- if self.RunGsUtil == testcase.GsUtilIntegrationTestCase.RunGsUtil:
- # Check that we didn't do automatic parallel uploads - compose doesn't
- # calculate the MD5 hash. Since RunGsUtil is overridden in
- # TestCpParallelUploads to force parallel uploads, we can check which
- # method was used.
- self.assertEqual(results[4], 'rL0Y20zC+Fzt72VPzMSk2A==') # md5
- self.assertEqual(int(results[6]), 3) # Source Size
- self.assertEqual(int(results[7]), 3) # Bytes Transferred
- self.assertEqual(results[8], 'OK') # Result
-
- @PerformsFileToObjectUpload
- def test_cp_manifest_download(self):
- """Tests downloading with a manifest file."""
- key_uri = self.CreateObject(contents='foo')
- fpath = self.CreateTempFile(contents='')
- logpath = self.CreateTempFile(contents='')
- # Ensure the file is empty.
- open(logpath, 'w').close()
- self.RunGsUtil(['cp', '-L', logpath, suri(key_uri), fpath],
- return_stdout=True)
- with open(logpath, 'r') as f:
- lines = f.readlines()
- self.assertEqual(len(lines), 2)
-
- expected_headers = ['Source', 'Destination', 'Start', 'End', 'Md5',
- 'UploadId', 'Source Size', 'Bytes Transferred',
- 'Result', 'Description']
- self.assertEqual(expected_headers, lines[0].strip().split(','))
- results = lines[1].strip().split(',')
- self.assertEqual(results[0][:5], '%s://' %
- self.default_provider) # source
- self.assertEqual(results[1][:7], 'file://') # destination
- date_format = '%Y-%m-%dT%H:%M:%S.%fZ'
- start_date = datetime.datetime.strptime(results[2], date_format)
- end_date = datetime.datetime.strptime(results[3], date_format)
- self.assertEqual(end_date > start_date, True)
- self.assertEqual(results[4], 'rL0Y20zC+Fzt72VPzMSk2A==') # md5
- self.assertEqual(int(results[6]), 3) # Source Size
- # Bytes transferred might be more than 3 if the file was gzipped, since
- # the minimum gzip header is 10 bytes.
- self.assertGreaterEqual(int(results[7]), 3) # Bytes Transferred
- self.assertEqual(results[8], 'OK') # Result
-
- @PerformsFileToObjectUpload
- def test_copy_unicode_non_ascii_filename(self):
- key_uri = self.CreateObject(contents='foo')
- # Make file large enough to cause a resumable upload (which hashes filename
- # to construct tracker filename).
- fpath = self.CreateTempFile(file_name=u'Аудиоархив',
- contents='x' * 3 * 1024 * 1024)
- fpath_bytes = fpath.encode(UTF8)
- stderr = self.RunGsUtil(['cp', fpath_bytes, suri(key_uri)],
- return_stderr=True)
- self.assertIn('Copying file:', stderr)
-
- # Note: We originally implemented a test (test_copy_invalid_unicode_filename)
- # verifying that invalid unicode filenames were skipped, but it turns out
- # os.walk() on MacOS doesn't have problems with such files (so that test
- # failed). Given that, we decided to remove the test.
-
- def test_gzip_upload_and_download(self):
- bucket_uri = self.CreateBucket()
- contents = 'x' * 10000
- tmpdir = self.CreateTempDir()
- self.CreateTempFile(file_name='test.html', tmpdir=tmpdir, contents=contents)
- self.CreateTempFile(file_name='test.js', tmpdir=tmpdir, contents=contents)
- self.CreateTempFile(file_name='test.txt', tmpdir=tmpdir, contents=contents)
- # Test that copying with only 2 of the 3 extensions specified gzips the
- # correct files, and that including whitespace in the extension list works.
- self.RunGsUtil(['cp', '-z', 'js, html',
- os.path.join(tmpdir, 'test.*'), suri(bucket_uri)])
- self.AssertNObjectsInBucket(bucket_uri, 3)
- uri1 = suri(bucket_uri, 'test.html')
- uri2 = suri(bucket_uri, 'test.js')
- uri3 = suri(bucket_uri, 'test.txt')
- stdout = self.RunGsUtil(['stat', uri1], return_stdout=True)
- self.assertRegexpMatches(stdout, r'Content-Encoding:\s+gzip')
- stdout = self.RunGsUtil(['stat', uri2], return_stdout=True)
- self.assertRegexpMatches(stdout, r'Content-Encoding:\s+gzip')
- stdout = self.RunGsUtil(['stat', uri3], return_stdout=True)
- self.assertNotRegexpMatches(stdout, r'Content-Encoding:\s+gzip')
- fpath4 = self.CreateTempFile()
- for uri in (uri1, uri2, uri3):
- self.RunGsUtil(['cp', uri, suri(fpath4)])
- with open(fpath4, 'r') as f:
- self.assertEqual(f.read(), contents)
-
- def test_upload_with_subdir_and_unexpanded_wildcard(self):
- fpath1 = self.CreateTempFile(file_name=('tmp', 'x', 'y', 'z'))
- bucket_uri = self.CreateBucket()
- wildcard_uri = '%s*' % fpath1[:-5]
- stderr = self.RunGsUtil(['cp', '-R', wildcard_uri, suri(bucket_uri)],
- return_stderr=True)
- self.assertIn('Copying file:', stderr)
- self.AssertNObjectsInBucket(bucket_uri, 1)
-
- def test_cp_object_ending_with_slash(self):
- """Tests that cp works with object names ending with slash."""
- tmpdir = self.CreateTempDir()
- bucket_uri = self.CreateBucket()
- self.CreateObject(bucket_uri=bucket_uri,
- object_name='abc/',
- contents='dir')
- self.CreateObject(bucket_uri=bucket_uri,
- object_name='abc/def',
- contents='def')
- self.AssertNObjectsInBucket(bucket_uri, 2)
- self.RunGsUtil(['cp', '-R', suri(bucket_uri), tmpdir])
- # Check that files in the subdir got copied even though subdir object
- # download was skipped.
- with open(os.path.join(tmpdir, bucket_uri.bucket_name, 'abc', 'def')) as f:
- self.assertEquals('def', '\n'.join(f.readlines()))
-
- def test_cp_without_read_access(self):
- """Tests that cp fails without read access to the object."""
- # TODO: With 401's triggering retries in apitools, this test will take
- # a long time. Ideally, make apitools accept a num_retries config for this
- # until we stop retrying the 401's.
- bucket_uri = self.CreateBucket()
- object_uri = self.CreateObject(bucket_uri=bucket_uri, contents='foo')
-
- # Use @Retry as hedge against bucket listing eventual consistency.
- self.AssertNObjectsInBucket(bucket_uri, 1)
-
- with self.SetAnonymousBotoCreds():
- stderr = self.RunGsUtil(['cp', suri(object_uri), 'foo'],
- return_stderr=True, expected_status=1)
- self.assertIn('AccessDenied', stderr)
-
- @unittest.skipIf(IS_WINDOWS, 'os.symlink() is not available on Windows.')
- def test_cp_minus_e(self):
- fpath_dir = self.CreateTempDir()
- fpath1 = self.CreateTempFile(tmpdir=fpath_dir)
- fpath2 = os.path.join(fpath_dir, 'cp_minus_e')
- bucket_uri = self.CreateBucket()
- os.symlink(fpath1, fpath2)
- stderr = self.RunGsUtil(
- ['cp', '-e', '%s%s*' % (fpath_dir, os.path.sep),
- suri(bucket_uri, 'files')],
- return_stderr=True)
- self.assertIn('Copying file', stderr)
- self.assertIn('Skipping symbolic link file', stderr)
-
- def test_cp_multithreaded_wildcard(self):
- """Tests that cp -m works with a wildcard."""
- num_test_files = 5
- tmp_dir = self.CreateTempDir(test_files=num_test_files)
- bucket_uri = self.CreateBucket()
- wildcard_uri = '%s%s*' % (tmp_dir, os.sep)
- self.RunGsUtil(['-m', 'cp', wildcard_uri, suri(bucket_uri)])
- self.AssertNObjectsInBucket(bucket_uri, num_test_files)
-
- def test_cp_duplicate_source_args(self):
- """Tests that cp -m works when a source argument is provided twice."""
- object_contents = 'edge'
- object_uri = self.CreateObject(object_name='foo', contents=object_contents)
- tmp_dir = self.CreateTempDir()
- self.RunGsUtil(['-m', 'cp', suri(object_uri), suri(object_uri), tmp_dir])
- with open(os.path.join(tmp_dir, 'foo'), 'r') as in_fp:
- contents = in_fp.read()
- # Contents should not be duplicated.
- self.assertEqual(contents, object_contents)
-
- @SkipForS3('No resumable upload support for S3.')
- def test_cp_resumable_upload_break(self):
- """Tests that an upload can be resumed after a connection break."""
- bucket_uri = self.CreateBucket()
- fpath = self.CreateTempFile(contents='a' * self.halt_size)
- boto_config_for_test = ('GSUtil', 'resumable_threshold', str(ONE_KIB))
- test_callback_file = self.CreateTempFile(
- contents=pickle.dumps(_HaltingCopyCallbackHandler(True, 5)))
-
- with SetBotoConfigForTest([boto_config_for_test]):
- stderr = self.RunGsUtil(['cp', '--testcallbackfile', test_callback_file,
- fpath, suri(bucket_uri)],
- expected_status=1, return_stderr=True)
- self.assertIn('Artifically halting upload', stderr)
- stderr = self.RunGsUtil(['cp', fpath, suri(bucket_uri)],
- return_stderr=True)
- self.assertIn('Resuming upload', stderr)
-
- @SkipForS3('No resumable upload support for S3.')
- def test_cp_resumable_upload_retry(self):
- """Tests that a resumable upload completes with one retry."""
- bucket_uri = self.CreateBucket()
- fpath = self.CreateTempFile(contents='a' * self.halt_size)
- # TODO: Raising an httplib or socket error blocks bucket teardown
- # in JSON for 60-120s on a multiprocessing lock acquire. Figure out why;
- # until then, raise an apitools retryable exception.
- if self.test_api == ApiSelector.XML:
- test_callback_file = self.CreateTempFile(
- contents=pickle.dumps(_ResumableUploadRetryHandler(
- 5, httplib.BadStatusLine, ('unused',))))
- else:
- test_callback_file = self.CreateTempFile(
- contents=pickle.dumps(_ResumableUploadRetryHandler(
- 5, apitools_exceptions.BadStatusCodeError,
- ('unused', 'unused', 'unused'))))
- boto_config_for_test = ('GSUtil', 'resumable_threshold', str(ONE_KIB))
- with SetBotoConfigForTest([boto_config_for_test]):
- stderr = self.RunGsUtil(['-D', 'cp', '--testcallbackfile',
- test_callback_file, fpath, suri(bucket_uri)],
- return_stderr=1)
- if self.test_api == ApiSelector.XML:
- self.assertIn('Got retryable failure', stderr)
- else:
- self.assertIn('Retrying', stderr)
-
- @SkipForS3('No resumable upload support for S3.')
- def test_cp_resumable_streaming_upload_retry(self):
- """Tests that a streaming resumable upload completes with one retry."""
- if self.test_api == ApiSelector.XML:
- return unittest.skip('XML does not support resumable streaming uploads.')
- bucket_uri = self.CreateBucket()
-
- test_callback_file = self.CreateTempFile(
- contents=pickle.dumps(_ResumableUploadRetryHandler(
- 5, apitools_exceptions.BadStatusCodeError,
- ('unused', 'unused', 'unused'))))
- # Need to reduce the JSON chunk size since streaming uploads buffer a
- # full chunk.
- boto_configs_for_test = [('GSUtil', 'json_resumable_chunk_size',
- str(256 * ONE_KIB)),
- ('Boto', 'num_retries', '2')]
- with SetBotoConfigForTest(boto_configs_for_test):
- stderr = self.RunGsUtil(
- ['-D', 'cp', '--testcallbackfile', test_callback_file, '-',
- suri(bucket_uri, 'foo')],
- stdin='a' * 512 * ONE_KIB, return_stderr=1)
- self.assertIn('Retrying', stderr)
-
- @SkipForS3('No resumable upload support for S3.')
- def test_cp_resumable_upload(self):
- """Tests that a basic resumable upload completes successfully."""
- bucket_uri = self.CreateBucket()
- fpath = self.CreateTempFile(contents='a' * self.halt_size)
- boto_config_for_test = ('GSUtil', 'resumable_threshold', str(ONE_KIB))
- with SetBotoConfigForTest([boto_config_for_test]):
- self.RunGsUtil(['cp', fpath, suri(bucket_uri)])
-
- @SkipForS3('No resumable upload support for S3.')
- def test_resumable_upload_break_leaves_tracker(self):
- """Tests that a tracker file is created with a resumable upload."""
- bucket_uri = self.CreateBucket()
- fpath = self.CreateTempFile(file_name='foo',
- contents='a' * self.halt_size)
- boto_config_for_test = ('GSUtil', 'resumable_threshold', str(ONE_KIB))
- with SetBotoConfigForTest([boto_config_for_test]):
- tracker_filename = GetTrackerFilePath(
- StorageUrlFromString(suri(bucket_uri, 'foo')),
- TrackerFileType.UPLOAD, self.test_api)
- test_callback_file = self.CreateTempFile(
- contents=pickle.dumps(_HaltingCopyCallbackHandler(True, 5)))
- try:
- stderr = self.RunGsUtil(['cp', '--testcallbackfile', test_callback_file,
- fpath, suri(bucket_uri, 'foo')],
- expected_status=1, return_stderr=True)
- self.assertIn('Artifically halting upload', stderr)
- self.assertTrue(os.path.exists(tracker_filename),
- 'Tracker file %s not present.' % tracker_filename)
- finally:
- if os.path.exists(tracker_filename):
- os.unlink(tracker_filename)
-
- @SkipForS3('No resumable upload support for S3.')
- def test_cp_resumable_upload_break_file_size_change(self):
- """Tests a resumable upload where the uploaded file changes size.
-
- This should fail when we read the tracker data.
- """
- bucket_uri = self.CreateBucket()
- tmp_dir = self.CreateTempDir()
- fpath = self.CreateTempFile(file_name='foo', tmpdir=tmp_dir,
- contents='a' * self.halt_size)
- test_callback_file = self.CreateTempFile(
- contents=pickle.dumps(_HaltingCopyCallbackHandler(True, 5)))
-
- boto_config_for_test = ('GSUtil', 'resumable_threshold', str(ONE_KIB))
- with SetBotoConfigForTest([boto_config_for_test]):
- stderr = self.RunGsUtil(['cp', '--testcallbackfile', test_callback_file,
- fpath, suri(bucket_uri)],
- expected_status=1, return_stderr=True)
- self.assertIn('Artifically halting upload', stderr)
- fpath = self.CreateTempFile(file_name='foo', tmpdir=tmp_dir,
- contents='a' * self.halt_size * 2)
- stderr = self.RunGsUtil(['cp', fpath, suri(bucket_uri)],
- expected_status=1, return_stderr=True)
- self.assertIn('ResumableUploadAbortException', stderr)
-
- @SkipForS3('No resumable upload support for S3.')
- def test_cp_resumable_upload_break_file_content_change(self):
- """Tests a resumable upload where the uploaded file changes content."""
- if self.test_api == ApiSelector.XML:
- return unittest.skip(
- 'XML doesn\'t make separate HTTP calls at fixed-size boundaries for '
- 'resumable uploads, so we can\'t guarantee that the server saves a '
- 'specific part of the upload.')
- bucket_uri = self.CreateBucket()
- tmp_dir = self.CreateTempDir()
- fpath = self.CreateTempFile(file_name='foo', tmpdir=tmp_dir,
- contents='a' * ONE_KIB * 512)
- test_callback_file = self.CreateTempFile(
- contents=pickle.dumps(_HaltingCopyCallbackHandler(True,
- int(ONE_KIB) * 384)))
- resumable_threshold_for_test = (
- 'GSUtil', 'resumable_threshold', str(ONE_KIB))
- resumable_chunk_size_for_test = (
- 'GSUtil', 'json_resumable_chunk_size', str(ONE_KIB * 256))
- with SetBotoConfigForTest([resumable_threshold_for_test,
- resumable_chunk_size_for_test]):
- stderr = self.RunGsUtil(['cp', '--testcallbackfile', test_callback_file,
- fpath, suri(bucket_uri)],
- expected_status=1, return_stderr=True)
- self.assertIn('Artifically halting upload', stderr)
- fpath = self.CreateTempFile(file_name='foo', tmpdir=tmp_dir,
- contents='b' * ONE_KIB * 512)
- stderr = self.RunGsUtil(['cp', fpath, suri(bucket_uri)],
- expected_status=1, return_stderr=True)
- self.assertIn('doesn\'t match cloud-supplied digest', stderr)
-
- @SkipForS3('No resumable upload support for S3.')
- def test_cp_resumable_upload_break_file_smaller_size(self):
- """Tests a resumable upload where the uploaded file changes content.
-
- This should fail hash validation.
- """
- bucket_uri = self.CreateBucket()
- tmp_dir = self.CreateTempDir()
- fpath = self.CreateTempFile(file_name='foo', tmpdir=tmp_dir,
- contents='a' * ONE_KIB * 512)
- test_callback_file = self.CreateTempFile(
- contents=pickle.dumps(_HaltingCopyCallbackHandler(True,
- int(ONE_KIB) * 384)))
- resumable_threshold_for_test = (
- 'GSUtil', 'resumable_threshold', str(ONE_KIB))
- resumable_chunk_size_for_test = (
- 'GSUtil', 'json_resumable_chunk_size', str(ONE_KIB * 256))
- with SetBotoConfigForTest([resumable_threshold_for_test,
- resumable_chunk_size_for_test]):
- stderr = self.RunGsUtil(['cp', '--testcallbackfile', test_callback_file,
- fpath, suri(bucket_uri)],
- expected_status=1, return_stderr=True)
- self.assertIn('Artifically halting upload', stderr)
- fpath = self.CreateTempFile(file_name='foo', tmpdir=tmp_dir,
- contents='a' * ONE_KIB)
- stderr = self.RunGsUtil(['cp', fpath, suri(bucket_uri)],
- expected_status=1, return_stderr=True)
- self.assertIn('ResumableUploadAbortException', stderr)
-
- # This temporarily changes the tracker directory to unwritable which
- # interferes with any parallel running tests that use the tracker directory.
- @NotParallelizable
- @SkipForS3('No resumable upload support for S3.')
- @unittest.skipIf(IS_WINDOWS, 'chmod on dir unsupported on Windows.')
- @PerformsFileToObjectUpload
- def test_cp_unwritable_tracker_file(self):
- """Tests a resumable upload with an unwritable tracker file."""
- bucket_uri = self.CreateBucket()
- tracker_filename = GetTrackerFilePath(
- StorageUrlFromString(suri(bucket_uri, 'foo')),
- TrackerFileType.UPLOAD, self.test_api)
- tracker_dir = os.path.dirname(tracker_filename)
- fpath = self.CreateTempFile(file_name='foo', contents='a' * ONE_KIB)
- boto_config_for_test = ('GSUtil', 'resumable_threshold', str(ONE_KIB))
- save_mod = os.stat(tracker_dir).st_mode
-
- try:
- os.chmod(tracker_dir, 0)
- with SetBotoConfigForTest([boto_config_for_test]):
- stderr = self.RunGsUtil(['cp', fpath, suri(bucket_uri)],
- expected_status=1, return_stderr=True)
- self.assertIn('Couldn\'t write tracker file', stderr)
- finally:
- os.chmod(tracker_dir, save_mod)
- if os.path.exists(tracker_filename):
- os.unlink(tracker_filename)
-
- # This temporarily changes the tracker directory to unwritable which
- # interferes with any parallel running tests that use the tracker directory.
- @NotParallelizable
- @unittest.skipIf(IS_WINDOWS, 'chmod on dir unsupported on Windows.')
- def test_cp_unwritable_tracker_file_download(self):
- """Tests downloads with an unwritable tracker file."""
- object_uri = self.CreateObject(contents='foo' * ONE_KIB)
- tracker_filename = GetTrackerFilePath(
- StorageUrlFromString(suri(object_uri)),
- TrackerFileType.DOWNLOAD, self.test_api)
- tracker_dir = os.path.dirname(tracker_filename)
- fpath = self.CreateTempFile()
- save_mod = os.stat(tracker_dir).st_mode
-
- try:
- os.chmod(tracker_dir, 0)
- boto_config_for_test = ('GSUtil', 'resumable_threshold', str(EIGHT_MIB))
- with SetBotoConfigForTest([boto_config_for_test]):
- # Should succeed because we are below the threshold.
- self.RunGsUtil(['cp', suri(object_uri), fpath])
- boto_config_for_test = ('GSUtil', 'resumable_threshold', str(ONE_KIB))
- with SetBotoConfigForTest([boto_config_for_test]):
- stderr = self.RunGsUtil(['cp', suri(object_uri), fpath],
- expected_status=1, return_stderr=True)
- self.assertIn('Couldn\'t write tracker file', stderr)
- finally:
- os.chmod(tracker_dir, save_mod)
- if os.path.exists(tracker_filename):
- os.unlink(tracker_filename)
-
- def test_cp_resumable_download_break(self):
- """Tests that a download can be resumed after a connection break."""
- bucket_uri = self.CreateBucket()
- object_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo',
- contents='a' * self.halt_size)
- fpath = self.CreateTempFile()
- test_callback_file = self.CreateTempFile(
- contents=pickle.dumps(_HaltingCopyCallbackHandler(False, 5)))
-
- boto_config_for_test = ('GSUtil', 'resumable_threshold', str(ONE_KIB))
- with SetBotoConfigForTest([boto_config_for_test]):
- stderr = self.RunGsUtil(['cp', '--testcallbackfile', test_callback_file,
- suri(object_uri), fpath],
- expected_status=1, return_stderr=True)
- self.assertIn('Artifically halting download.', stderr)
- tracker_filename = GetTrackerFilePath(
- StorageUrlFromString(fpath), TrackerFileType.DOWNLOAD, self.test_api)
- self.assertTrue(os.path.isfile(tracker_filename))
- stderr = self.RunGsUtil(['cp', suri(object_uri), fpath],
- return_stderr=True)
- self.assertIn('Resuming download', stderr)
-
- def test_cp_resumable_download_etag_differs(self):
- """Tests that download restarts the file when the source object changes.
-
- This causes the etag not to match.
- """
- bucket_uri = self.CreateBucket()
- object_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo',
- contents='a' * self.halt_size)
- fpath = self.CreateTempFile()
- test_callback_file = self.CreateTempFile(
- contents=pickle.dumps(_HaltingCopyCallbackHandler(False, 5)))
- boto_config_for_test = ('GSUtil', 'resumable_threshold', str(ONE_KIB))
- with SetBotoConfigForTest([boto_config_for_test]):
- # This will create a tracker file with an ETag.
- stderr = self.RunGsUtil(['cp', '--testcallbackfile', test_callback_file,
- suri(object_uri), fpath],
- expected_status=1, return_stderr=True)
- self.assertIn('Artifically halting download.', stderr)
- # Create a new object with different contents - it should have a
- # different ETag since the content has changed.
- object_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo',
- contents='b' * self.halt_size)
- stderr = self.RunGsUtil(['cp', suri(object_uri), fpath],
- return_stderr=True)
- self.assertNotIn('Resuming download', stderr)
-
- def test_cp_resumable_download_file_larger(self):
- """Tests download deletes the tracker file when existing file is larger."""
- bucket_uri = self.CreateBucket()
- fpath = self.CreateTempFile()
- object_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo',
- contents='a' * self.halt_size)
- test_callback_file = self.CreateTempFile(
- contents=pickle.dumps(_HaltingCopyCallbackHandler(False, 5)))
- boto_config_for_test = ('GSUtil', 'resumable_threshold', str(ONE_KIB))
- with SetBotoConfigForTest([boto_config_for_test]):
- stderr = self.RunGsUtil(['cp', '--testcallbackfile', test_callback_file,
- suri(object_uri), fpath],
- expected_status=1, return_stderr=True)
- self.assertIn('Artifically halting download.', stderr)
- with open(fpath, 'w') as larger_file:
- for _ in range(self.halt_size * 2):
- larger_file.write('a')
- stderr = self.RunGsUtil(['cp', suri(object_uri), fpath],
- expected_status=1, return_stderr=True)
- self.assertNotIn('Resuming download', stderr)
- self.assertIn('is larger', stderr)
- self.assertIn('Deleting tracker file', stderr)
-
- def test_cp_resumable_download_content_differs(self):
- """Tests that we do not re-download when tracker file matches existing file.
-
-    We only compare size, not contents, so the re-download should not occur
-    even though the contents are technically different. However, hash
-    validation on the file should still occur, and the file will then be
-    deleted because the hashes differ.
- """
- bucket_uri = self.CreateBucket()
- tmp_dir = self.CreateTempDir()
- fpath = self.CreateTempFile(tmpdir=tmp_dir, contents='abcd' * ONE_KIB)
- object_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo',
- contents='efgh' * ONE_KIB)
- stdout = self.RunGsUtil(['ls', '-L', suri(object_uri)], return_stdout=True)
- etag_match = re.search(r'\s*ETag:\s*(.*)', stdout)
- self.assertIsNotNone(etag_match, 'Could not get object ETag')
- self.assertEqual(len(etag_match.groups()), 1,
- 'Did not match expected single ETag')
- etag = etag_match.group(1)
-
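-    # Write a tracker file containing the object's etag so that the existing
-    # local file looks like a previously completed download.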
- tracker_filename = GetTrackerFilePath(
- StorageUrlFromString(fpath), TrackerFileType.DOWNLOAD, self.test_api)
- try:
- with open(tracker_filename, 'w') as tracker_fp:
- tracker_fp.write(etag)
- boto_config_for_test = ('GSUtil', 'resumable_threshold', str(ONE_KIB))
- with SetBotoConfigForTest([boto_config_for_test]):
- stderr = self.RunGsUtil(['cp', suri(object_uri), fpath],
- return_stderr=True, expected_status=1)
- self.assertIn('Download already complete for file', stderr)
- self.assertIn('doesn\'t match cloud-supplied digest', stderr)
- # File and tracker file should be deleted.
- self.assertFalse(os.path.isfile(fpath))
- self.assertFalse(os.path.isfile(tracker_filename))
- finally:
- if os.path.exists(tracker_filename):
- os.unlink(tracker_filename)
-
- def test_cp_resumable_download_content_matches(self):
- """Tests download no-ops when tracker file matches existing file."""
- bucket_uri = self.CreateBucket()
- tmp_dir = self.CreateTempDir()
- matching_contents = 'abcd' * ONE_KIB
- fpath = self.CreateTempFile(tmpdir=tmp_dir, contents=matching_contents)
- object_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo',
- contents=matching_contents)
- stdout = self.RunGsUtil(['ls', '-L', suri(object_uri)], return_stdout=True)
- etag_match = re.search(r'\s*ETag:\s*(.*)', stdout)
- self.assertIsNotNone(etag_match, 'Could not get object ETag')
- self.assertEqual(len(etag_match.groups()), 1,
- 'Did not match expected single ETag')
- etag = etag_match.group(1)
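-    # Write a matching tracker file so that the existing local file is
-    # treated as an already completed download.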
- tracker_filename = GetTrackerFilePath(
- StorageUrlFromString(fpath), TrackerFileType.DOWNLOAD, self.test_api)
- with open(tracker_filename, 'w') as tracker_fp:
- tracker_fp.write(etag)
- try:
- boto_config_for_test = ('GSUtil', 'resumable_threshold', str(ONE_KIB))
- with SetBotoConfigForTest([boto_config_for_test]):
- stderr = self.RunGsUtil(['cp', suri(object_uri), fpath],
- return_stderr=True)
- self.assertIn('Download already complete for file', stderr)
- # Tracker file should be removed after successful hash validation.
- self.assertFalse(os.path.isfile(tracker_filename))
- finally:
- if os.path.exists(tracker_filename):
- os.unlink(tracker_filename)
-
- def test_cp_resumable_download_tracker_file_not_matches(self):
- """Tests that download overwrites when tracker file etag does not match."""
- bucket_uri = self.CreateBucket()
- tmp_dir = self.CreateTempDir()
- fpath = self.CreateTempFile(tmpdir=tmp_dir, contents='abcd' * ONE_KIB)
- object_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo',
- contents='efgh' * ONE_KIB)
- stdout = self.RunGsUtil(['ls', '-L', suri(object_uri)], return_stdout=True)
- etag_match = re.search(r'\s*ETag:\s*(.*)', stdout)
- self.assertIsNotNone(etag_match, 'Could not get object ETag')
- self.assertEqual(len(etag_match.groups()), 1,
- 'Did not match regex for exactly one object ETag')
- etag = etag_match.group(1)
- etag += 'nonmatching'
- tracker_filename = GetTrackerFilePath(
- StorageUrlFromString(fpath), TrackerFileType.DOWNLOAD, self.test_api)
- with open(tracker_filename, 'w') as tracker_fp:
- tracker_fp.write(etag)
- try:
- boto_config_for_test = ('GSUtil', 'resumable_threshold', str(ONE_KIB))
- with SetBotoConfigForTest([boto_config_for_test]):
- stderr = self.RunGsUtil(['cp', suri(object_uri), fpath],
- return_stderr=True)
- self.assertNotIn('Resuming download', stderr)
- # Ensure the file was overwritten.
- with open(fpath, 'r') as in_fp:
- contents = in_fp.read()
- self.assertEqual(contents, 'efgh' * ONE_KIB,
- 'File not overwritten when it should have been '
- 'due to a non-matching tracker file.')
- self.assertFalse(os.path.isfile(tracker_filename))
- finally:
- if os.path.exists(tracker_filename):
- os.unlink(tracker_filename)
-
- def test_cp_resumable_download_gzip(self):
- """Tests that download can be resumed successfully with a gzipped file."""
-    # Generate some reasonably incompressible data.  This compresses to
-    # roughly 128K in practice, but we assert specifically below that it is
- # larger than self.halt_size to guarantee that we can halt the download
- # partway through.
- object_uri = self.CreateObject()
- random.seed(0)
- contents = str([random.choice(string.ascii_letters)
- for _ in xrange(ONE_KIB * 128)])
- random.seed() # Reset the seed for any other tests.
- fpath1 = self.CreateTempFile(file_name='unzipped.txt', contents=contents)
- self.RunGsUtil(['cp', '-z', 'txt', suri(fpath1), suri(object_uri)])
-
- # Use @Retry as hedge against bucket listing eventual consistency.
- @Retry(AssertionError, tries=3, timeout_secs=1)
- def _GetObjectSize():
- stdout = self.RunGsUtil(['du', suri(object_uri)], return_stdout=True)
- size_match = re.search(r'(\d+)\s+.*', stdout)
- self.assertIsNotNone(size_match, 'Could not get object size')
- self.assertEqual(len(size_match.groups()), 1,
- 'Did not match regex for exactly one object size.')
- return long(size_match.group(1))
-
- object_size = _GetObjectSize()
-    self.assertGreaterEqual(object_size, self.halt_size,
-                            'Compressed object size was not large enough to '
-                            'allow for a halted download, so the test results '
-                            'would be invalid. Please increase the compressed '
-                            'object size in the test.')
- fpath2 = self.CreateTempFile()
- test_callback_file = self.CreateTempFile(
- contents=pickle.dumps(_HaltingCopyCallbackHandler(False, 5)))
-
- boto_config_for_test = ('GSUtil', 'resumable_threshold', str(ONE_KIB))
- with SetBotoConfigForTest([boto_config_for_test]):
- stderr = self.RunGsUtil(['cp', '--testcallbackfile', test_callback_file,
- suri(object_uri), suri(fpath2)],
- return_stderr=True, expected_status=1)
- self.assertIn('Artifically halting download.', stderr)
- tracker_filename = GetTrackerFilePath(
- StorageUrlFromString(fpath2), TrackerFileType.DOWNLOAD, self.test_api)
- self.assertTrue(os.path.isfile(tracker_filename))
- self.assertIn('Downloading to temp gzip filename', stderr)
- # We should have a temporary gzipped file, a tracker file, and no
- # final file yet.
- self.assertTrue(os.path.isfile('%s_.gztmp' % fpath2))
- stderr = self.RunGsUtil(['cp', suri(object_uri), suri(fpath2)],
- return_stderr=True)
- self.assertIn('Resuming download', stderr)
- with open(fpath2, 'r') as f:
- self.assertEqual(f.read(), contents, 'File contents did not match.')
- self.assertFalse(os.path.isfile(tracker_filename))
- self.assertFalse(os.path.isfile('%s_.gztmp' % fpath2))
-
- @SkipForS3('No resumable upload support for S3.')
- def test_cp_resumable_upload_bucket_deleted(self):
- """Tests that a not found exception is raised if bucket no longer exists."""
- bucket_uri = self.CreateBucket()
- fpath = self.CreateTempFile(contents='a' * 2 * ONE_KIB)
- boto_config_for_test = ('GSUtil', 'resumable_threshold', str(ONE_KIB))
- test_callback_file = self.CreateTempFile(
- contents=pickle.dumps(
- _DeleteBucketThenStartOverCopyCallbackHandler(5, bucket_uri)))
-
- with SetBotoConfigForTest([boto_config_for_test]):
- stderr = self.RunGsUtil(['cp', '--testcallbackfile', test_callback_file,
- fpath, suri(bucket_uri)], return_stderr=True,
- expected_status=1)
- self.assertIn('Deleting bucket', stderr)
- self.assertIn('bucket does not exist', stderr)
-
- @SkipForS3('No resumable upload support for S3.')
- def test_cp_resumable_upload_start_over_http_error(self):
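-    """Tests that an upload restarts from scratch on start-over HTTP errors."""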
- for start_over_error in (404, 410):
- self.start_over_error_test_helper(start_over_error)
-
- def start_over_error_test_helper(self, http_error_num):
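-    """Helper that verifies an upload restarts after a start-over error."""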
- bucket_uri = self.CreateBucket()
- fpath = self.CreateTempFile(contents='a' * 2 * ONE_KIB)
- boto_config_for_test = ('GSUtil', 'resumable_threshold', str(ONE_KIB))
- if self.test_api == ApiSelector.JSON:
-      test_callback_file = self.CreateTempFile(
-          contents=pickle.dumps(
-              _JSONForceHTTPErrorCopyCallbackHandler(5, http_error_num)))
- elif self.test_api == ApiSelector.XML:
- test_callback_file = self.CreateTempFile(
- contents=pickle.dumps(
- _XMLResumableUploadStartOverCopyCallbackHandler(5)))
-
- with SetBotoConfigForTest([boto_config_for_test]):
- stderr = self.RunGsUtil(['cp', '--testcallbackfile', test_callback_file,
- fpath, suri(bucket_uri)], return_stderr=True)
- self.assertIn('Restarting upload from scratch', stderr)
-
- def test_cp_minus_c(self):
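-    """Tests the -c option to continue copying despite errors."""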
- bucket_uri = self.CreateBucket()
- object_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo',
- contents='foo')
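-    # The first source (foo2) doesn't exist, so cp exits with an error, but
-    # with -c the remaining source object should still be copied.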
- self.RunGsUtil(
- ['cp', '-c', suri(bucket_uri) + '/foo2', suri(object_uri),
- suri(bucket_uri) + '/dir/'],
- expected_status=1)
- self.RunGsUtil(['stat', '%s/dir/foo' % suri(bucket_uri)])
-
- def test_rewrite_cp(self):
- """Tests the JSON Rewrite API."""
- if self.test_api == ApiSelector.XML:
- return unittest.skip('Rewrite API is only supported in JSON.')
- bucket_uri = self.CreateBucket()
- object_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo',
- contents='bar')
- gsutil_api = GcsJsonApi(BucketStorageUri, logging.getLogger(),
- self.default_provider)
- key = object_uri.get_key()
- src_obj_metadata = apitools_messages.Object(
- name=key.name, bucket=key.bucket.name, contentType=key.content_type)
- dst_obj_metadata = apitools_messages.Object(
- bucket=src_obj_metadata.bucket,
- name=self.MakeTempName('object'),
- contentType=src_obj_metadata.contentType)
- gsutil_api.CopyObject(src_obj_metadata, dst_obj_metadata)
- self.assertEqual(
- gsutil_api.GetObjectMetadata(src_obj_metadata.bucket,
- src_obj_metadata.name,
- fields=['md5Hash']).md5Hash,
- gsutil_api.GetObjectMetadata(dst_obj_metadata.bucket,
- dst_obj_metadata.name,
- fields=['md5Hash']).md5Hash,
- 'Error: Rewritten object\'s hash doesn\'t match source object.')
-
- def test_rewrite_cp_resume(self):
- """Tests the JSON Rewrite API, breaking and resuming via a tracker file."""
- if self.test_api == ApiSelector.XML:
- return unittest.skip('Rewrite API is only supported in JSON.')
- bucket_uri = self.CreateBucket()
- # Second bucket needs to be a different storage class so the service
- # actually rewrites the bytes.
- bucket_uri2 = self.CreateBucket(
- storage_class='DURABLE_REDUCED_AVAILABILITY')
- # maxBytesPerCall must be >= 1 MiB, so create an object > 2 MiB because we
-    # need 2 responses from the service: 1 success, 1 failure prior to
- # completion.
- object_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo',
- contents=('12'*ONE_MIB) + 'bar',
- prefer_json_api=True)
- gsutil_api = GcsJsonApi(BucketStorageUri, logging.getLogger(),
- self.default_provider)
- key = object_uri.get_key()
- src_obj_metadata = apitools_messages.Object(
- name=key.name, bucket=key.bucket.name, contentType=key.content_type,
- etag=key.etag.strip('"\''))
- dst_obj_name = self.MakeTempName('object')
- dst_obj_metadata = apitools_messages.Object(
- bucket=bucket_uri2.bucket_name,
- name=dst_obj_name,
- contentType=src_obj_metadata.contentType)
- tracker_file_name = GetRewriteTrackerFilePath(
- src_obj_metadata.bucket, src_obj_metadata.name,
- dst_obj_metadata.bucket, dst_obj_metadata.name, self.test_api)
- try:
- try:
- gsutil_api.CopyObject(
- src_obj_metadata, dst_obj_metadata,
- progress_callback=_HaltingRewriteCallbackHandler(ONE_MIB*2).call,
- max_bytes_per_call=ONE_MIB)
- self.fail('Expected _RewriteHaltException.')
- except _RewriteHaltException:
- pass
-
- # Tracker file should be left over.
- self.assertTrue(os.path.exists(tracker_file_name))
-
- # Now resume. Callback ensures we didn't start over.
- gsutil_api.CopyObject(
- src_obj_metadata, dst_obj_metadata,
- progress_callback=_EnsureRewriteResumeCallbackHandler(ONE_MIB*2).call,
- max_bytes_per_call=ONE_MIB)
-
- # Copy completed; tracker file should be deleted.
- self.assertFalse(os.path.exists(tracker_file_name))
-
- self.assertEqual(
- gsutil_api.GetObjectMetadata(src_obj_metadata.bucket,
- src_obj_metadata.name,
- fields=['md5Hash']).md5Hash,
- gsutil_api.GetObjectMetadata(dst_obj_metadata.bucket,
- dst_obj_metadata.name,
- fields=['md5Hash']).md5Hash,
- 'Error: Rewritten object\'s hash doesn\'t match source object.')
- finally:
- # Clean up if something went wrong.
- DeleteTrackerFile(tracker_file_name)
-
- def test_rewrite_cp_resume_source_changed(self):
- """Tests that Rewrite starts over when the source object has changed."""
- if self.test_api == ApiSelector.XML:
- return unittest.skip('Rewrite API is only supported in JSON.')
- bucket_uri = self.CreateBucket()
- # Second bucket needs to be a different storage class so the service
- # actually rewrites the bytes.
- bucket_uri2 = self.CreateBucket(
- storage_class='DURABLE_REDUCED_AVAILABILITY')
- # maxBytesPerCall must be >= 1 MiB, so create an object > 2 MiB because we
-    # need 2 responses from the service: 1 success, 1 failure prior to
- # completion.
- object_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo',
- contents=('12'*ONE_MIB) + 'bar',
- prefer_json_api=True)
- gsutil_api = GcsJsonApi(BucketStorageUri, logging.getLogger(),
- self.default_provider)
- key = object_uri.get_key()
- src_obj_metadata = apitools_messages.Object(
- name=key.name, bucket=key.bucket.name, contentType=key.content_type,
- etag=key.etag.strip('"\''))
- dst_obj_name = self.MakeTempName('object')
- dst_obj_metadata = apitools_messages.Object(
- bucket=bucket_uri2.bucket_name,
- name=dst_obj_name,
- contentType=src_obj_metadata.contentType)
- tracker_file_name = GetRewriteTrackerFilePath(
- src_obj_metadata.bucket, src_obj_metadata.name,
- dst_obj_metadata.bucket, dst_obj_metadata.name, self.test_api)
- try:
- try:
- gsutil_api.CopyObject(
- src_obj_metadata, dst_obj_metadata,
- progress_callback=_HaltingRewriteCallbackHandler(ONE_MIB*2).call,
- max_bytes_per_call=ONE_MIB)
- self.fail('Expected _RewriteHaltException.')
- except _RewriteHaltException:
- pass
- # Overwrite the original object.
- object_uri2 = self.CreateObject(bucket_uri=bucket_uri, object_name='foo',
- contents='bar', prefer_json_api=True)
- key2 = object_uri2.get_key()
- src_obj_metadata2 = apitools_messages.Object(
- name=key2.name, bucket=key2.bucket.name,
- contentType=key2.content_type, etag=key2.etag.strip('"\''))
-
- # Tracker file for original object should still exist.
- self.assertTrue(os.path.exists(tracker_file_name))
-
- # Copy the new object.
- gsutil_api.CopyObject(src_obj_metadata2, dst_obj_metadata,
- max_bytes_per_call=ONE_MIB)
-
- # Copy completed; original tracker file should be deleted.
- self.assertFalse(os.path.exists(tracker_file_name))
-
- self.assertEqual(
- gsutil_api.GetObjectMetadata(src_obj_metadata2.bucket,
- src_obj_metadata2.name,
- fields=['md5Hash']).md5Hash,
- gsutil_api.GetObjectMetadata(dst_obj_metadata.bucket,
- dst_obj_metadata.name,
- fields=['md5Hash']).md5Hash,
- 'Error: Rewritten object\'s hash doesn\'t match source object.')
- finally:
- # Clean up if something went wrong.
- DeleteTrackerFile(tracker_file_name)
-
- def test_rewrite_cp_resume_command_changed(self):
- """Tests that Rewrite starts over when the arguments changed."""
- if self.test_api == ApiSelector.XML:
- return unittest.skip('Rewrite API is only supported in JSON.')
- bucket_uri = self.CreateBucket()
- # Second bucket needs to be a different storage class so the service
- # actually rewrites the bytes.
- bucket_uri2 = self.CreateBucket(
- storage_class='DURABLE_REDUCED_AVAILABILITY')
- # maxBytesPerCall must be >= 1 MiB, so create an object > 2 MiB because we
-    # need 2 responses from the service: 1 success, 1 failure prior to
- # completion.
- object_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo',
- contents=('12'*ONE_MIB) + 'bar',
- prefer_json_api=True)
- gsutil_api = GcsJsonApi(BucketStorageUri, logging.getLogger(),
- self.default_provider)
- key = object_uri.get_key()
- src_obj_metadata = apitools_messages.Object(
- name=key.name, bucket=key.bucket.name, contentType=key.content_type,
- etag=key.etag.strip('"\''))
- dst_obj_name = self.MakeTempName('object')
- dst_obj_metadata = apitools_messages.Object(
- bucket=bucket_uri2.bucket_name,
- name=dst_obj_name,
- contentType=src_obj_metadata.contentType)
- tracker_file_name = GetRewriteTrackerFilePath(
- src_obj_metadata.bucket, src_obj_metadata.name,
- dst_obj_metadata.bucket, dst_obj_metadata.name, self.test_api)
- try:
- try:
- gsutil_api.CopyObject(
- src_obj_metadata, dst_obj_metadata, canned_acl='private',
- progress_callback=_HaltingRewriteCallbackHandler(ONE_MIB*2).call,
- max_bytes_per_call=ONE_MIB)
- self.fail('Expected _RewriteHaltException.')
- except _RewriteHaltException:
- pass
-
- # Tracker file for original object should still exist.
- self.assertTrue(os.path.exists(tracker_file_name))
-
- # Copy the same object but with different call parameters.
- gsutil_api.CopyObject(src_obj_metadata, dst_obj_metadata,
- canned_acl='public-read',
- max_bytes_per_call=ONE_MIB)
-
- # Copy completed; original tracker file should be deleted.
- self.assertFalse(os.path.exists(tracker_file_name))
-
- new_obj_metadata = gsutil_api.GetObjectMetadata(
- dst_obj_metadata.bucket, dst_obj_metadata.name,
- fields=['acl,md5Hash'])
- self.assertEqual(
- gsutil_api.GetObjectMetadata(src_obj_metadata.bucket,
- src_obj_metadata.name,
- fields=['md5Hash']).md5Hash,
- new_obj_metadata.md5Hash,
- 'Error: Rewritten object\'s hash doesn\'t match source object.')
- # New object should have a public-read ACL from the second command.
- found_public_acl = False
- for acl_entry in new_obj_metadata.acl:
- if acl_entry.entity == 'allUsers':
- found_public_acl = True
- self.assertTrue(found_public_acl,
- 'New object was not written with a public ACL.')
- finally:
- # Clean up if something went wrong.
- DeleteTrackerFile(tracker_file_name)
-
-
-class TestCpUnitTests(testcase.GsUtilUnitTestCase):
- """Unit tests for gsutil cp."""
-
- def testDownloadWithNoHashAvailable(self):
- """Tests a download with no valid server-supplied hash."""
- # S3 should have a special message for non-MD5 etags.
- bucket_uri = self.CreateBucket(provider='s3')
- object_uri = self.CreateObject(bucket_uri=bucket_uri, contents='foo')
- object_uri.get_key().etag = '12345' # Not an MD5
- dst_dir = self.CreateTempDir()
-
- log_handler = self.RunCommand(
- 'cp', [suri(object_uri), dst_dir], return_log_handler=True)
- warning_messages = log_handler.messages['warning']
-    self.assertEqual(2, len(warning_messages))
- self.assertRegexpMatches(
- warning_messages[0],
- r'Non-MD5 etag \(12345\) present for key .*, '
- r'data integrity checks are not possible')
- self.assertIn('Integrity cannot be assured', warning_messages[1])
-
- def test_object_and_prefix_same_name(self):
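-    """Tests copying an object that shares a name with an object prefix."""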
- bucket_uri = self.CreateBucket()
- object_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo',
- contents='foo')
- self.CreateObject(bucket_uri=bucket_uri,
- object_name='foo/bar', contents='bar')
- fpath = self.CreateTempFile()
- # MockKey doesn't support hash_algs, so the MD5 will not match.
- with SetBotoConfigForTest([('GSUtil', 'check_hashes', 'never')]):
- self.RunCommand('cp', [suri(object_uri), fpath])
- with open(fpath, 'r') as f:
- self.assertEqual(f.read(), 'foo')
-
- def test_cp_upload_respects_no_hashes(self):
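-    """Tests that an upload with no hashes to validate warns and proceeds."""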
- bucket_uri = self.CreateBucket()
- fpath = self.CreateTempFile(contents='abcd')
- with SetBotoConfigForTest([('GSUtil', 'check_hashes', 'never')]):
- log_handler = self.RunCommand('cp', [fpath, suri(bucket_uri)],
- return_log_handler=True)
- warning_messages = log_handler.messages['warning']
-    self.assertEqual(1, len(warning_messages))
- self.assertIn('Found no hashes to validate object upload',
- warning_messages[0])
