Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright 2014 The Chromium Authors. All rights reserved. | 1 // Copyright 2014 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "sync/internal_api/public/attachments/attachment_service_impl.h" | 5 #include "sync/internal_api/public/attachments/attachment_service_impl.h" |
| 6 | 6 |
| 7 #include <iterator> | 7 #include <iterator> |
| 8 | 8 |
| 9 #include "base/bind.h" | 9 #include "base/bind.h" |
| 10 #include "base/message_loop/message_loop.h" | 10 #include "base/message_loop/message_loop.h" |
| 11 #include "base/thread_task_runner_handle.h" | 11 #include "base/thread_task_runner_handle.h" |
| 12 #include "base/time/time.h" | |
| 12 #include "sync/api/attachments/attachment.h" | 13 #include "sync/api/attachments/attachment.h" |
| 13 #include "sync/api/attachments/fake_attachment_store.h" | 14 #include "sync/api/attachments/fake_attachment_store.h" |
| 14 #include "sync/internal_api/public/attachments/fake_attachment_downloader.h" | 15 #include "sync/internal_api/public/attachments/fake_attachment_downloader.h" |
| 15 #include "sync/internal_api/public/attachments/fake_attachment_uploader.h" | 16 #include "sync/internal_api/public/attachments/fake_attachment_uploader.h" |
| 16 | 17 |
| 17 namespace syncer { | 18 namespace syncer { |
| 18 | 19 |
| 19 // GetOrDownloadAttachments starts multiple parallel DownloadAttachment calls. | 20 // GetOrDownloadAttachments starts multiple parallel DownloadAttachment calls. |
| 20 // GetOrDownloadState tracks completion of these calls and posts callback for | 21 // GetOrDownloadState tracks completion of these calls and posts callback for |
| 21 // consumer once all attachments are either retrieved or reported unavailable. | 22 // consumer once all attachments are either retrieved or reported unavailable. |
| (...skipping 84 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 106 base::MessageLoop::current()->PostTask( | 107 base::MessageLoop::current()->PostTask( |
| 107 FROM_HERE, | 108 FROM_HERE, |
| 108 base::Bind(callback_, result, base::Passed(&retrieved_attachments_))); | 109 base::Bind(callback_, result, base::Passed(&retrieved_attachments_))); |
| 109 } | 110 } |
| 110 } | 111 } |
| 111 | 112 |
| 112 AttachmentServiceImpl::AttachmentServiceImpl( | 113 AttachmentServiceImpl::AttachmentServiceImpl( |
| 113 scoped_ptr<AttachmentStore> attachment_store, | 114 scoped_ptr<AttachmentStore> attachment_store, |
| 114 scoped_ptr<AttachmentUploader> attachment_uploader, | 115 scoped_ptr<AttachmentUploader> attachment_uploader, |
| 115 scoped_ptr<AttachmentDownloader> attachment_downloader, | 116 scoped_ptr<AttachmentDownloader> attachment_downloader, |
| 116 Delegate* delegate) | 117 Delegate* delegate, |
| 118 const base::TimeDelta& initial_backoff_delay, | |
| 119 const base::TimeDelta& max_backoff_delay) | |
| 117 : attachment_store_(attachment_store.Pass()), | 120 : attachment_store_(attachment_store.Pass()), |
| 118 attachment_uploader_(attachment_uploader.Pass()), | 121 attachment_uploader_(attachment_uploader.Pass()), |
| 119 attachment_downloader_(attachment_downloader.Pass()), | 122 attachment_downloader_(attachment_downloader.Pass()), |
| 120 delegate_(delegate), | 123 delegate_(delegate), |
| 121 weak_ptr_factory_(this) { | 124 weak_ptr_factory_(this) { |
| 122 DCHECK(CalledOnValidThread()); | 125 DCHECK(CalledOnValidThread()); |
| 123 DCHECK(attachment_store_); | 126 DCHECK(attachment_store_); |
| 127 | |
| 128 // TODO(maniscalco): Observe network connectivity change events. When the | |
| 129 // network becomes disconnected, consider suspending queue dispatch. When | |
| 130 // connectivity is restored, consider clearing any dispatch backoff (bug | |
| 131 // 411981). | |
| 132 upload_task_queue_.reset(new TaskQueue<AttachmentId>( | |
| 133 base::Bind(&AttachmentServiceImpl::BeginUpload, | |
| 134 weak_ptr_factory_.GetWeakPtr()), | |
| 135 initial_backoff_delay, | |
| 136 max_backoff_delay)); | |
| 124 } | 137 } |
| 125 | 138 |
| 126 AttachmentServiceImpl::~AttachmentServiceImpl() { | 139 AttachmentServiceImpl::~AttachmentServiceImpl() { |
| 127 DCHECK(CalledOnValidThread()); | 140 DCHECK(CalledOnValidThread()); |
| 128 } | 141 } |
| 129 | 142 |
| 130 // Static. | 143 // Static. |
| 131 scoped_ptr<syncer::AttachmentService> AttachmentServiceImpl::CreateForTest() { | 144 scoped_ptr<syncer::AttachmentService> AttachmentServiceImpl::CreateForTest() { |
| 132 scoped_ptr<syncer::AttachmentStore> attachment_store( | 145 scoped_ptr<syncer::AttachmentStore> attachment_store( |
| 133 new syncer::FakeAttachmentStore(base::ThreadTaskRunnerHandle::Get())); | 146 new syncer::FakeAttachmentStore(base::ThreadTaskRunnerHandle::Get())); |
| 134 scoped_ptr<AttachmentUploader> attachment_uploader( | 147 scoped_ptr<AttachmentUploader> attachment_uploader( |
| 135 new FakeAttachmentUploader); | 148 new FakeAttachmentUploader); |
| 136 scoped_ptr<AttachmentDownloader> attachment_downloader( | 149 scoped_ptr<AttachmentDownloader> attachment_downloader( |
| 137 new FakeAttachmentDownloader()); | 150 new FakeAttachmentDownloader()); |
| 138 scoped_ptr<syncer::AttachmentService> attachment_service( | 151 scoped_ptr<syncer::AttachmentService> attachment_service( |
| 139 new syncer::AttachmentServiceImpl(attachment_store.Pass(), | 152 new syncer::AttachmentServiceImpl(attachment_store.Pass(), |
| 140 attachment_uploader.Pass(), | 153 attachment_uploader.Pass(), |
| 141 attachment_downloader.Pass(), | 154 attachment_downloader.Pass(), |
| 142 NULL)); | 155 NULL, |
| 156 base::TimeDelta(), | |
| 157 base::TimeDelta())); | |
| 143 return attachment_service.Pass(); | 158 return attachment_service.Pass(); |
| 144 } | 159 } |
| 145 | 160 |
| 146 AttachmentStore* AttachmentServiceImpl::GetStore() { | 161 AttachmentStore* AttachmentServiceImpl::GetStore() { |
| 147 return attachment_store_.get(); | 162 return attachment_store_.get(); |
| 148 } | 163 } |
| 149 | 164 |
| 150 void AttachmentServiceImpl::GetOrDownloadAttachments( | 165 void AttachmentServiceImpl::GetOrDownloadAttachments( |
| 151 const AttachmentIdList& attachment_ids, | 166 const AttachmentIdList& attachment_ids, |
| 152 const GetOrDownloadCallback& callback) { | 167 const GetOrDownloadCallback& callback) { |
| (...skipping 56 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 209 drop_result = AttachmentService::DROP_SUCCESS; | 224 drop_result = AttachmentService::DROP_SUCCESS; |
| 210 } | 225 } |
| 211 // TODO(maniscalco): Deal with case where an error occurred (bug 361251). | 226 // TODO(maniscalco): Deal with case where an error occurred (bug 361251). |
| 212 base::MessageLoop::current()->PostTask(FROM_HERE, | 227 base::MessageLoop::current()->PostTask(FROM_HERE, |
| 213 base::Bind(callback, drop_result)); | 228 base::Bind(callback, drop_result)); |
| 214 } | 229 } |
| 215 | 230 |
| 216 void AttachmentServiceImpl::UploadDone( | 231 void AttachmentServiceImpl::UploadDone( |
| 217 const AttachmentUploader::UploadResult& result, | 232 const AttachmentUploader::UploadResult& result, |
| 218 const AttachmentId& attachment_id) { | 233 const AttachmentId& attachment_id) { |
| 219 ids_in_queue_.erase(attachment_id); | 234 DCHECK(CalledOnValidThread()); |
| 220 // TODO(pavely): crbug/372622: Deal with UploadAttachment failures. | 235 switch (result) { |
| 221 if (result != AttachmentUploader::UPLOAD_SUCCESS) | 236 case AttachmentUploader::UPLOAD_SUCCESS: |
| 222 return; | 237 upload_task_queue_->MarkAsSucceeded(attachment_id); |
| 223 if (delegate_) { | 238 if (delegate_) { |
| 224 delegate_->OnAttachmentUploaded(attachment_id); | 239 delegate_->OnAttachmentUploaded(attachment_id); |
| 240 } | |
| 241 break; | |
| 242 case AttachmentUploader::UPLOAD_TRANSIENT_ERROR: | |
| 243 upload_task_queue_->Retry(attachment_id); | |
| 244 break; | |
| 245 case AttachmentUploader::UPLOAD_UNSPECIFIED_ERROR: | |
| 246 // TODO(pavely): crbug/372622: Deal with UploadAttachment failures. | |
| 247 break; | |
|
pavely
2014/09/09 19:42:23
You have to call exactly one of three functions ex
maniscalco
2014/09/09 21:15:43
Good catch. There is a bug here. There is no "co
| |
| 225 } | 248 } |
| 226 } | 249 } |
| 227 | 250 |
| 228 void AttachmentServiceImpl::DownloadDone( | 251 void AttachmentServiceImpl::DownloadDone( |
| 229 const scoped_refptr<GetOrDownloadState>& state, | 252 const scoped_refptr<GetOrDownloadState>& state, |
| 230 const AttachmentId& attachment_id, | 253 const AttachmentId& attachment_id, |
| 231 const AttachmentDownloader::DownloadResult& result, | 254 const AttachmentDownloader::DownloadResult& result, |
| 232 scoped_ptr<Attachment> attachment) { | 255 scoped_ptr<Attachment> attachment) { |
| 233 if (result == AttachmentDownloader::DOWNLOAD_SUCCESS) { | 256 switch (result) { |
| 234 state->AddAttachment(*attachment.get()); | 257 case AttachmentDownloader::DOWNLOAD_SUCCESS: |
| 235 } else { | 258 state->AddAttachment(*attachment.get()); |
| 236 state->AddUnavailableAttachmentId(attachment_id); | 259 break; |
| 260 case AttachmentDownloader::DOWNLOAD_TRANSIENT_ERROR: | |
| 261 case AttachmentDownloader::DOWNLOAD_UNSPECIFIED_ERROR: | |
| 262 state->AddUnavailableAttachmentId(attachment_id); | |
| 263 break; | |
| 237 } | 264 } |
|
pavely
2014/09/09 19:42:23
Add default: NOTREACHED(). It will show that you a
maniscalco
2014/09/09 21:15:43
I'm not sure that's preferable to no default. Wit
| |
| 238 } | 265 } |
| 239 | 266 |
| 267 void AttachmentServiceImpl::BeginUpload(const AttachmentId& attachment_id) { | |
| 268 DCHECK(CalledOnValidThread()); | |
| 269 AttachmentIdList attachment_ids; | |
| 270 attachment_ids.push_back(attachment_id); | |
| 271 attachment_store_->Read(attachment_ids, | |
| 272 base::Bind(&AttachmentServiceImpl::ReadDoneNowUpload, | |
| 273 weak_ptr_factory_.GetWeakPtr())); | |
| 274 } | |
| 275 | |
| 240 void AttachmentServiceImpl::UploadAttachments( | 276 void AttachmentServiceImpl::UploadAttachments( |
| 241 const AttachmentIdSet& attachment_ids) { | 277 const AttachmentIdSet& attachment_ids) { |
| 242 DCHECK(CalledOnValidThread()); | 278 DCHECK(CalledOnValidThread()); |
| 243 if (!attachment_uploader_.get()) { | 279 if (!attachment_uploader_.get()) { |
| 244 return; | 280 return; |
| 245 } | 281 } |
| 246 | |
| 247 // Enqueue the attachment ids that aren't already in the queue. | |
| 248 AttachmentIdSet::const_iterator iter = attachment_ids.begin(); | 282 AttachmentIdSet::const_iterator iter = attachment_ids.begin(); |
| 249 AttachmentIdSet::const_iterator end = attachment_ids.end(); | 283 AttachmentIdSet::const_iterator end = attachment_ids.end(); |
| 250 for (; iter != end; ++iter) { | 284 for (; iter != end; ++iter) { |
| 251 if (ids_in_queue_.find(*iter) == ids_in_queue_.end()) { | 285 upload_task_queue_->AddToQueue(*iter); |
| 252 queue_.push_back(*iter); | |
| 253 ids_in_queue_.insert(*iter); | |
| 254 } | |
| 255 } | |
| 256 | |
| 257 ProcessQueuedUploads(); | |
| 258 } | |
| 259 | |
| 260 void AttachmentServiceImpl::ProcessQueuedUploads() { | |
| 261 DCHECK(CalledOnValidThread()); | |
| 262 // TODO(maniscalco): Don't dequeue them all. Instead, limit the number of | |
| 263 // concurrent uploads and apply backoff on failure. | |
| 264 while (!queue_.empty()) { | |
| 265 const AttachmentId id = queue_.front(); | |
| 266 queue_.pop_front(); | |
| 267 AttachmentIdList attachment_ids; | |
| 268 attachment_ids.push_back(id); | |
| 269 attachment_store_->Read( | |
| 270 attachment_ids, | |
| 271 base::Bind(&AttachmentServiceImpl::ReadDoneNowUpload, | |
| 272 weak_ptr_factory_.GetWeakPtr())); | |
| 273 } | 286 } |
| 274 } | 287 } |
| 275 | 288 |
| 276 void AttachmentServiceImpl::ReadDoneNowUpload( | 289 void AttachmentServiceImpl::ReadDoneNowUpload( |
| 277 const AttachmentStore::Result& result, | 290 const AttachmentStore::Result& result, |
| 278 scoped_ptr<AttachmentMap> attachments, | 291 scoped_ptr<AttachmentMap> attachments, |
| 279 scoped_ptr<AttachmentIdList> unavailable_attachment_ids) { | 292 scoped_ptr<AttachmentIdList> unavailable_attachment_ids) { |
| 280 DCHECK(CalledOnValidThread()); | 293 DCHECK(CalledOnValidThread()); |
| 281 if (!unavailable_attachment_ids->empty()) { | 294 if (!unavailable_attachment_ids->empty()) { |
| 282 // TODO(maniscalco): We failed to read some attachments. What should we do | 295 // TODO(maniscalco): We failed to read some attachments. What should we do |
| 283 // now? | 296 // now? |
| 297 AttachmentIdList::const_iterator iter = unavailable_attachment_ids->begin(); | |
| 298 AttachmentIdList::const_iterator end = unavailable_attachment_ids->end(); | |
| 299 for (; iter != end; ++iter) { | |
| 300 upload_task_queue_->Cancel(*iter); | |
| 301 } | |
| 284 } | 302 } |
| 285 | 303 |
| 286 AttachmentMap::const_iterator iter = attachments->begin(); | 304 AttachmentMap::const_iterator iter = attachments->begin(); |
| 287 AttachmentMap::const_iterator end = attachments->end(); | 305 AttachmentMap::const_iterator end = attachments->end(); |
| 288 for (; iter != end; ++iter) { | 306 for (; iter != end; ++iter) { |
| 289 attachment_uploader_->UploadAttachment( | 307 attachment_uploader_->UploadAttachment( |
| 290 iter->second, | 308 iter->second, |
| 291 base::Bind(&AttachmentServiceImpl::UploadDone, | 309 base::Bind(&AttachmentServiceImpl::UploadDone, |
| 292 weak_ptr_factory_.GetWeakPtr())); | 310 weak_ptr_factory_.GetWeakPtr())); |
| 293 } | 311 } |
| 294 } | 312 } |
| 295 | 313 |
| 296 } // namespace syncer | 314 } // namespace syncer |
| OLD | NEW |