Index: third_party/upload.py |
diff --git a/third_party/upload.py b/third_party/upload.py |
index 31542663af90b4a3e344fac6512d6b29195d708b..37b1131c461301a8ee6f71768a145cd5e9a16a85 100755 |
--- a/third_party/upload.py |
+++ b/third_party/upload.py |
@@ -54,6 +54,8 @@ import urllib2 |
import urlparse |
import webbrowser |
+from multiprocessing.pool import ThreadPool |
+ |
# The md5 module was deprecated in Python 2.5. |
try: |
from hashlib import md5 |
@@ -475,7 +477,9 @@ class AbstractRpcServer(object): |
url_loc = urlparse.urlparse(url) |
self.host = '%s://%s' % (url_loc[0], url_loc[1]) |
elif e.code >= 500: |
- ErrorExit(e.read()) |
+ # TODO: We should error out on a 500, but the server is too flaky |
+ # for that at the moment. |
+ StatusUpdate('Upload got a 500 response: %d' % e.code) |
else: |
raise |
finally: |
@@ -617,6 +621,9 @@ group.add_option("--account_type", action="store", dest="account_type", |
help=("Override the default account type " |
"(defaults to '%default', " |
"valid choices are 'GOOGLE' and 'HOSTED').")) |
+group.add_option("-j", "--number-parallel-uploads", |
+ dest="num_upload_threads", default=8, |
+ help="Number of uploads to do in parallel.") |
# Issue |
group = parser.add_option_group("Issue options") |
group.add_option("-t", "--title", action="store", dest="title", |
@@ -1150,13 +1157,13 @@ class VersionControlSystem(object): |
else: |
type = "current" |
if len(content) > MAX_UPLOAD_SIZE: |
- print ("Not uploading the %s file for %s because it's too large." % |
- (type, filename)) |
+ result = ("Not uploading the %s file for %s because it's too large." % |
+ (type, filename)) |
file_too_large = True |
content = "" |
+ elif options.verbose: |
+ result = "Uploading %s file for %s" % (type, filename) |
checksum = md5(content).hexdigest() |
- if options.verbose > 0 and not file_too_large: |
- print "Uploading %s file for %s" % (type, filename) |
url = "/%d/upload_content/%d/%d" % (int(issue), int(patchset), file_id) |
form_fields = [("filename", filename), |
("status", status), |
@@ -1170,14 +1177,24 @@ class VersionControlSystem(object): |
form_fields.append(("user", options.email)) |
ctype, body = EncodeMultipartFormData(form_fields, |
[("data", filename, content)]) |
- response_body = rpc_server.Send(url, body, |
- content_type=ctype) |
+ try: |
+ response_body = rpc_server.Send(url, body, content_type=ctype) |
+ except urllib2.HTTPError, e: |
+ response_body = ("Failed to upload file for %s. Got %d status code." % |
+ (filename, e.code)) |
ghost stip (do not use)
2013/10/16 22:33:34
so it normally would have raised an exception, but
ojan
2013/10/16 22:39:32
The next line will sys.exit.
ghost stip (do not use)
2013/10/16 22:55:01
missed that, right on
|
+ |
if not response_body.startswith("OK"): |
StatusUpdate(" --> %s" % response_body) |
sys.exit(1) |
+ return result |
+ |
patches = dict() |
[patches.setdefault(v, k) for k, v in patch_list] |
+ |
+ threads = [] |
+ thread_pool = ThreadPool(options.num_upload_threads) |
+ |
for filename in patches.keys(): |
base_content, new_content, is_binary, status = files[filename] |
file_id_str = patches.get(filename) |
@@ -1186,9 +1203,17 @@ class VersionControlSystem(object): |
file_id_str = file_id_str[file_id_str.rfind("_") + 1:] |
file_id = int(file_id_str) |
if base_content != None: |
- UploadFile(filename, file_id, base_content, is_binary, status, True) |
+ t = thread_pool.apply_async(UploadFile, args=(filename, |
+ file_id, base_content, is_binary, status, True)) |
+ threads.append(t) |
if new_content != None: |
- UploadFile(filename, file_id, new_content, is_binary, status, False) |
+ t = thread_pool.apply_async(UploadFile, args=(filename, |
+ file_id, new_content, is_binary, status, False)) |
+ threads.append(t) |
+ |
+ for t in threads: |
+ print t.get(timeout=60) |
+ |
def IsImage(self, filename): |
"""Returns true if the filename has an image extension.""" |
@@ -2199,26 +2224,48 @@ def UploadSeparatePatches(issue, rpc_server, patchset, data, options): |
Returns a list of [patch_key, filename] for each file. |
""" |
- patches = SplitPatch(data) |
- rv = [] |
- for patch in patches: |
- if len(patch[1]) > MAX_UPLOAD_SIZE: |
- print ("Not uploading the patch for " + patch[0] + |
- " because the file is too large.") |
- continue |
- form_fields = [("filename", patch[0])] |
+ def UploadFile(filename, data): |
+ form_fields = [("filename", filename)] |
if not options.download_base: |
form_fields.append(("content_upload", "1")) |
- files = [("data", "data.diff", patch[1])] |
+ files = [("data", "data.diff", data)] |
ctype, body = EncodeMultipartFormData(form_fields, files) |
url = "/%d/upload_patch/%d" % (int(issue), int(patchset)) |
- print "Uploading patch for " + patch[0] |
- response_body = rpc_server.Send(url, body, content_type=ctype) |
+ |
+ try: |
+ response_body = rpc_server.Send(url, body, content_type=ctype) |
+ except urllib2.HTTPError, e: |
+ response_body = ("Failed to upload patch for %s. Got %d status code." % |
+ (filename, e.code)) |
+ |
lines = response_body.splitlines() |
if not lines or lines[0] != "OK": |
StatusUpdate(" --> %s" % response_body) |
sys.exit(1) |
- rv.append([lines[1], patch[0]]) |
+ return ("Uploaded patch for " + filename, [lines[1], filename]) |
+ |
+ threads = [] |
+ thread_pool = ThreadPool(options.num_upload_threads) |
+ |
+ patches = SplitPatch(data) |
+ rv = [] |
+ for patch in patches: |
+ if len(patch[1]) > MAX_UPLOAD_SIZE: |
+ print ("Not uploading the patch for " + patch[0] + |
+ " because the file is too large.") |
+ continue |
ghost stip (do not use)
2013/10/16 22:33:34
why would you continue and not exit with an error?
ojan
2013/10/16 22:39:32
This is just maintaining the old behavior. See abo
|
+ |
+ filename = patch[0] |
+ data = patch[1] |
+ |
+ t = thread_pool.apply_async(UploadFile, args=(filename, data)) |
+ threads.append(t) |
+ |
+ for t in threads: |
+ result = t.get(timeout=60) |
+ print result[0] |
+ rv.append(result[1]) |
+ |
return rv |