| Index: py/utils/gs_utils_manualtest.py
|
| diff --git a/py/utils/gs_utils_manualtest.py b/py/utils/gs_utils_manualtest.py
|
| new file mode 100755
|
| index 0000000000000000000000000000000000000000..6751647abbad44391fadac50a03f7103fbf6797b
|
| --- /dev/null
|
| +++ b/py/utils/gs_utils_manualtest.py
|
| @@ -0,0 +1,214 @@
|
| +#!/usr/bin/python
|
| +
|
| +"""Tests for gs_utils.py.
|
| +
|
| +TODO(epoger): How should we exercise these self-tests? See http://skbug.com/2751
|
| +"""
|
| +
|
| +# System-level imports.
|
| +import os
|
| +import posixpath
|
| +import random
|
| +import shutil
|
| +import sys
|
| +import tempfile
|
| +
|
| +# Local imports.
|
| +import gs_utils
|
| +
|
| +
|
| +def _test_public_read():
|
| + """Make sure we can read from public files without .boto file credentials."""
|
| + gs = gs_utils.GSUtils()
|
| + gs.list_bucket_contents(bucket='chromium-skia-gm-summaries', subdir=None)
|
| +
|
| +
|
| +def _test_authenticated_round_trip():
|
| + try:
|
| + gs = gs_utils.GSUtils(
|
| + boto_file_path=os.path.expanduser(os.path.join('~','.boto')))
|
| + except:
|
| + print """
|
| +Failed to instantiate GSUtils object with default .boto file path.
|
| +Do you have a ~/.boto file that provides the credentials needed to read
|
| +and write gs://chromium-skia-gm ?
|
| +"""
|
| + raise
|
| +
|
| + bucket = 'chromium-skia-gm'
|
| + remote_dir = 'gs_utils_test/%d' % random.randint(0, sys.maxint)
|
| + subdir = 'subdir'
|
| + filenames_to_upload = ['file1', 'file2']
|
| +
|
| + # Upload test files to Google Storage, checking that their fine-grained
|
| + # ACLs were set correctly.
|
| + id_type = gs.IdType.GROUP_BY_DOMAIN
|
| + id_value = 'chromium.org'
|
| + set_permission = gs.Permission.READ
|
| + local_src_dir = tempfile.mkdtemp()
|
| + os.mkdir(os.path.join(local_src_dir, subdir))
|
| + try:
|
| + for filename in filenames_to_upload:
|
| + with open(os.path.join(local_src_dir, subdir, filename), 'w') as f:
|
| + f.write('contents of %s\n' % filename)
|
| + dest_path = posixpath.join(remote_dir, subdir, filename)
|
| + gs.upload_file(
|
| + source_path=os.path.join(local_src_dir, subdir, filename),
|
| + dest_bucket=bucket, dest_path=dest_path,
|
| + fine_grained_acl_list=[(id_type, id_value, set_permission)])
|
| + got_permission = gs.get_acl(bucket=bucket, path=dest_path,
|
| + id_type=id_type, id_value=id_value)
|
| + assert got_permission == set_permission, '%s == %s' % (
|
| + got_permission, set_permission)
|
| + finally:
|
| + shutil.rmtree(local_src_dir)
|
| +
|
| + # Get a list of the files we uploaded to Google Storage.
|
| + (dirs, files) = gs.list_bucket_contents(
|
| + bucket=bucket, subdir=remote_dir)
|
| + assert dirs == [subdir], '%s == [%s]' % (dirs, subdir)
|
| + assert files == [], '%s == []' % files
|
| + (dirs, files) = gs.list_bucket_contents(
|
| + bucket=bucket, subdir=posixpath.join(remote_dir, subdir))
|
| + assert dirs == [], '%s == []' % dirs
|
| + assert files == filenames_to_upload, '%s == %s' % (files, filenames_to_upload)
|
| +
|
| + # Manipulate ACLs on one of those files, and verify them.
|
| + # TODO(epoger): Test IdTypes other than GROUP_BY_DOMAIN ?
|
| + # TODO(epoger): Test setting multiple ACLs on the same file?
|
| + id_type = gs.IdType.GROUP_BY_DOMAIN
|
| + id_value = 'google.com'
|
| + fullpath = posixpath.join(remote_dir, subdir, filenames_to_upload[0])
|
| + # Make sure ACL is empty to start with ...
|
| + gs.set_acl(bucket=bucket, path=fullpath,
|
| + id_type=id_type, id_value=id_value, permission=gs.Permission.EMPTY)
|
| + permission = gs.get_acl(bucket=bucket, path=fullpath,
|
| + id_type=id_type, id_value=id_value)
|
| + assert permission == gs.Permission.EMPTY, '%s == %s' % (
|
| + permission, gs.Permission.EMPTY)
|
| + # ... set it to OWNER ...
|
| + gs.set_acl(bucket=bucket, path=fullpath,
|
| + id_type=id_type, id_value=id_value, permission=gs.Permission.OWNER)
|
| + permission = gs.get_acl(bucket=bucket, path=fullpath,
|
| + id_type=id_type, id_value=id_value)
|
| + assert permission == gs.Permission.OWNER, '%s == %s' % (
|
| + permission, gs.Permission.OWNER)
|
| + # ... now set it to READ ...
|
| + gs.set_acl(bucket=bucket, path=fullpath,
|
| + id_type=id_type, id_value=id_value, permission=gs.Permission.READ)
|
| + permission = gs.get_acl(bucket=bucket, path=fullpath,
|
| + id_type=id_type, id_value=id_value)
|
| + assert permission == gs.Permission.READ, '%s == %s' % (
|
| + permission, gs.Permission.READ)
|
| + # ... and clear it again to finish.
|
| + gs.set_acl(bucket=bucket, path=fullpath,
|
| + id_type=id_type, id_value=id_value, permission=gs.Permission.EMPTY)
|
| + permission = gs.get_acl(bucket=bucket, path=fullpath,
|
| + id_type=id_type, id_value=id_value)
|
| + assert permission == gs.Permission.EMPTY, '%s == %s' % (
|
| + permission, gs.Permission.EMPTY)
|
| +
|
| + # Download the files we uploaded to Google Storage, and validate contents.
|
| + local_dest_dir = tempfile.mkdtemp()
|
| + try:
|
| + for filename in filenames_to_upload:
|
| + gs.download_file(source_bucket=bucket,
|
| + source_path=posixpath.join(remote_dir, subdir, filename),
|
| + dest_path=os.path.join(local_dest_dir, subdir, filename),
|
| + create_subdirs_if_needed=True)
|
| + with open(os.path.join(local_dest_dir, subdir, filename)) as f:
|
| + file_contents = f.read()
|
| + assert file_contents == 'contents of %s\n' % filename, (
|
| + '%s == "contents of %s\n"' % (file_contents, filename))
|
| + finally:
|
| + shutil.rmtree(local_dest_dir)
|
| +
|
| + # Delete all the files we uploaded to Google Storage.
|
| + for filename in filenames_to_upload:
|
| + gs.delete_file(bucket=bucket,
|
| + path=posixpath.join(remote_dir, subdir, filename))
|
| +
|
| + # Confirm that we deleted all the files we uploaded to Google Storage.
|
| + (dirs, files) = gs.list_bucket_contents(
|
| + bucket=bucket, subdir=posixpath.join(remote_dir, subdir))
|
| + assert dirs == [], '%s == []' % dirs
|
| + assert files == [], '%s == []' % files
|
| +
|
| +
|
| +def _test_dir_upload_and_download():
|
| + """Test upload_dir_contents() and download_dir_contents()."""
|
| + try:
|
| + gs = gs_utils.GSUtils(
|
| + boto_file_path=os.path.expanduser(os.path.join('~','.boto')))
|
| + except:
|
| + print """
|
| +Failed to instantiate GSUtils object with default .boto file path.
|
| +Do you have a ~/.boto file that provides the credentials needed to read
|
| +and write gs://chromium-skia-gm ?
|
| +"""
|
| + raise
|
| +
|
| + bucket = 'chromium-skia-gm'
|
| + remote_dir = 'gs_utils_test/%d' % random.randint(0, sys.maxint)
|
| + subdir = 'subdir'
|
| + filenames = ['file1', 'file2']
|
| +
|
| + # Create directory tree on local disk and upload it.
|
| + id_type = gs.IdType.GROUP_BY_DOMAIN
|
| + id_value = 'chromium.org'
|
| + set_permission = gs.Permission.READ
|
| + local_src_dir = tempfile.mkdtemp()
|
| + os.mkdir(os.path.join(local_src_dir, subdir))
|
| + try:
|
| + for filename in filenames:
|
| + with open(os.path.join(local_src_dir, subdir, filename), 'w') as f:
|
| + f.write('contents of %s\n' % filename)
|
| + gs.upload_dir_contents(
|
| + source_dir=local_src_dir, dest_bucket=bucket, dest_dir=remote_dir,
|
| + predefined_acl=gs.PredefinedACL.PRIVATE,
|
| + fine_grained_acl_list=[(id_type, id_value, set_permission)])
|
| + finally:
|
| + shutil.rmtree(local_src_dir)
|
| +
|
| + # Validate the list of the files we uploaded to Google Storage.
|
| + (dirs, files) = gs.list_bucket_contents(
|
| + bucket=bucket, subdir=remote_dir)
|
| + assert dirs == [subdir], '%s == [%s]' % (dirs, subdir)
|
| + assert files == [], '%s == []' % files
|
| + (dirs, files) = gs.list_bucket_contents(
|
| + bucket=bucket, subdir=posixpath.join(remote_dir, subdir))
|
| + assert dirs == [], '%s == []' % dirs
|
| + assert files == filenames, '%s == %s' % (files, filenames)
|
| +
|
| + # Check the fine-grained ACLs we set in Google Storage.
|
| + for filename in filenames:
|
| + got_permission = gs.get_acl(
|
| + bucket=bucket, path=posixpath.join(remote_dir, subdir, filename),
|
| + id_type=id_type, id_value=id_value)
|
| + assert got_permission == set_permission, '%s == %s' % (
|
| + got_permission, set_permission)
|
| +
|
| + # Download the directory tree we just uploaded, make sure its contents
|
| + # are what we expect, and then delete the tree in Google Storage.
|
| + local_dest_dir = tempfile.mkdtemp()
|
| + try:
|
| + gs.download_dir_contents(source_bucket=bucket, source_dir=remote_dir,
|
| + dest_dir=local_dest_dir)
|
| + for filename in filenames:
|
| + with open(os.path.join(local_dest_dir, subdir, filename)) as f:
|
| + file_contents = f.read()
|
| + assert file_contents == 'contents of %s\n' % filename, (
|
| + '%s == "contents of %s\n"' % (file_contents, filename))
|
| + finally:
|
| + shutil.rmtree(local_dest_dir)
|
| + for filename in filenames:
|
| + gs.delete_file(bucket=bucket,
|
| + path=posixpath.join(remote_dir, subdir, filename))
|
| +
|
| +
|
| +if __name__ == '__main__':
|
| + _test_public_read()
|
| + _test_authenticated_round_trip()
|
| + _test_dir_upload_and_download()
|
| + # TODO(epoger): Add _test_unauthenticated_access() to make sure we raise
|
| + # an exception when we try to access without needed credentials.
|
|
|