OLD | NEW |
1 // Copyright 2014 The Chromium Authors. All rights reserved. | 1 // Copyright 2014 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "sync/internal_api/public/attachments/attachment_service_impl.h" | 5 #include "sync/internal_api/public/attachments/attachment_service_impl.h" |
6 | 6 |
7 #include <iterator> | 7 #include <iterator> |
8 | 8 |
9 #include "base/bind.h" | 9 #include "base/bind.h" |
10 #include "base/message_loop/message_loop.h" | 10 #include "base/message_loop/message_loop.h" |
11 #include "base/thread_task_runner_handle.h" | 11 #include "base/thread_task_runner_handle.h" |
| 12 #include "base/time/time.h" |
12 #include "sync/api/attachments/attachment.h" | 13 #include "sync/api/attachments/attachment.h" |
13 #include "sync/api/attachments/fake_attachment_store.h" | 14 #include "sync/api/attachments/fake_attachment_store.h" |
14 #include "sync/internal_api/public/attachments/fake_attachment_downloader.h" | 15 #include "sync/internal_api/public/attachments/fake_attachment_downloader.h" |
15 #include "sync/internal_api/public/attachments/fake_attachment_uploader.h" | 16 #include "sync/internal_api/public/attachments/fake_attachment_uploader.h" |
16 | 17 |
17 namespace syncer { | 18 namespace syncer { |
18 | 19 |
19 // GetOrDownloadAttachments starts multiple parallel DownloadAttachment calls. | 20 // GetOrDownloadAttachments starts multiple parallel DownloadAttachment calls. |
20 // GetOrDownloadState tracks completion of these calls and posts callback for | 21 // GetOrDownloadState tracks completion of these calls and posts callback for |
21 // consumer once all attachments are either retrieved or reported unavailable. | 22 // consumer once all attachments are either retrieved or reported unavailable. |
(...skipping 84 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
106 base::MessageLoop::current()->PostTask( | 107 base::MessageLoop::current()->PostTask( |
107 FROM_HERE, | 108 FROM_HERE, |
108 base::Bind(callback_, result, base::Passed(&retrieved_attachments_))); | 109 base::Bind(callback_, result, base::Passed(&retrieved_attachments_))); |
109 } | 110 } |
110 } | 111 } |
111 | 112 |
112 AttachmentServiceImpl::AttachmentServiceImpl( | 113 AttachmentServiceImpl::AttachmentServiceImpl( |
113 scoped_refptr<AttachmentStore> attachment_store, | 114 scoped_refptr<AttachmentStore> attachment_store, |
114 scoped_ptr<AttachmentUploader> attachment_uploader, | 115 scoped_ptr<AttachmentUploader> attachment_uploader, |
115 scoped_ptr<AttachmentDownloader> attachment_downloader, | 116 scoped_ptr<AttachmentDownloader> attachment_downloader, |
116 Delegate* delegate) | 117 Delegate* delegate, |
| 118 const base::TimeDelta& initial_backoff_delay, |
| 119 const base::TimeDelta& max_backoff_delay) |
117 : attachment_store_(attachment_store), | 120 : attachment_store_(attachment_store), |
118 attachment_uploader_(attachment_uploader.Pass()), | 121 attachment_uploader_(attachment_uploader.Pass()), |
119 attachment_downloader_(attachment_downloader.Pass()), | 122 attachment_downloader_(attachment_downloader.Pass()), |
120 delegate_(delegate), | 123 delegate_(delegate), |
121 weak_ptr_factory_(this) { | 124 weak_ptr_factory_(this) { |
122 DCHECK(CalledOnValidThread()); | 125 DCHECK(CalledOnValidThread()); |
123 DCHECK(attachment_store_.get()); | 126 DCHECK(attachment_store_.get()); |
| 127 |
| 128 // TODO(maniscalco): Observe network connectivity change events. When the |
| 129 // network becomes disconnected, consider suspending queue dispatch. When |
| 130 // connectivity is restored, consider clearing any dispatch backoff (bug |
| 131 // 411981). |
| 132 upload_task_queue_.reset(new TaskQueue<AttachmentId>( |
| 133 base::Bind(&AttachmentServiceImpl::BeginUpload, |
| 134 weak_ptr_factory_.GetWeakPtr()), |
| 135 initial_backoff_delay, |
| 136 max_backoff_delay)); |
124 } | 137 } |
125 | 138 |
126 AttachmentServiceImpl::~AttachmentServiceImpl() { | 139 AttachmentServiceImpl::~AttachmentServiceImpl() { |
127 DCHECK(CalledOnValidThread()); | 140 DCHECK(CalledOnValidThread()); |
128 } | 141 } |
129 | 142 |
130 // Static. | 143 // Static. |
131 scoped_ptr<syncer::AttachmentService> AttachmentServiceImpl::CreateForTest() { | 144 scoped_ptr<syncer::AttachmentService> AttachmentServiceImpl::CreateForTest() { |
132 scoped_refptr<syncer::AttachmentStore> attachment_store( | 145 scoped_refptr<syncer::AttachmentStore> attachment_store( |
133 new syncer::FakeAttachmentStore(base::ThreadTaskRunnerHandle::Get())); | 146 new syncer::FakeAttachmentStore(base::ThreadTaskRunnerHandle::Get())); |
134 scoped_ptr<AttachmentUploader> attachment_uploader( | 147 scoped_ptr<AttachmentUploader> attachment_uploader( |
135 new FakeAttachmentUploader); | 148 new FakeAttachmentUploader); |
136 scoped_ptr<AttachmentDownloader> attachment_downloader( | 149 scoped_ptr<AttachmentDownloader> attachment_downloader( |
137 new FakeAttachmentDownloader()); | 150 new FakeAttachmentDownloader()); |
138 scoped_ptr<syncer::AttachmentService> attachment_service( | 151 scoped_ptr<syncer::AttachmentService> attachment_service( |
139 new syncer::AttachmentServiceImpl(attachment_store, | 152 new syncer::AttachmentServiceImpl(attachment_store, |
140 attachment_uploader.Pass(), | 153 attachment_uploader.Pass(), |
141 attachment_downloader.Pass(), | 154 attachment_downloader.Pass(), |
142 NULL)); | 155 NULL, |
| 156 base::TimeDelta(), |
| 157 base::TimeDelta())); |
143 return attachment_service.Pass(); | 158 return attachment_service.Pass(); |
144 } | 159 } |
145 | 160 |
146 AttachmentStore* AttachmentServiceImpl::GetStore() { | 161 AttachmentStore* AttachmentServiceImpl::GetStore() { |
147 return attachment_store_.get(); | 162 return attachment_store_.get(); |
148 } | 163 } |
149 | 164 |
150 void AttachmentServiceImpl::GetOrDownloadAttachments( | 165 void AttachmentServiceImpl::GetOrDownloadAttachments( |
151 const AttachmentIdList& attachment_ids, | 166 const AttachmentIdList& attachment_ids, |
152 const GetOrDownloadCallback& callback) { | 167 const GetOrDownloadCallback& callback) { |
(...skipping 56 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
209 drop_result = AttachmentService::DROP_SUCCESS; | 224 drop_result = AttachmentService::DROP_SUCCESS; |
210 } | 225 } |
211 // TODO(maniscalco): Deal with case where an error occurred (bug 361251). | 226 // TODO(maniscalco): Deal with case where an error occurred (bug 361251). |
212 base::MessageLoop::current()->PostTask(FROM_HERE, | 227 base::MessageLoop::current()->PostTask(FROM_HERE, |
213 base::Bind(callback, drop_result)); | 228 base::Bind(callback, drop_result)); |
214 } | 229 } |
215 | 230 |
216 void AttachmentServiceImpl::UploadDone( | 231 void AttachmentServiceImpl::UploadDone( |
217 const AttachmentUploader::UploadResult& result, | 232 const AttachmentUploader::UploadResult& result, |
218 const AttachmentId& attachment_id) { | 233 const AttachmentId& attachment_id) { |
219 ids_in_queue_.erase(attachment_id); | 234 DCHECK(CalledOnValidThread()); |
220 // TODO(pavely): crbug/372622: Deal with UploadAttachment failures. | 235 switch (result) { |
221 if (result != AttachmentUploader::UPLOAD_SUCCESS) | 236 case AttachmentUploader::UPLOAD_SUCCESS: |
222 return; | 237 upload_task_queue_->MarkAsSucceeded(attachment_id); |
223 if (delegate_) { | 238 if (delegate_) { |
224 delegate_->OnAttachmentUploaded(attachment_id); | 239 delegate_->OnAttachmentUploaded(attachment_id); |
| 240 } |
| 241 break; |
| 242 case AttachmentUploader::UPLOAD_TRANSIENT_ERROR: |
| 243 upload_task_queue_->MarkAsFailed(attachment_id); |
| 244 upload_task_queue_->AddToQueue(attachment_id); |
| 245 break; |
| 246 case AttachmentUploader::UPLOAD_UNSPECIFIED_ERROR: |
| 247 // TODO(pavely): crbug/372622: Deal with UploadAttachment failures. |
| 248 upload_task_queue_->MarkAsFailed(attachment_id); |
| 249 break; |
225 } | 250 } |
226 } | 251 } |
227 | 252 |
228 void AttachmentServiceImpl::DownloadDone( | 253 void AttachmentServiceImpl::DownloadDone( |
229 const scoped_refptr<GetOrDownloadState>& state, | 254 const scoped_refptr<GetOrDownloadState>& state, |
230 const AttachmentId& attachment_id, | 255 const AttachmentId& attachment_id, |
231 const AttachmentDownloader::DownloadResult& result, | 256 const AttachmentDownloader::DownloadResult& result, |
232 scoped_ptr<Attachment> attachment) { | 257 scoped_ptr<Attachment> attachment) { |
233 if (result == AttachmentDownloader::DOWNLOAD_SUCCESS) { | 258 switch (result) { |
234 state->AddAttachment(*attachment.get()); | 259 case AttachmentDownloader::DOWNLOAD_SUCCESS: |
235 } else { | 260 state->AddAttachment(*attachment.get()); |
236 state->AddUnavailableAttachmentId(attachment_id); | 261 break; |
| 262 case AttachmentDownloader::DOWNLOAD_TRANSIENT_ERROR: |
| 263 case AttachmentDownloader::DOWNLOAD_UNSPECIFIED_ERROR: |
| 264 state->AddUnavailableAttachmentId(attachment_id); |
| 265 break; |
237 } | 266 } |
238 } | 267 } |
239 | 268 |
| 269 void AttachmentServiceImpl::BeginUpload(const AttachmentId& attachment_id) { |
| 270 DCHECK(CalledOnValidThread()); |
| 271 AttachmentIdList attachment_ids; |
| 272 attachment_ids.push_back(attachment_id); |
| 273 attachment_store_->Read(attachment_ids, |
| 274 base::Bind(&AttachmentServiceImpl::ReadDoneNowUpload, |
| 275 weak_ptr_factory_.GetWeakPtr())); |
| 276 } |
| 277 |
240 void AttachmentServiceImpl::UploadAttachments( | 278 void AttachmentServiceImpl::UploadAttachments( |
241 const AttachmentIdSet& attachment_ids) { | 279 const AttachmentIdSet& attachment_ids) { |
242 DCHECK(CalledOnValidThread()); | 280 DCHECK(CalledOnValidThread()); |
243 if (!attachment_uploader_.get()) { | 281 if (!attachment_uploader_.get()) { |
244 return; | 282 return; |
245 } | 283 } |
246 | |
247 // Enqueue the attachment ids that aren't already in the queue. | |
248 AttachmentIdSet::const_iterator iter = attachment_ids.begin(); | 284 AttachmentIdSet::const_iterator iter = attachment_ids.begin(); |
249 AttachmentIdSet::const_iterator end = attachment_ids.end(); | 285 AttachmentIdSet::const_iterator end = attachment_ids.end(); |
250 for (; iter != end; ++iter) { | 286 for (; iter != end; ++iter) { |
251 if (ids_in_queue_.find(*iter) == ids_in_queue_.end()) { | 287 upload_task_queue_->AddToQueue(*iter); |
252 queue_.push_back(*iter); | |
253 ids_in_queue_.insert(*iter); | |
254 } | |
255 } | |
256 | |
257 ProcessQueuedUploads(); | |
258 } | |
259 | |
260 void AttachmentServiceImpl::ProcessQueuedUploads() { | |
261 DCHECK(CalledOnValidThread()); | |
262 // TODO(maniscalco): Don't dequeue them all. Instead, limit the number of | |
263 // concurrent uploads and apply backoff on failure. | |
264 while (!queue_.empty()) { | |
265 const AttachmentId id = queue_.front(); | |
266 queue_.pop_front(); | |
267 AttachmentIdList attachment_ids; | |
268 attachment_ids.push_back(id); | |
269 attachment_store_->Read( | |
270 attachment_ids, | |
271 base::Bind(&AttachmentServiceImpl::ReadDoneNowUpload, | |
272 weak_ptr_factory_.GetWeakPtr())); | |
273 } | 288 } |
274 } | 289 } |
275 | 290 |
276 void AttachmentServiceImpl::ReadDoneNowUpload( | 291 void AttachmentServiceImpl::ReadDoneNowUpload( |
277 const AttachmentStore::Result& result, | 292 const AttachmentStore::Result& result, |
278 scoped_ptr<AttachmentMap> attachments, | 293 scoped_ptr<AttachmentMap> attachments, |
279 scoped_ptr<AttachmentIdList> unavailable_attachment_ids) { | 294 scoped_ptr<AttachmentIdList> unavailable_attachment_ids) { |
280 DCHECK(CalledOnValidThread()); | 295 DCHECK(CalledOnValidThread()); |
281 if (!unavailable_attachment_ids->empty()) { | 296 if (!unavailable_attachment_ids->empty()) { |
282 // TODO(maniscalco): We failed to read some attachments. What should we do | 297 // TODO(maniscalco): We failed to read some attachments. What should we do |
283 // now? | 298 // now? |
| 299 AttachmentIdList::const_iterator iter = unavailable_attachment_ids->begin(); |
| 300 AttachmentIdList::const_iterator end = unavailable_attachment_ids->end(); |
| 301 for (; iter != end; ++iter) { |
| 302 upload_task_queue_->Cancel(*iter); |
| 303 } |
284 } | 304 } |
285 | 305 |
286 AttachmentMap::const_iterator iter = attachments->begin(); | 306 AttachmentMap::const_iterator iter = attachments->begin(); |
287 AttachmentMap::const_iterator end = attachments->end(); | 307 AttachmentMap::const_iterator end = attachments->end(); |
288 for (; iter != end; ++iter) { | 308 for (; iter != end; ++iter) { |
289 attachment_uploader_->UploadAttachment( | 309 attachment_uploader_->UploadAttachment( |
290 iter->second, | 310 iter->second, |
291 base::Bind(&AttachmentServiceImpl::UploadDone, | 311 base::Bind(&AttachmentServiceImpl::UploadDone, |
292 weak_ptr_factory_.GetWeakPtr())); | 312 weak_ptr_factory_.GetWeakPtr())); |
293 } | 313 } |
294 } | 314 } |
295 | 315 |
296 } // namespace syncer | 316 } // namespace syncer |
OLD | NEW |