OLD | NEW |
| (Empty) |
1 // Copyright 2014 The Chromium Authors. All rights reserved. | |
2 // Use of this source code is governed by a BSD-style license that can be | |
3 // found in the LICENSE file. | |
4 | |
5 #include "components/sync/api_impl/attachments/attachment_service_impl.h" | |
6 | |
7 #include <iterator> | |
8 #include <utility> | |
9 | |
10 #include "base/bind.h" | |
11 #include "base/location.h" | |
12 #include "base/single_thread_task_runner.h" | |
13 #include "base/threading/thread_task_runner_handle.h" | |
14 #include "base/time/time.h" | |
15 #include "components/sync/api/attachments/attachment.h" | |
16 #include "components/sync/engine/attachments/fake_attachment_downloader.h" | |
17 #include "components/sync/engine/attachments/fake_attachment_uploader.h" | |
18 | |
19 namespace syncer { | |
20 | |
21 // GetOrDownloadAttachments starts multiple parallel DownloadAttachment calls. | |
22 // GetOrDownloadState tracks completion of these calls and posts callback for | |
23 // consumer once all attachments are either retrieved or reported unavailable. | |
24 class AttachmentServiceImpl::GetOrDownloadState | |
25 : public base::RefCounted<GetOrDownloadState>, | |
26 public base::NonThreadSafe { | |
27 public: | |
28 // GetOrDownloadState gets parameter from values passed to | |
29 // AttachmentService::GetOrDownloadAttachments. | |
30 // |attachment_ids| is a list of attachmens to retrieve. | |
31 // |callback| will be posted on current thread when all attachments retrieved | |
32 // or confirmed unavailable. | |
33 GetOrDownloadState(const AttachmentIdList& attachment_ids, | |
34 const GetOrDownloadCallback& callback); | |
35 | |
36 // Attachment was just retrieved. Add it to retrieved attachments. | |
37 void AddAttachment(const Attachment& attachment); | |
38 | |
39 // Both reading from local store and downloading attachment failed. | |
40 // Add it to unavailable set. | |
41 void AddUnavailableAttachmentId(const AttachmentId& attachment_id); | |
42 | |
43 private: | |
44 friend class base::RefCounted<GetOrDownloadState>; | |
45 virtual ~GetOrDownloadState(); | |
46 | |
47 // If all attachment requests completed then post callback to consumer with | |
48 // results. | |
49 void PostResultIfAllRequestsCompleted(); | |
50 | |
51 GetOrDownloadCallback callback_; | |
52 | |
53 // Requests for these attachments are still in progress. | |
54 AttachmentIdSet in_progress_attachments_; | |
55 | |
56 AttachmentIdSet unavailable_attachments_; | |
57 std::unique_ptr<AttachmentMap> retrieved_attachments_; | |
58 | |
59 DISALLOW_COPY_AND_ASSIGN(GetOrDownloadState); | |
60 }; | |
61 | |
62 AttachmentServiceImpl::GetOrDownloadState::GetOrDownloadState( | |
63 const AttachmentIdList& attachment_ids, | |
64 const GetOrDownloadCallback& callback) | |
65 : callback_(callback), retrieved_attachments_(new AttachmentMap()) { | |
66 std::copy( | |
67 attachment_ids.begin(), attachment_ids.end(), | |
68 std::inserter(in_progress_attachments_, in_progress_attachments_.end())); | |
69 PostResultIfAllRequestsCompleted(); | |
70 } | |
71 | |
72 AttachmentServiceImpl::GetOrDownloadState::~GetOrDownloadState() { | |
73 DCHECK(CalledOnValidThread()); | |
74 } | |
75 | |
76 void AttachmentServiceImpl::GetOrDownloadState::AddAttachment( | |
77 const Attachment& attachment) { | |
78 DCHECK(CalledOnValidThread()); | |
79 DCHECK(retrieved_attachments_->find(attachment.GetId()) == | |
80 retrieved_attachments_->end()); | |
81 retrieved_attachments_->insert( | |
82 std::make_pair(attachment.GetId(), attachment)); | |
83 DCHECK(in_progress_attachments_.find(attachment.GetId()) != | |
84 in_progress_attachments_.end()); | |
85 in_progress_attachments_.erase(attachment.GetId()); | |
86 PostResultIfAllRequestsCompleted(); | |
87 } | |
88 | |
89 void AttachmentServiceImpl::GetOrDownloadState::AddUnavailableAttachmentId( | |
90 const AttachmentId& attachment_id) { | |
91 DCHECK(CalledOnValidThread()); | |
92 DCHECK(unavailable_attachments_.find(attachment_id) == | |
93 unavailable_attachments_.end()); | |
94 unavailable_attachments_.insert(attachment_id); | |
95 DCHECK(in_progress_attachments_.find(attachment_id) != | |
96 in_progress_attachments_.end()); | |
97 in_progress_attachments_.erase(attachment_id); | |
98 PostResultIfAllRequestsCompleted(); | |
99 } | |
100 | |
101 void AttachmentServiceImpl::GetOrDownloadState:: | |
102 PostResultIfAllRequestsCompleted() { | |
103 if (in_progress_attachments_.empty()) { | |
104 // All requests completed. Let's notify consumer. | |
105 GetOrDownloadResult result = | |
106 unavailable_attachments_.empty() ? GET_SUCCESS : GET_UNSPECIFIED_ERROR; | |
107 base::ThreadTaskRunnerHandle::Get()->PostTask( | |
108 FROM_HERE, | |
109 base::Bind(callback_, result, base::Passed(&retrieved_attachments_))); | |
110 } | |
111 } | |
112 | |
113 AttachmentServiceImpl::AttachmentServiceImpl( | |
114 std::unique_ptr<AttachmentStoreForSync> attachment_store, | |
115 std::unique_ptr<AttachmentUploader> attachment_uploader, | |
116 std::unique_ptr<AttachmentDownloader> attachment_downloader, | |
117 Delegate* delegate, | |
118 const base::TimeDelta& initial_backoff_delay, | |
119 const base::TimeDelta& max_backoff_delay) | |
120 : attachment_store_(std::move(attachment_store)), | |
121 attachment_uploader_(std::move(attachment_uploader)), | |
122 attachment_downloader_(std::move(attachment_downloader)), | |
123 delegate_(delegate), | |
124 weak_ptr_factory_(this) { | |
125 DCHECK(CalledOnValidThread()); | |
126 DCHECK(attachment_store_.get()); | |
127 | |
128 // TODO(maniscalco): Observe network connectivity change events. When the | |
129 // network becomes disconnected, consider suspending queue dispatch. When | |
130 // connectivity is restored, consider clearing any dispatch backoff (bug | |
131 // 411981). | |
132 upload_task_queue_.reset(new TaskQueue<AttachmentId>( | |
133 base::Bind(&AttachmentServiceImpl::BeginUpload, | |
134 weak_ptr_factory_.GetWeakPtr()), | |
135 initial_backoff_delay, max_backoff_delay)); | |
136 | |
137 net::NetworkChangeNotifier::AddNetworkChangeObserver(this); | |
138 } | |
139 | |
140 AttachmentServiceImpl::~AttachmentServiceImpl() { | |
141 DCHECK(CalledOnValidThread()); | |
142 net::NetworkChangeNotifier::RemoveNetworkChangeObserver(this); | |
143 } | |
144 | |
145 void AttachmentServiceImpl::GetOrDownloadAttachments( | |
146 const AttachmentIdList& attachment_ids, | |
147 const GetOrDownloadCallback& callback) { | |
148 DCHECK(CalledOnValidThread()); | |
149 scoped_refptr<GetOrDownloadState> state( | |
150 new GetOrDownloadState(attachment_ids, callback)); | |
151 // SetModelTypeReference() makes attachments visible for model type. | |
152 // Needed when attachment doesn't have model type reference, but still | |
153 // available in local store. | |
154 attachment_store_->SetModelTypeReference(attachment_ids); | |
155 attachment_store_->Read(attachment_ids, | |
156 base::Bind(&AttachmentServiceImpl::ReadDone, | |
157 weak_ptr_factory_.GetWeakPtr(), state)); | |
158 } | |
159 | |
160 void AttachmentServiceImpl::ReadDone( | |
161 const scoped_refptr<GetOrDownloadState>& state, | |
162 const AttachmentStore::Result& result, | |
163 std::unique_ptr<AttachmentMap> attachments, | |
164 std::unique_ptr<AttachmentIdList> unavailable_attachment_ids) { | |
165 // Add read attachments to result. | |
166 for (AttachmentMap::const_iterator iter = attachments->begin(); | |
167 iter != attachments->end(); ++iter) { | |
168 state->AddAttachment(iter->second); | |
169 } | |
170 | |
171 AttachmentIdList::const_iterator iter = unavailable_attachment_ids->begin(); | |
172 AttachmentIdList::const_iterator end = unavailable_attachment_ids->end(); | |
173 if (result != AttachmentStore::STORE_INITIALIZATION_FAILED && | |
174 attachment_downloader_.get()) { | |
175 // Try to download locally unavailable attachments. | |
176 for (; iter != end; ++iter) { | |
177 attachment_downloader_->DownloadAttachment( | |
178 *iter, base::Bind(&AttachmentServiceImpl::DownloadDone, | |
179 weak_ptr_factory_.GetWeakPtr(), state, *iter)); | |
180 } | |
181 } else { | |
182 // No downloader so all locally unavailable attachments are unavailable. | |
183 for (; iter != end; ++iter) { | |
184 state->AddUnavailableAttachmentId(*iter); | |
185 } | |
186 } | |
187 } | |
188 | |
189 void AttachmentServiceImpl::WriteDone( | |
190 const scoped_refptr<GetOrDownloadState>& state, | |
191 const Attachment& attachment, | |
192 const AttachmentStore::Result& result) { | |
193 switch (result) { | |
194 case AttachmentStore::SUCCESS: | |
195 state->AddAttachment(attachment); | |
196 break; | |
197 case AttachmentStore::UNSPECIFIED_ERROR: | |
198 case AttachmentStore::STORE_INITIALIZATION_FAILED: | |
199 state->AddUnavailableAttachmentId(attachment.GetId()); | |
200 break; | |
201 } | |
202 } | |
203 | |
204 void AttachmentServiceImpl::UploadDone( | |
205 const AttachmentUploader::UploadResult& result, | |
206 const AttachmentId& attachment_id) { | |
207 DCHECK(CalledOnValidThread()); | |
208 AttachmentIdList ids; | |
209 ids.push_back(attachment_id); | |
210 switch (result) { | |
211 case AttachmentUploader::UPLOAD_SUCCESS: | |
212 attachment_store_->DropSyncReference(ids); | |
213 upload_task_queue_->MarkAsSucceeded(attachment_id); | |
214 if (delegate_) { | |
215 delegate_->OnAttachmentUploaded(attachment_id); | |
216 } | |
217 break; | |
218 case AttachmentUploader::UPLOAD_TRANSIENT_ERROR: | |
219 upload_task_queue_->MarkAsFailed(attachment_id); | |
220 upload_task_queue_->AddToQueue(attachment_id); | |
221 break; | |
222 case AttachmentUploader::UPLOAD_UNSPECIFIED_ERROR: | |
223 // TODO(pavely): crbug/372622: Deal with UploadAttachment failures. | |
224 attachment_store_->DropSyncReference(ids); | |
225 upload_task_queue_->MarkAsFailed(attachment_id); | |
226 break; | |
227 } | |
228 } | |
229 | |
230 void AttachmentServiceImpl::DownloadDone( | |
231 const scoped_refptr<GetOrDownloadState>& state, | |
232 const AttachmentId& attachment_id, | |
233 const AttachmentDownloader::DownloadResult& result, | |
234 std::unique_ptr<Attachment> attachment) { | |
235 switch (result) { | |
236 case AttachmentDownloader::DOWNLOAD_SUCCESS: { | |
237 AttachmentList attachment_list; | |
238 attachment_list.push_back(*attachment.get()); | |
239 attachment_store_->Write( | |
240 attachment_list, | |
241 base::Bind(&AttachmentServiceImpl::WriteDone, | |
242 weak_ptr_factory_.GetWeakPtr(), state, *attachment.get())); | |
243 break; | |
244 } | |
245 case AttachmentDownloader::DOWNLOAD_TRANSIENT_ERROR: | |
246 case AttachmentDownloader::DOWNLOAD_UNSPECIFIED_ERROR: | |
247 state->AddUnavailableAttachmentId(attachment_id); | |
248 break; | |
249 } | |
250 } | |
251 | |
252 void AttachmentServiceImpl::BeginUpload(const AttachmentId& attachment_id) { | |
253 DCHECK(CalledOnValidThread()); | |
254 AttachmentIdList attachment_ids; | |
255 attachment_ids.push_back(attachment_id); | |
256 attachment_store_->Read(attachment_ids, | |
257 base::Bind(&AttachmentServiceImpl::ReadDoneNowUpload, | |
258 weak_ptr_factory_.GetWeakPtr())); | |
259 } | |
260 | |
261 void AttachmentServiceImpl::UploadAttachments( | |
262 const AttachmentIdList& attachment_ids) { | |
263 DCHECK(CalledOnValidThread()); | |
264 if (!attachment_uploader_.get()) { | |
265 return; | |
266 } | |
267 attachment_store_->SetSyncReference(attachment_ids); | |
268 | |
269 for (auto iter = attachment_ids.begin(); iter != attachment_ids.end(); | |
270 ++iter) { | |
271 upload_task_queue_->AddToQueue(*iter); | |
272 } | |
273 } | |
274 | |
275 void AttachmentServiceImpl::OnNetworkChanged( | |
276 net::NetworkChangeNotifier::ConnectionType type) { | |
277 if (type != net::NetworkChangeNotifier::CONNECTION_NONE) { | |
278 upload_task_queue_->ResetBackoff(); | |
279 } | |
280 } | |
281 | |
282 void AttachmentServiceImpl::ReadDoneNowUpload( | |
283 const AttachmentStore::Result& result, | |
284 std::unique_ptr<AttachmentMap> attachments, | |
285 std::unique_ptr<AttachmentIdList> unavailable_attachment_ids) { | |
286 DCHECK(CalledOnValidThread()); | |
287 if (!unavailable_attachment_ids->empty()) { | |
288 // TODO(maniscalco): We failed to read some attachments. What should we do | |
289 // now? | |
290 AttachmentIdList::const_iterator iter = unavailable_attachment_ids->begin(); | |
291 AttachmentIdList::const_iterator end = unavailable_attachment_ids->end(); | |
292 for (; iter != end; ++iter) { | |
293 upload_task_queue_->Cancel(*iter); | |
294 } | |
295 attachment_store_->DropSyncReference(*unavailable_attachment_ids); | |
296 } | |
297 | |
298 AttachmentMap::const_iterator iter = attachments->begin(); | |
299 AttachmentMap::const_iterator end = attachments->end(); | |
300 for (; iter != end; ++iter) { | |
301 attachment_uploader_->UploadAttachment( | |
302 iter->second, base::Bind(&AttachmentServiceImpl::UploadDone, | |
303 weak_ptr_factory_.GetWeakPtr())); | |
304 } | |
305 } | |
306 | |
307 void AttachmentServiceImpl::SetTimerForTest( | |
308 std::unique_ptr<base::Timer> timer) { | |
309 upload_task_queue_->SetTimerForTest(std::move(timer)); | |
310 } | |
311 | |
312 } // namespace syncer | |
OLD | NEW |