OLD | NEW |
| (Empty) |
1 // Copyright 2014 The Chromium Authors. All rights reserved. | |
2 // Use of this source code is governed by a BSD-style license that can be | |
3 // found in the LICENSE file. | |
4 | |
5 #include "sync/engine/model_type_worker.h" | |
6 | |
7 #include <stddef.h> | |
8 #include <stdint.h> | |
9 | |
10 #include <utility> | |
11 #include <vector> | |
12 | |
13 #include "base/bind.h" | |
14 #include "base/format_macros.h" | |
15 #include "base/guid.h" | |
16 #include "base/logging.h" | |
17 #include "base/memory/ptr_util.h" | |
18 #include "base/strings/stringprintf.h" | |
19 #include "sync/engine/commit_contribution.h" | |
20 #include "sync/engine/non_blocking_type_commit_contribution.h" | |
21 #include "sync/engine/worker_entity_tracker.h" | |
22 #include "sync/internal_api/public/model_type_processor.h" | |
23 #include "sync/syncable/syncable_util.h" | |
24 #include "sync/util/cryptographer.h" | |
25 #include "sync/util/time.h" | |
26 | |
27 namespace syncer_v2 { | |
28 | |
29 using syncer::CommitContribution; | |
30 using syncer::Cryptographer; | |
31 using syncer::ModelType; | |
32 using syncer::NudgeHandler; | |
33 using syncer::SyncerError; | |
34 | |
35 ModelTypeWorker::ModelTypeWorker( | |
36 ModelType type, | |
37 const sync_pb::DataTypeState& initial_state, | |
38 std::unique_ptr<Cryptographer> cryptographer, | |
39 NudgeHandler* nudge_handler, | |
40 std::unique_ptr<ModelTypeProcessor> model_type_processor) | |
41 : type_(type), | |
42 data_type_state_(initial_state), | |
43 model_type_processor_(std::move(model_type_processor)), | |
44 cryptographer_(std::move(cryptographer)), | |
45 nudge_handler_(nudge_handler), | |
46 weak_ptr_factory_(this) { | |
47 DCHECK(model_type_processor_); | |
48 | |
49 // Request an initial sync if it hasn't been completed yet. | |
50 if (!data_type_state_.initial_sync_done()) { | |
51 nudge_handler_->NudgeForInitialDownload(type_); | |
52 } | |
53 | |
54 if (cryptographer_) { | |
55 DVLOG(1) << ModelTypeToString(type_) << ": Starting with encryption key " | |
56 << cryptographer_->GetDefaultNigoriKeyName(); | |
57 OnCryptographerUpdated(); | |
58 } | |
59 } | |
60 | |
61 ModelTypeWorker::~ModelTypeWorker() { | |
62 model_type_processor_->DisconnectSync(); | |
63 } | |
64 | |
65 ModelType ModelTypeWorker::GetModelType() const { | |
66 DCHECK(CalledOnValidThread()); | |
67 return type_; | |
68 } | |
69 | |
70 bool ModelTypeWorker::IsEncryptionRequired() const { | |
71 return !!cryptographer_; | |
72 } | |
73 | |
74 void ModelTypeWorker::UpdateCryptographer( | |
75 std::unique_ptr<Cryptographer> cryptographer) { | |
76 DCHECK(cryptographer); | |
77 cryptographer_ = std::move(cryptographer); | |
78 | |
79 // Update our state and that of the proxy. | |
80 OnCryptographerUpdated(); | |
81 | |
82 // Nudge the scheduler if we're now allowed to commit. | |
83 if (CanCommitItems()) | |
84 nudge_handler_->NudgeForCommit(type_); | |
85 } | |
86 | |
87 // UpdateHandler implementation. | |
88 bool ModelTypeWorker::IsInitialSyncEnded() const { | |
89 return data_type_state_.initial_sync_done(); | |
90 } | |
91 | |
92 void ModelTypeWorker::GetDownloadProgress( | |
93 sync_pb::DataTypeProgressMarker* progress_marker) const { | |
94 DCHECK(CalledOnValidThread()); | |
95 progress_marker->CopyFrom(data_type_state_.progress_marker()); | |
96 } | |
97 | |
98 void ModelTypeWorker::GetDataTypeContext( | |
99 sync_pb::DataTypeContext* context) const { | |
100 DCHECK(CalledOnValidThread()); | |
101 context->CopyFrom(data_type_state_.type_context()); | |
102 } | |
103 | |
104 SyncerError ModelTypeWorker::ProcessGetUpdatesResponse( | |
105 const sync_pb::DataTypeProgressMarker& progress_marker, | |
106 const sync_pb::DataTypeContext& mutated_context, | |
107 const SyncEntityList& applicable_updates, | |
108 syncer::sessions::StatusController* status) { | |
109 DCHECK(CalledOnValidThread()); | |
110 | |
111 // TODO(rlarocque): Handle data type context conflicts. | |
112 *data_type_state_.mutable_type_context() = mutated_context; | |
113 *data_type_state_.mutable_progress_marker() = progress_marker; | |
114 | |
115 for (const sync_pb::SyncEntity* update_entity : applicable_updates) { | |
116 // Skip updates for permanent folders. | |
117 // TODO(stanisc): crbug.com/516866: might need to handle this for | |
118 // hierarchical datatypes. | |
119 if (!update_entity->server_defined_unique_tag().empty()) | |
120 continue; | |
121 | |
122 // Normal updates are handled here. | |
123 const std::string& client_tag_hash = | |
124 update_entity->client_defined_unique_tag(); | |
125 | |
126 // TODO(stanisc): crbug.com/516866: this wouldn't be true for bookmarks. | |
127 DCHECK(!client_tag_hash.empty()); | |
128 | |
129 // Prepare the message for the model thread. | |
130 EntityData data; | |
131 data.id = update_entity->id_string(); | |
132 data.client_tag_hash = client_tag_hash; | |
133 data.creation_time = syncer::ProtoTimeToTime(update_entity->ctime()); | |
134 data.modification_time = syncer::ProtoTimeToTime(update_entity->mtime()); | |
135 data.non_unique_name = update_entity->name(); | |
136 | |
137 UpdateResponseData response_data; | |
138 response_data.response_version = update_entity->version(); | |
139 | |
140 WorkerEntityTracker* entity = GetOrCreateEntityTracker(data); | |
141 | |
142 // Check if specifics are encrypted and try to decrypt if so. | |
143 const sync_pb::EntitySpecifics& specifics = update_entity->specifics(); | |
144 if (!specifics.has_encrypted()) { | |
145 // No encryption. | |
146 entity->ReceiveUpdate(update_entity->version()); | |
147 data.specifics = specifics; | |
148 response_data.entity = data.PassToPtr(); | |
149 pending_updates_.push_back(response_data); | |
150 } else if (specifics.has_encrypted() && cryptographer_ && | |
151 cryptographer_->CanDecrypt(specifics.encrypted())) { | |
152 // Encrypted, but we know the key. | |
153 if (DecryptSpecifics(cryptographer_.get(), specifics, &data.specifics)) { | |
154 entity->ReceiveUpdate(update_entity->version()); | |
155 response_data.entity = data.PassToPtr(); | |
156 response_data.encryption_key_name = specifics.encrypted().key_name(); | |
157 pending_updates_.push_back(response_data); | |
158 } | |
159 } else if (specifics.has_encrypted() && | |
160 (!cryptographer_ || | |
161 !cryptographer_->CanDecrypt(specifics.encrypted()))) { | |
162 // Can't decrypt right now. Ask the entity tracker to handle it. | |
163 data.specifics = specifics; | |
164 response_data.entity = data.PassToPtr(); | |
165 entity->ReceiveEncryptedUpdate(response_data); | |
166 } | |
167 } | |
168 | |
169 return syncer::SYNCER_OK; | |
170 } | |
171 | |
172 void ModelTypeWorker::ApplyUpdates(syncer::sessions::StatusController* status) { | |
173 DCHECK(CalledOnValidThread()); | |
174 // This should only ever be called after one PassiveApplyUpdates. | |
175 DCHECK(data_type_state_.initial_sync_done()); | |
176 // Download cycle is done, pass all updates to the processor. | |
177 ApplyPendingUpdates(); | |
178 } | |
179 | |
180 void ModelTypeWorker::PassiveApplyUpdates( | |
181 syncer::sessions::StatusController* status) { | |
182 // This should only be called at the end of the very first download cycle. | |
183 DCHECK(!data_type_state_.initial_sync_done()); | |
184 // Indicate to the processor that the initial download is done. The initial | |
185 // sync technically isn't done yet but by the time this value is persisted to | |
186 // disk on the model thread it will be. | |
187 data_type_state_.set_initial_sync_done(true); | |
188 ApplyPendingUpdates(); | |
189 } | |
190 | |
191 void ModelTypeWorker::ApplyPendingUpdates() { | |
192 DVLOG(1) << ModelTypeToString(type_) << ": " | |
193 << base::StringPrintf("Delivering %" PRIuS " applicable updates.", | |
194 pending_updates_.size()); | |
195 model_type_processor_->OnUpdateReceived(data_type_state_, pending_updates_); | |
196 pending_updates_.clear(); | |
197 } | |
198 | |
199 void ModelTypeWorker::EnqueueForCommit(const CommitRequestDataList& list) { | |
200 DCHECK(CalledOnValidThread()); | |
201 | |
202 DCHECK(IsTypeInitialized()) | |
203 << "Asked to commit items before type was initialized. " | |
204 << "ModelType is: " << ModelTypeToString(type_); | |
205 | |
206 for (const CommitRequestData& commit : list) { | |
207 const EntityData& data = commit.entity.value(); | |
208 if (!data.is_deleted()) { | |
209 DCHECK_EQ(type_, syncer::GetModelTypeFromSpecifics(data.specifics)); | |
210 } | |
211 GetOrCreateEntityTracker(data)->RequestCommit(commit); | |
212 } | |
213 | |
214 if (CanCommitItems()) | |
215 nudge_handler_->NudgeForCommit(type_); | |
216 } | |
217 | |
218 // CommitContributor implementation. | |
219 std::unique_ptr<CommitContribution> ModelTypeWorker::GetContribution( | |
220 size_t max_entries) { | |
221 DCHECK(CalledOnValidThread()); | |
222 // There shouldn't be a GetUpdates in progress when a commit is triggered. | |
223 DCHECK(pending_updates_.empty()); | |
224 | |
225 size_t space_remaining = max_entries; | |
226 google::protobuf::RepeatedPtrField<sync_pb::SyncEntity> commit_entities; | |
227 | |
228 if (!CanCommitItems()) | |
229 return std::unique_ptr<CommitContribution>(); | |
230 | |
231 // TODO(rlarocque): Avoid iterating here. | |
232 for (EntityMap::const_iterator it = entities_.begin(); | |
233 it != entities_.end() && space_remaining > 0; ++it) { | |
234 WorkerEntityTracker* entity = it->second.get(); | |
235 if (entity->HasPendingCommit()) { | |
236 sync_pb::SyncEntity* commit_entity = commit_entities.Add(); | |
237 entity->PopulateCommitProto(commit_entity); | |
238 AdjustCommitProto(commit_entity); | |
239 space_remaining--; | |
240 } | |
241 } | |
242 | |
243 if (commit_entities.size() == 0) | |
244 return std::unique_ptr<CommitContribution>(); | |
245 | |
246 return std::unique_ptr<CommitContribution>( | |
247 new NonBlockingTypeCommitContribution(data_type_state_.type_context(), | |
248 commit_entities, this)); | |
249 } | |
250 | |
251 void ModelTypeWorker::OnCommitResponse(CommitResponseDataList* response_list) { | |
252 for (CommitResponseData& response : *response_list) { | |
253 WorkerEntityTracker* entity = GetEntityTracker(response.client_tag_hash); | |
254 | |
255 // There's no way we could have committed an entry we know nothing about. | |
256 if (entity == nullptr) { | |
257 NOTREACHED() << "Received commit response for item unknown to us." | |
258 << " Model type: " << ModelTypeToString(type_) | |
259 << " ID: " << response.id; | |
260 continue; | |
261 } | |
262 | |
263 entity->ReceiveCommitResponse(&response); | |
264 } | |
265 | |
266 // Send the responses back to the model thread. It needs to know which | |
267 // items have been successfully committed so it can save that information in | |
268 // permanent storage. | |
269 model_type_processor_->OnCommitCompleted(data_type_state_, *response_list); | |
270 } | |
271 | |
272 base::WeakPtr<ModelTypeWorker> ModelTypeWorker::AsWeakPtr() { | |
273 return weak_ptr_factory_.GetWeakPtr(); | |
274 } | |
275 | |
276 bool ModelTypeWorker::IsTypeInitialized() const { | |
277 return data_type_state_.initial_sync_done() && | |
278 !data_type_state_.progress_marker().token().empty(); | |
279 } | |
280 | |
281 bool ModelTypeWorker::CanCommitItems() const { | |
282 // We can't commit anything until we know the type's parent node. | |
283 // We'll get it in the first update response. | |
284 if (!IsTypeInitialized()) | |
285 return false; | |
286 | |
287 // Don't commit if we should be encrypting but don't have the required keys. | |
288 if (IsEncryptionRequired() && | |
289 (!cryptographer_ || !cryptographer_->is_ready())) { | |
290 return false; | |
291 } | |
292 | |
293 return true; | |
294 } | |
295 | |
296 void ModelTypeWorker::AdjustCommitProto(sync_pb::SyncEntity* sync_entity) { | |
297 DCHECK(CanCommitItems()); | |
298 | |
299 // Initial commits need our help to generate a client ID. | |
300 if (sync_entity->version() == kUncommittedVersion) { | |
301 DCHECK(!sync_entity->has_id_string()); | |
302 // TODO(stanisc): This is incorrect for bookmarks for two reasons: | |
303 // 1) Won't be able to match previously committed bookmarks to the ones | |
304 // with server ID. | |
305 // 2) Recommitting an item in a case of failing to receive commit response | |
306 // would result in generating a different client ID, which in turn | |
307 // would result in a duplication. | |
308 // We should generate client ID on the frontend side instead. | |
309 sync_entity->set_id_string(base::GenerateGUID()); | |
310 sync_entity->set_version(0); | |
311 } else { | |
312 DCHECK(sync_entity->has_id_string()); | |
313 } | |
314 | |
315 // Encrypt the specifics and hide the title if necessary. | |
316 if (IsEncryptionRequired()) { | |
317 // IsEncryptionRequired() && CanCommitItems() implies | |
318 // that the cryptographer is valid and ready to encrypt. | |
319 sync_pb::EntitySpecifics encrypted_specifics; | |
320 bool result = cryptographer_->Encrypt( | |
321 sync_entity->specifics(), encrypted_specifics.mutable_encrypted()); | |
322 DCHECK(result); | |
323 sync_entity->mutable_specifics()->CopyFrom(encrypted_specifics); | |
324 sync_entity->set_name("encrypted"); | |
325 } | |
326 | |
327 // Always include enough specifics to identify the type. Do this even in | |
328 // deletion requests, where the specifics are otherwise invalid. | |
329 AddDefaultFieldValue(type_, sync_entity->mutable_specifics()); | |
330 | |
331 // TODO(stanisc): crbug.com/516866: | |
332 // Call sync_entity->set_parent_id_string(...) for hierarchical entities here. | |
333 } | |
334 | |
335 void ModelTypeWorker::OnCryptographerUpdated() { | |
336 DCHECK(cryptographer_); | |
337 | |
338 bool new_encryption_key = false; | |
339 UpdateResponseDataList response_datas; | |
340 | |
341 const std::string& new_key_name = cryptographer_->GetDefaultNigoriKeyName(); | |
342 | |
343 // Handle a change in encryption key. | |
344 if (data_type_state_.encryption_key_name() != new_key_name) { | |
345 DVLOG(1) << ModelTypeToString(type_) << ": Updating encryption key " | |
346 << data_type_state_.encryption_key_name() << " -> " | |
347 << new_key_name; | |
348 data_type_state_.set_encryption_key_name(new_key_name); | |
349 new_encryption_key = true; | |
350 } | |
351 | |
352 for (EntityMap::const_iterator it = entities_.begin(); it != entities_.end(); | |
353 ++it) { | |
354 if (it->second->HasEncryptedUpdate()) { | |
355 const UpdateResponseData& encrypted_update = | |
356 it->second->GetEncryptedUpdate(); | |
357 const EntityData& data = encrypted_update.entity.value(); | |
358 | |
359 // We assume all pending updates are encrypted items for which we | |
360 // don't have the key. | |
361 DCHECK(data.specifics.has_encrypted()); | |
362 | |
363 if (cryptographer_->CanDecrypt(data.specifics.encrypted())) { | |
364 EntityData decrypted_data; | |
365 if (DecryptSpecifics(cryptographer_.get(), data.specifics, | |
366 &decrypted_data.specifics)) { | |
367 // Copy other fields one by one since EntityData doesn't allow | |
368 // copying. | |
369 // TODO(stanisc): this code is likely to be removed once we get | |
370 // rid of pending updates. | |
371 decrypted_data.id = data.id; | |
372 decrypted_data.client_tag_hash = data.client_tag_hash; | |
373 decrypted_data.non_unique_name = data.non_unique_name; | |
374 decrypted_data.creation_time = data.creation_time; | |
375 decrypted_data.modification_time = data.modification_time; | |
376 | |
377 UpdateResponseData decrypted_update; | |
378 decrypted_update.entity = decrypted_data.PassToPtr(); | |
379 decrypted_update.response_version = encrypted_update.response_version; | |
380 decrypted_update.encryption_key_name = | |
381 data.specifics.encrypted().key_name(); | |
382 response_datas.push_back(decrypted_update); | |
383 | |
384 it->second->ClearEncryptedUpdate(); | |
385 } | |
386 } | |
387 } | |
388 } | |
389 | |
390 if (new_encryption_key || response_datas.size() > 0) { | |
391 DVLOG(1) << ModelTypeToString(type_) << ": " | |
392 << base::StringPrintf("Delivering encryption key and %" PRIuS | |
393 " decrypted updates.", | |
394 response_datas.size()); | |
395 model_type_processor_->OnUpdateReceived(data_type_state_, response_datas); | |
396 } | |
397 } | |
398 | |
399 bool ModelTypeWorker::DecryptSpecifics(Cryptographer* cryptographer, | |
400 const sync_pb::EntitySpecifics& in, | |
401 sync_pb::EntitySpecifics* out) { | |
402 DCHECK(in.has_encrypted()); | |
403 DCHECK(cryptographer->CanDecrypt(in.encrypted())); | |
404 | |
405 std::string plaintext; | |
406 plaintext = cryptographer->DecryptToString(in.encrypted()); | |
407 if (plaintext.empty()) { | |
408 LOG(ERROR) << "Failed to decrypt a decryptable entity"; | |
409 return false; | |
410 } | |
411 if (!out->ParseFromString(plaintext)) { | |
412 LOG(ERROR) << "Failed to parse decrypted entity"; | |
413 return false; | |
414 } | |
415 return true; | |
416 } | |
417 | |
418 WorkerEntityTracker* ModelTypeWorker::GetEntityTracker( | |
419 const std::string& tag_hash) { | |
420 auto it = entities_.find(tag_hash); | |
421 return it != entities_.end() ? it->second.get() : nullptr; | |
422 } | |
423 | |
424 WorkerEntityTracker* ModelTypeWorker::CreateEntityTracker( | |
425 const EntityData& data) { | |
426 DCHECK(entities_.find(data.client_tag_hash) == entities_.end()); | |
427 std::unique_ptr<WorkerEntityTracker> entity = | |
428 base::WrapUnique(new WorkerEntityTracker(data.id, data.client_tag_hash)); | |
429 WorkerEntityTracker* entity_ptr = entity.get(); | |
430 entities_[data.client_tag_hash] = std::move(entity); | |
431 return entity_ptr; | |
432 } | |
433 | |
434 WorkerEntityTracker* ModelTypeWorker::GetOrCreateEntityTracker( | |
435 const EntityData& data) { | |
436 WorkerEntityTracker* entity = GetEntityTracker(data.client_tag_hash); | |
437 return entity ? entity : CreateEntityTracker(data); | |
438 } | |
439 | |
440 } // namespace syncer_v2 | |
OLD | NEW |