Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1010)

Side by Side Diff: sync/internal_api/attachments/attachment_service_impl.cc

Issue 2130453004: [Sync] Move //sync to //components/sync. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Rebase. Created 4 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "sync/internal_api/public/attachments/attachment_service_impl.h"
6
7 #include <iterator>
8 #include <utility>
9
10 #include "base/bind.h"
11 #include "base/location.h"
12 #include "base/macros.h"
13 #include "base/single_thread_task_runner.h"
14 #include "base/threading/thread_task_runner_handle.h"
15 #include "base/time/time.h"
16 #include "sync/api/attachments/attachment.h"
17 #include "sync/internal_api/public/attachments/fake_attachment_downloader.h"
18 #include "sync/internal_api/public/attachments/fake_attachment_uploader.h"
19
20 namespace syncer {
21
22 // GetOrDownloadAttachments starts multiple parallel DownloadAttachment calls.
23 // GetOrDownloadState tracks completion of these calls and posts callback for
24 // consumer once all attachments are either retrieved or reported unavailable.
25 class AttachmentServiceImpl::GetOrDownloadState
26 : public base::RefCounted<GetOrDownloadState>,
27 public base::NonThreadSafe {
28 public:
29 // GetOrDownloadState gets parameter from values passed to
30 // AttachmentService::GetOrDownloadAttachments.
31 // |attachment_ids| is a list of attachmens to retrieve.
32 // |callback| will be posted on current thread when all attachments retrieved
33 // or confirmed unavailable.
34 GetOrDownloadState(const AttachmentIdList& attachment_ids,
35 const GetOrDownloadCallback& callback);
36
37 // Attachment was just retrieved. Add it to retrieved attachments.
38 void AddAttachment(const Attachment& attachment);
39
40 // Both reading from local store and downloading attachment failed.
41 // Add it to unavailable set.
42 void AddUnavailableAttachmentId(const AttachmentId& attachment_id);
43
44 private:
45 friend class base::RefCounted<GetOrDownloadState>;
46 virtual ~GetOrDownloadState();
47
48 // If all attachment requests completed then post callback to consumer with
49 // results.
50 void PostResultIfAllRequestsCompleted();
51
52 GetOrDownloadCallback callback_;
53
54 // Requests for these attachments are still in progress.
55 AttachmentIdSet in_progress_attachments_;
56
57 AttachmentIdSet unavailable_attachments_;
58 std::unique_ptr<AttachmentMap> retrieved_attachments_;
59
60 DISALLOW_COPY_AND_ASSIGN(GetOrDownloadState);
61 };
62
63 AttachmentServiceImpl::GetOrDownloadState::GetOrDownloadState(
64 const AttachmentIdList& attachment_ids,
65 const GetOrDownloadCallback& callback)
66 : callback_(callback), retrieved_attachments_(new AttachmentMap()) {
67 std::copy(
68 attachment_ids.begin(),
69 attachment_ids.end(),
70 std::inserter(in_progress_attachments_, in_progress_attachments_.end()));
71 PostResultIfAllRequestsCompleted();
72 }
73
74 AttachmentServiceImpl::GetOrDownloadState::~GetOrDownloadState() {
75 DCHECK(CalledOnValidThread());
76 }
77
78 void AttachmentServiceImpl::GetOrDownloadState::AddAttachment(
79 const Attachment& attachment) {
80 DCHECK(CalledOnValidThread());
81 DCHECK(retrieved_attachments_->find(attachment.GetId()) ==
82 retrieved_attachments_->end());
83 retrieved_attachments_->insert(
84 std::make_pair(attachment.GetId(), attachment));
85 DCHECK(in_progress_attachments_.find(attachment.GetId()) !=
86 in_progress_attachments_.end());
87 in_progress_attachments_.erase(attachment.GetId());
88 PostResultIfAllRequestsCompleted();
89 }
90
91 void AttachmentServiceImpl::GetOrDownloadState::AddUnavailableAttachmentId(
92 const AttachmentId& attachment_id) {
93 DCHECK(CalledOnValidThread());
94 DCHECK(unavailable_attachments_.find(attachment_id) ==
95 unavailable_attachments_.end());
96 unavailable_attachments_.insert(attachment_id);
97 DCHECK(in_progress_attachments_.find(attachment_id) !=
98 in_progress_attachments_.end());
99 in_progress_attachments_.erase(attachment_id);
100 PostResultIfAllRequestsCompleted();
101 }
102
103 void
104 AttachmentServiceImpl::GetOrDownloadState::PostResultIfAllRequestsCompleted() {
105 if (in_progress_attachments_.empty()) {
106 // All requests completed. Let's notify consumer.
107 GetOrDownloadResult result =
108 unavailable_attachments_.empty() ? GET_SUCCESS : GET_UNSPECIFIED_ERROR;
109 base::ThreadTaskRunnerHandle::Get()->PostTask(
110 FROM_HERE,
111 base::Bind(callback_, result, base::Passed(&retrieved_attachments_)));
112 }
113 }
114
115 AttachmentServiceImpl::AttachmentServiceImpl(
116 std::unique_ptr<AttachmentStoreForSync> attachment_store,
117 std::unique_ptr<AttachmentUploader> attachment_uploader,
118 std::unique_ptr<AttachmentDownloader> attachment_downloader,
119 Delegate* delegate,
120 const base::TimeDelta& initial_backoff_delay,
121 const base::TimeDelta& max_backoff_delay)
122 : attachment_store_(std::move(attachment_store)),
123 attachment_uploader_(std::move(attachment_uploader)),
124 attachment_downloader_(std::move(attachment_downloader)),
125 delegate_(delegate),
126 weak_ptr_factory_(this) {
127 DCHECK(CalledOnValidThread());
128 DCHECK(attachment_store_.get());
129
130 // TODO(maniscalco): Observe network connectivity change events. When the
131 // network becomes disconnected, consider suspending queue dispatch. When
132 // connectivity is restored, consider clearing any dispatch backoff (bug
133 // 411981).
134 upload_task_queue_.reset(new TaskQueue<AttachmentId>(
135 base::Bind(&AttachmentServiceImpl::BeginUpload,
136 weak_ptr_factory_.GetWeakPtr()),
137 initial_backoff_delay,
138 max_backoff_delay));
139
140 net::NetworkChangeNotifier::AddNetworkChangeObserver(this);
141 }
142
143 AttachmentServiceImpl::~AttachmentServiceImpl() {
144 DCHECK(CalledOnValidThread());
145 net::NetworkChangeNotifier::RemoveNetworkChangeObserver(this);
146 }
147
148 // Static.
149 std::unique_ptr<syncer::AttachmentService>
150 AttachmentServiceImpl::CreateForTest() {
151 std::unique_ptr<syncer::AttachmentStore> attachment_store =
152 AttachmentStore::CreateInMemoryStore();
153 std::unique_ptr<AttachmentUploader> attachment_uploader(
154 new FakeAttachmentUploader);
155 std::unique_ptr<AttachmentDownloader> attachment_downloader(
156 new FakeAttachmentDownloader());
157 std::unique_ptr<syncer::AttachmentService> attachment_service(
158 new syncer::AttachmentServiceImpl(
159 attachment_store->CreateAttachmentStoreForSync(),
160 std::move(attachment_uploader), std::move(attachment_downloader),
161 NULL, base::TimeDelta(), base::TimeDelta()));
162 return attachment_service;
163 }
164
165 void AttachmentServiceImpl::GetOrDownloadAttachments(
166 const AttachmentIdList& attachment_ids,
167 const GetOrDownloadCallback& callback) {
168 DCHECK(CalledOnValidThread());
169 scoped_refptr<GetOrDownloadState> state(
170 new GetOrDownloadState(attachment_ids, callback));
171 // SetModelTypeReference() makes attachments visible for model type.
172 // Needed when attachment doesn't have model type reference, but still
173 // available in local store.
174 attachment_store_->SetModelTypeReference(attachment_ids);
175 attachment_store_->Read(attachment_ids,
176 base::Bind(&AttachmentServiceImpl::ReadDone,
177 weak_ptr_factory_.GetWeakPtr(), state));
178 }
179
180 void AttachmentServiceImpl::ReadDone(
181 const scoped_refptr<GetOrDownloadState>& state,
182 const AttachmentStore::Result& result,
183 std::unique_ptr<AttachmentMap> attachments,
184 std::unique_ptr<AttachmentIdList> unavailable_attachment_ids) {
185 // Add read attachments to result.
186 for (AttachmentMap::const_iterator iter = attachments->begin();
187 iter != attachments->end();
188 ++iter) {
189 state->AddAttachment(iter->second);
190 }
191
192 AttachmentIdList::const_iterator iter = unavailable_attachment_ids->begin();
193 AttachmentIdList::const_iterator end = unavailable_attachment_ids->end();
194 if (result != AttachmentStore::STORE_INITIALIZATION_FAILED &&
195 attachment_downloader_.get()) {
196 // Try to download locally unavailable attachments.
197 for (; iter != end; ++iter) {
198 attachment_downloader_->DownloadAttachment(
199 *iter,
200 base::Bind(&AttachmentServiceImpl::DownloadDone,
201 weak_ptr_factory_.GetWeakPtr(),
202 state,
203 *iter));
204 }
205 } else {
206 // No downloader so all locally unavailable attachments are unavailable.
207 for (; iter != end; ++iter) {
208 state->AddUnavailableAttachmentId(*iter);
209 }
210 }
211 }
212
213 void AttachmentServiceImpl::WriteDone(
214 const scoped_refptr<GetOrDownloadState>& state,
215 const Attachment& attachment,
216 const AttachmentStore::Result& result) {
217 switch (result) {
218 case AttachmentStore::SUCCESS:
219 state->AddAttachment(attachment);
220 break;
221 case AttachmentStore::UNSPECIFIED_ERROR:
222 case AttachmentStore::STORE_INITIALIZATION_FAILED:
223 state->AddUnavailableAttachmentId(attachment.GetId());
224 break;
225 }
226 }
227
228 void AttachmentServiceImpl::UploadDone(
229 const AttachmentUploader::UploadResult& result,
230 const AttachmentId& attachment_id) {
231 DCHECK(CalledOnValidThread());
232 AttachmentIdList ids;
233 ids.push_back(attachment_id);
234 switch (result) {
235 case AttachmentUploader::UPLOAD_SUCCESS:
236 attachment_store_->DropSyncReference(ids);
237 upload_task_queue_->MarkAsSucceeded(attachment_id);
238 if (delegate_) {
239 delegate_->OnAttachmentUploaded(attachment_id);
240 }
241 break;
242 case AttachmentUploader::UPLOAD_TRANSIENT_ERROR:
243 upload_task_queue_->MarkAsFailed(attachment_id);
244 upload_task_queue_->AddToQueue(attachment_id);
245 break;
246 case AttachmentUploader::UPLOAD_UNSPECIFIED_ERROR:
247 // TODO(pavely): crbug/372622: Deal with UploadAttachment failures.
248 attachment_store_->DropSyncReference(ids);
249 upload_task_queue_->MarkAsFailed(attachment_id);
250 break;
251 }
252 }
253
254 void AttachmentServiceImpl::DownloadDone(
255 const scoped_refptr<GetOrDownloadState>& state,
256 const AttachmentId& attachment_id,
257 const AttachmentDownloader::DownloadResult& result,
258 std::unique_ptr<Attachment> attachment) {
259 switch (result) {
260 case AttachmentDownloader::DOWNLOAD_SUCCESS: {
261 AttachmentList attachment_list;
262 attachment_list.push_back(*attachment.get());
263 attachment_store_->Write(
264 attachment_list,
265 base::Bind(&AttachmentServiceImpl::WriteDone,
266 weak_ptr_factory_.GetWeakPtr(), state, *attachment.get()));
267 break;
268 }
269 case AttachmentDownloader::DOWNLOAD_TRANSIENT_ERROR:
270 case AttachmentDownloader::DOWNLOAD_UNSPECIFIED_ERROR:
271 state->AddUnavailableAttachmentId(attachment_id);
272 break;
273 }
274 }
275
276 void AttachmentServiceImpl::BeginUpload(const AttachmentId& attachment_id) {
277 DCHECK(CalledOnValidThread());
278 AttachmentIdList attachment_ids;
279 attachment_ids.push_back(attachment_id);
280 attachment_store_->Read(attachment_ids,
281 base::Bind(&AttachmentServiceImpl::ReadDoneNowUpload,
282 weak_ptr_factory_.GetWeakPtr()));
283 }
284
285 void AttachmentServiceImpl::UploadAttachments(
286 const AttachmentIdList& attachment_ids) {
287 DCHECK(CalledOnValidThread());
288 if (!attachment_uploader_.get()) {
289 return;
290 }
291 attachment_store_->SetSyncReference(attachment_ids);
292
293 for (auto iter = attachment_ids.begin(); iter != attachment_ids.end();
294 ++iter) {
295 upload_task_queue_->AddToQueue(*iter);
296 }
297 }
298
299 void AttachmentServiceImpl::OnNetworkChanged(
300 net::NetworkChangeNotifier::ConnectionType type) {
301 if (type != net::NetworkChangeNotifier::CONNECTION_NONE) {
302 upload_task_queue_->ResetBackoff();
303 }
304 }
305
306 void AttachmentServiceImpl::ReadDoneNowUpload(
307 const AttachmentStore::Result& result,
308 std::unique_ptr<AttachmentMap> attachments,
309 std::unique_ptr<AttachmentIdList> unavailable_attachment_ids) {
310 DCHECK(CalledOnValidThread());
311 if (!unavailable_attachment_ids->empty()) {
312 // TODO(maniscalco): We failed to read some attachments. What should we do
313 // now?
314 AttachmentIdList::const_iterator iter = unavailable_attachment_ids->begin();
315 AttachmentIdList::const_iterator end = unavailable_attachment_ids->end();
316 for (; iter != end; ++iter) {
317 upload_task_queue_->Cancel(*iter);
318 }
319 attachment_store_->DropSyncReference(*unavailable_attachment_ids);
320 }
321
322 AttachmentMap::const_iterator iter = attachments->begin();
323 AttachmentMap::const_iterator end = attachments->end();
324 for (; iter != end; ++iter) {
325 attachment_uploader_->UploadAttachment(
326 iter->second,
327 base::Bind(&AttachmentServiceImpl::UploadDone,
328 weak_ptr_factory_.GetWeakPtr()));
329 }
330 }
331
332 void AttachmentServiceImpl::SetTimerForTest(
333 std::unique_ptr<base::Timer> timer) {
334 upload_task_queue_->SetTimerForTest(std::move(timer));
335 }
336
337 } // namespace syncer
OLDNEW
« no previous file with comments | « sync/internal_api/attachments/attachment_service.cc ('k') | sync/internal_api/attachments/attachment_service_impl_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698