Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(110)

Side by Side Diff: py/utils/gs_utils_manualtest.py

Issue 405083005: use better mechanism for enums in gs_utils.py (Closed) Base URL: https://skia.googlesource.com/common.git@master
Patch Set: all caps for constants Created 6 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « py/utils/gs_utils.py ('k') | py/utils/url_utils.py » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 #!/usr/bin/python
2
3 """Tests for gs_utils.py.
4
5 TODO(epoger): How should we exercise these self-tests? See http://skbug.com/2751
6 """
7
8 # System-level imports.
9 import os
10 import posixpath
11 import random
12 import shutil
13 import sys
14 import tempfile
15
16 # Local imports.
17 import gs_utils
18
19
20 def _test_public_read():
21 """Make sure we can read from public files without .boto file credentials."""
22 gs = gs_utils.GSUtils()
23 gs.list_bucket_contents(bucket='chromium-skia-gm-summaries', subdir=None)
24
25
26 def _test_authenticated_round_trip():
27 try:
28 gs = gs_utils.GSUtils(
29 boto_file_path=os.path.expanduser(os.path.join('~','.boto')))
30 except:
31 print """
32 Failed to instantiate GSUtils object with default .boto file path.
33 Do you have a ~/.boto file that provides the credentials needed to read
34 and write gs://chromium-skia-gm ?
35 """
36 raise
37
38 bucket = 'chromium-skia-gm'
39 remote_dir = 'gs_utils_test/%d' % random.randint(0, sys.maxint)
40 subdir = 'subdir'
41 filenames_to_upload = ['file1', 'file2']
42
43 # Upload test files to Google Storage, checking that their fine-grained
44 # ACLs were set correctly.
45 id_type = gs.IdType.GROUP_BY_DOMAIN
46 id_value = 'chromium.org'
47 set_permission = gs.Permission.READ
48 local_src_dir = tempfile.mkdtemp()
49 os.mkdir(os.path.join(local_src_dir, subdir))
50 try:
51 for filename in filenames_to_upload:
52 with open(os.path.join(local_src_dir, subdir, filename), 'w') as f:
53 f.write('contents of %s\n' % filename)
54 dest_path = posixpath.join(remote_dir, subdir, filename)
55 gs.upload_file(
56 source_path=os.path.join(local_src_dir, subdir, filename),
57 dest_bucket=bucket, dest_path=dest_path,
58 fine_grained_acl_list=[(id_type, id_value, set_permission)])
59 got_permission = gs.get_acl(bucket=bucket, path=dest_path,
60 id_type=id_type, id_value=id_value)
61 assert got_permission == set_permission, '%s == %s' % (
62 got_permission, set_permission)
63 finally:
64 shutil.rmtree(local_src_dir)
65
66 # Get a list of the files we uploaded to Google Storage.
67 (dirs, files) = gs.list_bucket_contents(
68 bucket=bucket, subdir=remote_dir)
69 assert dirs == [subdir], '%s == [%s]' % (dirs, subdir)
70 assert files == [], '%s == []' % files
71 (dirs, files) = gs.list_bucket_contents(
72 bucket=bucket, subdir=posixpath.join(remote_dir, subdir))
73 assert dirs == [], '%s == []' % dirs
74 assert files == filenames_to_upload, '%s == %s' % (files, filenames_to_upload)
75
76 # Manipulate ACLs on one of those files, and verify them.
77 # TODO(epoger): Test IdTypes other than GROUP_BY_DOMAIN ?
78 # TODO(epoger): Test setting multiple ACLs on the same file?
79 id_type = gs.IdType.GROUP_BY_DOMAIN
80 id_value = 'google.com'
81 fullpath = posixpath.join(remote_dir, subdir, filenames_to_upload[0])
82 # Make sure ACL is empty to start with ...
83 gs.set_acl(bucket=bucket, path=fullpath,
84 id_type=id_type, id_value=id_value, permission=gs.Permission.EMPTY)
85 permission = gs.get_acl(bucket=bucket, path=fullpath,
86 id_type=id_type, id_value=id_value)
87 assert permission == gs.Permission.EMPTY, '%s == %s' % (
88 permission, gs.Permission.EMPTY)
89 # ... set it to OWNER ...
90 gs.set_acl(bucket=bucket, path=fullpath,
91 id_type=id_type, id_value=id_value, permission=gs.Permission.OWNER)
92 permission = gs.get_acl(bucket=bucket, path=fullpath,
93 id_type=id_type, id_value=id_value)
94 assert permission == gs.Permission.OWNER, '%s == %s' % (
95 permission, gs.Permission.OWNER)
96 # ... now set it to READ ...
97 gs.set_acl(bucket=bucket, path=fullpath,
98 id_type=id_type, id_value=id_value, permission=gs.Permission.READ)
99 permission = gs.get_acl(bucket=bucket, path=fullpath,
100 id_type=id_type, id_value=id_value)
101 assert permission == gs.Permission.READ, '%s == %s' % (
102 permission, gs.Permission.READ)
103 # ... and clear it again to finish.
104 gs.set_acl(bucket=bucket, path=fullpath,
105 id_type=id_type, id_value=id_value, permission=gs.Permission.EMPTY)
106 permission = gs.get_acl(bucket=bucket, path=fullpath,
107 id_type=id_type, id_value=id_value)
108 assert permission == gs.Permission.EMPTY, '%s == %s' % (
109 permission, gs.Permission.EMPTY)
110
111 # Download the files we uploaded to Google Storage, and validate contents.
112 local_dest_dir = tempfile.mkdtemp()
113 try:
114 for filename in filenames_to_upload:
115 gs.download_file(source_bucket=bucket,
116 source_path=posixpath.join(remote_dir, subdir, filename),
117 dest_path=os.path.join(local_dest_dir, subdir, filename),
118 create_subdirs_if_needed=True)
119 with open(os.path.join(local_dest_dir, subdir, filename)) as f:
120 file_contents = f.read()
121 assert file_contents == 'contents of %s\n' % filename, (
122 '%s == "contents of %s\n"' % (file_contents, filename))
123 finally:
124 shutil.rmtree(local_dest_dir)
125
126 # Delete all the files we uploaded to Google Storage.
127 for filename in filenames_to_upload:
128 gs.delete_file(bucket=bucket,
129 path=posixpath.join(remote_dir, subdir, filename))
130
131 # Confirm that we deleted all the files we uploaded to Google Storage.
132 (dirs, files) = gs.list_bucket_contents(
133 bucket=bucket, subdir=posixpath.join(remote_dir, subdir))
134 assert dirs == [], '%s == []' % dirs
135 assert files == [], '%s == []' % files
136
137
138 def _test_dir_upload_and_download():
139 """Test upload_dir_contents() and download_dir_contents()."""
140 try:
141 gs = gs_utils.GSUtils(
142 boto_file_path=os.path.expanduser(os.path.join('~','.boto')))
143 except:
144 print """
145 Failed to instantiate GSUtils object with default .boto file path.
146 Do you have a ~/.boto file that provides the credentials needed to read
147 and write gs://chromium-skia-gm ?
148 """
149 raise
150
151 bucket = 'chromium-skia-gm'
152 remote_dir = 'gs_utils_test/%d' % random.randint(0, sys.maxint)
153 subdir = 'subdir'
154 filenames = ['file1', 'file2']
155
156 # Create directory tree on local disk and upload it.
157 id_type = gs.IdType.GROUP_BY_DOMAIN
158 id_value = 'chromium.org'
159 set_permission = gs.Permission.READ
160 local_src_dir = tempfile.mkdtemp()
161 os.mkdir(os.path.join(local_src_dir, subdir))
162 try:
163 for filename in filenames:
164 with open(os.path.join(local_src_dir, subdir, filename), 'w') as f:
165 f.write('contents of %s\n' % filename)
166 gs.upload_dir_contents(
167 source_dir=local_src_dir, dest_bucket=bucket, dest_dir=remote_dir,
168 predefined_acl=gs.PredefinedACL.PRIVATE,
169 fine_grained_acl_list=[(id_type, id_value, set_permission)])
170 finally:
171 shutil.rmtree(local_src_dir)
172
173 # Validate the list of the files we uploaded to Google Storage.
174 (dirs, files) = gs.list_bucket_contents(
175 bucket=bucket, subdir=remote_dir)
176 assert dirs == [subdir], '%s == [%s]' % (dirs, subdir)
177 assert files == [], '%s == []' % files
178 (dirs, files) = gs.list_bucket_contents(
179 bucket=bucket, subdir=posixpath.join(remote_dir, subdir))
180 assert dirs == [], '%s == []' % dirs
181 assert files == filenames, '%s == %s' % (files, filenames)
182
183 # Check the fine-grained ACLs we set in Google Storage.
184 for filename in filenames:
185 got_permission = gs.get_acl(
186 bucket=bucket, path=posixpath.join(remote_dir, subdir, filename),
187 id_type=id_type, id_value=id_value)
188 assert got_permission == set_permission, '%s == %s' % (
189 got_permission, set_permission)
190
191 # Download the directory tree we just uploaded, make sure its contents
192 # are what we expect, and then delete the tree in Google Storage.
193 local_dest_dir = tempfile.mkdtemp()
194 try:
195 gs.download_dir_contents(source_bucket=bucket, source_dir=remote_dir,
196 dest_dir=local_dest_dir)
197 for filename in filenames:
198 with open(os.path.join(local_dest_dir, subdir, filename)) as f:
199 file_contents = f.read()
200 assert file_contents == 'contents of %s\n' % filename, (
201 '%s == "contents of %s\n"' % (file_contents, filename))
202 finally:
203 shutil.rmtree(local_dest_dir)
204 for filename in filenames:
205 gs.delete_file(bucket=bucket,
206 path=posixpath.join(remote_dir, subdir, filename))
207
208
209 if __name__ == '__main__':
210 _test_public_read()
211 _test_authenticated_round_trip()
212 _test_dir_upload_and_download()
213 # TODO(epoger): Add _test_unauthenticated_access() to make sure we raise
214 # an exception when we try to access without needed credentials.
OLDNEW
« no previous file with comments | « py/utils/gs_utils.py ('k') | py/utils/url_utils.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698