Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(610)

Side by Side Diff: chrome/browser/sync/engine/syncer_thread.cc

Issue 386030: Relieve SyncerSession,SyncCycleState, SyncProcessState, SyncerSession, Syncer... (Closed) Base URL: svn://chrome-svn/chrome/trunk/src/
Patch Set: '' Created 11 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2009 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2009 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 #include "chrome/browser/sync/engine/syncer_thread.h" 4 #include "chrome/browser/sync/engine/syncer_thread.h"
5 5
6 #include "build/build_config.h" 6 #include "build/build_config.h"
7 7
8 #if defined(OS_MACOSX) 8 #if defined(OS_MACOSX)
9 #include <CoreFoundation/CFNumber.h> 9 #include <CoreFoundation/CFNumber.h>
10 #include <IOKit/IOTypes.h> 10 #include <IOKit/IOTypes.h>
11 #include <IOKit/IOKitLib.h> 11 #include <IOKit/IOKitLib.h>
12 #endif 12 #endif
13 13
14 #include <algorithm> 14 #include <algorithm>
15 #include <map> 15 #include <map>
16 #include <queue> 16 #include <queue>
17 17
18 #include "base/command_line.h" 18 #include "base/command_line.h"
19 #include "chrome/browser/sync/engine/auth_watcher.h" 19 #include "chrome/browser/sync/engine/auth_watcher.h"
20 #include "chrome/browser/sync/engine/model_safe_worker.h" 20 #include "chrome/browser/sync/engine/model_safe_worker.h"
21 #include "chrome/browser/sync/engine/net/server_connection_manager.h" 21 #include "chrome/browser/sync/engine/net/server_connection_manager.h"
22 #include "chrome/browser/sync/engine/syncer.h" 22 #include "chrome/browser/sync/engine/syncer.h"
23 #include "chrome/browser/sync/engine/syncer_thread_timed_stop.h"
24 #include "chrome/browser/sync/notifier/listener/talk_mediator.h" 23 #include "chrome/browser/sync/notifier/listener/talk_mediator.h"
25 #include "chrome/browser/sync/notifier/listener/talk_mediator_impl.h" 24 #include "chrome/browser/sync/notifier/listener/talk_mediator_impl.h"
26 #include "chrome/browser/sync/syncable/directory_manager.h" 25 #include "chrome/browser/sync/syncable/directory_manager.h"
27 #include "chrome/common/chrome_switches.h" 26 #include "chrome/common/chrome_switches.h"
28 27
29 using std::priority_queue; 28 using std::priority_queue;
30 using std::min; 29 using std::min;
31 using base::Time; 30 using base::Time;
32 using base::TimeDelta; 31 using base::TimeDelta;
33 using base::TimeTicks; 32 using base::TimeTicks;
(...skipping 72 matching lines...) Expand 10 before | Expand all | Expand 10 after
106 } 105 }
107 106
108 } // namespace 107 } // namespace
109 108
110 namespace browser_sync { 109 namespace browser_sync {
111 110
112 const int SyncerThread::kDefaultShortPollIntervalSeconds = 60; 111 const int SyncerThread::kDefaultShortPollIntervalSeconds = 60;
113 const int SyncerThread::kDefaultLongPollIntervalSeconds = 3600; 112 const int SyncerThread::kDefaultLongPollIntervalSeconds = 3600;
114 const int SyncerThread::kDefaultMaxPollIntervalMs = 30 * 60 * 1000; 113 const int SyncerThread::kDefaultMaxPollIntervalMs = 30 * 60 * 1000;
115 114
116 SyncerThread* SyncerThreadFactory::Create(
117 ClientCommandChannel* command_channel,
118 syncable::DirectoryManager* mgr,
119 ServerConnectionManager* connection_manager, AllStatus* all_status,
120 ModelSafeWorker* model_safe_worker) {
121 const CommandLine* cmd = CommandLine::ForCurrentProcess();
122 if (cmd->HasSwitch(switches::kSyncerThreadTimedStop)) {
123 return new SyncerThreadTimedStop(command_channel, mgr, connection_manager,
124 all_status, model_safe_worker);
125 } else {
126 // The default SyncerThread implementation, which does not time-out when
127 // Stop is called.
128 return new SyncerThread(command_channel, mgr, connection_manager,
129 all_status, model_safe_worker);
130 }
131 }
132
133 void SyncerThread::NudgeSyncer(int milliseconds_from_now, NudgeSource source) { 115 void SyncerThread::NudgeSyncer(int milliseconds_from_now, NudgeSource source) {
134 AutoLock lock(lock_); 116 AutoLock lock(lock_);
135 if (vault_.syncer_ == NULL) { 117 if (vault_.syncer_ == NULL) {
136 return; 118 return;
137 } 119 }
138 120
139 NudgeSyncImpl(milliseconds_from_now, source); 121 NudgeSyncImpl(milliseconds_from_now, source);
140 } 122 }
141 123
142 SyncerThread::SyncerThread() 124 SyncerThread::SyncerThread(sessions::SyncSessionContext* context,
125 AllStatus* all_status)
143 : thread_main_started_(false, false), 126 : thread_main_started_(false, false),
144 thread_("SyncEngine_SyncerThread"), 127 thread_("SyncEngine_SyncerThread"),
145 vault_field_changed_(&lock_), 128 vault_field_changed_(&lock_),
146 p2p_authenticated_(false), 129 p2p_authenticated_(false),
147 p2p_subscribed_(false), 130 p2p_subscribed_(false),
148 client_command_hookup_(NULL),
149 conn_mgr_hookup_(NULL), 131 conn_mgr_hookup_(NULL),
150 allstatus_(NULL), 132 allstatus_(all_status),
151 dirman_(NULL),
152 scm_(NULL),
153 syncer_short_poll_interval_seconds_(kDefaultShortPollIntervalSeconds), 133 syncer_short_poll_interval_seconds_(kDefaultShortPollIntervalSeconds),
154 syncer_long_poll_interval_seconds_(kDefaultLongPollIntervalSeconds), 134 syncer_long_poll_interval_seconds_(kDefaultLongPollIntervalSeconds),
155 syncer_polling_interval_(kDefaultShortPollIntervalSeconds), 135 syncer_polling_interval_(kDefaultShortPollIntervalSeconds),
156 syncer_max_interval_(kDefaultMaxPollIntervalMs), 136 syncer_max_interval_(kDefaultMaxPollIntervalMs),
157 talk_mediator_hookup_(NULL), 137 talk_mediator_hookup_(NULL),
158 command_channel_(NULL),
159 directory_manager_hookup_(NULL), 138 directory_manager_hookup_(NULL),
160 syncer_events_(NULL), 139 syncer_events_(NULL),
161 model_safe_worker_(NULL), 140 session_context_(context),
162 disable_idle_detection_(false) { 141 disable_idle_detection_(false) {
163 } 142 DCHECK(context);
143 syncer_event_relay_channel_.reset(new SyncerEventChannel(SyncerEvent(
144 SyncerEvent::SHUTDOWN_USE_WITH_CARE)));
164 145
165 SyncerThread::SyncerThread( 146 if (context->directory_manager()) {
166 ClientCommandChannel* command_channel,
167 syncable::DirectoryManager* mgr,
168 ServerConnectionManager* connection_manager,
169 AllStatus* all_status,
170 ModelSafeWorker* model_safe_worker)
171 : thread_main_started_(false, false),
172 thread_("SyncEngine_SyncerThread"),
173 vault_field_changed_(&lock_),
174 p2p_authenticated_(false),
175 p2p_subscribed_(false),
176 client_command_hookup_(NULL),
177 conn_mgr_hookup_(NULL),
178 allstatus_(all_status),
179 dirman_(mgr),
180 scm_(connection_manager),
181 syncer_short_poll_interval_seconds_(kDefaultShortPollIntervalSeconds),
182 syncer_long_poll_interval_seconds_(kDefaultLongPollIntervalSeconds),
183 syncer_polling_interval_(kDefaultShortPollIntervalSeconds),
184 syncer_max_interval_(kDefaultMaxPollIntervalMs),
185 talk_mediator_hookup_(NULL),
186 command_channel_(command_channel),
187 directory_manager_hookup_(NULL),
188 syncer_events_(NULL),
189 model_safe_worker_(model_safe_worker),
190 disable_idle_detection_(false) {
191
192 SyncerEvent shutdown = { SyncerEvent::SHUTDOWN_USE_WITH_CARE };
193 syncer_event_channel_.reset(new SyncerEventChannel(shutdown));
194
195 if (dirman_) {
196 directory_manager_hookup_.reset(NewEventListenerHookup( 147 directory_manager_hookup_.reset(NewEventListenerHookup(
197 dirman_->channel(), this, &SyncerThread::HandleDirectoryManagerEvent)); 148 context->directory_manager()->channel(), this,
149 &SyncerThread::HandleDirectoryManagerEvent));
198 } 150 }
199 151
200 if (scm_) { 152 if (context->connection_manager())
201 WatchConnectionManager(scm_); 153 WatchConnectionManager(context->connection_manager());
202 }
203 154
204 if (command_channel_) {
205 WatchClientCommands(command_channel_);
206 }
207 } 155 }
208 156
209 SyncerThread::~SyncerThread() { 157 SyncerThread::~SyncerThread() {
210 client_command_hookup_.reset();
211 conn_mgr_hookup_.reset(); 158 conn_mgr_hookup_.reset();
212 syncer_event_channel_.reset(); 159 syncer_event_relay_channel_.reset();
213 directory_manager_hookup_.reset(); 160 directory_manager_hookup_.reset();
214 syncer_events_.reset(); 161 syncer_events_.reset();
215 delete vault_.syncer_; 162 delete vault_.syncer_;
216 talk_mediator_hookup_.reset(); 163 talk_mediator_hookup_.reset();
217 CHECK(!thread_.IsRunning()); 164 CHECK(!thread_.IsRunning());
218 } 165 }
219 166
220 // Creates and starts a syncer thread. 167 // Creates and starts a syncer thread.
221 // Returns true if it creates a thread or if there's currently a thread running 168 // Returns true if it creates a thread or if there's currently a thread running
222 // and false otherwise. 169 // and false otherwise.
(...skipping 50 matching lines...) Expand 10 before | Expand all | Expand 10 after
273 // the lock, and join on our internal thread which should soon run off the 220 // the lock, and join on our internal thread which should soon run off the
274 // end of ThreadMain. 221 // end of ThreadMain.
275 vault_field_changed_.Broadcast(); 222 vault_field_changed_.Broadcast();
276 } 223 }
277 224
278 // This will join, and finish when ThreadMain terminates. 225 // This will join, and finish when ThreadMain terminates.
279 thread_.Stop(); 226 thread_.Stop();
280 return true; 227 return true;
281 } 228 }
282 229
283 void SyncerThread::WatchClientCommands(ClientCommandChannel* channel) { 230 void SyncerThread::OnReceivedLongPollIntervalUpdate(
284 AutoLock lock(lock_); 231 const base::TimeDelta& new_interval) {
285 client_command_hookup_.reset(NewEventListenerHookup(channel, this, 232 syncer_long_poll_interval_seconds_ = static_cast<int>(
286 &SyncerThread::HandleClientCommand)); 233 new_interval.InSeconds());
287 } 234 }
288 235
289 void SyncerThread::HandleClientCommand(ClientCommandChannel::EventType event) { 236 void SyncerThread::OnReceivedShortPollIntervalUpdate(
290 if (!event) { 237 const base::TimeDelta& new_interval) {
291 return; 238 syncer_short_poll_interval_seconds_ = static_cast<int>(
292 } 239 new_interval.InSeconds());
240 }
293 241
294 // Mutex not really necessary for these. 242 void SyncerThread::OnSilencedUntil(const base::TimeTicks& silenced_until) {
295 if (event->has_set_sync_poll_interval()) { 243 silenced_until_ = silenced_until;
296 syncer_short_poll_interval_seconds_ = event->set_sync_poll_interval(); 244 }
297 }
298 245
299 if (event->has_set_sync_long_poll_interval()) { 246 bool SyncerThread::IsSyncingCurrentlySilenced() {
300 syncer_long_poll_interval_seconds_ = event->set_sync_long_poll_interval(); 247 return (silenced_until_ - TimeTicks::Now()) >= TimeDelta::FromSeconds(0);
301 }
302 } 248 }
303 249
304 void SyncerThread::ThreadMainLoop() { 250 void SyncerThread::ThreadMainLoop() {
305 // This is called with lock_ acquired. 251 // This is called with lock_ acquired.
306 lock_.AssertAcquired(); 252 lock_.AssertAcquired();
307 LOG(INFO) << "In thread main loop."; 253 LOG(INFO) << "In thread main loop.";
308 254
309 // Use the short poll value by default. 255 // Use the short poll value by default.
310 vault_.current_wait_interval_.poll_delta = 256 vault_.current_wait_interval_.poll_delta =
311 TimeDelta::FromSeconds(syncer_short_poll_interval_seconds_); 257 TimeDelta::FromSeconds(syncer_short_poll_interval_seconds_);
(...skipping 79 matching lines...) Expand 10 before | Expand all | Expand 10 after
391 SyncerThread::WaitInterval SyncerThread::CalculatePollingWaitTime( 337 SyncerThread::WaitInterval SyncerThread::CalculatePollingWaitTime(
392 const AllStatus::Status& status, 338 const AllStatus::Status& status,
393 int last_poll_wait, // Time in seconds. 339 int last_poll_wait, // Time in seconds.
394 int* user_idle_milliseconds, 340 int* user_idle_milliseconds,
395 bool* continue_sync_cycle, 341 bool* continue_sync_cycle,
396 bool was_nudged) { 342 bool was_nudged) {
397 lock_.AssertAcquired(); // We access 'vault' in here, so we need the lock. 343 lock_.AssertAcquired(); // We access 'vault' in here, so we need the lock.
398 WaitInterval return_interval; 344 WaitInterval return_interval;
399 345
400 // Server initiated throttling trumps everything. 346 // Server initiated throttling trumps everything.
401 if (vault_.syncer_ && vault_.syncer_->is_silenced()) { 347 if (!silenced_until_.is_null()) {
402 // We don't need to reset other state, it can continue where it left off. 348 // We don't need to reset other state, it can continue where it left off.
403 return_interval.mode = WaitInterval::THROTTLED; 349 return_interval.mode = WaitInterval::THROTTLED;
404 return_interval.poll_delta = vault_.syncer_->silenced_until() - 350 return_interval.poll_delta = silenced_until_ - TimeTicks::Now();
405 TimeTicks::Now();
406 return return_interval; 351 return return_interval;
407 } 352 }
408 353
409 bool is_continuing_sync_cyle = *continue_sync_cycle; 354 bool is_continuing_sync_cyle = *continue_sync_cycle;
410 *continue_sync_cycle = false; 355 *continue_sync_cycle = false;
411 356
412 // Determine if the syncer has unfinished work to do from allstatus_. 357 // Determine if the syncer has unfinished work to do from allstatus_.
413 const bool syncer_has_work_to_do = 358 const bool syncer_has_work_to_do =
414 status.updates_available > status.updates_received 359 status.updates_available > status.updates_received
415 || status.unsynced_count > 0; 360 || status.unsynced_count > 0;
(...skipping 57 matching lines...) Expand 10 before | Expand all | Expand 10 after
473 AutoLock lock(lock_); 418 AutoLock lock(lock_);
474 // Signal Start() to let it know we've made it safely onto the message loop, 419 // Signal Start() to let it know we've made it safely onto the message loop,
475 // and unblock it's caller. 420 // and unblock it's caller.
476 thread_main_started_.Signal(); 421 thread_main_started_.Signal();
477 ThreadMainLoop(); 422 ThreadMainLoop();
478 LOG(INFO) << "Syncer thread ThreadMain is done."; 423 LOG(INFO) << "Syncer thread ThreadMain is done.";
479 } 424 }
480 425
481 void SyncerThread::SyncMain(Syncer* syncer) { 426 void SyncerThread::SyncMain(Syncer* syncer) {
482 CHECK(syncer); 427 CHECK(syncer);
428
429 // Since we are initiating a new session for which we are the delegate, we
430 // are not currently silenced so reset this state for the next session which
431 // may need to use it.
432 silenced_until_ = base::TimeTicks();
433
483 AutoUnlock unlock(lock_); 434 AutoUnlock unlock(lock_);
484 while (syncer->SyncShare() && !syncer->is_silenced()) { 435 while (syncer->SyncShare(this) && silenced_until_.is_null()) {
485 LOG(INFO) << "Looping in sync share"; 436 LOG(INFO) << "Looping in sync share";
486 } 437 }
487 LOG(INFO) << "Done looping in sync share"; 438 LOG(INFO) << "Done looping in sync share";
488 } 439 }
489 440
490 bool SyncerThread::UpdateNudgeSource(bool was_throttled, 441 bool SyncerThread::UpdateNudgeSource(bool was_throttled,
491 bool continue_sync_cycle, 442 bool continue_sync_cycle,
492 bool* initial_sync) { 443 bool* initial_sync) {
493 bool nudged = false; 444 bool nudged = false;
494 NudgeSource nudge_source = kUnknown; 445 NudgeSource nudge_source = kUnknown;
(...skipping 39 matching lines...) Expand 10 before | Expand all | Expand 10 after
534 default: 485 default:
535 updates_source = sync_pb::GetUpdatesCallerInfo::UNKNOWN; 486 updates_source = sync_pb::GetUpdatesCallerInfo::UNKNOWN;
536 break; 487 break;
537 } 488 }
538 } 489 }
539 vault_.syncer_->set_updates_source(updates_source); 490 vault_.syncer_->set_updates_source(updates_source);
540 } 491 }
541 492
542 void SyncerThread::HandleSyncerEvent(const SyncerEvent& event) { 493 void SyncerThread::HandleSyncerEvent(const SyncerEvent& event) {
543 AutoLock lock(lock_); 494 AutoLock lock(lock_);
544 channel()->NotifyListeners(event); 495 relay_channel()->NotifyListeners(event);
545 if (SyncerEvent::REQUEST_SYNC_NUDGE != event.what_happened) { 496 if (SyncerEvent::REQUEST_SYNC_NUDGE != event.what_happened) {
546 return; 497 return;
547 } 498 }
548 NudgeSyncImpl(event.nudge_delay_milliseconds, kUnknown); 499 NudgeSyncImpl(event.nudge_delay_milliseconds, kUnknown);
549 } 500 }
550 501
551 void SyncerThread::HandleDirectoryManagerEvent( 502 void SyncerThread::HandleDirectoryManagerEvent(
552 const syncable::DirectoryManagerEvent& event) { 503 const syncable::DirectoryManagerEvent& event) {
553 LOG(INFO) << "Handling a directory manager event"; 504 LOG(INFO) << "Handling a directory manager event";
554 if (syncable::DirectoryManagerEvent::OPENED == event.what_happened) { 505 if (syncable::DirectoryManagerEvent::OPENED == event.what_happened) {
555 AutoLock lock(lock_); 506 AutoLock lock(lock_);
556 LOG(INFO) << "Syncer starting up for: " << event.dirname; 507 LOG(INFO) << "Syncer starting up for: " << event.dirname;
557 // The underlying database structure is ready, and we should create 508 // The underlying database structure is ready, and we should create
558 // the syncer. 509 // the syncer.
559 CHECK(vault_.syncer_ == NULL); 510 CHECK(vault_.syncer_ == NULL);
560 vault_.syncer_ = 511 session_context_->set_account_name(event.dirname);
561 new Syncer(dirman_, event.dirname, scm_, model_safe_worker_.get()); 512 vault_.syncer_ = new Syncer(session_context_.get());
562 513
563 vault_.syncer_->set_command_channel(command_channel_);
564 syncer_events_.reset(NewEventListenerHookup( 514 syncer_events_.reset(NewEventListenerHookup(
565 vault_.syncer_->channel(), this, &SyncerThread::HandleSyncerEvent)); 515 session_context_->syncer_event_channel(), this,
516 &SyncerThread::HandleSyncerEvent));
566 vault_field_changed_.Broadcast(); 517 vault_field_changed_.Broadcast();
567 } 518 }
568 } 519 }
569 520
570 static inline void CheckConnected(bool* connected, 521 static inline void CheckConnected(bool* connected,
571 HttpResponse::ServerConnectionCode code, 522 HttpResponse::ServerConnectionCode code,
572 ConditionVariable* condvar) { 523 ConditionVariable* condvar) {
573 if (*connected) { 524 if (*connected) {
574 if (HttpResponse::CONNECTION_UNAVAILABLE == code) { 525 if (HttpResponse::CONNECTION_UNAVAILABLE == code) {
575 *connected = false; 526 *connected = false;
(...skipping 16 matching lines...) Expand all
592 543
593 void SyncerThread::HandleServerConnectionEvent( 544 void SyncerThread::HandleServerConnectionEvent(
594 const ServerConnectionEvent& event) { 545 const ServerConnectionEvent& event) {
595 if (ServerConnectionEvent::STATUS_CHANGED == event.what_happened) { 546 if (ServerConnectionEvent::STATUS_CHANGED == event.what_happened) {
596 AutoLock lock(lock_); 547 AutoLock lock(lock_);
597 CheckConnected(&vault_.connected_, event.connection_code, 548 CheckConnected(&vault_.connected_, event.connection_code,
598 &vault_field_changed_); 549 &vault_field_changed_);
599 } 550 }
600 } 551 }
601 552
602 SyncerEventChannel* SyncerThread::channel() { 553 SyncerEventChannel* SyncerThread::relay_channel() {
603 return syncer_event_channel_.get(); 554 return syncer_event_relay_channel_.get();
604 } 555 }
605 556
606 // Inputs and return value in milliseconds. 557 // Inputs and return value in milliseconds.
607 int SyncerThread::CalculateSyncWaitTime(int last_interval, int user_idle_ms) { 558 int SyncerThread::CalculateSyncWaitTime(int last_interval, int user_idle_ms) {
608 // syncer_polling_interval_ is in seconds 559 // syncer_polling_interval_ is in seconds
609 int syncer_polling_interval_ms = syncer_polling_interval_ * 1000; 560 int syncer_polling_interval_ms = syncer_polling_interval_ * 1000;
610 561
611 // This is our default and lower bound. 562 // This is our default and lower bound.
612 int next_wait = syncer_polling_interval_ms; 563 int next_wait = syncer_polling_interval_ms;
613 564
(...skipping 61 matching lines...) Expand 10 before | Expand all | Expand 10 after
675 case TalkMediatorEvent::NOTIFICATION_RECEIVED: 626 case TalkMediatorEvent::NOTIFICATION_RECEIVED:
676 LOG(INFO) << "P2P: Updates on server, pushing syncer"; 627 LOG(INFO) << "P2P: Updates on server, pushing syncer";
677 if (NULL != vault_.syncer_) { 628 if (NULL != vault_.syncer_) {
678 NudgeSyncImpl(0, kNotification); 629 NudgeSyncImpl(0, kNotification);
679 } 630 }
680 break; 631 break;
681 default: 632 default:
682 break; 633 break;
683 } 634 }
684 635
685 if (NULL != vault_.syncer_) { 636 session_context_->set_notifications_enabled(p2p_authenticated_ &&
686 vault_.syncer_->set_notifications_enabled( 637 p2p_subscribed_);
687 p2p_authenticated_ && p2p_subscribed_);
688 }
689 } 638 }
690 639
691 } // namespace browser_sync 640 } // namespace browser_sync
OLDNEW
« no previous file with comments | « chrome/browser/sync/engine/syncer_thread.h ('k') | chrome/browser/sync/engine/syncer_thread_timed_stop.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698