Index: sync/internal_api/attachments/attachment_service_impl.cc |
diff --git a/sync/internal_api/attachments/attachment_service_impl.cc b/sync/internal_api/attachments/attachment_service_impl.cc |
index 61fb1c663c9af0eae41eb681d173238a8601fefd..e9d3a8237ef483e40d502b34c34d9ec159a0f9e3 100644 |
--- a/sync/internal_api/attachments/attachment_service_impl.cc |
+++ b/sync/internal_api/attachments/attachment_service_impl.cc |
@@ -4,6 +4,8 @@ |
#include "sync/internal_api/public/attachments/attachment_service_impl.h" |
+#include <iterator> |
+ |
#include "base/bind.h" |
#include "base/message_loop/message_loop.h" |
#include "base/thread_task_runner_handle.h" |
@@ -141,6 +143,10 @@ scoped_ptr<syncer::AttachmentService> AttachmentServiceImpl::CreateForTest() { |
return attachment_service.Pass(); |
} |
+AttachmentStore* AttachmentServiceImpl::GetStore() { |
+ return attachment_store_.get(); |
+} |
+ |
void AttachmentServiceImpl::GetOrDownloadAttachments( |
const AttachmentIdList& attachment_ids, |
const GetOrDownloadCallback& callback) { |
@@ -163,25 +169,6 @@ void AttachmentServiceImpl::DropAttachments( |
callback)); |
} |
-void AttachmentServiceImpl::StoreAttachments(const AttachmentList& attachments, |
- const StoreCallback& callback) { |
- DCHECK(CalledOnValidThread()); |
- attachment_store_->Write(attachments, |
- base::Bind(&AttachmentServiceImpl::WriteDone, |
- weak_ptr_factory_.GetWeakPtr(), |
- callback)); |
- if (attachment_uploader_.get()) { |
- for (AttachmentList::const_iterator iter = attachments.begin(); |
- iter != attachments.end(); |
- ++iter) { |
- attachment_uploader_->UploadAttachment( |
- *iter, |
- base::Bind(&AttachmentServiceImpl::UploadDone, |
- weak_ptr_factory_.GetWeakPtr())); |
- } |
- } |
-} |
- |
void AttachmentServiceImpl::ReadDone( |
const scoped_refptr<GetOrDownloadState>& state, |
const AttachmentStore::Result& result, |
@@ -226,21 +213,10 @@ void AttachmentServiceImpl::DropDone(const DropCallback& callback, |
base::Bind(callback, drop_result)); |
} |
-void AttachmentServiceImpl::WriteDone(const StoreCallback& callback, |
- const AttachmentStore::Result& result) { |
- AttachmentService::StoreResult store_result = |
- AttachmentService::STORE_UNSPECIFIED_ERROR; |
- if (result == AttachmentStore::SUCCESS) { |
- store_result = AttachmentService::STORE_SUCCESS; |
- } |
- // TODO(maniscalco): Deal with case where an error occurred (bug 361251). |
- base::MessageLoop::current()->PostTask(FROM_HERE, |
- base::Bind(callback, store_result)); |
-} |
- |
void AttachmentServiceImpl::UploadDone( |
const AttachmentUploader::UploadResult& result, |
const AttachmentId& attachment_id) { |
+ ids_in_queue_.erase(attachment_id); |
// TODO(pavely): crbug/372622: Deal with UploadAttachment failures. |
if (result != AttachmentUploader::UPLOAD_SUCCESS) |
return; |
@@ -261,4 +237,60 @@ void AttachmentServiceImpl::DownloadDone( |
} |
} |
+void AttachmentServiceImpl::UploadAttachments( |
+ const AttachmentIdSet& attachment_ids) { |
+ DCHECK(CalledOnValidThread()); |
+ if (!attachment_uploader_.get()) { |
+ return; |
+ } |
+ |
+ // Enqueue the attachment ids that aren't already in the queue. |
+ AttachmentIdSet::const_iterator iter = attachment_ids.begin(); |
+ AttachmentIdSet::const_iterator end = attachment_ids.end(); |
+ for (; iter != end; ++iter) { |
+ if (ids_in_queue_.find(*iter) == ids_in_queue_.end()) { |
+ queue_.push_back(*iter); |
+ ids_in_queue_.insert(*iter); |
+ } |
+ } |
+ |
+ ProcessQueuedUploads(); |
+} |
+ |
+void AttachmentServiceImpl::ProcessQueuedUploads() { |
+ DCHECK(CalledOnValidThread()); |
+ // TODO(maniscalco): Don't dequeue them all. Instead, limit the number of |
+ // concurrent uploads and apply backoff on failure. |
+ while (!queue_.empty()) { |
+ const AttachmentId id = queue_.front(); |
+ queue_.pop_front(); |
+ AttachmentIdList attachment_ids; |
+ attachment_ids.push_back(id); |
+ attachment_store_->Read( |
+ attachment_ids, |
+ base::Bind(&AttachmentServiceImpl::ReadDoneNowUpload, |
+ weak_ptr_factory_.GetWeakPtr())); |
+ } |
+} |
+ |
+void AttachmentServiceImpl::ReadDoneNowUpload( |
+ const AttachmentStore::Result& result, |
+ scoped_ptr<AttachmentMap> attachments, |
+ scoped_ptr<AttachmentIdList> unavailable_attachment_ids) { |
+ DCHECK(CalledOnValidThread()); |
+ if (!unavailable_attachment_ids->empty()) { |
+ // TODO(maniscalco): We failed to read some attachments. What should we do |
+ // now? |
+ } |
+ |
+ AttachmentMap::const_iterator iter = attachments->begin(); |
+ AttachmentMap::const_iterator end = attachments->end(); |
+ for (; iter != end; ++iter) { |
+ attachment_uploader_->UploadAttachment( |
+ iter->second, |
+ base::Bind(&AttachmentServiceImpl::UploadDone, |
+ weak_ptr_factory_.GetWeakPtr())); |
+ } |
+} |
+ |
} // namespace syncer |