Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(314)

Side by Side Diff: components/sync/api_impl/attachments/attachment_service_impl.cc

Issue 2401223002: [Sync] Renaming sync/api* to sync/model*. (Closed)
Patch Set: Missed a comment in a DEPS file, and rebasing. Created 4 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "components/sync/api_impl/attachments/attachment_service_impl.h"
6
7 #include <iterator>
8 #include <utility>
9
10 #include "base/bind.h"
11 #include "base/location.h"
12 #include "base/single_thread_task_runner.h"
13 #include "base/threading/thread_task_runner_handle.h"
14 #include "base/time/time.h"
15 #include "components/sync/api/attachments/attachment.h"
16 #include "components/sync/engine/attachments/fake_attachment_downloader.h"
17 #include "components/sync/engine/attachments/fake_attachment_uploader.h"
18
19 namespace syncer {
20
21 // GetOrDownloadAttachments starts multiple parallel DownloadAttachment calls.
22 // GetOrDownloadState tracks completion of these calls and posts callback for
23 // consumer once all attachments are either retrieved or reported unavailable.
24 class AttachmentServiceImpl::GetOrDownloadState
25 : public base::RefCounted<GetOrDownloadState>,
26 public base::NonThreadSafe {
27 public:
28 // GetOrDownloadState gets parameter from values passed to
29 // AttachmentService::GetOrDownloadAttachments.
30 // |attachment_ids| is a list of attachmens to retrieve.
31 // |callback| will be posted on current thread when all attachments retrieved
32 // or confirmed unavailable.
33 GetOrDownloadState(const AttachmentIdList& attachment_ids,
34 const GetOrDownloadCallback& callback);
35
36 // Attachment was just retrieved. Add it to retrieved attachments.
37 void AddAttachment(const Attachment& attachment);
38
39 // Both reading from local store and downloading attachment failed.
40 // Add it to unavailable set.
41 void AddUnavailableAttachmentId(const AttachmentId& attachment_id);
42
43 private:
44 friend class base::RefCounted<GetOrDownloadState>;
45 virtual ~GetOrDownloadState();
46
47 // If all attachment requests completed then post callback to consumer with
48 // results.
49 void PostResultIfAllRequestsCompleted();
50
51 GetOrDownloadCallback callback_;
52
53 // Requests for these attachments are still in progress.
54 AttachmentIdSet in_progress_attachments_;
55
56 AttachmentIdSet unavailable_attachments_;
57 std::unique_ptr<AttachmentMap> retrieved_attachments_;
58
59 DISALLOW_COPY_AND_ASSIGN(GetOrDownloadState);
60 };
61
62 AttachmentServiceImpl::GetOrDownloadState::GetOrDownloadState(
63 const AttachmentIdList& attachment_ids,
64 const GetOrDownloadCallback& callback)
65 : callback_(callback), retrieved_attachments_(new AttachmentMap()) {
66 std::copy(
67 attachment_ids.begin(), attachment_ids.end(),
68 std::inserter(in_progress_attachments_, in_progress_attachments_.end()));
69 PostResultIfAllRequestsCompleted();
70 }
71
72 AttachmentServiceImpl::GetOrDownloadState::~GetOrDownloadState() {
73 DCHECK(CalledOnValidThread());
74 }
75
76 void AttachmentServiceImpl::GetOrDownloadState::AddAttachment(
77 const Attachment& attachment) {
78 DCHECK(CalledOnValidThread());
79 DCHECK(retrieved_attachments_->find(attachment.GetId()) ==
80 retrieved_attachments_->end());
81 retrieved_attachments_->insert(
82 std::make_pair(attachment.GetId(), attachment));
83 DCHECK(in_progress_attachments_.find(attachment.GetId()) !=
84 in_progress_attachments_.end());
85 in_progress_attachments_.erase(attachment.GetId());
86 PostResultIfAllRequestsCompleted();
87 }
88
89 void AttachmentServiceImpl::GetOrDownloadState::AddUnavailableAttachmentId(
90 const AttachmentId& attachment_id) {
91 DCHECK(CalledOnValidThread());
92 DCHECK(unavailable_attachments_.find(attachment_id) ==
93 unavailable_attachments_.end());
94 unavailable_attachments_.insert(attachment_id);
95 DCHECK(in_progress_attachments_.find(attachment_id) !=
96 in_progress_attachments_.end());
97 in_progress_attachments_.erase(attachment_id);
98 PostResultIfAllRequestsCompleted();
99 }
100
101 void AttachmentServiceImpl::GetOrDownloadState::
102 PostResultIfAllRequestsCompleted() {
103 if (in_progress_attachments_.empty()) {
104 // All requests completed. Let's notify consumer.
105 GetOrDownloadResult result =
106 unavailable_attachments_.empty() ? GET_SUCCESS : GET_UNSPECIFIED_ERROR;
107 base::ThreadTaskRunnerHandle::Get()->PostTask(
108 FROM_HERE,
109 base::Bind(callback_, result, base::Passed(&retrieved_attachments_)));
110 }
111 }
112
113 AttachmentServiceImpl::AttachmentServiceImpl(
114 std::unique_ptr<AttachmentStoreForSync> attachment_store,
115 std::unique_ptr<AttachmentUploader> attachment_uploader,
116 std::unique_ptr<AttachmentDownloader> attachment_downloader,
117 Delegate* delegate,
118 const base::TimeDelta& initial_backoff_delay,
119 const base::TimeDelta& max_backoff_delay)
120 : attachment_store_(std::move(attachment_store)),
121 attachment_uploader_(std::move(attachment_uploader)),
122 attachment_downloader_(std::move(attachment_downloader)),
123 delegate_(delegate),
124 weak_ptr_factory_(this) {
125 DCHECK(CalledOnValidThread());
126 DCHECK(attachment_store_.get());
127
128 // TODO(maniscalco): Observe network connectivity change events. When the
129 // network becomes disconnected, consider suspending queue dispatch. When
130 // connectivity is restored, consider clearing any dispatch backoff (bug
131 // 411981).
132 upload_task_queue_.reset(new TaskQueue<AttachmentId>(
133 base::Bind(&AttachmentServiceImpl::BeginUpload,
134 weak_ptr_factory_.GetWeakPtr()),
135 initial_backoff_delay, max_backoff_delay));
136
137 net::NetworkChangeNotifier::AddNetworkChangeObserver(this);
138 }
139
140 AttachmentServiceImpl::~AttachmentServiceImpl() {
141 DCHECK(CalledOnValidThread());
142 net::NetworkChangeNotifier::RemoveNetworkChangeObserver(this);
143 }
144
145 void AttachmentServiceImpl::GetOrDownloadAttachments(
146 const AttachmentIdList& attachment_ids,
147 const GetOrDownloadCallback& callback) {
148 DCHECK(CalledOnValidThread());
149 scoped_refptr<GetOrDownloadState> state(
150 new GetOrDownloadState(attachment_ids, callback));
151 // SetModelTypeReference() makes attachments visible for model type.
152 // Needed when attachment doesn't have model type reference, but still
153 // available in local store.
154 attachment_store_->SetModelTypeReference(attachment_ids);
155 attachment_store_->Read(attachment_ids,
156 base::Bind(&AttachmentServiceImpl::ReadDone,
157 weak_ptr_factory_.GetWeakPtr(), state));
158 }
159
160 void AttachmentServiceImpl::ReadDone(
161 const scoped_refptr<GetOrDownloadState>& state,
162 const AttachmentStore::Result& result,
163 std::unique_ptr<AttachmentMap> attachments,
164 std::unique_ptr<AttachmentIdList> unavailable_attachment_ids) {
165 // Add read attachments to result.
166 for (AttachmentMap::const_iterator iter = attachments->begin();
167 iter != attachments->end(); ++iter) {
168 state->AddAttachment(iter->second);
169 }
170
171 AttachmentIdList::const_iterator iter = unavailable_attachment_ids->begin();
172 AttachmentIdList::const_iterator end = unavailable_attachment_ids->end();
173 if (result != AttachmentStore::STORE_INITIALIZATION_FAILED &&
174 attachment_downloader_.get()) {
175 // Try to download locally unavailable attachments.
176 for (; iter != end; ++iter) {
177 attachment_downloader_->DownloadAttachment(
178 *iter, base::Bind(&AttachmentServiceImpl::DownloadDone,
179 weak_ptr_factory_.GetWeakPtr(), state, *iter));
180 }
181 } else {
182 // No downloader so all locally unavailable attachments are unavailable.
183 for (; iter != end; ++iter) {
184 state->AddUnavailableAttachmentId(*iter);
185 }
186 }
187 }
188
189 void AttachmentServiceImpl::WriteDone(
190 const scoped_refptr<GetOrDownloadState>& state,
191 const Attachment& attachment,
192 const AttachmentStore::Result& result) {
193 switch (result) {
194 case AttachmentStore::SUCCESS:
195 state->AddAttachment(attachment);
196 break;
197 case AttachmentStore::UNSPECIFIED_ERROR:
198 case AttachmentStore::STORE_INITIALIZATION_FAILED:
199 state->AddUnavailableAttachmentId(attachment.GetId());
200 break;
201 }
202 }
203
204 void AttachmentServiceImpl::UploadDone(
205 const AttachmentUploader::UploadResult& result,
206 const AttachmentId& attachment_id) {
207 DCHECK(CalledOnValidThread());
208 AttachmentIdList ids;
209 ids.push_back(attachment_id);
210 switch (result) {
211 case AttachmentUploader::UPLOAD_SUCCESS:
212 attachment_store_->DropSyncReference(ids);
213 upload_task_queue_->MarkAsSucceeded(attachment_id);
214 if (delegate_) {
215 delegate_->OnAttachmentUploaded(attachment_id);
216 }
217 break;
218 case AttachmentUploader::UPLOAD_TRANSIENT_ERROR:
219 upload_task_queue_->MarkAsFailed(attachment_id);
220 upload_task_queue_->AddToQueue(attachment_id);
221 break;
222 case AttachmentUploader::UPLOAD_UNSPECIFIED_ERROR:
223 // TODO(pavely): crbug/372622: Deal with UploadAttachment failures.
224 attachment_store_->DropSyncReference(ids);
225 upload_task_queue_->MarkAsFailed(attachment_id);
226 break;
227 }
228 }
229
230 void AttachmentServiceImpl::DownloadDone(
231 const scoped_refptr<GetOrDownloadState>& state,
232 const AttachmentId& attachment_id,
233 const AttachmentDownloader::DownloadResult& result,
234 std::unique_ptr<Attachment> attachment) {
235 switch (result) {
236 case AttachmentDownloader::DOWNLOAD_SUCCESS: {
237 AttachmentList attachment_list;
238 attachment_list.push_back(*attachment.get());
239 attachment_store_->Write(
240 attachment_list,
241 base::Bind(&AttachmentServiceImpl::WriteDone,
242 weak_ptr_factory_.GetWeakPtr(), state, *attachment.get()));
243 break;
244 }
245 case AttachmentDownloader::DOWNLOAD_TRANSIENT_ERROR:
246 case AttachmentDownloader::DOWNLOAD_UNSPECIFIED_ERROR:
247 state->AddUnavailableAttachmentId(attachment_id);
248 break;
249 }
250 }
251
252 void AttachmentServiceImpl::BeginUpload(const AttachmentId& attachment_id) {
253 DCHECK(CalledOnValidThread());
254 AttachmentIdList attachment_ids;
255 attachment_ids.push_back(attachment_id);
256 attachment_store_->Read(attachment_ids,
257 base::Bind(&AttachmentServiceImpl::ReadDoneNowUpload,
258 weak_ptr_factory_.GetWeakPtr()));
259 }
260
261 void AttachmentServiceImpl::UploadAttachments(
262 const AttachmentIdList& attachment_ids) {
263 DCHECK(CalledOnValidThread());
264 if (!attachment_uploader_.get()) {
265 return;
266 }
267 attachment_store_->SetSyncReference(attachment_ids);
268
269 for (auto iter = attachment_ids.begin(); iter != attachment_ids.end();
270 ++iter) {
271 upload_task_queue_->AddToQueue(*iter);
272 }
273 }
274
275 void AttachmentServiceImpl::OnNetworkChanged(
276 net::NetworkChangeNotifier::ConnectionType type) {
277 if (type != net::NetworkChangeNotifier::CONNECTION_NONE) {
278 upload_task_queue_->ResetBackoff();
279 }
280 }
281
282 void AttachmentServiceImpl::ReadDoneNowUpload(
283 const AttachmentStore::Result& result,
284 std::unique_ptr<AttachmentMap> attachments,
285 std::unique_ptr<AttachmentIdList> unavailable_attachment_ids) {
286 DCHECK(CalledOnValidThread());
287 if (!unavailable_attachment_ids->empty()) {
288 // TODO(maniscalco): We failed to read some attachments. What should we do
289 // now?
290 AttachmentIdList::const_iterator iter = unavailable_attachment_ids->begin();
291 AttachmentIdList::const_iterator end = unavailable_attachment_ids->end();
292 for (; iter != end; ++iter) {
293 upload_task_queue_->Cancel(*iter);
294 }
295 attachment_store_->DropSyncReference(*unavailable_attachment_ids);
296 }
297
298 AttachmentMap::const_iterator iter = attachments->begin();
299 AttachmentMap::const_iterator end = attachments->end();
300 for (; iter != end; ++iter) {
301 attachment_uploader_->UploadAttachment(
302 iter->second, base::Bind(&AttachmentServiceImpl::UploadDone,
303 weak_ptr_factory_.GetWeakPtr()));
304 }
305 }
306
307 void AttachmentServiceImpl::SetTimerForTest(
308 std::unique_ptr<base::Timer> timer) {
309 upload_task_queue_->SetTimerForTest(std::move(timer));
310 }
311
312 } // namespace syncer
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698