Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(24)

Side by Side Diff: chrome/browser/google_apis/drive_uploader.cc

Issue 12209035: Implement retry flow on DriveUploader. (Closed) Base URL: http://git.chromium.org/chromium/src.git@b148632_wapi_get_upload_status_operation_impl
Patch Set: Created 7 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "chrome/browser/google_apis/drive_uploader.h" 5 #include "chrome/browser/google_apis/drive_uploader.h"
6 6
7 #include <algorithm> 7 #include <algorithm>
8 8
9 #include "base/bind.h" 9 #include "base/bind.h"
10 #include "base/callback.h" 10 #include "base/callback.h"
11 #include "base/message_loop.h" 11 #include "base/message_loop.h"
12 #include "base/rand_util.h"
12 #include "base/string_number_conversions.h" 13 #include "base/string_number_conversions.h"
14 #include "base/time.h"
13 #include "chrome/browser/google_apis/drive_service_interface.h" 15 #include "chrome/browser/google_apis/drive_service_interface.h"
14 #include "chrome/browser/google_apis/drive_upload_mode.h" 16 #include "chrome/browser/google_apis/drive_upload_mode.h"
15 #include "chrome/browser/google_apis/gdata_wapi_parser.h" 17 #include "chrome/browser/google_apis/gdata_wapi_parser.h"
16 #include "content/public/browser/browser_thread.h" 18 #include "content/public/browser/browser_thread.h"
17 #include "content/public/browser/power_save_blocker.h" 19 #include "content/public/browser/power_save_blocker.h"
18 #include "net/base/file_stream.h" 20 #include "net/base/file_stream.h"
19 #include "net/base/net_errors.h" 21 #include "net/base/net_errors.h"
20 22
21 using content::BrowserThread; 23 using content::BrowserThread;
22 24
23 namespace { 25 namespace {
24 26
25 // Google Documents List API requires uploading in chunks of 512kB. 27 // Google Documents List API requires uploading in chunks of 512kB.
26 const int64 kUploadChunkSize = 512 * 1024; 28 const int64 kUploadChunkSize = 512 * 1024;
27 29
28 // Opens |path| with |file_stream| and returns the file size. 30 // Opens |path| with |file_stream| and returns the file size.
29 // If failed, returns an error code in a negative value. 31 // If failed, returns an error code in a negative value.
30 int64 OpenFileStreamAndGetSizeOnBlockingPool(net::FileStream* file_stream, 32 int64 OpenFileStreamAndGetSizeOnBlockingPool(net::FileStream* file_stream,
31 const FilePath& path) { 33 const FilePath& path) {
32 int result = file_stream->OpenSync( 34 int result = file_stream->OpenSync(
33 path, base::PLATFORM_FILE_OPEN | base::PLATFORM_FILE_READ); 35 path, base::PLATFORM_FILE_OPEN | base::PLATFORM_FILE_READ);
34 if (result != net::OK) 36 if (result != net::OK)
35 return result; 37 return result;
36 return file_stream->Available(); 38 return file_stream->Available();
37 } 39 }
38 40
41 base::TimeDelta GetWaitDuration(int trial_count) {
42 // The waiting duration for the exponential back is
43 // 1st trial: 1sec + rand millisecs
44 // 2nd trial: 2secs + rand millisecs
45 // 3rd trial: 4secs + rand millisecs
46 // 4th trial: 8secs + rand millisecs
47 // :
48 // nth trial: 2^(n-1) + rand millisecs
49 // Please see also
50 // https://developers.google.com/drive/manage-uploads#exp-backoff
51 // for more details.
satorux1 2013/02/07 05:48:32 I thought we had similar code elsewhere. zork@ wha
Zachary Kuznia 2013/02/07 06:26:36 This should call into the scheduler instead. That
hidehiko 2013/02/07 06:53:26 So, all retrying logic will be in DriveScheduler s
52 DCHECK_GT(trial_count, 0);
53
54 // Note: The range for RandInt is inclusive.
55 return base::TimeDelta::FromSeconds(1 << (trial_count - 1))
56 + base::TimeDelta::FromMilliseconds(base::RandInt(0, 9));
57 }
58
59 // Returns true if the status_code is 5xx SERVER ERROR, otherwise false.
60 bool IsHttp5xxServerError(int status_code) {
61 return status_code >= 500 && status_code < 600;
62 }
63
39 } // namespace 64 } // namespace
40 65
41 namespace google_apis { 66 namespace google_apis {
42 67
43 // Structure containing current upload information of file, passed between 68 // Structure containing current upload information of file, passed between
44 // DriveServiceInterface methods and callbacks. 69 // DriveServiceInterface methods and callbacks.
45 struct DriveUploader::UploadFileInfo { 70 struct DriveUploader::UploadFileInfo {
46 UploadFileInfo(scoped_refptr<base::SequencedTaskRunner> task_runner, 71 UploadFileInfo(scoped_refptr<base::SequencedTaskRunner> task_runner,
47 UploadMode upload_mode, 72 UploadMode upload_mode,
48 const GURL& initial_upload_location, 73 const GURL& initial_upload_location,
49 const FilePath& drive_path, 74 const FilePath& drive_path,
50 const FilePath& local_path, 75 const FilePath& local_path,
51 const std::string& title, 76 const std::string& title,
52 const std::string& content_type, 77 const std::string& content_type,
53 const std::string& etag, 78 const std::string& etag,
54 const UploadCompletionCallback& callback) 79 const UploadCompletionCallback& callback)
55 : upload_mode(upload_mode), 80 : upload_mode(upload_mode),
56 initial_upload_location(initial_upload_location), 81 initial_upload_location(initial_upload_location),
57 drive_path(drive_path), 82 drive_path(drive_path),
58 file_path(local_path), 83 file_path(local_path),
59 title(title), 84 title(title),
60 content_type(content_type), 85 content_type(content_type),
61 etag(etag), 86 etag(etag),
62 completion_callback(callback), 87 completion_callback(callback),
63 content_length(0), 88 content_length(0),
64 next_send_position(0),
65 file_stream(new net::FileStream(NULL)), 89 file_stream(new net::FileStream(NULL)),
66 buf(new net::IOBuffer(kUploadChunkSize)), 90 buf(new net::IOBuffer(kUploadChunkSize)),
91 buf_position(0),
92 buf_size(0),
67 blocking_task_runner(task_runner), 93 blocking_task_runner(task_runner),
68 power_save_blocker(content::PowerSaveBlocker::Create( 94 power_save_blocker(content::PowerSaveBlocker::Create(
69 content::PowerSaveBlocker::kPowerSaveBlockPreventAppSuspension, 95 content::PowerSaveBlocker::kPowerSaveBlockPreventAppSuspension,
70 "Upload in progress")) { 96 "Upload in progress")) {
71 } 97 }
72 98
73 ~UploadFileInfo() { 99 ~UploadFileInfo() {
74 blocking_task_runner->DeleteSoon(FROM_HERE, file_stream.release()); 100 blocking_task_runner->DeleteSoon(FROM_HERE, file_stream.release());
75 } 101 }
76 102
77 // Bytes left to upload. 103 // Bytes left to upload.
78 int64 SizeRemaining() const { 104 int64 SizeRemaining() const {
79 DCHECK(content_length >= next_send_position); 105 DCHECK(content_length >= buf_position + buf_size);
80 return content_length - next_send_position; 106 return content_length - (buf_position + buf_size);
81 } 107 }
82 108
83 // Useful for printf debugging. 109 // Useful for printf debugging.
84 std::string DebugString() const { 110 std::string DebugString() const {
85 return "title=[" + title + 111 return "title=[" + title +
86 "], file_path=[" + file_path.AsUTF8Unsafe() + 112 "], file_path=[" + file_path.AsUTF8Unsafe() +
87 "], content_type=[" + content_type + 113 "], content_type=[" + content_type +
88 "], content_length=[" + base::UintToString(content_length) + 114 "], content_length=[" + base::UintToString(content_length) +
89 "], drive_path=[" + drive_path.AsUTF8Unsafe() + 115 "], drive_path=[" + drive_path.AsUTF8Unsafe() +
90 "]"; 116 "]";
(...skipping 22 matching lines...) Expand all
113 // Callback to be invoked once the upload has finished. 139 // Callback to be invoked once the upload has finished.
114 const UploadCompletionCallback completion_callback; 140 const UploadCompletionCallback completion_callback;
115 141
116 // Location URL where file is to be uploaded to, returned from 142 // Location URL where file is to be uploaded to, returned from
117 // InitiateUpload. Used for the subsequent ResumeUpload requests. 143 // InitiateUpload. Used for the subsequent ResumeUpload requests.
118 GURL upload_location; 144 GURL upload_location;
119 145
120 // Header content-Length. 146 // Header content-Length.
121 int64 content_length; 147 int64 content_length;
122 148
123 // The start position of the contents to be sent as the next upload chunk.
124 int64 next_send_position;
125
126 // For opening and reading from physical file. 149 // For opening and reading from physical file.
127 // 150 //
128 // File operations are posted to |blocking_task_runner|, while the ownership 151 // File operations are posted to |blocking_task_runner|, while the ownership
129 // of the stream is held in UI thread. At the point when this UploadFileInfo 152 // of the stream is held in UI thread. At the point when this UploadFileInfo
130 // is destroyed, the ownership of the stream is passed to the worker pool. 153 // is destroyed, the ownership of the stream is passed to the worker pool.
131 // TODO(kinaba): We should switch to async API of FileStream once 154 // TODO(kinaba): We should switch to async API of FileStream once
132 // crbug.com/164312 is fixed. 155 // crbug.com/164312 is fixed.
133 scoped_ptr<net::FileStream> file_stream; 156 scoped_ptr<net::FileStream> file_stream;
134 157
135 // Holds current content to be uploaded. 158 // Holds current content to be uploaded.
136 const scoped_refptr<net::IOBuffer> buf; 159 const scoped_refptr<net::IOBuffer> buf;
137 160
161 // The start position of the |buf| in the file.
162 int64 buf_position;
163
164 // The size of available data in |buf|.
165 int64 buf_size;
166
167 // The number of trials for the current buffer data.
168 // This is used to calculate exponential backoff waiting duration.
169 int trial_count;
170
138 // Runner for net::FileStream tasks. 171 // Runner for net::FileStream tasks.
139 const scoped_refptr<base::SequencedTaskRunner> blocking_task_runner; 172 const scoped_refptr<base::SequencedTaskRunner> blocking_task_runner;
140 173
141 // Blocks system suspend while upload is in progress. 174 // Blocks system suspend while upload is in progress.
142 scoped_ptr<content::PowerSaveBlocker> power_save_blocker; 175 scoped_ptr<content::PowerSaveBlocker> power_save_blocker;
143 }; 176 };
144 177
145 DriveUploader::DriveUploader(DriveServiceInterface* drive_service) 178 DriveUploader::DriveUploader(DriveServiceInterface* drive_service)
146 : drive_service_(drive_service), 179 : drive_service_(drive_service),
147 ALLOW_THIS_IN_INITIALIZER_LIST(weak_ptr_factory_(this)) { 180 ALLOW_THIS_IN_INITIALIZER_LIST(weak_ptr_factory_(this)) {
(...skipping 174 matching lines...) Expand 10 before | Expand all | Expand 10 after
322 DCHECK_EQ(bytes_to_read, bytes_read); 355 DCHECK_EQ(bytes_to_read, bytes_read);
323 DVLOG(1) << "ReadCompletionCallback bytes read=" << bytes_read; 356 DVLOG(1) << "ReadCompletionCallback bytes read=" << bytes_read;
324 357
325 if (bytes_read < 0) { 358 if (bytes_read < 0) {
326 LOG(ERROR) << "Error reading from file " 359 LOG(ERROR) << "Error reading from file "
327 << upload_file_info->file_path.value(); 360 << upload_file_info->file_path.value();
328 UploadFailed(upload_file_info.Pass(), DRIVE_UPLOAD_ERROR_ABORT); 361 UploadFailed(upload_file_info.Pass(), DRIVE_UPLOAD_ERROR_ABORT);
329 return; 362 return;
330 } 363 }
331 364
332 int64 start_position = upload_file_info->next_send_position; 365 // Update the buffer's position.
333 upload_file_info->next_send_position += bytes_read; 366 // First, move the start position of the buffer by previous buffer size,
334 int64 end_position = upload_file_info->next_send_position; 367 // and then store the current buffer size.
368 upload_file_info->buf_position += upload_file_info->buf_size;
369 upload_file_info->buf_size = bytes_read;
335 370
371 // Try to send all the data in the buffer.
372 int64 start_position = upload_file_info->buf_position;
373 ResumeUpload(upload_file_info.Pass(), start_position);
374 }
375
376 void DriveUploader::ResumeUpload(scoped_ptr<UploadFileInfo> upload_file_info,
377 int64 start_position) {
378 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
379 int64 end_position =
380 upload_file_info->buf_position + upload_file_info->buf_size;
381 DCHECK_GE(start_position, upload_file_info->buf_position);
382 DCHECK_LE(start_position, end_position);
383 int64 buffer_offset = start_position - upload_file_info->buf_position;
384
385 // Try to send all the data in the buffer.
336 UploadFileInfo* info_ptr = upload_file_info.get(); 386 UploadFileInfo* info_ptr = upload_file_info.get();
337 drive_service_->ResumeUpload( 387 drive_service_->ResumeUpload(
338 ResumeUploadParams(info_ptr->upload_mode, 388 ResumeUploadParams(info_ptr->upload_mode,
339 start_position, 389 start_position,
340 end_position, 390 end_position,
341 info_ptr->content_length, 391 info_ptr->content_length,
342 info_ptr->content_type, 392 info_ptr->content_type,
343 info_ptr->buf, 393 info_ptr->buf,
394 buffer_offset,
344 info_ptr->upload_location, 395 info_ptr->upload_location,
345 info_ptr->drive_path), 396 info_ptr->drive_path),
346 base::Bind(&DriveUploader::OnUploadRangeResponseReceived, 397 base::Bind(&DriveUploader::OnUploadRangeResponseReceived,
347 weak_ptr_factory_.GetWeakPtr(), 398 weak_ptr_factory_.GetWeakPtr(),
348 base::Passed(&upload_file_info))); 399 base::Passed(&upload_file_info)));
349 } 400 }
350 401
351 void DriveUploader::OnUploadRangeResponseReceived( 402 void DriveUploader::OnUploadRangeResponseReceived(
352 scoped_ptr<UploadFileInfo> upload_file_info, 403 scoped_ptr<UploadFileInfo> upload_file_info,
353 const UploadRangeResponse& response, 404 const UploadRangeResponse& response,
(...skipping 13 matching lines...) Expand all
367 entry.Pass()); 418 entry.Pass());
368 return; 419 return;
369 } 420 }
370 421
371 // ETag mismatch. 422 // ETag mismatch.
372 if (response.code == HTTP_PRECONDITION) { 423 if (response.code == HTTP_PRECONDITION) {
373 UploadFailed(upload_file_info.Pass(), DRIVE_UPLOAD_ERROR_CONFLICT); 424 UploadFailed(upload_file_info.Pass(), DRIVE_UPLOAD_ERROR_CONFLICT);
374 return; 425 return;
375 } 426 }
376 427
428 if (IsHttp5xxServerError(response.code)) {
429 // The upload is interrupted. So retry with after waiting for a short
430 // period.
431 ResumeInterruptedUpload(upload_file_info.Pass());
432 return;
433 }
434
377 // If code is 308 (RESUME_INCOMPLETE) and range_received is what has been 435 // If code is 308 (RESUME_INCOMPLETE) and range_received is what has been
378 // previously uploaded (i.e. = upload_file_info->end_position), proceed to 436 // previously uploaded (i.e. = upload_file_info->end_position), proceed to
379 // upload the next chunk. 437 // upload the next chunk.
380 if (response.code != HTTP_RESUME_INCOMPLETE || 438 if (response.code != HTTP_RESUME_INCOMPLETE ||
381 response.start_position_received != 0 || 439 response.start_position_received != 0 ||
382 response.end_position_received != upload_file_info->next_send_position) { 440 response.end_position_received < upload_file_info->buf_position ||
441 response.end_position_received >
442 upload_file_info->buf_position + upload_file_info->buf_size) {
383 // TODO(achuith): Handle error cases, e.g. 443 // TODO(achuith): Handle error cases, e.g.
384 // - when previously uploaded data wasn't received by Google Docs server, 444 // - when previously uploaded data wasn't received by Google Docs server,
385 // i.e. when end_position_received < upload_file_info->end_position 445 // i.e. when end_position_received < upload_file_info->end_position
386 LOG(ERROR) << "UploadNextChunk http code=" << response.code 446 LOG(ERROR) << "UploadNextChunk http code=" << response.code
387 << ", start_position_received=" << response.start_position_received 447 << ", start_position_received=" << response.start_position_received
388 << ", end_position_received=" << response.end_position_received 448 << ", end_position_received=" << response.end_position_received
389 << ", expected end range=" << upload_file_info->next_send_position; 449 << ", sending buffer = [" << upload_file_info->buf_position
450 << ":" << upload_file_info->buf_position + upload_file_info->buf_size
451 << ")";
390 UploadFailed(upload_file_info.Pass(), 452 UploadFailed(upload_file_info.Pass(),
391 response.code == HTTP_FORBIDDEN ? 453 response.code == HTTP_FORBIDDEN ?
392 DRIVE_UPLOAD_ERROR_NO_SPACE : DRIVE_UPLOAD_ERROR_ABORT); 454 DRIVE_UPLOAD_ERROR_NO_SPACE : DRIVE_UPLOAD_ERROR_ABORT);
393 return; 455 return;
394 } 456 }
395 457
396 DVLOG(1) << "Received range " << response.start_position_received 458 DVLOG(1) << "Received range " << response.start_position_received
397 << "-" << response.end_position_received 459 << "-" << response.end_position_received
398 << " for [" << upload_file_info->title << "]"; 460 << " for [" << upload_file_info->title << "]";
399 461
400 // Continue uploading. 462 // Continue uploading.
401 UploadNextChunk(upload_file_info.Pass()); 463 if (response.end_position_received <
464 upload_file_info->buf_position + upload_file_info->buf_size) {
465 // There is the remaining data in the buffer, so resend it.
466 // PostTask is necessary here because we have to finish the callback
467 // before calling ResumeUpload due to the implementation of
468 // OperationRegistry. (http://crbug.com/134814)
469 MessageLoop::current()->PostTask(
470 FROM_HERE,
471 base::Bind(&DriveUploader::ResumeUpload,
472 weak_ptr_factory_.GetWeakPtr(),
473 base::Passed(&upload_file_info),
474 response.end_position_received));
475 } else {
476 // The whole data in the current buffer is successfully sent.
477 // Send the next chunk.
478 UploadNextChunk(upload_file_info.Pass());
479 }
480 }
481
482 void DriveUploader::ResumeInterruptedUpload(
483 scoped_ptr<UploadFileInfo> upload_file_info) {
484 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
485
486 // Calculate the waiting duration based on the number of trial.
487 // See GetWaitDuration for more details.
488 ++upload_file_info->trial_count;
489 base::TimeDelta wait_duration =
490 GetWaitDuration(upload_file_info->trial_count);
491
492 MessageLoop::current()->PostDelayedTask(
493 FROM_HERE,
494 base::Bind(&DriveUploader::ResumeInterruptedUploadAfterWait,
495 weak_ptr_factory_.GetWeakPtr(),
496 base::Passed(&upload_file_info)),
497 wait_duration);
498 }
499
500 void DriveUploader::ResumeInterruptedUploadAfterWait(
501 scoped_ptr<UploadFileInfo> upload_file_info) {
502 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
503
504 UploadFileInfo* info_ptr = upload_file_info.get();
505 drive_service_->GetUploadStatus(
506 info_ptr->upload_mode,
507 info_ptr->drive_path,
508 info_ptr->upload_location,
509 info_ptr->content_length,
510 base::Bind(&DriveUploader::OnUploadRangeResponseReceived,
511 weak_ptr_factory_.GetWeakPtr(),
512 base::Passed(&upload_file_info)));
402 } 513 }
403 514
404 void DriveUploader::UploadFailed(scoped_ptr<UploadFileInfo> upload_file_info, 515 void DriveUploader::UploadFailed(scoped_ptr<UploadFileInfo> upload_file_info,
405 DriveUploadError error) { 516 DriveUploadError error) {
406 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 517 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
407 518
408 LOG(ERROR) << "Upload failed " << upload_file_info->DebugString(); 519 LOG(ERROR) << "Upload failed " << upload_file_info->DebugString();
409 520
410 upload_file_info->completion_callback.Run(error, 521 upload_file_info->completion_callback.Run(error,
411 upload_file_info->drive_path, 522 upload_file_info->drive_path,
412 upload_file_info->file_path, 523 upload_file_info->file_path,
413 scoped_ptr<ResourceEntry>()); 524 scoped_ptr<ResourceEntry>());
414 } 525 }
415 526
416 } // namespace google_apis 527 } // namespace google_apis
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698