Chromium Code Reviews (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out

Unified Diff: py/utils/

Issue 405083005: use better mechanism for enums in (Closed) Base URL:
Patch Set: all caps for constants Created 6 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « py/utils/ ('k') | py/utils/ » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: py/utils/
diff --git a/py/utils/ b/py/utils/
new file mode 100755
index 0000000000000000000000000000000000000000..6751647abbad44391fadac50a03f7103fbf6797b
--- /dev/null
+++ b/py/utils/
@@ -0,0 +1,214 @@
+"""Tests for
+TODO(epoger): How should we exercise these self-tests? See
+# System-level imports.
+import os
+import posixpath
+import random
+import shutil
+import sys
+import tempfile
+# Local imports.
+import gs_utils
+def _test_public_read():
+ """Make sure we can read from public files without .boto file credentials."""
+ gs = gs_utils.GSUtils()
+ gs.list_bucket_contents(bucket='chromium-skia-gm-summaries', subdir=None)
+def _test_authenticated_round_trip():
+ try:
+ gs = gs_utils.GSUtils(
+ boto_file_path=os.path.expanduser(os.path.join('~','.boto')))
+ except:
+ print """
+Failed to instantiate GSUtils object with default .boto file path.
+Do you have a ~/.boto file that provides the credentials needed to read
+and write gs://chromium-skia-gm ?
+ raise
+ bucket = 'chromium-skia-gm'
+ remote_dir = 'gs_utils_test/%d' % random.randint(0, sys.maxint)
+ subdir = 'subdir'
+ filenames_to_upload = ['file1', 'file2']
+ # Upload test files to Google Storage, checking that their fine-grained
+ # ACLs were set correctly.
+ id_type = gs.IdType.GROUP_BY_DOMAIN
+ id_value = ''
+ set_permission = gs.Permission.READ
+ local_src_dir = tempfile.mkdtemp()
+ os.mkdir(os.path.join(local_src_dir, subdir))
+ try:
+ for filename in filenames_to_upload:
+ with open(os.path.join(local_src_dir, subdir, filename), 'w') as f:
+ f.write('contents of %s\n' % filename)
+ dest_path = posixpath.join(remote_dir, subdir, filename)
+ gs.upload_file(
+ source_path=os.path.join(local_src_dir, subdir, filename),
+ dest_bucket=bucket, dest_path=dest_path,
+ fine_grained_acl_list=[(id_type, id_value, set_permission)])
+ got_permission = gs.get_acl(bucket=bucket, path=dest_path,
+ id_type=id_type, id_value=id_value)
+ assert got_permission == set_permission, '%s == %s' % (
+ got_permission, set_permission)
+ finally:
+ shutil.rmtree(local_src_dir)
+ # Get a list of the files we uploaded to Google Storage.
+ (dirs, files) = gs.list_bucket_contents(
+ bucket=bucket, subdir=remote_dir)
+ assert dirs == [subdir], '%s == [%s]' % (dirs, subdir)
+ assert files == [], '%s == []' % files
+ (dirs, files) = gs.list_bucket_contents(
+ bucket=bucket, subdir=posixpath.join(remote_dir, subdir))
+ assert dirs == [], '%s == []' % dirs
+ assert files == filenames_to_upload, '%s == %s' % (files, filenames_to_upload)
+ # Manipulate ACLs on one of those files, and verify them.
+ # TODO(epoger): Test IdTypes other than GROUP_BY_DOMAIN ?
+ # TODO(epoger): Test setting multiple ACLs on the same file?
+ id_type = gs.IdType.GROUP_BY_DOMAIN
+ id_value = ''
+ fullpath = posixpath.join(remote_dir, subdir, filenames_to_upload[0])
+ # Make sure ACL is empty to start with ...
+ gs.set_acl(bucket=bucket, path=fullpath,
+ id_type=id_type, id_value=id_value, permission=gs.Permission.EMPTY)
+ permission = gs.get_acl(bucket=bucket, path=fullpath,
+ id_type=id_type, id_value=id_value)
+ assert permission == gs.Permission.EMPTY, '%s == %s' % (
+ permission, gs.Permission.EMPTY)
+ # ... set it to OWNER ...
+ gs.set_acl(bucket=bucket, path=fullpath,
+ id_type=id_type, id_value=id_value, permission=gs.Permission.OWNER)
+ permission = gs.get_acl(bucket=bucket, path=fullpath,
+ id_type=id_type, id_value=id_value)
+ assert permission == gs.Permission.OWNER, '%s == %s' % (
+ permission, gs.Permission.OWNER)
+ # ... now set it to READ ...
+ gs.set_acl(bucket=bucket, path=fullpath,
+ id_type=id_type, id_value=id_value, permission=gs.Permission.READ)
+ permission = gs.get_acl(bucket=bucket, path=fullpath,
+ id_type=id_type, id_value=id_value)
+ assert permission == gs.Permission.READ, '%s == %s' % (
+ permission, gs.Permission.READ)
+ # ... and clear it again to finish.
+ gs.set_acl(bucket=bucket, path=fullpath,
+ id_type=id_type, id_value=id_value, permission=gs.Permission.EMPTY)
+ permission = gs.get_acl(bucket=bucket, path=fullpath,
+ id_type=id_type, id_value=id_value)
+ assert permission == gs.Permission.EMPTY, '%s == %s' % (
+ permission, gs.Permission.EMPTY)
+ # Download the files we uploaded to Google Storage, and validate contents.
+ local_dest_dir = tempfile.mkdtemp()
+ try:
+ for filename in filenames_to_upload:
+ gs.download_file(source_bucket=bucket,
+ source_path=posixpath.join(remote_dir, subdir, filename),
+ dest_path=os.path.join(local_dest_dir, subdir, filename),
+ create_subdirs_if_needed=True)
+ with open(os.path.join(local_dest_dir, subdir, filename)) as f:
+ file_contents =
+ assert file_contents == 'contents of %s\n' % filename, (
+ '%s == "contents of %s\n"' % (file_contents, filename))
+ finally:
+ shutil.rmtree(local_dest_dir)
+ # Delete all the files we uploaded to Google Storage.
+ for filename in filenames_to_upload:
+ gs.delete_file(bucket=bucket,
+ path=posixpath.join(remote_dir, subdir, filename))
+ # Confirm that we deleted all the files we uploaded to Google Storage.
+ (dirs, files) = gs.list_bucket_contents(
+ bucket=bucket, subdir=posixpath.join(remote_dir, subdir))
+ assert dirs == [], '%s == []' % dirs
+ assert files == [], '%s == []' % files
+def _test_dir_upload_and_download():
+ """Test upload_dir_contents() and download_dir_contents()."""
+ try:
+ gs = gs_utils.GSUtils(
+ boto_file_path=os.path.expanduser(os.path.join('~','.boto')))
+ except:
+ print """
+Failed to instantiate GSUtils object with default .boto file path.
+Do you have a ~/.boto file that provides the credentials needed to read
+and write gs://chromium-skia-gm ?
+ raise
+ bucket = 'chromium-skia-gm'
+ remote_dir = 'gs_utils_test/%d' % random.randint(0, sys.maxint)
+ subdir = 'subdir'
+ filenames = ['file1', 'file2']
+ # Create directory tree on local disk and upload it.
+ id_type = gs.IdType.GROUP_BY_DOMAIN
+ id_value = ''
+ set_permission = gs.Permission.READ
+ local_src_dir = tempfile.mkdtemp()
+ os.mkdir(os.path.join(local_src_dir, subdir))
+ try:
+ for filename in filenames:
+ with open(os.path.join(local_src_dir, subdir, filename), 'w') as f:
+ f.write('contents of %s\n' % filename)
+ gs.upload_dir_contents(
+ source_dir=local_src_dir, dest_bucket=bucket, dest_dir=remote_dir,
+ predefined_acl=gs.PredefinedACL.PRIVATE,
+ fine_grained_acl_list=[(id_type, id_value, set_permission)])
+ finally:
+ shutil.rmtree(local_src_dir)
+ # Validate the list of the files we uploaded to Google Storage.
+ (dirs, files) = gs.list_bucket_contents(
+ bucket=bucket, subdir=remote_dir)
+ assert dirs == [subdir], '%s == [%s]' % (dirs, subdir)
+ assert files == [], '%s == []' % files
+ (dirs, files) = gs.list_bucket_contents(
+ bucket=bucket, subdir=posixpath.join(remote_dir, subdir))
+ assert dirs == [], '%s == []' % dirs
+ assert files == filenames, '%s == %s' % (files, filenames)
+ # Check the fine-grained ACLs we set in Google Storage.
+ for filename in filenames:
+ got_permission = gs.get_acl(
+ bucket=bucket, path=posixpath.join(remote_dir, subdir, filename),
+ id_type=id_type, id_value=id_value)
+ assert got_permission == set_permission, '%s == %s' % (
+ got_permission, set_permission)
+ # Download the directory tree we just uploaded, make sure its contents
+ # are what we expect, and then delete the tree in Google Storage.
+ local_dest_dir = tempfile.mkdtemp()
+ try:
+ gs.download_dir_contents(source_bucket=bucket, source_dir=remote_dir,
+ dest_dir=local_dest_dir)
+ for filename in filenames:
+ with open(os.path.join(local_dest_dir, subdir, filename)) as f:
+ file_contents =
+ assert file_contents == 'contents of %s\n' % filename, (
+ '%s == "contents of %s\n"' % (file_contents, filename))
+ finally:
+ shutil.rmtree(local_dest_dir)
+ for filename in filenames:
+ gs.delete_file(bucket=bucket,
+ path=posixpath.join(remote_dir, subdir, filename))
+if __name__ == '__main__':
+ _test_public_read()
+ _test_authenticated_round_trip()
+ _test_dir_upload_and_download()
+ # TODO(epoger): Add _test_unauthenticated_access() to make sure we raise
+ # an exception when we try to access without needed credentials.
« no previous file with comments | « py/utils/ ('k') | py/utils/ » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698