| OLD | NEW |
| 1 // Copyright (c) 2009 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2009 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 #include "chrome/browser/sync/engine/syncer_thread_pthreads.h" |
| 5 #include "chrome/browser/sync/engine/syncer_thread.h" | |
| 6 | 5 |
| 7 #include "build/build_config.h" | 6 #include "build/build_config.h" |
| 8 | 7 |
| 9 #ifdef OS_MACOSX | 8 #ifdef OS_MACOSX |
| 10 #include <CoreFoundation/CFNumber.h> | 9 #include <CoreFoundation/CFNumber.h> |
| 11 #include <IOKit/IOTypes.h> | 10 #include <IOKit/IOTypes.h> |
| 12 #include <IOKit/IOKitLib.h> | 11 #include <IOKit/IOKitLib.h> |
| 13 #endif | 12 #endif |
| 14 | 13 |
| 15 #include <algorithm> | 14 #include <algorithm> |
| (...skipping 85 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 101 } | 100 } |
| 102 #endif | 101 #endif |
| 103 | 102 |
| 104 return 0; | 103 return 0; |
| 105 } | 104 } |
| 106 | 105 |
| 107 } // namespace | 106 } // namespace |
| 108 | 107 |
| 109 namespace browser_sync { | 108 namespace browser_sync { |
| 110 | 109 |
| 111 bool SyncerThread::NudgeSyncer(int milliseconds_from_now, NudgeSource source) { | 110 SyncerThreadPthreads::SyncerThreadPthreads( |
| 111 ClientCommandChannel* command_channel, |
| 112 syncable::DirectoryManager* mgr, |
| 113 ServerConnectionManager* connection_manager, |
| 114 AllStatus* all_status, ModelSafeWorker* model_safe_worker) |
| 115 : SyncerThread() { |
| 116 impl_.reset(new SyncerThreadPthreadImpl(command_channel, mgr, |
| 117 connection_manager, all_status, model_safe_worker)); |
| 118 } |
| 119 |
| 120 bool SyncerThreadPthreadImpl::NudgeSyncer(int milliseconds_from_now, |
| 121 SyncerThread::NudgeSource source) { |
| 112 MutexLock lock(&mutex_); | 122 MutexLock lock(&mutex_); |
| 113 if (syncer_ == NULL) { | 123 if (syncer_ == NULL) { |
| 114 return false; | 124 return false; |
| 115 } | 125 } |
| 116 NudgeSyncImpl(milliseconds_from_now, source); | 126 NudgeSyncImpl(milliseconds_from_now, source); |
| 117 return true; | 127 return true; |
| 118 } | 128 } |
| 119 | 129 |
| 120 void* RunSyncerThread(void* syncer_thread) { | 130 void* RunSyncerThread(void* syncer_thread) { |
| 121 return (reinterpret_cast<SyncerThread*>(syncer_thread))->ThreadMain(); | 131 return (reinterpret_cast<SyncerThreadPthreadImpl*>( |
| 132 syncer_thread))->ThreadMain(); |
| 122 } | 133 } |
| 123 | 134 |
| 124 SyncerThread::SyncerThread( | 135 SyncerThreadPthreadImpl::SyncerThreadPthreadImpl( |
| 125 ClientCommandChannel* command_channel, | 136 ClientCommandChannel* command_channel, |
| 126 syncable::DirectoryManager* mgr, | 137 syncable::DirectoryManager* mgr, |
| 127 ServerConnectionManager* connection_manager, | 138 ServerConnectionManager* connection_manager, |
| 128 AllStatus* all_status, | 139 AllStatus* all_status, |
| 129 ModelSafeWorker* model_safe_worker) | 140 ModelSafeWorker* model_safe_worker) |
| 130 : dirman_(mgr), scm_(connection_manager), | 141 : dirman_(mgr), scm_(connection_manager), |
| 131 syncer_(NULL), syncer_events_(NULL), thread_running_(false), | 142 syncer_(NULL), syncer_events_(NULL), thread_running_(false), |
| 132 syncer_short_poll_interval_seconds_(kDefaultShortPollIntervalSeconds), | 143 syncer_short_poll_interval_seconds_( |
| 133 syncer_long_poll_interval_seconds_(kDefaultLongPollIntervalSeconds), | 144 SyncerThread::kDefaultShortPollIntervalSeconds), |
| 134 syncer_polling_interval_(kDefaultShortPollIntervalSeconds), | 145 syncer_long_poll_interval_seconds_( |
| 135 syncer_max_interval_(kDefaultMaxPollIntervalMs), | 146 SyncerThread::kDefaultLongPollIntervalSeconds), |
| 147 syncer_polling_interval_(SyncerThread::kDefaultShortPollIntervalSeconds), |
| 148 syncer_max_interval_(SyncerThread::kDefaultMaxPollIntervalMs), |
| 136 stop_syncer_thread_(false), connected_(false), conn_mgr_hookup_(NULL), | 149 stop_syncer_thread_(false), connected_(false), conn_mgr_hookup_(NULL), |
| 137 p2p_authenticated_(false), p2p_subscribed_(false), | 150 p2p_authenticated_(false), p2p_subscribed_(false), |
| 138 allstatus_(all_status), talk_mediator_hookup_(NULL), | 151 allstatus_(all_status), talk_mediator_hookup_(NULL), |
| 139 command_channel_(command_channel), directory_manager_hookup_(NULL), | 152 command_channel_(command_channel), directory_manager_hookup_(NULL), |
| 140 model_safe_worker_(model_safe_worker), | 153 model_safe_worker_(model_safe_worker), |
| 141 client_command_hookup_(NULL), disable_idle_detection_(false) { | 154 client_command_hookup_(NULL), disable_idle_detection_(false) { |
| 142 | 155 |
| 143 SyncerEvent shutdown = { SyncerEvent::SHUTDOWN_USE_WITH_CARE }; | 156 SyncerEvent shutdown = { SyncerEvent::SHUTDOWN_USE_WITH_CARE }; |
| 144 syncer_event_channel_.reset(new SyncerEventChannel(shutdown)); | 157 syncer_event_channel_.reset(new SyncerEventChannel(shutdown)); |
| 145 | 158 |
| 146 if (dirman_) { | 159 if (dirman_) { |
| 147 directory_manager_hookup_.reset(NewEventListenerHookup( | 160 directory_manager_hookup_.reset(NewEventListenerHookup( |
| 148 dirman_->channel(), this, &SyncerThread::HandleDirectoryManagerEvent)); | 161 dirman_->channel(), this, |
| 162 &SyncerThreadPthreadImpl::HandleDirectoryManagerEvent)); |
| 149 } | 163 } |
| 150 | 164 |
| 151 if (scm_) { | 165 if (scm_) { |
| 152 WatchConnectionManager(scm_); | 166 WatchConnectionManager(scm_); |
| 153 } | 167 } |
| 154 | 168 |
| 155 if (command_channel_) { | 169 if (command_channel_) { |
| 156 WatchClientCommands(command_channel_); | 170 WatchClientCommands(command_channel_); |
| 157 } | 171 } |
| 158 } | 172 } |
| 159 | 173 |
| 160 SyncerThread::~SyncerThread() { | 174 SyncerThreadPthreadImpl::~SyncerThreadPthreadImpl() { |
| 161 client_command_hookup_.reset(); | 175 client_command_hookup_.reset(); |
| 162 conn_mgr_hookup_.reset(); | 176 conn_mgr_hookup_.reset(); |
| 163 syncer_event_channel_.reset(); | 177 syncer_event_channel_.reset(); |
| 164 directory_manager_hookup_.reset(); | 178 directory_manager_hookup_.reset(); |
| 165 syncer_events_.reset(); | 179 syncer_events_.reset(); |
| 166 delete syncer_; | 180 delete syncer_; |
| 167 talk_mediator_hookup_.reset(); | 181 talk_mediator_hookup_.reset(); |
| 168 CHECK(!thread_running_); | 182 CHECK(!thread_running_); |
| 169 } | 183 } |
| 170 | 184 |
| 171 // Creates and starts a syncer thread. | 185 // Creates and starts a syncer thread. |
| 172 // Returns true if it creates a thread or if there's currently a thread running | 186 // Returns true if it creates a thread or if there's currently a thread running |
| 173 // and false otherwise. | 187 // and false otherwise. |
| 174 bool SyncerThread::Start() { | 188 bool SyncerThreadPthreadImpl::Start() { |
| 175 MutexLock lock(&mutex_); | 189 MutexLock lock(&mutex_); |
| 176 if (thread_running_) { | 190 if (thread_running_) { |
| 177 return true; | 191 return true; |
| 178 } | 192 } |
| 179 thread_running_ = | 193 thread_running_ = |
| 180 (0 == pthread_create(&thread_, NULL, RunSyncerThread, this)); | 194 (0 == pthread_create(&thread_, NULL, RunSyncerThread, this)); |
| 181 if (thread_running_) { | 195 if (thread_running_) { |
| 182 pthread_detach(thread_); | 196 pthread_detach(thread_); |
| 183 } | 197 } |
| 184 return thread_running_; | 198 return thread_running_; |
| 185 } | 199 } |
| 186 | 200 |
| 187 // Stop processing. A max wait of at least 2*server RTT time is recommended. | 201 // Stop processing. A max wait of at least 2*server RTT time is recommended. |
| 188 // Returns true if we stopped, false otherwise. | 202 // Returns true if we stopped, false otherwise. |
| 189 bool SyncerThread::Stop(int max_wait) { | 203 bool SyncerThreadPthreadImpl::Stop(int max_wait) { |
| 190 MutexLock lock(&mutex_); | 204 MutexLock lock(&mutex_); |
| 191 if (!thread_running_) | 205 if (!thread_running_) |
| 192 return true; | 206 return true; |
| 193 stop_syncer_thread_ = true; | 207 stop_syncer_thread_ = true; |
| 194 if (NULL != syncer_) { | 208 if (NULL != syncer_) { |
| 195 // Try to early exit the syncer. | 209 // Try to early exit the syncer. |
| 196 syncer_->RequestEarlyExit(); | 210 syncer_->RequestEarlyExit(); |
| 197 } | 211 } |
| 198 pthread_cond_broadcast(&changed_.condvar_); | 212 pthread_cond_broadcast(&changed_.condvar_); |
| 199 timespec deadline = { time(NULL) + (max_wait / 1000), 0 }; | 213 timespec deadline = { time(NULL) + (max_wait / 1000), 0 }; |
| 200 do { | 214 do { |
| 201 const int wait_result = max_wait < 0 ? | 215 const int wait_result = max_wait < 0 ? |
| 202 pthread_cond_wait(&changed_.condvar_, &mutex_.mutex_) : | 216 pthread_cond_wait(&changed_.condvar_, &mutex_.mutex_) : |
| 203 pthread_cond_timedwait(&changed_.condvar_, &mutex_.mutex_, | 217 pthread_cond_timedwait(&changed_.condvar_, &mutex_.mutex_, |
| 204 &deadline); | 218 &deadline); |
| 205 if (ETIMEDOUT == wait_result) { | 219 if (ETIMEDOUT == wait_result) { |
| 206 LOG(ERROR) << "SyncerThread::Stop timed out. Problems likely."; | 220 LOG(ERROR) << "SyncerThread::Stop timed out. Problems likely."; |
| 207 return false; | 221 return false; |
| 208 } | 222 } |
| 209 } while (thread_running_); | 223 } while (thread_running_); |
| 210 return true; | 224 return true; |
| 211 } | 225 } |
| 212 | 226 |
| 213 void SyncerThread::WatchClientCommands(ClientCommandChannel* channel) { | 227 void SyncerThreadPthreadImpl::WatchClientCommands( |
| 228 ClientCommandChannel* channel) { |
| 214 PThreadScopedLock<PThreadMutex> lock(&mutex_); | 229 PThreadScopedLock<PThreadMutex> lock(&mutex_); |
| 215 client_command_hookup_.reset(NewEventListenerHookup(channel, this, | 230 client_command_hookup_.reset(NewEventListenerHookup(channel, this, |
| 216 &SyncerThread::HandleClientCommand)); | 231 &SyncerThreadPthreadImpl::HandleClientCommand)); |
| 217 } | 232 } |
| 218 | 233 |
| 219 void SyncerThread::HandleClientCommand(ClientCommandChannel::EventType event) { | 234 void SyncerThreadPthreadImpl::HandleClientCommand( |
| 235 ClientCommandChannel::EventType event) { |
| 220 if (!event) { | 236 if (!event) { |
| 221 return; | 237 return; |
| 222 } | 238 } |
| 223 | 239 |
| 224 // Mutex not really necessary for these. | 240 // Mutex not really necessary for these. |
| 225 if (event->has_set_sync_poll_interval()) { | 241 if (event->has_set_sync_poll_interval()) { |
| 226 syncer_short_poll_interval_seconds_ = event->set_sync_poll_interval(); | 242 syncer_short_poll_interval_seconds_ = event->set_sync_poll_interval(); |
| 227 } | 243 } |
| 228 | 244 |
| 229 if (event->has_set_sync_long_poll_interval()) { | 245 if (event->has_set_sync_long_poll_interval()) { |
| 230 syncer_long_poll_interval_seconds_ = event->set_sync_long_poll_interval(); | 246 syncer_long_poll_interval_seconds_ = event->set_sync_long_poll_interval(); |
| 231 } | 247 } |
| 232 } | 248 } |
| 233 | 249 |
| 234 void SyncerThread::ThreadMainLoop() { | 250 void SyncerThreadPthreadImpl::ThreadMainLoop() { |
| 235 // Use the short poll value by default. | 251 // Use the short poll value by default. |
| 236 int poll_seconds = syncer_short_poll_interval_seconds_; | 252 int poll_seconds = syncer_short_poll_interval_seconds_; |
| 237 int user_idle_milliseconds = 0; | 253 int user_idle_milliseconds = 0; |
| 238 timespec last_sync_time = { 0 }; | 254 timespec last_sync_time = { 0 }; |
| 239 bool initial_sync_for_thread = true; | 255 bool initial_sync_for_thread = true; |
| 240 bool continue_sync_cycle = false; | 256 bool continue_sync_cycle = false; |
| 241 | 257 |
| 242 while (!stop_syncer_thread_) { | 258 while (!stop_syncer_thread_) { |
| 243 if (!connected_) { | 259 if (!connected_) { |
| 244 LOG(INFO) << "Syncer thread waiting for connection."; | 260 LOG(INFO) << "Syncer thread waiting for connection."; |
| (...skipping 38 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 283 LOG(INFO) << "Updating the next polling time after SyncMain"; | 299 LOG(INFO) << "Updating the next polling time after SyncMain"; |
| 284 poll_seconds = CalculatePollingWaitTime(allstatus_->status(), | 300 poll_seconds = CalculatePollingWaitTime(allstatus_->status(), |
| 285 poll_seconds, | 301 poll_seconds, |
| 286 &user_idle_milliseconds, | 302 &user_idle_milliseconds, |
| 287 &continue_sync_cycle); | 303 &continue_sync_cycle); |
| 288 } | 304 } |
| 289 } | 305 } |
| 290 | 306 |
| 291 // We check how long the user's been idle and sync less often if the machine is | 307 // We check how long the user's been idle and sync less often if the machine is |
| 292 // not in use. The aim is to reduce server load. | 308 // not in use. The aim is to reduce server load. |
| 293 int SyncerThread::CalculatePollingWaitTime( | 309 int SyncerThreadPthreadImpl::CalculatePollingWaitTime( |
| 294 const AllStatus::Status& status, | 310 const AllStatus::Status& status, |
| 295 int last_poll_wait, // in s | 311 int last_poll_wait, // in s |
| 296 int* user_idle_milliseconds, | 312 int* user_idle_milliseconds, |
| 297 bool* continue_sync_cycle) { | 313 bool* continue_sync_cycle) { |
| 298 bool is_continuing_sync_cyle = *continue_sync_cycle; | 314 bool is_continuing_sync_cyle = *continue_sync_cycle; |
| 299 *continue_sync_cycle = false; | 315 *continue_sync_cycle = false; |
| 300 | 316 |
| 301 // Determine if the syncer has unfinished work to do from allstatus_. | 317 // Determine if the syncer has unfinished work to do from allstatus_. |
| 302 const bool syncer_has_work_to_do = | 318 const bool syncer_has_work_to_do = |
| 303 status.updates_available > status.updates_received | 319 status.updates_available > status.updates_received |
| (...skipping 32 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 336 *user_idle_milliseconds) / 1000; | 352 *user_idle_milliseconds) / 1000; |
| 337 DCHECK_GE(actual_next_wait, default_next_wait); | 353 DCHECK_GE(actual_next_wait, default_next_wait); |
| 338 } | 354 } |
| 339 | 355 |
| 340 LOG(INFO) << "Sync wait: idle " << default_next_wait | 356 LOG(INFO) << "Sync wait: idle " << default_next_wait |
| 341 << " non-idle or backoff " << actual_next_wait << "."; | 357 << " non-idle or backoff " << actual_next_wait << "."; |
| 342 | 358 |
| 343 return actual_next_wait; | 359 return actual_next_wait; |
| 344 } | 360 } |
| 345 | 361 |
| 346 void* SyncerThread::ThreadMain() { | 362 void* SyncerThreadPthreadImpl::ThreadMain() { |
| 347 NameCurrentThreadForDebugging("SyncEngine_SyncerThread"); | 363 NameCurrentThreadForDebugging("SyncEngine_SyncerThread"); |
| 348 mutex_.Lock(); | 364 mutex_.Lock(); |
| 349 ThreadMainLoop(); | 365 ThreadMainLoop(); |
| 350 thread_running_ = false; | 366 thread_running_ = false; |
| 351 pthread_cond_broadcast(&changed_.condvar_); | 367 pthread_cond_broadcast(&changed_.condvar_); |
| 352 mutex_.Unlock(); | 368 mutex_.Unlock(); |
| 353 LOG(INFO) << "Syncer thread exiting."; | 369 LOG(INFO) << "Syncer thread exiting."; |
| 354 return 0; | 370 return 0; |
| 355 } | 371 } |
| 356 | 372 |
| 357 void SyncerThread::SyncMain(Syncer* syncer) { | 373 void SyncerThreadPthreadImpl::SyncMain(Syncer* syncer) { |
| 358 CHECK(syncer); | 374 CHECK(syncer); |
| 359 mutex_.Unlock(); | 375 mutex_.Unlock(); |
| 360 while (syncer->SyncShare()) { | 376 while (syncer->SyncShare()) { |
| 361 LOG(INFO) << "Looping in sync share"; | 377 LOG(INFO) << "Looping in sync share"; |
| 362 } | 378 } |
| 363 LOG(INFO) << "Done looping in sync share"; | 379 LOG(INFO) << "Done looping in sync share"; |
| 364 | 380 |
| 365 mutex_.Lock(); | 381 mutex_.Lock(); |
| 366 } | 382 } |
| 367 | 383 |
| 368 void SyncerThread::UpdateNudgeSource(const timespec& now, | 384 void SyncerThreadPthreadImpl::UpdateNudgeSource(const timespec& now, |
| 369 bool* continue_sync_cycle, | 385 bool* continue_sync_cycle, |
| 370 bool* initial_sync) { | 386 bool* initial_sync) { |
| 371 bool nudged = false; | 387 bool nudged = false; |
| 372 NudgeSource nudge_source = kUnknown; | 388 SyncerThread::NudgeSource nudge_source = SyncerThread::kUnknown; |
| 373 // Has the previous sync cycle completed? | 389 // Has the previous sync cycle completed? |
| 374 if (continue_sync_cycle) { | 390 if (continue_sync_cycle) { |
| 375 nudge_source = kContinuation; | 391 nudge_source = SyncerThread::kContinuation; |
| 376 } | 392 } |
| 377 // Update the nudge source if a new nudge has come through during the | 393 // Update the nudge source if a new nudge has come through during the |
| 378 // previous sync cycle. | 394 // previous sync cycle. |
| 379 while (!nudge_queue_.empty() && !(now < nudge_queue_.top().first)) { | 395 while (!nudge_queue_.empty() && !(now < nudge_queue_.top().first)) { |
| 380 if (!nudged) { | 396 if (!nudged) { |
| 381 nudge_source = nudge_queue_.top().second; | 397 nudge_source = nudge_queue_.top().second; |
| 382 *continue_sync_cycle = false; // Reset the continuation token on nudge. | 398 *continue_sync_cycle = false; // Reset the continuation token on nudge. |
| 383 nudged = true; | 399 nudged = true; |
| 384 } | 400 } |
| 385 nudge_queue_.pop(); | 401 nudge_queue_.pop(); |
| 386 } | 402 } |
| 387 SetUpdatesSource(nudged, nudge_source, initial_sync); | 403 SetUpdatesSource(nudged, nudge_source, initial_sync); |
| 388 } | 404 } |
| 389 | 405 |
| 390 void SyncerThread::SetUpdatesSource(bool nudged, NudgeSource nudge_source, | 406 void SyncerThreadPthreadImpl::SetUpdatesSource(bool nudged, |
| 391 bool* initial_sync) { | 407 SyncerThread::NudgeSource nudge_source, bool* initial_sync) { |
| 392 sync_pb::GetUpdatesCallerInfo::GET_UPDATES_SOURCE updates_source = | 408 sync_pb::GetUpdatesCallerInfo::GET_UPDATES_SOURCE updates_source = |
| 393 sync_pb::GetUpdatesCallerInfo::UNKNOWN; | 409 sync_pb::GetUpdatesCallerInfo::UNKNOWN; |
| 394 if (*initial_sync) { | 410 if (*initial_sync) { |
| 395 updates_source = sync_pb::GetUpdatesCallerInfo::FIRST_UPDATE; | 411 updates_source = sync_pb::GetUpdatesCallerInfo::FIRST_UPDATE; |
| 396 *initial_sync = false; | 412 *initial_sync = false; |
| 397 } else if (!nudged) { | 413 } else if (!nudged) { |
| 398 updates_source = sync_pb::GetUpdatesCallerInfo::PERIODIC; | 414 updates_source = sync_pb::GetUpdatesCallerInfo::PERIODIC; |
| 399 } else { | 415 } else { |
| 400 switch (nudge_source) { | 416 switch (nudge_source) { |
| 401 case kNotification: | 417 case SyncerThread::kNotification: |
| 402 updates_source = sync_pb::GetUpdatesCallerInfo::NOTIFICATION; | 418 updates_source = sync_pb::GetUpdatesCallerInfo::NOTIFICATION; |
| 403 break; | 419 break; |
| 404 case kLocal: | 420 case SyncerThread::kLocal: |
| 405 updates_source = sync_pb::GetUpdatesCallerInfo::LOCAL; | 421 updates_source = sync_pb::GetUpdatesCallerInfo::LOCAL; |
| 406 break; | 422 break; |
| 407 case kContinuation: | 423 case SyncerThread::kContinuation: |
| 408 updates_source = sync_pb::GetUpdatesCallerInfo::SYNC_CYCLE_CONTINUATION; | 424 updates_source = sync_pb::GetUpdatesCallerInfo::SYNC_CYCLE_CONTINUATION; |
| 409 break; | 425 break; |
| 410 case kUnknown: | 426 case SyncerThread::kUnknown: |
| 411 default: | 427 default: |
| 412 updates_source = sync_pb::GetUpdatesCallerInfo::UNKNOWN; | 428 updates_source = sync_pb::GetUpdatesCallerInfo::UNKNOWN; |
| 413 break; | 429 break; |
| 414 } | 430 } |
| 415 } | 431 } |
| 416 syncer_->set_updates_source(updates_source); | 432 syncer_->set_updates_source(updates_source); |
| 417 } | 433 } |
| 418 | 434 |
| 419 void SyncerThread::HandleSyncerEvent(const SyncerEvent& event) { | 435 void SyncerThreadPthreadImpl::HandleSyncerEvent(const SyncerEvent& event) { |
| 420 MutexLock lock(&mutex_); | 436 MutexLock lock(&mutex_); |
| 421 channel()->NotifyListeners(event); | 437 channel()->NotifyListeners(event); |
| 422 if (SyncerEvent::REQUEST_SYNC_NUDGE != event.what_happened) { | 438 if (SyncerEvent::REQUEST_SYNC_NUDGE != event.what_happened) { |
| 423 return; | 439 return; |
| 424 } | 440 } |
| 425 NudgeSyncImpl(event.nudge_delay_milliseconds, kUnknown); | 441 NudgeSyncImpl(event.nudge_delay_milliseconds, SyncerThread::kUnknown); |
| 426 } | 442 } |
| 427 | 443 |
| 428 void SyncerThread::HandleDirectoryManagerEvent( | 444 void SyncerThreadPthreadImpl::HandleDirectoryManagerEvent( |
| 429 const syncable::DirectoryManagerEvent& event) { | 445 const syncable::DirectoryManagerEvent& event) { |
| 430 LOG(INFO) << "Handling a directory manager event"; | 446 LOG(INFO) << "Handling a directory manager event"; |
| 431 if (syncable::DirectoryManagerEvent::OPENED == event.what_happened) { | 447 if (syncable::DirectoryManagerEvent::OPENED == event.what_happened) { |
| 432 MutexLock lock(&mutex_); | 448 MutexLock lock(&mutex_); |
| 433 LOG(INFO) << "Syncer starting up for: " << event.dirname; | 449 LOG(INFO) << "Syncer starting up for: " << event.dirname; |
| 434 // The underlying database structure is ready, and we should create | 450 // The underlying database structure is ready, and we should create |
| 435 // the syncer. | 451 // the syncer. |
| 436 CHECK(syncer_ == NULL); | 452 CHECK(syncer_ == NULL); |
| 437 syncer_ = | 453 syncer_ = |
| 438 new Syncer(dirman_, event.dirname, scm_, model_safe_worker_.get()); | 454 new Syncer(dirman_, event.dirname, scm_, model_safe_worker_.get()); |
| 439 | 455 |
| 440 syncer_->set_command_channel(command_channel_); | 456 syncer_->set_command_channel(command_channel_); |
| 441 syncer_events_.reset(NewEventListenerHookup( | 457 syncer_events_.reset(NewEventListenerHookup( |
| 442 syncer_->channel(), this, &SyncerThread::HandleSyncerEvent)); | 458 syncer_->channel(), this, &SyncerThreadPthreadImpl::HandleSyncerEvent)); |
| 443 pthread_cond_broadcast(&changed_.condvar_); | 459 pthread_cond_broadcast(&changed_.condvar_); |
| 444 } | 460 } |
| 445 } | 461 } |
| 446 | 462 |
| 447 static inline void CheckConnected(bool* connected, | 463 static inline void CheckConnected(bool* connected, |
| 448 HttpResponse::ServerConnectionCode code, | 464 HttpResponse::ServerConnectionCode code, |
| 449 pthread_cond_t* condvar) { | 465 pthread_cond_t* condvar) { |
| 450 if (*connected) { | 466 if (*connected) { |
| 451 if (HttpResponse::CONNECTION_UNAVAILABLE == code) { | 467 if (HttpResponse::CONNECTION_UNAVAILABLE == code) { |
| 452 *connected = false; | 468 *connected = false; |
| 453 pthread_cond_broadcast(condvar); | 469 pthread_cond_broadcast(condvar); |
| 454 } | 470 } |
| 455 } else { | 471 } else { |
| 456 if (HttpResponse::SERVER_CONNECTION_OK == code) { | 472 if (HttpResponse::SERVER_CONNECTION_OK == code) { |
| 457 *connected = true; | 473 *connected = true; |
| 458 pthread_cond_broadcast(condvar); | 474 pthread_cond_broadcast(condvar); |
| 459 } | 475 } |
| 460 } | 476 } |
| 461 } | 477 } |
| 462 | 478 |
| 463 void SyncerThread::WatchConnectionManager(ServerConnectionManager* conn_mgr) { | 479 void SyncerThreadPthreadImpl::WatchConnectionManager( |
| 480 ServerConnectionManager* conn_mgr) { |
| 464 conn_mgr_hookup_.reset(NewEventListenerHookup(conn_mgr->channel(), this, | 481 conn_mgr_hookup_.reset(NewEventListenerHookup(conn_mgr->channel(), this, |
| 465 &SyncerThread::HandleServerConnectionEvent)); | 482 &SyncerThreadPthreadImpl::HandleServerConnectionEvent)); |
| 466 CheckConnected(&connected_, conn_mgr->server_status(), | 483 CheckConnected(&connected_, conn_mgr->server_status(), |
| 467 &changed_.condvar_); | 484 &changed_.condvar_); |
| 468 } | 485 } |
| 469 | 486 |
| 470 void SyncerThread::HandleServerConnectionEvent( | 487 void SyncerThreadPthreadImpl::HandleServerConnectionEvent( |
| 471 const ServerConnectionEvent& event) { | 488 const ServerConnectionEvent& event) { |
| 472 if (ServerConnectionEvent::STATUS_CHANGED == event.what_happened) { | 489 if (ServerConnectionEvent::STATUS_CHANGED == event.what_happened) { |
| 473 MutexLock lock(&mutex_); | 490 MutexLock lock(&mutex_); |
| 474 CheckConnected(&connected_, event.connection_code, | 491 CheckConnected(&connected_, event.connection_code, |
| 475 &changed_.condvar_); | 492 &changed_.condvar_); |
| 476 } | 493 } |
| 477 } | 494 } |
| 478 | 495 |
| 479 SyncerEventChannel* SyncerThread::channel() { | 496 SyncerEventChannel* SyncerThreadPthreadImpl::channel() { |
| 480 return syncer_event_channel_.get(); | 497 return syncer_event_channel_.get(); |
| 481 } | 498 } |
| 482 | 499 |
| 483 // Inputs and return value in milliseconds. | 500 // Inputs and return value in milliseconds. |
| 484 int SyncerThread::CalculateSyncWaitTime(int last_interval, int user_idle_ms) { | 501 int SyncerThreadPthreadImpl::CalculateSyncWaitTime(int last_interval, |
| 502 int user_idle_ms) { |
| 485 // syncer_polling_interval_ is in seconds | 503 // syncer_polling_interval_ is in seconds |
| 486 int syncer_polling_interval_ms = syncer_polling_interval_ * 1000; | 504 int syncer_polling_interval_ms = syncer_polling_interval_ * 1000; |
| 487 | 505 |
| 488 // This is our default and lower bound. | 506 // This is our default and lower bound. |
| 489 int next_wait = syncer_polling_interval_ms; | 507 int next_wait = syncer_polling_interval_ms; |
| 490 | 508 |
| 491 // Get idle time, bounded by max wait. | 509 // Get idle time, bounded by max wait. |
| 492 int idle = min(user_idle_ms, syncer_max_interval_); | 510 int idle = min(user_idle_ms, syncer_max_interval_); |
| 493 | 511 |
| 494 // If the user has been idle for a while, we'll start decreasing the poll | 512 // If the user has been idle for a while, we'll start decreasing the poll |
| 495 // rate. | 513 // rate. |
| 496 if (idle >= kPollBackoffThresholdMultiplier * syncer_polling_interval_ms) { | 514 if (idle >= kPollBackoffThresholdMultiplier * syncer_polling_interval_ms) { |
| 497 next_wait = std::min(AllStatus::GetRecommendedDelaySeconds( | 515 next_wait = std::min(AllStatus::GetRecommendedDelaySeconds( |
| 498 last_interval / 1000), syncer_max_interval_ / 1000) * 1000; | 516 last_interval / 1000), syncer_max_interval_ / 1000) * 1000; |
| 499 } | 517 } |
| 500 | 518 |
| 501 return next_wait; | 519 return next_wait; |
| 502 } | 520 } |
| 503 | 521 |
| 504 // Called with mutex_ already locked. | 522 // Called with mutex_ already locked. |
| 505 void SyncerThread::NudgeSyncImpl(int milliseconds_from_now, | 523 void SyncerThreadPthreadImpl::NudgeSyncImpl(int milliseconds_from_now, |
| 506 NudgeSource source) { | 524 SyncerThread::NudgeSource source) { |
| 507 const timespec nudge_time = GetPThreadAbsoluteTime(milliseconds_from_now); | 525 const timespec nudge_time = GetPThreadAbsoluteTime(milliseconds_from_now); |
| 508 NudgeObject nudge_object(nudge_time, source); | 526 NudgeObject nudge_object(nudge_time, source); |
| 509 nudge_queue_.push(nudge_object); | 527 nudge_queue_.push(nudge_object); |
| 510 pthread_cond_broadcast(&changed_.condvar_); | 528 pthread_cond_broadcast(&changed_.condvar_); |
| 511 } | 529 } |
| 512 | 530 |
| 513 void SyncerThread::WatchTalkMediator(TalkMediator* mediator) { | 531 void SyncerThreadPthreadImpl::WatchTalkMediator(TalkMediator* mediator) { |
| 514 talk_mediator_hookup_.reset( | 532 talk_mediator_hookup_.reset( |
| 515 NewEventListenerHookup( | 533 NewEventListenerHookup( |
| 516 mediator->channel(), | 534 mediator->channel(), |
| 517 this, | 535 this, |
| 518 &SyncerThread::HandleTalkMediatorEvent)); | 536 &SyncerThreadPthreadImpl::HandleTalkMediatorEvent)); |
| 519 } | 537 } |
| 520 | 538 |
| 521 void SyncerThread::HandleTalkMediatorEvent(const TalkMediatorEvent& event) { | 539 void SyncerThreadPthreadImpl::HandleTalkMediatorEvent( |
| 540 const TalkMediatorEvent& event) { |
| 522 MutexLock lock(&mutex_); | 541 MutexLock lock(&mutex_); |
| 523 switch (event.what_happened) { | 542 switch (event.what_happened) { |
| 524 case TalkMediatorEvent::LOGIN_SUCCEEDED: | 543 case TalkMediatorEvent::LOGIN_SUCCEEDED: |
| 525 LOG(INFO) << "P2P: Login succeeded."; | 544 LOG(INFO) << "P2P: Login succeeded."; |
| 526 p2p_authenticated_ = true; | 545 p2p_authenticated_ = true; |
| 527 break; | 546 break; |
| 528 case TalkMediatorEvent::LOGOUT_SUCCEEDED: | 547 case TalkMediatorEvent::LOGOUT_SUCCEEDED: |
| 529 LOG(INFO) << "P2P: Login succeeded."; | 548 LOG(INFO) << "P2P: Login succeeded."; |
| 530 p2p_authenticated_ = false; | 549 p2p_authenticated_ = false; |
| 531 break; | 550 break; |
| 532 case TalkMediatorEvent::SUBSCRIPTIONS_ON: | 551 case TalkMediatorEvent::SUBSCRIPTIONS_ON: |
| 533 LOG(INFO) << "P2P: Subscriptions successfully enabled."; | 552 LOG(INFO) << "P2P: Subscriptions successfully enabled."; |
| 534 p2p_subscribed_ = true; | 553 p2p_subscribed_ = true; |
| 535 if (NULL != syncer_) { | 554 if (NULL != syncer_) { |
| 536 LOG(INFO) << "Subscriptions on. Nudging syncer for initial push."; | 555 LOG(INFO) << "Subscriptions on. Nudging syncer for initial push."; |
| 537 NudgeSyncImpl(0, kLocal); | 556 NudgeSyncImpl(0, SyncerThread::kLocal); |
| 538 } | 557 } |
| 539 break; | 558 break; |
| 540 case TalkMediatorEvent::SUBSCRIPTIONS_OFF: | 559 case TalkMediatorEvent::SUBSCRIPTIONS_OFF: |
| 541 LOG(INFO) << "P2P: Subscriptions are not enabled."; | 560 LOG(INFO) << "P2P: Subscriptions are not enabled."; |
| 542 p2p_subscribed_ = false; | 561 p2p_subscribed_ = false; |
| 543 break; | 562 break; |
| 544 case TalkMediatorEvent::NOTIFICATION_RECEIVED: | 563 case TalkMediatorEvent::NOTIFICATION_RECEIVED: |
| 545 LOG(INFO) << "P2P: Updates on server, pushing syncer"; | 564 LOG(INFO) << "P2P: Updates on server, pushing syncer"; |
| 546 if (NULL != syncer_) { | 565 if (NULL != syncer_) { |
| 547 NudgeSyncImpl(0, kNotification); | 566 NudgeSyncImpl(0, SyncerThread::kNotification); |
| 548 } | 567 } |
| 549 break; | 568 break; |
| 550 default: | 569 default: |
| 551 break; | 570 break; |
| 552 } | 571 } |
| 553 | 572 |
| 554 if (NULL != syncer_) { | 573 if (NULL != syncer_) { |
| 555 syncer_->set_notifications_enabled(p2p_authenticated_ && p2p_subscribed_); | 574 syncer_->set_notifications_enabled(p2p_authenticated_ && p2p_subscribed_); |
| 556 } | 575 } |
| 557 } | 576 } |
| 558 | 577 |
| 559 } // namespace browser_sync | 578 } // namespace browser_sync |
| OLD | NEW |