Chromium Code Reviews| Index: sync/internal_api/attachments/attachment_service_impl.cc |
| diff --git a/sync/internal_api/attachments/attachment_service_impl.cc b/sync/internal_api/attachments/attachment_service_impl.cc |
| index e9d3a8237ef483e40d502b34c34d9ec159a0f9e3..56432289e323303202dd5ed44742a657090fc939 100644 |
| --- a/sync/internal_api/attachments/attachment_service_impl.cc |
| +++ b/sync/internal_api/attachments/attachment_service_impl.cc |
| @@ -9,6 +9,7 @@ |
| #include "base/bind.h" |
| #include "base/message_loop/message_loop.h" |
| #include "base/thread_task_runner_handle.h" |
| +#include "base/time/time.h" |
| #include "sync/api/attachments/attachment.h" |
| #include "sync/api/attachments/fake_attachment_store.h" |
| #include "sync/internal_api/public/attachments/fake_attachment_downloader.h" |
| @@ -113,7 +114,9 @@ AttachmentServiceImpl::AttachmentServiceImpl( |
| scoped_ptr<AttachmentStore> attachment_store, |
| scoped_ptr<AttachmentUploader> attachment_uploader, |
| scoped_ptr<AttachmentDownloader> attachment_downloader, |
| - Delegate* delegate) |
| + Delegate* delegate, |
| + const base::TimeDelta& initial_backoff_delay, |
| + const base::TimeDelta& max_backoff_delay) |
| : attachment_store_(attachment_store.Pass()), |
| attachment_uploader_(attachment_uploader.Pass()), |
| attachment_downloader_(attachment_downloader.Pass()), |
| @@ -121,6 +124,16 @@ AttachmentServiceImpl::AttachmentServiceImpl( |
| weak_ptr_factory_(this) { |
| DCHECK(CalledOnValidThread()); |
| DCHECK(attachment_store_); |
| + |
| + // TODO(maniscalco): Observe network connectivity change events. When the |
| + // network becomes disconnected, consider suspending queue dispatch. When |
| + // connectivity is restored, consider clearing any dispatch backoff (bug |
| + // 411981). |
| + upload_task_queue_.reset(new TaskQueue<AttachmentId>( |
| + base::Bind(&AttachmentServiceImpl::BeginUpload, |
| + weak_ptr_factory_.GetWeakPtr()), |
| + initial_backoff_delay, |
| + max_backoff_delay)); |
| } |
| AttachmentServiceImpl::~AttachmentServiceImpl() { |
| @@ -139,7 +152,9 @@ scoped_ptr<syncer::AttachmentService> AttachmentServiceImpl::CreateForTest() { |
| new syncer::AttachmentServiceImpl(attachment_store.Pass(), |
| attachment_uploader.Pass(), |
| attachment_downloader.Pass(), |
| - NULL)); |
| + NULL, |
| + base::TimeDelta(), |
| + base::TimeDelta())); |
| return attachment_service.Pass(); |
| } |
| @@ -216,12 +231,20 @@ void AttachmentServiceImpl::DropDone(const DropCallback& callback, |
| void AttachmentServiceImpl::UploadDone( |
| const AttachmentUploader::UploadResult& result, |
| const AttachmentId& attachment_id) { |
| - ids_in_queue_.erase(attachment_id); |
| - // TODO(pavely): crbug/372622: Deal with UploadAttachment failures. |
| - if (result != AttachmentUploader::UPLOAD_SUCCESS) |
| - return; |
| - if (delegate_) { |
| - delegate_->OnAttachmentUploaded(attachment_id); |
| + DCHECK(CalledOnValidThread()); |
| + switch (result) { |
| + case AttachmentUploader::UPLOAD_SUCCESS: |
| + upload_task_queue_->MarkAsSucceeded(attachment_id); |
| + if (delegate_) { |
| + delegate_->OnAttachmentUploaded(attachment_id); |
| + } |
| + break; |
| + case AttachmentUploader::UPLOAD_TRANSIENT_ERROR: |
| + upload_task_queue_->Retry(attachment_id); |
| + break; |
| + case AttachmentUploader::UPLOAD_UNSPECIFIED_ERROR: |
| + // TODO(pavely): crbug/372622: Deal with UploadAttachment failures. |
| + break; |
|
pavely
2014/09/09 19:42:23
You have to call exactly one of three functions ex
maniscalco
2014/09/09 21:15:43
Good catch. There is a bug here. There is no "co
|
| } |
| } |
| @@ -230,46 +253,36 @@ void AttachmentServiceImpl::DownloadDone( |
| const AttachmentId& attachment_id, |
| const AttachmentDownloader::DownloadResult& result, |
| scoped_ptr<Attachment> attachment) { |
| - if (result == AttachmentDownloader::DOWNLOAD_SUCCESS) { |
| - state->AddAttachment(*attachment.get()); |
| - } else { |
| - state->AddUnavailableAttachmentId(attachment_id); |
| + switch (result) { |
| + case AttachmentDownloader::DOWNLOAD_SUCCESS: |
| + state->AddAttachment(*attachment.get()); |
| + break; |
| + case AttachmentDownloader::DOWNLOAD_TRANSIENT_ERROR: |
| + case AttachmentDownloader::DOWNLOAD_UNSPECIFIED_ERROR: |
| + state->AddUnavailableAttachmentId(attachment_id); |
| + break; |
| } |
|
pavely
2014/09/09 19:42:23
Add default: NOTREACHED(). It will show that you a
maniscalco
2014/09/09 21:15:43
I'm not sure that's preferable to no default. Wit
|
| } |
| +void AttachmentServiceImpl::BeginUpload(const AttachmentId& attachment_id) { |
| + DCHECK(CalledOnValidThread()); |
| + AttachmentIdList attachment_ids; |
| + attachment_ids.push_back(attachment_id); |
| + attachment_store_->Read(attachment_ids, |
| + base::Bind(&AttachmentServiceImpl::ReadDoneNowUpload, |
| + weak_ptr_factory_.GetWeakPtr())); |
| +} |
| + |
| void AttachmentServiceImpl::UploadAttachments( |
| const AttachmentIdSet& attachment_ids) { |
| DCHECK(CalledOnValidThread()); |
| if (!attachment_uploader_.get()) { |
| return; |
| } |
| - |
| - // Enqueue the attachment ids that aren't already in the queue. |
| AttachmentIdSet::const_iterator iter = attachment_ids.begin(); |
| AttachmentIdSet::const_iterator end = attachment_ids.end(); |
| for (; iter != end; ++iter) { |
| - if (ids_in_queue_.find(*iter) == ids_in_queue_.end()) { |
| - queue_.push_back(*iter); |
| - ids_in_queue_.insert(*iter); |
| - } |
| - } |
| - |
| - ProcessQueuedUploads(); |
| -} |
| - |
| -void AttachmentServiceImpl::ProcessQueuedUploads() { |
| - DCHECK(CalledOnValidThread()); |
| - // TODO(maniscalco): Don't dequeue them all. Instead, limit the number of |
| - // concurrent uploads and apply backoff on failure. |
| - while (!queue_.empty()) { |
| - const AttachmentId id = queue_.front(); |
| - queue_.pop_front(); |
| - AttachmentIdList attachment_ids; |
| - attachment_ids.push_back(id); |
| - attachment_store_->Read( |
| - attachment_ids, |
| - base::Bind(&AttachmentServiceImpl::ReadDoneNowUpload, |
| - weak_ptr_factory_.GetWeakPtr())); |
| + upload_task_queue_->AddToQueue(*iter); |
| } |
| } |
| @@ -281,6 +294,11 @@ void AttachmentServiceImpl::ReadDoneNowUpload( |
| if (!unavailable_attachment_ids->empty()) { |
| // TODO(maniscalco): We failed to read some attachments. What should we do |
| // now? |
| + AttachmentIdList::const_iterator iter = unavailable_attachment_ids->begin(); |
| + AttachmentIdList::const_iterator end = unavailable_attachment_ids->end(); |
| + for (; iter != end; ++iter) { |
| + upload_task_queue_->Cancel(*iter); |
| + } |
| } |
| AttachmentMap::const_iterator iter = attachments->begin(); |