| OLD | NEW |
| (Empty) |
| 1 // Copyright 2014 The Chromium Authors. All rights reserved. | |
| 2 // Use of this source code is governed by a BSD-style license that can be | |
| 3 // found in the LICENSE file. | |
| 4 | |
| 5 #include "sync/engine/model_type_worker.h" | |
| 6 | |
| 7 #include <stddef.h> | |
| 8 #include <stdint.h> | |
| 9 | |
| 10 #include <utility> | |
| 11 #include <vector> | |
| 12 | |
| 13 #include "base/bind.h" | |
| 14 #include "base/format_macros.h" | |
| 15 #include "base/guid.h" | |
| 16 #include "base/logging.h" | |
| 17 #include "base/memory/ptr_util.h" | |
| 18 #include "base/strings/stringprintf.h" | |
| 19 #include "sync/engine/commit_contribution.h" | |
| 20 #include "sync/engine/non_blocking_type_commit_contribution.h" | |
| 21 #include "sync/engine/worker_entity_tracker.h" | |
| 22 #include "sync/internal_api/public/model_type_processor.h" | |
| 23 #include "sync/syncable/syncable_util.h" | |
| 24 #include "sync/util/cryptographer.h" | |
| 25 #include "sync/util/time.h" | |
| 26 | |
| 27 namespace syncer_v2 { | |
| 28 | |
| 29 using syncer::CommitContribution; | |
| 30 using syncer::Cryptographer; | |
| 31 using syncer::ModelType; | |
| 32 using syncer::NudgeHandler; | |
| 33 using syncer::SyncerError; | |
| 34 | |
| 35 ModelTypeWorker::ModelTypeWorker( | |
| 36 ModelType type, | |
| 37 const sync_pb::DataTypeState& initial_state, | |
| 38 std::unique_ptr<Cryptographer> cryptographer, | |
| 39 NudgeHandler* nudge_handler, | |
| 40 std::unique_ptr<ModelTypeProcessor> model_type_processor) | |
| 41 : type_(type), | |
| 42 data_type_state_(initial_state), | |
| 43 model_type_processor_(std::move(model_type_processor)), | |
| 44 cryptographer_(std::move(cryptographer)), | |
| 45 nudge_handler_(nudge_handler), | |
| 46 weak_ptr_factory_(this) { | |
| 47 DCHECK(model_type_processor_); | |
| 48 | |
| 49 // Request an initial sync if it hasn't been completed yet. | |
| 50 if (!data_type_state_.initial_sync_done()) { | |
| 51 nudge_handler_->NudgeForInitialDownload(type_); | |
| 52 } | |
| 53 | |
| 54 if (cryptographer_) { | |
| 55 DVLOG(1) << ModelTypeToString(type_) << ": Starting with encryption key " | |
| 56 << cryptographer_->GetDefaultNigoriKeyName(); | |
| 57 OnCryptographerUpdated(); | |
| 58 } | |
| 59 } | |
| 60 | |
| 61 ModelTypeWorker::~ModelTypeWorker() { | |
| 62 model_type_processor_->DisconnectSync(); | |
| 63 } | |
| 64 | |
| 65 ModelType ModelTypeWorker::GetModelType() const { | |
| 66 DCHECK(CalledOnValidThread()); | |
| 67 return type_; | |
| 68 } | |
| 69 | |
| 70 bool ModelTypeWorker::IsEncryptionRequired() const { | |
| 71 return !!cryptographer_; | |
| 72 } | |
| 73 | |
| 74 void ModelTypeWorker::UpdateCryptographer( | |
| 75 std::unique_ptr<Cryptographer> cryptographer) { | |
| 76 DCHECK(cryptographer); | |
| 77 cryptographer_ = std::move(cryptographer); | |
| 78 | |
| 79 // Update our state and that of the proxy. | |
| 80 OnCryptographerUpdated(); | |
| 81 | |
| 82 // Nudge the scheduler if we're now allowed to commit. | |
| 83 if (CanCommitItems()) | |
| 84 nudge_handler_->NudgeForCommit(type_); | |
| 85 } | |
| 86 | |
| 87 // UpdateHandler implementation. | |
| 88 bool ModelTypeWorker::IsInitialSyncEnded() const { | |
| 89 return data_type_state_.initial_sync_done(); | |
| 90 } | |
| 91 | |
| 92 void ModelTypeWorker::GetDownloadProgress( | |
| 93 sync_pb::DataTypeProgressMarker* progress_marker) const { | |
| 94 DCHECK(CalledOnValidThread()); | |
| 95 progress_marker->CopyFrom(data_type_state_.progress_marker()); | |
| 96 } | |
| 97 | |
| 98 void ModelTypeWorker::GetDataTypeContext( | |
| 99 sync_pb::DataTypeContext* context) const { | |
| 100 DCHECK(CalledOnValidThread()); | |
| 101 context->CopyFrom(data_type_state_.type_context()); | |
| 102 } | |
| 103 | |
| 104 SyncerError ModelTypeWorker::ProcessGetUpdatesResponse( | |
| 105 const sync_pb::DataTypeProgressMarker& progress_marker, | |
| 106 const sync_pb::DataTypeContext& mutated_context, | |
| 107 const SyncEntityList& applicable_updates, | |
| 108 syncer::sessions::StatusController* status) { | |
| 109 DCHECK(CalledOnValidThread()); | |
| 110 | |
| 111 // TODO(rlarocque): Handle data type context conflicts. | |
| 112 *data_type_state_.mutable_type_context() = mutated_context; | |
| 113 *data_type_state_.mutable_progress_marker() = progress_marker; | |
| 114 | |
| 115 for (const sync_pb::SyncEntity* update_entity : applicable_updates) { | |
| 116 // Skip updates for permanent folders. | |
| 117 // TODO(stanisc): crbug.com/516866: might need to handle this for | |
| 118 // hierarchical datatypes. | |
| 119 if (!update_entity->server_defined_unique_tag().empty()) | |
| 120 continue; | |
| 121 | |
| 122 // Normal updates are handled here. | |
| 123 const std::string& client_tag_hash = | |
| 124 update_entity->client_defined_unique_tag(); | |
| 125 | |
| 126 // TODO(stanisc): crbug.com/516866: this wouldn't be true for bookmarks. | |
| 127 DCHECK(!client_tag_hash.empty()); | |
| 128 | |
| 129 // Prepare the message for the model thread. | |
| 130 EntityData data; | |
| 131 data.id = update_entity->id_string(); | |
| 132 data.client_tag_hash = client_tag_hash; | |
| 133 data.creation_time = syncer::ProtoTimeToTime(update_entity->ctime()); | |
| 134 data.modification_time = syncer::ProtoTimeToTime(update_entity->mtime()); | |
| 135 data.non_unique_name = update_entity->name(); | |
| 136 | |
| 137 UpdateResponseData response_data; | |
| 138 response_data.response_version = update_entity->version(); | |
| 139 | |
| 140 WorkerEntityTracker* entity = GetOrCreateEntityTracker(data); | |
| 141 | |
| 142 // Check if specifics are encrypted and try to decrypt if so. | |
| 143 const sync_pb::EntitySpecifics& specifics = update_entity->specifics(); | |
| 144 if (!specifics.has_encrypted()) { | |
| 145 // No encryption. | |
| 146 entity->ReceiveUpdate(update_entity->version()); | |
| 147 data.specifics = specifics; | |
| 148 response_data.entity = data.PassToPtr(); | |
| 149 pending_updates_.push_back(response_data); | |
| 150 } else if (specifics.has_encrypted() && cryptographer_ && | |
| 151 cryptographer_->CanDecrypt(specifics.encrypted())) { | |
| 152 // Encrypted, but we know the key. | |
| 153 if (DecryptSpecifics(cryptographer_.get(), specifics, &data.specifics)) { | |
| 154 entity->ReceiveUpdate(update_entity->version()); | |
| 155 response_data.entity = data.PassToPtr(); | |
| 156 response_data.encryption_key_name = specifics.encrypted().key_name(); | |
| 157 pending_updates_.push_back(response_data); | |
| 158 } | |
| 159 } else if (specifics.has_encrypted() && | |
| 160 (!cryptographer_ || | |
| 161 !cryptographer_->CanDecrypt(specifics.encrypted()))) { | |
| 162 // Can't decrypt right now. Ask the entity tracker to handle it. | |
| 163 data.specifics = specifics; | |
| 164 response_data.entity = data.PassToPtr(); | |
| 165 entity->ReceiveEncryptedUpdate(response_data); | |
| 166 } | |
| 167 } | |
| 168 | |
| 169 return syncer::SYNCER_OK; | |
| 170 } | |
| 171 | |
| 172 void ModelTypeWorker::ApplyUpdates(syncer::sessions::StatusController* status) { | |
| 173 DCHECK(CalledOnValidThread()); | |
| 174 // This should only ever be called after one PassiveApplyUpdates. | |
| 175 DCHECK(data_type_state_.initial_sync_done()); | |
| 176 // Download cycle is done, pass all updates to the processor. | |
| 177 ApplyPendingUpdates(); | |
| 178 } | |
| 179 | |
| 180 void ModelTypeWorker::PassiveApplyUpdates( | |
| 181 syncer::sessions::StatusController* status) { | |
| 182 // This should only be called at the end of the very first download cycle. | |
| 183 DCHECK(!data_type_state_.initial_sync_done()); | |
| 184 // Indicate to the processor that the initial download is done. The initial | |
| 185 // sync technically isn't done yet but by the time this value is persisted to | |
| 186 // disk on the model thread it will be. | |
| 187 data_type_state_.set_initial_sync_done(true); | |
| 188 ApplyPendingUpdates(); | |
| 189 } | |
| 190 | |
| 191 void ModelTypeWorker::ApplyPendingUpdates() { | |
| 192 DVLOG(1) << ModelTypeToString(type_) << ": " | |
| 193 << base::StringPrintf("Delivering %" PRIuS " applicable updates.", | |
| 194 pending_updates_.size()); | |
| 195 model_type_processor_->OnUpdateReceived(data_type_state_, pending_updates_); | |
| 196 pending_updates_.clear(); | |
| 197 } | |
| 198 | |
| 199 void ModelTypeWorker::EnqueueForCommit(const CommitRequestDataList& list) { | |
| 200 DCHECK(CalledOnValidThread()); | |
| 201 | |
| 202 DCHECK(IsTypeInitialized()) | |
| 203 << "Asked to commit items before type was initialized. " | |
| 204 << "ModelType is: " << ModelTypeToString(type_); | |
| 205 | |
| 206 for (const CommitRequestData& commit : list) { | |
| 207 const EntityData& data = commit.entity.value(); | |
| 208 if (!data.is_deleted()) { | |
| 209 DCHECK_EQ(type_, syncer::GetModelTypeFromSpecifics(data.specifics)); | |
| 210 } | |
| 211 GetOrCreateEntityTracker(data)->RequestCommit(commit); | |
| 212 } | |
| 213 | |
| 214 if (CanCommitItems()) | |
| 215 nudge_handler_->NudgeForCommit(type_); | |
| 216 } | |
| 217 | |
| 218 // CommitContributor implementation. | |
| 219 std::unique_ptr<CommitContribution> ModelTypeWorker::GetContribution( | |
| 220 size_t max_entries) { | |
| 221 DCHECK(CalledOnValidThread()); | |
| 222 // There shouldn't be a GetUpdates in progress when a commit is triggered. | |
| 223 DCHECK(pending_updates_.empty()); | |
| 224 | |
| 225 size_t space_remaining = max_entries; | |
| 226 google::protobuf::RepeatedPtrField<sync_pb::SyncEntity> commit_entities; | |
| 227 | |
| 228 if (!CanCommitItems()) | |
| 229 return std::unique_ptr<CommitContribution>(); | |
| 230 | |
| 231 // TODO(rlarocque): Avoid iterating here. | |
| 232 for (EntityMap::const_iterator it = entities_.begin(); | |
| 233 it != entities_.end() && space_remaining > 0; ++it) { | |
| 234 WorkerEntityTracker* entity = it->second.get(); | |
| 235 if (entity->HasPendingCommit()) { | |
| 236 sync_pb::SyncEntity* commit_entity = commit_entities.Add(); | |
| 237 entity->PopulateCommitProto(commit_entity); | |
| 238 AdjustCommitProto(commit_entity); | |
| 239 space_remaining--; | |
| 240 } | |
| 241 } | |
| 242 | |
| 243 if (commit_entities.size() == 0) | |
| 244 return std::unique_ptr<CommitContribution>(); | |
| 245 | |
| 246 return std::unique_ptr<CommitContribution>( | |
| 247 new NonBlockingTypeCommitContribution(data_type_state_.type_context(), | |
| 248 commit_entities, this)); | |
| 249 } | |
| 250 | |
| 251 void ModelTypeWorker::OnCommitResponse(CommitResponseDataList* response_list) { | |
| 252 for (CommitResponseData& response : *response_list) { | |
| 253 WorkerEntityTracker* entity = GetEntityTracker(response.client_tag_hash); | |
| 254 | |
| 255 // There's no way we could have committed an entry we know nothing about. | |
| 256 if (entity == nullptr) { | |
| 257 NOTREACHED() << "Received commit response for item unknown to us." | |
| 258 << " Model type: " << ModelTypeToString(type_) | |
| 259 << " ID: " << response.id; | |
| 260 continue; | |
| 261 } | |
| 262 | |
| 263 entity->ReceiveCommitResponse(&response); | |
| 264 } | |
| 265 | |
| 266 // Send the responses back to the model thread. It needs to know which | |
| 267 // items have been successfully committed so it can save that information in | |
| 268 // permanent storage. | |
| 269 model_type_processor_->OnCommitCompleted(data_type_state_, *response_list); | |
| 270 } | |
| 271 | |
| 272 base::WeakPtr<ModelTypeWorker> ModelTypeWorker::AsWeakPtr() { | |
| 273 return weak_ptr_factory_.GetWeakPtr(); | |
| 274 } | |
| 275 | |
| 276 bool ModelTypeWorker::IsTypeInitialized() const { | |
| 277 return data_type_state_.initial_sync_done() && | |
| 278 !data_type_state_.progress_marker().token().empty(); | |
| 279 } | |
| 280 | |
| 281 bool ModelTypeWorker::CanCommitItems() const { | |
| 282 // We can't commit anything until we know the type's parent node. | |
| 283 // We'll get it in the first update response. | |
| 284 if (!IsTypeInitialized()) | |
| 285 return false; | |
| 286 | |
| 287 // Don't commit if we should be encrypting but don't have the required keys. | |
| 288 if (IsEncryptionRequired() && | |
| 289 (!cryptographer_ || !cryptographer_->is_ready())) { | |
| 290 return false; | |
| 291 } | |
| 292 | |
| 293 return true; | |
| 294 } | |
| 295 | |
| 296 void ModelTypeWorker::AdjustCommitProto(sync_pb::SyncEntity* sync_entity) { | |
| 297 DCHECK(CanCommitItems()); | |
| 298 | |
| 299 // Initial commits need our help to generate a client ID. | |
| 300 if (sync_entity->version() == kUncommittedVersion) { | |
| 301 DCHECK(!sync_entity->has_id_string()); | |
| 302 // TODO(stanisc): This is incorrect for bookmarks for two reasons: | |
| 303 // 1) Won't be able to match previously committed bookmarks to the ones | |
| 304 // with server ID. | |
| 305 // 2) Recommitting an item in a case of failing to receive commit response | |
| 306 // would result in generating a different client ID, which in turn | |
| 307 // would result in a duplication. | |
| 308 // We should generate client ID on the frontend side instead. | |
| 309 sync_entity->set_id_string(base::GenerateGUID()); | |
| 310 sync_entity->set_version(0); | |
| 311 } else { | |
| 312 DCHECK(sync_entity->has_id_string()); | |
| 313 } | |
| 314 | |
| 315 // Encrypt the specifics and hide the title if necessary. | |
| 316 if (IsEncryptionRequired()) { | |
| 317 // IsEncryptionRequired() && CanCommitItems() implies | |
| 318 // that the cryptographer is valid and ready to encrypt. | |
| 319 sync_pb::EntitySpecifics encrypted_specifics; | |
| 320 bool result = cryptographer_->Encrypt( | |
| 321 sync_entity->specifics(), encrypted_specifics.mutable_encrypted()); | |
| 322 DCHECK(result); | |
| 323 sync_entity->mutable_specifics()->CopyFrom(encrypted_specifics); | |
| 324 sync_entity->set_name("encrypted"); | |
| 325 } | |
| 326 | |
| 327 // Always include enough specifics to identify the type. Do this even in | |
| 328 // deletion requests, where the specifics are otherwise invalid. | |
| 329 AddDefaultFieldValue(type_, sync_entity->mutable_specifics()); | |
| 330 | |
| 331 // TODO(stanisc): crbug.com/516866: | |
| 332 // Call sync_entity->set_parent_id_string(...) for hierarchical entities here. | |
| 333 } | |
| 334 | |
| 335 void ModelTypeWorker::OnCryptographerUpdated() { | |
| 336 DCHECK(cryptographer_); | |
| 337 | |
| 338 bool new_encryption_key = false; | |
| 339 UpdateResponseDataList response_datas; | |
| 340 | |
| 341 const std::string& new_key_name = cryptographer_->GetDefaultNigoriKeyName(); | |
| 342 | |
| 343 // Handle a change in encryption key. | |
| 344 if (data_type_state_.encryption_key_name() != new_key_name) { | |
| 345 DVLOG(1) << ModelTypeToString(type_) << ": Updating encryption key " | |
| 346 << data_type_state_.encryption_key_name() << " -> " | |
| 347 << new_key_name; | |
| 348 data_type_state_.set_encryption_key_name(new_key_name); | |
| 349 new_encryption_key = true; | |
| 350 } | |
| 351 | |
| 352 for (EntityMap::const_iterator it = entities_.begin(); it != entities_.end(); | |
| 353 ++it) { | |
| 354 if (it->second->HasEncryptedUpdate()) { | |
| 355 const UpdateResponseData& encrypted_update = | |
| 356 it->second->GetEncryptedUpdate(); | |
| 357 const EntityData& data = encrypted_update.entity.value(); | |
| 358 | |
| 359 // We assume all pending updates are encrypted items for which we | |
| 360 // don't have the key. | |
| 361 DCHECK(data.specifics.has_encrypted()); | |
| 362 | |
| 363 if (cryptographer_->CanDecrypt(data.specifics.encrypted())) { | |
| 364 EntityData decrypted_data; | |
| 365 if (DecryptSpecifics(cryptographer_.get(), data.specifics, | |
| 366 &decrypted_data.specifics)) { | |
| 367 // Copy other fields one by one since EntityData doesn't allow | |
| 368 // copying. | |
| 369 // TODO(stanisc): this code is likely to be removed once we get | |
| 370 // rid of pending updates. | |
| 371 decrypted_data.id = data.id; | |
| 372 decrypted_data.client_tag_hash = data.client_tag_hash; | |
| 373 decrypted_data.non_unique_name = data.non_unique_name; | |
| 374 decrypted_data.creation_time = data.creation_time; | |
| 375 decrypted_data.modification_time = data.modification_time; | |
| 376 | |
| 377 UpdateResponseData decrypted_update; | |
| 378 decrypted_update.entity = decrypted_data.PassToPtr(); | |
| 379 decrypted_update.response_version = encrypted_update.response_version; | |
| 380 decrypted_update.encryption_key_name = | |
| 381 data.specifics.encrypted().key_name(); | |
| 382 response_datas.push_back(decrypted_update); | |
| 383 | |
| 384 it->second->ClearEncryptedUpdate(); | |
| 385 } | |
| 386 } | |
| 387 } | |
| 388 } | |
| 389 | |
| 390 if (new_encryption_key || response_datas.size() > 0) { | |
| 391 DVLOG(1) << ModelTypeToString(type_) << ": " | |
| 392 << base::StringPrintf("Delivering encryption key and %" PRIuS | |
| 393 " decrypted updates.", | |
| 394 response_datas.size()); | |
| 395 model_type_processor_->OnUpdateReceived(data_type_state_, response_datas); | |
| 396 } | |
| 397 } | |
| 398 | |
| 399 bool ModelTypeWorker::DecryptSpecifics(Cryptographer* cryptographer, | |
| 400 const sync_pb::EntitySpecifics& in, | |
| 401 sync_pb::EntitySpecifics* out) { | |
| 402 DCHECK(in.has_encrypted()); | |
| 403 DCHECK(cryptographer->CanDecrypt(in.encrypted())); | |
| 404 | |
| 405 std::string plaintext; | |
| 406 plaintext = cryptographer->DecryptToString(in.encrypted()); | |
| 407 if (plaintext.empty()) { | |
| 408 LOG(ERROR) << "Failed to decrypt a decryptable entity"; | |
| 409 return false; | |
| 410 } | |
| 411 if (!out->ParseFromString(plaintext)) { | |
| 412 LOG(ERROR) << "Failed to parse decrypted entity"; | |
| 413 return false; | |
| 414 } | |
| 415 return true; | |
| 416 } | |
| 417 | |
| 418 WorkerEntityTracker* ModelTypeWorker::GetEntityTracker( | |
| 419 const std::string& tag_hash) { | |
| 420 auto it = entities_.find(tag_hash); | |
| 421 return it != entities_.end() ? it->second.get() : nullptr; | |
| 422 } | |
| 423 | |
| 424 WorkerEntityTracker* ModelTypeWorker::CreateEntityTracker( | |
| 425 const EntityData& data) { | |
| 426 DCHECK(entities_.find(data.client_tag_hash) == entities_.end()); | |
| 427 std::unique_ptr<WorkerEntityTracker> entity = | |
| 428 base::WrapUnique(new WorkerEntityTracker(data.id, data.client_tag_hash)); | |
| 429 WorkerEntityTracker* entity_ptr = entity.get(); | |
| 430 entities_[data.client_tag_hash] = std::move(entity); | |
| 431 return entity_ptr; | |
| 432 } | |
| 433 | |
| 434 WorkerEntityTracker* ModelTypeWorker::GetOrCreateEntityTracker( | |
| 435 const EntityData& data) { | |
| 436 WorkerEntityTracker* entity = GetEntityTracker(data.client_tag_hash); | |
| 437 return entity ? entity : CreateEntityTracker(data); | |
| 438 } | |
| 439 | |
| 440 } // namespace syncer_v2 | |
| OLD | NEW |