| OLD | NEW |
| (Empty) |
| 1 // Copyright (c) 2009 The Chromium Authors. All rights reserved. | |
| 2 // Use of this source code is governed by a BSD-style license that can be | |
| 3 // found in the LICENSE file. | |
| 4 #include "chrome/browser/sync/engine/syncer_thread_pthreads.h" | |
| 5 | |
| 6 #include "build/build_config.h" | |
| 7 | |
| 8 #ifdef OS_MACOSX | |
| 9 #include <CoreFoundation/CFNumber.h> | |
| 10 #include <IOKit/IOTypes.h> | |
| 11 #include <IOKit/IOKitLib.h> | |
| 12 #endif | |
| 13 | |
| 14 #include <algorithm> | |
| 15 #include <map> | |
| 16 #include <queue> | |
| 17 | |
| 18 #include "chrome/browser/sync/engine/auth_watcher.h" | |
| 19 #include "chrome/browser/sync/engine/model_safe_worker.h" | |
| 20 #include "chrome/browser/sync/engine/net/server_connection_manager.h" | |
| 21 #include "chrome/browser/sync/engine/syncer.h" | |
| 22 #include "chrome/browser/sync/notifier/listener/talk_mediator.h" | |
| 23 #include "chrome/browser/sync/notifier/listener/talk_mediator_impl.h" | |
| 24 #include "chrome/browser/sync/syncable/directory_manager.h" | |
| 25 | |
| 26 using std::priority_queue; | |
| 27 using std::min; | |
| 28 | |
| 29 static inline bool operator < (const timespec& a, const timespec& b) { | |
| 30 return a.tv_sec == b.tv_sec ? a.tv_nsec < b.tv_nsec : a.tv_sec < b.tv_sec; | |
| 31 } | |
| 32 | |
| 33 namespace { | |
| 34 | |
| 35 // Returns the amount of time since the user last interacted with the computer, | |
| 36 // in milliseconds | |
| 37 int UserIdleTime() { | |
| 38 #ifdef OS_WIN | |
| 39 LASTINPUTINFO last_input_info; | |
| 40 last_input_info.cbSize = sizeof(LASTINPUTINFO); | |
| 41 | |
| 42 // Get time in windows ticks since system start of last activity. | |
| 43 BOOL b = ::GetLastInputInfo(&last_input_info); | |
| 44 if (b == TRUE) | |
| 45 return ::GetTickCount() - last_input_info.dwTime; | |
| 46 #elif defined(OS_MACOSX) | |
| 47 // It would be great to do something like: | |
| 48 // | |
| 49 // return 1000 * | |
| 50 // CGEventSourceSecondsSinceLastEventType( | |
| 51 // kCGEventSourceStateCombinedSessionState, | |
| 52 // kCGAnyInputEventType); | |
| 53 // | |
| 54 // Unfortunately, CGEvent* lives in ApplicationServices, and we're a daemon | |
| 55 // and can't link that high up the food chain. Thus this mucking in IOKit. | |
| 56 | |
| 57 io_service_t hid_service = | |
| 58 IOServiceGetMatchingService(kIOMasterPortDefault, | |
| 59 IOServiceMatching("IOHIDSystem")); | |
| 60 if (!hid_service) { | |
| 61 LOG(WARNING) << "Could not obtain IOHIDSystem"; | |
| 62 return 0; | |
| 63 } | |
| 64 | |
| 65 CFTypeRef object = IORegistryEntryCreateCFProperty(hid_service, | |
| 66 CFSTR("HIDIdleTime"), | |
| 67 kCFAllocatorDefault, | |
| 68 0); | |
| 69 if (!object) { | |
| 70 LOG(WARNING) << "Could not get IOHIDSystem's HIDIdleTime property"; | |
| 71 IOObjectRelease(hid_service); | |
| 72 return 0; | |
| 73 } | |
| 74 | |
| 75 int64 idle_time; // in nanoseconds | |
| 76 Boolean success; | |
| 77 if (CFGetTypeID(object) == CFNumberGetTypeID()) { | |
| 78 success = CFNumberGetValue((CFNumberRef)object, | |
| 79 kCFNumberSInt64Type, | |
| 80 &idle_time); | |
| 81 } else { | |
| 82 LOG(WARNING) << "IOHIDSystem's HIDIdleTime property isn't a number!"; | |
| 83 } | |
| 84 | |
| 85 CFRelease(object); | |
| 86 IOObjectRelease(hid_service); | |
| 87 | |
| 88 if (!success) { | |
| 89 LOG(WARNING) << "Could not get IOHIDSystem's HIDIdleTime property's value"; | |
| 90 return 0; | |
| 91 } else { | |
| 92 return idle_time / 1000000; // nano to milli | |
| 93 } | |
| 94 #else | |
| 95 static bool was_logged = false; | |
| 96 if (!was_logged) { | |
| 97 was_logged = true; | |
| 98 LOG(INFO) << "UserIdleTime unimplemented on this platform, " | |
| 99 "synchronization will not throttle when user idle"; | |
| 100 } | |
| 101 #endif | |
| 102 | |
| 103 return 0; | |
| 104 } | |
| 105 | |
| 106 } // namespace | |
| 107 | |
| 108 namespace browser_sync { | |
| 109 | |
| 110 SyncerThreadPthreads::SyncerThreadPthreads( | |
| 111 ClientCommandChannel* command_channel, | |
| 112 syncable::DirectoryManager* mgr, | |
| 113 ServerConnectionManager* connection_manager, | |
| 114 AllStatus* all_status, ModelSafeWorker* model_safe_worker) | |
| 115 : SyncerThread() { | |
| 116 impl_.reset(new SyncerThreadPthreadImpl(command_channel, mgr, | |
| 117 connection_manager, all_status, model_safe_worker)); | |
| 118 } | |
| 119 | |
| 120 bool SyncerThreadPthreadImpl::NudgeSyncer(int milliseconds_from_now, | |
| 121 SyncerThread::NudgeSource source) { | |
| 122 MutexLock lock(&mutex_); | |
| 123 if (syncer_ == NULL) { | |
| 124 return false; | |
| 125 } | |
| 126 NudgeSyncImpl(milliseconds_from_now, source); | |
| 127 return true; | |
| 128 } | |
| 129 | |
| 130 void* RunSyncerThread(void* syncer_thread) { | |
| 131 return (reinterpret_cast<SyncerThreadPthreadImpl*>( | |
| 132 syncer_thread))->ThreadMain(); | |
| 133 } | |
| 134 | |
| 135 SyncerThreadPthreadImpl::SyncerThreadPthreadImpl( | |
| 136 ClientCommandChannel* command_channel, | |
| 137 syncable::DirectoryManager* mgr, | |
| 138 ServerConnectionManager* connection_manager, | |
| 139 AllStatus* all_status, | |
| 140 ModelSafeWorker* model_safe_worker) | |
| 141 : dirman_(mgr), scm_(connection_manager), | |
| 142 syncer_(NULL), syncer_events_(NULL), thread_running_(false), | |
| 143 syncer_short_poll_interval_seconds_( | |
| 144 SyncerThread::kDefaultShortPollIntervalSeconds), | |
| 145 syncer_long_poll_interval_seconds_( | |
| 146 SyncerThread::kDefaultLongPollIntervalSeconds), | |
| 147 syncer_polling_interval_(SyncerThread::kDefaultShortPollIntervalSeconds), | |
| 148 syncer_max_interval_(SyncerThread::kDefaultMaxPollIntervalMs), | |
| 149 stop_syncer_thread_(false), connected_(false), conn_mgr_hookup_(NULL), | |
| 150 p2p_authenticated_(false), p2p_subscribed_(false), | |
| 151 allstatus_(all_status), talk_mediator_hookup_(NULL), | |
| 152 command_channel_(command_channel), directory_manager_hookup_(NULL), | |
| 153 model_safe_worker_(model_safe_worker), | |
| 154 client_command_hookup_(NULL), disable_idle_detection_(false) { | |
| 155 | |
| 156 SyncerEvent shutdown = { SyncerEvent::SHUTDOWN_USE_WITH_CARE }; | |
| 157 syncer_event_channel_.reset(new SyncerEventChannel(shutdown)); | |
| 158 | |
| 159 if (dirman_) { | |
| 160 directory_manager_hookup_.reset(NewEventListenerHookup( | |
| 161 dirman_->channel(), this, | |
| 162 &SyncerThreadPthreadImpl::HandleDirectoryManagerEvent)); | |
| 163 } | |
| 164 | |
| 165 if (scm_) { | |
| 166 WatchConnectionManager(scm_); | |
| 167 } | |
| 168 | |
| 169 if (command_channel_) { | |
| 170 WatchClientCommands(command_channel_); | |
| 171 } | |
| 172 } | |
| 173 | |
| 174 SyncerThreadPthreadImpl::~SyncerThreadPthreadImpl() { | |
| 175 client_command_hookup_.reset(); | |
| 176 conn_mgr_hookup_.reset(); | |
| 177 syncer_event_channel_.reset(); | |
| 178 directory_manager_hookup_.reset(); | |
| 179 syncer_events_.reset(); | |
| 180 delete syncer_; | |
| 181 talk_mediator_hookup_.reset(); | |
| 182 CHECK(!thread_running_); | |
| 183 } | |
| 184 | |
| 185 // Creates and starts a syncer thread. | |
| 186 // Returns true if it creates a thread or if there's currently a thread running | |
| 187 // and false otherwise. | |
| 188 bool SyncerThreadPthreadImpl::Start() { | |
| 189 MutexLock lock(&mutex_); | |
| 190 if (thread_running_) { | |
| 191 return true; | |
| 192 } | |
| 193 thread_running_ = | |
| 194 (0 == pthread_create(&thread_, NULL, RunSyncerThread, this)); | |
| 195 if (thread_running_) { | |
| 196 pthread_detach(thread_); | |
| 197 } | |
| 198 return thread_running_; | |
| 199 } | |
| 200 | |
| 201 // Stop processing. A max wait of at least 2*server RTT time is recommended. | |
| 202 // Returns true if we stopped, false otherwise. | |
| 203 bool SyncerThreadPthreadImpl::Stop(int max_wait) { | |
| 204 MutexLock lock(&mutex_); | |
| 205 if (!thread_running_) | |
| 206 return true; | |
| 207 stop_syncer_thread_ = true; | |
| 208 if (NULL != syncer_) { | |
| 209 // Try to early exit the syncer. | |
| 210 syncer_->RequestEarlyExit(); | |
| 211 } | |
| 212 pthread_cond_broadcast(&changed_.condvar_); | |
| 213 timespec deadline = { time(NULL) + (max_wait / 1000), 0 }; | |
| 214 do { | |
| 215 const int wait_result = max_wait < 0 ? | |
| 216 pthread_cond_wait(&changed_.condvar_, &mutex_.mutex_) : | |
| 217 pthread_cond_timedwait(&changed_.condvar_, &mutex_.mutex_, | |
| 218 &deadline); | |
| 219 if (ETIMEDOUT == wait_result) { | |
| 220 LOG(ERROR) << "SyncerThread::Stop timed out. Problems likely."; | |
| 221 return false; | |
| 222 } | |
| 223 } while (thread_running_); | |
| 224 return true; | |
| 225 } | |
| 226 | |
| 227 void SyncerThreadPthreadImpl::WatchClientCommands( | |
| 228 ClientCommandChannel* channel) { | |
| 229 PThreadScopedLock<PThreadMutex> lock(&mutex_); | |
| 230 client_command_hookup_.reset(NewEventListenerHookup(channel, this, | |
| 231 &SyncerThreadPthreadImpl::HandleClientCommand)); | |
| 232 } | |
| 233 | |
| 234 void SyncerThreadPthreadImpl::HandleClientCommand( | |
| 235 ClientCommandChannel::EventType event) { | |
| 236 if (!event) { | |
| 237 return; | |
| 238 } | |
| 239 | |
| 240 // Mutex not really necessary for these. | |
| 241 if (event->has_set_sync_poll_interval()) { | |
| 242 syncer_short_poll_interval_seconds_ = event->set_sync_poll_interval(); | |
| 243 } | |
| 244 | |
| 245 if (event->has_set_sync_long_poll_interval()) { | |
| 246 syncer_long_poll_interval_seconds_ = event->set_sync_long_poll_interval(); | |
| 247 } | |
| 248 } | |
| 249 | |
| 250 void SyncerThreadPthreadImpl::ThreadMainLoop() { | |
| 251 // Use the short poll value by default. | |
| 252 int poll_seconds = syncer_short_poll_interval_seconds_; | |
| 253 int user_idle_milliseconds = 0; | |
| 254 timespec last_sync_time = { 0 }; | |
| 255 bool initial_sync_for_thread = true; | |
| 256 bool continue_sync_cycle = false; | |
| 257 | |
| 258 while (!stop_syncer_thread_) { | |
| 259 if (!connected_) { | |
| 260 LOG(INFO) << "Syncer thread waiting for connection."; | |
| 261 while (!connected_ && !stop_syncer_thread_) | |
| 262 pthread_cond_wait(&changed_.condvar_, &mutex_.mutex_); | |
| 263 LOG_IF(INFO, connected_) << "Syncer thread found connection."; | |
| 264 continue; | |
| 265 } | |
| 266 | |
| 267 if (syncer_ == NULL) { | |
| 268 LOG(INFO) << "Syncer thread waiting for database initialization."; | |
| 269 while (syncer_ == NULL && !stop_syncer_thread_) | |
| 270 pthread_cond_wait(&changed_.condvar_, &mutex_.mutex_); | |
| 271 LOG_IF(INFO, !(syncer_ == NULL)) << "Syncer was found after DB started."; | |
| 272 continue; | |
| 273 } | |
| 274 | |
| 275 timespec const next_poll = { last_sync_time.tv_sec + poll_seconds, | |
| 276 last_sync_time.tv_nsec }; | |
| 277 const timespec wake_time = | |
| 278 !nudge_queue_.empty() && nudge_queue_.top().first < next_poll ? | |
| 279 nudge_queue_.top().first : next_poll; | |
| 280 LOG(INFO) << "wake time is " << wake_time.tv_sec; | |
| 281 LOG(INFO) << "next poll is " << next_poll.tv_sec; | |
| 282 | |
| 283 const int error = pthread_cond_timedwait(&changed_.condvar_, &mutex_.mutex_, | |
| 284 &wake_time); | |
| 285 if (ETIMEDOUT != error) { | |
| 286 continue; // Check all the conditions again. | |
| 287 } | |
| 288 | |
| 289 const timespec now = GetPThreadAbsoluteTime(0); | |
| 290 | |
| 291 // Handle a nudge, caused by either a notification or a local bookmark | |
| 292 // event. This will also update the source of the following SyncMain call. | |
| 293 UpdateNudgeSource(now, &continue_sync_cycle, &initial_sync_for_thread); | |
| 294 | |
| 295 LOG(INFO) << "Calling Sync Main at time " << now.tv_sec; | |
| 296 SyncMain(syncer_); | |
| 297 last_sync_time = now; | |
| 298 | |
| 299 LOG(INFO) << "Updating the next polling time after SyncMain"; | |
| 300 poll_seconds = CalculatePollingWaitTime(allstatus_->status(), | |
| 301 poll_seconds, | |
| 302 &user_idle_milliseconds, | |
| 303 &continue_sync_cycle); | |
| 304 } | |
| 305 } | |
| 306 | |
| 307 // We check how long the user's been idle and sync less often if the machine is | |
| 308 // not in use. The aim is to reduce server load. | |
| 309 int SyncerThreadPthreadImpl::CalculatePollingWaitTime( | |
| 310 const AllStatus::Status& status, | |
| 311 int last_poll_wait, // in s | |
| 312 int* user_idle_milliseconds, | |
| 313 bool* continue_sync_cycle) { | |
| 314 bool is_continuing_sync_cyle = *continue_sync_cycle; | |
| 315 *continue_sync_cycle = false; | |
| 316 | |
| 317 // Determine if the syncer has unfinished work to do from allstatus_. | |
| 318 const bool syncer_has_work_to_do = | |
| 319 status.updates_available > status.updates_received | |
| 320 || status.unsynced_count > 0; | |
| 321 LOG(INFO) << "syncer_has_work_to_do is " << syncer_has_work_to_do; | |
| 322 | |
| 323 // First calculate the expected wait time, figuring in any backoff because of | |
| 324 // user idle time. next_wait is in seconds | |
| 325 syncer_polling_interval_ = (!status.notifications_enabled) ? | |
| 326 syncer_short_poll_interval_seconds_ : | |
| 327 syncer_long_poll_interval_seconds_; | |
| 328 int default_next_wait = syncer_polling_interval_; | |
| 329 int actual_next_wait = default_next_wait; | |
| 330 | |
| 331 if (syncer_has_work_to_do) { | |
| 332 // Provide exponential backoff due to consecutive errors, else attempt to | |
| 333 // complete the work as soon as possible. | |
| 334 if (!is_continuing_sync_cyle) { | |
| 335 actual_next_wait = AllStatus::GetRecommendedDelaySeconds(0); | |
| 336 } else { | |
| 337 actual_next_wait = AllStatus::GetRecommendedDelaySeconds(last_poll_wait); | |
| 338 } | |
| 339 *continue_sync_cycle = true; | |
| 340 } else if (!status.notifications_enabled) { | |
| 341 // Ensure that we start exponential backoff from our base polling | |
| 342 // interval when we are not continuing a sync cycle. | |
| 343 last_poll_wait = std::max(last_poll_wait, syncer_polling_interval_); | |
| 344 | |
| 345 // Did the user start interacting with the computer again? | |
| 346 // If so, revise our idle time (and probably next_sync_time) downwards | |
| 347 int new_idle_time = disable_idle_detection_ ? 0 : UserIdleTime(); | |
| 348 if (new_idle_time < *user_idle_milliseconds) { | |
| 349 *user_idle_milliseconds = new_idle_time; | |
| 350 } | |
| 351 actual_next_wait = CalculateSyncWaitTime(last_poll_wait * 1000, | |
| 352 *user_idle_milliseconds) / 1000; | |
| 353 DCHECK_GE(actual_next_wait, default_next_wait); | |
| 354 } | |
| 355 | |
| 356 LOG(INFO) << "Sync wait: idle " << default_next_wait | |
| 357 << " non-idle or backoff " << actual_next_wait << "."; | |
| 358 | |
| 359 return actual_next_wait; | |
| 360 } | |
| 361 | |
| 362 void* SyncerThreadPthreadImpl::ThreadMain() { | |
| 363 NameCurrentThreadForDebugging("SyncEngine_SyncerThread"); | |
| 364 mutex_.Lock(); | |
| 365 ThreadMainLoop(); | |
| 366 thread_running_ = false; | |
| 367 pthread_cond_broadcast(&changed_.condvar_); | |
| 368 mutex_.Unlock(); | |
| 369 LOG(INFO) << "Syncer thread exiting."; | |
| 370 return 0; | |
| 371 } | |
| 372 | |
| 373 void SyncerThreadPthreadImpl::SyncMain(Syncer* syncer) { | |
| 374 CHECK(syncer); | |
| 375 mutex_.Unlock(); | |
| 376 while (syncer->SyncShare()) { | |
| 377 LOG(INFO) << "Looping in sync share"; | |
| 378 } | |
| 379 LOG(INFO) << "Done looping in sync share"; | |
| 380 | |
| 381 mutex_.Lock(); | |
| 382 } | |
| 383 | |
| 384 void SyncerThreadPthreadImpl::UpdateNudgeSource(const timespec& now, | |
| 385 bool* continue_sync_cycle, | |
| 386 bool* initial_sync) { | |
| 387 bool nudged = false; | |
| 388 SyncerThread::NudgeSource nudge_source = SyncerThread::kUnknown; | |
| 389 // Has the previous sync cycle completed? | |
| 390 if (continue_sync_cycle) { | |
| 391 nudge_source = SyncerThread::kContinuation; | |
| 392 } | |
| 393 // Update the nudge source if a new nudge has come through during the | |
| 394 // previous sync cycle. | |
| 395 while (!nudge_queue_.empty() && !(now < nudge_queue_.top().first)) { | |
| 396 if (!nudged) { | |
| 397 nudge_source = nudge_queue_.top().second; | |
| 398 *continue_sync_cycle = false; // Reset the continuation token on nudge. | |
| 399 nudged = true; | |
| 400 } | |
| 401 nudge_queue_.pop(); | |
| 402 } | |
| 403 SetUpdatesSource(nudged, nudge_source, initial_sync); | |
| 404 } | |
| 405 | |
| 406 void SyncerThreadPthreadImpl::SetUpdatesSource(bool nudged, | |
| 407 SyncerThread::NudgeSource nudge_source, bool* initial_sync) { | |
| 408 sync_pb::GetUpdatesCallerInfo::GET_UPDATES_SOURCE updates_source = | |
| 409 sync_pb::GetUpdatesCallerInfo::UNKNOWN; | |
| 410 if (*initial_sync) { | |
| 411 updates_source = sync_pb::GetUpdatesCallerInfo::FIRST_UPDATE; | |
| 412 *initial_sync = false; | |
| 413 } else if (!nudged) { | |
| 414 updates_source = sync_pb::GetUpdatesCallerInfo::PERIODIC; | |
| 415 } else { | |
| 416 switch (nudge_source) { | |
| 417 case SyncerThread::kNotification: | |
| 418 updates_source = sync_pb::GetUpdatesCallerInfo::NOTIFICATION; | |
| 419 break; | |
| 420 case SyncerThread::kLocal: | |
| 421 updates_source = sync_pb::GetUpdatesCallerInfo::LOCAL; | |
| 422 break; | |
| 423 case SyncerThread::kContinuation: | |
| 424 updates_source = sync_pb::GetUpdatesCallerInfo::SYNC_CYCLE_CONTINUATION; | |
| 425 break; | |
| 426 case SyncerThread::kUnknown: | |
| 427 default: | |
| 428 updates_source = sync_pb::GetUpdatesCallerInfo::UNKNOWN; | |
| 429 break; | |
| 430 } | |
| 431 } | |
| 432 syncer_->set_updates_source(updates_source); | |
| 433 } | |
| 434 | |
| 435 void SyncerThreadPthreadImpl::HandleSyncerEvent(const SyncerEvent& event) { | |
| 436 MutexLock lock(&mutex_); | |
| 437 channel()->NotifyListeners(event); | |
| 438 if (SyncerEvent::REQUEST_SYNC_NUDGE != event.what_happened) { | |
| 439 return; | |
| 440 } | |
| 441 NudgeSyncImpl(event.nudge_delay_milliseconds, SyncerThread::kUnknown); | |
| 442 } | |
| 443 | |
| 444 void SyncerThreadPthreadImpl::HandleDirectoryManagerEvent( | |
| 445 const syncable::DirectoryManagerEvent& event) { | |
| 446 LOG(INFO) << "Handling a directory manager event"; | |
| 447 if (syncable::DirectoryManagerEvent::OPENED == event.what_happened) { | |
| 448 MutexLock lock(&mutex_); | |
| 449 LOG(INFO) << "Syncer starting up for: " << event.dirname; | |
| 450 // The underlying database structure is ready, and we should create | |
| 451 // the syncer. | |
| 452 CHECK(syncer_ == NULL); | |
| 453 syncer_ = | |
| 454 new Syncer(dirman_, event.dirname, scm_, model_safe_worker_.get()); | |
| 455 | |
| 456 syncer_->set_command_channel(command_channel_); | |
| 457 syncer_events_.reset(NewEventListenerHookup( | |
| 458 syncer_->channel(), this, &SyncerThreadPthreadImpl::HandleSyncerEvent)); | |
| 459 pthread_cond_broadcast(&changed_.condvar_); | |
| 460 } | |
| 461 } | |
| 462 | |
| 463 static inline void CheckConnected(bool* connected, | |
| 464 HttpResponse::ServerConnectionCode code, | |
| 465 pthread_cond_t* condvar) { | |
| 466 if (*connected) { | |
| 467 if (HttpResponse::CONNECTION_UNAVAILABLE == code) { | |
| 468 *connected = false; | |
| 469 pthread_cond_broadcast(condvar); | |
| 470 } | |
| 471 } else { | |
| 472 if (HttpResponse::SERVER_CONNECTION_OK == code) { | |
| 473 *connected = true; | |
| 474 pthread_cond_broadcast(condvar); | |
| 475 } | |
| 476 } | |
| 477 } | |
| 478 | |
| 479 void SyncerThreadPthreadImpl::WatchConnectionManager( | |
| 480 ServerConnectionManager* conn_mgr) { | |
| 481 conn_mgr_hookup_.reset(NewEventListenerHookup(conn_mgr->channel(), this, | |
| 482 &SyncerThreadPthreadImpl::HandleServerConnectionEvent)); | |
| 483 CheckConnected(&connected_, conn_mgr->server_status(), | |
| 484 &changed_.condvar_); | |
| 485 } | |
| 486 | |
| 487 void SyncerThreadPthreadImpl::HandleServerConnectionEvent( | |
| 488 const ServerConnectionEvent& event) { | |
| 489 if (ServerConnectionEvent::STATUS_CHANGED == event.what_happened) { | |
| 490 MutexLock lock(&mutex_); | |
| 491 CheckConnected(&connected_, event.connection_code, | |
| 492 &changed_.condvar_); | |
| 493 } | |
| 494 } | |
| 495 | |
| 496 SyncerEventChannel* SyncerThreadPthreadImpl::channel() { | |
| 497 return syncer_event_channel_.get(); | |
| 498 } | |
| 499 | |
| 500 // Inputs and return value in milliseconds. | |
| 501 int SyncerThreadPthreadImpl::CalculateSyncWaitTime(int last_interval, | |
| 502 int user_idle_ms) { | |
| 503 // syncer_polling_interval_ is in seconds | |
| 504 int syncer_polling_interval_ms = syncer_polling_interval_ * 1000; | |
| 505 | |
| 506 // This is our default and lower bound. | |
| 507 int next_wait = syncer_polling_interval_ms; | |
| 508 | |
| 509 // Get idle time, bounded by max wait. | |
| 510 int idle = min(user_idle_ms, syncer_max_interval_); | |
| 511 | |
| 512 // If the user has been idle for a while, we'll start decreasing the poll | |
| 513 // rate. | |
| 514 if (idle >= kPollBackoffThresholdMultiplier * syncer_polling_interval_ms) { | |
| 515 next_wait = std::min(AllStatus::GetRecommendedDelaySeconds( | |
| 516 last_interval / 1000), syncer_max_interval_ / 1000) * 1000; | |
| 517 } | |
| 518 | |
| 519 return next_wait; | |
| 520 } | |
| 521 | |
| 522 // Called with mutex_ already locked. | |
| 523 void SyncerThreadPthreadImpl::NudgeSyncImpl(int milliseconds_from_now, | |
| 524 SyncerThread::NudgeSource source) { | |
| 525 const timespec nudge_time = GetPThreadAbsoluteTime(milliseconds_from_now); | |
| 526 NudgeObject nudge_object(nudge_time, source); | |
| 527 nudge_queue_.push(nudge_object); | |
| 528 pthread_cond_broadcast(&changed_.condvar_); | |
| 529 } | |
| 530 | |
| 531 void SyncerThreadPthreadImpl::WatchTalkMediator(TalkMediator* mediator) { | |
| 532 talk_mediator_hookup_.reset( | |
| 533 NewEventListenerHookup( | |
| 534 mediator->channel(), | |
| 535 this, | |
| 536 &SyncerThreadPthreadImpl::HandleTalkMediatorEvent)); | |
| 537 } | |
| 538 | |
| 539 void SyncerThreadPthreadImpl::HandleTalkMediatorEvent( | |
| 540 const TalkMediatorEvent& event) { | |
| 541 MutexLock lock(&mutex_); | |
| 542 switch (event.what_happened) { | |
| 543 case TalkMediatorEvent::LOGIN_SUCCEEDED: | |
| 544 LOG(INFO) << "P2P: Login succeeded."; | |
| 545 p2p_authenticated_ = true; | |
| 546 break; | |
| 547 case TalkMediatorEvent::LOGOUT_SUCCEEDED: | |
| 548 LOG(INFO) << "P2P: Login succeeded."; | |
| 549 p2p_authenticated_ = false; | |
| 550 break; | |
| 551 case TalkMediatorEvent::SUBSCRIPTIONS_ON: | |
| 552 LOG(INFO) << "P2P: Subscriptions successfully enabled."; | |
| 553 p2p_subscribed_ = true; | |
| 554 if (NULL != syncer_) { | |
| 555 LOG(INFO) << "Subscriptions on. Nudging syncer for initial push."; | |
| 556 NudgeSyncImpl(0, SyncerThread::kLocal); | |
| 557 } | |
| 558 break; | |
| 559 case TalkMediatorEvent::SUBSCRIPTIONS_OFF: | |
| 560 LOG(INFO) << "P2P: Subscriptions are not enabled."; | |
| 561 p2p_subscribed_ = false; | |
| 562 break; | |
| 563 case TalkMediatorEvent::NOTIFICATION_RECEIVED: | |
| 564 LOG(INFO) << "P2P: Updates on server, pushing syncer"; | |
| 565 if (NULL != syncer_) { | |
| 566 NudgeSyncImpl(0, SyncerThread::kNotification); | |
| 567 } | |
| 568 break; | |
| 569 default: | |
| 570 break; | |
| 571 } | |
| 572 | |
| 573 if (NULL != syncer_) { | |
| 574 syncer_->set_notifications_enabled(p2p_authenticated_ && p2p_subscribed_); | |
| 575 } | |
| 576 } | |
| 577 | |
| 578 } // namespace browser_sync | |
| OLD | NEW |