OLD | NEW |
| (Empty) |
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | |
2 // Use of this source code is governed by a BSD-style license that can be | |
3 // found in the LICENSE file. | |
4 | |
5 #include "sync/engine/sync_scheduler_impl.h" | |
6 | |
7 #include <algorithm> | |
8 #include <cstring> | |
9 #include <utility> | |
10 | |
11 #include "base/bind.h" | |
12 #include "base/bind_helpers.h" | |
13 #include "base/compiler_specific.h" | |
14 #include "base/location.h" | |
15 #include "base/logging.h" | |
16 #include "base/single_thread_task_runner.h" | |
17 #include "base/threading/platform_thread.h" | |
18 #include "base/threading/thread_task_runner_handle.h" | |
19 #include "sync/engine/backoff_delay_provider.h" | |
20 #include "sync/engine/syncer.h" | |
21 #include "sync/protocol/proto_enum_conversions.h" | |
22 #include "sync/protocol/sync.pb.h" | |
23 #include "sync/util/data_type_histogram.h" | |
24 #include "sync/util/logging.h" | |
25 | |
26 using base::TimeDelta; | |
27 using base::TimeTicks; | |
28 | |
29 namespace syncer { | |
30 | |
31 using sessions::SyncSession; | |
32 using sessions::SyncSessionSnapshot; | |
33 using sync_pb::GetUpdatesCallerInfo; | |
34 | |
35 namespace { | |
36 | |
37 bool IsConfigRelatedUpdateSourceValue( | |
38 GetUpdatesCallerInfo::GetUpdatesSource source) { | |
39 switch (source) { | |
40 case GetUpdatesCallerInfo::RECONFIGURATION: | |
41 case GetUpdatesCallerInfo::MIGRATION: | |
42 case GetUpdatesCallerInfo::NEW_CLIENT: | |
43 case GetUpdatesCallerInfo::NEWLY_SUPPORTED_DATATYPE: | |
44 case GetUpdatesCallerInfo::PROGRAMMATIC: | |
45 return true; | |
46 default: | |
47 return false; | |
48 } | |
49 } | |
50 | |
51 bool ShouldRequestEarlyExit(const SyncProtocolError& error) { | |
52 switch (error.error_type) { | |
53 case SYNC_SUCCESS: | |
54 case MIGRATION_DONE: | |
55 case THROTTLED: | |
56 case TRANSIENT_ERROR: | |
57 case PARTIAL_FAILURE: | |
58 return false; | |
59 case NOT_MY_BIRTHDAY: | |
60 case CLIENT_DATA_OBSOLETE: | |
61 case CLEAR_PENDING: | |
62 case DISABLED_BY_ADMIN: | |
63 // If we send terminate sync early then |sync_cycle_ended| notification | |
64 // would not be sent. If there were no actions then |ACTIONABLE_ERROR| | |
65 // notification wouldnt be sent either. Then the UI layer would be left | |
66 // waiting forever. So assert we would send something. | |
67 DCHECK_NE(error.action, UNKNOWN_ACTION); | |
68 return true; | |
69 case INVALID_CREDENTIAL: | |
70 // The notification for this is handled by PostAndProcessHeaders|. | |
71 // Server does no have to send any action for this. | |
72 return true; | |
73 // Make UNKNOWN_ERROR a NOTREACHED. All the other error should be explicitly | |
74 // handled. | |
75 case UNKNOWN_ERROR: | |
76 NOTREACHED(); | |
77 return false; | |
78 } | |
79 return false; | |
80 } | |
81 | |
82 bool IsActionableError( | |
83 const SyncProtocolError& error) { | |
84 return (error.action != UNKNOWN_ACTION); | |
85 } | |
86 | |
87 void RunAndReset(base::Closure* task) { | |
88 DCHECK(task); | |
89 if (task->is_null()) | |
90 return; | |
91 task->Run(); | |
92 task->Reset(); | |
93 } | |
94 | |
95 } // namespace | |
96 | |
97 ConfigurationParams::ConfigurationParams() | |
98 : source(GetUpdatesCallerInfo::UNKNOWN) {} | |
99 ConfigurationParams::ConfigurationParams( | |
100 const sync_pb::GetUpdatesCallerInfo::GetUpdatesSource& source, | |
101 ModelTypeSet types_to_download, | |
102 const ModelSafeRoutingInfo& routing_info, | |
103 const base::Closure& ready_task, | |
104 const base::Closure& retry_task) | |
105 : source(source), | |
106 types_to_download(types_to_download), | |
107 routing_info(routing_info), | |
108 ready_task(ready_task), | |
109 retry_task(retry_task) { | |
110 DCHECK(!ready_task.is_null()); | |
111 } | |
112 ConfigurationParams::ConfigurationParams(const ConfigurationParams& other) = | |
113 default; | |
114 ConfigurationParams::~ConfigurationParams() {} | |
115 | |
116 ClearParams::ClearParams(const base::Closure& report_success_task) | |
117 : report_success_task(report_success_task) { | |
118 DCHECK(!report_success_task.is_null()); | |
119 } | |
120 ClearParams::ClearParams(const ClearParams& other) = default; | |
121 ClearParams::~ClearParams() {} | |
122 | |
123 SyncSchedulerImpl::WaitInterval::WaitInterval() | |
124 : mode(UNKNOWN) {} | |
125 | |
126 SyncSchedulerImpl::WaitInterval::WaitInterval(Mode mode, TimeDelta length) | |
127 : mode(mode), length(length) {} | |
128 | |
129 SyncSchedulerImpl::WaitInterval::~WaitInterval() {} | |
130 | |
131 #define ENUM_CASE(x) case x: return #x; break; | |
132 | |
133 const char* SyncSchedulerImpl::WaitInterval::GetModeString(Mode mode) { | |
134 switch (mode) { | |
135 ENUM_CASE(UNKNOWN); | |
136 ENUM_CASE(EXPONENTIAL_BACKOFF); | |
137 ENUM_CASE(THROTTLED); | |
138 } | |
139 NOTREACHED(); | |
140 return ""; | |
141 } | |
142 | |
143 GetUpdatesCallerInfo::GetUpdatesSource GetUpdatesFromNudgeSource( | |
144 NudgeSource source) { | |
145 switch (source) { | |
146 case NUDGE_SOURCE_NOTIFICATION: | |
147 return GetUpdatesCallerInfo::NOTIFICATION; | |
148 case NUDGE_SOURCE_LOCAL: | |
149 return GetUpdatesCallerInfo::LOCAL; | |
150 case NUDGE_SOURCE_LOCAL_REFRESH: | |
151 return GetUpdatesCallerInfo::DATATYPE_REFRESH; | |
152 case NUDGE_SOURCE_UNKNOWN: | |
153 return GetUpdatesCallerInfo::UNKNOWN; | |
154 default: | |
155 NOTREACHED(); | |
156 return GetUpdatesCallerInfo::UNKNOWN; | |
157 } | |
158 } | |
159 | |
160 // Helper macros to log with the syncer thread name; useful when there | |
161 // are multiple syncer threads involved. | |
162 | |
163 #define SLOG(severity) LOG(severity) << name_ << ": " | |
164 | |
165 #define SDVLOG(verbose_level) DVLOG(verbose_level) << name_ << ": " | |
166 | |
167 #define SDVLOG_LOC(from_here, verbose_level) \ | |
168 DVLOG_LOC(from_here, verbose_level) << name_ << ": " | |
169 | |
170 SyncSchedulerImpl::SyncSchedulerImpl(const std::string& name, | |
171 BackoffDelayProvider* delay_provider, | |
172 sessions::SyncSessionContext* context, | |
173 Syncer* syncer) | |
174 : name_(name), | |
175 started_(false), | |
176 syncer_short_poll_interval_seconds_( | |
177 TimeDelta::FromSeconds(kDefaultShortPollIntervalSeconds)), | |
178 syncer_long_poll_interval_seconds_( | |
179 TimeDelta::FromSeconds(kDefaultLongPollIntervalSeconds)), | |
180 mode_(CONFIGURATION_MODE), | |
181 delay_provider_(delay_provider), | |
182 syncer_(syncer), | |
183 session_context_(context), | |
184 next_sync_session_job_priority_(NORMAL_PRIORITY), | |
185 weak_ptr_factory_(this), | |
186 weak_ptr_factory_for_weak_handle_(this) { | |
187 weak_handle_this_ = MakeWeakHandle( | |
188 weak_ptr_factory_for_weak_handle_.GetWeakPtr()); | |
189 } | |
190 | |
191 SyncSchedulerImpl::~SyncSchedulerImpl() { | |
192 DCHECK(CalledOnValidThread()); | |
193 Stop(); | |
194 } | |
195 | |
196 void SyncSchedulerImpl::OnCredentialsUpdated() { | |
197 DCHECK(CalledOnValidThread()); | |
198 | |
199 if (HttpResponse::SYNC_AUTH_ERROR == | |
200 session_context_->connection_manager()->server_status()) { | |
201 OnServerConnectionErrorFixed(); | |
202 } | |
203 } | |
204 | |
205 void SyncSchedulerImpl::OnConnectionStatusChange() { | |
206 if (HttpResponse::CONNECTION_UNAVAILABLE == | |
207 session_context_->connection_manager()->server_status()) { | |
208 // Optimistically assume that the connection is fixed and try | |
209 // connecting. | |
210 OnServerConnectionErrorFixed(); | |
211 } | |
212 } | |
213 | |
214 void SyncSchedulerImpl::OnServerConnectionErrorFixed() { | |
215 // There could be a pending nudge or configuration job in several cases: | |
216 // | |
217 // 1. We're in exponential backoff. | |
218 // 2. We're silenced / throttled. | |
219 // 3. A nudge was saved previously due to not having a valid auth token. | |
220 // 4. A nudge was scheduled + saved while in configuration mode. | |
221 // | |
222 // In all cases except (2), we want to retry contacting the server. We | |
223 // call TryCanaryJob to achieve this, and note that nothing -- not even a | |
224 // canary job -- can bypass a THROTTLED WaitInterval. The only thing that | |
225 // has the authority to do that is the Unthrottle timer. | |
226 TryCanaryJob(); | |
227 } | |
228 | |
229 void SyncSchedulerImpl::Start(Mode mode, base::Time last_poll_time) { | |
230 DCHECK(CalledOnValidThread()); | |
231 std::string thread_name = base::PlatformThread::GetName(); | |
232 if (thread_name.empty()) | |
233 thread_name = "<Main thread>"; | |
234 SDVLOG(2) << "Start called from thread " | |
235 << thread_name << " with mode " << GetModeString(mode); | |
236 if (!started_) { | |
237 started_ = true; | |
238 SendInitialSnapshot(); | |
239 } | |
240 | |
241 DCHECK(syncer_.get()); | |
242 | |
243 if (mode == CLEAR_SERVER_DATA_MODE) { | |
244 DCHECK_EQ(mode_, CONFIGURATION_MODE); | |
245 } | |
246 Mode old_mode = mode_; | |
247 mode_ = mode; | |
248 // Only adjust the poll reset time if it was valid and in the past. | |
249 if (!last_poll_time.is_null() && last_poll_time < base::Time::Now()) { | |
250 // Convert from base::Time to base::TimeTicks. The reason we use Time | |
251 // for persisting is that TimeTicks can stop making forward progress when | |
252 // the machine is suspended. This implies that on resume the client might | |
253 // actually have miss the real poll, unless the client is restarted. Fixing | |
254 // that would require using an AlarmTimer though, which is only supported | |
255 // on certain platforms. | |
256 last_poll_reset_ = | |
257 base::TimeTicks::Now() - (base::Time::Now() - last_poll_time); | |
258 } | |
259 | |
260 if (old_mode != mode_ && mode_ == NORMAL_MODE) { | |
261 // We just got back to normal mode. Let's try to run the work that was | |
262 // queued up while we were configuring. | |
263 | |
264 AdjustPolling(UPDATE_INTERVAL); // Will kick start poll timer if needed. | |
265 | |
266 // Update our current time before checking IsRetryRequired(). | |
267 nudge_tracker_.SetSyncCycleStartTime(base::TimeTicks::Now()); | |
268 if (nudge_tracker_.IsSyncRequired() && CanRunNudgeJobNow(NORMAL_PRIORITY)) { | |
269 TrySyncSessionJob(); | |
270 } | |
271 } | |
272 } | |
273 | |
274 ModelTypeSet SyncSchedulerImpl::GetEnabledAndUnthrottledTypes() { | |
275 ModelTypeSet enabled_types = session_context_->GetEnabledTypes(); | |
276 ModelTypeSet enabled_protocol_types = | |
277 Intersection(ProtocolTypes(), enabled_types); | |
278 ModelTypeSet throttled_types = nudge_tracker_.GetThrottledTypes(); | |
279 return Difference(enabled_protocol_types, throttled_types); | |
280 } | |
281 | |
282 void SyncSchedulerImpl::SendInitialSnapshot() { | |
283 DCHECK(CalledOnValidThread()); | |
284 std::unique_ptr<SyncSession> dummy( | |
285 SyncSession::Build(session_context_, this)); | |
286 SyncCycleEvent event(SyncCycleEvent::STATUS_CHANGED); | |
287 event.snapshot = dummy->TakeSnapshot(); | |
288 FOR_EACH_OBSERVER(SyncEngineEventListener, | |
289 *session_context_->listeners(), | |
290 OnSyncCycleEvent(event)); | |
291 } | |
292 | |
293 namespace { | |
294 | |
295 // Helper to extract the routing info corresponding to types in | |
296 // |types_to_download| from |current_routes|. | |
297 void BuildModelSafeParams( | |
298 ModelTypeSet types_to_download, | |
299 const ModelSafeRoutingInfo& current_routes, | |
300 ModelSafeRoutingInfo* result_routes) { | |
301 for (ModelTypeSet::Iterator iter = types_to_download.First(); iter.Good(); | |
302 iter.Inc()) { | |
303 ModelType type = iter.Get(); | |
304 ModelSafeRoutingInfo::const_iterator route = current_routes.find(type); | |
305 DCHECK(route != current_routes.end()); | |
306 ModelSafeGroup group = route->second; | |
307 (*result_routes)[type] = group; | |
308 } | |
309 } | |
310 | |
311 } // namespace. | |
312 | |
313 void SyncSchedulerImpl::ScheduleConfiguration( | |
314 const ConfigurationParams& params) { | |
315 DCHECK(CalledOnValidThread()); | |
316 DCHECK(IsConfigRelatedUpdateSourceValue(params.source)); | |
317 DCHECK_EQ(CONFIGURATION_MODE, mode_); | |
318 DCHECK(!params.ready_task.is_null()); | |
319 CHECK(started_) << "Scheduler must be running to configure."; | |
320 SDVLOG(2) << "Reconfiguring syncer."; | |
321 | |
322 // Only one configuration is allowed at a time. Verify we're not waiting | |
323 // for a pending configure job. | |
324 DCHECK(!pending_configure_params_); | |
325 | |
326 ModelSafeRoutingInfo restricted_routes; | |
327 BuildModelSafeParams(params.types_to_download, | |
328 params.routing_info, | |
329 &restricted_routes); | |
330 session_context_->SetRoutingInfo(restricted_routes); | |
331 | |
332 // Only reconfigure if we have types to download. | |
333 if (!params.types_to_download.Empty()) { | |
334 pending_configure_params_.reset(new ConfigurationParams(params)); | |
335 TrySyncSessionJob(); | |
336 } else { | |
337 SDVLOG(2) << "No change in routing info, calling ready task directly."; | |
338 params.ready_task.Run(); | |
339 } | |
340 } | |
341 | |
342 void SyncSchedulerImpl::ScheduleClearServerData(const ClearParams& params) { | |
343 DCHECK(CalledOnValidThread()); | |
344 DCHECK_EQ(CLEAR_SERVER_DATA_MODE, mode_); | |
345 DCHECK(!pending_configure_params_); | |
346 DCHECK(!params.report_success_task.is_null()); | |
347 CHECK(started_) << "Scheduler must be running to clear."; | |
348 pending_clear_params_.reset(new ClearParams(params)); | |
349 TrySyncSessionJob(); | |
350 } | |
351 | |
352 bool SyncSchedulerImpl::CanRunJobNow(JobPriority priority) { | |
353 DCHECK(CalledOnValidThread()); | |
354 if (IsCurrentlyThrottled()) { | |
355 SDVLOG(1) << "Unable to run a job because we're throttled."; | |
356 return false; | |
357 } | |
358 | |
359 if (IsBackingOff() && priority != CANARY_PRIORITY) { | |
360 SDVLOG(1) << "Unable to run a job because we're backing off."; | |
361 return false; | |
362 } | |
363 | |
364 if (session_context_->connection_manager()->HasInvalidAuthToken()) { | |
365 SDVLOG(1) << "Unable to run a job because we have no valid auth token."; | |
366 return false; | |
367 } | |
368 | |
369 return true; | |
370 } | |
371 | |
372 bool SyncSchedulerImpl::CanRunNudgeJobNow(JobPriority priority) { | |
373 DCHECK(CalledOnValidThread()); | |
374 | |
375 if (!CanRunJobNow(priority)) { | |
376 SDVLOG(1) << "Unable to run a nudge job right now"; | |
377 return false; | |
378 } | |
379 | |
380 const ModelTypeSet enabled_types = session_context_->GetEnabledTypes(); | |
381 if (nudge_tracker_.GetThrottledTypes().HasAll(enabled_types)) { | |
382 SDVLOG(1) << "Not running a nudge because we're fully type throttled."; | |
383 return false; | |
384 } | |
385 | |
386 if (mode_ != NORMAL_MODE) { | |
387 SDVLOG(1) << "Not running nudge because we're not in normal mode."; | |
388 return false; | |
389 } | |
390 | |
391 return true; | |
392 } | |
393 | |
394 void SyncSchedulerImpl::ScheduleLocalNudge( | |
395 ModelTypeSet types, | |
396 const tracked_objects::Location& nudge_location) { | |
397 DCHECK(CalledOnValidThread()); | |
398 DCHECK(!types.Empty()); | |
399 | |
400 SDVLOG_LOC(nudge_location, 2) | |
401 << "Scheduling sync because of local change to " | |
402 << ModelTypeSetToString(types); | |
403 UpdateNudgeTimeRecords(types); | |
404 base::TimeDelta nudge_delay = nudge_tracker_.RecordLocalChange(types); | |
405 ScheduleNudgeImpl(nudge_delay, nudge_location); | |
406 } | |
407 | |
408 void SyncSchedulerImpl::ScheduleLocalRefreshRequest( | |
409 ModelTypeSet types, | |
410 const tracked_objects::Location& nudge_location) { | |
411 DCHECK(CalledOnValidThread()); | |
412 DCHECK(!types.Empty()); | |
413 | |
414 SDVLOG_LOC(nudge_location, 2) | |
415 << "Scheduling sync because of local refresh request for " | |
416 << ModelTypeSetToString(types); | |
417 base::TimeDelta nudge_delay = nudge_tracker_.RecordLocalRefreshRequest(types); | |
418 ScheduleNudgeImpl(nudge_delay, nudge_location); | |
419 } | |
420 | |
421 void SyncSchedulerImpl::ScheduleInvalidationNudge( | |
422 syncer::ModelType model_type, | |
423 std::unique_ptr<InvalidationInterface> invalidation, | |
424 const tracked_objects::Location& nudge_location) { | |
425 DCHECK(CalledOnValidThread()); | |
426 | |
427 SDVLOG_LOC(nudge_location, 2) | |
428 << "Scheduling sync because we received invalidation for " | |
429 << ModelTypeToString(model_type); | |
430 base::TimeDelta nudge_delay = nudge_tracker_.RecordRemoteInvalidation( | |
431 model_type, std::move(invalidation)); | |
432 ScheduleNudgeImpl(nudge_delay, nudge_location); | |
433 } | |
434 | |
435 void SyncSchedulerImpl::ScheduleInitialSyncNudge(syncer::ModelType model_type) { | |
436 DCHECK(CalledOnValidThread()); | |
437 | |
438 SDVLOG(2) << "Scheduling non-blocking initial sync for " | |
439 << ModelTypeToString(model_type); | |
440 nudge_tracker_.RecordInitialSyncRequired(model_type); | |
441 ScheduleNudgeImpl(TimeDelta::FromSeconds(0), FROM_HERE); | |
442 } | |
443 | |
444 // TODO(zea): Consider adding separate throttling/backoff for datatype | |
445 // refresh requests. | |
446 void SyncSchedulerImpl::ScheduleNudgeImpl( | |
447 const TimeDelta& delay, | |
448 const tracked_objects::Location& nudge_location) { | |
449 DCHECK(CalledOnValidThread()); | |
450 CHECK(!syncer_->IsSyncing()); | |
451 | |
452 if (!started_) { | |
453 SDVLOG_LOC(nudge_location, 2) | |
454 << "Dropping nudge, scheduler is not running."; | |
455 return; | |
456 } | |
457 | |
458 SDVLOG_LOC(nudge_location, 2) | |
459 << "In ScheduleNudgeImpl with delay " | |
460 << delay.InMilliseconds() << " ms"; | |
461 | |
462 if (!CanRunNudgeJobNow(NORMAL_PRIORITY)) | |
463 return; | |
464 | |
465 TimeTicks incoming_run_time = TimeTicks::Now() + delay; | |
466 if (!scheduled_nudge_time_.is_null() && | |
467 (scheduled_nudge_time_ < incoming_run_time)) { | |
468 // Old job arrives sooner than this one. Don't reschedule it. | |
469 return; | |
470 } | |
471 | |
472 // Either there is no existing nudge in flight or the incoming nudge should be | |
473 // made to arrive first (preempt) the existing nudge. We reschedule in either | |
474 // case. | |
475 SDVLOG_LOC(nudge_location, 2) | |
476 << "Scheduling a nudge with " | |
477 << delay.InMilliseconds() << " ms delay"; | |
478 scheduled_nudge_time_ = incoming_run_time; | |
479 pending_wakeup_timer_.Start( | |
480 nudge_location, | |
481 delay, | |
482 base::Bind(&SyncSchedulerImpl::PerformDelayedNudge, | |
483 weak_ptr_factory_.GetWeakPtr())); | |
484 } | |
485 | |
486 const char* SyncSchedulerImpl::GetModeString(SyncScheduler::Mode mode) { | |
487 switch (mode) { | |
488 ENUM_CASE(CONFIGURATION_MODE); | |
489 ENUM_CASE(CLEAR_SERVER_DATA_MODE); | |
490 ENUM_CASE(NORMAL_MODE); | |
491 } | |
492 return ""; | |
493 } | |
494 | |
495 void SyncSchedulerImpl::SetDefaultNudgeDelay(base::TimeDelta delay_ms) { | |
496 DCHECK(CalledOnValidThread()); | |
497 nudge_tracker_.SetDefaultNudgeDelay(delay_ms); | |
498 } | |
499 | |
500 void SyncSchedulerImpl::DoNudgeSyncSessionJob(JobPriority priority) { | |
501 DCHECK(CalledOnValidThread()); | |
502 DCHECK(CanRunNudgeJobNow(priority)); | |
503 | |
504 DVLOG(2) << "Will run normal mode sync cycle with types " | |
505 << ModelTypeSetToString(session_context_->GetEnabledTypes()); | |
506 std::unique_ptr<SyncSession> session( | |
507 SyncSession::Build(session_context_, this)); | |
508 bool success = syncer_->NormalSyncShare( | |
509 GetEnabledAndUnthrottledTypes(), &nudge_tracker_, session.get()); | |
510 | |
511 if (success) { | |
512 // That cycle took care of any outstanding work we had. | |
513 SDVLOG(2) << "Nudge succeeded."; | |
514 nudge_tracker_.RecordSuccessfulSyncCycle(); | |
515 scheduled_nudge_time_ = base::TimeTicks(); | |
516 HandleSuccess(); | |
517 | |
518 // If this was a canary, we may need to restart the poll timer (the poll | |
519 // timer may have fired while the scheduler was in an error state, ignoring | |
520 // the poll). | |
521 if (!poll_timer_.IsRunning()) { | |
522 SDVLOG(1) << "Canary succeeded, restarting polling."; | |
523 AdjustPolling(UPDATE_INTERVAL); | |
524 } | |
525 } else { | |
526 HandleFailure(session->status_controller().model_neutral_state()); | |
527 } | |
528 } | |
529 | |
530 void SyncSchedulerImpl::DoConfigurationSyncSessionJob(JobPriority priority) { | |
531 DCHECK(CalledOnValidThread()); | |
532 DCHECK_EQ(mode_, CONFIGURATION_MODE); | |
533 DCHECK(pending_configure_params_ != NULL); | |
534 | |
535 if (!CanRunJobNow(priority)) { | |
536 SDVLOG(2) << "Unable to run configure job right now."; | |
537 RunAndReset(&pending_configure_params_->retry_task); | |
538 return; | |
539 } | |
540 | |
541 SDVLOG(2) << "Will run configure SyncShare with types " | |
542 << ModelTypeSetToString(session_context_->GetEnabledTypes()); | |
543 std::unique_ptr<SyncSession> session( | |
544 SyncSession::Build(session_context_, this)); | |
545 bool success = syncer_->ConfigureSyncShare( | |
546 pending_configure_params_->types_to_download, | |
547 pending_configure_params_->source, | |
548 session.get()); | |
549 | |
550 if (success) { | |
551 SDVLOG(2) << "Configure succeeded."; | |
552 pending_configure_params_->ready_task.Run(); | |
553 pending_configure_params_.reset(); | |
554 HandleSuccess(); | |
555 } else { | |
556 HandleFailure(session->status_controller().model_neutral_state()); | |
557 // Sync cycle might receive response from server that causes scheduler to | |
558 // stop and draws pending_configure_params_ invalid. | |
559 if (started_) | |
560 RunAndReset(&pending_configure_params_->retry_task); | |
561 } | |
562 } | |
563 | |
564 void SyncSchedulerImpl::DoClearServerDataSyncSessionJob(JobPriority priority) { | |
565 DCHECK(CalledOnValidThread()); | |
566 DCHECK_EQ(mode_, CLEAR_SERVER_DATA_MODE); | |
567 | |
568 if (!CanRunJobNow(priority)) { | |
569 SDVLOG(2) << "Unable to run clear server data job right now."; | |
570 RunAndReset(&pending_configure_params_->retry_task); | |
571 return; | |
572 } | |
573 | |
574 std::unique_ptr<SyncSession> session( | |
575 SyncSession::Build(session_context_, this)); | |
576 const bool success = syncer_->PostClearServerData(session.get()); | |
577 if (!success) { | |
578 HandleFailure(session->status_controller().model_neutral_state()); | |
579 return; | |
580 } | |
581 | |
582 SDVLOG(2) << "Clear succeeded."; | |
583 pending_clear_params_->report_success_task.Run(); | |
584 pending_clear_params_.reset(); | |
585 HandleSuccess(); | |
586 } | |
587 | |
588 void SyncSchedulerImpl::HandleSuccess() { | |
589 // If we're here, then we successfully reached the server. End all backoff. | |
590 wait_interval_.reset(); | |
591 NotifyRetryTime(base::Time()); | |
592 } | |
593 | |
594 void SyncSchedulerImpl::HandleFailure( | |
595 const sessions::ModelNeutralState& model_neutral_state) { | |
596 if (IsCurrentlyThrottled()) { | |
597 SDVLOG(2) << "Was throttled during previous sync cycle."; | |
598 } else if (!IsBackingOff()) { | |
599 // Setup our backoff if this is our first such failure. | |
600 TimeDelta length = delay_provider_->GetDelay( | |
601 delay_provider_->GetInitialDelay(model_neutral_state)); | |
602 wait_interval_.reset( | |
603 new WaitInterval(WaitInterval::EXPONENTIAL_BACKOFF, length)); | |
604 SDVLOG(2) << "Sync cycle failed. Will back off for " | |
605 << wait_interval_->length.InMilliseconds() << "ms."; | |
606 } else { | |
607 // Increase our backoff interval and schedule another retry. | |
608 TimeDelta length = delay_provider_->GetDelay(wait_interval_->length); | |
609 wait_interval_.reset( | |
610 new WaitInterval(WaitInterval::EXPONENTIAL_BACKOFF, length)); | |
611 SDVLOG(2) << "Sync cycle failed. Will back off for " | |
612 << wait_interval_->length.InMilliseconds() << "ms."; | |
613 } | |
614 RestartWaiting(); | |
615 } | |
616 | |
617 void SyncSchedulerImpl::DoPollSyncSessionJob() { | |
618 SDVLOG(2) << "Polling with types " | |
619 << ModelTypeSetToString(GetEnabledAndUnthrottledTypes()); | |
620 std::unique_ptr<SyncSession> session( | |
621 SyncSession::Build(session_context_, this)); | |
622 bool success = syncer_->PollSyncShare( | |
623 GetEnabledAndUnthrottledTypes(), | |
624 session.get()); | |
625 | |
626 // Only restart the timer if the poll succeeded. Otherwise rely on normal | |
627 // failure handling to retry with backoff. | |
628 if (success) { | |
629 AdjustPolling(FORCE_RESET); | |
630 HandleSuccess(); | |
631 } else { | |
632 HandleFailure(session->status_controller().model_neutral_state()); | |
633 } | |
634 } | |
635 | |
636 void SyncSchedulerImpl::UpdateNudgeTimeRecords(ModelTypeSet types) { | |
637 DCHECK(CalledOnValidThread()); | |
638 base::TimeTicks now = TimeTicks::Now(); | |
639 // Update timing information for how often datatypes are triggering nudges. | |
640 for (ModelTypeSet::Iterator iter = types.First(); iter.Good(); iter.Inc()) { | |
641 base::TimeTicks previous = last_local_nudges_by_model_type_[iter.Get()]; | |
642 last_local_nudges_by_model_type_[iter.Get()] = now; | |
643 if (previous.is_null()) | |
644 continue; | |
645 | |
646 #define PER_DATA_TYPE_MACRO(type_str) \ | |
647 SYNC_FREQ_HISTOGRAM("Sync.Freq" type_str, now - previous); | |
648 SYNC_DATA_TYPE_HISTOGRAM(iter.Get()); | |
649 #undef PER_DATA_TYPE_MACRO | |
650 } | |
651 } | |
652 | |
653 TimeDelta SyncSchedulerImpl::GetPollInterval() { | |
654 return (!session_context_->notifications_enabled() || | |
655 !session_context_->ShouldFetchUpdatesBeforeCommit()) ? | |
656 syncer_short_poll_interval_seconds_ : | |
657 syncer_long_poll_interval_seconds_; | |
658 } | |
659 | |
660 void SyncSchedulerImpl::AdjustPolling(PollAdjustType type) { | |
661 DCHECK(CalledOnValidThread()); | |
662 if (!started_) | |
663 return; | |
664 | |
665 TimeDelta poll_interval = GetPollInterval(); | |
666 TimeDelta poll_delay = poll_interval; | |
667 const TimeTicks now = TimeTicks::Now(); | |
668 | |
669 if (type == UPDATE_INTERVAL) { | |
670 if (!last_poll_reset_.is_null()) { | |
671 // Override the delay based on the last successful poll time (if it was | |
672 // set). | |
673 TimeTicks new_poll_time = poll_interval + last_poll_reset_; | |
674 poll_delay = new_poll_time - TimeTicks::Now(); | |
675 | |
676 if (poll_delay < TimeDelta()) { | |
677 // The desired poll time was in the past, so trigger a poll now (the | |
678 // timer will post the task asynchronously, so re-entrancy isn't an | |
679 // issue). | |
680 poll_delay = TimeDelta(); | |
681 } | |
682 } else { | |
683 // There was no previous poll. Keep the delay set to the normal interval, | |
684 // as if we had just completed a poll. | |
685 DCHECK_EQ(GetPollInterval(), poll_delay); | |
686 last_poll_reset_ = now; | |
687 } | |
688 } else { | |
689 // Otherwise just restart the timer. | |
690 DCHECK_EQ(FORCE_RESET, type); | |
691 DCHECK_EQ(GetPollInterval(), poll_delay); | |
692 last_poll_reset_ = now; | |
693 } | |
694 | |
695 SDVLOG(1) << "Updating polling delay to " << poll_delay.InMinutes() | |
696 << " minutes."; | |
697 | |
698 // Adjust poll rate. Start will reset the timer if it was already running. | |
699 poll_timer_.Start(FROM_HERE, poll_delay, this, | |
700 &SyncSchedulerImpl::PollTimerCallback); | |
701 } | |
702 | |
703 void SyncSchedulerImpl::RestartWaiting() { | |
704 CHECK(wait_interval_.get()); | |
705 DCHECK(wait_interval_->length >= TimeDelta::FromSeconds(0)); | |
706 NotifyRetryTime(base::Time::Now() + wait_interval_->length); | |
707 SDVLOG(2) << "Starting WaitInterval timer of length " | |
708 << wait_interval_->length.InMilliseconds() << "ms."; | |
709 if (wait_interval_->mode == WaitInterval::THROTTLED) { | |
710 pending_wakeup_timer_.Start( | |
711 FROM_HERE, | |
712 wait_interval_->length, | |
713 base::Bind(&SyncSchedulerImpl::Unthrottle, | |
714 weak_ptr_factory_.GetWeakPtr())); | |
715 } else { | |
716 pending_wakeup_timer_.Start( | |
717 FROM_HERE, | |
718 wait_interval_->length, | |
719 base::Bind(&SyncSchedulerImpl::ExponentialBackoffRetry, | |
720 weak_ptr_factory_.GetWeakPtr())); | |
721 } | |
722 } | |
723 | |
724 void SyncSchedulerImpl::Stop() { | |
725 DCHECK(CalledOnValidThread()); | |
726 SDVLOG(2) << "Stop called"; | |
727 | |
728 // Kill any in-flight method calls. | |
729 weak_ptr_factory_.InvalidateWeakPtrs(); | |
730 wait_interval_.reset(); | |
731 NotifyRetryTime(base::Time()); | |
732 poll_timer_.Stop(); | |
733 pending_wakeup_timer_.Stop(); | |
734 pending_configure_params_.reset(); | |
735 pending_clear_params_.reset(); | |
736 if (started_) | |
737 started_ = false; | |
738 } | |
739 | |
740 // This is the only place where we invoke DoSyncSessionJob with canary | |
741 // privileges. Everyone else should use NORMAL_PRIORITY. | |
742 void SyncSchedulerImpl::TryCanaryJob() { | |
743 next_sync_session_job_priority_ = CANARY_PRIORITY; | |
744 SDVLOG(2) << "Attempting canary job"; | |
745 TrySyncSessionJob(); | |
746 } | |
747 | |
748 void SyncSchedulerImpl::TrySyncSessionJob() { | |
749 // Post call to TrySyncSessionJobImpl on current thread. Later request for | |
750 // access token will be here. | |
751 base::ThreadTaskRunnerHandle::Get()->PostTask( | |
752 FROM_HERE, base::Bind(&SyncSchedulerImpl::TrySyncSessionJobImpl, | |
753 weak_ptr_factory_.GetWeakPtr())); | |
754 } | |
755 | |
756 void SyncSchedulerImpl::TrySyncSessionJobImpl() { | |
757 JobPriority priority = next_sync_session_job_priority_; | |
758 next_sync_session_job_priority_ = NORMAL_PRIORITY; | |
759 | |
760 nudge_tracker_.SetSyncCycleStartTime(base::TimeTicks::Now()); | |
761 | |
762 DCHECK(CalledOnValidThread()); | |
763 if (mode_ == CONFIGURATION_MODE) { | |
764 if (pending_configure_params_) { | |
765 SDVLOG(2) << "Found pending configure job"; | |
766 DoConfigurationSyncSessionJob(priority); | |
767 } | |
768 } else if (mode_ == CLEAR_SERVER_DATA_MODE) { | |
769 if (pending_clear_params_) { | |
770 DoClearServerDataSyncSessionJob(priority); | |
771 } | |
772 } else if (CanRunNudgeJobNow(priority)) { | |
773 if (nudge_tracker_.IsSyncRequired()) { | |
774 SDVLOG(2) << "Found pending nudge job"; | |
775 DoNudgeSyncSessionJob(priority); | |
776 } else if (((base::TimeTicks::Now() - last_poll_reset_) >= | |
777 GetPollInterval())) { | |
778 SDVLOG(2) << "Found pending poll"; | |
779 DoPollSyncSessionJob(); | |
780 } | |
781 } else { | |
782 // We must be in an error state. Transitioning out of each of these | |
783 // error states should trigger a canary job. | |
784 DCHECK(IsCurrentlyThrottled() || IsBackingOff() || | |
785 session_context_->connection_manager()->HasInvalidAuthToken()); | |
786 } | |
787 | |
788 if (IsBackingOff() && !pending_wakeup_timer_.IsRunning()) { | |
789 // If we succeeded, our wait interval would have been cleared. If it hasn't | |
790 // been cleared, then we should increase our backoff interval and schedule | |
791 // another retry. | |
792 TimeDelta length = delay_provider_->GetDelay(wait_interval_->length); | |
793 wait_interval_.reset( | |
794 new WaitInterval(WaitInterval::EXPONENTIAL_BACKOFF, length)); | |
795 SDVLOG(2) << "Sync cycle failed. Will back off for " | |
796 << wait_interval_->length.InMilliseconds() << "ms."; | |
797 RestartWaiting(); | |
798 } | |
799 } | |
800 | |
801 void SyncSchedulerImpl::PollTimerCallback() { | |
802 DCHECK(CalledOnValidThread()); | |
803 CHECK(!syncer_->IsSyncing()); | |
804 | |
805 TrySyncSessionJob(); | |
806 } | |
807 | |
808 void SyncSchedulerImpl::RetryTimerCallback() { | |
809 TrySyncSessionJob(); | |
810 } | |
811 | |
812 void SyncSchedulerImpl::Unthrottle() { | |
813 DCHECK(CalledOnValidThread()); | |
814 DCHECK_EQ(WaitInterval::THROTTLED, wait_interval_->mode); | |
815 | |
816 // We're no longer throttled, so clear the wait interval. | |
817 wait_interval_.reset(); | |
818 NotifyRetryTime(base::Time()); | |
819 NotifyThrottledTypesChanged(nudge_tracker_.GetThrottledTypes()); | |
820 | |
821 // We treat this as a 'canary' in the sense that it was originally scheduled | |
822 // to run some time ago, failed, and we now want to retry, versus a job that | |
823 // was just created (e.g via ScheduleNudgeImpl). The main implication is | |
824 // that we're careful to update routing info (etc) with such potentially | |
825 // stale canary jobs. | |
826 TryCanaryJob(); | |
827 } | |
828 | |
829 void SyncSchedulerImpl::TypeUnthrottle(base::TimeTicks unthrottle_time) { | |
830 DCHECK(CalledOnValidThread()); | |
831 nudge_tracker_.UpdateTypeThrottlingState(unthrottle_time); | |
832 NotifyThrottledTypesChanged(nudge_tracker_.GetThrottledTypes()); | |
833 | |
834 if (nudge_tracker_.IsAnyTypeThrottled()) { | |
835 const base::TimeTicks now = base::TimeTicks::Now(); | |
836 base::TimeDelta time_until_next_unthrottle = | |
837 nudge_tracker_.GetTimeUntilNextUnthrottle(now); | |
838 type_unthrottle_timer_.Start( | |
839 FROM_HERE, | |
840 time_until_next_unthrottle, | |
841 base::Bind(&SyncSchedulerImpl::TypeUnthrottle, | |
842 weak_ptr_factory_.GetWeakPtr(), | |
843 now + time_until_next_unthrottle)); | |
844 } | |
845 | |
846 // Maybe this is a good time to run a nudge job. Let's try it. | |
847 if (nudge_tracker_.IsSyncRequired() && CanRunNudgeJobNow(NORMAL_PRIORITY)) | |
848 TrySyncSessionJob(); | |
849 } | |
850 | |
851 void SyncSchedulerImpl::PerformDelayedNudge() { | |
852 // Circumstances may have changed since we scheduled this delayed nudge. | |
853 // We must check to see if it's OK to run the job before we do so. | |
854 if (CanRunNudgeJobNow(NORMAL_PRIORITY)) | |
855 TrySyncSessionJob(); | |
856 | |
857 // We're not responsible for setting up any retries here. The functions that | |
858 // first put us into a state that prevents successful sync cycles (eg. global | |
859 // throttling, type throttling, network errors, transient errors) will also | |
860 // setup the appropriate retry logic (eg. retry after timeout, exponential | |
861 // backoff, retry when the network changes). | |
862 } | |
863 | |
864 void SyncSchedulerImpl::ExponentialBackoffRetry() { | |
865 TryCanaryJob(); | |
866 } | |
867 | |
868 void SyncSchedulerImpl::NotifyRetryTime(base::Time retry_time) { | |
869 FOR_EACH_OBSERVER(SyncEngineEventListener, | |
870 *session_context_->listeners(), | |
871 OnRetryTimeChanged(retry_time)); | |
872 } | |
873 | |
874 void SyncSchedulerImpl::NotifyThrottledTypesChanged(ModelTypeSet types) { | |
875 FOR_EACH_OBSERVER(SyncEngineEventListener, | |
876 *session_context_->listeners(), | |
877 OnThrottledTypesChanged(types)); | |
878 } | |
879 | |
880 bool SyncSchedulerImpl::IsBackingOff() const { | |
881 DCHECK(CalledOnValidThread()); | |
882 return wait_interval_.get() && wait_interval_->mode == | |
883 WaitInterval::EXPONENTIAL_BACKOFF; | |
884 } | |
885 | |
886 void SyncSchedulerImpl::OnThrottled(const base::TimeDelta& throttle_duration) { | |
887 DCHECK(CalledOnValidThread()); | |
888 wait_interval_.reset(new WaitInterval(WaitInterval::THROTTLED, | |
889 throttle_duration)); | |
890 NotifyRetryTime(base::Time::Now() + wait_interval_->length); | |
891 NotifyThrottledTypesChanged(ModelTypeSet::All()); | |
892 } | |
893 | |
894 void SyncSchedulerImpl::OnTypesThrottled( | |
895 ModelTypeSet types, | |
896 const base::TimeDelta& throttle_duration) { | |
897 base::TimeTicks now = base::TimeTicks::Now(); | |
898 | |
899 SDVLOG(1) << "Throttling " << ModelTypeSetToString(types) << " for " | |
900 << throttle_duration.InMinutes() << " minutes."; | |
901 | |
902 nudge_tracker_.SetTypesThrottledUntil(types, throttle_duration, now); | |
903 base::TimeDelta time_until_next_unthrottle = | |
904 nudge_tracker_.GetTimeUntilNextUnthrottle(now); | |
905 type_unthrottle_timer_.Start( | |
906 FROM_HERE, | |
907 time_until_next_unthrottle, | |
908 base::Bind(&SyncSchedulerImpl::TypeUnthrottle, | |
909 weak_ptr_factory_.GetWeakPtr(), | |
910 now + time_until_next_unthrottle)); | |
911 NotifyThrottledTypesChanged(nudge_tracker_.GetThrottledTypes()); | |
912 } | |
913 | |
914 bool SyncSchedulerImpl::IsCurrentlyThrottled() { | |
915 DCHECK(CalledOnValidThread()); | |
916 return wait_interval_.get() && wait_interval_->mode == | |
917 WaitInterval::THROTTLED; | |
918 } | |
919 | |
920 void SyncSchedulerImpl::OnReceivedShortPollIntervalUpdate( | |
921 const base::TimeDelta& new_interval) { | |
922 DCHECK(CalledOnValidThread()); | |
923 if (new_interval == syncer_short_poll_interval_seconds_) | |
924 return; | |
925 SDVLOG(1) << "Updating short poll interval to " << new_interval.InMinutes() | |
926 << " minutes."; | |
927 syncer_short_poll_interval_seconds_ = new_interval; | |
928 AdjustPolling(UPDATE_INTERVAL); | |
929 } | |
930 | |
931 void SyncSchedulerImpl::OnReceivedLongPollIntervalUpdate( | |
932 const base::TimeDelta& new_interval) { | |
933 DCHECK(CalledOnValidThread()); | |
934 if (new_interval == syncer_long_poll_interval_seconds_) | |
935 return; | |
936 SDVLOG(1) << "Updating long poll interval to " << new_interval.InMinutes() | |
937 << " minutes."; | |
938 syncer_long_poll_interval_seconds_ = new_interval; | |
939 AdjustPolling(UPDATE_INTERVAL); | |
940 } | |
941 | |
942 void SyncSchedulerImpl::OnReceivedCustomNudgeDelays( | |
943 const std::map<ModelType, base::TimeDelta>& nudge_delays) { | |
944 DCHECK(CalledOnValidThread()); | |
945 nudge_tracker_.OnReceivedCustomNudgeDelays(nudge_delays); | |
946 } | |
947 | |
948 void SyncSchedulerImpl::OnReceivedClientInvalidationHintBufferSize(int size) { | |
949 if (size > 0) | |
950 nudge_tracker_.SetHintBufferSize(size); | |
951 else | |
952 NOTREACHED() << "Hint buffer size should be > 0."; | |
953 } | |
954 | |
955 void SyncSchedulerImpl::OnSyncProtocolError( | |
956 const SyncProtocolError& sync_protocol_error) { | |
957 DCHECK(CalledOnValidThread()); | |
958 if (ShouldRequestEarlyExit(sync_protocol_error)) { | |
959 SDVLOG(2) << "Sync Scheduler requesting early exit."; | |
960 Stop(); | |
961 } | |
962 if (IsActionableError(sync_protocol_error)) { | |
963 SDVLOG(2) << "OnActionableError"; | |
964 FOR_EACH_OBSERVER(SyncEngineEventListener, | |
965 *session_context_->listeners(), | |
966 OnActionableError(sync_protocol_error)); | |
967 } | |
968 } | |
969 | |
970 void SyncSchedulerImpl::OnReceivedGuRetryDelay(const base::TimeDelta& delay) { | |
971 nudge_tracker_.SetNextRetryTime(TimeTicks::Now() + delay); | |
972 retry_timer_.Start(FROM_HERE, delay, this, | |
973 &SyncSchedulerImpl::RetryTimerCallback); | |
974 } | |
975 | |
976 void SyncSchedulerImpl::OnReceivedMigrationRequest(ModelTypeSet types) { | |
977 FOR_EACH_OBSERVER(SyncEngineEventListener, | |
978 *session_context_->listeners(), | |
979 OnMigrationRequested(types)); | |
980 } | |
981 | |
982 void SyncSchedulerImpl::SetNotificationsEnabled(bool notifications_enabled) { | |
983 DCHECK(CalledOnValidThread()); | |
984 session_context_->set_notifications_enabled(notifications_enabled); | |
985 if (notifications_enabled) | |
986 nudge_tracker_.OnInvalidationsEnabled(); | |
987 else | |
988 nudge_tracker_.OnInvalidationsDisabled(); | |
989 } | |
990 | |
991 #undef SDVLOG_LOC | |
992 | |
993 #undef SDVLOG | |
994 | |
995 #undef SLOG | |
996 | |
997 #undef ENUM_CASE | |
998 | |
999 } // namespace syncer | |
OLD | NEW |