Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(520)

Side by Side Diff: chrome/browser/sync/engine/syncer_thread_timed_stop.cc

Issue 214033: Use chrome/base synchronization primitives and threads instead of... (Closed) Base URL: svn://chrome-svn/chrome/trunk/src/
Patch Set: '' Created 11 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2009 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2009 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4 #include "chrome/browser/sync/engine/syncer_thread_timed_stop.h"
5 #include "chrome/browser/sync/engine/syncer_thread.h"
6 5
7 #include "build/build_config.h" 6 #include "build/build_config.h"
8 7
9 #ifdef OS_MACOSX 8 #ifdef OS_MACOSX
10 #include <CoreFoundation/CFNumber.h> 9 #include <CoreFoundation/CFNumber.h>
11 #include <IOKit/IOTypes.h> 10 #include <IOKit/IOTypes.h>
12 #include <IOKit/IOKitLib.h> 11 #include <IOKit/IOKitLib.h>
13 #endif 12 #endif
14 13
15 #include <algorithm> 14 #include <algorithm>
16 #include <map> 15 #include <map>
17 #include <queue> 16 #include <queue>
18 17
19 #include "chrome/browser/sync/engine/auth_watcher.h" 18 #include "chrome/browser/sync/engine/auth_watcher.h"
20 #include "chrome/browser/sync/engine/model_safe_worker.h" 19 #include "chrome/browser/sync/engine/model_safe_worker.h"
21 #include "chrome/browser/sync/engine/net/server_connection_manager.h" 20 #include "chrome/browser/sync/engine/net/server_connection_manager.h"
22 #include "chrome/browser/sync/engine/syncer.h" 21 #include "chrome/browser/sync/engine/syncer.h"
23 #include "chrome/browser/sync/notifier/listener/talk_mediator.h" 22 #include "chrome/browser/sync/notifier/listener/talk_mediator.h"
24 #include "chrome/browser/sync/notifier/listener/talk_mediator_impl.h" 23 #include "chrome/browser/sync/notifier/listener/talk_mediator_impl.h"
25 #include "chrome/browser/sync/syncable/directory_manager.h" 24 #include "chrome/browser/sync/syncable/directory_manager.h"
26 25
27 using std::priority_queue; 26 using std::priority_queue;
28 using std::min; 27 using std::min;
29 28 using base::Time;
30 static inline bool operator < (const timespec& a, const timespec& b) { 29 using base::TimeDelta;
31 return a.tv_sec == b.tv_sec ? a.tv_nsec < b.tv_nsec : a.tv_sec < b.tv_sec; 30 using base::TimeTicks;
32 }
33
34 namespace {
35
36 // Returns the amount of time since the user last interacted with the computer,
37 // in milliseconds
38 int UserIdleTime() {
39 #ifdef OS_WIN
40 LASTINPUTINFO last_input_info;
41 last_input_info.cbSize = sizeof(LASTINPUTINFO);
42
43 // Get time in windows ticks since system start of last activity.
44 BOOL b = ::GetLastInputInfo(&last_input_info);
45 if (b == TRUE)
46 return ::GetTickCount() - last_input_info.dwTime;
47 #elif defined(OS_MACOSX)
48 // It would be great to do something like:
49 //
50 // return 1000 *
51 // CGEventSourceSecondsSinceLastEventType(
52 // kCGEventSourceStateCombinedSessionState,
53 // kCGAnyInputEventType);
54 //
55 // Unfortunately, CGEvent* lives in ApplicationServices, and we're a daemon
56 // and can't link that high up the food chain. Thus this mucking in IOKit.
57
58 io_service_t hid_service =
59 IOServiceGetMatchingService(kIOMasterPortDefault,
60 IOServiceMatching("IOHIDSystem"));
61 if (!hid_service) {
62 LOG(WARNING) << "Could not obtain IOHIDSystem";
63 return 0;
64 }
65
66 CFTypeRef object = IORegistryEntryCreateCFProperty(hid_service,
67 CFSTR("HIDIdleTime"),
68 kCFAllocatorDefault,
69 0);
70 if (!object) {
71 LOG(WARNING) << "Could not get IOHIDSystem's HIDIdleTime property";
72 IOObjectRelease(hid_service);
73 return 0;
74 }
75
76 int64 idle_time; // in nanoseconds
77 Boolean success;
78 if (CFGetTypeID(object) == CFNumberGetTypeID()) {
79 success = CFNumberGetValue((CFNumberRef)object,
80 kCFNumberSInt64Type,
81 &idle_time);
82 } else {
83 LOG(WARNING) << "IOHIDSystem's HIDIdleTime property isn't a number!";
84 }
85
86 CFRelease(object);
87 IOObjectRelease(hid_service);
88
89 if (!success) {
90 LOG(WARNING) << "Could not get IOHIDSystem's HIDIdleTime property's value";
91 return 0;
92 } else {
93 return idle_time / 1000000; // nano to milli
94 }
95 #else
96 static bool was_logged = false;
97 if (!was_logged) {
98 was_logged = true;
99 LOG(INFO) << "UserIdleTime unimplemented on this platform, "
100 "synchronization will not throttle when user idle";
101 }
102 #endif
103
104 return 0;
105 }
106
107 } // namespace
108 31
109 namespace browser_sync { 32 namespace browser_sync {
110 33
111 bool SyncerThread::NudgeSyncer(int milliseconds_from_now, NudgeSource source) { 34 SyncerThreadTimedStop::SyncerThreadTimedStop(
112 MutexLock lock(&mutex_);
113 if (syncer_ == NULL) {
114 return false;
115 }
116 NudgeSyncImpl(milliseconds_from_now, source);
117 return true;
118 }
119
120 void* RunSyncerThread(void* syncer_thread) {
121 return (reinterpret_cast<SyncerThread*>(syncer_thread))->ThreadMain();
122 }
123
124 SyncerThread::SyncerThread(
125 ClientCommandChannel* command_channel, 35 ClientCommandChannel* command_channel,
126 syncable::DirectoryManager* mgr, 36 syncable::DirectoryManager* mgr,
127 ServerConnectionManager* connection_manager, 37 ServerConnectionManager* connection_manager,
128 AllStatus* all_status, 38 AllStatus* all_status,
129 ModelSafeWorker* model_safe_worker) 39 ModelSafeWorker* model_safe_worker)
130 : dirman_(mgr), scm_(connection_manager), 40 : SyncerThread(command_channel, mgr, connection_manager, all_status,
131 syncer_(NULL), syncer_events_(NULL), thread_running_(false), 41 model_safe_worker),
132 syncer_short_poll_interval_seconds_(kDefaultShortPollIntervalSeconds), 42 in_thread_main_loop_(false) {
133 syncer_long_poll_interval_seconds_(kDefaultLongPollIntervalSeconds),
134 syncer_polling_interval_(kDefaultShortPollIntervalSeconds),
135 syncer_max_interval_(kDefaultMaxPollIntervalMs),
136 stop_syncer_thread_(false), connected_(false), conn_mgr_hookup_(NULL),
137 p2p_authenticated_(false), p2p_subscribed_(false),
138 allstatus_(all_status), talk_mediator_hookup_(NULL),
139 command_channel_(command_channel), directory_manager_hookup_(NULL),
140 model_safe_worker_(model_safe_worker),
141 client_command_hookup_(NULL), disable_idle_detection_(false) {
142
143 SyncerEvent shutdown = { SyncerEvent::SHUTDOWN_USE_WITH_CARE };
144 syncer_event_channel_.reset(new SyncerEventChannel(shutdown));
145
146 if (dirman_) {
147 directory_manager_hookup_.reset(NewEventListenerHookup(
148 dirman_->channel(), this, &SyncerThread::HandleDirectoryManagerEvent));
149 }
150
151 if (scm_) {
152 WatchConnectionManager(scm_);
153 }
154
155 if (command_channel_) {
156 WatchClientCommands(command_channel_);
157 }
158 }
159
160 SyncerThread::~SyncerThread() {
161 client_command_hookup_.reset();
162 conn_mgr_hookup_.reset();
163 syncer_event_channel_.reset();
164 directory_manager_hookup_.reset();
165 syncer_events_.reset();
166 delete syncer_;
167 talk_mediator_hookup_.reset();
168 CHECK(!thread_running_);
169 }
170
171 // Creates and starts a syncer thread.
172 // Returns true if it creates a thread or if there's currently a thread running
173 // and false otherwise.
174 bool SyncerThread::Start() {
175 MutexLock lock(&mutex_);
176 if (thread_running_) {
177 return true;
178 }
179 thread_running_ =
180 (0 == pthread_create(&thread_, NULL, RunSyncerThread, this));
181 if (thread_running_) {
182 pthread_detach(thread_);
183 }
184 return thread_running_;
185 } 43 }
186 44
187 // Stop processing. A max wait of at least 2*server RTT time is recommended. 45 // Stop processing. A max wait of at least 2*server RTT time is recommended.
188 // Returns true if we stopped, false otherwise. 46 // Returns true if we stopped, false otherwise.
189 bool SyncerThread::Stop(int max_wait) { 47 bool SyncerThreadTimedStop::Stop(int max_wait) {
190 MutexLock lock(&mutex_); 48 AutoLock lock(lock_);
191 if (!thread_running_) 49 // If the thread has been started, then we either already have or are about to
50 // enter ThreadMainLoop so we have to proceed with shutdown and wait for it to
51 // finish. If the thread has not been started --and we now own the lock--
52 // then we can early out because the caller has not called Start().
53 if (!thread_.IsRunning())
192 return true; 54 return true;
193 stop_syncer_thread_ = true; 55
194 if (NULL != syncer_) { 56 LOG(INFO) << "SyncerThread::Stop - setting ThreadMain exit condition to "
57 << "true (vault_.stop_syncer_thread_)";
58 // Exit the ThreadMainLoop once the syncer finishes (we tell it to exit
59 // below).
60 vault_.stop_syncer_thread_ = true;
61 if (NULL != vault_.syncer_) {
195 // Try to early exit the syncer. 62 // Try to early exit the syncer.
196 syncer_->RequestEarlyExit(); 63 vault_.syncer_->RequestEarlyExit();
197 } 64 }
198 pthread_cond_broadcast(&changed_.condvar_); 65
199 timespec deadline = { time(NULL) + (max_wait / 1000), 0 }; 66 // stop_syncer_thread_ is now true and the Syncer has been told to exit.
200 do { 67 // We want to wake up all waiters so they can re-examine state. We signal,
201 const int wait_result = max_wait < 0 ? 68 // causing all waiters to try to re-acquire the lock, and then we atomically
202 pthread_cond_wait(&changed_.condvar_, &mutex_.mutex_) : 69 // release the lock and wait. Our wait can be spuriously signaled, so we
203 pthread_cond_timedwait(&changed_.condvar_, &mutex_.mutex_, 70 // recalculate the remaining sleep time each time through and re-
204 &deadline); 71 // check the condition before exiting the loop.
205 if (ETIMEDOUT == wait_result) { 72 vault_field_changed_.Broadcast();
206 LOG(ERROR) << "SyncerThread::Stop timed out. Problems likely."; 73 TimeTicks start = TimeTicks::Now();
207 return false; 74 TimeTicks end = start + TimeDelta::FromMilliseconds(max_wait);
75 bool timed_out = false;
76 // Eventually the combination of RequestEarlyExit and setting
77 // stop_syncer_thread_ to true above will cause in_thread_main_loop_ to become
78 // false.
79 while (in_thread_main_loop_) {
80 TimeDelta sleep_time = end - TimeTicks::Now();
81 if (sleep_time < TimeDelta::FromSeconds(0)) {
82 timed_out = true;
83 break;
208 } 84 }
209 } while (thread_running_); 85 LOG(INFO) << "Waiting in stop for " << sleep_time.InSeconds() << "s.";
86 vault_field_changed_.TimedWait(sleep_time);
87 }
88
89 if (timed_out) {
90 LOG(ERROR) << "SyncerThread::Stop timed out or error. Problems likely.";
91 return false;
92 }
93
94 // Stop() should not block on anything at this point, given above madness.
95 DLOG(INFO) << "Calling SyncerThread::thread_.Stop() at "
96 << Time::Now().ToInternalValue();
97 thread_.Stop();
98 DLOG(INFO) << "SyncerThread::thread_.Stop() finished at "
99 << Time::Now().ToInternalValue();
210 return true; 100 return true;
211 } 101 }
212 102
213 void SyncerThread::WatchClientCommands(ClientCommandChannel* channel) { 103 void SyncerThreadTimedStop::ThreadMain() {
214 PThreadScopedLock<PThreadMutex> lock(&mutex_); 104 AutoLock lock(lock_);
215 client_command_hookup_.reset(NewEventListenerHookup(channel, this, 105 // Signal Start() to let it know we've made it safely are now running on the
216 &SyncerThread::HandleClientCommand)); 106 // message loop, and unblock it's caller.
107 thread_main_started_.Signal();
108
109 // The only thing that could be waiting on this value is Stop, and we don't
110 // release the lock until we're far enough along to Stop safely.
111 in_thread_main_loop_ = true;
112 vault_field_changed_.Broadcast();
113 ThreadMainLoop();
114 in_thread_main_loop_ = false;
115 vault_field_changed_.Broadcast();
116 LOG(INFO) << "Syncer thread ThreadMain is done.";
217 } 117 }
218 118
219 void SyncerThread::HandleClientCommand(ClientCommandChannel::EventType event) { 119 } // namespace browser_sync
220 if (!event) {
221 return;
222 }
223
224 // Mutex not really necessary for these.
225 if (event->has_set_sync_poll_interval()) {
226 syncer_short_poll_interval_seconds_ = event->set_sync_poll_interval();
227 }
228
229 if (event->has_set_sync_long_poll_interval()) {
230 syncer_long_poll_interval_seconds_ = event->set_sync_long_poll_interval();
231 }
232 }
233
234 void SyncerThread::ThreadMainLoop() {
235 // Use the short poll value by default.
236 int poll_seconds = syncer_short_poll_interval_seconds_;
237 int user_idle_milliseconds = 0;
238 timespec last_sync_time = { 0 };
239 bool initial_sync_for_thread = true;
240 bool continue_sync_cycle = false;
241
242 while (!stop_syncer_thread_) {
243 if (!connected_) {
244 LOG(INFO) << "Syncer thread waiting for connection.";
245 while (!connected_ && !stop_syncer_thread_)
246 pthread_cond_wait(&changed_.condvar_, &mutex_.mutex_);
247 LOG_IF(INFO, connected_) << "Syncer thread found connection.";
248 continue;
249 }
250
251 if (syncer_ == NULL) {
252 LOG(INFO) << "Syncer thread waiting for database initialization.";
253 while (syncer_ == NULL && !stop_syncer_thread_)
254 pthread_cond_wait(&changed_.condvar_, &mutex_.mutex_);
255 LOG_IF(INFO, !(syncer_ == NULL)) << "Syncer was found after DB started.";
256 continue;
257 }
258
259 timespec const next_poll = { last_sync_time.tv_sec + poll_seconds,
260 last_sync_time.tv_nsec };
261 const timespec wake_time =
262 !nudge_queue_.empty() && nudge_queue_.top().first < next_poll ?
263 nudge_queue_.top().first : next_poll;
264 LOG(INFO) << "wake time is " << wake_time.tv_sec;
265 LOG(INFO) << "next poll is " << next_poll.tv_sec;
266
267 const int error = pthread_cond_timedwait(&changed_.condvar_, &mutex_.mutex_,
268 &wake_time);
269 if (ETIMEDOUT != error) {
270 continue; // Check all the conditions again.
271 }
272
273 const timespec now = GetPThreadAbsoluteTime(0);
274
275 // Handle a nudge, caused by either a notification or a local bookmark
276 // event. This will also update the source of the following SyncMain call.
277 UpdateNudgeSource(now, &continue_sync_cycle, &initial_sync_for_thread);
278
279 LOG(INFO) << "Calling Sync Main at time " << now.tv_sec;
280 SyncMain(syncer_);
281 last_sync_time = now;
282
283 LOG(INFO) << "Updating the next polling time after SyncMain";
284 poll_seconds = CalculatePollingWaitTime(allstatus_->status(),
285 poll_seconds,
286 &user_idle_milliseconds,
287 &continue_sync_cycle);
288 }
289 }
290
291 // We check how long the user's been idle and sync less often if the machine is
292 // not in use. The aim is to reduce server load.
293 int SyncerThread::CalculatePollingWaitTime(
294 const AllStatus::Status& status,
295 int last_poll_wait, // in s
296 int* user_idle_milliseconds,
297 bool* continue_sync_cycle) {
298 bool is_continuing_sync_cyle = *continue_sync_cycle;
299 *continue_sync_cycle = false;
300
301 // Determine if the syncer has unfinished work to do from allstatus_.
302 const bool syncer_has_work_to_do =
303 status.updates_available > status.updates_received
304 || status.unsynced_count > 0;
305 LOG(INFO) << "syncer_has_work_to_do is " << syncer_has_work_to_do;
306
307 // First calculate the expected wait time, figuring in any backoff because of
308 // user idle time. next_wait is in seconds
309 syncer_polling_interval_ = (!status.notifications_enabled) ?
310 syncer_short_poll_interval_seconds_ :
311 syncer_long_poll_interval_seconds_;
312 int default_next_wait = syncer_polling_interval_;
313 int actual_next_wait = default_next_wait;
314
315 if (syncer_has_work_to_do) {
316 // Provide exponential backoff due to consecutive errors, else attempt to
317 // complete the work as soon as possible.
318 if (!is_continuing_sync_cyle) {
319 actual_next_wait = AllStatus::GetRecommendedDelaySeconds(0);
320 } else {
321 actual_next_wait = AllStatus::GetRecommendedDelaySeconds(last_poll_wait);
322 }
323 *continue_sync_cycle = true;
324 } else if (!status.notifications_enabled) {
325 // Ensure that we start exponential backoff from our base polling
326 // interval when we are not continuing a sync cycle.
327 last_poll_wait = std::max(last_poll_wait, syncer_polling_interval_);
328
329 // Did the user start interacting with the computer again?
330 // If so, revise our idle time (and probably next_sync_time) downwards
331 int new_idle_time = disable_idle_detection_ ? 0 : UserIdleTime();
332 if (new_idle_time < *user_idle_milliseconds) {
333 *user_idle_milliseconds = new_idle_time;
334 }
335 actual_next_wait = CalculateSyncWaitTime(last_poll_wait * 1000,
336 *user_idle_milliseconds) / 1000;
337 DCHECK_GE(actual_next_wait, default_next_wait);
338 }
339
340 LOG(INFO) << "Sync wait: idle " << default_next_wait
341 << " non-idle or backoff " << actual_next_wait << ".";
342
343 return actual_next_wait;
344 }
345
346 void* SyncerThread::ThreadMain() {
347 NameCurrentThreadForDebugging("SyncEngine_SyncerThread");
348 mutex_.Lock();
349 ThreadMainLoop();
350 thread_running_ = false;
351 pthread_cond_broadcast(&changed_.condvar_);
352 mutex_.Unlock();
353 LOG(INFO) << "Syncer thread exiting.";
354 return 0;
355 }
356
357 void SyncerThread::SyncMain(Syncer* syncer) {
358 CHECK(syncer);
359 mutex_.Unlock();
360 while (syncer->SyncShare()) {
361 LOG(INFO) << "Looping in sync share";
362 }
363 LOG(INFO) << "Done looping in sync share";
364
365 mutex_.Lock();
366 }
367
368 void SyncerThread::UpdateNudgeSource(const timespec& now,
369 bool* continue_sync_cycle,
370 bool* initial_sync) {
371 bool nudged = false;
372 NudgeSource nudge_source = kUnknown;
373 // Has the previous sync cycle completed?
374 if (continue_sync_cycle) {
375 nudge_source = kContinuation;
376 }
377 // Update the nudge source if a new nudge has come through during the
378 // previous sync cycle.
379 while (!nudge_queue_.empty() && !(now < nudge_queue_.top().first)) {
380 if (!nudged) {
381 nudge_source = nudge_queue_.top().second;
382 *continue_sync_cycle = false; // Reset the continuation token on nudge.
383 nudged = true;
384 }
385 nudge_queue_.pop();
386 }
387 SetUpdatesSource(nudged, nudge_source, initial_sync);
388 }
389
390 void SyncerThread::SetUpdatesSource(bool nudged, NudgeSource nudge_source,
391 bool* initial_sync) {
392 sync_pb::GetUpdatesCallerInfo::GET_UPDATES_SOURCE updates_source =
393 sync_pb::GetUpdatesCallerInfo::UNKNOWN;
394 if (*initial_sync) {
395 updates_source = sync_pb::GetUpdatesCallerInfo::FIRST_UPDATE;
396 *initial_sync = false;
397 } else if (!nudged) {
398 updates_source = sync_pb::GetUpdatesCallerInfo::PERIODIC;
399 } else {
400 switch (nudge_source) {
401 case kNotification:
402 updates_source = sync_pb::GetUpdatesCallerInfo::NOTIFICATION;
403 break;
404 case kLocal:
405 updates_source = sync_pb::GetUpdatesCallerInfo::LOCAL;
406 break;
407 case kContinuation:
408 updates_source = sync_pb::GetUpdatesCallerInfo::SYNC_CYCLE_CONTINUATION;
409 break;
410 case kUnknown:
411 default:
412 updates_source = sync_pb::GetUpdatesCallerInfo::UNKNOWN;
413 break;
414 }
415 }
416 syncer_->set_updates_source(updates_source);
417 }
418
419 void SyncerThread::HandleSyncerEvent(const SyncerEvent& event) {
420 MutexLock lock(&mutex_);
421 channel()->NotifyListeners(event);
422 if (SyncerEvent::REQUEST_SYNC_NUDGE != event.what_happened) {
423 return;
424 }
425 NudgeSyncImpl(event.nudge_delay_milliseconds, kUnknown);
426 }
427
428 void SyncerThread::HandleDirectoryManagerEvent(
429 const syncable::DirectoryManagerEvent& event) {
430 LOG(INFO) << "Handling a directory manager event";
431 if (syncable::DirectoryManagerEvent::OPENED == event.what_happened) {
432 MutexLock lock(&mutex_);
433 LOG(INFO) << "Syncer starting up for: " << event.dirname;
434 // The underlying database structure is ready, and we should create
435 // the syncer.
436 CHECK(syncer_ == NULL);
437 syncer_ =
438 new Syncer(dirman_, event.dirname, scm_, model_safe_worker_.get());
439
440 syncer_->set_command_channel(command_channel_);
441 syncer_events_.reset(NewEventListenerHookup(
442 syncer_->channel(), this, &SyncerThread::HandleSyncerEvent));
443 pthread_cond_broadcast(&changed_.condvar_);
444 }
445 }
446
447 static inline void CheckConnected(bool* connected,
448 HttpResponse::ServerConnectionCode code,
449 pthread_cond_t* condvar) {
450 if (*connected) {
451 if (HttpResponse::CONNECTION_UNAVAILABLE == code) {
452 *connected = false;
453 pthread_cond_broadcast(condvar);
454 }
455 } else {
456 if (HttpResponse::SERVER_CONNECTION_OK == code) {
457 *connected = true;
458 pthread_cond_broadcast(condvar);
459 }
460 }
461 }
462
463 void SyncerThread::WatchConnectionManager(ServerConnectionManager* conn_mgr) {
464 conn_mgr_hookup_.reset(NewEventListenerHookup(conn_mgr->channel(), this,
465 &SyncerThread::HandleServerConnectionEvent));
466 CheckConnected(&connected_, conn_mgr->server_status(),
467 &changed_.condvar_);
468 }
469
470 void SyncerThread::HandleServerConnectionEvent(
471 const ServerConnectionEvent& event) {
472 if (ServerConnectionEvent::STATUS_CHANGED == event.what_happened) {
473 MutexLock lock(&mutex_);
474 CheckConnected(&connected_, event.connection_code,
475 &changed_.condvar_);
476 }
477 }
478
479 SyncerEventChannel* SyncerThread::channel() {
480 return syncer_event_channel_.get();
481 }
482
483 // Inputs and return value in milliseconds.
484 int SyncerThread::CalculateSyncWaitTime(int last_interval, int user_idle_ms) {
485 // syncer_polling_interval_ is in seconds
486 int syncer_polling_interval_ms = syncer_polling_interval_ * 1000;
487
488 // This is our default and lower bound.
489 int next_wait = syncer_polling_interval_ms;
490
491 // Get idle time, bounded by max wait.
492 int idle = min(user_idle_ms, syncer_max_interval_);
493
494 // If the user has been idle for a while, we'll start decreasing the poll
495 // rate.
496 if (idle >= kPollBackoffThresholdMultiplier * syncer_polling_interval_ms) {
497 next_wait = std::min(AllStatus::GetRecommendedDelaySeconds(
498 last_interval / 1000), syncer_max_interval_ / 1000) * 1000;
499 }
500
501 return next_wait;
502 }
503
504 // Called with mutex_ already locked.
505 void SyncerThread::NudgeSyncImpl(int milliseconds_from_now,
506 NudgeSource source) {
507 const timespec nudge_time = GetPThreadAbsoluteTime(milliseconds_from_now);
508 NudgeObject nudge_object(nudge_time, source);
509 nudge_queue_.push(nudge_object);
510 pthread_cond_broadcast(&changed_.condvar_);
511 }
512
513 void SyncerThread::WatchTalkMediator(TalkMediator* mediator) {
514 talk_mediator_hookup_.reset(
515 NewEventListenerHookup(
516 mediator->channel(),
517 this,
518 &SyncerThread::HandleTalkMediatorEvent));
519 }
520
521 void SyncerThread::HandleTalkMediatorEvent(const TalkMediatorEvent& event) {
522 MutexLock lock(&mutex_);
523 switch (event.what_happened) {
524 case TalkMediatorEvent::LOGIN_SUCCEEDED:
525 LOG(INFO) << "P2P: Login succeeded.";
526 p2p_authenticated_ = true;
527 break;
528 case TalkMediatorEvent::LOGOUT_SUCCEEDED:
529 LOG(INFO) << "P2P: Login succeeded.";
530 p2p_authenticated_ = false;
531 break;
532 case TalkMediatorEvent::SUBSCRIPTIONS_ON:
533 LOG(INFO) << "P2P: Subscriptions successfully enabled.";
534 p2p_subscribed_ = true;
535 if (NULL != syncer_) {
536 LOG(INFO) << "Subscriptions on. Nudging syncer for initial push.";
537 NudgeSyncImpl(0, kLocal);
538 }
539 break;
540 case TalkMediatorEvent::SUBSCRIPTIONS_OFF:
541 LOG(INFO) << "P2P: Subscriptions are not enabled.";
542 p2p_subscribed_ = false;
543 break;
544 case TalkMediatorEvent::NOTIFICATION_RECEIVED:
545 LOG(INFO) << "P2P: Updates on server, pushing syncer";
546 if (NULL != syncer_) {
547 NudgeSyncImpl(0, kNotification);
548 }
549 break;
550 default:
551 break;
552 }
553
554 if (NULL != syncer_) {
555 syncer_->set_notifications_enabled(p2p_authenticated_ && p2p_subscribed_);
556 }
557 }
558
559 } // namespace browser_sync
OLDNEW
« no previous file with comments | « chrome/browser/sync/engine/syncer_thread_timed_stop.h ('k') | chrome/browser/sync/engine/syncer_thread_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698