OLD | NEW |
(Empty) | |
| 1 #!/usr/bin/python |
| 2 |
| 3 """Tests for gs_utils.py. |
| 4 |
| 5 TODO(epoger): How should we exercise these self-tests? See http://skbug.com/2751 |
| 6 """ |
| 7 |
| 8 # System-level imports. |
| 9 import os |
| 10 import posixpath |
| 11 import random |
| 12 import shutil |
| 13 import sys |
| 14 import tempfile |
| 15 |
| 16 # Local imports. |
| 17 import gs_utils |
| 18 |
| 19 |
def _test_public_read():
  """Verify a public bucket is listable with no .boto credentials at all."""
  anonymous_gs = gs_utils.GSUtils()  # no boto_file_path: unauthenticated
  anonymous_gs.list_bucket_contents(
      bucket='chromium-skia-gm-summaries', subdir=None)
| 24 |
| 25 |
| 26 def _test_authenticated_round_trip(): |
| 27 try: |
| 28 gs = gs_utils.GSUtils( |
| 29 boto_file_path=os.path.expanduser(os.path.join('~','.boto'))) |
| 30 except: |
| 31 print """ |
| 32 Failed to instantiate GSUtils object with default .boto file path. |
| 33 Do you have a ~/.boto file that provides the credentials needed to read |
| 34 and write gs://chromium-skia-gm ? |
| 35 """ |
| 36 raise |
| 37 |
| 38 bucket = 'chromium-skia-gm' |
| 39 remote_dir = 'gs_utils_test/%d' % random.randint(0, sys.maxint) |
| 40 subdir = 'subdir' |
| 41 filenames_to_upload = ['file1', 'file2'] |
| 42 |
| 43 # Upload test files to Google Storage, checking that their fine-grained |
| 44 # ACLs were set correctly. |
| 45 id_type = gs.IdType.GROUP_BY_DOMAIN |
| 46 id_value = 'chromium.org' |
| 47 set_permission = gs.Permission.READ |
| 48 local_src_dir = tempfile.mkdtemp() |
| 49 os.mkdir(os.path.join(local_src_dir, subdir)) |
| 50 try: |
| 51 for filename in filenames_to_upload: |
| 52 with open(os.path.join(local_src_dir, subdir, filename), 'w') as f: |
| 53 f.write('contents of %s\n' % filename) |
| 54 dest_path = posixpath.join(remote_dir, subdir, filename) |
| 55 gs.upload_file( |
| 56 source_path=os.path.join(local_src_dir, subdir, filename), |
| 57 dest_bucket=bucket, dest_path=dest_path, |
| 58 fine_grained_acl_list=[(id_type, id_value, set_permission)]) |
| 59 got_permission = gs.get_acl(bucket=bucket, path=dest_path, |
| 60 id_type=id_type, id_value=id_value) |
| 61 assert got_permission == set_permission, '%s == %s' % ( |
| 62 got_permission, set_permission) |
| 63 finally: |
| 64 shutil.rmtree(local_src_dir) |
| 65 |
| 66 # Get a list of the files we uploaded to Google Storage. |
| 67 (dirs, files) = gs.list_bucket_contents( |
| 68 bucket=bucket, subdir=remote_dir) |
| 69 assert dirs == [subdir], '%s == [%s]' % (dirs, subdir) |
| 70 assert files == [], '%s == []' % files |
| 71 (dirs, files) = gs.list_bucket_contents( |
| 72 bucket=bucket, subdir=posixpath.join(remote_dir, subdir)) |
| 73 assert dirs == [], '%s == []' % dirs |
| 74 assert files == filenames_to_upload, '%s == %s' % (files, filenames_to_upload) |
| 75 |
| 76 # Manipulate ACLs on one of those files, and verify them. |
| 77 # TODO(epoger): Test IdTypes other than GROUP_BY_DOMAIN ? |
| 78 # TODO(epoger): Test setting multiple ACLs on the same file? |
| 79 id_type = gs.IdType.GROUP_BY_DOMAIN |
| 80 id_value = 'google.com' |
| 81 fullpath = posixpath.join(remote_dir, subdir, filenames_to_upload[0]) |
| 82 # Make sure ACL is empty to start with ... |
| 83 gs.set_acl(bucket=bucket, path=fullpath, |
| 84 id_type=id_type, id_value=id_value, permission=gs.Permission.EMPTY) |
| 85 permission = gs.get_acl(bucket=bucket, path=fullpath, |
| 86 id_type=id_type, id_value=id_value) |
| 87 assert permission == gs.Permission.EMPTY, '%s == %s' % ( |
| 88 permission, gs.Permission.EMPTY) |
| 89 # ... set it to OWNER ... |
| 90 gs.set_acl(bucket=bucket, path=fullpath, |
| 91 id_type=id_type, id_value=id_value, permission=gs.Permission.OWNER) |
| 92 permission = gs.get_acl(bucket=bucket, path=fullpath, |
| 93 id_type=id_type, id_value=id_value) |
| 94 assert permission == gs.Permission.OWNER, '%s == %s' % ( |
| 95 permission, gs.Permission.OWNER) |
| 96 # ... now set it to READ ... |
| 97 gs.set_acl(bucket=bucket, path=fullpath, |
| 98 id_type=id_type, id_value=id_value, permission=gs.Permission.READ) |
| 99 permission = gs.get_acl(bucket=bucket, path=fullpath, |
| 100 id_type=id_type, id_value=id_value) |
| 101 assert permission == gs.Permission.READ, '%s == %s' % ( |
| 102 permission, gs.Permission.READ) |
| 103 # ... and clear it again to finish. |
| 104 gs.set_acl(bucket=bucket, path=fullpath, |
| 105 id_type=id_type, id_value=id_value, permission=gs.Permission.EMPTY) |
| 106 permission = gs.get_acl(bucket=bucket, path=fullpath, |
| 107 id_type=id_type, id_value=id_value) |
| 108 assert permission == gs.Permission.EMPTY, '%s == %s' % ( |
| 109 permission, gs.Permission.EMPTY) |
| 110 |
| 111 # Download the files we uploaded to Google Storage, and validate contents. |
| 112 local_dest_dir = tempfile.mkdtemp() |
| 113 try: |
| 114 for filename in filenames_to_upload: |
| 115 gs.download_file(source_bucket=bucket, |
| 116 source_path=posixpath.join(remote_dir, subdir, filename), |
| 117 dest_path=os.path.join(local_dest_dir, subdir, filename), |
| 118 create_subdirs_if_needed=True) |
| 119 with open(os.path.join(local_dest_dir, subdir, filename)) as f: |
| 120 file_contents = f.read() |
| 121 assert file_contents == 'contents of %s\n' % filename, ( |
| 122 '%s == "contents of %s\n"' % (file_contents, filename)) |
| 123 finally: |
| 124 shutil.rmtree(local_dest_dir) |
| 125 |
| 126 # Delete all the files we uploaded to Google Storage. |
| 127 for filename in filenames_to_upload: |
| 128 gs.delete_file(bucket=bucket, |
| 129 path=posixpath.join(remote_dir, subdir, filename)) |
| 130 |
| 131 # Confirm that we deleted all the files we uploaded to Google Storage. |
| 132 (dirs, files) = gs.list_bucket_contents( |
| 133 bucket=bucket, subdir=posixpath.join(remote_dir, subdir)) |
| 134 assert dirs == [], '%s == []' % dirs |
| 135 assert files == [], '%s == []' % files |
| 136 |
| 137 |
| 138 def _test_dir_upload_and_download(): |
| 139 """Test upload_dir_contents() and download_dir_contents().""" |
| 140 try: |
| 141 gs = gs_utils.GSUtils( |
| 142 boto_file_path=os.path.expanduser(os.path.join('~','.boto'))) |
| 143 except: |
| 144 print """ |
| 145 Failed to instantiate GSUtils object with default .boto file path. |
| 146 Do you have a ~/.boto file that provides the credentials needed to read |
| 147 and write gs://chromium-skia-gm ? |
| 148 """ |
| 149 raise |
| 150 |
| 151 bucket = 'chromium-skia-gm' |
| 152 remote_dir = 'gs_utils_test/%d' % random.randint(0, sys.maxint) |
| 153 subdir = 'subdir' |
| 154 filenames = ['file1', 'file2'] |
| 155 |
| 156 # Create directory tree on local disk and upload it. |
| 157 id_type = gs.IdType.GROUP_BY_DOMAIN |
| 158 id_value = 'chromium.org' |
| 159 set_permission = gs.Permission.READ |
| 160 local_src_dir = tempfile.mkdtemp() |
| 161 os.mkdir(os.path.join(local_src_dir, subdir)) |
| 162 try: |
| 163 for filename in filenames: |
| 164 with open(os.path.join(local_src_dir, subdir, filename), 'w') as f: |
| 165 f.write('contents of %s\n' % filename) |
| 166 gs.upload_dir_contents( |
| 167 source_dir=local_src_dir, dest_bucket=bucket, dest_dir=remote_dir, |
| 168 predefined_acl=gs.PredefinedACL.PRIVATE, |
| 169 fine_grained_acl_list=[(id_type, id_value, set_permission)]) |
| 170 finally: |
| 171 shutil.rmtree(local_src_dir) |
| 172 |
| 173 # Validate the list of the files we uploaded to Google Storage. |
| 174 (dirs, files) = gs.list_bucket_contents( |
| 175 bucket=bucket, subdir=remote_dir) |
| 176 assert dirs == [subdir], '%s == [%s]' % (dirs, subdir) |
| 177 assert files == [], '%s == []' % files |
| 178 (dirs, files) = gs.list_bucket_contents( |
| 179 bucket=bucket, subdir=posixpath.join(remote_dir, subdir)) |
| 180 assert dirs == [], '%s == []' % dirs |
| 181 assert files == filenames, '%s == %s' % (files, filenames) |
| 182 |
| 183 # Check the fine-grained ACLs we set in Google Storage. |
| 184 for filename in filenames: |
| 185 got_permission = gs.get_acl( |
| 186 bucket=bucket, path=posixpath.join(remote_dir, subdir, filename), |
| 187 id_type=id_type, id_value=id_value) |
| 188 assert got_permission == set_permission, '%s == %s' % ( |
| 189 got_permission, set_permission) |
| 190 |
| 191 # Download the directory tree we just uploaded, make sure its contents |
| 192 # are what we expect, and then delete the tree in Google Storage. |
| 193 local_dest_dir = tempfile.mkdtemp() |
| 194 try: |
| 195 gs.download_dir_contents(source_bucket=bucket, source_dir=remote_dir, |
| 196 dest_dir=local_dest_dir) |
| 197 for filename in filenames: |
| 198 with open(os.path.join(local_dest_dir, subdir, filename)) as f: |
| 199 file_contents = f.read() |
| 200 assert file_contents == 'contents of %s\n' % filename, ( |
| 201 '%s == "contents of %s\n"' % (file_contents, filename)) |
| 202 finally: |
| 203 shutil.rmtree(local_dest_dir) |
| 204 for filename in filenames: |
| 205 gs.delete_file(bucket=bucket, |
| 206 path=posixpath.join(remote_dir, subdir, filename)) |
| 207 |
| 208 |
if __name__ == '__main__':
  # Run every self-test in order; the first failure raises and aborts.
  # TODO(epoger): Add _test_unauthenticated_access() to make sure we raise
  # an exception when we try to access without needed credentials.
  for self_test in (_test_public_read,
                    _test_authenticated_round_trip,
                    _test_dir_upload_and_download):
    self_test()
OLD | NEW |