OLD | NEW |
| (Empty) |
1 // Copyright 2015 The Chromium Authors. All rights reserved. | |
2 // Use of this source code is governed by a BSD-style license that can be | |
3 // found in the LICENSE file. | |
4 | |
5 #include "components/sync_driver/device_info_service.h" | |
6 | |
7 #include <set> | |
8 #include <utility> | |
9 #include <vector> | |
10 | |
11 #include "base/bind.h" | |
12 #include "base/location.h" | |
13 #include "base/memory/ptr_util.h" | |
14 #include "base/strings/string_util.h" | |
15 #include "components/sync/api/entity_change.h" | |
16 #include "components/sync/api/metadata_batch.h" | |
17 #include "components/sync/api/sync_error.h" | |
18 #include "components/sync/base/time.h" | |
19 #include "components/sync/core/data_batch_impl.h" | |
20 #include "components/sync/core/simple_metadata_change_list.h" | |
21 #include "components/sync/protocol/data_type_state.pb.h" | |
22 #include "components/sync/protocol/sync.pb.h" | |
23 #include "components/sync_driver/device_info_util.h" | |
24 | |
25 namespace sync_driver_v2 { | |
26 | |
27 using base::Time; | |
28 using base::TimeDelta; | |
29 using syncer::SyncError; | |
30 using syncer_v2::DataBatchImpl; | |
31 using syncer_v2::EntityChange; | |
32 using syncer_v2::EntityChangeList; | |
33 using syncer_v2::EntityData; | |
34 using syncer_v2::EntityDataMap; | |
35 using syncer_v2::MetadataBatch; | |
36 using syncer_v2::MetadataChangeList; | |
37 using syncer_v2::ModelTypeStore; | |
38 using syncer_v2::SimpleMetadataChangeList; | |
39 using sync_driver::DeviceInfo; | |
40 using sync_driver::DeviceInfoUtil; | |
41 using sync_pb::DataTypeState; | |
42 using sync_pb::DeviceInfoSpecifics; | |
43 using sync_pb::EntitySpecifics; | |
44 | |
45 using Record = ModelTypeStore::Record; | |
46 using RecordList = ModelTypeStore::RecordList; | |
47 using Result = ModelTypeStore::Result; | |
48 using WriteBatch = ModelTypeStore::WriteBatch; | |
49 | |
50 DeviceInfoService::DeviceInfoService( | |
51 sync_driver::LocalDeviceInfoProvider* local_device_info_provider, | |
52 const StoreFactoryFunction& callback, | |
53 const ChangeProcessorFactory& change_processor_factory) | |
54 : ModelTypeService(change_processor_factory, syncer::DEVICE_INFO), | |
55 local_device_info_provider_(local_device_info_provider), | |
56 weak_factory_(this) { | |
57 DCHECK(local_device_info_provider); | |
58 | |
59 // This is not threadsafe, but presuably the provider initializes on the same | |
60 // thread as us so we're okay. | |
61 if (local_device_info_provider->GetLocalDeviceInfo()) { | |
62 OnProviderInitialized(); | |
63 } else { | |
64 subscription_ = | |
65 local_device_info_provider->RegisterOnInitializedCallback(base::Bind( | |
66 &DeviceInfoService::OnProviderInitialized, base::Unretained(this))); | |
67 } | |
68 | |
69 callback.Run(base::Bind(&DeviceInfoService::OnStoreCreated, | |
70 weak_factory_.GetWeakPtr())); | |
71 } | |
72 | |
73 DeviceInfoService::~DeviceInfoService() {} | |
74 | |
75 std::unique_ptr<MetadataChangeList> | |
76 DeviceInfoService::CreateMetadataChangeList() { | |
77 return base::WrapUnique(new SimpleMetadataChangeList()); | |
78 } | |
79 | |
80 SyncError DeviceInfoService::MergeSyncData( | |
81 std::unique_ptr<MetadataChangeList> metadata_change_list, | |
82 EntityDataMap entity_data_map) { | |
83 DCHECK(has_provider_initialized_); | |
84 DCHECK(has_metadata_loaded_); | |
85 DCHECK(change_processor()); | |
86 | |
87 // Local data should typically be near empty, with the only possible value | |
88 // corresponding to this device. This is because on signout all device info | |
89 // data is blown away. However, this simplification is being ignored here and | |
90 // a full difference is going to be calculated to explore what other service | |
91 // implementations may look like. | |
92 std::set<std::string> local_guids_to_put; | |
93 for (const auto& kv : all_data_) { | |
94 local_guids_to_put.insert(kv.first); | |
95 } | |
96 | |
97 bool has_changes = false; | |
98 const DeviceInfo* local_info = | |
99 local_device_info_provider_->GetLocalDeviceInfo(); | |
100 std::string local_guid = local_info->guid(); | |
101 std::unique_ptr<WriteBatch> batch = store_->CreateWriteBatch(); | |
102 for (const auto& kv : entity_data_map) { | |
103 const DeviceInfoSpecifics& specifics = | |
104 kv.second.value().specifics.device_info(); | |
105 DCHECK_EQ(kv.first, specifics.cache_guid()); | |
106 if (specifics.cache_guid() == local_guid) { | |
107 // Don't Put local data if it's the same as the remote copy. | |
108 if (local_info->Equals(*CopyToModel(specifics))) { | |
109 local_guids_to_put.erase(local_guid); | |
110 } else { | |
111 // This device is valid right now and this entry is about to be | |
112 // committed, use this as an opportunity to refresh the timestamp. | |
113 all_data_[local_guid]->set_last_updated_timestamp( | |
114 syncer::TimeToProtoTime(Time::Now())); | |
115 } | |
116 } else { | |
117 // Remote data wins conflicts. | |
118 local_guids_to_put.erase(specifics.cache_guid()); | |
119 has_changes = true; | |
120 StoreSpecifics(base::WrapUnique(new DeviceInfoSpecifics(specifics)), | |
121 batch.get()); | |
122 } | |
123 } | |
124 | |
125 for (const std::string& guid : local_guids_to_put) { | |
126 change_processor()->Put(guid, CopyToEntityData(*all_data_[guid]), | |
127 metadata_change_list.get()); | |
128 } | |
129 | |
130 CommitAndNotify(std::move(batch), std::move(metadata_change_list), | |
131 has_changes); | |
132 return SyncError(); | |
133 } | |
134 | |
135 SyncError DeviceInfoService::ApplySyncChanges( | |
136 std::unique_ptr<MetadataChangeList> metadata_change_list, | |
137 EntityChangeList entity_changes) { | |
138 DCHECK(has_provider_initialized_); | |
139 DCHECK(has_metadata_loaded_); | |
140 | |
141 std::unique_ptr<WriteBatch> batch = store_->CreateWriteBatch(); | |
142 bool has_changes = false; | |
143 for (EntityChange& change : entity_changes) { | |
144 const std::string guid = change.storage_key(); | |
145 // Each device is the authoritative source for itself, ignore any remote | |
146 // changes that have our local cache guid. | |
147 if (guid == local_device_info_provider_->GetLocalDeviceInfo()->guid()) { | |
148 continue; | |
149 } | |
150 | |
151 if (change.type() == EntityChange::ACTION_DELETE) { | |
152 has_changes |= DeleteSpecifics(guid, batch.get()); | |
153 } else { | |
154 const DeviceInfoSpecifics& specifics = | |
155 change.data().specifics.device_info(); | |
156 DCHECK(guid == specifics.cache_guid()); | |
157 StoreSpecifics(base::WrapUnique(new DeviceInfoSpecifics(specifics)), | |
158 batch.get()); | |
159 has_changes = true; | |
160 } | |
161 } | |
162 | |
163 CommitAndNotify(std::move(batch), std::move(metadata_change_list), | |
164 has_changes); | |
165 return SyncError(); | |
166 } | |
167 | |
168 void DeviceInfoService::GetData(StorageKeyList storage_keys, | |
169 DataCallback callback) { | |
170 DCHECK(has_metadata_loaded_); | |
171 | |
172 std::unique_ptr<DataBatchImpl> batch(new DataBatchImpl()); | |
173 for (const auto& key : storage_keys) { | |
174 const auto& iter = all_data_.find(key); | |
175 if (iter != all_data_.end()) { | |
176 DCHECK_EQ(key, iter->second->cache_guid()); | |
177 batch->Put(key, CopyToEntityData(*iter->second)); | |
178 } | |
179 } | |
180 | |
181 callback.Run(SyncError(), std::move(batch)); | |
182 } | |
183 | |
184 void DeviceInfoService::GetAllData(DataCallback callback) { | |
185 DCHECK(has_metadata_loaded_); | |
186 | |
187 std::unique_ptr<DataBatchImpl> batch(new DataBatchImpl()); | |
188 for (const auto& kv : all_data_) { | |
189 batch->Put(kv.first, CopyToEntityData(*kv.second)); | |
190 } | |
191 | |
192 callback.Run(SyncError(), std::move(batch)); | |
193 } | |
194 | |
195 std::string DeviceInfoService::GetClientTag(const EntityData& entity_data) { | |
196 DCHECK(entity_data.specifics.has_device_info()); | |
197 return DeviceInfoUtil::SpecificsToTag(entity_data.specifics.device_info()); | |
198 } | |
199 | |
200 std::string DeviceInfoService::GetStorageKey( | |
201 const syncer_v2::EntityData& entity_data) { | |
202 DCHECK(entity_data.specifics.has_device_info()); | |
203 return entity_data.specifics.device_info().cache_guid(); | |
204 } | |
205 | |
206 void DeviceInfoService::OnChangeProcessorSet() { | |
207 // We've recieved a new processor that needs metadata. If we're still in the | |
208 // process of loading data and/or metadata, then |has_metadata_loaded_| is | |
209 // false and we'll wait for those async reads to happen. If we've already | |
210 // loaded metadata and then subsequently we get a new processor, we must not | |
211 // have created the processor ourselves because we had no metadata. So there | |
212 // must not be any metadata on disk. | |
213 if (has_metadata_loaded_) { | |
214 change_processor()->OnMetadataLoaded(SyncError(), | |
215 base::WrapUnique(new MetadataBatch())); | |
216 ReconcileLocalAndStored(); | |
217 } | |
218 } | |
219 | |
220 bool DeviceInfoService::IsSyncing() const { | |
221 return !all_data_.empty(); | |
222 } | |
223 | |
224 std::unique_ptr<DeviceInfo> DeviceInfoService::GetDeviceInfo( | |
225 const std::string& client_id) const { | |
226 const ClientIdToSpecifics::const_iterator iter = all_data_.find(client_id); | |
227 if (iter == all_data_.end()) { | |
228 return std::unique_ptr<DeviceInfo>(); | |
229 } | |
230 | |
231 return CopyToModel(*iter->second); | |
232 } | |
233 | |
234 ScopedVector<DeviceInfo> DeviceInfoService::GetAllDeviceInfo() const { | |
235 ScopedVector<DeviceInfo> list; | |
236 | |
237 for (ClientIdToSpecifics::const_iterator iter = all_data_.begin(); | |
238 iter != all_data_.end(); ++iter) { | |
239 list.push_back(CopyToModel(*iter->second)); | |
240 } | |
241 | |
242 return list; | |
243 } | |
244 | |
245 void DeviceInfoService::AddObserver(Observer* observer) { | |
246 observers_.AddObserver(observer); | |
247 } | |
248 | |
249 void DeviceInfoService::RemoveObserver(Observer* observer) { | |
250 observers_.RemoveObserver(observer); | |
251 } | |
252 | |
253 int DeviceInfoService::CountActiveDevices() const { | |
254 return CountActiveDevices(Time::Now()); | |
255 } | |
256 | |
257 void DeviceInfoService::NotifyObservers() { | |
258 FOR_EACH_OBSERVER(Observer, observers_, OnDeviceInfoChange()); | |
259 } | |
260 | |
261 // Static. | |
262 std::unique_ptr<DeviceInfoSpecifics> DeviceInfoService::CopyToSpecifics( | |
263 const DeviceInfo& info) { | |
264 std::unique_ptr<DeviceInfoSpecifics> specifics = | |
265 base::WrapUnique(new DeviceInfoSpecifics); | |
266 specifics->set_cache_guid(info.guid()); | |
267 specifics->set_client_name(info.client_name()); | |
268 specifics->set_chrome_version(info.chrome_version()); | |
269 specifics->set_sync_user_agent(info.sync_user_agent()); | |
270 specifics->set_device_type(info.device_type()); | |
271 specifics->set_signin_scoped_device_id(info.signin_scoped_device_id()); | |
272 return specifics; | |
273 } | |
274 | |
275 // Static. | |
276 std::unique_ptr<DeviceInfo> DeviceInfoService::CopyToModel( | |
277 const DeviceInfoSpecifics& specifics) { | |
278 return base::WrapUnique(new DeviceInfo( | |
279 specifics.cache_guid(), specifics.client_name(), | |
280 specifics.chrome_version(), specifics.sync_user_agent(), | |
281 specifics.device_type(), specifics.signin_scoped_device_id())); | |
282 } | |
283 | |
284 // Static. | |
285 std::unique_ptr<EntityData> DeviceInfoService::CopyToEntityData( | |
286 const DeviceInfoSpecifics& specifics) { | |
287 std::unique_ptr<EntityData> entity_data(new EntityData()); | |
288 *entity_data->specifics.mutable_device_info() = specifics; | |
289 entity_data->non_unique_name = specifics.client_name(); | |
290 return entity_data; | |
291 } | |
292 | |
293 void DeviceInfoService::StoreSpecifics( | |
294 std::unique_ptr<DeviceInfoSpecifics> specifics, | |
295 WriteBatch* batch) { | |
296 const std::string guid = specifics->cache_guid(); | |
297 DVLOG(1) << "Storing DEVICE_INFO for " << specifics->client_name() | |
298 << " with ID " << guid; | |
299 store_->WriteData(batch, guid, specifics->SerializeAsString()); | |
300 all_data_[guid] = std::move(specifics); | |
301 } | |
302 | |
303 bool DeviceInfoService::DeleteSpecifics(const std::string& guid, | |
304 WriteBatch* batch) { | |
305 ClientIdToSpecifics::const_iterator iter = all_data_.find(guid); | |
306 if (iter != all_data_.end()) { | |
307 DVLOG(1) << "Deleting DEVICE_INFO for " << iter->second->client_name() | |
308 << " with ID " << guid; | |
309 store_->DeleteData(batch, guid); | |
310 all_data_.erase(iter); | |
311 return true; | |
312 } else { | |
313 return false; | |
314 } | |
315 } | |
316 | |
317 void DeviceInfoService::OnProviderInitialized() { | |
318 has_provider_initialized_ = true; | |
319 LoadMetadataIfReady(); | |
320 } | |
321 | |
322 void DeviceInfoService::OnStoreCreated(Result result, | |
323 std::unique_ptr<ModelTypeStore> store) { | |
324 if (result == Result::SUCCESS) { | |
325 std::swap(store_, store); | |
326 store_->ReadAllData(base::Bind(&DeviceInfoService::OnReadAllData, | |
327 weak_factory_.GetWeakPtr())); | |
328 } else { | |
329 ReportStartupErrorToSync("ModelTypeStore creation failed."); | |
330 // TODO(skym, crbug.com/582460): Handle unrecoverable initialization | |
331 // failure. | |
332 } | |
333 } | |
334 | |
335 void DeviceInfoService::OnReadAllData(Result result, | |
336 std::unique_ptr<RecordList> record_list) { | |
337 if (result != Result::SUCCESS) { | |
338 ReportStartupErrorToSync("Initial load of data failed."); | |
339 // TODO(skym, crbug.com/582460): Handle unrecoverable initialization | |
340 // failure. | |
341 return; | |
342 } | |
343 | |
344 for (const Record& r : *record_list.get()) { | |
345 std::unique_ptr<DeviceInfoSpecifics> specifics( | |
346 base::WrapUnique(new DeviceInfoSpecifics())); | |
347 if (specifics->ParseFromString(r.value)) { | |
348 all_data_[specifics->cache_guid()] = std::move(specifics); | |
349 } else { | |
350 ReportStartupErrorToSync("Failed to deserialize specifics."); | |
351 // TODO(skym, crbug.com/582460): Handle unrecoverable initialization | |
352 // failure. | |
353 } | |
354 } | |
355 | |
356 has_data_loaded_ = true; | |
357 LoadMetadataIfReady(); | |
358 } | |
359 | |
360 void DeviceInfoService::LoadMetadataIfReady() { | |
361 if (has_data_loaded_ && has_provider_initialized_) { | |
362 store_->ReadAllMetadata(base::Bind(&DeviceInfoService::OnReadAllMetadata, | |
363 weak_factory_.GetWeakPtr())); | |
364 } | |
365 } | |
366 | |
367 void DeviceInfoService::OnReadAllMetadata( | |
368 Result result, | |
369 std::unique_ptr<RecordList> metadata_records, | |
370 const std::string& global_metadata) { | |
371 DCHECK(!has_metadata_loaded_); | |
372 | |
373 if (result != Result::SUCCESS) { | |
374 // Store has encountered some serious error. We should still be able to | |
375 // continue as a read only service, since if we got this far we must have | |
376 // loaded all data out succesfully. | |
377 ReportStartupErrorToSync("Load of metadata completely failed."); | |
378 return; | |
379 } | |
380 | |
381 // If we have no metadata then we don't want to create a processor. The idea | |
382 // is that by not having a processor, the services will suffer less of a | |
383 // performance hit. This isn't terribly applicable for this model type, but | |
384 // we want this class to be as similar to other services as possible so follow | |
385 // the convention. | |
386 if (metadata_records->size() > 0 || !global_metadata.empty()) { | |
387 CreateChangeProcessor(); | |
388 } | |
389 | |
390 // Set this after OnChangeProcessorSet so that we can correctly avoid giving | |
391 // the processor empty metadata. We always want to set |has_metadata_loaded_| | |
392 // at this point so that we'll know to give a processor empty metadata if it | |
393 // is created later. | |
394 has_metadata_loaded_ = true; | |
395 | |
396 if (!change_processor()) { | |
397 // This means we haven't been told to start syncing and we don't have any | |
398 // local metadata. | |
399 return; | |
400 } | |
401 | |
402 std::unique_ptr<MetadataBatch> batch(new MetadataBatch()); | |
403 DataTypeState state; | |
404 if (state.ParseFromString(global_metadata)) { | |
405 batch->SetDataTypeState(state); | |
406 } else { | |
407 // TODO(skym): How bad is this scenario? We may be able to just give an | |
408 // empty batch to the processor and we'll treat corrupted data type state | |
409 // as no data type state at all. The question is do we want to add any of | |
410 // the entity metadata to the batch or completely skip that step? We're | |
411 // going to have to perform a merge shortly. Does this decision/logic even | |
412 // belong in this service? | |
413 change_processor()->OnMetadataLoaded( | |
414 change_processor()->CreateAndUploadError( | |
415 FROM_HERE, "Failed to deserialize global metadata."), | |
416 nullptr); | |
417 } | |
418 for (const Record& r : *metadata_records.get()) { | |
419 sync_pb::EntityMetadata entity_metadata; | |
420 if (entity_metadata.ParseFromString(r.value)) { | |
421 batch->AddMetadata(r.id, entity_metadata); | |
422 } else { | |
423 // TODO(skym): This really isn't too bad. We just want to regenerate | |
424 // metadata for this particular entity. Unfortunately there isn't a | |
425 // convenient way to tell the processor to do this. | |
426 LOG(WARNING) << "Failed to deserialize entity metadata."; | |
427 } | |
428 } | |
429 change_processor()->OnMetadataLoaded(SyncError(), std::move(batch)); | |
430 ReconcileLocalAndStored(); | |
431 } | |
432 | |
433 void DeviceInfoService::OnCommit(Result result) { | |
434 if (result != Result::SUCCESS) { | |
435 LOG(WARNING) << "Failed a write to store."; | |
436 } | |
437 } | |
438 | |
439 void DeviceInfoService::ReconcileLocalAndStored() { | |
440 // On initial syncing we will have a change processor here, but it will not be | |
441 // tracking changes. We need to persist a copy of our local device info to | |
442 // disk, but the Put call to the processor will be ignored. That should be | |
443 // fine however, as the discrepancy will be picked up later in merge. We don't | |
444 // bother trying to track this case and act intelligently because simply not | |
445 // much of a benefit in doing so. | |
446 DCHECK(has_provider_initialized_); | |
447 DCHECK(has_metadata_loaded_); | |
448 DCHECK(change_processor()); | |
449 const DeviceInfo* current_info = | |
450 local_device_info_provider_->GetLocalDeviceInfo(); | |
451 auto iter = all_data_.find(current_info->guid()); | |
452 | |
453 // Convert to DeviceInfo for Equals function. | |
454 if (iter != all_data_.end() && | |
455 current_info->Equals(*CopyToModel(*iter->second))) { | |
456 const TimeDelta pulse_delay(DeviceInfoUtil::CalculatePulseDelay( | |
457 GetLastUpdateTime(*iter->second), Time::Now())); | |
458 if (!pulse_delay.is_zero()) { | |
459 pulse_timer_.Start(FROM_HERE, pulse_delay, | |
460 base::Bind(&DeviceInfoService::SendLocalData, | |
461 base::Unretained(this))); | |
462 return; | |
463 } | |
464 } | |
465 SendLocalData(); | |
466 } | |
467 | |
468 void DeviceInfoService::SendLocalData() { | |
469 DCHECK(has_provider_initialized_); | |
470 // TODO(skym): Handle disconnecting and reconnecting, this will currently halt | |
471 // the pulse timer and never restart it. | |
472 if (!change_processor()) { | |
473 return; | |
474 } | |
475 | |
476 std::unique_ptr<DeviceInfoSpecifics> specifics = | |
477 CopyToSpecifics(*local_device_info_provider_->GetLocalDeviceInfo()); | |
478 specifics->set_last_updated_timestamp(syncer::TimeToProtoTime(Time::Now())); | |
479 | |
480 std::unique_ptr<MetadataChangeList> metadata_change_list = | |
481 CreateMetadataChangeList(); | |
482 change_processor()->Put(specifics->cache_guid(), CopyToEntityData(*specifics), | |
483 metadata_change_list.get()); | |
484 | |
485 std::unique_ptr<WriteBatch> batch = store_->CreateWriteBatch(); | |
486 StoreSpecifics(std::move(specifics), batch.get()); | |
487 | |
488 CommitAndNotify(std::move(batch), std::move(metadata_change_list), true); | |
489 pulse_timer_.Start( | |
490 FROM_HERE, DeviceInfoUtil::kPulseInterval, | |
491 base::Bind(&DeviceInfoService::SendLocalData, base::Unretained(this))); | |
492 } | |
493 | |
494 void DeviceInfoService::CommitAndNotify( | |
495 std::unique_ptr<WriteBatch> batch, | |
496 std::unique_ptr<MetadataChangeList> metadata_change_list, | |
497 bool should_notify) { | |
498 static_cast<SimpleMetadataChangeList*>(metadata_change_list.get()) | |
499 ->TransferChanges(store_.get(), batch.get()); | |
500 store_->CommitWriteBatch( | |
501 std::move(batch), | |
502 base::Bind(&DeviceInfoService::OnCommit, weak_factory_.GetWeakPtr())); | |
503 if (should_notify) { | |
504 NotifyObservers(); | |
505 } | |
506 } | |
507 | |
508 int DeviceInfoService::CountActiveDevices(const Time now) const { | |
509 return std::count_if(all_data_.begin(), all_data_.end(), | |
510 [now](ClientIdToSpecifics::const_reference pair) { | |
511 return DeviceInfoUtil::IsActive( | |
512 GetLastUpdateTime(*pair.second), now); | |
513 }); | |
514 } | |
515 | |
516 void DeviceInfoService::ReportStartupErrorToSync(const std::string& msg) { | |
517 DCHECK(!has_metadata_loaded_); | |
518 LOG(WARNING) << msg; | |
519 | |
520 // Create a processor and give it the error in case sync tries to start. | |
521 if (!change_processor()) { | |
522 CreateChangeProcessor(); | |
523 } | |
524 change_processor()->OnMetadataLoaded( | |
525 change_processor()->CreateAndUploadError(FROM_HERE, msg), nullptr); | |
526 } | |
527 | |
528 // static | |
529 Time DeviceInfoService::GetLastUpdateTime( | |
530 const DeviceInfoSpecifics& specifics) { | |
531 if (specifics.has_last_updated_timestamp()) { | |
532 return syncer::ProtoTimeToTime(specifics.last_updated_timestamp()); | |
533 } else { | |
534 return Time(); | |
535 } | |
536 } | |
537 | |
538 } // namespace sync_driver_v2 | |
OLD | NEW |