OLD | NEW |
1 // Copyright (c) 2009 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2009 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 #include "chrome/browser/sync/engine/syncer_thread_pthreads.h" |
5 #include "chrome/browser/sync/engine/syncer_thread.h" | |
6 | 5 |
7 #include "build/build_config.h" | 6 #include "build/build_config.h" |
8 | 7 |
9 #ifdef OS_MACOSX | 8 #ifdef OS_MACOSX |
10 #include <CoreFoundation/CFNumber.h> | 9 #include <CoreFoundation/CFNumber.h> |
11 #include <IOKit/IOTypes.h> | 10 #include <IOKit/IOTypes.h> |
12 #include <IOKit/IOKitLib.h> | 11 #include <IOKit/IOKitLib.h> |
13 #endif | 12 #endif |
14 | 13 |
15 #include <algorithm> | 14 #include <algorithm> |
(...skipping 85 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
101 } | 100 } |
102 #endif | 101 #endif |
103 | 102 |
104 return 0; | 103 return 0; |
105 } | 104 } |
106 | 105 |
107 } // namespace | 106 } // namespace |
108 | 107 |
109 namespace browser_sync { | 108 namespace browser_sync { |
110 | 109 |
111 bool SyncerThread::NudgeSyncer(int milliseconds_from_now, NudgeSource source) { | 110 SyncerThreadPthreads::SyncerThreadPthreads( |
| 111 ClientCommandChannel* command_channel, |
| 112 syncable::DirectoryManager* mgr, |
| 113 ServerConnectionManager* connection_manager, |
| 114 AllStatus* all_status, ModelSafeWorker* model_safe_worker) |
| 115 : SyncerThread() { |
| 116 impl_.reset(new SyncerThreadPthreadImpl(command_channel, mgr, |
| 117 connection_manager, all_status, model_safe_worker)); |
| 118 } |
| 119 |
| 120 bool SyncerThreadPthreadImpl::NudgeSyncer(int milliseconds_from_now, |
| 121 SyncerThread::NudgeSource source) { |
112 MutexLock lock(&mutex_); | 122 MutexLock lock(&mutex_); |
113 if (syncer_ == NULL) { | 123 if (syncer_ == NULL) { |
114 return false; | 124 return false; |
115 } | 125 } |
116 NudgeSyncImpl(milliseconds_from_now, source); | 126 NudgeSyncImpl(milliseconds_from_now, source); |
117 return true; | 127 return true; |
118 } | 128 } |
119 | 129 |
120 void* RunSyncerThread(void* syncer_thread) { | 130 void* RunSyncerThread(void* syncer_thread) { |
121 return (reinterpret_cast<SyncerThread*>(syncer_thread))->ThreadMain(); | 131 return (reinterpret_cast<SyncerThreadPthreadImpl*>( |
| 132 syncer_thread))->ThreadMain(); |
122 } | 133 } |
123 | 134 |
124 SyncerThread::SyncerThread( | 135 SyncerThreadPthreadImpl::SyncerThreadPthreadImpl( |
125 ClientCommandChannel* command_channel, | 136 ClientCommandChannel* command_channel, |
126 syncable::DirectoryManager* mgr, | 137 syncable::DirectoryManager* mgr, |
127 ServerConnectionManager* connection_manager, | 138 ServerConnectionManager* connection_manager, |
128 AllStatus* all_status, | 139 AllStatus* all_status, |
129 ModelSafeWorker* model_safe_worker) | 140 ModelSafeWorker* model_safe_worker) |
130 : dirman_(mgr), scm_(connection_manager), | 141 : dirman_(mgr), scm_(connection_manager), |
131 syncer_(NULL), syncer_events_(NULL), thread_running_(false), | 142 syncer_(NULL), syncer_events_(NULL), thread_running_(false), |
132 syncer_short_poll_interval_seconds_(kDefaultShortPollIntervalSeconds), | 143 syncer_short_poll_interval_seconds_( |
133 syncer_long_poll_interval_seconds_(kDefaultLongPollIntervalSeconds), | 144 SyncerThread::kDefaultShortPollIntervalSeconds), |
134 syncer_polling_interval_(kDefaultShortPollIntervalSeconds), | 145 syncer_long_poll_interval_seconds_( |
135 syncer_max_interval_(kDefaultMaxPollIntervalMs), | 146 SyncerThread::kDefaultLongPollIntervalSeconds), |
| 147 syncer_polling_interval_(SyncerThread::kDefaultShortPollIntervalSeconds), |
| 148 syncer_max_interval_(SyncerThread::kDefaultMaxPollIntervalMs), |
136 stop_syncer_thread_(false), connected_(false), conn_mgr_hookup_(NULL), | 149 stop_syncer_thread_(false), connected_(false), conn_mgr_hookup_(NULL), |
137 p2p_authenticated_(false), p2p_subscribed_(false), | 150 p2p_authenticated_(false), p2p_subscribed_(false), |
138 allstatus_(all_status), talk_mediator_hookup_(NULL), | 151 allstatus_(all_status), talk_mediator_hookup_(NULL), |
139 command_channel_(command_channel), directory_manager_hookup_(NULL), | 152 command_channel_(command_channel), directory_manager_hookup_(NULL), |
140 model_safe_worker_(model_safe_worker), | 153 model_safe_worker_(model_safe_worker), |
141 client_command_hookup_(NULL), disable_idle_detection_(false) { | 154 client_command_hookup_(NULL), disable_idle_detection_(false) { |
142 | 155 |
143 SyncerEvent shutdown = { SyncerEvent::SHUTDOWN_USE_WITH_CARE }; | 156 SyncerEvent shutdown = { SyncerEvent::SHUTDOWN_USE_WITH_CARE }; |
144 syncer_event_channel_.reset(new SyncerEventChannel(shutdown)); | 157 syncer_event_channel_.reset(new SyncerEventChannel(shutdown)); |
145 | 158 |
146 if (dirman_) { | 159 if (dirman_) { |
147 directory_manager_hookup_.reset(NewEventListenerHookup( | 160 directory_manager_hookup_.reset(NewEventListenerHookup( |
148 dirman_->channel(), this, &SyncerThread::HandleDirectoryManagerEvent)); | 161 dirman_->channel(), this, |
| 162 &SyncerThreadPthreadImpl::HandleDirectoryManagerEvent)); |
149 } | 163 } |
150 | 164 |
151 if (scm_) { | 165 if (scm_) { |
152 WatchConnectionManager(scm_); | 166 WatchConnectionManager(scm_); |
153 } | 167 } |
154 | 168 |
155 if (command_channel_) { | 169 if (command_channel_) { |
156 WatchClientCommands(command_channel_); | 170 WatchClientCommands(command_channel_); |
157 } | 171 } |
158 } | 172 } |
159 | 173 |
160 SyncerThread::~SyncerThread() { | 174 SyncerThreadPthreadImpl::~SyncerThreadPthreadImpl() { |
161 client_command_hookup_.reset(); | 175 client_command_hookup_.reset(); |
162 conn_mgr_hookup_.reset(); | 176 conn_mgr_hookup_.reset(); |
163 syncer_event_channel_.reset(); | 177 syncer_event_channel_.reset(); |
164 directory_manager_hookup_.reset(); | 178 directory_manager_hookup_.reset(); |
165 syncer_events_.reset(); | 179 syncer_events_.reset(); |
166 delete syncer_; | 180 delete syncer_; |
167 talk_mediator_hookup_.reset(); | 181 talk_mediator_hookup_.reset(); |
168 CHECK(!thread_running_); | 182 CHECK(!thread_running_); |
169 } | 183 } |
170 | 184 |
171 // Creates and starts a syncer thread. | 185 // Creates and starts a syncer thread. |
172 // Returns true if it creates a thread or if there's currently a thread running | 186 // Returns true if it creates a thread or if there's currently a thread running |
173 // and false otherwise. | 187 // and false otherwise. |
174 bool SyncerThread::Start() { | 188 bool SyncerThreadPthreadImpl::Start() { |
175 MutexLock lock(&mutex_); | 189 MutexLock lock(&mutex_); |
176 if (thread_running_) { | 190 if (thread_running_) { |
177 return true; | 191 return true; |
178 } | 192 } |
179 thread_running_ = | 193 thread_running_ = |
180 (0 == pthread_create(&thread_, NULL, RunSyncerThread, this)); | 194 (0 == pthread_create(&thread_, NULL, RunSyncerThread, this)); |
181 if (thread_running_) { | 195 if (thread_running_) { |
182 pthread_detach(thread_); | 196 pthread_detach(thread_); |
183 } | 197 } |
184 return thread_running_; | 198 return thread_running_; |
185 } | 199 } |
186 | 200 |
187 // Stop processing. A max wait of at least 2*server RTT time is recommended. | 201 // Stop processing. A max wait of at least 2*server RTT time is recommended. |
188 // Returns true if we stopped, false otherwise. | 202 // Returns true if we stopped, false otherwise. |
189 bool SyncerThread::Stop(int max_wait) { | 203 bool SyncerThreadPthreadImpl::Stop(int max_wait) { |
190 MutexLock lock(&mutex_); | 204 MutexLock lock(&mutex_); |
191 if (!thread_running_) | 205 if (!thread_running_) |
192 return true; | 206 return true; |
193 stop_syncer_thread_ = true; | 207 stop_syncer_thread_ = true; |
194 if (NULL != syncer_) { | 208 if (NULL != syncer_) { |
195 // Try to early exit the syncer. | 209 // Try to early exit the syncer. |
196 syncer_->RequestEarlyExit(); | 210 syncer_->RequestEarlyExit(); |
197 } | 211 } |
198 pthread_cond_broadcast(&changed_.condvar_); | 212 pthread_cond_broadcast(&changed_.condvar_); |
199 timespec deadline = { time(NULL) + (max_wait / 1000), 0 }; | 213 timespec deadline = { time(NULL) + (max_wait / 1000), 0 }; |
200 do { | 214 do { |
201 const int wait_result = max_wait < 0 ? | 215 const int wait_result = max_wait < 0 ? |
202 pthread_cond_wait(&changed_.condvar_, &mutex_.mutex_) : | 216 pthread_cond_wait(&changed_.condvar_, &mutex_.mutex_) : |
203 pthread_cond_timedwait(&changed_.condvar_, &mutex_.mutex_, | 217 pthread_cond_timedwait(&changed_.condvar_, &mutex_.mutex_, |
204 &deadline); | 218 &deadline); |
205 if (ETIMEDOUT == wait_result) { | 219 if (ETIMEDOUT == wait_result) { |
206 LOG(ERROR) << "SyncerThread::Stop timed out. Problems likely."; | 220 LOG(ERROR) << "SyncerThread::Stop timed out. Problems likely."; |
207 return false; | 221 return false; |
208 } | 222 } |
209 } while (thread_running_); | 223 } while (thread_running_); |
210 return true; | 224 return true; |
211 } | 225 } |
212 | 226 |
213 void SyncerThread::WatchClientCommands(ClientCommandChannel* channel) { | 227 void SyncerThreadPthreadImpl::WatchClientCommands( |
| 228 ClientCommandChannel* channel) { |
214 PThreadScopedLock<PThreadMutex> lock(&mutex_); | 229 PThreadScopedLock<PThreadMutex> lock(&mutex_); |
215 client_command_hookup_.reset(NewEventListenerHookup(channel, this, | 230 client_command_hookup_.reset(NewEventListenerHookup(channel, this, |
216 &SyncerThread::HandleClientCommand)); | 231 &SyncerThreadPthreadImpl::HandleClientCommand)); |
217 } | 232 } |
218 | 233 |
219 void SyncerThread::HandleClientCommand(ClientCommandChannel::EventType event) { | 234 void SyncerThreadPthreadImpl::HandleClientCommand( |
| 235 ClientCommandChannel::EventType event) { |
220 if (!event) { | 236 if (!event) { |
221 return; | 237 return; |
222 } | 238 } |
223 | 239 |
224 // Mutex not really necessary for these. | 240 // Mutex not really necessary for these. |
225 if (event->has_set_sync_poll_interval()) { | 241 if (event->has_set_sync_poll_interval()) { |
226 syncer_short_poll_interval_seconds_ = event->set_sync_poll_interval(); | 242 syncer_short_poll_interval_seconds_ = event->set_sync_poll_interval(); |
227 } | 243 } |
228 | 244 |
229 if (event->has_set_sync_long_poll_interval()) { | 245 if (event->has_set_sync_long_poll_interval()) { |
230 syncer_long_poll_interval_seconds_ = event->set_sync_long_poll_interval(); | 246 syncer_long_poll_interval_seconds_ = event->set_sync_long_poll_interval(); |
231 } | 247 } |
232 } | 248 } |
233 | 249 |
234 void SyncerThread::ThreadMainLoop() { | 250 void SyncerThreadPthreadImpl::ThreadMainLoop() { |
235 // Use the short poll value by default. | 251 // Use the short poll value by default. |
236 int poll_seconds = syncer_short_poll_interval_seconds_; | 252 int poll_seconds = syncer_short_poll_interval_seconds_; |
237 int user_idle_milliseconds = 0; | 253 int user_idle_milliseconds = 0; |
238 timespec last_sync_time = { 0 }; | 254 timespec last_sync_time = { 0 }; |
239 bool initial_sync_for_thread = true; | 255 bool initial_sync_for_thread = true; |
240 bool continue_sync_cycle = false; | 256 bool continue_sync_cycle = false; |
241 | 257 |
242 while (!stop_syncer_thread_) { | 258 while (!stop_syncer_thread_) { |
243 if (!connected_) { | 259 if (!connected_) { |
244 LOG(INFO) << "Syncer thread waiting for connection."; | 260 LOG(INFO) << "Syncer thread waiting for connection."; |
(...skipping 38 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
283 LOG(INFO) << "Updating the next polling time after SyncMain"; | 299 LOG(INFO) << "Updating the next polling time after SyncMain"; |
284 poll_seconds = CalculatePollingWaitTime(allstatus_->status(), | 300 poll_seconds = CalculatePollingWaitTime(allstatus_->status(), |
285 poll_seconds, | 301 poll_seconds, |
286 &user_idle_milliseconds, | 302 &user_idle_milliseconds, |
287 &continue_sync_cycle); | 303 &continue_sync_cycle); |
288 } | 304 } |
289 } | 305 } |
290 | 306 |
291 // We check how long the user's been idle and sync less often if the machine is | 307 // We check how long the user's been idle and sync less often if the machine is |
292 // not in use. The aim is to reduce server load. | 308 // not in use. The aim is to reduce server load. |
293 int SyncerThread::CalculatePollingWaitTime( | 309 int SyncerThreadPthreadImpl::CalculatePollingWaitTime( |
294 const AllStatus::Status& status, | 310 const AllStatus::Status& status, |
295 int last_poll_wait, // in s | 311 int last_poll_wait, // in s |
296 int* user_idle_milliseconds, | 312 int* user_idle_milliseconds, |
297 bool* continue_sync_cycle) { | 313 bool* continue_sync_cycle) { |
298 bool is_continuing_sync_cyle = *continue_sync_cycle; | 314 bool is_continuing_sync_cyle = *continue_sync_cycle; |
299 *continue_sync_cycle = false; | 315 *continue_sync_cycle = false; |
300 | 316 |
301 // Determine if the syncer has unfinished work to do from allstatus_. | 317 // Determine if the syncer has unfinished work to do from allstatus_. |
302 const bool syncer_has_work_to_do = | 318 const bool syncer_has_work_to_do = |
303 status.updates_available > status.updates_received | 319 status.updates_available > status.updates_received |
(...skipping 32 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
336 *user_idle_milliseconds) / 1000; | 352 *user_idle_milliseconds) / 1000; |
337 DCHECK_GE(actual_next_wait, default_next_wait); | 353 DCHECK_GE(actual_next_wait, default_next_wait); |
338 } | 354 } |
339 | 355 |
340 LOG(INFO) << "Sync wait: idle " << default_next_wait | 356 LOG(INFO) << "Sync wait: idle " << default_next_wait |
341 << " non-idle or backoff " << actual_next_wait << "."; | 357 << " non-idle or backoff " << actual_next_wait << "."; |
342 | 358 |
343 return actual_next_wait; | 359 return actual_next_wait; |
344 } | 360 } |
345 | 361 |
346 void* SyncerThread::ThreadMain() { | 362 void* SyncerThreadPthreadImpl::ThreadMain() { |
347 NameCurrentThreadForDebugging("SyncEngine_SyncerThread"); | 363 NameCurrentThreadForDebugging("SyncEngine_SyncerThread"); |
348 mutex_.Lock(); | 364 mutex_.Lock(); |
349 ThreadMainLoop(); | 365 ThreadMainLoop(); |
350 thread_running_ = false; | 366 thread_running_ = false; |
351 pthread_cond_broadcast(&changed_.condvar_); | 367 pthread_cond_broadcast(&changed_.condvar_); |
352 mutex_.Unlock(); | 368 mutex_.Unlock(); |
353 LOG(INFO) << "Syncer thread exiting."; | 369 LOG(INFO) << "Syncer thread exiting."; |
354 return 0; | 370 return 0; |
355 } | 371 } |
356 | 372 |
357 void SyncerThread::SyncMain(Syncer* syncer) { | 373 void SyncerThreadPthreadImpl::SyncMain(Syncer* syncer) { |
358 CHECK(syncer); | 374 CHECK(syncer); |
359 mutex_.Unlock(); | 375 mutex_.Unlock(); |
360 while (syncer->SyncShare()) { | 376 while (syncer->SyncShare()) { |
361 LOG(INFO) << "Looping in sync share"; | 377 LOG(INFO) << "Looping in sync share"; |
362 } | 378 } |
363 LOG(INFO) << "Done looping in sync share"; | 379 LOG(INFO) << "Done looping in sync share"; |
364 | 380 |
365 mutex_.Lock(); | 381 mutex_.Lock(); |
366 } | 382 } |
367 | 383 |
368 void SyncerThread::UpdateNudgeSource(const timespec& now, | 384 void SyncerThreadPthreadImpl::UpdateNudgeSource(const timespec& now, |
369 bool* continue_sync_cycle, | 385 bool* continue_sync_cycle, |
370 bool* initial_sync) { | 386 bool* initial_sync) { |
371 bool nudged = false; | 387 bool nudged = false; |
372 NudgeSource nudge_source = kUnknown; | 388 SyncerThread::NudgeSource nudge_source = SyncerThread::kUnknown; |
373 // Has the previous sync cycle completed? | 389 // Has the previous sync cycle completed? |
374 if (continue_sync_cycle) { | 390 if (continue_sync_cycle) { |
375 nudge_source = kContinuation; | 391 nudge_source = SyncerThread::kContinuation; |
376 } | 392 } |
377 // Update the nudge source if a new nudge has come through during the | 393 // Update the nudge source if a new nudge has come through during the |
378 // previous sync cycle. | 394 // previous sync cycle. |
379 while (!nudge_queue_.empty() && !(now < nudge_queue_.top().first)) { | 395 while (!nudge_queue_.empty() && !(now < nudge_queue_.top().first)) { |
380 if (!nudged) { | 396 if (!nudged) { |
381 nudge_source = nudge_queue_.top().second; | 397 nudge_source = nudge_queue_.top().second; |
382 *continue_sync_cycle = false; // Reset the continuation token on nudge. | 398 *continue_sync_cycle = false; // Reset the continuation token on nudge. |
383 nudged = true; | 399 nudged = true; |
384 } | 400 } |
385 nudge_queue_.pop(); | 401 nudge_queue_.pop(); |
386 } | 402 } |
387 SetUpdatesSource(nudged, nudge_source, initial_sync); | 403 SetUpdatesSource(nudged, nudge_source, initial_sync); |
388 } | 404 } |
389 | 405 |
390 void SyncerThread::SetUpdatesSource(bool nudged, NudgeSource nudge_source, | 406 void SyncerThreadPthreadImpl::SetUpdatesSource(bool nudged, |
391 bool* initial_sync) { | 407 SyncerThread::NudgeSource nudge_source, bool* initial_sync) { |
392 sync_pb::GetUpdatesCallerInfo::GET_UPDATES_SOURCE updates_source = | 408 sync_pb::GetUpdatesCallerInfo::GET_UPDATES_SOURCE updates_source = |
393 sync_pb::GetUpdatesCallerInfo::UNKNOWN; | 409 sync_pb::GetUpdatesCallerInfo::UNKNOWN; |
394 if (*initial_sync) { | 410 if (*initial_sync) { |
395 updates_source = sync_pb::GetUpdatesCallerInfo::FIRST_UPDATE; | 411 updates_source = sync_pb::GetUpdatesCallerInfo::FIRST_UPDATE; |
396 *initial_sync = false; | 412 *initial_sync = false; |
397 } else if (!nudged) { | 413 } else if (!nudged) { |
398 updates_source = sync_pb::GetUpdatesCallerInfo::PERIODIC; | 414 updates_source = sync_pb::GetUpdatesCallerInfo::PERIODIC; |
399 } else { | 415 } else { |
400 switch (nudge_source) { | 416 switch (nudge_source) { |
401 case kNotification: | 417 case SyncerThread::kNotification: |
402 updates_source = sync_pb::GetUpdatesCallerInfo::NOTIFICATION; | 418 updates_source = sync_pb::GetUpdatesCallerInfo::NOTIFICATION; |
403 break; | 419 break; |
404 case kLocal: | 420 case SyncerThread::kLocal: |
405 updates_source = sync_pb::GetUpdatesCallerInfo::LOCAL; | 421 updates_source = sync_pb::GetUpdatesCallerInfo::LOCAL; |
406 break; | 422 break; |
407 case kContinuation: | 423 case SyncerThread::kContinuation: |
408 updates_source = sync_pb::GetUpdatesCallerInfo::SYNC_CYCLE_CONTINUATION; | 424 updates_source = sync_pb::GetUpdatesCallerInfo::SYNC_CYCLE_CONTINUATION; |
409 break; | 425 break; |
410 case kUnknown: | 426 case SyncerThread::kUnknown: |
411 default: | 427 default: |
412 updates_source = sync_pb::GetUpdatesCallerInfo::UNKNOWN; | 428 updates_source = sync_pb::GetUpdatesCallerInfo::UNKNOWN; |
413 break; | 429 break; |
414 } | 430 } |
415 } | 431 } |
416 syncer_->set_updates_source(updates_source); | 432 syncer_->set_updates_source(updates_source); |
417 } | 433 } |
418 | 434 |
419 void SyncerThread::HandleSyncerEvent(const SyncerEvent& event) { | 435 void SyncerThreadPthreadImpl::HandleSyncerEvent(const SyncerEvent& event) { |
420 MutexLock lock(&mutex_); | 436 MutexLock lock(&mutex_); |
421 channel()->NotifyListeners(event); | 437 channel()->NotifyListeners(event); |
422 if (SyncerEvent::REQUEST_SYNC_NUDGE != event.what_happened) { | 438 if (SyncerEvent::REQUEST_SYNC_NUDGE != event.what_happened) { |
423 return; | 439 return; |
424 } | 440 } |
425 NudgeSyncImpl(event.nudge_delay_milliseconds, kUnknown); | 441 NudgeSyncImpl(event.nudge_delay_milliseconds, SyncerThread::kUnknown); |
426 } | 442 } |
427 | 443 |
428 void SyncerThread::HandleDirectoryManagerEvent( | 444 void SyncerThreadPthreadImpl::HandleDirectoryManagerEvent( |
429 const syncable::DirectoryManagerEvent& event) { | 445 const syncable::DirectoryManagerEvent& event) { |
430 LOG(INFO) << "Handling a directory manager event"; | 446 LOG(INFO) << "Handling a directory manager event"; |
431 if (syncable::DirectoryManagerEvent::OPENED == event.what_happened) { | 447 if (syncable::DirectoryManagerEvent::OPENED == event.what_happened) { |
432 MutexLock lock(&mutex_); | 448 MutexLock lock(&mutex_); |
433 LOG(INFO) << "Syncer starting up for: " << event.dirname; | 449 LOG(INFO) << "Syncer starting up for: " << event.dirname; |
434 // The underlying database structure is ready, and we should create | 450 // The underlying database structure is ready, and we should create |
435 // the syncer. | 451 // the syncer. |
436 CHECK(syncer_ == NULL); | 452 CHECK(syncer_ == NULL); |
437 syncer_ = | 453 syncer_ = |
438 new Syncer(dirman_, event.dirname, scm_, model_safe_worker_.get()); | 454 new Syncer(dirman_, event.dirname, scm_, model_safe_worker_.get()); |
439 | 455 |
440 syncer_->set_command_channel(command_channel_); | 456 syncer_->set_command_channel(command_channel_); |
441 syncer_events_.reset(NewEventListenerHookup( | 457 syncer_events_.reset(NewEventListenerHookup( |
442 syncer_->channel(), this, &SyncerThread::HandleSyncerEvent)); | 458 syncer_->channel(), this, &SyncerThreadPthreadImpl::HandleSyncerEvent)); |
443 pthread_cond_broadcast(&changed_.condvar_); | 459 pthread_cond_broadcast(&changed_.condvar_); |
444 } | 460 } |
445 } | 461 } |
446 | 462 |
447 static inline void CheckConnected(bool* connected, | 463 static inline void CheckConnected(bool* connected, |
448 HttpResponse::ServerConnectionCode code, | 464 HttpResponse::ServerConnectionCode code, |
449 pthread_cond_t* condvar) { | 465 pthread_cond_t* condvar) { |
450 if (*connected) { | 466 if (*connected) { |
451 if (HttpResponse::CONNECTION_UNAVAILABLE == code) { | 467 if (HttpResponse::CONNECTION_UNAVAILABLE == code) { |
452 *connected = false; | 468 *connected = false; |
453 pthread_cond_broadcast(condvar); | 469 pthread_cond_broadcast(condvar); |
454 } | 470 } |
455 } else { | 471 } else { |
456 if (HttpResponse::SERVER_CONNECTION_OK == code) { | 472 if (HttpResponse::SERVER_CONNECTION_OK == code) { |
457 *connected = true; | 473 *connected = true; |
458 pthread_cond_broadcast(condvar); | 474 pthread_cond_broadcast(condvar); |
459 } | 475 } |
460 } | 476 } |
461 } | 477 } |
462 | 478 |
463 void SyncerThread::WatchConnectionManager(ServerConnectionManager* conn_mgr) { | 479 void SyncerThreadPthreadImpl::WatchConnectionManager( |
| 480 ServerConnectionManager* conn_mgr) { |
464 conn_mgr_hookup_.reset(NewEventListenerHookup(conn_mgr->channel(), this, | 481 conn_mgr_hookup_.reset(NewEventListenerHookup(conn_mgr->channel(), this, |
465 &SyncerThread::HandleServerConnectionEvent)); | 482 &SyncerThreadPthreadImpl::HandleServerConnectionEvent)); |
466 CheckConnected(&connected_, conn_mgr->server_status(), | 483 CheckConnected(&connected_, conn_mgr->server_status(), |
467 &changed_.condvar_); | 484 &changed_.condvar_); |
468 } | 485 } |
469 | 486 |
470 void SyncerThread::HandleServerConnectionEvent( | 487 void SyncerThreadPthreadImpl::HandleServerConnectionEvent( |
471 const ServerConnectionEvent& event) { | 488 const ServerConnectionEvent& event) { |
472 if (ServerConnectionEvent::STATUS_CHANGED == event.what_happened) { | 489 if (ServerConnectionEvent::STATUS_CHANGED == event.what_happened) { |
473 MutexLock lock(&mutex_); | 490 MutexLock lock(&mutex_); |
474 CheckConnected(&connected_, event.connection_code, | 491 CheckConnected(&connected_, event.connection_code, |
475 &changed_.condvar_); | 492 &changed_.condvar_); |
476 } | 493 } |
477 } | 494 } |
478 | 495 |
479 SyncerEventChannel* SyncerThread::channel() { | 496 SyncerEventChannel* SyncerThreadPthreadImpl::channel() { |
480 return syncer_event_channel_.get(); | 497 return syncer_event_channel_.get(); |
481 } | 498 } |
482 | 499 |
483 // Inputs and return value in milliseconds. | 500 // Inputs and return value in milliseconds. |
484 int SyncerThread::CalculateSyncWaitTime(int last_interval, int user_idle_ms) { | 501 int SyncerThreadPthreadImpl::CalculateSyncWaitTime(int last_interval, |
| 502 int user_idle_ms) { |
485 // syncer_polling_interval_ is in seconds | 503 // syncer_polling_interval_ is in seconds |
486 int syncer_polling_interval_ms = syncer_polling_interval_ * 1000; | 504 int syncer_polling_interval_ms = syncer_polling_interval_ * 1000; |
487 | 505 |
488 // This is our default and lower bound. | 506 // This is our default and lower bound. |
489 int next_wait = syncer_polling_interval_ms; | 507 int next_wait = syncer_polling_interval_ms; |
490 | 508 |
491 // Get idle time, bounded by max wait. | 509 // Get idle time, bounded by max wait. |
492 int idle = min(user_idle_ms, syncer_max_interval_); | 510 int idle = min(user_idle_ms, syncer_max_interval_); |
493 | 511 |
494 // If the user has been idle for a while, we'll start decreasing the poll | 512 // If the user has been idle for a while, we'll start decreasing the poll |
495 // rate. | 513 // rate. |
496 if (idle >= kPollBackoffThresholdMultiplier * syncer_polling_interval_ms) { | 514 if (idle >= kPollBackoffThresholdMultiplier * syncer_polling_interval_ms) { |
497 next_wait = std::min(AllStatus::GetRecommendedDelaySeconds( | 515 next_wait = std::min(AllStatus::GetRecommendedDelaySeconds( |
498 last_interval / 1000), syncer_max_interval_ / 1000) * 1000; | 516 last_interval / 1000), syncer_max_interval_ / 1000) * 1000; |
499 } | 517 } |
500 | 518 |
501 return next_wait; | 519 return next_wait; |
502 } | 520 } |
503 | 521 |
504 // Called with mutex_ already locked. | 522 // Called with mutex_ already locked. |
505 void SyncerThread::NudgeSyncImpl(int milliseconds_from_now, | 523 void SyncerThreadPthreadImpl::NudgeSyncImpl(int milliseconds_from_now, |
506 NudgeSource source) { | 524 SyncerThread::NudgeSource source) { |
507 const timespec nudge_time = GetPThreadAbsoluteTime(milliseconds_from_now); | 525 const timespec nudge_time = GetPThreadAbsoluteTime(milliseconds_from_now); |
508 NudgeObject nudge_object(nudge_time, source); | 526 NudgeObject nudge_object(nudge_time, source); |
509 nudge_queue_.push(nudge_object); | 527 nudge_queue_.push(nudge_object); |
510 pthread_cond_broadcast(&changed_.condvar_); | 528 pthread_cond_broadcast(&changed_.condvar_); |
511 } | 529 } |
512 | 530 |
513 void SyncerThread::WatchTalkMediator(TalkMediator* mediator) { | 531 void SyncerThreadPthreadImpl::WatchTalkMediator(TalkMediator* mediator) { |
514 talk_mediator_hookup_.reset( | 532 talk_mediator_hookup_.reset( |
515 NewEventListenerHookup( | 533 NewEventListenerHookup( |
516 mediator->channel(), | 534 mediator->channel(), |
517 this, | 535 this, |
518 &SyncerThread::HandleTalkMediatorEvent)); | 536 &SyncerThreadPthreadImpl::HandleTalkMediatorEvent)); |
519 } | 537 } |
520 | 538 |
521 void SyncerThread::HandleTalkMediatorEvent(const TalkMediatorEvent& event) { | 539 void SyncerThreadPthreadImpl::HandleTalkMediatorEvent( |
| 540 const TalkMediatorEvent& event) { |
522 MutexLock lock(&mutex_); | 541 MutexLock lock(&mutex_); |
523 switch (event.what_happened) { | 542 switch (event.what_happened) { |
524 case TalkMediatorEvent::LOGIN_SUCCEEDED: | 543 case TalkMediatorEvent::LOGIN_SUCCEEDED: |
525 LOG(INFO) << "P2P: Login succeeded."; | 544 LOG(INFO) << "P2P: Login succeeded."; |
526 p2p_authenticated_ = true; | 545 p2p_authenticated_ = true; |
527 break; | 546 break; |
528 case TalkMediatorEvent::LOGOUT_SUCCEEDED: | 547 case TalkMediatorEvent::LOGOUT_SUCCEEDED: |
529 LOG(INFO) << "P2P: Login succeeded."; | 548 LOG(INFO) << "P2P: Login succeeded."; |
530 p2p_authenticated_ = false; | 549 p2p_authenticated_ = false; |
531 break; | 550 break; |
532 case TalkMediatorEvent::SUBSCRIPTIONS_ON: | 551 case TalkMediatorEvent::SUBSCRIPTIONS_ON: |
533 LOG(INFO) << "P2P: Subscriptions successfully enabled."; | 552 LOG(INFO) << "P2P: Subscriptions successfully enabled."; |
534 p2p_subscribed_ = true; | 553 p2p_subscribed_ = true; |
535 if (NULL != syncer_) { | 554 if (NULL != syncer_) { |
536 LOG(INFO) << "Subscriptions on. Nudging syncer for initial push."; | 555 LOG(INFO) << "Subscriptions on. Nudging syncer for initial push."; |
537 NudgeSyncImpl(0, kLocal); | 556 NudgeSyncImpl(0, SyncerThread::kLocal); |
538 } | 557 } |
539 break; | 558 break; |
540 case TalkMediatorEvent::SUBSCRIPTIONS_OFF: | 559 case TalkMediatorEvent::SUBSCRIPTIONS_OFF: |
541 LOG(INFO) << "P2P: Subscriptions are not enabled."; | 560 LOG(INFO) << "P2P: Subscriptions are not enabled."; |
542 p2p_subscribed_ = false; | 561 p2p_subscribed_ = false; |
543 break; | 562 break; |
544 case TalkMediatorEvent::NOTIFICATION_RECEIVED: | 563 case TalkMediatorEvent::NOTIFICATION_RECEIVED: |
545 LOG(INFO) << "P2P: Updates on server, pushing syncer"; | 564 LOG(INFO) << "P2P: Updates on server, pushing syncer"; |
546 if (NULL != syncer_) { | 565 if (NULL != syncer_) { |
547 NudgeSyncImpl(0, kNotification); | 566 NudgeSyncImpl(0, SyncerThread::kNotification); |
548 } | 567 } |
549 break; | 568 break; |
550 default: | 569 default: |
551 break; | 570 break; |
552 } | 571 } |
553 | 572 |
554 if (NULL != syncer_) { | 573 if (NULL != syncer_) { |
555 syncer_->set_notifications_enabled(p2p_authenticated_ && p2p_subscribed_); | 574 syncer_->set_notifications_enabled(p2p_authenticated_ && p2p_subscribed_); |
556 } | 575 } |
557 } | 576 } |
558 | 577 |
559 } // namespace browser_sync | 578 } // namespace browser_sync |
OLD | NEW |