Chromium Code Reviews

Unified Diff: bin/s3multiput

Issue 8386013: Merging in latest boto. (Closed) Base URL: svn://svn.chromium.org/boto
Patch Set: Redoing vendor drop by deleting and then merging. Created 9 years, 2 months ago
Index: bin/s3multiput
diff --git a/bin/s3put b/bin/s3multiput
similarity index 50%
copy from bin/s3put
copy to bin/s3multiput
index b5467d96b25f68e8321fd74f4db085adde9c9056..df6e9fe7c5877928906a914f4c4edaa996fa4ff5 100755
--- a/bin/s3put
+++ b/bin/s3multiput
@@ -15,22 +15,34 @@
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
-# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
+# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
# IN THE SOFTWARE.
#
+
+# multipart portions copyright Fabian Topfstedt
+# https://gist.github.com/924094
+
+
+import math
+import mimetypes
+from multiprocessing import Pool
import getopt, sys, os
+
import boto
from boto.exception import S3ResponseError
+from boto.s3.connection import S3Connection
+from filechunkio import FileChunkIO
+
usage_string = """
SYNOPSIS
s3put [-a/--access_key <access_key>] [-s/--secret_key <secret_key>]
-b/--bucket <bucket_name> [-c/--callback <num_cb>]
[-d/--debug <debug_level>] [-i/--ignore <ignore_dirs>]
[-n/--no_op] [-p/--prefix <prefix>] [-q/--quiet]
- [-g/--grant grant] [-w/--no_overwrite] path
+ [-g/--grant grant] [-w/--no_overwrite] [-r/--reduced] path
Where
access_key - Your AWS Access Key ID. If not supplied, boto will
@@ -64,15 +76,16 @@ SYNOPSIS
/bar/fie.baz
The prefix must end in a trailing separator and if it
does not then one will be added.
+ reduced - Use Reduced Redundancy storage
grant - A canned ACL policy that will be granted on each file
transferred to S3. The value of provided must be one
of the "canned" ACL policies supported by S3:
private|public-read|public-read-write|authenticated-read
- no_overwrite - No files will be overwritten on S3, if the file/key
- exists on s3 it will be kept. This is useful for
- resuming interrupted transfers. Note this is not a
- sync, even if the file has been updated locally if
- the key exists on s3 the file on s3 will not be
+ no_overwrite - No files will be overwritten on S3, if the file/key
+ exists on s3 it will be kept. This is useful for
+ resuming interrupted transfers. Note this is not a
+ sync, even if the file has been updated locally if
+ the key exists on s3 the file on s3 will not be
updated.
If the -n option is provided, no files will be transferred to S3 but
@@ -81,7 +94,7 @@ SYNOPSIS
def usage():
print usage_string
sys.exit()
-
+
def submit_cb(bytes_so_far, total_bytes):
print '%d bytes transferred / %d bytes total' % (bytes_so_far, total_bytes)
@@ -90,30 +103,108 @@ def get_key_name(fullpath, prefix):
l = key_name.split(os.sep)
return '/'.join(l)
+def _upload_part(bucketname, aws_key, aws_secret, multipart_id, part_num,
+ source_path, offset, bytes, debug, cb, num_cb, amount_of_retries=10):
+ if debug == 1:
+ print "_upload_part(%s, %s, %s)" % (source_path, offset, bytes)
+ """
+ Uploads a part with retries.
+ """
+ def _upload(retries_left=amount_of_retries):
+ try:
+ if debug == 1:
+ print 'Start uploading part #%d ...' % part_num
+ conn = S3Connection(aws_key, aws_secret)
+ conn.debug = debug
+ bucket = conn.get_bucket(bucketname)
+ for mp in bucket.get_all_multipart_uploads():
+ if mp.id == multipart_id:
+ with FileChunkIO(source_path, 'r', offset=offset,
+ bytes=bytes) as fp:
+ mp.upload_part_from_file(fp=fp, part_num=part_num, cb=cb, num_cb=num_cb)
+ break
+ except Exception, exc:
+ if retries_left:
+ _upload(retries_left=retries_left - 1)
+ else:
+ print 'Failed uploading part #%d' % part_num
+ raise exc
+ else:
+ if debug == 1:
+ print '... Uploaded part #%d' % part_num
+
+ _upload()
+
+def upload(bucketname, aws_key, aws_secret, source_path, keyname,
+ reduced, debug, cb, num_cb,
+ acl='private', headers={}, guess_mimetype=True, parallel_processes=4):
+ """
+ Parallel multipart upload.
+ """
+ conn = S3Connection(aws_key, aws_secret)
+ conn.debug = debug
+ bucket = conn.get_bucket(bucketname)
+
+ if guess_mimetype:
+ mtype = mimetypes.guess_type(keyname)[0] or 'application/octet-stream'
+ headers.update({'Content-Type': mtype})
+
+ mp = bucket.initiate_multipart_upload(keyname, headers=headers, reduced_redundancy=reduced)
+
+ source_size = os.stat(source_path).st_size
+ bytes_per_chunk = max(int(math.sqrt(5242880) * math.sqrt(source_size)),
+ 5242880)
+ chunk_amount = int(math.ceil(source_size / float(bytes_per_chunk)))
+
+ pool = Pool(processes=parallel_processes)
+ for i in range(chunk_amount):
+ offset = i * bytes_per_chunk
+ remaining_bytes = source_size - offset
+ bytes = min([bytes_per_chunk, remaining_bytes])
+ part_num = i + 1
+ pool.apply_async(_upload_part, [bucketname, aws_key, aws_secret, mp.id,
+ part_num, source_path, offset, bytes, debug, cb, num_cb])
+ pool.close()
+ pool.join()
+
+ if len(mp.get_all_parts()) == chunk_amount:
+ mp.complete_upload()
+ key = bucket.get_key(keyname)
+ key.set_acl(acl)
+ else:
+ mp.cancel_upload()
+
+
def main():
- try:
- opts, args = getopt.getopt(sys.argv[1:], 'a:b:c::d:g:hi:np:qs:vw',
- ['access_key', 'bucket', 'callback', 'debug', 'help', 'grant',
- 'ignore', 'no_op', 'prefix', 'quiet', 'secret_key', 'no_overwrite'])
- except:
- usage()
- ignore_dirs = []
- aws_access_key_id = None
+
+ # default values
+ aws_access_key_id = None
aws_secret_access_key = None
bucket_name = ''
- total = 0
- debug = 0
- cb = None
+ ignore_dirs = []
+ total = 0
+ debug = 0
+ cb = None
num_cb = 0
- quiet = False
- no_op = False
+ quiet = False
+ no_op = False
prefix = '/'
- grant = None
+ grant = None
no_overwrite = False
+ reduced = False
+
+ try:
+ opts, args = getopt.getopt(sys.argv[1:], 'a:b:c::d:g:hi:np:qs:wr',
+ ['access_key', 'bucket', 'callback', 'debug', 'help', 'grant',
+ 'ignore', 'no_op', 'prefix', 'quiet', 'secret_key', 'no_overwrite',
+ 'reduced'])
+ except:
+ usage()
+
+ # parse opts
for o, a in opts:
if o in ('-h', '--help'):
usage()
- sys.exit()
if o in ('-a', '--access_key'):
aws_access_key_id = a
if o in ('-b', '--bucket'):
@@ -139,58 +230,88 @@ def main():
quiet = True
if o in ('-s', '--secret_key'):
aws_secret_access_key = a
+ if o in ('-r', '--reduced'):
+ reduced = True
+
if len(args) != 1:
- print usage()
+ usage()
+
+
path = os.path.expanduser(args[0])
path = os.path.expandvars(path)
path = os.path.abspath(path)
- if bucket_name:
- c = boto.connect_s3(aws_access_key_id=aws_access_key_id,
- aws_secret_access_key=aws_secret_access_key)
- c.debug = debug
- b = c.get_bucket(bucket_name)
- if os.path.isdir(path):
- if no_overwrite:
- if not quiet:
- print 'Getting list of existing keys to check against'
- keys = []
- for key in b.list():
- keys.append(key.name)
- for root, dirs, files in os.walk(path):
- for ignore in ignore_dirs:
- if ignore in dirs:
- dirs.remove(ignore)
- for file in files:
- fullpath = os.path.join(root, file)
- key_name = get_key_name(fullpath, prefix)
- copy_file = True
- if no_overwrite:
- if key_name in keys:
- copy_file = False
- if not quiet:
- print 'Skipping %s as it exists in s3' % file
- if copy_file:
+
+ if not bucket_name:
+ print "bucket name is required!"
+ usage()
+
+ c = boto.connect_s3(aws_access_key_id=aws_access_key_id,
+ aws_secret_access_key=aws_secret_access_key)
+ c.debug = debug
+ b = c.get_bucket(bucket_name)
+
+ # upload a directory of files recursively
+ if os.path.isdir(path):
+ if no_overwrite:
+ if not quiet:
+ print 'Getting list of existing keys to check against'
+ keys = []
+ for key in b.list():
+ keys.append(key.name)
+ for root, dirs, files in os.walk(path):
+ for ignore in ignore_dirs:
+ if ignore in dirs:
+ dirs.remove(ignore)
+ for file in files:
+ fullpath = os.path.join(root, file)
+ key_name = get_key_name(fullpath, prefix)
+ copy_file = True
+ if no_overwrite:
+ if key_name in keys:
+ copy_file = False
if not quiet:
- print 'Copying %s to %s/%s' % (file, bucket_name, key_name)
- if not no_op:
- k = b.new_key(key_name)
- k.set_contents_from_filename(fullpath, cb=cb,
- num_cb=num_cb, policy=grant)
- total += 1
- elif os.path.isfile(path):
- key_name = os.path.split(path)[1]
- copy_file = True
- if no_overwrite:
- if b.get_key(key_name):
- copy_file = False
+ print 'Skipping %s as it exists in s3' % file
+
+ if copy_file:
if not quiet:
- print 'Skipping %s as it exists in s3' % path
- if copy_file:
- k = b.new_key(key_name)
- k.set_contents_from_filename(path, cb=cb, num_cb=num_cb, policy=grant)
- else:
- print usage()
+ print 'Copying %s to %s/%s' % (file, bucket_name, key_name)
+
+ if not no_op:
+ if os.stat(fullpath).st_size == 0:
+ # 0-byte files don't work and also don't need multipart upload
+ k = b.new_key(key_name)
+ k.set_contents_from_filename(fullpath, cb=cb, num_cb=num_cb,
+ policy=grant, reduced_redundancy=reduced)
+ else:
+ upload(bucket_name, aws_access_key_id,
+ aws_secret_access_key, fullpath, key_name,
+ reduced, debug, cb, num_cb)
+ total += 1
+
+ # upload a single file
+ elif os.path.isfile(path):
+ key_name = get_key_name(os.path.abspath(path), prefix)
+ copy_file = True
+ if no_overwrite:
+ if b.get_key(key_name):
+ copy_file = False
+ if not quiet:
+ print 'Skipping %s as it exists in s3' % path
+
+ if copy_file:
+ if not quiet:
+ print 'Copying %s to %s/%s' % (path, bucket_name, key_name)
+
+ if not no_op:
+ if os.stat(path).st_size == 0:
+ # 0-byte files don't work and also don't need multipart upload
+ k = b.new_key(key_name)
+ k.set_contents_from_filename(path, cb=cb, num_cb=num_cb, policy=grant,
+ reduced_redundancy=reduced)
+ else:
+ upload(bucket_name, aws_access_key_id,
+ aws_secret_access_key, path, key_name,
+ reduced, debug, cb, num_cb)
if __name__ == "__main__":
main()
-
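For readers skimming the patch, an illustrative invocation of the new tool, following the SYNOPSIS above; the access key, secret key, bucket name, and path are placeholders, and -r/--reduced is the flag this change adds for Reduced Redundancy storage:

s3multiput -a <access_key> -s <secret_key> -b <bucket_name> -p /home/user/ -r /home/user/data

Re-running the same command with -w/--no_overwrite skips keys that already exist in the bucket, which the SYNOPSIS notes is how interrupted transfers are resumed.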
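The chunk sizing in upload() is easy to miss on a first read: the part size is the geometric mean of 5 MiB and the file size, floored at 5 MiB. A minimal sketch restating that calculation with a worked example (Python 2 to match the patch; the helper name here is ours, not part of the change):

import math

MIN_CHUNK = 5242880  # 5 MiB, the same constant used in upload()

def chunk_plan(source_size):
    # Geometric mean of 5 MiB and the file size, never below 5 MiB.
    bytes_per_chunk = max(int(math.sqrt(MIN_CHUNK) * math.sqrt(source_size)), MIN_CHUNK)
    chunk_amount = int(math.ceil(source_size / float(bytes_per_chunk)))
    return bytes_per_chunk, chunk_amount

print chunk_plan(1024 ** 3)  # 1 GiB file -> roughly 71.6 MiB per part, 15 parts

Because the part size grows with the square root of the file size, the part count also grows only as a square root, which keeps the number of parts modest even for multi-gigabyte uploads.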
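It is also worth noting why _upload_part rebuilds so much state: each pooled worker receives only picklable primitives (bucket name, credentials, upload id, offset, byte count), opens its own S3Connection, and re-finds the in-progress upload by id, presumably because the connection and MultiPartUpload objects cannot be passed across process boundaries. A sketch of just that lookup, using the same boto calls as the patch:

from boto.s3.connection import S3Connection

def find_multipart_upload(bucketname, aws_key, aws_secret, multipart_id):
    # Fresh connection per worker process, then locate the upload by its id.
    conn = S3Connection(aws_key, aws_secret)
    bucket = conn.get_bucket(bucketname)
    for mp in bucket.get_all_multipart_uploads():
        if mp.id == multipart_id:
            return mp
    return None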