OLD | NEW |
---|---|
1 // Copyright 2014 The Chromium Authors. All rights reserved. | 1 // Copyright 2014 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "sync/internal_api/public/attachments/attachment_service_impl.h" | 5 #include "sync/internal_api/public/attachments/attachment_service_impl.h" |
6 | 6 |
7 #include <iterator> | 7 #include <iterator> |
8 | 8 |
9 #include "base/bind.h" | 9 #include "base/bind.h" |
10 #include "base/message_loop/message_loop.h" | 10 #include "base/message_loop/message_loop.h" |
11 #include "base/thread_task_runner_handle.h" | 11 #include "base/thread_task_runner_handle.h" |
12 #include "base/time/time.h" | |
12 #include "sync/api/attachments/attachment.h" | 13 #include "sync/api/attachments/attachment.h" |
13 #include "sync/api/attachments/fake_attachment_store.h" | 14 #include "sync/api/attachments/fake_attachment_store.h" |
14 #include "sync/internal_api/public/attachments/fake_attachment_downloader.h" | 15 #include "sync/internal_api/public/attachments/fake_attachment_downloader.h" |
15 #include "sync/internal_api/public/attachments/fake_attachment_uploader.h" | 16 #include "sync/internal_api/public/attachments/fake_attachment_uploader.h" |
16 | 17 |
17 namespace syncer { | 18 namespace syncer { |
18 | 19 |
19 // GetOrDownloadAttachments starts multiple parallel DownloadAttachment calls. | 20 // GetOrDownloadAttachments starts multiple parallel DownloadAttachment calls. |
20 // GetOrDownloadState tracks completion of these calls and posts callback for | 21 // GetOrDownloadState tracks completion of these calls and posts callback for |
21 // consumer once all attachments are either retrieved or reported unavailable. | 22 // consumer once all attachments are either retrieved or reported unavailable. |
(...skipping 84 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
106 base::MessageLoop::current()->PostTask( | 107 base::MessageLoop::current()->PostTask( |
107 FROM_HERE, | 108 FROM_HERE, |
108 base::Bind(callback_, result, base::Passed(&retrieved_attachments_))); | 109 base::Bind(callback_, result, base::Passed(&retrieved_attachments_))); |
109 } | 110 } |
110 } | 111 } |
111 | 112 |
112 AttachmentServiceImpl::AttachmentServiceImpl( | 113 AttachmentServiceImpl::AttachmentServiceImpl( |
113 scoped_ptr<AttachmentStore> attachment_store, | 114 scoped_ptr<AttachmentStore> attachment_store, |
114 scoped_ptr<AttachmentUploader> attachment_uploader, | 115 scoped_ptr<AttachmentUploader> attachment_uploader, |
115 scoped_ptr<AttachmentDownloader> attachment_downloader, | 116 scoped_ptr<AttachmentDownloader> attachment_downloader, |
116 Delegate* delegate) | 117 Delegate* delegate, |
118 const base::TimeDelta& initial_backoff_delay, | |
119 const base::TimeDelta& max_backoff_delay) | |
117 : attachment_store_(attachment_store.Pass()), | 120 : attachment_store_(attachment_store.Pass()), |
118 attachment_uploader_(attachment_uploader.Pass()), | 121 attachment_uploader_(attachment_uploader.Pass()), |
119 attachment_downloader_(attachment_downloader.Pass()), | 122 attachment_downloader_(attachment_downloader.Pass()), |
120 delegate_(delegate), | 123 delegate_(delegate), |
121 weak_ptr_factory_(this) { | 124 weak_ptr_factory_(this) { |
122 DCHECK(CalledOnValidThread()); | 125 DCHECK(CalledOnValidThread()); |
123 DCHECK(attachment_store_); | 126 DCHECK(attachment_store_); |
127 | |
128 // TODO(maniscalco): Observe network connectivity change events. When the | |
129 // network becomes disconnected, consider suspending queue dispatch. When | |
130 // connectivity is restored, consider clearing any dispatch backoff (bug | |
131 // 411981). | |
132 upload_task_queue_.reset(new TaskQueue<AttachmentId>( | |
133 base::Bind(&AttachmentServiceImpl::BeginUpload, | |
134 weak_ptr_factory_.GetWeakPtr()), | |
135 initial_backoff_delay, | |
136 max_backoff_delay)); | |
124 } | 137 } |
125 | 138 |
126 AttachmentServiceImpl::~AttachmentServiceImpl() { | 139 AttachmentServiceImpl::~AttachmentServiceImpl() { |
127 DCHECK(CalledOnValidThread()); | 140 DCHECK(CalledOnValidThread()); |
128 } | 141 } |
129 | 142 |
130 // Static. | 143 // Static. |
131 scoped_ptr<syncer::AttachmentService> AttachmentServiceImpl::CreateForTest() { | 144 scoped_ptr<syncer::AttachmentService> AttachmentServiceImpl::CreateForTest() { |
132 scoped_ptr<syncer::AttachmentStore> attachment_store( | 145 scoped_ptr<syncer::AttachmentStore> attachment_store( |
133 new syncer::FakeAttachmentStore(base::ThreadTaskRunnerHandle::Get())); | 146 new syncer::FakeAttachmentStore(base::ThreadTaskRunnerHandle::Get())); |
134 scoped_ptr<AttachmentUploader> attachment_uploader( | 147 scoped_ptr<AttachmentUploader> attachment_uploader( |
135 new FakeAttachmentUploader); | 148 new FakeAttachmentUploader); |
136 scoped_ptr<AttachmentDownloader> attachment_downloader( | 149 scoped_ptr<AttachmentDownloader> attachment_downloader( |
137 new FakeAttachmentDownloader()); | 150 new FakeAttachmentDownloader()); |
138 scoped_ptr<syncer::AttachmentService> attachment_service( | 151 scoped_ptr<syncer::AttachmentService> attachment_service( |
139 new syncer::AttachmentServiceImpl(attachment_store.Pass(), | 152 new syncer::AttachmentServiceImpl(attachment_store.Pass(), |
140 attachment_uploader.Pass(), | 153 attachment_uploader.Pass(), |
141 attachment_downloader.Pass(), | 154 attachment_downloader.Pass(), |
142 NULL)); | 155 NULL, |
156 base::TimeDelta(), | |
157 base::TimeDelta())); | |
143 return attachment_service.Pass(); | 158 return attachment_service.Pass(); |
144 } | 159 } |
145 | 160 |
146 AttachmentStore* AttachmentServiceImpl::GetStore() { | 161 AttachmentStore* AttachmentServiceImpl::GetStore() { |
147 return attachment_store_.get(); | 162 return attachment_store_.get(); |
148 } | 163 } |
149 | 164 |
150 void AttachmentServiceImpl::GetOrDownloadAttachments( | 165 void AttachmentServiceImpl::GetOrDownloadAttachments( |
151 const AttachmentIdList& attachment_ids, | 166 const AttachmentIdList& attachment_ids, |
152 const GetOrDownloadCallback& callback) { | 167 const GetOrDownloadCallback& callback) { |
(...skipping 56 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
209 drop_result = AttachmentService::DROP_SUCCESS; | 224 drop_result = AttachmentService::DROP_SUCCESS; |
210 } | 225 } |
211 // TODO(maniscalco): Deal with case where an error occurred (bug 361251). | 226 // TODO(maniscalco): Deal with case where an error occurred (bug 361251). |
212 base::MessageLoop::current()->PostTask(FROM_HERE, | 227 base::MessageLoop::current()->PostTask(FROM_HERE, |
213 base::Bind(callback, drop_result)); | 228 base::Bind(callback, drop_result)); |
214 } | 229 } |
215 | 230 |
216 void AttachmentServiceImpl::UploadDone( | 231 void AttachmentServiceImpl::UploadDone( |
217 const AttachmentUploader::UploadResult& result, | 232 const AttachmentUploader::UploadResult& result, |
218 const AttachmentId& attachment_id) { | 233 const AttachmentId& attachment_id) { |
219 ids_in_queue_.erase(attachment_id); | 234 DCHECK(CalledOnValidThread()); |
220 // TODO(pavely): crbug/372622: Deal with UploadAttachment failures. | 235 switch (result) { |
221 if (result != AttachmentUploader::UPLOAD_SUCCESS) | 236 case AttachmentUploader::UPLOAD_SUCCESS: |
222 return; | 237 upload_task_queue_->MarkAsSucceeded(attachment_id); |
223 if (delegate_) { | 238 if (delegate_) { |
224 delegate_->OnAttachmentUploaded(attachment_id); | 239 delegate_->OnAttachmentUploaded(attachment_id); |
240 } | |
241 break; | |
242 case AttachmentUploader::UPLOAD_TRANSIENT_ERROR: | |
243 upload_task_queue_->Retry(attachment_id); | |
244 break; | |
245 case AttachmentUploader::UPLOAD_UNSPECIFIED_ERROR: | |
246 // TODO(pavely): crbug/372622: Deal with UploadAttachment failures. | |
247 break; | |
pavely
2014/09/09 19:42:23
You have to call exactly one of three functions ex
maniscalco
2014/09/09 21:15:43
Good catch. There is a bug here. There is no "co
| |
225 } | 248 } |
226 } | 249 } |
227 | 250 |
228 void AttachmentServiceImpl::DownloadDone( | 251 void AttachmentServiceImpl::DownloadDone( |
229 const scoped_refptr<GetOrDownloadState>& state, | 252 const scoped_refptr<GetOrDownloadState>& state, |
230 const AttachmentId& attachment_id, | 253 const AttachmentId& attachment_id, |
231 const AttachmentDownloader::DownloadResult& result, | 254 const AttachmentDownloader::DownloadResult& result, |
232 scoped_ptr<Attachment> attachment) { | 255 scoped_ptr<Attachment> attachment) { |
233 if (result == AttachmentDownloader::DOWNLOAD_SUCCESS) { | 256 switch (result) { |
234 state->AddAttachment(*attachment.get()); | 257 case AttachmentDownloader::DOWNLOAD_SUCCESS: |
235 } else { | 258 state->AddAttachment(*attachment.get()); |
236 state->AddUnavailableAttachmentId(attachment_id); | 259 break; |
260 case AttachmentDownloader::DOWNLOAD_TRANSIENT_ERROR: | |
261 case AttachmentDownloader::DOWNLOAD_UNSPECIFIED_ERROR: | |
262 state->AddUnavailableAttachmentId(attachment_id); | |
263 break; | |
237 } | 264 } |
pavely
2014/09/09 19:42:23
Add default: NOTREACHED(). It will show that you a
maniscalco
2014/09/09 21:15:43
I'm not sure that's preferable to no default. Wit
| |
238 } | 265 } |
239 | 266 |
267 void AttachmentServiceImpl::BeginUpload(const AttachmentId& attachment_id) { | |
268 DCHECK(CalledOnValidThread()); | |
269 AttachmentIdList attachment_ids; | |
270 attachment_ids.push_back(attachment_id); | |
271 attachment_store_->Read(attachment_ids, | |
272 base::Bind(&AttachmentServiceImpl::ReadDoneNowUpload, | |
273 weak_ptr_factory_.GetWeakPtr())); | |
274 } | |
275 | |
240 void AttachmentServiceImpl::UploadAttachments( | 276 void AttachmentServiceImpl::UploadAttachments( |
241 const AttachmentIdSet& attachment_ids) { | 277 const AttachmentIdSet& attachment_ids) { |
242 DCHECK(CalledOnValidThread()); | 278 DCHECK(CalledOnValidThread()); |
243 if (!attachment_uploader_.get()) { | 279 if (!attachment_uploader_.get()) { |
244 return; | 280 return; |
245 } | 281 } |
246 | |
247 // Enqueue the attachment ids that aren't already in the queue. | |
248 AttachmentIdSet::const_iterator iter = attachment_ids.begin(); | 282 AttachmentIdSet::const_iterator iter = attachment_ids.begin(); |
249 AttachmentIdSet::const_iterator end = attachment_ids.end(); | 283 AttachmentIdSet::const_iterator end = attachment_ids.end(); |
250 for (; iter != end; ++iter) { | 284 for (; iter != end; ++iter) { |
251 if (ids_in_queue_.find(*iter) == ids_in_queue_.end()) { | 285 upload_task_queue_->AddToQueue(*iter); |
252 queue_.push_back(*iter); | |
253 ids_in_queue_.insert(*iter); | |
254 } | |
255 } | |
256 | |
257 ProcessQueuedUploads(); | |
258 } | |
259 | |
260 void AttachmentServiceImpl::ProcessQueuedUploads() { | |
261 DCHECK(CalledOnValidThread()); | |
262 // TODO(maniscalco): Don't dequeue them all. Instead, limit the number of | |
263 // concurrent uploads and apply backoff on failure. | |
264 while (!queue_.empty()) { | |
265 const AttachmentId id = queue_.front(); | |
266 queue_.pop_front(); | |
267 AttachmentIdList attachment_ids; | |
268 attachment_ids.push_back(id); | |
269 attachment_store_->Read( | |
270 attachment_ids, | |
271 base::Bind(&AttachmentServiceImpl::ReadDoneNowUpload, | |
272 weak_ptr_factory_.GetWeakPtr())); | |
273 } | 286 } |
274 } | 287 } |
275 | 288 |
276 void AttachmentServiceImpl::ReadDoneNowUpload( | 289 void AttachmentServiceImpl::ReadDoneNowUpload( |
277 const AttachmentStore::Result& result, | 290 const AttachmentStore::Result& result, |
278 scoped_ptr<AttachmentMap> attachments, | 291 scoped_ptr<AttachmentMap> attachments, |
279 scoped_ptr<AttachmentIdList> unavailable_attachment_ids) { | 292 scoped_ptr<AttachmentIdList> unavailable_attachment_ids) { |
280 DCHECK(CalledOnValidThread()); | 293 DCHECK(CalledOnValidThread()); |
281 if (!unavailable_attachment_ids->empty()) { | 294 if (!unavailable_attachment_ids->empty()) { |
282 // TODO(maniscalco): We failed to read some attachments. What should we do | 295 // TODO(maniscalco): We failed to read some attachments. What should we do |
283 // now? | 296 // now? |
297 AttachmentIdList::const_iterator iter = unavailable_attachment_ids->begin(); | |
298 AttachmentIdList::const_iterator end = unavailable_attachment_ids->end(); | |
299 for (; iter != end; ++iter) { | |
300 upload_task_queue_->Cancel(*iter); | |
301 } | |
284 } | 302 } |
285 | 303 |
286 AttachmentMap::const_iterator iter = attachments->begin(); | 304 AttachmentMap::const_iterator iter = attachments->begin(); |
287 AttachmentMap::const_iterator end = attachments->end(); | 305 AttachmentMap::const_iterator end = attachments->end(); |
288 for (; iter != end; ++iter) { | 306 for (; iter != end; ++iter) { |
289 attachment_uploader_->UploadAttachment( | 307 attachment_uploader_->UploadAttachment( |
290 iter->second, | 308 iter->second, |
291 base::Bind(&AttachmentServiceImpl::UploadDone, | 309 base::Bind(&AttachmentServiceImpl::UploadDone, |
292 weak_ptr_factory_.GetWeakPtr())); | 310 weak_ptr_factory_.GetWeakPtr())); |
293 } | 311 } |
294 } | 312 } |
295 | 313 |
296 } // namespace syncer | 314 } // namespace syncer |
OLD | NEW |