Index: sync/internal_api/attachments/attachment_service_impl.cc |
diff --git a/sync/internal_api/attachments/attachment_service_impl.cc b/sync/internal_api/attachments/attachment_service_impl.cc |
index 94ffead04fee09f54292b74fe448ab724bbb40aa..5ba540b188d684c66f9bcf4d1bab407f11aab820 100644 |
--- a/sync/internal_api/attachments/attachment_service_impl.cc |
+++ b/sync/internal_api/attachments/attachment_service_impl.cc |
@@ -9,6 +9,7 @@ |
#include "base/bind.h" |
#include "base/message_loop/message_loop.h" |
#include "base/thread_task_runner_handle.h" |
+#include "base/time/time.h" |
#include "sync/api/attachments/attachment.h" |
#include "sync/api/attachments/fake_attachment_store.h" |
#include "sync/internal_api/public/attachments/fake_attachment_downloader.h" |
@@ -113,7 +114,9 @@ AttachmentServiceImpl::AttachmentServiceImpl( |
scoped_refptr<AttachmentStore> attachment_store, |
scoped_ptr<AttachmentUploader> attachment_uploader, |
scoped_ptr<AttachmentDownloader> attachment_downloader, |
- Delegate* delegate) |
+ Delegate* delegate, |
+ const base::TimeDelta& initial_backoff_delay, |
+ const base::TimeDelta& max_backoff_delay) |
: attachment_store_(attachment_store), |
attachment_uploader_(attachment_uploader.Pass()), |
attachment_downloader_(attachment_downloader.Pass()), |
@@ -121,6 +124,16 @@ AttachmentServiceImpl::AttachmentServiceImpl( |
weak_ptr_factory_(this) { |
DCHECK(CalledOnValidThread()); |
DCHECK(attachment_store_.get()); |
+ |
+ // TODO(maniscalco): Observe network connectivity change events. When the |
+ // network becomes disconnected, consider suspending queue dispatch. When |
+ // connectivity is restored, consider clearing any dispatch backoff (bug |
+ // 411981). |
+ upload_task_queue_.reset(new TaskQueue<AttachmentId>( |
+ base::Bind(&AttachmentServiceImpl::BeginUpload, |
+ weak_ptr_factory_.GetWeakPtr()), |
+ initial_backoff_delay, |
+ max_backoff_delay)); |
} |
AttachmentServiceImpl::~AttachmentServiceImpl() { |
@@ -139,7 +152,9 @@ scoped_ptr<syncer::AttachmentService> AttachmentServiceImpl::CreateForTest() { |
new syncer::AttachmentServiceImpl(attachment_store, |
attachment_uploader.Pass(), |
attachment_downloader.Pass(), |
- NULL)); |
+ NULL, |
+ base::TimeDelta(), |
+ base::TimeDelta())); |
return attachment_service.Pass(); |
} |
@@ -216,12 +231,22 @@ void AttachmentServiceImpl::DropDone(const DropCallback& callback, |
void AttachmentServiceImpl::UploadDone( |
const AttachmentUploader::UploadResult& result, |
const AttachmentId& attachment_id) { |
- ids_in_queue_.erase(attachment_id); |
- // TODO(pavely): crbug/372622: Deal with UploadAttachment failures. |
- if (result != AttachmentUploader::UPLOAD_SUCCESS) |
- return; |
- if (delegate_) { |
- delegate_->OnAttachmentUploaded(attachment_id); |
+ DCHECK(CalledOnValidThread()); |
+ switch (result) { |
+ case AttachmentUploader::UPLOAD_SUCCESS: |
+ upload_task_queue_->MarkAsSucceeded(attachment_id); |
+ if (delegate_) { |
+ delegate_->OnAttachmentUploaded(attachment_id); |
+ } |
+ break; |
+ case AttachmentUploader::UPLOAD_TRANSIENT_ERROR: |
+ upload_task_queue_->MarkAsFailed(attachment_id); |
+ upload_task_queue_->AddToQueue(attachment_id); |
+ break; |
+ case AttachmentUploader::UPLOAD_UNSPECIFIED_ERROR: |
+ // TODO(pavely): crbug/372622: Deal with UploadAttachment failures. |
+ upload_task_queue_->MarkAsFailed(attachment_id); |
+ break; |
} |
} |
@@ -230,46 +255,36 @@ void AttachmentServiceImpl::DownloadDone( |
const AttachmentId& attachment_id, |
const AttachmentDownloader::DownloadResult& result, |
scoped_ptr<Attachment> attachment) { |
- if (result == AttachmentDownloader::DOWNLOAD_SUCCESS) { |
- state->AddAttachment(*attachment.get()); |
- } else { |
- state->AddUnavailableAttachmentId(attachment_id); |
+ switch (result) { |
+ case AttachmentDownloader::DOWNLOAD_SUCCESS: |
+ state->AddAttachment(*attachment.get()); |
+ break; |
+ case AttachmentDownloader::DOWNLOAD_TRANSIENT_ERROR: |
+ case AttachmentDownloader::DOWNLOAD_UNSPECIFIED_ERROR: |
+ state->AddUnavailableAttachmentId(attachment_id); |
+ break; |
} |
} |
+void AttachmentServiceImpl::BeginUpload(const AttachmentId& attachment_id) { |
+ DCHECK(CalledOnValidThread()); |
+ AttachmentIdList attachment_ids; |
+ attachment_ids.push_back(attachment_id); |
+ attachment_store_->Read(attachment_ids, |
+ base::Bind(&AttachmentServiceImpl::ReadDoneNowUpload, |
+ weak_ptr_factory_.GetWeakPtr())); |
+} |
+ |
void AttachmentServiceImpl::UploadAttachments( |
const AttachmentIdSet& attachment_ids) { |
DCHECK(CalledOnValidThread()); |
if (!attachment_uploader_.get()) { |
return; |
} |
- |
- // Enqueue the attachment ids that aren't already in the queue. |
AttachmentIdSet::const_iterator iter = attachment_ids.begin(); |
AttachmentIdSet::const_iterator end = attachment_ids.end(); |
for (; iter != end; ++iter) { |
- if (ids_in_queue_.find(*iter) == ids_in_queue_.end()) { |
- queue_.push_back(*iter); |
- ids_in_queue_.insert(*iter); |
- } |
- } |
- |
- ProcessQueuedUploads(); |
-} |
- |
-void AttachmentServiceImpl::ProcessQueuedUploads() { |
- DCHECK(CalledOnValidThread()); |
- // TODO(maniscalco): Don't dequeue them all. Instead, limit the number of |
- // concurrent uploads and apply backoff on failure. |
- while (!queue_.empty()) { |
- const AttachmentId id = queue_.front(); |
- queue_.pop_front(); |
- AttachmentIdList attachment_ids; |
- attachment_ids.push_back(id); |
- attachment_store_->Read( |
- attachment_ids, |
- base::Bind(&AttachmentServiceImpl::ReadDoneNowUpload, |
- weak_ptr_factory_.GetWeakPtr())); |
+ upload_task_queue_->AddToQueue(*iter); |
} |
} |
@@ -281,6 +296,11 @@ void AttachmentServiceImpl::ReadDoneNowUpload( |
if (!unavailable_attachment_ids->empty()) { |
// TODO(maniscalco): We failed to read some attachments. What should we do |
// now? |
+ AttachmentIdList::const_iterator iter = unavailable_attachment_ids->begin(); |
+ AttachmentIdList::const_iterator end = unavailable_attachment_ids->end(); |
+ for (; iter != end; ++iter) { |
+ upload_task_queue_->Cancel(*iter); |
+ } |
} |
AttachmentMap::const_iterator iter = attachments->begin(); |