OLD | NEW |
---|---|
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "sync/engine/sync_scheduler.h" | 5 #include "sync/engine/sync_scheduler.h" |
6 | 6 |
7 #include <algorithm> | 7 #include <algorithm> |
8 #include <cstring> | 8 #include <cstring> |
9 | 9 |
10 #include "base/bind.h" | 10 #include "base/bind.h" |
(...skipping 49 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
60 return false; | 60 return false; |
61 } | 61 } |
62 } | 62 } |
63 | 63 |
64 bool IsActionableError( | 64 bool IsActionableError( |
65 const browser_sync::SyncProtocolError& error) { | 65 const browser_sync::SyncProtocolError& error) { |
66 return (error.action != browser_sync::UNKNOWN_ACTION); | 66 return (error.action != browser_sync::UNKNOWN_ACTION); |
67 } | 67 } |
68 } // namespace | 68 } // namespace |
69 | 69 |
70 ConfigureParams::ConfigureParams() | |
71 : source(GetUpdatesCallerInfo::UNKNOWN), | |
72 need_encryption_key(false) {} | |
73 ConfigureParams::ConfigureParams( | |
74 const sync_pb::GetUpdatesCallerInfo::GetUpdatesSource& source, | |
75 const syncable::ModelTypeSet& types_to_config, | |
76 const browser_sync::ModelSafeRoutingInfo& routing_info, | |
77 bool need_encryption_key, | |
78 const base::Closure& ready_task) | |
79 : source(source), | |
80 types_to_config(types_to_config), | |
81 routing_info(routing_info), | |
82 need_encryption_key(need_encryption_key), | |
83 ready_task(ready_task) { | |
84 DCHECK(!ready_task.is_null()); | |
85 } | |
86 ConfigureParams::~ConfigureParams() {} | |
87 | |
70 SyncScheduler::DelayProvider::DelayProvider() {} | 88 SyncScheduler::DelayProvider::DelayProvider() {} |
71 SyncScheduler::DelayProvider::~DelayProvider() {} | 89 SyncScheduler::DelayProvider::~DelayProvider() {} |
72 | 90 |
73 SyncScheduler::WaitInterval::WaitInterval() | 91 SyncScheduler::WaitInterval::WaitInterval() |
74 : mode(UNKNOWN), | 92 : mode(UNKNOWN), |
75 had_nudge(false) { | 93 had_nudge(false) { |
76 } | 94 } |
77 | 95 |
78 SyncScheduler::WaitInterval::~WaitInterval() {} | 96 SyncScheduler::WaitInterval::~WaitInterval() {} |
79 | 97 |
(...skipping 12 matching lines...) Expand all Loading... | |
92 SyncScheduler::SyncSessionJob::SyncSessionJob() | 110 SyncScheduler::SyncSessionJob::SyncSessionJob() |
93 : purpose(UNKNOWN), | 111 : purpose(UNKNOWN), |
94 is_canary_job(false) { | 112 is_canary_job(false) { |
95 } | 113 } |
96 | 114 |
97 SyncScheduler::SyncSessionJob::~SyncSessionJob() {} | 115 SyncScheduler::SyncSessionJob::~SyncSessionJob() {} |
98 | 116 |
99 SyncScheduler::SyncSessionJob::SyncSessionJob(SyncSessionJobPurpose purpose, | 117 SyncScheduler::SyncSessionJob::SyncSessionJob(SyncSessionJobPurpose purpose, |
100 base::TimeTicks start, | 118 base::TimeTicks start, |
101 linked_ptr<sessions::SyncSession> session, bool is_canary_job, | 119 linked_ptr<sessions::SyncSession> session, bool is_canary_job, |
102 const tracked_objects::Location& from_here) : purpose(purpose), | 120 ConfigureParams config_params, const tracked_objects::Location& from_here) |
103 scheduled_start(start), | 121 : purpose(purpose), |
104 session(session), | 122 scheduled_start(start), |
105 is_canary_job(is_canary_job), | 123 session(session), |
106 from_here(from_here) { | 124 is_canary_job(is_canary_job), |
125 config_params(config_params), | |
126 from_here(from_here) { | |
107 } | 127 } |
108 | 128 |
109 const char* SyncScheduler::SyncSessionJob::GetPurposeString( | 129 const char* SyncScheduler::SyncSessionJob::GetPurposeString( |
110 SyncScheduler::SyncSessionJob::SyncSessionJobPurpose purpose) { | 130 SyncScheduler::SyncSessionJob::SyncSessionJobPurpose purpose) { |
111 switch (purpose) { | 131 switch (purpose) { |
112 ENUM_CASE(UNKNOWN); | 132 ENUM_CASE(UNKNOWN); |
113 ENUM_CASE(POLL); | 133 ENUM_CASE(POLL); |
114 ENUM_CASE(NUDGE); | 134 ENUM_CASE(NUDGE); |
115 ENUM_CASE(CLEAR_USER_DATA); | 135 ENUM_CASE(CLEAR_USER_DATA); |
116 ENUM_CASE(CONFIGURATION); | 136 ENUM_CASE(CONFIGURATION); |
(...skipping 122 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
239 | 259 |
240 void SyncScheduler::UpdateServerConnectionManagerStatus( | 260 void SyncScheduler::UpdateServerConnectionManagerStatus( |
241 HttpResponse::ServerConnectionCode code) { | 261 HttpResponse::ServerConnectionCode code) { |
242 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 262 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
243 SDVLOG(2) << "New server connection code: " | 263 SDVLOG(2) << "New server connection code: " |
244 << HttpResponse::GetServerConnectionCodeString(code); | 264 << HttpResponse::GetServerConnectionCodeString(code); |
245 | 265 |
246 connection_code_ = code; | 266 connection_code_ = code; |
247 } | 267 } |
248 | 268 |
249 void SyncScheduler::Start(Mode mode, const base::Closure& callback) { | 269 void SyncScheduler::Start(Mode mode) { |
250 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 270 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
251 std::string thread_name = MessageLoop::current()->thread_name(); | 271 std::string thread_name = MessageLoop::current()->thread_name(); |
252 if (thread_name.empty()) | 272 if (thread_name.empty()) |
253 thread_name = "<Main thread>"; | 273 thread_name = "<Main thread>"; |
254 SDVLOG(2) << "Start called from thread " | 274 SDVLOG(2) << "Start called from thread " |
255 << thread_name << " with mode " << GetModeString(mode); | 275 << thread_name << " with mode " << GetModeString(mode); |
256 if (!started_) { | 276 if (!started_) { |
257 started_ = true; | 277 started_ = true; |
258 SendInitialSnapshot(); | 278 SendInitialSnapshot(); |
259 } | 279 } |
260 | 280 |
261 DCHECK(!session_context_->account_name().empty()); | 281 DCHECK(!session_context_->account_name().empty()); |
262 DCHECK(syncer_.get()); | 282 DCHECK(syncer_.get()); |
263 Mode old_mode = mode_; | 283 Mode old_mode = mode_; |
264 mode_ = mode; | 284 mode_ = mode; |
265 AdjustPolling(NULL); // Will kick start poll timer if needed. | 285 AdjustPolling(NULL); // Will kick start poll timer if needed. |
266 if (!callback.is_null()) | |
267 callback.Run(); | |
268 | 286 |
269 if (old_mode != mode_) { | 287 if (old_mode != mode_) { |
270 // We just changed our mode. See if there are any pending jobs that we could | 288 // We just changed our mode. See if there are any pending jobs that we could |
271 // execute in the new mode. | 289 // execute in the new mode. |
272 DoPendingJobIfPossible(false); | 290 DoPendingJobIfPossible(false); |
273 } | 291 } |
274 } | 292 } |
275 | 293 |
276 void SyncScheduler::SendInitialSnapshot() { | 294 void SyncScheduler::SendInitialSnapshot() { |
277 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 295 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
278 scoped_ptr<SyncSession> dummy(new SyncSession(session_context_, this, | 296 scoped_ptr<SyncSession> dummy(new SyncSession(session_context_, this, |
279 SyncSourceInfo(), ModelSafeRoutingInfo(), | 297 SyncSourceInfo(), ModelSafeRoutingInfo(), |
280 std::vector<ModelSafeWorker*>())); | 298 std::vector<ModelSafeWorker*>())); |
281 SyncEngineEvent event(SyncEngineEvent::STATUS_CHANGED); | 299 SyncEngineEvent event(SyncEngineEvent::STATUS_CHANGED); |
282 event.snapshot = dummy->TakeSnapshot(); | 300 event.snapshot = dummy->TakeSnapshot(); |
283 session_context_->NotifyListeners(event); | 301 session_context_->NotifyListeners(event); |
284 } | 302 } |
285 | 303 |
304 namespace { | |
305 | |
306 // Helper to extract the routing info and workers corresponding to types in | |
307 // |types| from |current_routes| and |current_workers|. | |
308 void BuildModelSafeParams( | |
309 const ModelTypeSet& types_to_config, | |
310 const ModelSafeRoutingInfo& current_routes, | |
311 const std::vector<ModelSafeWorker*>& current_workers, | |
312 ModelSafeRoutingInfo* result_routes, | |
313 std::vector<ModelSafeWorker*>* result_workers) { | |
314 std::set<ModelSafeGroup> active_groups; | |
315 active_groups.insert(GROUP_PASSIVE); | |
316 for (ModelTypeSet::Iterator iter = types_to_config.First(); iter.Good(); | |
317 iter.Inc()) { | |
318 syncable::ModelType type = iter.Get(); | |
319 ModelSafeRoutingInfo::const_iterator route = current_routes.find(type); | |
320 DCHECK(route != current_routes.end()); | |
321 ModelSafeGroup group = route->second; | |
322 (*result_routes)[type] = group; | |
323 active_groups.insert(group); | |
324 } | |
325 | |
326 for(std::vector<ModelSafeWorker*>::const_iterator iter = | |
327 current_workers.begin(); iter != current_workers.end(); ++iter) { | |
328 if (active_groups.count((*iter)->GetModelSafeGroup()) > 0) | |
329 result_workers->push_back(*iter); | |
330 } | |
331 } | |
332 | |
333 } // namespace. | |
334 | |
335 bool SyncScheduler::Configure(const ConfigureParams& params) { | |
336 DCHECK_EQ(MessageLoop::current(), sync_loop_); | |
337 DCHECK(IsConfigRelatedUpdateSourceValue(params.source)); | |
338 DCHECK_EQ(CONFIGURATION_MODE, mode_); | |
339 DCHECK(!params.ready_task.is_null()); | |
340 SDVLOG(2) << "Reconfiguring syncer."; | |
341 | |
342 // Only one configuration is allowed at a time. Verify we're not waiting | |
343 // for a pending configure job. | |
344 DCHECK(!wait_interval_.get() || !wait_interval_->pending_configure_job.get()); | |
345 | |
346 // We set the routing info for all enabled types in the session context, but | |
rlarocque
2012/06/09 01:44:35
This comment isn't very helpful. It describes wha
Nicolas Zea
2012/06/11 23:05:20
Removed comment.
| |
347 // the session for this configuration only knows about the routing info for | |
348 // those types_to_config (set via restricted_routes and restricted_workers). | |
349 browser_sync::ModelSafeRoutingInfo restricted_routes; | |
350 std::vector<ModelSafeWorker*> restricted_workers; | |
351 BuildModelSafeParams(params.types_to_config, | |
352 params.routing_info, | |
353 session_context_->workers(), | |
354 &restricted_routes, | |
355 &restricted_workers); | |
rlarocque
2012/06/09 01:44:35
I'm pretty sure Fred's work to only run the ModelS
Nicolas Zea
2012/06/11 23:05:20
Added TODO.
| |
356 session_context_->set_routing_info(params.routing_info); | |
357 | |
358 // TODO(sync): if it's confirmed that Cleanup has no effect on non-configures, | |
359 // remove this command and add a call to PurgeEntriesWithTypeIn here. This | |
360 // will also allow us to get rid of previous_session_routing_info. | |
361 SyncSessionJob cleanup_job( | |
362 SyncSessionJob::CLEANUP_DISABLED_TYPES, | |
363 TimeTicks::Now(), | |
364 make_linked_ptr(CreateSyncSession(SyncSourceInfo())), | |
365 false, | |
366 ConfigureParams(), | |
367 FROM_HERE); | |
368 DoSyncSessionJob(cleanup_job); | |
369 | |
370 if (params.need_encryption_key) { | |
371 // TODO(zea): implement in such a way that we can handle failures and the | |
372 // subsequent retrys the scheduler might perform. | |
373 NOTIMPLEMENTED(); | |
374 } | |
375 | |
376 // Only reconfigure if we have types to config. | |
377 if (!params.types_to_config.Empty()) { | |
378 DCHECK(!restricted_routes.empty()); | |
379 // TODO(tim): config-specific GetUpdatesCallerInfo value? | |
380 linked_ptr<SyncSession> session(new SyncSession( | |
381 session_context_, | |
382 this, | |
383 SyncSourceInfo(params.source, | |
384 syncable::ModelTypePayloadMapFromRoutingInfo( | |
385 restricted_routes, | |
386 std::string())), | |
387 restricted_routes, | |
388 restricted_workers)); | |
389 SyncSessionJob job(SyncSessionJob::CONFIGURATION, | |
390 TimeTicks::Now(), | |
391 session, | |
392 false, | |
393 params, | |
394 FROM_HERE); | |
395 DoSyncSessionJob(job); | |
396 | |
397 // If we failed, the job would have been saved as the pending configure | |
398 // job and a wait interval would have been set. | |
399 if (!session->Succeeded()) { | |
400 DCHECK(wait_interval_.get() && | |
401 wait_interval_->pending_configure_job.get()); | |
402 return false; | |
403 } | |
404 } else { | |
405 SDVLOG(2) << "No change in routing info, calling ready task directly."; | |
406 params.ready_task.Run(); | |
407 } | |
408 | |
409 return true; | |
410 } | |
411 | |
286 SyncScheduler::JobProcessDecision SyncScheduler::DecideWhileInWaitInterval( | 412 SyncScheduler::JobProcessDecision SyncScheduler::DecideWhileInWaitInterval( |
287 const SyncSessionJob& job) { | 413 const SyncSessionJob& job) { |
288 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 414 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
289 DCHECK(wait_interval_.get()); | 415 DCHECK(wait_interval_.get()); |
290 DCHECK_NE(job.purpose, SyncSessionJob::CLEAR_USER_DATA); | 416 DCHECK_NE(job.purpose, SyncSessionJob::CLEAR_USER_DATA); |
291 DCHECK_NE(job.purpose, SyncSessionJob::CLEANUP_DISABLED_TYPES); | 417 DCHECK_NE(job.purpose, SyncSessionJob::CLEANUP_DISABLED_TYPES); |
292 | 418 |
293 SDVLOG(2) << "DecideWhileInWaitInterval with WaitInterval mode " | 419 SDVLOG(2) << "DecideWhileInWaitInterval with WaitInterval mode " |
294 << WaitInterval::GetModeString(wait_interval_->mode) | 420 << WaitInterval::GetModeString(wait_interval_->mode) |
295 << (wait_interval_->had_nudge ? " (had nudge)" : "") | 421 << (wait_interval_->had_nudge ? " (had nudge)" : "") |
(...skipping 78 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
374 void SyncScheduler::InitOrCoalescePendingJob(const SyncSessionJob& job) { | 500 void SyncScheduler::InitOrCoalescePendingJob(const SyncSessionJob& job) { |
375 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 501 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
376 DCHECK(job.purpose != SyncSessionJob::CONFIGURATION); | 502 DCHECK(job.purpose != SyncSessionJob::CONFIGURATION); |
377 if (pending_nudge_.get() == NULL) { | 503 if (pending_nudge_.get() == NULL) { |
378 SDVLOG(2) << "Creating a pending nudge job"; | 504 SDVLOG(2) << "Creating a pending nudge job"; |
379 SyncSession* s = job.session.get(); | 505 SyncSession* s = job.session.get(); |
380 scoped_ptr<SyncSession> session(new SyncSession(s->context(), | 506 scoped_ptr<SyncSession> session(new SyncSession(s->context(), |
381 s->delegate(), s->source(), s->routing_info(), s->workers())); | 507 s->delegate(), s->source(), s->routing_info(), s->workers())); |
382 | 508 |
383 SyncSessionJob new_job(SyncSessionJob::NUDGE, job.scheduled_start, | 509 SyncSessionJob new_job(SyncSessionJob::NUDGE, job.scheduled_start, |
384 make_linked_ptr(session.release()), false, job.from_here); | 510 make_linked_ptr(session.release()), false, |
511 ConfigureParams(), job.from_here); | |
385 pending_nudge_.reset(new SyncSessionJob(new_job)); | 512 pending_nudge_.reset(new SyncSessionJob(new_job)); |
386 | 513 |
387 return; | 514 return; |
388 } | 515 } |
389 | 516 |
390 SDVLOG(2) << "Coalescing a pending nudge"; | 517 SDVLOG(2) << "Coalescing a pending nudge"; |
391 pending_nudge_->session->Coalesce(*(job.session.get())); | 518 pending_nudge_->session->Coalesce(*(job.session.get())); |
392 pending_nudge_->scheduled_start = job.scheduled_start; | 519 pending_nudge_->scheduled_start = job.scheduled_start; |
393 | 520 |
394 // Unfortunately the nudge location cannot be modified. So it stores the | 521 // Unfortunately the nudge location cannot be modified. So it stores the |
(...skipping 25 matching lines...) Expand all Loading... | |
420 // TODO(sync): Should we also check that job.purpose != | 547 // TODO(sync): Should we also check that job.purpose != |
421 // CLEANUP_DISABLED_TYPES? (See http://crbug.com/90868.) | 548 // CLEANUP_DISABLED_TYPES? (See http://crbug.com/90868.) |
422 if (job.purpose == SyncSessionJob::NUDGE) { | 549 if (job.purpose == SyncSessionJob::NUDGE) { |
423 SDVLOG(2) << "Saving a nudge job"; | 550 SDVLOG(2) << "Saving a nudge job"; |
424 InitOrCoalescePendingJob(job); | 551 InitOrCoalescePendingJob(job); |
425 } else if (job.purpose == SyncSessionJob::CONFIGURATION){ | 552 } else if (job.purpose == SyncSessionJob::CONFIGURATION){ |
426 SDVLOG(2) << "Saving a configuration job"; | 553 SDVLOG(2) << "Saving a configuration job"; |
427 DCHECK(wait_interval_.get()); | 554 DCHECK(wait_interval_.get()); |
428 DCHECK(mode_ == CONFIGURATION_MODE); | 555 DCHECK(mode_ == CONFIGURATION_MODE); |
429 | 556 |
557 // Config params should always get set. | |
558 DCHECK(!job.config_params.ready_task.is_null()); | |
430 SyncSession* old = job.session.get(); | 559 SyncSession* old = job.session.get(); |
431 SyncSession* s(new SyncSession(session_context_, this, old->source(), | 560 SyncSession* s(new SyncSession(session_context_, this, old->source(), |
432 old->routing_info(), old->workers())); | 561 old->routing_info(), old->workers())); |
433 SyncSessionJob new_job(job.purpose, TimeTicks::Now(), | 562 SyncSessionJob new_job(job.purpose, |
434 make_linked_ptr(s), false, job.from_here); | 563 TimeTicks::Now(), |
564 make_linked_ptr(s), | |
565 false, | |
566 job.config_params, | |
567 job.from_here); | |
435 wait_interval_->pending_configure_job.reset(new SyncSessionJob(new_job)); | 568 wait_interval_->pending_configure_job.reset(new SyncSessionJob(new_job)); |
436 } // drop the rest. | 569 } // drop the rest. |
437 // TODO(sync): Is it okay to drop the rest? It's weird that | 570 // TODO(sync): Is it okay to drop the rest? It's weird that |
438 // SaveJob() only does what it says sometimes. (See | 571 // SaveJob() only does what it says sometimes. (See |
439 // http://crbug.com/90868.) | 572 // http://crbug.com/90868.) |
440 } | 573 } |
441 | 574 |
442 // Functor for std::find_if to search by ModelSafeGroup. | 575 // Functor for std::find_if to search by ModelSafeGroup. |
443 struct ModelSafeWorkerGroupIs { | 576 struct ModelSafeWorkerGroupIs { |
444 explicit ModelSafeWorkerGroupIs(ModelSafeGroup group) : group(group) {} | 577 explicit ModelSafeWorkerGroupIs(ModelSafeGroup group) : group(group) {} |
445 bool operator()(ModelSafeWorker* w) { | 578 bool operator()(ModelSafeWorker* w) { |
446 return group == w->GetModelSafeGroup(); | 579 return group == w->GetModelSafeGroup(); |
447 } | 580 } |
448 ModelSafeGroup group; | 581 ModelSafeGroup group; |
449 }; | 582 }; |
450 | 583 |
451 void SyncScheduler::ClearUserData() { | 584 void SyncScheduler::ClearUserData() { |
452 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 585 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
453 SyncSessionJob job(SyncSessionJob::CLEAR_USER_DATA, TimeTicks::Now(), | 586 SyncSessionJob job(SyncSessionJob::CLEAR_USER_DATA, |
587 TimeTicks::Now(), | |
454 make_linked_ptr(CreateSyncSession(SyncSourceInfo())), | 588 make_linked_ptr(CreateSyncSession(SyncSourceInfo())), |
455 false, | 589 false, |
590 ConfigureParams(), | |
456 FROM_HERE); | 591 FROM_HERE); |
457 | 592 |
458 DoSyncSessionJob(job); | 593 DoSyncSessionJob(job); |
459 } | 594 } |
460 | 595 |
461 void SyncScheduler::CleanupDisabledTypes() { | |
462 DCHECK_EQ(MessageLoop::current(), sync_loop_); | |
463 SyncSessionJob job(SyncSessionJob::CLEANUP_DISABLED_TYPES, TimeTicks::Now(), | |
464 make_linked_ptr(CreateSyncSession(SyncSourceInfo())), | |
465 false, | |
466 FROM_HERE); | |
467 DoSyncSessionJob(job); | |
468 } | |
469 | |
470 void SyncScheduler::ScheduleNudge( | 596 void SyncScheduler::ScheduleNudge( |
471 const TimeDelta& delay, | 597 const TimeDelta& delay, |
472 NudgeSource source, ModelTypeSet types, | 598 NudgeSource source, ModelTypeSet types, |
473 const tracked_objects::Location& nudge_location) { | 599 const tracked_objects::Location& nudge_location) { |
474 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 600 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
475 SDVLOG_LOC(nudge_location, 2) | 601 SDVLOG_LOC(nudge_location, 2) |
476 << "Nudge scheduled with delay " << delay.InMilliseconds() << " ms, " | 602 << "Nudge scheduled with delay " << delay.InMilliseconds() << " ms, " |
477 << "source " << GetNudgeSourceString(source) << ", " | 603 << "source " << GetNudgeSourceString(source) << ", " |
478 << "types " << ModelTypeSetToString(types); | 604 << "types " << ModelTypeSetToString(types); |
479 | 605 |
(...skipping 37 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
517 << "source " << GetUpdatesSourceString(source) << ", " | 643 << "source " << GetUpdatesSourceString(source) << ", " |
518 << "payloads " | 644 << "payloads " |
519 << syncable::ModelTypePayloadMapToString(types_with_payloads) | 645 << syncable::ModelTypePayloadMapToString(types_with_payloads) |
520 << (is_canary_job ? " (canary)" : ""); | 646 << (is_canary_job ? " (canary)" : ""); |
521 | 647 |
522 SyncSourceInfo info(source, types_with_payloads); | 648 SyncSourceInfo info(source, types_with_payloads); |
523 | 649 |
524 SyncSession* session(CreateSyncSession(info)); | 650 SyncSession* session(CreateSyncSession(info)); |
525 SyncSessionJob job(SyncSessionJob::NUDGE, TimeTicks::Now() + delay, | 651 SyncSessionJob job(SyncSessionJob::NUDGE, TimeTicks::Now() + delay, |
526 make_linked_ptr(session), is_canary_job, | 652 make_linked_ptr(session), is_canary_job, |
527 nudge_location); | 653 ConfigureParams(), nudge_location); |
528 | 654 |
529 session = NULL; | 655 session = NULL; |
530 if (!ShouldRunJob(job)) | 656 if (!ShouldRunJob(job)) |
531 return; | 657 return; |
532 | 658 |
533 if (pending_nudge_.get()) { | 659 if (pending_nudge_.get()) { |
534 if (IsBackingOff() && delay > TimeDelta::FromSeconds(1)) { | 660 if (IsBackingOff() && delay > TimeDelta::FromSeconds(1)) { |
535 SDVLOG(2) << "Dropping the nudge because we are in backoff"; | 661 SDVLOG(2) << "Dropping the nudge because we are in backoff"; |
536 return; | 662 return; |
537 } | 663 } |
(...skipping 10 matching lines...) Expand all Loading... | |
548 job.scheduled_start = std::min(job.scheduled_start, | 674 job.scheduled_start = std::min(job.scheduled_start, |
549 pending_nudge_->scheduled_start); | 675 pending_nudge_->scheduled_start); |
550 pending_nudge_.reset(); | 676 pending_nudge_.reset(); |
551 } | 677 } |
552 | 678 |
553 // TODO(zea): Consider adding separate throttling/backoff for datatype | 679 // TODO(zea): Consider adding separate throttling/backoff for datatype |
554 // refresh requests. | 680 // refresh requests. |
555 ScheduleSyncSessionJob(job); | 681 ScheduleSyncSessionJob(job); |
556 } | 682 } |
557 | 683 |
558 // Helper to extract the routing info and workers corresponding to types in | |
559 // |types| from |current_routes| and |current_workers|. | |
560 void GetModelSafeParamsForTypes(ModelTypeSet types, | |
561 const ModelSafeRoutingInfo& current_routes, | |
562 const std::vector<ModelSafeWorker*>& current_workers, | |
563 ModelSafeRoutingInfo* result_routes, | |
564 std::vector<ModelSafeWorker*>* result_workers) { | |
565 bool passive_group_added = false; | |
566 | |
567 typedef std::vector<ModelSafeWorker*>::const_iterator iter; | |
568 for (ModelTypeSet::Iterator it = types.First(); | |
569 it.Good(); it.Inc()) { | |
570 const syncable::ModelType t = it.Get(); | |
571 ModelSafeRoutingInfo::const_iterator route = current_routes.find(t); | |
572 DCHECK(route != current_routes.end()); | |
573 ModelSafeGroup group = route->second; | |
574 | |
575 (*result_routes)[t] = group; | |
576 iter w_tmp_it = std::find_if(current_workers.begin(), current_workers.end(), | |
577 ModelSafeWorkerGroupIs(group)); | |
578 if (w_tmp_it != current_workers.end()) { | |
579 iter result_workers_it = std::find_if( | |
580 result_workers->begin(), result_workers->end(), | |
581 ModelSafeWorkerGroupIs(group)); | |
582 if (result_workers_it == result_workers->end()) | |
583 result_workers->push_back(*w_tmp_it); | |
584 | |
585 if (group == GROUP_PASSIVE) | |
586 passive_group_added = true; | |
587 } else { | |
588 NOTREACHED(); | |
589 } | |
590 } | |
591 | |
592 // Always add group passive. | |
593 if (passive_group_added == false) { | |
594 iter it = std::find_if(current_workers.begin(), current_workers.end(), | |
595 ModelSafeWorkerGroupIs(GROUP_PASSIVE)); | |
596 if (it != current_workers.end()) | |
597 result_workers->push_back(*it); | |
598 else | |
599 NOTREACHED(); | |
600 } | |
601 } | |
602 | |
603 void SyncScheduler::Configure( | |
604 ModelTypeSet types, | |
605 GetUpdatesCallerInfo::GetUpdatesSource source) { | |
606 DCHECK_EQ(MessageLoop::current(), sync_loop_); | |
607 DCHECK(IsConfigRelatedUpdateSourceValue(source)); | |
608 SDVLOG(2) << "Performing a config"; | |
609 | |
610 ModelSafeRoutingInfo routes; | |
611 std::vector<ModelSafeWorker*> workers; | |
612 GetModelSafeParamsForTypes(types, | |
613 session_context_->routing_info(), | |
614 session_context_->workers(), | |
615 &routes, &workers); | |
616 | |
617 // TODO(tim): config-specific GetUpdatesCallerInfo value? | |
618 SyncSession* session = new SyncSession(session_context_, this, | |
619 SyncSourceInfo(source, | |
620 syncable::ModelTypePayloadMapFromRoutingInfo( | |
621 routes, std::string())), | |
622 routes, workers); | |
623 SyncSessionJob job(SyncSessionJob::CONFIGURATION, TimeTicks::Now(), | |
624 make_linked_ptr(session), | |
625 false, | |
626 FROM_HERE); | |
627 DoSyncSessionJob(job); | |
628 } | |
629 | |
630 const char* SyncScheduler::GetModeString(SyncScheduler::Mode mode) { | 684 const char* SyncScheduler::GetModeString(SyncScheduler::Mode mode) { |
631 switch (mode) { | 685 switch (mode) { |
632 ENUM_CASE(CONFIGURATION_MODE); | 686 ENUM_CASE(CONFIGURATION_MODE); |
633 ENUM_CASE(NORMAL_MODE); | 687 ENUM_CASE(NORMAL_MODE); |
634 } | 688 } |
635 return ""; | 689 return ""; |
636 } | 690 } |
637 | 691 |
638 const char* SyncScheduler::GetDecisionString( | 692 const char* SyncScheduler::GetDecisionString( |
639 SyncScheduler::JobProcessDecision mode) { | 693 SyncScheduler::JobProcessDecision mode) { |
(...skipping 200 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
840 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 894 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
841 DCHECK(!old_job.session->HasMoreToSync()); | 895 DCHECK(!old_job.session->HasMoreToSync()); |
842 | 896 |
843 AdjustPolling(&old_job); | 897 AdjustPolling(&old_job); |
844 | 898 |
845 if (old_job.session->Succeeded()) { | 899 if (old_job.session->Succeeded()) { |
846 // Only reset backoff if we actually reached the server. | 900 // Only reset backoff if we actually reached the server. |
847 if (old_job.session->SuccessfullyReachedServer()) | 901 if (old_job.session->SuccessfullyReachedServer()) |
848 wait_interval_.reset(); | 902 wait_interval_.reset(); |
849 SDVLOG(2) << "Job succeeded so not scheduling more jobs"; | 903 SDVLOG(2) << "Job succeeded so not scheduling more jobs"; |
904 | |
905 // If this was a configuration job with a ready task, invoke it now that | |
906 // we finished successfully. | |
907 if (!old_job.config_params.ready_task.is_null()) | |
908 old_job.config_params.ready_task.Run(); | |
850 return; | 909 return; |
851 } | 910 } |
852 | 911 |
853 if (old_job.purpose == SyncSessionJob::POLL) { | 912 if (old_job.purpose == SyncSessionJob::POLL) { |
854 return; // We don't retry POLL jobs. | 913 return; // We don't retry POLL jobs. |
855 } | 914 } |
856 | 915 |
857 // TODO(rlarocque): There's no reason why we should blindly backoff and retry | 916 // TODO(rlarocque): There's no reason why we should blindly backoff and retry |
858 // if we don't succeed. Some types of errors are not likely to disappear on | 917 // if we don't succeed. Some types of errors are not likely to disappear on |
859 // their own. With the return values now available in the old_job.session, we | 918 // their own. With the return values now available in the old_job.session, we |
(...skipping 65 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
925 | 984 |
926 SDVLOG(2) << "In handle continuation error with " | 985 SDVLOG(2) << "In handle continuation error with " |
927 << SyncSessionJob::GetPurposeString(old_job.purpose) | 986 << SyncSessionJob::GetPurposeString(old_job.purpose) |
928 << " job. The time delta(ms) is " | 987 << " job. The time delta(ms) is " |
929 << length.InMilliseconds(); | 988 << length.InMilliseconds(); |
930 | 989 |
931 // This will reset the had_nudge variable as well. | 990 // This will reset the had_nudge variable as well. |
932 wait_interval_.reset(new WaitInterval(WaitInterval::EXPONENTIAL_BACKOFF, | 991 wait_interval_.reset(new WaitInterval(WaitInterval::EXPONENTIAL_BACKOFF, |
933 length)); | 992 length)); |
934 if (old_job.purpose == SyncSessionJob::CONFIGURATION) { | 993 if (old_job.purpose == SyncSessionJob::CONFIGURATION) { |
994 SDVLOG(2) << "Configuration did not succeed, scheduling retry."; | |
995 // Config params should always get set. | |
996 DCHECK(!old_job.config_params.ready_task.is_null()); | |
935 SyncSession* old = old_job.session.get(); | 997 SyncSession* old = old_job.session.get(); |
936 SyncSession* s(new SyncSession(session_context_, this, | 998 SyncSession* s(new SyncSession(session_context_, this, |
937 old->source(), old->routing_info(), old->workers())); | 999 old->source(), old->routing_info(), old->workers())); |
938 SyncSessionJob job(old_job.purpose, TimeTicks::Now() + length, | 1000 SyncSessionJob job(old_job.purpose, TimeTicks::Now() + length, |
939 make_linked_ptr(s), false, FROM_HERE); | 1001 make_linked_ptr(s), false, old_job.config_params, |
1002 FROM_HERE); | |
940 wait_interval_->pending_configure_job.reset(new SyncSessionJob(job)); | 1003 wait_interval_->pending_configure_job.reset(new SyncSessionJob(job)); |
941 } else { | 1004 } else { |
942 // We are not in configuration mode. So wait_interval's pending job | 1005 // We are not in configuration mode. So wait_interval's pending job |
943 // should be null. | 1006 // should be null. |
944 DCHECK(wait_interval_->pending_configure_job.get() == NULL); | 1007 DCHECK(wait_interval_->pending_configure_job.get() == NULL); |
945 | 1008 |
946 // TODO(lipalani) - handle clear user data. | 1009 // TODO(lipalani) - handle clear user data. |
947 InitOrCoalescePendingJob(old_job); | 1010 InitOrCoalescePendingJob(old_job); |
948 } | 1011 } |
949 RestartWaiting(); | 1012 RestartWaiting(); |
(...skipping 101 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
1051 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 1114 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
1052 ModelSafeRoutingInfo r; | 1115 ModelSafeRoutingInfo r; |
1053 ModelTypePayloadMap types_with_payloads = | 1116 ModelTypePayloadMap types_with_payloads = |
1054 syncable::ModelTypePayloadMapFromRoutingInfo(r, std::string()); | 1117 syncable::ModelTypePayloadMapFromRoutingInfo(r, std::string()); |
1055 SyncSourceInfo info(GetUpdatesCallerInfo::PERIODIC, types_with_payloads); | 1118 SyncSourceInfo info(GetUpdatesCallerInfo::PERIODIC, types_with_payloads); |
1056 SyncSession* s = CreateSyncSession(info); | 1119 SyncSession* s = CreateSyncSession(info); |
1057 | 1120 |
1058 SyncSessionJob job(SyncSessionJob::POLL, TimeTicks::Now(), | 1121 SyncSessionJob job(SyncSessionJob::POLL, TimeTicks::Now(), |
1059 make_linked_ptr(s), | 1122 make_linked_ptr(s), |
1060 false, | 1123 false, |
1124 ConfigureParams(), | |
1061 FROM_HERE); | 1125 FROM_HERE); |
1062 | 1126 |
1063 ScheduleSyncSessionJob(job); | 1127 ScheduleSyncSessionJob(job); |
1064 } | 1128 } |
1065 | 1129 |
1066 void SyncScheduler::Unthrottle() { | 1130 void SyncScheduler::Unthrottle() { |
1067 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 1131 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
1068 DCHECK_EQ(WaitInterval::THROTTLED, wait_interval_->mode); | 1132 DCHECK_EQ(WaitInterval::THROTTLED, wait_interval_->mode); |
1069 SDVLOG(2) << "Unthrottled."; | 1133 SDVLOG(2) << "Unthrottled."; |
1070 DoCanaryJob(); | 1134 DoCanaryJob(); |
(...skipping 82 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
1153 | 1217 |
1154 #undef SDVLOG_LOC | 1218 #undef SDVLOG_LOC |
1155 | 1219 |
1156 #undef SDVLOG | 1220 #undef SDVLOG |
1157 | 1221 |
1158 #undef SLOG | 1222 #undef SLOG |
1159 | 1223 |
1160 #undef ENUM_CASE | 1224 #undef ENUM_CASE |
1161 | 1225 |
1162 } // browser_sync | 1226 } // browser_sync |
OLD | NEW |