OLD | NEW |
(Empty) | |
| 1 # -*- coding: utf-8 -*- |
| 2 # Copyright 2013 Google Inc. All Rights Reserved. |
| 3 # |
| 4 # Licensed under the Apache License, Version 2.0 (the "License"); |
| 5 # you may not use this file except in compliance with the License. |
| 6 # You may obtain a copy of the License at |
| 7 # |
| 8 # http://www.apache.org/licenses/LICENSE-2.0 |
| 9 # |
| 10 # Unless required by applicable law or agreed to in writing, software |
| 11 # distributed under the License is distributed on an "AS IS" BASIS, |
| 12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 13 # See the License for the specific language governing permissions and |
| 14 # limitations under the License. |
| 15 """Integration tests for cp command.""" |
| 16 |
| 17 from __future__ import absolute_import |
| 18 |
| 19 import base64 |
| 20 import binascii |
| 21 import datetime |
| 22 import httplib |
| 23 import logging |
| 24 import os |
| 25 import pickle |
| 26 import pkgutil |
| 27 import random |
| 28 import re |
| 29 import string |
| 30 import sys |
| 31 |
| 32 from apitools.base.py import exceptions as apitools_exceptions |
| 33 import boto |
| 34 from boto import storage_uri |
| 35 from boto.exception import ResumableTransferDisposition |
| 36 from boto.exception import ResumableUploadException |
| 37 from boto.exception import StorageResponseError |
| 38 from boto.storage_uri import BucketStorageUri |
| 39 |
| 40 from gslib.cloud_api import ResumableDownloadException |
| 41 from gslib.cloud_api import ResumableUploadException |
| 42 from gslib.cloud_api import ResumableUploadStartOverException |
| 43 from gslib.copy_helper import GetTrackerFilePath |
| 44 from gslib.copy_helper import TrackerFileType |
| 45 from gslib.cs_api_map import ApiSelector |
| 46 from gslib.gcs_json_api import GcsJsonApi |
| 47 from gslib.hashing_helper import CalculateMd5FromContents |
| 48 from gslib.storage_url import StorageUrlFromString |
| 49 import gslib.tests.testcase as testcase |
| 50 from gslib.tests.testcase.base import NotParallelizable |
| 51 from gslib.tests.testcase.integration_testcase import SkipForS3 |
| 52 from gslib.tests.util import GenerationFromURI as urigen |
| 53 from gslib.tests.util import HAS_S3_CREDS |
| 54 from gslib.tests.util import ObjectToURI as suri |
| 55 from gslib.tests.util import PerformsFileToObjectUpload |
| 56 from gslib.tests.util import SetBotoConfigForTest |
| 57 from gslib.tests.util import unittest |
| 58 from gslib.third_party.storage_apitools import storage_v1_messages as apitools_messages |
| 59 from gslib.tracker_file import DeleteTrackerFile |
| 60 from gslib.tracker_file import GetRewriteTrackerFilePath |
| 61 from gslib.util import EIGHT_MIB |
| 62 from gslib.util import IS_WINDOWS |
| 63 from gslib.util import MakeHumanReadable |
| 64 from gslib.util import ONE_KIB |
| 65 from gslib.util import ONE_MIB |
| 66 from gslib.util import Retry |
| 67 from gslib.util import START_CALLBACK_PER_BYTES |
| 68 from gslib.util import UTF8 |
| 69 |
| 70 |
| 71 # Custom test callbacks must be pickleable, and therefore at global scope. |
| 72 class _HaltingCopyCallbackHandler(object): |
| 73 """Test callback handler for intentionally stopping a resumable transfer.""" |
| 74 |
| 75 def __init__(self, is_upload, halt_at_byte): |
| 76 self._is_upload = is_upload |
| 77 self._halt_at_byte = halt_at_byte |
| 78 |
| 79 # pylint: disable=invalid-name |
| 80 def call(self, total_bytes_transferred, total_size): |
| 81 """Forcibly exits if the transfer has passed the halting point.""" |
| 82 if total_bytes_transferred >= self._halt_at_byte: |
| 83 sys.stderr.write( |
| 84 'Halting transfer after byte %s. %s/%s transferred.\r\n' % ( |
| 85 self._halt_at_byte, MakeHumanReadable(total_bytes_transferred), |
| 86 MakeHumanReadable(total_size))) |
| 87 if self._is_upload: |
| 88 raise ResumableUploadException('Artifically halting upload.') |
| 89 else: |
| 90 raise ResumableDownloadException('Artifically halting download.') |
| 91 |
| 92 |
| 93 class _JSONForceHTTPErrorCopyCallbackHandler(object): |
| 94 """Test callback handler that raises an arbitrary HTTP error exception.""" |
| 95 |
| 96 def __init__(self, startover_at_byte, http_error_num): |
| 97 self._startover_at_byte = startover_at_byte |
| 98 self._http_error_num = http_error_num |
| 99 self.started_over_once = False |
| 100 |
| 101 # pylint: disable=invalid-name |
| 102 def call(self, total_bytes_transferred, total_size): |
| 103 """Forcibly exits if the transfer has passed the halting point.""" |
| 104 if (total_bytes_transferred >= self._startover_at_byte |
| 105 and not self.started_over_once): |
| 106 sys.stderr.write( |
| 107 'Forcing HTTP error %s after byte %s. ' |
| 108 '%s/%s transferred.\r\n' % ( |
| 109 self._http_error_num, |
| 110 self._startover_at_byte, |
| 111 MakeHumanReadable(total_bytes_transferred), |
| 112 MakeHumanReadable(total_size))) |
| 113 self.started_over_once = True |
| 114 raise apitools_exceptions.HttpError( |
| 115 {'status': self._http_error_num}, None, None) |
| 116 |
| 117 |
| 118 class _XMLResumableUploadStartOverCopyCallbackHandler(object): |
| 119 """Test callback handler that raises start-over exception during upload.""" |
| 120 |
| 121 def __init__(self, startover_at_byte): |
| 122 self._startover_at_byte = startover_at_byte |
| 123 self.started_over_once = False |
| 124 |
| 125 # pylint: disable=invalid-name |
| 126 def call(self, total_bytes_transferred, total_size): |
| 127 """Forcibly exits if the transfer has passed the halting point.""" |
| 128 if (total_bytes_transferred >= self._startover_at_byte |
| 129 and not self.started_over_once): |
| 130 sys.stderr.write( |
| 131 'Forcing ResumableUpload start over error after byte %s. ' |
| 132 '%s/%s transferred.\r\n' % ( |
| 133 self._startover_at_byte, |
| 134 MakeHumanReadable(total_bytes_transferred), |
| 135 MakeHumanReadable(total_size))) |
| 136 self.started_over_once = True |
| 137 raise boto.exception.ResumableUploadException( |
| 138 'Forcing upload start over', |
| 139 ResumableTransferDisposition.START_OVER) |
| 140 |
| 141 |
| 142 class _DeleteBucketThenStartOverCopyCallbackHandler(object): |
| 143 """Test callback handler that deletes bucket then raises start-over.""" |
| 144 |
| 145 def __init__(self, startover_at_byte, bucket_uri): |
| 146 self._startover_at_byte = startover_at_byte |
| 147 self._bucket_uri = bucket_uri |
| 148 self.started_over_once = False |
| 149 |
| 150 # pylint: disable=invalid-name |
| 151 def call(self, total_bytes_transferred, total_size): |
| 152 """Forcibly exits if the transfer has passed the halting point.""" |
| 153 if (total_bytes_transferred >= self._startover_at_byte |
| 154 and not self.started_over_once): |
| 155 sys.stderr.write('Deleting bucket (%s)' % self._bucket_uri.bucket_name) |
| 156 |
| 157 @Retry(StorageResponseError, tries=5, timeout_secs=1) |
| 158 def DeleteBucket(): |
| 159 bucket_list = list(self._bucket_uri.list_bucket(all_versions=True)) |
| 160 for k in bucket_list: |
| 161 self._bucket_uri.get_bucket().delete_key(k.name, |
| 162 version_id=k.version_id) |
| 163 self._bucket_uri.delete_bucket() |
| 164 |
| 165 DeleteBucket() |
| 166 sys.stderr.write( |
| 167 'Forcing ResumableUpload start over error after byte %s. ' |
| 168 '%s/%s transferred.\r\n' % ( |
| 169 self._startover_at_byte, |
| 170 MakeHumanReadable(total_bytes_transferred), |
| 171 MakeHumanReadable(total_size))) |
| 172 self.started_over_once = True |
| 173 raise ResumableUploadStartOverException( |
| 174 'Artificially forcing start-over') |
| 175 |
| 176 |
| 177 class _RewriteHaltException(Exception): |
| 178 pass |
| 179 |
| 180 |
| 181 class _HaltingRewriteCallbackHandler(object): |
| 182 """Test callback handler for intentionally stopping a rewrite operation.""" |
| 183 |
| 184 def __init__(self, halt_at_byte): |
| 185 self._halt_at_byte = halt_at_byte |
| 186 |
| 187 # pylint: disable=invalid-name |
| 188 def call(self, total_bytes_rewritten, unused_total_size): |
| 189 """Forcibly exits if the operation has passed the halting point.""" |
| 190 if total_bytes_rewritten >= self._halt_at_byte: |
| 191 raise _RewriteHaltException('Artificially halting rewrite') |
| 192 |
| 193 |
| 194 class _EnsureRewriteResumeCallbackHandler(object): |
| 195 """Test callback handler for ensuring a rewrite operation resumed.""" |
| 196 |
| 197 def __init__(self, required_byte): |
| 198 self._required_byte = required_byte |
| 199 |
| 200 # pylint: disable=invalid-name |
| 201 def call(self, total_bytes_rewritten, unused_total_size): |
| 202 """Forcibly exits if the operation did not resume past the required byte.""" |
| 203 if total_bytes_rewritten <= self._required_byte: |
| 204 raise _RewriteHaltException( |
| 205 'Rewrite did not resume; %s bytes written, but %s bytes should ' |
| 206 'have already been written.' % (total_bytes_rewritten, |
| 207 self._required_byte)) |
| 208 |
| 209 |
| 210 class _ResumableUploadRetryHandler(object): |
| 211 """Test callback handler for causing retries during a resumable transfer.""" |
| 212 |
| 213 def __init__(self, retry_at_byte, exception_to_raise, exc_args, |
| 214 num_retries=1): |
| 215 self._retry_at_byte = retry_at_byte |
| 216 self._exception_to_raise = exception_to_raise |
| 217 self._exception_args = exc_args |
| 218 self._num_retries = num_retries |
| 219 |
| 220 self._retries_made = 0 |
| 221 |
| 222 # pylint: disable=invalid-name |
| 223 def call(self, total_bytes_transferred, unused_total_size): |
| 224 """Cause a single retry at the retry point.""" |
| 225 if (total_bytes_transferred >= self._retry_at_byte |
| 226 and self._retries_made < self._num_retries): |
| 227 self._retries_made += 1 |
| 228 raise self._exception_to_raise(*self._exception_args) |
| 229 |
| 230 |
| 231 class TestCp(testcase.GsUtilIntegrationTestCase): |
| 232 """Integration tests for cp command.""" |
| 233 |
| 234 # For tests that artificially halt, we need to ensure at least one callback |
| 235 # occurs. |
| 236 halt_size = START_CALLBACK_PER_BYTES * 2 |
| 237 |
| 238 def _get_test_file(self, name): |
| 239 contents = pkgutil.get_data('gslib', 'tests/test_data/%s' % name) |
| 240 return self.CreateTempFile(file_name=name, contents=contents) |
| 241 |
| 242 @PerformsFileToObjectUpload |
| 243 def test_noclobber(self): |
| 244 key_uri = self.CreateObject(contents='foo') |
| 245 fpath = self.CreateTempFile(contents='bar') |
| 246 stderr = self.RunGsUtil(['cp', '-n', fpath, suri(key_uri)], |
| 247 return_stderr=True) |
| 248 self.assertIn('Skipping existing item: %s' % suri(key_uri), stderr) |
| 249 self.assertEqual(key_uri.get_contents_as_string(), 'foo') |
| 250 stderr = self.RunGsUtil(['cp', '-n', suri(key_uri), fpath], |
| 251 return_stderr=True) |
| 252 with open(fpath, 'r') as f: |
| 253 self.assertIn('Skipping existing item: %s' % suri(f), stderr) |
| 254 self.assertEqual(f.read(), 'bar') |
| 255 |
| 256 def test_dest_bucket_not_exist(self): |
| 257 fpath = self.CreateTempFile(contents='foo') |
| 258 invalid_bucket_uri = ( |
| 259 '%s://%s' % (self.default_provider, self.nonexistent_bucket_name)) |
| 260 stderr = self.RunGsUtil(['cp', fpath, invalid_bucket_uri], |
| 261 expected_status=1, return_stderr=True) |
| 262 self.assertIn('does not exist.', stderr) |
| 263 |
| 264 def test_copy_in_cloud_noclobber(self): |
| 265 bucket1_uri = self.CreateBucket() |
| 266 bucket2_uri = self.CreateBucket() |
| 267 key_uri = self.CreateObject(bucket_uri=bucket1_uri, contents='foo') |
| 268 stderr = self.RunGsUtil(['cp', suri(key_uri), suri(bucket2_uri)], |
| 269 return_stderr=True) |
| 270 # Rewrite API may output an additional 'Copying' progress notification. |
| 271 self.assertGreaterEqual(stderr.count('Copying'), 1) |
| 272 self.assertLessEqual(stderr.count('Copying'), 2) |
| 273 stderr = self.RunGsUtil(['cp', '-n', suri(key_uri), suri(bucket2_uri)], |
| 274 return_stderr=True) |
| 275 self.assertIn('Skipping existing item: %s' % |
| 276 suri(bucket2_uri, key_uri.object_name), stderr) |
| 277 |
| 278 @PerformsFileToObjectUpload |
| 279 def test_streaming(self): |
| 280 bucket_uri = self.CreateBucket() |
| 281 stderr = self.RunGsUtil(['cp', '-', '%s' % suri(bucket_uri, 'foo')], |
| 282 stdin='bar', return_stderr=True) |
| 283 self.assertIn('Copying from <STDIN>', stderr) |
| 284 key_uri = bucket_uri.clone_replace_name('foo') |
| 285 self.assertEqual(key_uri.get_contents_as_string(), 'bar') |
| 286 |
| 287 def test_streaming_multiple_arguments(self): |
| 288 bucket_uri = self.CreateBucket() |
| 289 stderr = self.RunGsUtil(['cp', '-', '-', suri(bucket_uri)], |
| 290 stdin='bar', return_stderr=True, expected_status=1) |
| 291 self.assertIn('Multiple URL strings are not supported with streaming', |
| 292 stderr) |
| 293 |
| 294 # TODO: Implement a way to test both with and without using magic file. |
| 295 |
| 296 @PerformsFileToObjectUpload |
| 297 def test_detect_content_type(self): |
| 298 """Tests local detection of content type.""" |
| 299 bucket_uri = self.CreateBucket() |
| 300 dsturi = suri(bucket_uri, 'foo') |
| 301 |
| 302 self.RunGsUtil(['cp', self._get_test_file('test.mp3'), dsturi]) |
| 303 |
| 304 # Use @Retry as hedge against bucket listing eventual consistency. |
| 305 @Retry(AssertionError, tries=3, timeout_secs=1) |
| 306 def _Check1(): |
| 307 stdout = self.RunGsUtil(['ls', '-L', dsturi], return_stdout=True) |
| 308 if IS_WINDOWS: |
| 309 self.assertTrue( |
| 310 re.search(r'Content-Type:\s+audio/x-mpg', stdout) or |
| 311 re.search(r'Content-Type:\s+audio/mpeg', stdout)) |
| 312 else: |
| 313 self.assertRegexpMatches(stdout, r'Content-Type:\s+audio/mpeg') |
| 314 _Check1() |
| 315 |
| 316 self.RunGsUtil(['cp', self._get_test_file('test.gif'), dsturi]) |
| 317 |
| 318 # Use @Retry as hedge against bucket listing eventual consistency. |
| 319 @Retry(AssertionError, tries=3, timeout_secs=1) |
| 320 def _Check2(): |
| 321 stdout = self.RunGsUtil(['ls', '-L', dsturi], return_stdout=True) |
| 322 self.assertRegexpMatches(stdout, r'Content-Type:\s+image/gif') |
| 323 _Check2() |
| 324 |
| 325 def test_content_type_override_default(self): |
| 326 """Tests overriding content type with the default value.""" |
| 327 bucket_uri = self.CreateBucket() |
| 328 dsturi = suri(bucket_uri, 'foo') |
| 329 |
| 330 self.RunGsUtil(['-h', 'Content-Type:', 'cp', |
| 331 self._get_test_file('test.mp3'), dsturi]) |
| 332 |
| 333 # Use @Retry as hedge against bucket listing eventual consistency. |
| 334 @Retry(AssertionError, tries=3, timeout_secs=1) |
| 335 def _Check1(): |
| 336 stdout = self.RunGsUtil(['ls', '-L', dsturi], return_stdout=True) |
| 337 self.assertRegexpMatches(stdout, |
| 338 r'Content-Type:\s+application/octet-stream') |
| 339 _Check1() |
| 340 |
| 341 self.RunGsUtil(['-h', 'Content-Type:', 'cp', |
| 342 self._get_test_file('test.gif'), dsturi]) |
| 343 |
| 344 # Use @Retry as hedge against bucket listing eventual consistency. |
| 345 @Retry(AssertionError, tries=3, timeout_secs=1) |
| 346 def _Check2(): |
| 347 stdout = self.RunGsUtil(['ls', '-L', dsturi], return_stdout=True) |
| 348 self.assertRegexpMatches(stdout, |
| 349 r'Content-Type:\s+application/octet-stream') |
| 350 _Check2() |
| 351 |
| 352 def test_content_type_override(self): |
| 353 """Tests overriding content type with a value.""" |
| 354 bucket_uri = self.CreateBucket() |
| 355 dsturi = suri(bucket_uri, 'foo') |
| 356 |
| 357 self.RunGsUtil(['-h', 'Content-Type:text/plain', 'cp', |
| 358 self._get_test_file('test.mp3'), dsturi]) |
| 359 |
| 360 # Use @Retry as hedge against bucket listing eventual consistency. |
| 361 @Retry(AssertionError, tries=3, timeout_secs=1) |
| 362 def _Check1(): |
| 363 stdout = self.RunGsUtil(['ls', '-L', dsturi], return_stdout=True) |
| 364 self.assertRegexpMatches(stdout, r'Content-Type:\s+text/plain') |
| 365 _Check1() |
| 366 |
| 367 self.RunGsUtil(['-h', 'Content-Type:text/plain', 'cp', |
| 368 self._get_test_file('test.gif'), dsturi]) |
| 369 |
| 370 # Use @Retry as hedge against bucket listing eventual consistency. |
| 371 @Retry(AssertionError, tries=3, timeout_secs=1) |
| 372 def _Check2(): |
| 373 stdout = self.RunGsUtil(['ls', '-L', dsturi], return_stdout=True) |
| 374 self.assertRegexpMatches(stdout, r'Content-Type:\s+text/plain') |
| 375 _Check2() |
| 376 |
| 377 @unittest.skipIf(IS_WINDOWS, 'magicfile is not available on Windows.') |
| 378 @PerformsFileToObjectUpload |
| 379 def test_magicfile_override(self): |
| 380 """Tests content type override with magicfile value.""" |
| 381 bucket_uri = self.CreateBucket() |
| 382 dsturi = suri(bucket_uri, 'foo') |
| 383 fpath = self.CreateTempFile(contents='foo/bar\n') |
| 384 self.RunGsUtil(['cp', fpath, dsturi]) |
| 385 |
| 386 # Use @Retry as hedge against bucket listing eventual consistency. |
| 387 @Retry(AssertionError, tries=3, timeout_secs=1) |
| 388 def _Check1(): |
| 389 stdout = self.RunGsUtil(['ls', '-L', dsturi], return_stdout=True) |
| 390 use_magicfile = boto.config.getbool('GSUtil', 'use_magicfile', False) |
| 391 content_type = ('text/plain' if use_magicfile |
| 392 else 'application/octet-stream') |
| 393 self.assertRegexpMatches(stdout, r'Content-Type:\s+%s' % content_type) |
| 394 _Check1() |
| 395 |
| 396 @PerformsFileToObjectUpload |
| 397 def test_content_type_mismatches(self): |
| 398 """Tests overriding content type when it does not match the file type.""" |
| 399 bucket_uri = self.CreateBucket() |
| 400 dsturi = suri(bucket_uri, 'foo') |
| 401 fpath = self.CreateTempFile(contents='foo/bar\n') |
| 402 |
| 403 self.RunGsUtil(['-h', 'Content-Type:image/gif', 'cp', |
| 404 self._get_test_file('test.mp3'), dsturi]) |
| 405 |
| 406 # Use @Retry as hedge against bucket listing eventual consistency. |
| 407 @Retry(AssertionError, tries=3, timeout_secs=1) |
| 408 def _Check1(): |
| 409 stdout = self.RunGsUtil(['ls', '-L', dsturi], return_stdout=True) |
| 410 self.assertRegexpMatches(stdout, r'Content-Type:\s+image/gif') |
| 411 _Check1() |
| 412 |
| 413 self.RunGsUtil(['-h', 'Content-Type:image/gif', 'cp', |
| 414 self._get_test_file('test.gif'), dsturi]) |
| 415 |
| 416 # Use @Retry as hedge against bucket listing eventual consistency. |
| 417 @Retry(AssertionError, tries=3, timeout_secs=1) |
| 418 def _Check2(): |
| 419 stdout = self.RunGsUtil(['ls', '-L', dsturi], return_stdout=True) |
| 420 self.assertRegexpMatches(stdout, r'Content-Type:\s+image/gif') |
| 421 _Check2() |
| 422 |
| 423 self.RunGsUtil(['-h', 'Content-Type:image/gif', 'cp', fpath, dsturi]) |
| 424 |
| 425 # Use @Retry as hedge against bucket listing eventual consistency. |
| 426 @Retry(AssertionError, tries=3, timeout_secs=1) |
| 427 def _Check3(): |
| 428 stdout = self.RunGsUtil(['ls', '-L', dsturi], return_stdout=True) |
| 429 self.assertRegexpMatches(stdout, r'Content-Type:\s+image/gif') |
| 430 _Check3() |
| 431 |
| 432 @PerformsFileToObjectUpload |
| 433 def test_content_type_header_case_insensitive(self): |
| 434 """Tests that the content type header is treated with case insensitivity.""" |
| 435 bucket_uri = self.CreateBucket() |
| 436 dsturi = suri(bucket_uri, 'foo') |
| 437 fpath = self._get_test_file('test.gif') |
| 438 |
| 439 self.RunGsUtil(['-h', 'content-Type:text/plain', 'cp', |
| 440 fpath, dsturi]) |
| 441 |
| 442 # Use @Retry as hedge against bucket listing eventual consistency. |
| 443 @Retry(AssertionError, tries=3, timeout_secs=1) |
| 444 def _Check1(): |
| 445 stdout = self.RunGsUtil(['ls', '-L', dsturi], return_stdout=True) |
| 446 self.assertRegexpMatches(stdout, r'Content-Type:\s+text/plain') |
| 447 self.assertNotRegexpMatches(stdout, r'image/gif') |
| 448 _Check1() |
| 449 |
| 450 self.RunGsUtil(['-h', 'CONTENT-TYPE:image/gif', |
| 451 '-h', 'content-type:image/gif', |
| 452 'cp', fpath, dsturi]) |
| 453 |
| 454 # Use @Retry as hedge against bucket listing eventual consistency. |
| 455 @Retry(AssertionError, tries=3, timeout_secs=1) |
| 456 def _Check2(): |
| 457 stdout = self.RunGsUtil(['ls', '-L', dsturi], return_stdout=True) |
| 458 self.assertRegexpMatches(stdout, r'Content-Type:\s+image/gif') |
| 459 self.assertNotRegexpMatches(stdout, r'image/gif,\s*image/gif') |
| 460 _Check2() |
| 461 |
| 462 @PerformsFileToObjectUpload |
| 463 def test_other_headers(self): |
| 464 """Tests that non-content-type headers are applied successfully on copy.""" |
| 465 bucket_uri = self.CreateBucket() |
| 466 dst_uri = suri(bucket_uri, 'foo') |
| 467 fpath = self._get_test_file('test.gif') |
| 468 |
| 469 self.RunGsUtil(['-h', 'Cache-Control:public,max-age=12', |
| 470 '-h', 'x-%s-meta-1:abcd' % self.provider_custom_meta, 'cp', |
| 471 fpath, dst_uri]) |
| 472 |
| 473 stdout = self.RunGsUtil(['ls', '-L', dst_uri], return_stdout=True) |
| 474 self.assertRegexpMatches(stdout, r'Cache-Control\s*:\s*public,max-age=12') |
| 475 self.assertRegexpMatches(stdout, r'Metadata:\s*1:\s*abcd') |
| 476 |
| 477 dst_uri2 = suri(bucket_uri, 'bar') |
| 478 self.RunGsUtil(['cp', dst_uri, dst_uri2]) |
| 479 # Ensure metadata was preserved across copy. |
| 480 stdout = self.RunGsUtil(['ls', '-L', dst_uri2], return_stdout=True) |
| 481 self.assertRegexpMatches(stdout, r'Cache-Control\s*:\s*public,max-age=12') |
| 482 self.assertRegexpMatches(stdout, r'Metadata:\s*1:\s*abcd') |
| 483 |
| 484 @PerformsFileToObjectUpload |
| 485 def test_versioning(self): |
| 486 """Tests copy with versioning.""" |
| 487 bucket_uri = self.CreateVersionedBucket() |
| 488 k1_uri = self.CreateObject(bucket_uri=bucket_uri, contents='data2') |
| 489 k2_uri = self.CreateObject(bucket_uri=bucket_uri, contents='data1') |
| 490 g1 = urigen(k2_uri) |
| 491 self.RunGsUtil(['cp', suri(k1_uri), suri(k2_uri)]) |
| 492 k2_uri = bucket_uri.clone_replace_name(k2_uri.object_name) |
| 493 k2_uri = bucket_uri.clone_replace_key(k2_uri.get_key()) |
| 494 g2 = urigen(k2_uri) |
| 495 k2_uri.set_contents_from_string('data3') |
| 496 g3 = urigen(k2_uri) |
| 497 |
| 498 fpath = self.CreateTempFile() |
| 499 # Check to make sure current version is data3. |
| 500 self.RunGsUtil(['cp', k2_uri.versionless_uri, fpath]) |
| 501 with open(fpath, 'r') as f: |
| 502 self.assertEqual(f.read(), 'data3') |
| 503 |
| 504 # Check contents of all three versions |
| 505 self.RunGsUtil(['cp', '%s#%s' % (k2_uri.versionless_uri, g1), fpath]) |
| 506 with open(fpath, 'r') as f: |
| 507 self.assertEqual(f.read(), 'data1') |
| 508 self.RunGsUtil(['cp', '%s#%s' % (k2_uri.versionless_uri, g2), fpath]) |
| 509 with open(fpath, 'r') as f: |
| 510 self.assertEqual(f.read(), 'data2') |
| 511 self.RunGsUtil(['cp', '%s#%s' % (k2_uri.versionless_uri, g3), fpath]) |
| 512 with open(fpath, 'r') as f: |
| 513 self.assertEqual(f.read(), 'data3') |
| 514 |
| 515 # Copy first version to current and verify. |
| 516 self.RunGsUtil(['cp', '%s#%s' % (k2_uri.versionless_uri, g1), |
| 517 k2_uri.versionless_uri]) |
| 518 self.RunGsUtil(['cp', k2_uri.versionless_uri, fpath]) |
| 519 with open(fpath, 'r') as f: |
| 520 self.assertEqual(f.read(), 'data1') |
| 521 |
| 522 # Attempt to specify a version-specific URI for destination. |
| 523 stderr = self.RunGsUtil(['cp', fpath, k2_uri.uri], return_stderr=True, |
| 524 expected_status=1) |
| 525 self.assertIn('cannot be the destination for gsutil cp', stderr) |
| 526 |
| 527 @SkipForS3('S3 lists versioned objects in reverse timestamp order.') |
| 528 def test_recursive_copying_versioned_bucket(self): |
| 529 """Tests that cp -R with versioned buckets copies all versions in order.""" |
| 530 bucket1_uri = self.CreateVersionedBucket() |
| 531 bucket2_uri = self.CreateVersionedBucket() |
| 532 |
| 533 # Write two versions of an object to bucket1. |
| 534 self.CreateObject(bucket_uri=bucket1_uri, object_name='k', contents='data0') |
| 535 self.CreateObject(bucket_uri=bucket1_uri, object_name='k', |
| 536 contents='longer_data1') |
| 537 |
| 538 self.AssertNObjectsInBucket(bucket1_uri, 2, versioned=True) |
| 539 self.AssertNObjectsInBucket(bucket2_uri, 0, versioned=True) |
| 540 |
| 541 # Recursively copy to second versioned bucket. |
| 542 self.RunGsUtil(['cp', '-R', suri(bucket1_uri, '*'), suri(bucket2_uri)]) |
| 543 |
| 544 # Use @Retry as hedge against bucket listing eventual consistency. |
| 545 @Retry(AssertionError, tries=3, timeout_secs=1) |
| 546 def _Check2(): |
| 547 """Validates the results of the cp -R.""" |
| 548 listing1 = self.RunGsUtil(['ls', '-la', suri(bucket1_uri)], |
| 549 return_stdout=True).split('\n') |
| 550 listing2 = self.RunGsUtil(['ls', '-la', suri(bucket2_uri)], |
| 551 return_stdout=True).split('\n') |
| 552 # 2 lines of listing output, 1 summary line, 1 empty line from \n split. |
| 553 self.assertEquals(len(listing1), 4) |
| 554 self.assertEquals(len(listing2), 4) |
| 555 |
| 556 # First object in each bucket should match in size and version-less name. |
| 557 size1, _, uri_str1, _ = listing1[0].split() |
| 558 self.assertEquals(size1, str(len('data0'))) |
| 559 self.assertEquals(storage_uri(uri_str1).object_name, 'k') |
| 560 size2, _, uri_str2, _ = listing2[0].split() |
| 561 self.assertEquals(size2, str(len('data0'))) |
| 562 self.assertEquals(storage_uri(uri_str2).object_name, 'k') |
| 563 |
| 564 # Similarly for second object in each bucket. |
| 565 size1, _, uri_str1, _ = listing1[1].split() |
| 566 self.assertEquals(size1, str(len('longer_data1'))) |
| 567 self.assertEquals(storage_uri(uri_str1).object_name, 'k') |
| 568 size2, _, uri_str2, _ = listing2[1].split() |
| 569 self.assertEquals(size2, str(len('longer_data1'))) |
| 570 self.assertEquals(storage_uri(uri_str2).object_name, 'k') |
| 571 _Check2() |
| 572 |
| 573 @PerformsFileToObjectUpload |
| 574 @SkipForS3('Preconditions not supported for S3.') |
| 575 def test_cp_generation_zero_match(self): |
| 576 """Tests that cp handles an object-not-exists precondition header.""" |
| 577 bucket_uri = self.CreateBucket() |
| 578 fpath1 = self.CreateTempFile(contents='data1') |
| 579 # Match 0 means only write the object if it doesn't already exist. |
| 580 gen_match_header = 'x-goog-if-generation-match:0' |
| 581 |
| 582 # First copy should succeed. |
| 583 # TODO: This can fail (rarely) if the server returns a 5xx but actually |
| 584 # commits the bytes. If we add restarts on small uploads, handle this |
| 585 # case. |
| 586 self.RunGsUtil(['-h', gen_match_header, 'cp', fpath1, suri(bucket_uri)]) |
| 587 |
| 588 # Second copy should fail with a precondition error. |
| 589 stderr = self.RunGsUtil(['-h', gen_match_header, 'cp', fpath1, |
| 590 suri(bucket_uri)], |
| 591 return_stderr=True, expected_status=1) |
| 592 self.assertIn('PreconditionException', stderr) |
| 593 |
| 594 @PerformsFileToObjectUpload |
| 595 @SkipForS3('Preconditions not supported for S3.') |
| 596 def test_cp_v_generation_match(self): |
| 597 """Tests that cp -v option handles the if-generation-match header.""" |
| 598 bucket_uri = self.CreateVersionedBucket() |
| 599 k1_uri = self.CreateObject(bucket_uri=bucket_uri, contents='data1') |
| 600 g1 = k1_uri.generation |
| 601 |
| 602 tmpdir = self.CreateTempDir() |
| 603 fpath1 = self.CreateTempFile(tmpdir=tmpdir, contents='data2') |
| 604 |
| 605 gen_match_header = 'x-goog-if-generation-match:%s' % g1 |
| 606 # First copy should succeed. |
| 607 self.RunGsUtil(['-h', gen_match_header, 'cp', fpath1, suri(k1_uri)]) |
| 608 |
| 609 # Second copy should fail the precondition. |
| 610 stderr = self.RunGsUtil(['-h', gen_match_header, 'cp', fpath1, |
| 611 suri(k1_uri)], |
| 612 return_stderr=True, expected_status=1) |
| 613 |
| 614 self.assertIn('PreconditionException', stderr) |
| 615 |
| 616 # Specifying a generation with -n should fail before the request hits the |
| 617 # server. |
| 618 stderr = self.RunGsUtil(['-h', gen_match_header, 'cp', '-n', fpath1, |
| 619 suri(k1_uri)], |
| 620 return_stderr=True, expected_status=1) |
| 621 |
| 622 self.assertIn('ArgumentException', stderr) |
| 623 self.assertIn('Specifying x-goog-if-generation-match is not supported ' |
| 624 'with cp -n', stderr) |
| 625 |
| 626 @PerformsFileToObjectUpload |
| 627 def test_cp_nv(self): |
| 628 """Tests that cp -nv works when skipping an existing file.""" |
| 629 bucket_uri = self.CreateVersionedBucket() |
| 630 k1_uri = self.CreateObject(bucket_uri=bucket_uri, contents='data1') |
| 631 |
| 632 tmpdir = self.CreateTempDir() |
| 633 fpath1 = self.CreateTempFile(tmpdir=tmpdir, contents='data2') |
| 634 |
| 635 # First copy should succeed. |
| 636 self.RunGsUtil(['cp', '-nv', fpath1, suri(k1_uri)]) |
| 637 |
| 638 # Second copy should skip copying. |
| 639 stderr = self.RunGsUtil(['cp', '-nv', fpath1, suri(k1_uri)], |
| 640 return_stderr=True) |
| 641 self.assertIn('Skipping existing item:', stderr) |
| 642 |
| 643 @PerformsFileToObjectUpload |
| 644 @SkipForS3('S3 lists versioned objects in reverse timestamp order.') |
| 645 def test_cp_v_option(self): |
| 646 """Tests that cp -v returns the created object's version-specific URI.""" |
| 647 bucket_uri = self.CreateVersionedBucket() |
| 648 k1_uri = self.CreateObject(bucket_uri=bucket_uri, contents='data1') |
| 649 k2_uri = self.CreateObject(bucket_uri=bucket_uri, contents='data2') |
| 650 |
| 651 # Case 1: Upload file to object using one-shot PUT. |
| 652 tmpdir = self.CreateTempDir() |
| 653 fpath1 = self.CreateTempFile(tmpdir=tmpdir, contents='data1') |
| 654 self._run_cp_minus_v_test('-v', fpath1, k2_uri.uri) |
| 655 |
| 656 # Case 2: Upload file to object using resumable upload. |
| 657 size_threshold = ONE_KIB |
| 658 boto_config_for_test = ('GSUtil', 'resumable_threshold', |
| 659 str(size_threshold)) |
| 660 with SetBotoConfigForTest([boto_config_for_test]): |
| 661 file_as_string = os.urandom(size_threshold) |
| 662 tmpdir = self.CreateTempDir() |
| 663 fpath1 = self.CreateTempFile(tmpdir=tmpdir, contents=file_as_string) |
| 664 self._run_cp_minus_v_test('-v', fpath1, k2_uri.uri) |
| 665 |
| 666 # Case 3: Upload stream to object. |
| 667 self._run_cp_minus_v_test('-v', '-', k2_uri.uri) |
| 668 |
| 669 # Case 4: Download object to file. For this case we just expect the |
| 670 # output of gsutil cp -v to be the URI of the file. |
| 671 tmpdir = self.CreateTempDir() |
| 672 fpath1 = self.CreateTempFile(tmpdir=tmpdir) |
| 673 dst_uri = storage_uri(fpath1) |
| 674 stderr = self.RunGsUtil(['cp', '-v', suri(k1_uri), suri(dst_uri)], |
| 675 return_stderr=True) |
| 676 self.assertIn('Created: %s' % dst_uri.uri, stderr.split('\n')[-2]) |
| 677 |
| 678 # Case 5: Daisy-chain from object to object. |
| 679 self._run_cp_minus_v_test('-Dv', k1_uri.uri, k2_uri.uri) |
| 680 |
| 681 # Case 6: Copy object to object in-the-cloud. |
| 682 self._run_cp_minus_v_test('-v', k1_uri.uri, k2_uri.uri) |
| 683 |
| 684 def _run_cp_minus_v_test(self, opt, src_str, dst_str): |
| 685 """Runs cp -v with the options and validates the results.""" |
| 686 stderr = self.RunGsUtil(['cp', opt, src_str, dst_str], return_stderr=True) |
| 687 match = re.search(r'Created: (.*)\n', stderr) |
| 688 self.assertIsNotNone(match) |
| 689 created_uri = match.group(1) |
| 690 |
| 691 # Use @Retry as hedge against bucket listing eventual consistency. |
| 692 @Retry(AssertionError, tries=3, timeout_secs=1) |
| 693 def _Check1(): |
| 694 stdout = self.RunGsUtil(['ls', '-a', dst_str], return_stdout=True) |
| 695 lines = stdout.split('\n') |
| 696 # Final (most recent) object should match the "Created:" URI. This is |
| 697 # in the second-to-last line (last line is '\n'). |
| 698 self.assertGreater(len(lines), 2) |
| 699 self.assertEqual(created_uri, lines[-2]) |
| 700 _Check1() |
| 701 |
| 702 @PerformsFileToObjectUpload |
| 703 def test_stdin_args(self): |
| 704 """Tests cp with the -I option.""" |
| 705 tmpdir = self.CreateTempDir() |
| 706 fpath1 = self.CreateTempFile(tmpdir=tmpdir, contents='data1') |
| 707 fpath2 = self.CreateTempFile(tmpdir=tmpdir, contents='data2') |
| 708 bucket_uri = self.CreateBucket() |
| 709 self.RunGsUtil(['cp', '-I', suri(bucket_uri)], |
| 710 stdin='\n'.join((fpath1, fpath2))) |
| 711 |
| 712 # Use @Retry as hedge against bucket listing eventual consistency. |
| 713 @Retry(AssertionError, tries=3, timeout_secs=1) |
| 714 def _Check1(): |
| 715 stdout = self.RunGsUtil(['ls', suri(bucket_uri)], return_stdout=True) |
| 716 self.assertIn(os.path.basename(fpath1), stdout) |
| 717 self.assertIn(os.path.basename(fpath2), stdout) |
| 718 self.assertNumLines(stdout, 2) |
| 719 _Check1() |
| 720 |
| 721 def test_cross_storage_class_cloud_cp(self): |
| 722 bucket1_uri = self.CreateBucket(storage_class='STANDARD') |
| 723 bucket2_uri = self.CreateBucket( |
| 724 storage_class='DURABLE_REDUCED_AVAILABILITY') |
| 725 key_uri = self.CreateObject(bucket_uri=bucket1_uri, contents='foo') |
| 726 # Server now allows copy-in-the-cloud across storage classes. |
| 727 self.RunGsUtil(['cp', suri(key_uri), suri(bucket2_uri)]) |
| 728 |
| 729 @unittest.skipUnless(HAS_S3_CREDS, 'Test requires both S3 and GS credentials') |
| 730 def test_cross_provider_cp(self): |
| 731 s3_bucket = self.CreateBucket(provider='s3') |
| 732 gs_bucket = self.CreateBucket(provider='gs') |
| 733 s3_key = self.CreateObject(bucket_uri=s3_bucket, contents='foo') |
| 734 gs_key = self.CreateObject(bucket_uri=gs_bucket, contents='bar') |
| 735 self.RunGsUtil(['cp', suri(s3_key), suri(gs_bucket)]) |
| 736 self.RunGsUtil(['cp', suri(gs_key), suri(s3_bucket)]) |
| 737 |
| 738 @unittest.skipUnless(HAS_S3_CREDS, 'Test requires both S3 and GS credentials') |
| 739 @unittest.skip('This test performs a large copy but remains here for ' |
| 740 'debugging purposes.') |
| 741 def test_cross_provider_large_cp(self): |
| 742 s3_bucket = self.CreateBucket(provider='s3') |
| 743 gs_bucket = self.CreateBucket(provider='gs') |
| 744 s3_key = self.CreateObject(bucket_uri=s3_bucket, contents='f'*1024*1024) |
| 745 gs_key = self.CreateObject(bucket_uri=gs_bucket, contents='b'*1024*1024) |
| 746 self.RunGsUtil(['cp', suri(s3_key), suri(gs_bucket)]) |
| 747 self.RunGsUtil(['cp', suri(gs_key), suri(s3_bucket)]) |
| 748 with SetBotoConfigForTest([ |
| 749 ('GSUtil', 'resumable_threshold', str(ONE_KIB)), |
| 750 ('GSUtil', 'json_resumable_chunk_size', str(ONE_KIB * 256))]): |
| 751 # Ensure copy also works across json upload chunk boundaries. |
| 752 self.RunGsUtil(['cp', suri(s3_key), suri(gs_bucket)]) |
| 753 |
| 754 @unittest.skip('This test is slow due to creating many objects, ' |
| 755 'but remains here for debugging purposes.') |
| 756 def test_daisy_chain_cp_file_sizes(self): |
| 757 """Ensure daisy chain cp works with a wide range of file sizes.""" |
| 758 bucket_uri = self.CreateBucket() |
| 759 bucket2_uri = self.CreateBucket() |
| 760 exponent_cap = 28 # Up to 256 MiB in size. |
| 761 for i in range(exponent_cap): |
| 762 one_byte_smaller = 2**i - 1 |
| 763 normal = 2**i |
| 764 one_byte_larger = 2**i + 1 |
| 765 self.CreateObject(bucket_uri=bucket_uri, contents='a'*one_byte_smaller) |
| 766 self.CreateObject(bucket_uri=bucket_uri, contents='b'*normal) |
| 767 self.CreateObject(bucket_uri=bucket_uri, contents='c'*one_byte_larger) |
| 768 |
| 769 self.AssertNObjectsInBucket(bucket_uri, exponent_cap*3) |
| 770 self.RunGsUtil(['-m', 'cp', '-D', suri(bucket_uri, '**'), |
| 771 suri(bucket2_uri)]) |
| 772 |
| 773 self.AssertNObjectsInBucket(bucket2_uri, exponent_cap*3) |
| 774 |
| 775 def test_daisy_chain_cp(self): |
| 776 """Tests cp with the -D option.""" |
| 777 bucket1_uri = self.CreateBucket(storage_class='STANDARD') |
| 778 bucket2_uri = self.CreateBucket( |
| 779 storage_class='DURABLE_REDUCED_AVAILABILITY') |
| 780 key_uri = self.CreateObject(bucket_uri=bucket1_uri, contents='foo') |
| 781 # Set some headers on the source object so we can verify that headers are |
| 782 # preserved by daisy-chain copy. |
| 783 self.RunGsUtil(['setmeta', '-h', 'Cache-Control:public,max-age=12', |
| 784 '-h', 'Content-Type:image/gif', |
| 785 '-h', 'x-%s-meta-1:abcd' % self.provider_custom_meta, |
| 786 suri(key_uri)]) |
| 787 # Set public-read (non-default) ACL so we can verify that cp -D -p works. |
| 788 self.RunGsUtil(['acl', 'set', 'public-read', suri(key_uri)]) |
| 789 acl_json = self.RunGsUtil(['acl', 'get', suri(key_uri)], return_stdout=True) |
| 790 # Perform daisy-chain copy and verify that source object headers and ACL |
| 791 # were preserved. Also specify the -n option to test that gsutil correctly |
| 792 # removes the x-goog-if-generation-match:0 header that was set at upload |
| 793 # time when updating the ACL. |
| 794 stderr = self.RunGsUtil(['cp', '-Dpn', suri(key_uri), suri(bucket2_uri)], |
| 795 return_stderr=True) |
| 796 self.assertNotIn('Copy-in-the-cloud disallowed', stderr) |
| 797 |
| 798 @Retry(AssertionError, tries=3, timeout_secs=1) |
| 799 def _Check(): |
| 800 uri = suri(bucket2_uri, key_uri.object_name) |
| 801 stdout = self.RunGsUtil(['ls', '-L', uri], return_stdout=True) |
| 802 self.assertRegexpMatches(stdout, r'Cache-Control:\s+public,max-age=12') |
| 803 self.assertRegexpMatches(stdout, r'Content-Type:\s+image/gif') |
| 804 self.assertRegexpMatches(stdout, r'Metadata:\s+1:\s+abcd') |
| 805 new_acl_json = self.RunGsUtil(['acl', 'get', uri], return_stdout=True) |
| 806 self.assertEqual(acl_json, new_acl_json) |
| 807 _Check() |
| 808 |
| 809 def test_daisy_chain_cp_download_failure(self): |
| 810 """Tests cp with the -D option when the download thread dies.""" |
| 811 bucket1_uri = self.CreateBucket() |
| 812 bucket2_uri = self.CreateBucket() |
| 813 key_uri = self.CreateObject(bucket_uri=bucket1_uri, |
| 814 contents='a' * self.halt_size) |
| 815 boto_config_for_test = ('GSUtil', 'resumable_threshold', str(ONE_KIB)) |
| 816 test_callback_file = self.CreateTempFile( |
| 817 contents=pickle.dumps(_HaltingCopyCallbackHandler(False, 5))) |
| 818 with SetBotoConfigForTest([boto_config_for_test]): |
| 819 stderr = self.RunGsUtil(['cp', '--testcallbackfile', test_callback_file, |
| 820 '-D', suri(key_uri), suri(bucket2_uri)], |
| 821 expected_status=1, return_stderr=True) |
| 822 # Should have two exception traces; one from the download thread and |
| 823 # one from the upload thread. |
| 824 self.assertEqual(stderr.count( |
| 825 'ResumableDownloadException: Artifically halting download'), 2) |
| 826 |
| 827 def test_canned_acl_cp(self): |
| 828 """Tests copying with a canned ACL.""" |
| 829 bucket1_uri = self.CreateBucket() |
| 830 bucket2_uri = self.CreateBucket() |
| 831 key_uri = self.CreateObject(bucket_uri=bucket1_uri, contents='foo') |
| 832 self.RunGsUtil(['cp', '-a', 'public-read', suri(key_uri), |
| 833 suri(bucket2_uri)]) |
| 834 # Set public-read on the original key after the copy so we can compare |
| 835 # the ACLs. |
| 836 self.RunGsUtil(['acl', 'set', 'public-read', suri(key_uri)]) |
| 837 public_read_acl = self.RunGsUtil(['acl', 'get', suri(key_uri)], |
| 838 return_stdout=True) |
| 839 |
| 840 @Retry(AssertionError, tries=3, timeout_secs=1) |
| 841 def _Check(): |
| 842 uri = suri(bucket2_uri, key_uri.object_name) |
| 843 new_acl_json = self.RunGsUtil(['acl', 'get', uri], return_stdout=True) |
| 844 self.assertEqual(public_read_acl, new_acl_json) |
| 845 _Check() |
| 846 |
| 847 @PerformsFileToObjectUpload |
| 848 def test_canned_acl_upload(self): |
| 849 """Tests uploading a file with a canned ACL.""" |
| 850 bucket1_uri = self.CreateBucket() |
| 851 key_uri = self.CreateObject(bucket_uri=bucket1_uri, contents='foo') |
| 852 # Set public-read on the object so we can compare the ACLs. |
| 853 self.RunGsUtil(['acl', 'set', 'public-read', suri(key_uri)]) |
| 854 public_read_acl = self.RunGsUtil(['acl', 'get', suri(key_uri)], |
| 855 return_stdout=True) |
| 856 |
| 857 file_name = 'bar' |
| 858 fpath = self.CreateTempFile(file_name=file_name, contents='foo') |
| 859 self.RunGsUtil(['cp', '-a', 'public-read', fpath, suri(bucket1_uri)]) |
| 860 new_acl_json = self.RunGsUtil(['acl', 'get', suri(bucket1_uri, file_name)], |
| 861 return_stdout=True) |
| 862 self.assertEqual(public_read_acl, new_acl_json) |
| 863 |
| 864 resumable_size = ONE_KIB |
| 865 boto_config_for_test = ('GSUtil', 'resumable_threshold', |
| 866 str(resumable_size)) |
| 867 with SetBotoConfigForTest([boto_config_for_test]): |
| 868 resumable_file_name = 'resumable_bar' |
| 869 resumable_contents = os.urandom(resumable_size) |
| 870 resumable_fpath = self.CreateTempFile( |
| 871 file_name=resumable_file_name, contents=resumable_contents) |
| 872 self.RunGsUtil(['cp', '-a', 'public-read', resumable_fpath, |
| 873 suri(bucket1_uri)]) |
| 874 new_resumable_acl_json = self.RunGsUtil( |
| 875 ['acl', 'get', suri(bucket1_uri, resumable_file_name)], |
| 876 return_stdout=True) |
| 877 self.assertEqual(public_read_acl, new_resumable_acl_json) |
| 878 |
| 879 def test_cp_key_to_local_stream(self): |
| 880 bucket_uri = self.CreateBucket() |
| 881 contents = 'foo' |
| 882 key_uri = self.CreateObject(bucket_uri=bucket_uri, contents=contents) |
| 883 stdout = self.RunGsUtil(['cp', suri(key_uri), '-'], return_stdout=True) |
| 884 self.assertIn(contents, stdout) |
| 885 |
| 886 def test_cp_local_file_to_local_stream(self): |
| 887 contents = 'content' |
| 888 fpath = self.CreateTempFile(contents=contents) |
| 889 stdout = self.RunGsUtil(['cp', fpath, '-'], return_stdout=True) |
| 890 self.assertIn(contents, stdout) |
| 891 |
| 892 @PerformsFileToObjectUpload |
| 893 def test_cp_zero_byte_file(self): |
| 894 dst_bucket_uri = self.CreateBucket() |
| 895 src_dir = self.CreateTempDir() |
| 896 fpath = os.path.join(src_dir, 'zero_byte') |
| 897 with open(fpath, 'w') as unused_out_file: |
| 898 pass # Write a zero byte file |
| 899 self.RunGsUtil(['cp', fpath, suri(dst_bucket_uri)]) |
| 900 |
| 901 @Retry(AssertionError, tries=3, timeout_secs=1) |
| 902 def _Check1(): |
| 903 stdout = self.RunGsUtil(['ls', suri(dst_bucket_uri)], return_stdout=True) |
| 904 self.assertIn(os.path.basename(fpath), stdout) |
| 905 _Check1() |
| 906 |
| 907 download_path = os.path.join(src_dir, 'zero_byte_download') |
| 908 self.RunGsUtil(['cp', suri(dst_bucket_uri, 'zero_byte'), download_path]) |
| 909 self.assertTrue(os.stat(download_path)) |
| 910 |
| 911 def test_copy_bucket_to_bucket(self): |
| 912 """Tests recursively copying from bucket to bucket. |
| 913 |
| 914 This should produce identically named objects (and not, in particular, |
| 915 destination objects named by the version-specific URI from source objects). |
| 916 """ |
| 917 src_bucket_uri = self.CreateVersionedBucket() |
| 918 dst_bucket_uri = self.CreateVersionedBucket() |
| 919 self.CreateObject(bucket_uri=src_bucket_uri, object_name='obj0', |
| 920 contents='abc') |
| 921 self.CreateObject(bucket_uri=src_bucket_uri, object_name='obj1', |
| 922 contents='def') |
| 923 |
| 924 # Use @Retry as hedge against bucket listing eventual consistency. |
| 925 @Retry(AssertionError, tries=3, timeout_secs=1) |
| 926 def _CopyAndCheck(): |
| 927 self.RunGsUtil(['cp', '-R', suri(src_bucket_uri), |
| 928 suri(dst_bucket_uri)]) |
| 929 stdout = self.RunGsUtil(['ls', '-R', dst_bucket_uri.uri], |
| 930 return_stdout=True) |
| 931 self.assertIn('%s%s/obj0\n' % (dst_bucket_uri, |
| 932 src_bucket_uri.bucket_name), stdout) |
| 933 self.assertIn('%s%s/obj1\n' % (dst_bucket_uri, |
| 934 src_bucket_uri.bucket_name), stdout) |
| 935 _CopyAndCheck() |
| 936 |
| 937 def test_copy_bucket_to_dir(self): |
| 938 """Tests recursively copying from bucket to a directory. |
| 939 |
| 940 This should produce identically named objects (and not, in particular, |
| 941 destination objects named by the version-specific URI from source objects). |
| 942 """ |
| 943 src_bucket_uri = self.CreateBucket() |
| 944 dst_dir = self.CreateTempDir() |
| 945 self.CreateObject(bucket_uri=src_bucket_uri, object_name='obj0', |
| 946 contents='abc') |
| 947 self.CreateObject(bucket_uri=src_bucket_uri, object_name='obj1', |
| 948 contents='def') |
| 949 |
| 950 # Use @Retry as hedge against bucket listing eventual consistency. |
| 951 @Retry(AssertionError, tries=3, timeout_secs=1) |
| 952 def _CopyAndCheck(): |
| 953 """Copies the bucket recursively and validates the results.""" |
| 954 self.RunGsUtil(['cp', '-R', suri(src_bucket_uri), dst_dir]) |
| 955 dir_list = [] |
| 956 for dirname, _, filenames in os.walk(dst_dir): |
| 957 for filename in filenames: |
| 958 dir_list.append(os.path.join(dirname, filename)) |
| 959 dir_list = sorted(dir_list) |
| 960 self.assertEqual(len(dir_list), 2) |
| 961 self.assertEqual(os.path.join(dst_dir, src_bucket_uri.bucket_name, |
| 962 'obj0'), dir_list[0]) |
| 963 self.assertEqual(os.path.join(dst_dir, src_bucket_uri.bucket_name, |
| 964 'obj1'), dir_list[1]) |
| 965 _CopyAndCheck() |
| 966 |
| 967 def test_recursive_download_with_leftover_dir_placeholder(self): |
| 968 """Tests that we correctly handle leftover dir placeholders.""" |
| 969 src_bucket_uri = self.CreateBucket() |
| 970 dst_dir = self.CreateTempDir() |
| 971 self.CreateObject(bucket_uri=src_bucket_uri, object_name='obj0', |
| 972 contents='abc') |
| 973 self.CreateObject(bucket_uri=src_bucket_uri, object_name='obj1', |
| 974 contents='def') |
| 975 |
| 976 # Create a placeholder like what can be left over by web GUI tools. |
| 977 key_uri = src_bucket_uri.clone_replace_name('/') |
| 978 key_uri.set_contents_from_string('') |
| 979 self.AssertNObjectsInBucket(src_bucket_uri, 3) |
| 980 |
| 981 stderr = self.RunGsUtil(['cp', '-R', suri(src_bucket_uri), dst_dir], |
| 982 return_stderr=True) |
| 983 self.assertIn('Skipping cloud sub-directory placeholder object', stderr) |
| 984 dir_list = [] |
| 985 for dirname, _, filenames in os.walk(dst_dir): |
| 986 for filename in filenames: |
| 987 dir_list.append(os.path.join(dirname, filename)) |
| 988 dir_list = sorted(dir_list) |
| 989 self.assertEqual(len(dir_list), 2) |
| 990 self.assertEqual(os.path.join(dst_dir, src_bucket_uri.bucket_name, |
| 991 'obj0'), dir_list[0]) |
| 992 self.assertEqual(os.path.join(dst_dir, src_bucket_uri.bucket_name, |
| 993 'obj1'), dir_list[1]) |
| 994 |
| 995 def test_copy_quiet(self): |
| 996 bucket_uri = self.CreateBucket() |
| 997 key_uri = self.CreateObject(bucket_uri=bucket_uri, contents='foo') |
| 998 stderr = self.RunGsUtil(['-q', 'cp', suri(key_uri), |
| 999 suri(bucket_uri.clone_replace_name('o2'))], |
| 1000 return_stderr=True) |
| 1001 self.assertEqual(stderr.count('Copying '), 0) |
| 1002 |
| 1003 def test_cp_md5_match(self): |
| 1004 """Tests that the uploaded object has the expected MD5. |
| 1005 |
| 1006 Note that while this does perform a file to object upload, MD5's are |
| 1007 not supported for composite objects so we don't use the decorator in this |
| 1008 case. |
| 1009 """ |
| 1010 bucket_uri = self.CreateBucket() |
| 1011 fpath = self.CreateTempFile(contents='bar') |
| 1012 with open(fpath, 'r') as f_in: |
| 1013 file_md5 = base64.encodestring(binascii.unhexlify( |
| 1014 CalculateMd5FromContents(f_in))).rstrip('\n') |
| 1015 self.RunGsUtil(['cp', fpath, suri(bucket_uri)]) |
| 1016 |
| 1017 # Use @Retry as hedge against bucket listing eventual consistency. |
| 1018 @Retry(AssertionError, tries=3, timeout_secs=1) |
| 1019 def _Check1(): |
| 1020 stdout = self.RunGsUtil(['ls', '-L', suri(bucket_uri)], |
| 1021 return_stdout=True) |
| 1022 self.assertRegexpMatches(stdout, |
| 1023 r'Hash\s+\(md5\):\s+%s' % re.escape(file_md5)) |
| 1024 _Check1() |
| 1025 |
| 1026 @unittest.skipIf(IS_WINDOWS, |
| 1027 'Unicode handling on Windows requires mods to site-packages') |
| 1028 @PerformsFileToObjectUpload |
| 1029 def test_cp_manifest_upload_unicode(self): |
| 1030 return self._ManifestUpload('foo-unicöde', 'bar-unicöde', |
| 1031 'manifest-unicöde') |
| 1032 |
| 1033 @PerformsFileToObjectUpload |
| 1034 def test_cp_manifest_upload(self): |
| 1035 """Tests uploading with a manifest file.""" |
| 1036 return self._ManifestUpload('foo', 'bar', 'manifest') |
| 1037 |
| 1038 def _ManifestUpload(self, file_name, object_name, manifest_name): |
| 1039 """Tests uploading with a manifest file.""" |
| 1040 bucket_uri = self.CreateBucket() |
| 1041 dsturi = suri(bucket_uri, object_name) |
| 1042 |
| 1043 fpath = self.CreateTempFile(file_name=file_name, contents='bar') |
| 1044 logpath = self.CreateTempFile(file_name=manifest_name, contents='') |
| 1045 # Ensure the file is empty. |
| 1046 open(logpath, 'w').close() |
| 1047 self.RunGsUtil(['cp', '-L', logpath, fpath, dsturi]) |
| 1048 with open(logpath, 'r') as f: |
| 1049 lines = f.readlines() |
| 1050 self.assertEqual(len(lines), 2) |
| 1051 |
| 1052 expected_headers = ['Source', 'Destination', 'Start', 'End', 'Md5', |
| 1053 'UploadId', 'Source Size', 'Bytes Transferred', |
| 1054 'Result', 'Description'] |
| 1055 self.assertEqual(expected_headers, lines[0].strip().split(',')) |
| 1056 results = lines[1].strip().split(',') |
| 1057 self.assertEqual(results[0][:7], 'file://') # source |
| 1058 self.assertEqual(results[1][:5], '%s://' % |
| 1059 self.default_provider) # destination |
| 1060 date_format = '%Y-%m-%dT%H:%M:%S.%fZ' |
| 1061 start_date = datetime.datetime.strptime(results[2], date_format) |
| 1062 end_date = datetime.datetime.strptime(results[3], date_format) |
| 1063 self.assertEqual(end_date > start_date, True) |
| 1064 if self.RunGsUtil == testcase.GsUtilIntegrationTestCase.RunGsUtil: |
| 1065 # Check that we didn't do automatic parallel uploads - compose doesn't |
| 1066 # calculate the MD5 hash. Since RunGsUtil is overridden in |
| 1067 # TestCpParallelUploads to force parallel uploads, we can check which |
| 1068 # method was used. |
| 1069 self.assertEqual(results[4], 'rL0Y20zC+Fzt72VPzMSk2A==') # md5 |
| 1070 self.assertEqual(int(results[6]), 3) # Source Size |
| 1071 self.assertEqual(int(results[7]), 3) # Bytes Transferred |
| 1072 self.assertEqual(results[8], 'OK') # Result |
| 1073 |
| 1074 @PerformsFileToObjectUpload |
| 1075 def test_cp_manifest_download(self): |
| 1076 """Tests downloading with a manifest file.""" |
| 1077 key_uri = self.CreateObject(contents='foo') |
| 1078 fpath = self.CreateTempFile(contents='') |
| 1079 logpath = self.CreateTempFile(contents='') |
| 1080 # Ensure the file is empty. |
| 1081 open(logpath, 'w').close() |
| 1082 self.RunGsUtil(['cp', '-L', logpath, suri(key_uri), fpath], |
| 1083 return_stdout=True) |
| 1084 with open(logpath, 'r') as f: |
| 1085 lines = f.readlines() |
| 1086 self.assertEqual(len(lines), 2) |
| 1087 |
| 1088 expected_headers = ['Source', 'Destination', 'Start', 'End', 'Md5', |
| 1089 'UploadId', 'Source Size', 'Bytes Transferred', |
| 1090 'Result', 'Description'] |
| 1091 self.assertEqual(expected_headers, lines[0].strip().split(',')) |
| 1092 results = lines[1].strip().split(',') |
| 1093 self.assertEqual(results[0][:5], '%s://' % |
| 1094 self.default_provider) # source |
| 1095 self.assertEqual(results[1][:7], 'file://') # destination |
| 1096 date_format = '%Y-%m-%dT%H:%M:%S.%fZ' |
| 1097 start_date = datetime.datetime.strptime(results[2], date_format) |
| 1098 end_date = datetime.datetime.strptime(results[3], date_format) |
| 1099 self.assertEqual(end_date > start_date, True) |
| 1100 self.assertEqual(results[4], 'rL0Y20zC+Fzt72VPzMSk2A==') # md5 |
| 1101 self.assertEqual(int(results[6]), 3) # Source Size |
| 1102 # Bytes transferred might be more than 3 if the file was gzipped, since |
| 1103 # the minimum gzip header is 10 bytes. |
| 1104 self.assertGreaterEqual(int(results[7]), 3) # Bytes Transferred |
| 1105 self.assertEqual(results[8], 'OK') # Result |
| 1106 |
| 1107 @PerformsFileToObjectUpload |
| 1108 def test_copy_unicode_non_ascii_filename(self): |
| 1109 key_uri = self.CreateObject(contents='foo') |
| 1110 # Make file large enough to cause a resumable upload (which hashes filename |
| 1111 # to construct tracker filename). |
| 1112 fpath = self.CreateTempFile(file_name=u'Аудиоархив', |
| 1113 contents='x' * 3 * 1024 * 1024) |
| 1114 fpath_bytes = fpath.encode(UTF8) |
| 1115 stderr = self.RunGsUtil(['cp', fpath_bytes, suri(key_uri)], |
| 1116 return_stderr=True) |
| 1117 self.assertIn('Copying file:', stderr) |
| 1118 |
| 1119 # Note: We originally implemented a test |
| 1120 # (test_copy_invalid_unicode_filename) to check that invalid unicode |
| 1121 # filenames were skipped, but it turns out os.walk() on MacOS doesn't have |
| 1122 # problems with such files (so that test failed). Given that, we decided |
| 1123 # to remove the test. |
| 1124 |
| 1125 def test_gzip_upload_and_download(self): |
| 1126 bucket_uri = self.CreateBucket() |
| 1127 contents = 'x' * 10000 |
| 1128 tmpdir = self.CreateTempDir() |
| 1129 self.CreateTempFile(file_name='test.html', tmpdir=tmpdir, contents=contents) |
| 1130 self.CreateTempFile(file_name='test.js', tmpdir=tmpdir, contents=contents) |
| 1131 self.CreateTempFile(file_name='test.txt', tmpdir=tmpdir, contents=contents) |
| 1132 # Test that copying while specifying only 2 of the 3 extensions gzips the |
| 1133 # correct files, and that including whitespace in the extension list works. |
| 1134 self.RunGsUtil(['cp', '-z', 'js, html', |
| 1135 os.path.join(tmpdir, 'test.*'), suri(bucket_uri)]) |
| 1136 self.AssertNObjectsInBucket(bucket_uri, 3) |
| 1137 uri1 = suri(bucket_uri, 'test.html') |
| 1138 uri2 = suri(bucket_uri, 'test.js') |
| 1139 uri3 = suri(bucket_uri, 'test.txt') |
| 1140 stdout = self.RunGsUtil(['stat', uri1], return_stdout=True) |
| 1141 self.assertRegexpMatches(stdout, r'Content-Encoding:\s+gzip') |
| 1142 stdout = self.RunGsUtil(['stat', uri2], return_stdout=True) |
| 1143 self.assertRegexpMatches(stdout, r'Content-Encoding:\s+gzip') |
| 1144 stdout = self.RunGsUtil(['stat', uri3], return_stdout=True) |
| 1145 self.assertNotRegexpMatches(stdout, r'Content-Encoding:\s+gzip') |
| 1146 fpath4 = self.CreateTempFile() |
| 1147 for uri in (uri1, uri2, uri3): |
| 1148 self.RunGsUtil(['cp', uri, suri(fpath4)]) |
| 1149 with open(fpath4, 'r') as f: |
| 1150 self.assertEqual(f.read(), contents) |
| 1151 |
| 1152 def test_upload_with_subdir_and_unexpanded_wildcard(self): |
| 1153 fpath1 = self.CreateTempFile(file_name=('tmp', 'x', 'y', 'z')) |
| 1154 bucket_uri = self.CreateBucket() |
| 1155 wildcard_uri = '%s*' % fpath1[:-5] |
| 1156 stderr = self.RunGsUtil(['cp', '-R', wildcard_uri, suri(bucket_uri)], |
| 1157 return_stderr=True) |
| 1158 self.assertIn('Copying file:', stderr) |
| 1159 self.AssertNObjectsInBucket(bucket_uri, 1) |
| 1160 |
| 1161 def test_cp_object_ending_with_slash(self): |
| 1162 """Tests that cp works with object names ending with slash.""" |
| 1163 tmpdir = self.CreateTempDir() |
| 1164 bucket_uri = self.CreateBucket() |
| 1165 self.CreateObject(bucket_uri=bucket_uri, |
| 1166 object_name='abc/', |
| 1167 contents='dir') |
| 1168 self.CreateObject(bucket_uri=bucket_uri, |
| 1169 object_name='abc/def', |
| 1170 contents='def') |
| 1171 self.AssertNObjectsInBucket(bucket_uri, 2) |
| 1172 self.RunGsUtil(['cp', '-R', suri(bucket_uri), tmpdir]) |
| 1173 # Check that files in the subdir got copied even though subdir object |
| 1174 # download was skipped. |
| 1175 with open(os.path.join(tmpdir, bucket_uri.bucket_name, 'abc', 'def')) as f: |
| 1176 self.assertEquals('def', '\n'.join(f.readlines())) |
| 1177 |
| 1178 def test_cp_without_read_access(self): |
| 1179 """Tests that cp fails without read access to the object.""" |
| 1180 # TODO: With 401's triggering retries in apitools, this test will take |
| 1181 # a long time. Ideally, make apitools accept a num_retries config for this |
| 1182 # until we stop retrying the 401's. |
| 1183 bucket_uri = self.CreateBucket() |
| 1184 object_uri = self.CreateObject(bucket_uri=bucket_uri, contents='foo') |
| 1185 |
| 1186 # Use @Retry as hedge against bucket listing eventual consistency. |
| 1187 self.AssertNObjectsInBucket(bucket_uri, 1) |
| 1188 |
| 1189 with self.SetAnonymousBotoCreds(): |
| 1190 stderr = self.RunGsUtil(['cp', suri(object_uri), 'foo'], |
| 1191 return_stderr=True, expected_status=1) |
| 1192 self.assertIn('AccessDenied', stderr) |
| 1193 |
| 1194 @unittest.skipIf(IS_WINDOWS, 'os.symlink() is not available on Windows.') |
| 1195 def test_cp_minus_e(self): |
| 1196 fpath_dir = self.CreateTempDir() |
| 1197 fpath1 = self.CreateTempFile(tmpdir=fpath_dir) |
| 1198 fpath2 = os.path.join(fpath_dir, 'cp_minus_e') |
| 1199 bucket_uri = self.CreateBucket() |
| 1200 os.symlink(fpath1, fpath2) |
| 1201 stderr = self.RunGsUtil( |
| 1202 ['cp', '-e', '%s%s*' % (fpath_dir, os.path.sep), |
| 1203 suri(bucket_uri, 'files')], |
| 1204 return_stderr=True) |
| 1205 self.assertIn('Copying file', stderr) |
| 1206 self.assertIn('Skipping symbolic link file', stderr) |
| 1207 |
| 1208 def test_cp_multithreaded_wildcard(self): |
| 1209 """Tests that cp -m works with a wildcard.""" |
| 1210 num_test_files = 5 |
| 1211 tmp_dir = self.CreateTempDir(test_files=num_test_files) |
| 1212 bucket_uri = self.CreateBucket() |
| 1213 wildcard_uri = '%s%s*' % (tmp_dir, os.sep) |
| 1214 self.RunGsUtil(['-m', 'cp', wildcard_uri, suri(bucket_uri)]) |
| 1215 self.AssertNObjectsInBucket(bucket_uri, num_test_files) |
| 1216 |
| 1217 def test_cp_duplicate_source_args(self): |
| 1218 """Tests that cp -m works when a source argument is provided twice.""" |
| 1219 object_contents = 'edge' |
| 1220 object_uri = self.CreateObject(object_name='foo', contents=object_contents) |
| 1221 tmp_dir = self.CreateTempDir() |
| 1222 self.RunGsUtil(['-m', 'cp', suri(object_uri), suri(object_uri), tmp_dir]) |
| 1223 with open(os.path.join(tmp_dir, 'foo'), 'r') as in_fp: |
| 1224 contents = in_fp.read() |
| 1225     # Contents should not be duplicated.
| 1226 self.assertEqual(contents, object_contents) |
| 1227 |
| 1228 @SkipForS3('No resumable upload support for S3.') |
| 1229 def test_cp_resumable_upload_break(self): |
| 1230 """Tests that an upload can be resumed after a connection break.""" |
| 1231 bucket_uri = self.CreateBucket() |
| 1232 fpath = self.CreateTempFile(contents='a' * self.halt_size) |
| 1233 boto_config_for_test = ('GSUtil', 'resumable_threshold', str(ONE_KIB)) |
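|     # Lowering resumable_threshold to 1 KiB forces even this small test file
|     # onto the resumable-upload path. The pickled handler below is passed to
|     # gsutil via the --testcallbackfile flag and halts the transfer after a
|     # few bytes, so the follow-up cp can resume it.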
| 1234 test_callback_file = self.CreateTempFile( |
| 1235 contents=pickle.dumps(_HaltingCopyCallbackHandler(True, 5))) |
| 1236 |
| 1237 with SetBotoConfigForTest([boto_config_for_test]): |
| 1238 stderr = self.RunGsUtil(['cp', '--testcallbackfile', test_callback_file, |
| 1239 fpath, suri(bucket_uri)], |
| 1240 expected_status=1, return_stderr=True) |
| 1241 self.assertIn('Artifically halting upload', stderr) |
| 1242 stderr = self.RunGsUtil(['cp', fpath, suri(bucket_uri)], |
| 1243 return_stderr=True) |
| 1244 self.assertIn('Resuming upload', stderr) |
| 1245 |
| 1246 @SkipForS3('No resumable upload support for S3.') |
| 1247 def test_cp_resumable_upload_retry(self): |
| 1248 """Tests that a resumable upload completes with one retry.""" |
| 1249 bucket_uri = self.CreateBucket() |
| 1250 fpath = self.CreateTempFile(contents='a' * self.halt_size) |
| 1251 # TODO: Raising an httplib or socket error blocks bucket teardown |
| 1252 # in JSON for 60-120s on a multiprocessing lock acquire. Figure out why; |
| 1253 # until then, raise an apitools retryable exception. |
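|     # The _ResumableUploadRetryHandler presumably raises the configured
|     # exception once partway through the upload; gsutil's retry logic should
|     # then catch it and finish the transfer, which the assertions below check.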
| 1254 if self.test_api == ApiSelector.XML: |
| 1255 test_callback_file = self.CreateTempFile( |
| 1256 contents=pickle.dumps(_ResumableUploadRetryHandler( |
| 1257 5, httplib.BadStatusLine, ('unused',)))) |
| 1258 else: |
| 1259 test_callback_file = self.CreateTempFile( |
| 1260 contents=pickle.dumps(_ResumableUploadRetryHandler( |
| 1261 5, apitools_exceptions.BadStatusCodeError, |
| 1262 ('unused', 'unused', 'unused')))) |
| 1263 boto_config_for_test = ('GSUtil', 'resumable_threshold', str(ONE_KIB)) |
| 1264 with SetBotoConfigForTest([boto_config_for_test]): |
| 1265 stderr = self.RunGsUtil(['-D', 'cp', '--testcallbackfile', |
| 1266 test_callback_file, fpath, suri(bucket_uri)], |
| 1267                               return_stderr=True)
| 1268 if self.test_api == ApiSelector.XML: |
| 1269 self.assertIn('Got retryable failure', stderr) |
| 1270 else: |
| 1271 self.assertIn('Retrying', stderr) |
| 1272 |
| 1273 @SkipForS3('No resumable upload support for S3.') |
| 1274 def test_cp_resumable_streaming_upload_retry(self): |
| 1275 """Tests that a streaming resumable upload completes with one retry.""" |
| 1276 if self.test_api == ApiSelector.XML: |
| 1277 return unittest.skip('XML does not support resumable streaming uploads.') |
| 1278 bucket_uri = self.CreateBucket() |
| 1279 |
| 1280 test_callback_file = self.CreateTempFile( |
| 1281 contents=pickle.dumps(_ResumableUploadRetryHandler( |
| 1282 5, apitools_exceptions.BadStatusCodeError, |
| 1283 ('unused', 'unused', 'unused')))) |
| 1284 # Need to reduce the JSON chunk size since streaming uploads buffer a |
| 1285 # full chunk. |
| 1286 boto_configs_for_test = [('GSUtil', 'json_resumable_chunk_size', |
| 1287 str(256 * ONE_KIB)), |
| 1288 ('Boto', 'num_retries', '2')] |
| 1289 with SetBotoConfigForTest(boto_configs_for_test): |
| 1290 stderr = self.RunGsUtil( |
| 1291 ['-D', 'cp', '--testcallbackfile', test_callback_file, '-', |
| 1292 suri(bucket_uri, 'foo')], |
| 1293           stdin='a' * 512 * ONE_KIB, return_stderr=True)
| 1294 self.assertIn('Retrying', stderr) |
| 1295 |
| 1296 @SkipForS3('No resumable upload support for S3.') |
| 1297 def test_cp_resumable_upload(self): |
| 1298 """Tests that a basic resumable upload completes successfully.""" |
| 1299 bucket_uri = self.CreateBucket() |
| 1300 fpath = self.CreateTempFile(contents='a' * self.halt_size) |
| 1301 boto_config_for_test = ('GSUtil', 'resumable_threshold', str(ONE_KIB)) |
| 1302 with SetBotoConfigForTest([boto_config_for_test]): |
| 1303 self.RunGsUtil(['cp', fpath, suri(bucket_uri)]) |
| 1304 |
| 1305 @SkipForS3('No resumable upload support for S3.') |
| 1306 def test_resumable_upload_break_leaves_tracker(self): |
| 1307 """Tests that a tracker file is created with a resumable upload.""" |
| 1308 bucket_uri = self.CreateBucket() |
| 1309 fpath = self.CreateTempFile(file_name='foo', |
| 1310 contents='a' * self.halt_size) |
| 1311 boto_config_for_test = ('GSUtil', 'resumable_threshold', str(ONE_KIB)) |
| 1312 with SetBotoConfigForTest([boto_config_for_test]): |
| 1313 tracker_filename = GetTrackerFilePath( |
| 1314 StorageUrlFromString(suri(bucket_uri, 'foo')), |
| 1315 TrackerFileType.UPLOAD, self.test_api) |
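|       # gsutil persists resumable-upload state in a per-destination tracker
|       # file; this test verifies the file is left behind when the upload is
|       # halted partway through.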
| 1316 test_callback_file = self.CreateTempFile( |
| 1317 contents=pickle.dumps(_HaltingCopyCallbackHandler(True, 5))) |
| 1318 try: |
| 1319 stderr = self.RunGsUtil(['cp', '--testcallbackfile', test_callback_file, |
| 1320 fpath, suri(bucket_uri, 'foo')], |
| 1321 expected_status=1, return_stderr=True) |
| 1322 self.assertIn('Artifically halting upload', stderr) |
| 1323 self.assertTrue(os.path.exists(tracker_filename), |
| 1324 'Tracker file %s not present.' % tracker_filename) |
| 1325 finally: |
| 1326 if os.path.exists(tracker_filename): |
| 1327 os.unlink(tracker_filename) |
| 1328 |
| 1329 @SkipForS3('No resumable upload support for S3.') |
| 1330 def test_cp_resumable_upload_break_file_size_change(self): |
| 1331 """Tests a resumable upload where the uploaded file changes size. |
| 1332 |
| 1333 This should fail when we read the tracker data. |
| 1334 """ |
| 1335 bucket_uri = self.CreateBucket() |
| 1336 tmp_dir = self.CreateTempDir() |
| 1337 fpath = self.CreateTempFile(file_name='foo', tmpdir=tmp_dir, |
| 1338 contents='a' * self.halt_size) |
| 1339 test_callback_file = self.CreateTempFile( |
| 1340 contents=pickle.dumps(_HaltingCopyCallbackHandler(True, 5))) |
| 1341 |
| 1342 boto_config_for_test = ('GSUtil', 'resumable_threshold', str(ONE_KIB)) |
| 1343 with SetBotoConfigForTest([boto_config_for_test]): |
| 1344 stderr = self.RunGsUtil(['cp', '--testcallbackfile', test_callback_file, |
| 1345 fpath, suri(bucket_uri)], |
| 1346 expected_status=1, return_stderr=True) |
| 1347 self.assertIn('Artifically halting upload', stderr) |
| 1348 fpath = self.CreateTempFile(file_name='foo', tmpdir=tmp_dir, |
| 1349 contents='a' * self.halt_size * 2) |
| 1350 stderr = self.RunGsUtil(['cp', fpath, suri(bucket_uri)], |
| 1351 expected_status=1, return_stderr=True) |
| 1352 self.assertIn('ResumableUploadAbortException', stderr) |
| 1353 |
| 1354 @SkipForS3('No resumable upload support for S3.') |
| 1355 def test_cp_resumable_upload_break_file_content_change(self): |
| 1356 """Tests a resumable upload where the uploaded file changes content.""" |
| 1357 if self.test_api == ApiSelector.XML: |
| 1358 return unittest.skip( |
| 1359 'XML doesn\'t make separate HTTP calls at fixed-size boundaries for ' |
| 1360 'resumable uploads, so we can\'t guarantee that the server saves a ' |
| 1361 'specific part of the upload.') |
| 1362 bucket_uri = self.CreateBucket() |
| 1363 tmp_dir = self.CreateTempDir() |
| 1364 fpath = self.CreateTempFile(file_name='foo', tmpdir=tmp_dir, |
| 1365 contents='a' * ONE_KIB * 512) |
| 1366 test_callback_file = self.CreateTempFile( |
| 1367 contents=pickle.dumps(_HaltingCopyCallbackHandler(True, |
| 1368 int(ONE_KIB) * 384))) |
| 1369 resumable_threshold_for_test = ( |
| 1370 'GSUtil', 'resumable_threshold', str(ONE_KIB)) |
| 1371 resumable_chunk_size_for_test = ( |
| 1372 'GSUtil', 'json_resumable_chunk_size', str(ONE_KIB * 256)) |
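|     # With a 256 KiB JSON chunk size and a halt at 384 KiB, at least one full
|     # chunk is committed by the service before the file contents change, so
|     # the final digest check should fail (see the XML skip message above).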
| 1373 with SetBotoConfigForTest([resumable_threshold_for_test, |
| 1374 resumable_chunk_size_for_test]): |
| 1375 stderr = self.RunGsUtil(['cp', '--testcallbackfile', test_callback_file, |
| 1376 fpath, suri(bucket_uri)], |
| 1377 expected_status=1, return_stderr=True) |
| 1378 self.assertIn('Artifically halting upload', stderr) |
| 1379 fpath = self.CreateTempFile(file_name='foo', tmpdir=tmp_dir, |
| 1380 contents='b' * ONE_KIB * 512) |
| 1381 stderr = self.RunGsUtil(['cp', fpath, suri(bucket_uri)], |
| 1382 expected_status=1, return_stderr=True) |
| 1383 self.assertIn('doesn\'t match cloud-supplied digest', stderr) |
| 1384 |
| 1385 @SkipForS3('No resumable upload support for S3.') |
| 1386 def test_cp_resumable_upload_break_file_smaller_size(self): |
| 1387     """Tests a resumable upload where the uploaded file shrinks in size.
| 1388 
| 1389     This should fail with a ResumableUploadAbortException on retry.
| 1390     """
| 1391 bucket_uri = self.CreateBucket() |
| 1392 tmp_dir = self.CreateTempDir() |
| 1393 fpath = self.CreateTempFile(file_name='foo', tmpdir=tmp_dir, |
| 1394 contents='a' * ONE_KIB * 512) |
| 1395 test_callback_file = self.CreateTempFile( |
| 1396 contents=pickle.dumps(_HaltingCopyCallbackHandler(True, |
| 1397 int(ONE_KIB) * 384))) |
| 1398 resumable_threshold_for_test = ( |
| 1399 'GSUtil', 'resumable_threshold', str(ONE_KIB)) |
| 1400 resumable_chunk_size_for_test = ( |
| 1401 'GSUtil', 'json_resumable_chunk_size', str(ONE_KIB * 256)) |
| 1402 with SetBotoConfigForTest([resumable_threshold_for_test, |
| 1403 resumable_chunk_size_for_test]): |
| 1404 stderr = self.RunGsUtil(['cp', '--testcallbackfile', test_callback_file, |
| 1405 fpath, suri(bucket_uri)], |
| 1406 expected_status=1, return_stderr=True) |
| 1407 self.assertIn('Artifically halting upload', stderr) |
| 1408 fpath = self.CreateTempFile(file_name='foo', tmpdir=tmp_dir, |
| 1409 contents='a' * ONE_KIB) |
| 1410 stderr = self.RunGsUtil(['cp', fpath, suri(bucket_uri)], |
| 1411 expected_status=1, return_stderr=True) |
| 1412 self.assertIn('ResumableUploadAbortException', stderr) |
| 1413 |
| 1414   # This temporarily makes the tracker directory unwritable, which interferes
| 1415   # with any parallel-running tests that use the tracker directory.
| 1416 @NotParallelizable |
| 1417 @SkipForS3('No resumable upload support for S3.') |
| 1418 @unittest.skipIf(IS_WINDOWS, 'chmod on dir unsupported on Windows.') |
| 1419 @PerformsFileToObjectUpload |
| 1420 def test_cp_unwritable_tracker_file(self): |
| 1421 """Tests a resumable upload with an unwritable tracker file.""" |
| 1422 bucket_uri = self.CreateBucket() |
| 1423 tracker_filename = GetTrackerFilePath( |
| 1424 StorageUrlFromString(suri(bucket_uri, 'foo')), |
| 1425 TrackerFileType.UPLOAD, self.test_api) |
| 1426 tracker_dir = os.path.dirname(tracker_filename) |
| 1427 fpath = self.CreateTempFile(file_name='foo', contents='a' * ONE_KIB) |
| 1428 boto_config_for_test = ('GSUtil', 'resumable_threshold', str(ONE_KIB)) |
| 1429 save_mod = os.stat(tracker_dir).st_mode |
| 1430 |
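|     # chmod(tracker_dir, 0) makes the tracker directory unwritable so that
|     # creating the tracker file fails; the saved mode is restored in the
|     # finally block below.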
| 1431 try: |
| 1432 os.chmod(tracker_dir, 0) |
| 1433 with SetBotoConfigForTest([boto_config_for_test]): |
| 1434 stderr = self.RunGsUtil(['cp', fpath, suri(bucket_uri)], |
| 1435 expected_status=1, return_stderr=True) |
| 1436 self.assertIn('Couldn\'t write tracker file', stderr) |
| 1437 finally: |
| 1438 os.chmod(tracker_dir, save_mod) |
| 1439 if os.path.exists(tracker_filename): |
| 1440 os.unlink(tracker_filename) |
| 1441 |
| 1442   # This temporarily makes the tracker directory unwritable, which interferes
| 1443   # with any parallel-running tests that use the tracker directory.
| 1444 @NotParallelizable |
| 1445 @unittest.skipIf(IS_WINDOWS, 'chmod on dir unsupported on Windows.') |
| 1446 def test_cp_unwritable_tracker_file_download(self): |
| 1447 """Tests downloads with an unwritable tracker file.""" |
| 1448 object_uri = self.CreateObject(contents='foo' * ONE_KIB) |
| 1449 tracker_filename = GetTrackerFilePath( |
| 1450 StorageUrlFromString(suri(object_uri)), |
| 1451 TrackerFileType.DOWNLOAD, self.test_api) |
| 1452 tracker_dir = os.path.dirname(tracker_filename) |
| 1453 fpath = self.CreateTempFile() |
| 1454 save_mod = os.stat(tracker_dir).st_mode |
| 1455 |
| 1456 try: |
| 1457 os.chmod(tracker_dir, 0) |
| 1458 boto_config_for_test = ('GSUtil', 'resumable_threshold', str(EIGHT_MIB)) |
| 1459 with SetBotoConfigForTest([boto_config_for_test]): |
| 1460 # Should succeed because we are below the threshold. |
| 1461 self.RunGsUtil(['cp', suri(object_uri), fpath]) |
| 1462 boto_config_for_test = ('GSUtil', 'resumable_threshold', str(ONE_KIB)) |
| 1463 with SetBotoConfigForTest([boto_config_for_test]): |
| 1464 stderr = self.RunGsUtil(['cp', suri(object_uri), fpath], |
| 1465 expected_status=1, return_stderr=True) |
| 1466 self.assertIn('Couldn\'t write tracker file', stderr) |
| 1467 finally: |
| 1468 os.chmod(tracker_dir, save_mod) |
| 1469 if os.path.exists(tracker_filename): |
| 1470 os.unlink(tracker_filename) |
| 1471 |
| 1472 def test_cp_resumable_download_break(self): |
| 1473 """Tests that a download can be resumed after a connection break.""" |
| 1474 bucket_uri = self.CreateBucket() |
| 1475 object_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo', |
| 1476 contents='a' * self.halt_size) |
| 1477 fpath = self.CreateTempFile() |
| 1478 test_callback_file = self.CreateTempFile( |
| 1479 contents=pickle.dumps(_HaltingCopyCallbackHandler(False, 5))) |
| 1480 |
| 1481 boto_config_for_test = ('GSUtil', 'resumable_threshold', str(ONE_KIB)) |
| 1482 with SetBotoConfigForTest([boto_config_for_test]): |
| 1483 stderr = self.RunGsUtil(['cp', '--testcallbackfile', test_callback_file, |
| 1484 suri(object_uri), fpath], |
| 1485 expected_status=1, return_stderr=True) |
| 1486 self.assertIn('Artifically halting download.', stderr) |
| 1487 tracker_filename = GetTrackerFilePath( |
| 1488 StorageUrlFromString(fpath), TrackerFileType.DOWNLOAD, self.test_api) |
| 1489 self.assertTrue(os.path.isfile(tracker_filename)) |
| 1490 stderr = self.RunGsUtil(['cp', suri(object_uri), fpath], |
| 1491 return_stderr=True) |
| 1492 self.assertIn('Resuming download', stderr) |
| 1493 |
| 1494 def test_cp_resumable_download_etag_differs(self): |
| 1495     """Tests that a download restarts when the source object changes.
| 1496 |
| 1497 This causes the etag not to match. |
| 1498 """ |
| 1499 bucket_uri = self.CreateBucket() |
| 1500 object_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo', |
| 1501 contents='a' * self.halt_size) |
| 1502 fpath = self.CreateTempFile() |
| 1503 test_callback_file = self.CreateTempFile( |
| 1504 contents=pickle.dumps(_HaltingCopyCallbackHandler(False, 5))) |
| 1505 boto_config_for_test = ('GSUtil', 'resumable_threshold', str(ONE_KIB)) |
| 1506 with SetBotoConfigForTest([boto_config_for_test]): |
| 1507 # This will create a tracker file with an ETag. |
| 1508 stderr = self.RunGsUtil(['cp', '--testcallbackfile', test_callback_file, |
| 1509 suri(object_uri), fpath], |
| 1510 expected_status=1, return_stderr=True) |
| 1511 self.assertIn('Artifically halting download.', stderr) |
| 1512 # Create a new object with different contents - it should have a |
| 1513 # different ETag since the content has changed. |
| 1514 object_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo', |
| 1515 contents='b' * self.halt_size) |
| 1516 stderr = self.RunGsUtil(['cp', suri(object_uri), fpath], |
| 1517 return_stderr=True) |
| 1518 self.assertNotIn('Resuming download', stderr) |
| 1519 |
| 1520 def test_cp_resumable_download_file_larger(self): |
| 1521 """Tests download deletes the tracker file when existing file is larger.""" |
| 1522 bucket_uri = self.CreateBucket() |
| 1523 fpath = self.CreateTempFile() |
| 1524 object_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo', |
| 1525 contents='a' * self.halt_size) |
| 1526 test_callback_file = self.CreateTempFile( |
| 1527 contents=pickle.dumps(_HaltingCopyCallbackHandler(False, 5))) |
| 1528 boto_config_for_test = ('GSUtil', 'resumable_threshold', str(ONE_KIB)) |
| 1529 with SetBotoConfigForTest([boto_config_for_test]): |
| 1530 stderr = self.RunGsUtil(['cp', '--testcallbackfile', test_callback_file, |
| 1531 suri(object_uri), fpath], |
| 1532 expected_status=1, return_stderr=True) |
| 1533 self.assertIn('Artifically halting download.', stderr) |
| 1534       with open(fpath, 'w') as larger_file:
| 1535         # Make the local file larger than the object being downloaded.
| 1536         larger_file.write('a' * self.halt_size * 2)
| 1537 stderr = self.RunGsUtil(['cp', suri(object_uri), fpath], |
| 1538 expected_status=1, return_stderr=True) |
| 1539 self.assertNotIn('Resuming download', stderr) |
| 1540 self.assertIn('is larger', stderr) |
| 1541 self.assertIn('Deleting tracker file', stderr) |
| 1542 |
| 1543 def test_cp_resumable_download_content_differs(self): |
| 1544 """Tests that we do not re-download when tracker file matches existing file. |
| 1545 |
| 1546     We only compare size, not contents, so a re-download should not occur even
| 1547     though the contents are technically different. However, hash validation on
| 1548     the file should still occur, and because the hashes differ, the file is
| 1549     then deleted.
| 1550 """ |
| 1551 bucket_uri = self.CreateBucket() |
| 1552 tmp_dir = self.CreateTempDir() |
| 1553 fpath = self.CreateTempFile(tmpdir=tmp_dir, contents='abcd' * ONE_KIB) |
| 1554 object_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo', |
| 1555 contents='efgh' * ONE_KIB) |
| 1556 stdout = self.RunGsUtil(['ls', '-L', suri(object_uri)], return_stdout=True) |
| 1557 etag_match = re.search(r'\s*ETag:\s*(.*)', stdout) |
| 1558 self.assertIsNotNone(etag_match, 'Could not get object ETag') |
| 1559 self.assertEqual(len(etag_match.groups()), 1, |
| 1560 'Did not match expected single ETag') |
| 1561 etag = etag_match.group(1) |
| 1562 |
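|     # Write the object's real ETag into a hand-crafted tracker file so the
|     # download looks like a previously halted transfer of this same object.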
| 1563 tracker_filename = GetTrackerFilePath( |
| 1564 StorageUrlFromString(fpath), TrackerFileType.DOWNLOAD, self.test_api) |
| 1565 try: |
| 1566 with open(tracker_filename, 'w') as tracker_fp: |
| 1567 tracker_fp.write(etag) |
| 1568 boto_config_for_test = ('GSUtil', 'resumable_threshold', str(ONE_KIB)) |
| 1569 with SetBotoConfigForTest([boto_config_for_test]): |
| 1570 stderr = self.RunGsUtil(['cp', suri(object_uri), fpath], |
| 1571 return_stderr=True, expected_status=1) |
| 1572 self.assertIn('Download already complete for file', stderr) |
| 1573 self.assertIn('doesn\'t match cloud-supplied digest', stderr) |
| 1574 # File and tracker file should be deleted. |
| 1575 self.assertFalse(os.path.isfile(fpath)) |
| 1576 self.assertFalse(os.path.isfile(tracker_filename)) |
| 1577 finally: |
| 1578 if os.path.exists(tracker_filename): |
| 1579 os.unlink(tracker_filename) |
| 1580 |
| 1581 def test_cp_resumable_download_content_matches(self): |
| 1582 """Tests download no-ops when tracker file matches existing file.""" |
| 1583 bucket_uri = self.CreateBucket() |
| 1584 tmp_dir = self.CreateTempDir() |
| 1585 matching_contents = 'abcd' * ONE_KIB |
| 1586 fpath = self.CreateTempFile(tmpdir=tmp_dir, contents=matching_contents) |
| 1587 object_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo', |
| 1588 contents=matching_contents) |
| 1589 stdout = self.RunGsUtil(['ls', '-L', suri(object_uri)], return_stdout=True) |
| 1590 etag_match = re.search(r'\s*ETag:\s*(.*)', stdout) |
| 1591 self.assertIsNotNone(etag_match, 'Could not get object ETag') |
| 1592 self.assertEqual(len(etag_match.groups()), 1, |
| 1593 'Did not match expected single ETag') |
| 1594 etag = etag_match.group(1) |
| 1595 tracker_filename = GetTrackerFilePath( |
| 1596 StorageUrlFromString(fpath), TrackerFileType.DOWNLOAD, self.test_api) |
| 1597 with open(tracker_filename, 'w') as tracker_fp: |
| 1598 tracker_fp.write(etag) |
| 1599 try: |
| 1600 boto_config_for_test = ('GSUtil', 'resumable_threshold', str(ONE_KIB)) |
| 1601 with SetBotoConfigForTest([boto_config_for_test]): |
| 1602 stderr = self.RunGsUtil(['cp', suri(object_uri), fpath], |
| 1603 return_stderr=True) |
| 1604 self.assertIn('Download already complete for file', stderr) |
| 1605 # Tracker file should be removed after successful hash validation. |
| 1606 self.assertFalse(os.path.isfile(tracker_filename)) |
| 1607 finally: |
| 1608 if os.path.exists(tracker_filename): |
| 1609 os.unlink(tracker_filename) |
| 1610 |
| 1611 def test_cp_resumable_download_tracker_file_not_matches(self): |
| 1612 """Tests that download overwrites when tracker file etag does not match.""" |
| 1613 bucket_uri = self.CreateBucket() |
| 1614 tmp_dir = self.CreateTempDir() |
| 1615 fpath = self.CreateTempFile(tmpdir=tmp_dir, contents='abcd' * ONE_KIB) |
| 1616 object_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo', |
| 1617 contents='efgh' * ONE_KIB) |
| 1618 stdout = self.RunGsUtil(['ls', '-L', suri(object_uri)], return_stdout=True) |
| 1619 etag_match = re.search(r'\s*ETag:\s*(.*)', stdout) |
| 1620 self.assertIsNotNone(etag_match, 'Could not get object ETag') |
| 1621 self.assertEqual(len(etag_match.groups()), 1, |
| 1622 'Did not match regex for exactly one object ETag') |
| 1623 etag = etag_match.group(1) |
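|     # Corrupt the ETag so the tracker file no longer matches the cloud object,
|     # which should force a fresh download that overwrites the local file.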
| 1624 etag += 'nonmatching' |
| 1625 tracker_filename = GetTrackerFilePath( |
| 1626 StorageUrlFromString(fpath), TrackerFileType.DOWNLOAD, self.test_api) |
| 1627 with open(tracker_filename, 'w') as tracker_fp: |
| 1628 tracker_fp.write(etag) |
| 1629 try: |
| 1630 boto_config_for_test = ('GSUtil', 'resumable_threshold', str(ONE_KIB)) |
| 1631 with SetBotoConfigForTest([boto_config_for_test]): |
| 1632 stderr = self.RunGsUtil(['cp', suri(object_uri), fpath], |
| 1633 return_stderr=True) |
| 1634 self.assertNotIn('Resuming download', stderr) |
| 1635 # Ensure the file was overwritten. |
| 1636 with open(fpath, 'r') as in_fp: |
| 1637 contents = in_fp.read() |
| 1638 self.assertEqual(contents, 'efgh' * ONE_KIB, |
| 1639 'File not overwritten when it should have been ' |
| 1640 'due to a non-matching tracker file.') |
| 1641 self.assertFalse(os.path.isfile(tracker_filename)) |
| 1642 finally: |
| 1643 if os.path.exists(tracker_filename): |
| 1644 os.unlink(tracker_filename) |
| 1645 |
| 1646 def test_cp_resumable_download_gzip(self): |
| 1647 """Tests that download can be resumed successfully with a gzipped file.""" |
| 1648     # Generate some reasonably incompressible data. This compresses to roughly
| 1649     # 128K in practice, but we assert specifically below that it is larger than
| 1650     # self.halt_size to guarantee that we can halt the download partway
| 1651     # through.
| 1652 object_uri = self.CreateObject() |
| 1653 random.seed(0) |
| 1654 contents = str([random.choice(string.ascii_letters) |
| 1655 for _ in xrange(ONE_KIB * 128)]) |
| 1656 random.seed() # Reset the seed for any other tests. |
| 1657 fpath1 = self.CreateTempFile(file_name='unzipped.txt', contents=contents) |
| 1658 self.RunGsUtil(['cp', '-z', 'txt', suri(fpath1), suri(object_uri)]) |
| 1659 |
| 1660 # Use @Retry as hedge against bucket listing eventual consistency. |
| 1661 @Retry(AssertionError, tries=3, timeout_secs=1) |
| 1662 def _GetObjectSize(): |
| 1663 stdout = self.RunGsUtil(['du', suri(object_uri)], return_stdout=True) |
| 1664 size_match = re.search(r'(\d+)\s+.*', stdout) |
| 1665 self.assertIsNotNone(size_match, 'Could not get object size') |
| 1666 self.assertEqual(len(size_match.groups()), 1, |
| 1667 'Did not match regex for exactly one object size.') |
| 1668 return long(size_match.group(1)) |
| 1669 |
| 1670 object_size = _GetObjectSize() |
| 1671 self.assertGreaterEqual(object_size, self.halt_size, |
| 1672                             'Compressed object size was not large enough to '
| 1673 'allow for a halted download, so the test results ' |
| 1674 'would be invalid. Please increase the compressed ' |
| 1675 'object size in the test.') |
| 1676 fpath2 = self.CreateTempFile() |
| 1677 test_callback_file = self.CreateTempFile( |
| 1678 contents=pickle.dumps(_HaltingCopyCallbackHandler(False, 5))) |
| 1679 |
| 1680 boto_config_for_test = ('GSUtil', 'resumable_threshold', str(ONE_KIB)) |
| 1681 with SetBotoConfigForTest([boto_config_for_test]): |
| 1682 stderr = self.RunGsUtil(['cp', '--testcallbackfile', test_callback_file, |
| 1683 suri(object_uri), suri(fpath2)], |
| 1684 return_stderr=True, expected_status=1) |
| 1685 self.assertIn('Artifically halting download.', stderr) |
| 1686 tracker_filename = GetTrackerFilePath( |
| 1687 StorageUrlFromString(fpath2), TrackerFileType.DOWNLOAD, self.test_api) |
| 1688 self.assertTrue(os.path.isfile(tracker_filename)) |
| 1689 self.assertIn('Downloading to temp gzip filename', stderr) |
| 1690 # We should have a temporary gzipped file, a tracker file, and no |
| 1691 # final file yet. |
| 1692 self.assertTrue(os.path.isfile('%s_.gztmp' % fpath2)) |
| 1693 stderr = self.RunGsUtil(['cp', suri(object_uri), suri(fpath2)], |
| 1694 return_stderr=True) |
| 1695 self.assertIn('Resuming download', stderr) |
| 1696 with open(fpath2, 'r') as f: |
| 1697 self.assertEqual(f.read(), contents, 'File contents did not match.') |
| 1698 self.assertFalse(os.path.isfile(tracker_filename)) |
| 1699 self.assertFalse(os.path.isfile('%s_.gztmp' % fpath2)) |
| 1700 |
| 1701 @SkipForS3('No resumable upload support for S3.') |
| 1702 def test_cp_resumable_upload_bucket_deleted(self): |
| 1703 """Tests that a not found exception is raised if bucket no longer exists.""" |
| 1704 bucket_uri = self.CreateBucket() |
| 1705 fpath = self.CreateTempFile(contents='a' * 2 * ONE_KIB) |
| 1706 boto_config_for_test = ('GSUtil', 'resumable_threshold', str(ONE_KIB)) |
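|     # This handler presumably deletes the destination bucket partway through
|     # the upload, so the subsequent attempt to start the upload over fails
|     # with a bucket-not-found error, as asserted below.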
| 1707 test_callback_file = self.CreateTempFile( |
| 1708 contents=pickle.dumps( |
| 1709 _DeleteBucketThenStartOverCopyCallbackHandler(5, bucket_uri))) |
| 1710 |
| 1711 with SetBotoConfigForTest([boto_config_for_test]): |
| 1712 stderr = self.RunGsUtil(['cp', '--testcallbackfile', test_callback_file, |
| 1713 fpath, suri(bucket_uri)], return_stderr=True, |
| 1714 expected_status=1) |
| 1715 self.assertIn('Deleting bucket', stderr) |
| 1716 self.assertIn('bucket does not exist', stderr) |
| 1717 |
| 1718 @SkipForS3('No resumable upload support for S3.') |
| 1719 def test_cp_resumable_upload_start_over_http_error(self): |
| 1720 for start_over_error in (404, 410): |
| 1721 self.start_over_error_test_helper(start_over_error) |
| 1722 |
| 1723 def start_over_error_test_helper(self, http_error_num): |
| 1724 bucket_uri = self.CreateBucket() |
| 1725 fpath = self.CreateTempFile(contents='a' * 2 * ONE_KIB) |
| 1726 boto_config_for_test = ('GSUtil', 'resumable_threshold', str(ONE_KIB)) |
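|     # Both handlers presumably invalidate the in-progress resumable upload
|     # session (via a forced HTTP error for JSON, or a start-over disposition
|     # for XML), so gsutil should restart the upload from scratch.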
| 1727 if self.test_api == ApiSelector.JSON: |
| 1728       test_callback_file = self.CreateTempFile(contents=pickle.dumps(
| 1729           _JSONForceHTTPErrorCopyCallbackHandler(5, http_error_num)))
| 1730 elif self.test_api == ApiSelector.XML: |
| 1731 test_callback_file = self.CreateTempFile( |
| 1732 contents=pickle.dumps( |
| 1733 _XMLResumableUploadStartOverCopyCallbackHandler(5))) |
| 1734 |
| 1735 with SetBotoConfigForTest([boto_config_for_test]): |
| 1736 stderr = self.RunGsUtil(['cp', '--testcallbackfile', test_callback_file, |
| 1737 fpath, suri(bucket_uri)], return_stderr=True) |
| 1738 self.assertIn('Restarting upload from scratch', stderr) |
| 1739 |
| 1740 def test_cp_minus_c(self): |
| 1741 bucket_uri = self.CreateBucket() |
| 1742 object_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo', |
| 1743 contents='foo') |
| 1744 self.RunGsUtil( |
| 1745 ['cp', '-c', suri(bucket_uri) + '/foo2', suri(object_uri), |
| 1746 suri(bucket_uri) + '/dir/'], |
| 1747 expected_status=1) |
| 1748 self.RunGsUtil(['stat', '%s/dir/foo' % suri(bucket_uri)]) |
| 1749 |
| 1750 def test_rewrite_cp(self): |
| 1751 """Tests the JSON Rewrite API.""" |
| 1752 if self.test_api == ApiSelector.XML: |
| 1753 return unittest.skip('Rewrite API is only supported in JSON.') |
| 1754 bucket_uri = self.CreateBucket() |
| 1755 object_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo', |
| 1756 contents='bar') |
| 1757 gsutil_api = GcsJsonApi(BucketStorageUri, logging.getLogger(), |
| 1758 self.default_provider) |
| 1759 key = object_uri.get_key() |
| 1760 src_obj_metadata = apitools_messages.Object( |
| 1761 name=key.name, bucket=key.bucket.name, contentType=key.content_type) |
| 1762 dst_obj_metadata = apitools_messages.Object( |
| 1763 bucket=src_obj_metadata.bucket, |
| 1764 name=self.MakeTempName('object'), |
| 1765 contentType=src_obj_metadata.contentType) |
| 1766 gsutil_api.CopyObject(src_obj_metadata, dst_obj_metadata) |
| 1767 self.assertEqual( |
| 1768 gsutil_api.GetObjectMetadata(src_obj_metadata.bucket, |
| 1769 src_obj_metadata.name, |
| 1770 fields=['md5Hash']).md5Hash, |
| 1771 gsutil_api.GetObjectMetadata(dst_obj_metadata.bucket, |
| 1772 dst_obj_metadata.name, |
| 1773 fields=['md5Hash']).md5Hash, |
| 1774 'Error: Rewritten object\'s hash doesn\'t match source object.') |
| 1775 |
| 1776 def test_rewrite_cp_resume(self): |
| 1777 """Tests the JSON Rewrite API, breaking and resuming via a tracker file.""" |
| 1778 if self.test_api == ApiSelector.XML: |
| 1779 return unittest.skip('Rewrite API is only supported in JSON.') |
| 1780 bucket_uri = self.CreateBucket() |
| 1781 # Second bucket needs to be a different storage class so the service |
| 1782 # actually rewrites the bytes. |
| 1783 bucket_uri2 = self.CreateBucket( |
| 1784 storage_class='DURABLE_REDUCED_AVAILABILITY') |
| 1785 # maxBytesPerCall must be >= 1 MiB, so create an object > 2 MiB because we |
| 1786     # need 2 responses from the service: 1 success, 1 failure prior to
| 1787 # completion. |
| 1788 object_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo', |
| 1789 contents=('12'*ONE_MIB) + 'bar', |
| 1790 prefer_json_api=True) |
| 1791 gsutil_api = GcsJsonApi(BucketStorageUri, logging.getLogger(), |
| 1792 self.default_provider) |
| 1793 key = object_uri.get_key() |
| 1794 src_obj_metadata = apitools_messages.Object( |
| 1795 name=key.name, bucket=key.bucket.name, contentType=key.content_type, |
| 1796 etag=key.etag.strip('"\'')) |
| 1797 dst_obj_name = self.MakeTempName('object') |
| 1798 dst_obj_metadata = apitools_messages.Object( |
| 1799 bucket=bucket_uri2.bucket_name, |
| 1800 name=dst_obj_name, |
| 1801 contentType=src_obj_metadata.contentType) |
| 1802 tracker_file_name = GetRewriteTrackerFilePath( |
| 1803 src_obj_metadata.bucket, src_obj_metadata.name, |
| 1804 dst_obj_metadata.bucket, dst_obj_metadata.name, self.test_api) |
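|     # The rewrite tracker file records the state of an in-progress Rewrite
|     # call, keyed by source, destination, and API, so that an interrupted
|     # rewrite can be resumed rather than restarted.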
| 1805 try: |
| 1806 try: |
| 1807 gsutil_api.CopyObject( |
| 1808 src_obj_metadata, dst_obj_metadata, |
| 1809 progress_callback=_HaltingRewriteCallbackHandler(ONE_MIB*2).call, |
| 1810 max_bytes_per_call=ONE_MIB) |
| 1811 self.fail('Expected _RewriteHaltException.') |
| 1812 except _RewriteHaltException: |
| 1813 pass |
| 1814 |
| 1815 # Tracker file should be left over. |
| 1816 self.assertTrue(os.path.exists(tracker_file_name)) |
| 1817 |
| 1818 # Now resume. Callback ensures we didn't start over. |
| 1819 gsutil_api.CopyObject( |
| 1820 src_obj_metadata, dst_obj_metadata, |
| 1821 progress_callback=_EnsureRewriteResumeCallbackHandler(ONE_MIB*2).call, |
| 1822 max_bytes_per_call=ONE_MIB) |
| 1823 |
| 1824 # Copy completed; tracker file should be deleted. |
| 1825 self.assertFalse(os.path.exists(tracker_file_name)) |
| 1826 |
| 1827 self.assertEqual( |
| 1828 gsutil_api.GetObjectMetadata(src_obj_metadata.bucket, |
| 1829 src_obj_metadata.name, |
| 1830 fields=['md5Hash']).md5Hash, |
| 1831 gsutil_api.GetObjectMetadata(dst_obj_metadata.bucket, |
| 1832 dst_obj_metadata.name, |
| 1833 fields=['md5Hash']).md5Hash, |
| 1834 'Error: Rewritten object\'s hash doesn\'t match source object.') |
| 1835 finally: |
| 1836 # Clean up if something went wrong. |
| 1837 DeleteTrackerFile(tracker_file_name) |
| 1838 |
| 1839 def test_rewrite_cp_resume_source_changed(self): |
| 1840 """Tests that Rewrite starts over when the source object has changed.""" |
| 1841 if self.test_api == ApiSelector.XML: |
| 1842 return unittest.skip('Rewrite API is only supported in JSON.') |
| 1843 bucket_uri = self.CreateBucket() |
| 1844 # Second bucket needs to be a different storage class so the service |
| 1845 # actually rewrites the bytes. |
| 1846 bucket_uri2 = self.CreateBucket( |
| 1847 storage_class='DURABLE_REDUCED_AVAILABILITY') |
| 1848 # maxBytesPerCall must be >= 1 MiB, so create an object > 2 MiB because we |
| 1849     # need 2 responses from the service: 1 success, 1 failure prior to
| 1850 # completion. |
| 1851 object_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo', |
| 1852 contents=('12'*ONE_MIB) + 'bar', |
| 1853 prefer_json_api=True) |
| 1854 gsutil_api = GcsJsonApi(BucketStorageUri, logging.getLogger(), |
| 1855 self.default_provider) |
| 1856 key = object_uri.get_key() |
| 1857 src_obj_metadata = apitools_messages.Object( |
| 1858 name=key.name, bucket=key.bucket.name, contentType=key.content_type, |
| 1859 etag=key.etag.strip('"\'')) |
| 1860 dst_obj_name = self.MakeTempName('object') |
| 1861 dst_obj_metadata = apitools_messages.Object( |
| 1862 bucket=bucket_uri2.bucket_name, |
| 1863 name=dst_obj_name, |
| 1864 contentType=src_obj_metadata.contentType) |
| 1865 tracker_file_name = GetRewriteTrackerFilePath( |
| 1866 src_obj_metadata.bucket, src_obj_metadata.name, |
| 1867 dst_obj_metadata.bucket, dst_obj_metadata.name, self.test_api) |
| 1868 try: |
| 1869 try: |
| 1870 gsutil_api.CopyObject( |
| 1871 src_obj_metadata, dst_obj_metadata, |
| 1872 progress_callback=_HaltingRewriteCallbackHandler(ONE_MIB*2).call, |
| 1873 max_bytes_per_call=ONE_MIB) |
| 1874 self.fail('Expected _RewriteHaltException.') |
| 1875 except _RewriteHaltException: |
| 1876 pass |
| 1877 # Overwrite the original object. |
| 1878 object_uri2 = self.CreateObject(bucket_uri=bucket_uri, object_name='foo', |
| 1879 contents='bar', prefer_json_api=True) |
| 1880 key2 = object_uri2.get_key() |
| 1881 src_obj_metadata2 = apitools_messages.Object( |
| 1882 name=key2.name, bucket=key2.bucket.name, |
| 1883 contentType=key2.content_type, etag=key2.etag.strip('"\'')) |
| 1884 |
| 1885 # Tracker file for original object should still exist. |
| 1886 self.assertTrue(os.path.exists(tracker_file_name)) |
| 1887 |
| 1888 # Copy the new object. |
| 1889 gsutil_api.CopyObject(src_obj_metadata2, dst_obj_metadata, |
| 1890 max_bytes_per_call=ONE_MIB) |
| 1891 |
| 1892 # Copy completed; original tracker file should be deleted. |
| 1893 self.assertFalse(os.path.exists(tracker_file_name)) |
| 1894 |
| 1895 self.assertEqual( |
| 1896 gsutil_api.GetObjectMetadata(src_obj_metadata2.bucket, |
| 1897 src_obj_metadata2.name, |
| 1898 fields=['md5Hash']).md5Hash, |
| 1899 gsutil_api.GetObjectMetadata(dst_obj_metadata.bucket, |
| 1900 dst_obj_metadata.name, |
| 1901 fields=['md5Hash']).md5Hash, |
| 1902 'Error: Rewritten object\'s hash doesn\'t match source object.') |
| 1903 finally: |
| 1904 # Clean up if something went wrong. |
| 1905 DeleteTrackerFile(tracker_file_name) |
| 1906 |
| 1907 def test_rewrite_cp_resume_command_changed(self): |
| 1908 """Tests that Rewrite starts over when the arguments changed.""" |
| 1909 if self.test_api == ApiSelector.XML: |
| 1910 return unittest.skip('Rewrite API is only supported in JSON.') |
| 1911 bucket_uri = self.CreateBucket() |
| 1912 # Second bucket needs to be a different storage class so the service |
| 1913 # actually rewrites the bytes. |
| 1914 bucket_uri2 = self.CreateBucket( |
| 1915 storage_class='DURABLE_REDUCED_AVAILABILITY') |
| 1916 # maxBytesPerCall must be >= 1 MiB, so create an object > 2 MiB because we |
| 1917     # need 2 responses from the service: 1 success, 1 failure prior to
| 1918 # completion. |
| 1919 object_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo', |
| 1920 contents=('12'*ONE_MIB) + 'bar', |
| 1921 prefer_json_api=True) |
| 1922 gsutil_api = GcsJsonApi(BucketStorageUri, logging.getLogger(), |
| 1923 self.default_provider) |
| 1924 key = object_uri.get_key() |
| 1925 src_obj_metadata = apitools_messages.Object( |
| 1926 name=key.name, bucket=key.bucket.name, contentType=key.content_type, |
| 1927 etag=key.etag.strip('"\'')) |
| 1928 dst_obj_name = self.MakeTempName('object') |
| 1929 dst_obj_metadata = apitools_messages.Object( |
| 1930 bucket=bucket_uri2.bucket_name, |
| 1931 name=dst_obj_name, |
| 1932 contentType=src_obj_metadata.contentType) |
| 1933 tracker_file_name = GetRewriteTrackerFilePath( |
| 1934 src_obj_metadata.bucket, src_obj_metadata.name, |
| 1935 dst_obj_metadata.bucket, dst_obj_metadata.name, self.test_api) |
| 1936 try: |
| 1937 try: |
| 1938 gsutil_api.CopyObject( |
| 1939 src_obj_metadata, dst_obj_metadata, canned_acl='private', |
| 1940 progress_callback=_HaltingRewriteCallbackHandler(ONE_MIB*2).call, |
| 1941 max_bytes_per_call=ONE_MIB) |
| 1942 self.fail('Expected _RewriteHaltException.') |
| 1943 except _RewriteHaltException: |
| 1944 pass |
| 1945 |
| 1946 # Tracker file for original object should still exist. |
| 1947 self.assertTrue(os.path.exists(tracker_file_name)) |
| 1948 |
| 1949 # Copy the same object but with different call parameters. |
| 1950 gsutil_api.CopyObject(src_obj_metadata, dst_obj_metadata, |
| 1951 canned_acl='public-read', |
| 1952 max_bytes_per_call=ONE_MIB) |
| 1953 |
| 1954 # Copy completed; original tracker file should be deleted. |
| 1955 self.assertFalse(os.path.exists(tracker_file_name)) |
| 1956 |
| 1957 new_obj_metadata = gsutil_api.GetObjectMetadata( |
| 1958 dst_obj_metadata.bucket, dst_obj_metadata.name, |
| 1959 fields=['acl,md5Hash']) |
| 1960 self.assertEqual( |
| 1961 gsutil_api.GetObjectMetadata(src_obj_metadata.bucket, |
| 1962 src_obj_metadata.name, |
| 1963 fields=['md5Hash']).md5Hash, |
| 1964 new_obj_metadata.md5Hash, |
| 1965 'Error: Rewritten object\'s hash doesn\'t match source object.') |
| 1966 # New object should have a public-read ACL from the second command. |
| 1967 found_public_acl = False |
| 1968 for acl_entry in new_obj_metadata.acl: |
| 1969 if acl_entry.entity == 'allUsers': |
| 1970 found_public_acl = True |
| 1971 self.assertTrue(found_public_acl, |
| 1972 'New object was not written with a public ACL.') |
| 1973 finally: |
| 1974 # Clean up if something went wrong. |
| 1975 DeleteTrackerFile(tracker_file_name) |
| 1976 |
| 1977 |
| 1978 class TestCpUnitTests(testcase.GsUtilUnitTestCase): |
| 1979 """Unit tests for gsutil cp.""" |
| 1980 |
| 1981 def testDownloadWithNoHashAvailable(self): |
| 1982 """Tests a download with no valid server-supplied hash.""" |
| 1983 # S3 should have a special message for non-MD5 etags. |
| 1984 bucket_uri = self.CreateBucket(provider='s3') |
| 1985 object_uri = self.CreateObject(bucket_uri=bucket_uri, contents='foo') |
| 1986 object_uri.get_key().etag = '12345' # Not an MD5 |
| 1987 dst_dir = self.CreateTempDir() |
| 1988 |
| 1989 log_handler = self.RunCommand( |
| 1990 'cp', [suri(object_uri), dst_dir], return_log_handler=True) |
| 1991 warning_messages = log_handler.messages['warning'] |
| 1992     self.assertEqual(2, len(warning_messages))
| 1993 self.assertRegexpMatches( |
| 1994 warning_messages[0], |
| 1995 r'Non-MD5 etag \(12345\) present for key .*, ' |
| 1996 r'data integrity checks are not possible') |
| 1997 self.assertIn('Integrity cannot be assured', warning_messages[1]) |
| 1998 |
| 1999 def test_object_and_prefix_same_name(self): |
| 2000 bucket_uri = self.CreateBucket() |
| 2001 object_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo', |
| 2002 contents='foo') |
| 2003 self.CreateObject(bucket_uri=bucket_uri, |
| 2004 object_name='foo/bar', contents='bar') |
| 2005 fpath = self.CreateTempFile() |
| 2006 # MockKey doesn't support hash_algs, so the MD5 will not match. |
| 2007 with SetBotoConfigForTest([('GSUtil', 'check_hashes', 'never')]): |
| 2008 self.RunCommand('cp', [suri(object_uri), fpath]) |
| 2009 with open(fpath, 'r') as f: |
| 2010 self.assertEqual(f.read(), 'foo') |
| 2011 |
| 2012 def test_cp_upload_respects_no_hashes(self): |
| 2013 bucket_uri = self.CreateBucket() |
| 2014 fpath = self.CreateTempFile(contents='abcd') |
| 2015 with SetBotoConfigForTest([('GSUtil', 'check_hashes', 'never')]): |
| 2016 log_handler = self.RunCommand('cp', [fpath, suri(bucket_uri)], |
| 2017 return_log_handler=True) |
| 2018 warning_messages = log_handler.messages['warning'] |
| 2019       self.assertEqual(1, len(warning_messages))
| 2020 self.assertIn('Found no hashes to validate object upload', |
| 2021 warning_messages[0]) |
OLD | NEW |