Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(9)

Side by Side Diff: sync/internal_api/attachments/attachment_service_impl.cc

Issue 554743004: Update AttachmentServiceImpl to retry attachment uploads. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Check ShouldDispatch in Dispatch. Created 6 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2014 The Chromium Authors. All rights reserved. 1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "sync/internal_api/public/attachments/attachment_service_impl.h" 5 #include "sync/internal_api/public/attachments/attachment_service_impl.h"
6 6
7 #include <iterator> 7 #include <iterator>
8 8
9 #include "base/bind.h" 9 #include "base/bind.h"
10 #include "base/message_loop/message_loop.h" 10 #include "base/message_loop/message_loop.h"
11 #include "base/thread_task_runner_handle.h" 11 #include "base/thread_task_runner_handle.h"
12 #include "base/time/time.h"
12 #include "sync/api/attachments/attachment.h" 13 #include "sync/api/attachments/attachment.h"
13 #include "sync/api/attachments/fake_attachment_store.h" 14 #include "sync/api/attachments/fake_attachment_store.h"
14 #include "sync/internal_api/public/attachments/fake_attachment_downloader.h" 15 #include "sync/internal_api/public/attachments/fake_attachment_downloader.h"
15 #include "sync/internal_api/public/attachments/fake_attachment_uploader.h" 16 #include "sync/internal_api/public/attachments/fake_attachment_uploader.h"
16 17
17 namespace syncer { 18 namespace syncer {
18 19
19 // GetOrDownloadAttachments starts multiple parallel DownloadAttachment calls. 20 // GetOrDownloadAttachments starts multiple parallel DownloadAttachment calls.
20 // GetOrDownloadState tracks completion of these calls and posts callback for 21 // GetOrDownloadState tracks completion of these calls and posts callback for
21 // consumer once all attachments are either retrieved or reported unavailable. 22 // consumer once all attachments are either retrieved or reported unavailable.
(...skipping 84 matching lines...) Expand 10 before | Expand all | Expand 10 after
106 base::MessageLoop::current()->PostTask( 107 base::MessageLoop::current()->PostTask(
107 FROM_HERE, 108 FROM_HERE,
108 base::Bind(callback_, result, base::Passed(&retrieved_attachments_))); 109 base::Bind(callback_, result, base::Passed(&retrieved_attachments_)));
109 } 110 }
110 } 111 }
111 112
112 AttachmentServiceImpl::AttachmentServiceImpl( 113 AttachmentServiceImpl::AttachmentServiceImpl(
113 scoped_ptr<AttachmentStore> attachment_store, 114 scoped_ptr<AttachmentStore> attachment_store,
114 scoped_ptr<AttachmentUploader> attachment_uploader, 115 scoped_ptr<AttachmentUploader> attachment_uploader,
115 scoped_ptr<AttachmentDownloader> attachment_downloader, 116 scoped_ptr<AttachmentDownloader> attachment_downloader,
116 Delegate* delegate) 117 Delegate* delegate,
118 const base::TimeDelta& initial_backoff_delay,
119 const base::TimeDelta& max_backoff_delay)
117 : attachment_store_(attachment_store.Pass()), 120 : attachment_store_(attachment_store.Pass()),
118 attachment_uploader_(attachment_uploader.Pass()), 121 attachment_uploader_(attachment_uploader.Pass()),
119 attachment_downloader_(attachment_downloader.Pass()), 122 attachment_downloader_(attachment_downloader.Pass()),
120 delegate_(delegate), 123 delegate_(delegate),
121 weak_ptr_factory_(this) { 124 weak_ptr_factory_(this) {
122 DCHECK(CalledOnValidThread()); 125 DCHECK(CalledOnValidThread());
123 DCHECK(attachment_store_); 126 DCHECK(attachment_store_);
127
128 // TODO(maniscalco): Observe network connectivity change events. When the
129 // network becomes disconnected, consider suspending queue dispatch. When
130 // connectivity is restored, consider clearing any dispatch backoff (bug
131 // 411981).
132 upload_task_queue_.reset(new TaskQueue<AttachmentId>(
133 base::Bind(&AttachmentServiceImpl::BeginUpload,
134 weak_ptr_factory_.GetWeakPtr()),
135 initial_backoff_delay,
136 max_backoff_delay));
124 } 137 }
125 138
126 AttachmentServiceImpl::~AttachmentServiceImpl() { 139 AttachmentServiceImpl::~AttachmentServiceImpl() {
127 DCHECK(CalledOnValidThread()); 140 DCHECK(CalledOnValidThread());
128 } 141 }
129 142
130 // Static. 143 // Static.
131 scoped_ptr<syncer::AttachmentService> AttachmentServiceImpl::CreateForTest() { 144 scoped_ptr<syncer::AttachmentService> AttachmentServiceImpl::CreateForTest() {
132 scoped_ptr<syncer::AttachmentStore> attachment_store( 145 scoped_ptr<syncer::AttachmentStore> attachment_store(
133 new syncer::FakeAttachmentStore(base::ThreadTaskRunnerHandle::Get())); 146 new syncer::FakeAttachmentStore(base::ThreadTaskRunnerHandle::Get()));
134 scoped_ptr<AttachmentUploader> attachment_uploader( 147 scoped_ptr<AttachmentUploader> attachment_uploader(
135 new FakeAttachmentUploader); 148 new FakeAttachmentUploader);
136 scoped_ptr<AttachmentDownloader> attachment_downloader( 149 scoped_ptr<AttachmentDownloader> attachment_downloader(
137 new FakeAttachmentDownloader()); 150 new FakeAttachmentDownloader());
138 scoped_ptr<syncer::AttachmentService> attachment_service( 151 scoped_ptr<syncer::AttachmentService> attachment_service(
139 new syncer::AttachmentServiceImpl(attachment_store.Pass(), 152 new syncer::AttachmentServiceImpl(attachment_store.Pass(),
140 attachment_uploader.Pass(), 153 attachment_uploader.Pass(),
141 attachment_downloader.Pass(), 154 attachment_downloader.Pass(),
142 NULL)); 155 NULL,
156 base::TimeDelta(),
157 base::TimeDelta()));
143 return attachment_service.Pass(); 158 return attachment_service.Pass();
144 } 159 }
145 160
146 AttachmentStore* AttachmentServiceImpl::GetStore() { 161 AttachmentStore* AttachmentServiceImpl::GetStore() {
147 return attachment_store_.get(); 162 return attachment_store_.get();
148 } 163 }
149 164
150 void AttachmentServiceImpl::GetOrDownloadAttachments( 165 void AttachmentServiceImpl::GetOrDownloadAttachments(
151 const AttachmentIdList& attachment_ids, 166 const AttachmentIdList& attachment_ids,
152 const GetOrDownloadCallback& callback) { 167 const GetOrDownloadCallback& callback) {
(...skipping 56 matching lines...) Expand 10 before | Expand all | Expand 10 after
209 drop_result = AttachmentService::DROP_SUCCESS; 224 drop_result = AttachmentService::DROP_SUCCESS;
210 } 225 }
211 // TODO(maniscalco): Deal with case where an error occurred (bug 361251). 226 // TODO(maniscalco): Deal with case where an error occurred (bug 361251).
212 base::MessageLoop::current()->PostTask(FROM_HERE, 227 base::MessageLoop::current()->PostTask(FROM_HERE,
213 base::Bind(callback, drop_result)); 228 base::Bind(callback, drop_result));
214 } 229 }
215 230
216 void AttachmentServiceImpl::UploadDone( 231 void AttachmentServiceImpl::UploadDone(
217 const AttachmentUploader::UploadResult& result, 232 const AttachmentUploader::UploadResult& result,
218 const AttachmentId& attachment_id) { 233 const AttachmentId& attachment_id) {
219 ids_in_queue_.erase(attachment_id); 234 DCHECK(CalledOnValidThread());
220 // TODO(pavely): crbug/372622: Deal with UploadAttachment failures. 235 switch (result) {
221 if (result != AttachmentUploader::UPLOAD_SUCCESS) 236 case AttachmentUploader::UPLOAD_SUCCESS:
222 return; 237 upload_task_queue_->MarkAsSucceeded(attachment_id);
223 if (delegate_) { 238 if (delegate_) {
224 delegate_->OnAttachmentUploaded(attachment_id); 239 delegate_->OnAttachmentUploaded(attachment_id);
240 }
241 break;
242 case AttachmentUploader::UPLOAD_TRANSIENT_ERROR:
243 upload_task_queue_->Retry(attachment_id);
244 break;
245 case AttachmentUploader::UPLOAD_UNSPECIFIED_ERROR:
246 // TODO(pavely): crbug/372622: Deal with UploadAttachment failures.
247 break;
pavely 2014/09/09 19:42:23 You have to call exactly one of three functions ex
maniscalco 2014/09/09 21:15:43 Good catch. There is a bug here. There is no "co
225 } 248 }
226 } 249 }
227 250
228 void AttachmentServiceImpl::DownloadDone( 251 void AttachmentServiceImpl::DownloadDone(
229 const scoped_refptr<GetOrDownloadState>& state, 252 const scoped_refptr<GetOrDownloadState>& state,
230 const AttachmentId& attachment_id, 253 const AttachmentId& attachment_id,
231 const AttachmentDownloader::DownloadResult& result, 254 const AttachmentDownloader::DownloadResult& result,
232 scoped_ptr<Attachment> attachment) { 255 scoped_ptr<Attachment> attachment) {
233 if (result == AttachmentDownloader::DOWNLOAD_SUCCESS) { 256 switch (result) {
234 state->AddAttachment(*attachment.get()); 257 case AttachmentDownloader::DOWNLOAD_SUCCESS:
235 } else { 258 state->AddAttachment(*attachment.get());
236 state->AddUnavailableAttachmentId(attachment_id); 259 break;
260 case AttachmentDownloader::DOWNLOAD_TRANSIENT_ERROR:
261 case AttachmentDownloader::DOWNLOAD_UNSPECIFIED_ERROR:
262 state->AddUnavailableAttachmentId(attachment_id);
263 break;
237 } 264 }
pavely 2014/09/09 19:42:23 Add default: NOTREACHED(). It will show that you a
maniscalco 2014/09/09 21:15:43 I'm not sure that's preferable to no default. Wit
238 } 265 }
239 266
267 void AttachmentServiceImpl::BeginUpload(const AttachmentId& attachment_id) {
268 DCHECK(CalledOnValidThread());
269 AttachmentIdList attachment_ids;
270 attachment_ids.push_back(attachment_id);
271 attachment_store_->Read(attachment_ids,
272 base::Bind(&AttachmentServiceImpl::ReadDoneNowUpload,
273 weak_ptr_factory_.GetWeakPtr()));
274 }
275
240 void AttachmentServiceImpl::UploadAttachments( 276 void AttachmentServiceImpl::UploadAttachments(
241 const AttachmentIdSet& attachment_ids) { 277 const AttachmentIdSet& attachment_ids) {
242 DCHECK(CalledOnValidThread()); 278 DCHECK(CalledOnValidThread());
243 if (!attachment_uploader_.get()) { 279 if (!attachment_uploader_.get()) {
244 return; 280 return;
245 } 281 }
246
247 // Enqueue the attachment ids that aren't already in the queue.
248 AttachmentIdSet::const_iterator iter = attachment_ids.begin(); 282 AttachmentIdSet::const_iterator iter = attachment_ids.begin();
249 AttachmentIdSet::const_iterator end = attachment_ids.end(); 283 AttachmentIdSet::const_iterator end = attachment_ids.end();
250 for (; iter != end; ++iter) { 284 for (; iter != end; ++iter) {
251 if (ids_in_queue_.find(*iter) == ids_in_queue_.end()) { 285 upload_task_queue_->AddToQueue(*iter);
252 queue_.push_back(*iter);
253 ids_in_queue_.insert(*iter);
254 }
255 }
256
257 ProcessQueuedUploads();
258 }
259
260 void AttachmentServiceImpl::ProcessQueuedUploads() {
261 DCHECK(CalledOnValidThread());
262 // TODO(maniscalco): Don't dequeue them all. Instead, limit the number of
263 // concurrent uploads and apply backoff on failure.
264 while (!queue_.empty()) {
265 const AttachmentId id = queue_.front();
266 queue_.pop_front();
267 AttachmentIdList attachment_ids;
268 attachment_ids.push_back(id);
269 attachment_store_->Read(
270 attachment_ids,
271 base::Bind(&AttachmentServiceImpl::ReadDoneNowUpload,
272 weak_ptr_factory_.GetWeakPtr()));
273 } 286 }
274 } 287 }
275 288
276 void AttachmentServiceImpl::ReadDoneNowUpload( 289 void AttachmentServiceImpl::ReadDoneNowUpload(
277 const AttachmentStore::Result& result, 290 const AttachmentStore::Result& result,
278 scoped_ptr<AttachmentMap> attachments, 291 scoped_ptr<AttachmentMap> attachments,
279 scoped_ptr<AttachmentIdList> unavailable_attachment_ids) { 292 scoped_ptr<AttachmentIdList> unavailable_attachment_ids) {
280 DCHECK(CalledOnValidThread()); 293 DCHECK(CalledOnValidThread());
281 if (!unavailable_attachment_ids->empty()) { 294 if (!unavailable_attachment_ids->empty()) {
282 // TODO(maniscalco): We failed to read some attachments. What should we do 295 // TODO(maniscalco): We failed to read some attachments. What should we do
283 // now? 296 // now?
297 AttachmentIdList::const_iterator iter = unavailable_attachment_ids->begin();
298 AttachmentIdList::const_iterator end = unavailable_attachment_ids->end();
299 for (; iter != end; ++iter) {
300 upload_task_queue_->Cancel(*iter);
301 }
284 } 302 }
285 303
286 AttachmentMap::const_iterator iter = attachments->begin(); 304 AttachmentMap::const_iterator iter = attachments->begin();
287 AttachmentMap::const_iterator end = attachments->end(); 305 AttachmentMap::const_iterator end = attachments->end();
288 for (; iter != end; ++iter) { 306 for (; iter != end; ++iter) {
289 attachment_uploader_->UploadAttachment( 307 attachment_uploader_->UploadAttachment(
290 iter->second, 308 iter->second,
291 base::Bind(&AttachmentServiceImpl::UploadDone, 309 base::Bind(&AttachmentServiceImpl::UploadDone,
292 weak_ptr_factory_.GetWeakPtr())); 310 weak_ptr_factory_.GetWeakPtr()));
293 } 311 }
294 } 312 }
295 313
296 } // namespace syncer 314 } // namespace syncer
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698