| OLD | NEW | 
|---|
|  | (Empty) | 
| 1 // Copyright (c) 2009 The Chromium Authors. All rights reserved. |  | 
| 2 // Use of this source code is governed by a BSD-style license that can be |  | 
| 3 // found in the LICENSE file. |  | 
| 4 #include "chrome/browser/sync/engine/syncer_thread_pthreads.h" |  | 
| 5 |  | 
| 6 #include "build/build_config.h" |  | 
| 7 |  | 
| 8 #ifdef OS_MACOSX |  | 
| 9 #include <CoreFoundation/CFNumber.h> |  | 
| 10 #include <IOKit/IOTypes.h> |  | 
| 11 #include <IOKit/IOKitLib.h> |  | 
| 12 #endif |  | 
| 13 |  | 
| 14 #include <algorithm> |  | 
| 15 #include <map> |  | 
| 16 #include <queue> |  | 
| 17 |  | 
| 18 #include "chrome/browser/sync/engine/auth_watcher.h" |  | 
| 19 #include "chrome/browser/sync/engine/model_safe_worker.h" |  | 
| 20 #include "chrome/browser/sync/engine/net/server_connection_manager.h" |  | 
| 21 #include "chrome/browser/sync/engine/syncer.h" |  | 
| 22 #include "chrome/browser/sync/notifier/listener/talk_mediator.h" |  | 
| 23 #include "chrome/browser/sync/notifier/listener/talk_mediator_impl.h" |  | 
| 24 #include "chrome/browser/sync/syncable/directory_manager.h" |  | 
| 25 |  | 
| 26 using std::priority_queue; |  | 
| 27 using std::min; |  | 
| 28 |  | 
| 29 static inline bool operator < (const timespec& a, const timespec& b) { |  | 
| 30   return a.tv_sec == b.tv_sec ? a.tv_nsec < b.tv_nsec : a.tv_sec < b.tv_sec; |  | 
| 31 } |  | 
| 32 |  | 
| 33 namespace { |  | 
| 34 |  | 
| 35 // Returns the amount of time since the user last interacted with the computer, |  | 
| 36 // in milliseconds |  | 
| 37 int UserIdleTime() { |  | 
| 38 #ifdef OS_WIN |  | 
| 39   LASTINPUTINFO last_input_info; |  | 
| 40   last_input_info.cbSize = sizeof(LASTINPUTINFO); |  | 
| 41 |  | 
| 42   // Get time in windows ticks since system start of last activity. |  | 
| 43   BOOL b = ::GetLastInputInfo(&last_input_info); |  | 
| 44   if (b == TRUE) |  | 
| 45     return ::GetTickCount() - last_input_info.dwTime; |  | 
| 46 #elif defined(OS_MACOSX) |  | 
| 47   // It would be great to do something like: |  | 
| 48   // |  | 
| 49   // return 1000 * |  | 
| 50   //     CGEventSourceSecondsSinceLastEventType( |  | 
| 51   //         kCGEventSourceStateCombinedSessionState, |  | 
| 52   //         kCGAnyInputEventType); |  | 
| 53   // |  | 
| 54   // Unfortunately, CGEvent* lives in ApplicationServices, and we're a daemon |  | 
| 55   // and can't link that high up the food chain. Thus this mucking in IOKit. |  | 
| 56 |  | 
| 57   io_service_t hid_service = |  | 
| 58       IOServiceGetMatchingService(kIOMasterPortDefault, |  | 
| 59                                   IOServiceMatching("IOHIDSystem")); |  | 
| 60   if (!hid_service) { |  | 
| 61     LOG(WARNING) << "Could not obtain IOHIDSystem"; |  | 
| 62     return 0; |  | 
| 63   } |  | 
| 64 |  | 
| 65   CFTypeRef object = IORegistryEntryCreateCFProperty(hid_service, |  | 
| 66                                                      CFSTR("HIDIdleTime"), |  | 
| 67                                                      kCFAllocatorDefault, |  | 
| 68                                                      0); |  | 
| 69   if (!object) { |  | 
| 70     LOG(WARNING) << "Could not get IOHIDSystem's HIDIdleTime property"; |  | 
| 71     IOObjectRelease(hid_service); |  | 
| 72     return 0; |  | 
| 73   } |  | 
| 74 |  | 
| 75   int64 idle_time;  // in nanoseconds |  | 
| 76   Boolean success; |  | 
| 77   if (CFGetTypeID(object) == CFNumberGetTypeID()) { |  | 
| 78     success = CFNumberGetValue((CFNumberRef)object, |  | 
| 79                                kCFNumberSInt64Type, |  | 
| 80                                &idle_time); |  | 
| 81   } else { |  | 
| 82     LOG(WARNING) << "IOHIDSystem's HIDIdleTime property isn't a number!"; |  | 
| 83   } |  | 
| 84 |  | 
| 85   CFRelease(object); |  | 
| 86   IOObjectRelease(hid_service); |  | 
| 87 |  | 
| 88   if (!success) { |  | 
| 89     LOG(WARNING) << "Could not get IOHIDSystem's HIDIdleTime property's value"; |  | 
| 90     return 0; |  | 
| 91   } else { |  | 
| 92     return idle_time / 1000000;  // nano to milli |  | 
| 93   } |  | 
| 94 #else |  | 
| 95   static bool was_logged = false; |  | 
| 96   if (!was_logged) { |  | 
| 97     was_logged = true; |  | 
| 98     LOG(INFO) << "UserIdleTime unimplemented on this platform, " |  | 
| 99         "synchronization will not throttle when user idle"; |  | 
| 100   } |  | 
| 101 #endif |  | 
| 102 |  | 
| 103   return 0; |  | 
| 104 } |  | 
| 105 |  | 
| 106 }  // namespace |  | 
| 107 |  | 
| 108 namespace browser_sync { |  | 
| 109 |  | 
| 110 SyncerThreadPthreads::SyncerThreadPthreads( |  | 
| 111     ClientCommandChannel* command_channel, |  | 
| 112     syncable::DirectoryManager* mgr, |  | 
| 113     ServerConnectionManager* connection_manager, |  | 
| 114     AllStatus* all_status, ModelSafeWorker* model_safe_worker) |  | 
| 115     : SyncerThread() { |  | 
| 116   impl_.reset(new SyncerThreadPthreadImpl(command_channel, mgr, |  | 
| 117       connection_manager, all_status, model_safe_worker)); |  | 
| 118 } |  | 
| 119 |  | 
| 120 bool SyncerThreadPthreadImpl::NudgeSyncer(int milliseconds_from_now, |  | 
| 121                                           SyncerThread::NudgeSource source) { |  | 
| 122   MutexLock lock(&mutex_); |  | 
| 123   if (syncer_ == NULL) { |  | 
| 124     return false; |  | 
| 125   } |  | 
| 126   NudgeSyncImpl(milliseconds_from_now, source); |  | 
| 127   return true; |  | 
| 128 } |  | 
| 129 |  | 
| 130 void* RunSyncerThread(void* syncer_thread) { |  | 
| 131   return (reinterpret_cast<SyncerThreadPthreadImpl*>( |  | 
| 132       syncer_thread))->ThreadMain(); |  | 
| 133 } |  | 
| 134 |  | 
| 135 SyncerThreadPthreadImpl::SyncerThreadPthreadImpl( |  | 
| 136     ClientCommandChannel* command_channel, |  | 
| 137     syncable::DirectoryManager* mgr, |  | 
| 138     ServerConnectionManager* connection_manager, |  | 
| 139     AllStatus* all_status, |  | 
| 140     ModelSafeWorker* model_safe_worker) |  | 
| 141     : dirman_(mgr), scm_(connection_manager), |  | 
| 142       syncer_(NULL), syncer_events_(NULL), thread_running_(false), |  | 
| 143       syncer_short_poll_interval_seconds_( |  | 
| 144           SyncerThread::kDefaultShortPollIntervalSeconds), |  | 
| 145       syncer_long_poll_interval_seconds_( |  | 
| 146           SyncerThread::kDefaultLongPollIntervalSeconds), |  | 
| 147       syncer_polling_interval_(SyncerThread::kDefaultShortPollIntervalSeconds), |  | 
| 148       syncer_max_interval_(SyncerThread::kDefaultMaxPollIntervalMs), |  | 
| 149       stop_syncer_thread_(false), connected_(false), conn_mgr_hookup_(NULL), |  | 
| 150       p2p_authenticated_(false), p2p_subscribed_(false), |  | 
| 151       allstatus_(all_status), talk_mediator_hookup_(NULL), |  | 
| 152       command_channel_(command_channel), directory_manager_hookup_(NULL), |  | 
| 153       model_safe_worker_(model_safe_worker), |  | 
| 154       client_command_hookup_(NULL), disable_idle_detection_(false) { |  | 
| 155 |  | 
| 156   SyncerEvent shutdown = { SyncerEvent::SHUTDOWN_USE_WITH_CARE }; |  | 
| 157   syncer_event_channel_.reset(new SyncerEventChannel(shutdown)); |  | 
| 158 |  | 
| 159   if (dirman_) { |  | 
| 160     directory_manager_hookup_.reset(NewEventListenerHookup( |  | 
| 161         dirman_->channel(), this, |  | 
| 162         &SyncerThreadPthreadImpl::HandleDirectoryManagerEvent)); |  | 
| 163   } |  | 
| 164 |  | 
| 165   if (scm_) { |  | 
| 166     WatchConnectionManager(scm_); |  | 
| 167   } |  | 
| 168 |  | 
| 169   if (command_channel_) { |  | 
| 170     WatchClientCommands(command_channel_); |  | 
| 171   } |  | 
| 172 } |  | 
| 173 |  | 
| 174 SyncerThreadPthreadImpl::~SyncerThreadPthreadImpl() { |  | 
| 175   client_command_hookup_.reset(); |  | 
| 176   conn_mgr_hookup_.reset(); |  | 
| 177   syncer_event_channel_.reset(); |  | 
| 178   directory_manager_hookup_.reset(); |  | 
| 179   syncer_events_.reset(); |  | 
| 180   delete syncer_; |  | 
| 181   talk_mediator_hookup_.reset(); |  | 
| 182   CHECK(!thread_running_); |  | 
| 183 } |  | 
| 184 |  | 
| 185 // Creates and starts a syncer thread. |  | 
| 186 // Returns true if it creates a thread or if there's currently a thread running |  | 
| 187 // and false otherwise. |  | 
| 188 bool SyncerThreadPthreadImpl::Start() { |  | 
| 189   MutexLock lock(&mutex_); |  | 
| 190   if (thread_running_) { |  | 
| 191     return true; |  | 
| 192   } |  | 
| 193   thread_running_ = |  | 
| 194     (0 == pthread_create(&thread_, NULL, RunSyncerThread, this)); |  | 
| 195   if (thread_running_) { |  | 
| 196     pthread_detach(thread_); |  | 
| 197   } |  | 
| 198   return thread_running_; |  | 
| 199 } |  | 
| 200 |  | 
| 201 // Stop processing. A max wait of at least 2*server RTT time is recommended. |  | 
| 202 // Returns true if we stopped, false otherwise. |  | 
| 203 bool SyncerThreadPthreadImpl::Stop(int max_wait) { |  | 
| 204   MutexLock lock(&mutex_); |  | 
| 205   if (!thread_running_) |  | 
| 206     return true; |  | 
| 207   stop_syncer_thread_ = true; |  | 
| 208   if (NULL != syncer_) { |  | 
| 209     // Try to early exit the syncer. |  | 
| 210     syncer_->RequestEarlyExit(); |  | 
| 211   } |  | 
| 212   pthread_cond_broadcast(&changed_.condvar_); |  | 
| 213   timespec deadline = { time(NULL) + (max_wait / 1000), 0 }; |  | 
| 214   do { |  | 
| 215     const int wait_result = max_wait < 0 ? |  | 
| 216       pthread_cond_wait(&changed_.condvar_, &mutex_.mutex_) : |  | 
| 217       pthread_cond_timedwait(&changed_.condvar_, &mutex_.mutex_, |  | 
| 218                              &deadline); |  | 
| 219     if (ETIMEDOUT == wait_result) { |  | 
| 220       LOG(ERROR) << "SyncerThread::Stop timed out. Problems likely."; |  | 
| 221       return false; |  | 
| 222     } |  | 
| 223   } while (thread_running_); |  | 
| 224   return true; |  | 
| 225 } |  | 
| 226 |  | 
| 227 void SyncerThreadPthreadImpl::WatchClientCommands( |  | 
| 228     ClientCommandChannel* channel) { |  | 
| 229   PThreadScopedLock<PThreadMutex> lock(&mutex_); |  | 
| 230   client_command_hookup_.reset(NewEventListenerHookup(channel, this, |  | 
| 231       &SyncerThreadPthreadImpl::HandleClientCommand)); |  | 
| 232 } |  | 
| 233 |  | 
| 234 void SyncerThreadPthreadImpl::HandleClientCommand( |  | 
| 235     ClientCommandChannel::EventType event) { |  | 
| 236   if (!event) { |  | 
| 237     return; |  | 
| 238   } |  | 
| 239 |  | 
| 240   // Mutex not really necessary for these. |  | 
| 241   if (event->has_set_sync_poll_interval()) { |  | 
| 242     syncer_short_poll_interval_seconds_ = event->set_sync_poll_interval(); |  | 
| 243   } |  | 
| 244 |  | 
| 245   if (event->has_set_sync_long_poll_interval()) { |  | 
| 246     syncer_long_poll_interval_seconds_ = event->set_sync_long_poll_interval(); |  | 
| 247   } |  | 
| 248 } |  | 
| 249 |  | 
| 250 void SyncerThreadPthreadImpl::ThreadMainLoop() { |  | 
| 251   // Use the short poll value by default. |  | 
| 252   int poll_seconds = syncer_short_poll_interval_seconds_; |  | 
| 253   int user_idle_milliseconds = 0; |  | 
| 254   timespec last_sync_time = { 0 }; |  | 
| 255   bool initial_sync_for_thread = true; |  | 
| 256   bool continue_sync_cycle = false; |  | 
| 257 |  | 
| 258   while (!stop_syncer_thread_) { |  | 
| 259     if (!connected_) { |  | 
| 260       LOG(INFO) << "Syncer thread waiting for connection."; |  | 
| 261       while (!connected_ && !stop_syncer_thread_) |  | 
| 262         pthread_cond_wait(&changed_.condvar_, &mutex_.mutex_); |  | 
| 263       LOG_IF(INFO, connected_) << "Syncer thread found connection."; |  | 
| 264       continue; |  | 
| 265     } |  | 
| 266 |  | 
| 267     if (syncer_ == NULL) { |  | 
| 268       LOG(INFO) << "Syncer thread waiting for database initialization."; |  | 
| 269       while (syncer_ == NULL && !stop_syncer_thread_) |  | 
| 270         pthread_cond_wait(&changed_.condvar_, &mutex_.mutex_); |  | 
| 271       LOG_IF(INFO, !(syncer_ == NULL)) << "Syncer was found after DB started."; |  | 
| 272       continue; |  | 
| 273     } |  | 
| 274 |  | 
| 275     timespec const next_poll = { last_sync_time.tv_sec + poll_seconds, |  | 
| 276                                  last_sync_time.tv_nsec }; |  | 
| 277     const timespec wake_time = |  | 
| 278       !nudge_queue_.empty() && nudge_queue_.top().first < next_poll ? |  | 
| 279       nudge_queue_.top().first : next_poll; |  | 
| 280     LOG(INFO) << "wake time is " << wake_time.tv_sec; |  | 
| 281     LOG(INFO) << "next poll is " << next_poll.tv_sec; |  | 
| 282 |  | 
| 283     const int error = pthread_cond_timedwait(&changed_.condvar_, &mutex_.mutex_, |  | 
| 284                                              &wake_time); |  | 
| 285     if (ETIMEDOUT != error) { |  | 
| 286       continue;  // Check all the conditions again. |  | 
| 287     } |  | 
| 288 |  | 
| 289     const timespec now = GetPThreadAbsoluteTime(0); |  | 
| 290 |  | 
| 291     // Handle a nudge, caused by either a notification or a local bookmark |  | 
| 292     // event.  This will also update the source of the following SyncMain call. |  | 
| 293     UpdateNudgeSource(now, &continue_sync_cycle, &initial_sync_for_thread); |  | 
| 294 |  | 
| 295     LOG(INFO) << "Calling Sync Main at time " << now.tv_sec; |  | 
| 296     SyncMain(syncer_); |  | 
| 297     last_sync_time = now; |  | 
| 298 |  | 
| 299     LOG(INFO) << "Updating the next polling time after SyncMain"; |  | 
| 300     poll_seconds = CalculatePollingWaitTime(allstatus_->status(), |  | 
| 301                                             poll_seconds, |  | 
| 302                                             &user_idle_milliseconds, |  | 
| 303                                             &continue_sync_cycle); |  | 
| 304   } |  | 
| 305 } |  | 
| 306 |  | 
| 307 // We check how long the user's been idle and sync less often if the machine is |  | 
| 308 // not in use. The aim is to reduce server load. |  | 
| 309 int SyncerThreadPthreadImpl::CalculatePollingWaitTime( |  | 
| 310     const AllStatus::Status& status, |  | 
| 311     int last_poll_wait,  // in s |  | 
| 312     int* user_idle_milliseconds, |  | 
| 313     bool* continue_sync_cycle) { |  | 
| 314   bool is_continuing_sync_cyle = *continue_sync_cycle; |  | 
| 315   *continue_sync_cycle = false; |  | 
| 316 |  | 
| 317   // Determine if the syncer has unfinished work to do from allstatus_. |  | 
| 318   const bool syncer_has_work_to_do = |  | 
| 319     status.updates_available > status.updates_received |  | 
| 320     || status.unsynced_count > 0; |  | 
| 321   LOG(INFO) << "syncer_has_work_to_do is " << syncer_has_work_to_do; |  | 
| 322 |  | 
| 323   // First calculate the expected wait time, figuring in any backoff because of |  | 
| 324   // user idle time.  next_wait is in seconds |  | 
| 325   syncer_polling_interval_ = (!status.notifications_enabled) ? |  | 
| 326       syncer_short_poll_interval_seconds_ : |  | 
| 327       syncer_long_poll_interval_seconds_; |  | 
| 328   int default_next_wait = syncer_polling_interval_; |  | 
| 329   int actual_next_wait = default_next_wait; |  | 
| 330 |  | 
| 331   if (syncer_has_work_to_do) { |  | 
| 332     // Provide exponential backoff due to consecutive errors, else attempt to |  | 
| 333     // complete the work as soon as possible. |  | 
| 334     if (!is_continuing_sync_cyle) { |  | 
| 335       actual_next_wait = AllStatus::GetRecommendedDelaySeconds(0); |  | 
| 336     } else { |  | 
| 337       actual_next_wait = AllStatus::GetRecommendedDelaySeconds(last_poll_wait); |  | 
| 338     } |  | 
| 339     *continue_sync_cycle = true; |  | 
| 340   } else if (!status.notifications_enabled) { |  | 
| 341     // Ensure that we start exponential backoff from our base polling |  | 
| 342     // interval when we are not continuing a sync cycle. |  | 
| 343     last_poll_wait = std::max(last_poll_wait, syncer_polling_interval_); |  | 
| 344 |  | 
| 345     // Did the user start interacting with the computer again? |  | 
| 346     // If so, revise our idle time (and probably next_sync_time) downwards |  | 
| 347     int new_idle_time = disable_idle_detection_ ? 0 : UserIdleTime(); |  | 
| 348     if (new_idle_time < *user_idle_milliseconds) { |  | 
| 349       *user_idle_milliseconds = new_idle_time; |  | 
| 350     } |  | 
| 351     actual_next_wait = CalculateSyncWaitTime(last_poll_wait * 1000, |  | 
| 352                                               *user_idle_milliseconds) / 1000; |  | 
| 353     DCHECK_GE(actual_next_wait, default_next_wait); |  | 
| 354   } |  | 
| 355 |  | 
| 356   LOG(INFO) << "Sync wait: idle " << default_next_wait |  | 
| 357             << " non-idle or backoff " << actual_next_wait << "."; |  | 
| 358 |  | 
| 359   return actual_next_wait; |  | 
| 360 } |  | 
| 361 |  | 
| 362 void* SyncerThreadPthreadImpl::ThreadMain() { |  | 
| 363   NameCurrentThreadForDebugging("SyncEngine_SyncerThread"); |  | 
| 364   mutex_.Lock(); |  | 
| 365   ThreadMainLoop(); |  | 
| 366   thread_running_ = false; |  | 
| 367   pthread_cond_broadcast(&changed_.condvar_); |  | 
| 368   mutex_.Unlock(); |  | 
| 369   LOG(INFO) << "Syncer thread exiting."; |  | 
| 370   return 0; |  | 
| 371 } |  | 
| 372 |  | 
| 373 void SyncerThreadPthreadImpl::SyncMain(Syncer* syncer) { |  | 
| 374   CHECK(syncer); |  | 
| 375   mutex_.Unlock(); |  | 
| 376   while (syncer->SyncShare()) { |  | 
| 377     LOG(INFO) << "Looping in sync share"; |  | 
| 378   } |  | 
| 379   LOG(INFO) << "Done looping in sync share"; |  | 
| 380 |  | 
| 381   mutex_.Lock(); |  | 
| 382 } |  | 
| 383 |  | 
| 384 void SyncerThreadPthreadImpl::UpdateNudgeSource(const timespec& now, |  | 
| 385                                      bool* continue_sync_cycle, |  | 
| 386                                      bool* initial_sync) { |  | 
| 387   bool nudged = false; |  | 
| 388   SyncerThread::NudgeSource nudge_source = SyncerThread::kUnknown; |  | 
| 389   // Has the previous sync cycle completed? |  | 
| 390   if (continue_sync_cycle) { |  | 
| 391     nudge_source = SyncerThread::kContinuation; |  | 
| 392   } |  | 
| 393   // Update the nudge source if a new nudge has come through during the |  | 
| 394   // previous sync cycle. |  | 
| 395   while (!nudge_queue_.empty() && !(now < nudge_queue_.top().first)) { |  | 
| 396     if (!nudged) { |  | 
| 397       nudge_source = nudge_queue_.top().second; |  | 
| 398       *continue_sync_cycle = false;  //  Reset the continuation token on nudge. |  | 
| 399       nudged = true; |  | 
| 400     } |  | 
| 401     nudge_queue_.pop(); |  | 
| 402   } |  | 
| 403   SetUpdatesSource(nudged, nudge_source, initial_sync); |  | 
| 404 } |  | 
| 405 |  | 
| 406 void SyncerThreadPthreadImpl::SetUpdatesSource(bool nudged, |  | 
| 407     SyncerThread::NudgeSource nudge_source, bool* initial_sync) { |  | 
| 408   sync_pb::GetUpdatesCallerInfo::GET_UPDATES_SOURCE updates_source = |  | 
| 409       sync_pb::GetUpdatesCallerInfo::UNKNOWN; |  | 
| 410   if (*initial_sync) { |  | 
| 411     updates_source = sync_pb::GetUpdatesCallerInfo::FIRST_UPDATE; |  | 
| 412     *initial_sync = false; |  | 
| 413   } else if (!nudged) { |  | 
| 414     updates_source = sync_pb::GetUpdatesCallerInfo::PERIODIC; |  | 
| 415   } else { |  | 
| 416     switch (nudge_source) { |  | 
| 417       case SyncerThread::kNotification: |  | 
| 418         updates_source = sync_pb::GetUpdatesCallerInfo::NOTIFICATION; |  | 
| 419         break; |  | 
| 420       case SyncerThread::kLocal: |  | 
| 421         updates_source = sync_pb::GetUpdatesCallerInfo::LOCAL; |  | 
| 422         break; |  | 
| 423       case SyncerThread::kContinuation: |  | 
| 424         updates_source = sync_pb::GetUpdatesCallerInfo::SYNC_CYCLE_CONTINUATION; |  | 
| 425         break; |  | 
| 426       case SyncerThread::kUnknown: |  | 
| 427       default: |  | 
| 428         updates_source = sync_pb::GetUpdatesCallerInfo::UNKNOWN; |  | 
| 429         break; |  | 
| 430     } |  | 
| 431   } |  | 
| 432   syncer_->set_updates_source(updates_source); |  | 
| 433 } |  | 
| 434 |  | 
| 435 void SyncerThreadPthreadImpl::HandleSyncerEvent(const SyncerEvent& event) { |  | 
| 436   MutexLock lock(&mutex_); |  | 
| 437   channel()->NotifyListeners(event); |  | 
| 438   if (SyncerEvent::REQUEST_SYNC_NUDGE != event.what_happened) { |  | 
| 439     return; |  | 
| 440   } |  | 
| 441   NudgeSyncImpl(event.nudge_delay_milliseconds, SyncerThread::kUnknown); |  | 
| 442 } |  | 
| 443 |  | 
| 444 void SyncerThreadPthreadImpl::HandleDirectoryManagerEvent( |  | 
| 445     const syncable::DirectoryManagerEvent& event) { |  | 
| 446   LOG(INFO) << "Handling a directory manager event"; |  | 
| 447   if (syncable::DirectoryManagerEvent::OPENED == event.what_happened) { |  | 
| 448     MutexLock lock(&mutex_); |  | 
| 449     LOG(INFO) << "Syncer starting up for: " << event.dirname; |  | 
| 450     // The underlying database structure is ready, and we should create |  | 
| 451     // the syncer. |  | 
| 452     CHECK(syncer_ == NULL); |  | 
| 453     syncer_ = |  | 
| 454         new Syncer(dirman_, event.dirname, scm_, model_safe_worker_.get()); |  | 
| 455 |  | 
| 456     syncer_->set_command_channel(command_channel_); |  | 
| 457     syncer_events_.reset(NewEventListenerHookup( |  | 
| 458         syncer_->channel(), this, &SyncerThreadPthreadImpl::HandleSyncerEvent)); |  | 
| 459     pthread_cond_broadcast(&changed_.condvar_); |  | 
| 460   } |  | 
| 461 } |  | 
| 462 |  | 
| 463 static inline void CheckConnected(bool* connected, |  | 
| 464                                   HttpResponse::ServerConnectionCode code, |  | 
| 465                                   pthread_cond_t* condvar) { |  | 
| 466   if (*connected) { |  | 
| 467     if (HttpResponse::CONNECTION_UNAVAILABLE == code) { |  | 
| 468       *connected = false; |  | 
| 469       pthread_cond_broadcast(condvar); |  | 
| 470     } |  | 
| 471   } else { |  | 
| 472     if (HttpResponse::SERVER_CONNECTION_OK == code) { |  | 
| 473       *connected = true; |  | 
| 474       pthread_cond_broadcast(condvar); |  | 
| 475     } |  | 
| 476   } |  | 
| 477 } |  | 
| 478 |  | 
| 479 void SyncerThreadPthreadImpl::WatchConnectionManager( |  | 
| 480     ServerConnectionManager* conn_mgr) { |  | 
| 481   conn_mgr_hookup_.reset(NewEventListenerHookup(conn_mgr->channel(), this, |  | 
| 482       &SyncerThreadPthreadImpl::HandleServerConnectionEvent)); |  | 
| 483   CheckConnected(&connected_, conn_mgr->server_status(), |  | 
| 484                  &changed_.condvar_); |  | 
| 485 } |  | 
| 486 |  | 
| 487 void SyncerThreadPthreadImpl::HandleServerConnectionEvent( |  | 
| 488     const ServerConnectionEvent& event) { |  | 
| 489   if (ServerConnectionEvent::STATUS_CHANGED == event.what_happened) { |  | 
| 490     MutexLock lock(&mutex_); |  | 
| 491     CheckConnected(&connected_, event.connection_code, |  | 
| 492                    &changed_.condvar_); |  | 
| 493   } |  | 
| 494 } |  | 
| 495 |  | 
| 496 SyncerEventChannel* SyncerThreadPthreadImpl::channel() { |  | 
| 497   return syncer_event_channel_.get(); |  | 
| 498 } |  | 
| 499 |  | 
| 500 // Inputs and return value in milliseconds. |  | 
| 501 int SyncerThreadPthreadImpl::CalculateSyncWaitTime(int last_interval, |  | 
| 502     int user_idle_ms) { |  | 
| 503   // syncer_polling_interval_ is in seconds |  | 
| 504   int syncer_polling_interval_ms = syncer_polling_interval_ * 1000; |  | 
| 505 |  | 
| 506   // This is our default and lower bound. |  | 
| 507   int next_wait = syncer_polling_interval_ms; |  | 
| 508 |  | 
| 509   // Get idle time, bounded by max wait. |  | 
| 510   int idle = min(user_idle_ms, syncer_max_interval_); |  | 
| 511 |  | 
| 512   // If the user has been idle for a while, we'll start decreasing the poll |  | 
| 513   // rate. |  | 
| 514   if (idle >= kPollBackoffThresholdMultiplier * syncer_polling_interval_ms) { |  | 
| 515     next_wait = std::min(AllStatus::GetRecommendedDelaySeconds( |  | 
| 516         last_interval / 1000), syncer_max_interval_ / 1000) * 1000; |  | 
| 517   } |  | 
| 518 |  | 
| 519   return next_wait; |  | 
| 520 } |  | 
| 521 |  | 
| 522 // Called with mutex_ already locked. |  | 
| 523 void SyncerThreadPthreadImpl::NudgeSyncImpl(int milliseconds_from_now, |  | 
| 524                                  SyncerThread::NudgeSource source) { |  | 
| 525   const timespec nudge_time = GetPThreadAbsoluteTime(milliseconds_from_now); |  | 
| 526   NudgeObject nudge_object(nudge_time, source); |  | 
| 527   nudge_queue_.push(nudge_object); |  | 
| 528   pthread_cond_broadcast(&changed_.condvar_); |  | 
| 529 } |  | 
| 530 |  | 
| 531 void SyncerThreadPthreadImpl::WatchTalkMediator(TalkMediator* mediator) { |  | 
| 532   talk_mediator_hookup_.reset( |  | 
| 533       NewEventListenerHookup( |  | 
| 534           mediator->channel(), |  | 
| 535           this, |  | 
| 536           &SyncerThreadPthreadImpl::HandleTalkMediatorEvent)); |  | 
| 537 } |  | 
| 538 |  | 
| 539 void SyncerThreadPthreadImpl::HandleTalkMediatorEvent( |  | 
| 540     const TalkMediatorEvent& event) { |  | 
| 541   MutexLock lock(&mutex_); |  | 
| 542   switch (event.what_happened) { |  | 
| 543     case TalkMediatorEvent::LOGIN_SUCCEEDED: |  | 
| 544       LOG(INFO) << "P2P: Login succeeded."; |  | 
| 545       p2p_authenticated_ = true; |  | 
| 546       break; |  | 
| 547     case TalkMediatorEvent::LOGOUT_SUCCEEDED: |  | 
| 548       LOG(INFO) << "P2P: Login succeeded."; |  | 
| 549       p2p_authenticated_ = false; |  | 
| 550       break; |  | 
| 551     case TalkMediatorEvent::SUBSCRIPTIONS_ON: |  | 
| 552       LOG(INFO) << "P2P: Subscriptions successfully enabled."; |  | 
| 553       p2p_subscribed_ = true; |  | 
| 554       if (NULL != syncer_) { |  | 
| 555         LOG(INFO) << "Subscriptions on.  Nudging syncer for initial push."; |  | 
| 556         NudgeSyncImpl(0, SyncerThread::kLocal); |  | 
| 557       } |  | 
| 558       break; |  | 
| 559     case TalkMediatorEvent::SUBSCRIPTIONS_OFF: |  | 
| 560       LOG(INFO) << "P2P: Subscriptions are not enabled."; |  | 
| 561       p2p_subscribed_ = false; |  | 
| 562       break; |  | 
| 563     case TalkMediatorEvent::NOTIFICATION_RECEIVED: |  | 
| 564       LOG(INFO) << "P2P: Updates on server, pushing syncer"; |  | 
| 565       if (NULL != syncer_) { |  | 
| 566         NudgeSyncImpl(0, SyncerThread::kNotification); |  | 
| 567       } |  | 
| 568       break; |  | 
| 569     default: |  | 
| 570       break; |  | 
| 571   } |  | 
| 572 |  | 
| 573   if (NULL != syncer_) { |  | 
| 574     syncer_->set_notifications_enabled(p2p_authenticated_ && p2p_subscribed_); |  | 
| 575   } |  | 
| 576 } |  | 
| 577 |  | 
| 578 }  // namespace browser_sync |  | 
| OLD | NEW | 
|---|