Index: bin/s3multiput
diff --git a/bin/s3put b/bin/s3multiput
similarity index 50%
copy from bin/s3put
copy to bin/s3multiput
index b5467d96b25f68e8321fd74f4db085adde9c9056..df6e9fe7c5877928906a914f4c4edaa996fa4ff5 100755
--- a/bin/s3put
+++ b/bin/s3multiput
@@ -15,22 +15,34 @@
 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
 # OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
 # ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
 # SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
 # WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
 # IN THE SOFTWARE.
 #
+
+# multipart portions copyright Fabian Topfstedt
+# https://gist.github.com/924094
+
+
+import math
+import mimetypes
+from multiprocessing import Pool
 import getopt, sys, os
+
 import boto
 from boto.exception import S3ResponseError
+from boto.s3.connection import S3Connection
+from filechunkio import FileChunkIO
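+# FileChunkIO comes from the third-party filechunkio package (installable
+# with "pip install filechunkio"); it wraps a byte range of a file as a
+# file-like object, which is what upload_part_from_file() consumes below.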
+
 usage_string = """
 SYNOPSIS
     s3put [-a/--access_key <access_key>] [-s/--secret_key <secret_key>]
           -b/--bucket <bucket_name> [-c/--callback <num_cb>]
           [-d/--debug <debug_level>] [-i/--ignore <ignore_dirs>]
           [-n/--no_op] [-p/--prefix <prefix>] [-q/--quiet]
-          [-g/--grant grant] [-w/--no_overwrite] path
+          [-g/--grant grant] [-w/--no_overwrite] [-r/--reduced] path
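+
+    Example (bucket name and path here are illustrative):
+        s3multiput -a <access_key> -s <secret_key> -b mybucket -r /data/photos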
     Where
         access_key - Your AWS Access Key ID. If not supplied, boto will
@@ -64,15 +76,16 @@ SYNOPSIS
                  /bar/fie.baz
                  The prefix must end in a trailing separator and if it
                  does not then one will be added.
+        reduced - Use Reduced Redundancy storage
         grant - A canned ACL policy that will be granted on each file
-                transferred to S3. The value of provided must be one
+                transferred to S3. The value provided must be one
                 of the "canned" ACL policies supported by S3:
                 private|public-read|public-read-write|authenticated-read
-        no_overwrite - No files will be overwritten on S3, if the file/key
-                       exists on s3 it will be kept. This is useful for
-                       resuming interrupted transfers. Note this is not a
-                       sync, even if the file has been updated locally if
-                       the key exists on s3 the file on s3 will not be
+        no_overwrite - No files will be overwritten on S3; if the file/key
+                       exists on S3 it will be kept. This is useful for
+                       resuming interrupted transfers. Note this is not a
+                       sync: even if the file has been updated locally, if
+                       the key exists on S3 the file on S3 will not be
                        updated.
     If the -n option is provided, no files will be transferred to S3 but
@@ -81,7 +94,7 @@ SYNOPSIS
 def usage():
     print usage_string
     sys.exit()
 
 def submit_cb(bytes_so_far, total_bytes):
     print '%d bytes transferred / %d bytes total' % (bytes_so_far, total_bytes)
@@ -90,30 +103,108 @@ def get_key_name(fullpath, prefix):
     key_name = fullpath[len(prefix):]
     l = key_name.split(os.sep)
     return '/'.join(l)
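+
+# Multipart upload flow, as a roadmap for the two functions below: initiate
+# a multipart upload, push each chunk from a pool of worker processes, then
+# complete the upload once every part has arrived, or cancel it so that no
+# orphaned parts are left behind on S3.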
+def _upload_part(bucketname, aws_key, aws_secret, multipart_id, part_num,
+                 source_path, offset, bytes, debug, cb, num_cb,
+                 amount_of_retries=10):
+    """
+    Uploads a part with retries.
+    """
+    if debug == 1:
+        print "_upload_part(%s, %s, %s)" % (source_path, offset, bytes)
+
+    def _upload(retries_left=amount_of_retries):
+        try:
+            if debug == 1:
+                print 'Start uploading part #%d ...' % part_num
+            conn = S3Connection(aws_key, aws_secret)
+            conn.debug = debug
+            bucket = conn.get_bucket(bucketname)
+            # locate the in-progress multipart upload by id and send this
+            # part's byte range from the source file
+            for mp in bucket.get_all_multipart_uploads():
+                if mp.id == multipart_id:
+                    with FileChunkIO(source_path, 'r', offset=offset,
+                                     bytes=bytes) as fp:
+                        mp.upload_part_from_file(fp=fp, part_num=part_num,
+                                                 cb=cb, num_cb=num_cb)
+                    break
+        except Exception, exc:
+            if retries_left:
+                _upload(retries_left=retries_left - 1)
+            else:
+                print 'Failed uploading part #%d' % part_num
+                raise exc
+        else:
+            if debug == 1:
+                print '... Uploaded part #%d' % part_num
+
+    _upload()
+
+def upload(bucketname, aws_key, aws_secret, source_path, keyname,
+           reduced, debug, cb, num_cb,
+           acl='private', headers=None, guess_mimetype=True,
+           parallel_processes=4):
+    """
+    Parallel multipart upload.
+    """
+    conn = S3Connection(aws_key, aws_secret)
+    conn.debug = debug
+    bucket = conn.get_bucket(bucketname)
+
+    # default headers to None rather than a mutable {} so repeated calls
+    # don't share and keep mutating the same dict
+    if headers is None:
+        headers = {}
+    if guess_mimetype:
+        mtype = mimetypes.guess_type(keyname)[0] or 'application/octet-stream'
+        headers.update({'Content-Type': mtype})
+
+    mp = bucket.initiate_multipart_upload(keyname, headers=headers,
+                                          reduced_redundancy=reduced)
+
+    source_size = os.stat(source_path).st_size
+    bytes_per_chunk = max(int(math.sqrt(5242880) * math.sqrt(source_size)),
+                          5242880)
+    chunk_amount = int(math.ceil(source_size / float(bytes_per_chunk)))
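+    # Worked example of the sizing above: every part but the last must be at
+    # least 5 MB (5242880 bytes, the S3 minimum), and the sqrt scaling grows
+    # parts with the file size, so a 1 GiB file gets ~72 MiB parts, 15 parts
+    # in total.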
+
+    # fan the chunks out to a pool of worker processes
+    pool = Pool(processes=parallel_processes)
+    for i in range(chunk_amount):
+        offset = i * bytes_per_chunk
+        remaining_bytes = source_size - offset
+        bytes = min([bytes_per_chunk, remaining_bytes])
+        part_num = i + 1
+        pool.apply_async(_upload_part, [bucketname, aws_key, aws_secret,
+                                        mp.id, part_num, source_path, offset,
+                                        bytes, debug, cb, num_cb])
+    pool.close()
+    pool.join()
+
+    # only complete if every part arrived; get_all_parts() returns at most
+    # 1000 parts per call, so this check is reliable only up to 1000 parts
+    if len(mp.get_all_parts()) == chunk_amount:
+        mp.complete_upload()
+        key = bucket.get_key(keyname)
+        key.set_acl(acl)
+    else:
+        mp.cancel_upload()
+
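+# Example call, with hypothetical values for everything except the parameter
+# order, which matches the signature above:
+#   upload('mybucket', key_id, secret, '/tmp/big.iso', 'big.iso',
+#          reduced=False, debug=0, cb=None, num_cb=0)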
+
 def main():
-    try:
-        opts, args = getopt.getopt(sys.argv[1:], 'a:b:c::d:g:hi:np:qs:vw',
-                                   ['access_key', 'bucket', 'callback', 'debug', 'help', 'grant',
-                                    'ignore', 'no_op', 'prefix', 'quiet', 'secret_key', 'no_overwrite'])
-    except:
-        usage()
-    ignore_dirs = []
-    aws_access_key_id = None
+
+    # default values
+    aws_access_key_id = None
     aws_secret_access_key = None
     bucket_name = ''
+    ignore_dirs = []
     total = 0
     debug = 0
     cb = None
     num_cb = 0
     quiet = False
     no_op = False
     prefix = '/'
     grant = None
     no_overwrite = False
+    reduced = False
+
+    try:
+        # note the trailing '=' on long options that take a value; getopt
+        # requires it for the long form to accept an argument
+        opts, args = getopt.getopt(sys.argv[1:], 'a:b:c::d:g:hi:np:qs:wr',
+                                   ['access_key=', 'bucket=', 'callback=',
+                                    'debug=', 'help', 'grant=', 'ignore=',
+                                    'no_op', 'prefix=', 'quiet', 'secret_key=',
+                                    'no_overwrite', 'reduced'])
+    except:
+        usage()
+
+    # parse opts
     for o, a in opts:
         if o in ('-h', '--help'):
             usage()
-            sys.exit()
         if o in ('-a', '--access_key'):
             aws_access_key_id = a
         if o in ('-b', '--bucket'):
@@ -139,58 +230,88 @@ def main():
             quiet = True
         if o in ('-s', '--secret_key'):
             aws_secret_access_key = a
+        if o in ('-r', '--reduced'):
+            reduced = True
+
     if len(args) != 1:
-        print usage()
+        usage()
+
     path = os.path.expanduser(args[0])
     path = os.path.expandvars(path)
     path = os.path.abspath(path)
-    if bucket_name:
-        c = boto.connect_s3(aws_access_key_id=aws_access_key_id,
-                            aws_secret_access_key=aws_secret_access_key)
-        c.debug = debug
-        b = c.get_bucket(bucket_name)
-        if os.path.isdir(path):
-            if no_overwrite:
-                if not quiet:
-                    print 'Getting list of existing keys to check against'
-                keys = []
-                for key in b.list():
-                    keys.append(key.name)
-            for root, dirs, files in os.walk(path):
-                for ignore in ignore_dirs:
-                    if ignore in dirs:
-                        dirs.remove(ignore)
-                for file in files:
-                    fullpath = os.path.join(root, file)
-                    key_name = get_key_name(fullpath, prefix)
-                    copy_file = True
-                    if no_overwrite:
-                        if key_name in keys:
-                            copy_file = False
-                            if not quiet:
-                                print 'Skipping %s as it exists in s3' % file
-                    if copy_file:
+
+    if not bucket_name:
+        print "bucket name is required!"
+        usage()
+
+    c = boto.connect_s3(aws_access_key_id=aws_access_key_id,
+                        aws_secret_access_key=aws_secret_access_key)
+    c.debug = debug
+    b = c.get_bucket(bucket_name)
+
+    # upload a directory of files recursively
+    if os.path.isdir(path):
+        if no_overwrite:
+            if not quiet:
+                print 'Getting list of existing keys to check against'
+            keys = []
+            for key in b.list():
+                keys.append(key.name)
+        for root, dirs, files in os.walk(path):
+            for ignore in ignore_dirs:
+                if ignore in dirs:
+                    dirs.remove(ignore)
+            for file in files:
+                fullpath = os.path.join(root, file)
+                key_name = get_key_name(fullpath, prefix)
+                copy_file = True
+                if no_overwrite:
+                    if key_name in keys:
+                        copy_file = False
                         if not quiet:
-                        print 'Copying %s to %s/%s' % (file, bucket_name, key_name)
-                        if not no_op:
-                            k = b.new_key(key_name)
-                            k.set_contents_from_filename(fullpath, cb=cb,
-                                                         num_cb=num_cb, policy=grant)
-                    total += 1
-        elif os.path.isfile(path):
-            key_name = os.path.split(path)[1]
-            copy_file = True
-            if no_overwrite:
-                if b.get_key(key_name):
-                    copy_file = False
+                            print 'Skipping %s as it exists in s3' % file
+
+                if copy_file:
                     if not quiet:
-                        print 'Skipping %s as it exists in s3' % path
-            if copy_file:
-                k = b.new_key(key_name)
-                k.set_contents_from_filename(path, cb=cb, num_cb=num_cb, policy=grant)
-        else:
-            print usage()
+                        print 'Copying %s to %s/%s' % (file, bucket_name, key_name)
+
+                    if not no_op:
+                        if os.stat(fullpath).st_size == 0:
+                            # 0-byte files don't work and also don't need multipart upload
+                            k = b.new_key(key_name)
+                            k.set_contents_from_filename(fullpath, cb=cb, num_cb=num_cb,
+                                                         policy=grant, reduced_redundancy=reduced)
+                        else:
+                            # pass the canned ACL through so -g/--grant also
+                            # applies to multipart uploads
+                            upload(bucket_name, aws_access_key_id,
+                                   aws_secret_access_key, fullpath, key_name,
+                                   reduced, debug, cb, num_cb,
+                                   grant or 'private')
+                    total += 1
+
+    # upload a single file
+    elif os.path.isfile(path):
+        key_name = get_key_name(os.path.abspath(path), prefix)
+        copy_file = True
+        if no_overwrite:
+            if b.get_key(key_name):
+                copy_file = False
+                if not quiet:
+                    print 'Skipping %s as it exists in s3' % path
+
+        if copy_file:
+            if not quiet:
+                print 'Copying %s to %s/%s' % (path, bucket_name, key_name)
+
+            if not no_op:
+                if os.stat(path).st_size == 0:
+                    # 0-byte files don't work and also don't need multipart upload
+                    k = b.new_key(key_name)
+                    k.set_contents_from_filename(path, cb=cb, num_cb=num_cb,
+                                                 policy=grant,
+                                                 reduced_redundancy=reduced)
+                else:
+                    # pass the canned ACL through so -g/--grant also applies
+                    # to multipart uploads
+                    upload(bucket_name, aws_access_key_id,
+                           aws_secret_access_key, path, key_name,
+                           reduced, debug, cb, num_cb,
+                           grant or 'private')
 if __name__ == "__main__":
     main()
-