Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(239)

Side by Side Diff: third_party/gsutil/gslib/tests/test_mv.py

Issue 1377933002: [catapult] - Copy Telemetry's gsutilz over to third_party. (Closed) Base URL: https://github.com/catapult-project/catapult.git@master
Patch Set: Rename to gsutil. Created 5 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 # -*- coding: utf-8 -*-
2 # Copyright 2013 Google Inc. All Rights Reserved.
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 """Integration tests for mv command."""
16
17 from __future__ import absolute_import
18
19 import os
20
21 import gslib.tests.testcase as testcase
22 from gslib.tests.util import ObjectToURI as suri
23 from gslib.tests.util import PerformsFileToObjectUpload
24 from gslib.util import Retry
25
26
27 class TestMv(testcase.GsUtilIntegrationTestCase):
28 """Integration tests for mv command."""
29
30 def test_moving(self):
31 """Tests moving two buckets, one with 2 objects and one with 0 objects."""
32 bucket1_uri = self.CreateBucket(test_objects=2)
33 self.AssertNObjectsInBucket(bucket1_uri, 2)
34 bucket2_uri = self.CreateBucket()
35 self.AssertNObjectsInBucket(bucket2_uri, 0)
36
37 # Move two objects from bucket1 to bucket2.
38 objs = [bucket1_uri.clone_replace_key(key).versionless_uri
39 for key in bucket1_uri.list_bucket()]
40 cmd = (['-m', 'mv'] + objs + [suri(bucket2_uri)])
41 stderr = self.RunGsUtil(cmd, return_stderr=True)
42 # Rewrite API may output an additional 'Copying' progress notification.
43 self.assertGreaterEqual(stderr.count('Copying'), 2)
44 self.assertLessEqual(stderr.count('Copying'), 4)
45 self.assertEqual(stderr.count('Copying') % 2, 0)
46 self.assertEqual(stderr.count('Removing'), 2)
47
48 self.AssertNObjectsInBucket(bucket1_uri, 0)
49 self.AssertNObjectsInBucket(bucket2_uri, 2)
50
51 # Remove one of the objects.
52 objs = [bucket2_uri.clone_replace_key(key).versionless_uri
53 for key in bucket2_uri.list_bucket()]
54 obj1 = objs[0]
55 self.RunGsUtil(['rm', obj1])
56
57 self.AssertNObjectsInBucket(bucket1_uri, 0)
58 self.AssertNObjectsInBucket(bucket2_uri, 1)
59
60 # Move the 1 remaining object back.
61 objs = [suri(bucket2_uri.clone_replace_key(key))
62 for key in bucket2_uri.list_bucket()]
63 cmd = (['-m', 'mv'] + objs + [suri(bucket1_uri)])
64 stderr = self.RunGsUtil(cmd, return_stderr=True)
65 # Rewrite API may output an additional 'Copying' progress notification.
66 self.assertGreaterEqual(stderr.count('Copying'), 1)
67 self.assertLessEqual(stderr.count('Copying'), 2)
68 self.assertEqual(stderr.count('Removing'), 1)
69
70 self.AssertNObjectsInBucket(bucket1_uri, 1)
71 self.AssertNObjectsInBucket(bucket2_uri, 0)
72
73 def test_move_dir_to_bucket(self):
74 """Tests moving a local directory to a bucket."""
75 bucket_uri = self.CreateBucket()
76 dir_to_move = self.CreateTempDir(test_files=2)
77 self.RunGsUtil(['mv', dir_to_move, suri(bucket_uri)])
78 self.AssertNObjectsInBucket(bucket_uri, 2)
79
80 @PerformsFileToObjectUpload
81 def test_stdin_args(self):
82 """Tests mv with the -I option."""
83 tmpdir = self.CreateTempDir()
84 fpath1 = self.CreateTempFile(tmpdir=tmpdir, contents='data1')
85 fpath2 = self.CreateTempFile(tmpdir=tmpdir, contents='data2')
86 bucket_uri = self.CreateBucket()
87 self.RunGsUtil(['mv', '-I', suri(bucket_uri)],
88 stdin='\n'.join((fpath1, fpath2)))
89
90 # Use @Retry as hedge against bucket listing eventual consistency.
91 @Retry(AssertionError, tries=3, timeout_secs=1)
92 def _Check1():
93 stdout = self.RunGsUtil(['ls', suri(bucket_uri)], return_stdout=True)
94 self.assertIn(os.path.basename(fpath1), stdout)
95 self.assertIn(os.path.basename(fpath2), stdout)
96 self.assertNumLines(stdout, 2)
97 _Check1()
98
99 def test_mv_no_clobber(self):
100 """Tests mv with the -n option."""
101 fpath1 = self.CreateTempFile(contents='data1')
102 bucket_uri = self.CreateBucket()
103 object_uri = self.CreateObject(bucket_uri=bucket_uri, contents='data2')
104 stderr = self.RunGsUtil(['mv', '-n', fpath1, suri(object_uri)],
105 return_stderr=True)
106 # Copy should be skipped and source file should not be removed.
107 self.assertIn('Skipping existing item: %s' % suri(object_uri), stderr)
108 self.assertNotIn('Removing %s' % suri(fpath1), stderr)
109 # Object content should be unchanged.
110 contents = self.RunGsUtil(['cat', suri(object_uri)], return_stdout=True)
111 self.assertEqual(contents, 'data2')
112
OLDNEW
« no previous file with comments | « third_party/gsutil/gslib/tests/test_mb.py ('k') | third_party/gsutil/gslib/tests/test_naming.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698