Chromium Code Reviews (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out

Side by Side Diff: chrome/browser/sync/engine/

Issue 235010: Reverting 27117. (Closed) Base URL: svn://chrome-svn/chrome/trunk/src/
Patch Set: Created 11 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
1 // Copyright (c) 2009 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 #include "chrome/browser/sync/engine/syncer_thread_pthreads.h"
6 #include "build/build_config.h"
8 #ifdef OS_MACOSX
9 #include <CoreFoundation/CFNumber.h>
10 #include <IOKit/IOTypes.h>
11 #include <IOKit/IOKitLib.h>
12 #endif
14 #include <algorithm>
15 #include <map>
16 #include <queue>
18 #include "chrome/browser/sync/engine/auth_watcher.h"
19 #include "chrome/browser/sync/engine/model_safe_worker.h"
20 #include "chrome/browser/sync/engine/net/server_connection_manager.h"
21 #include "chrome/browser/sync/engine/syncer.h"
22 #include "chrome/browser/sync/notifier/listener/talk_mediator.h"
23 #include "chrome/browser/sync/notifier/listener/talk_mediator_impl.h"
24 #include "chrome/browser/sync/syncable/directory_manager.h"
26 using std::priority_queue;
27 using std::min;
29 static inline bool operator < (const timespec& a, const timespec& b) {
30 return a.tv_sec == b.tv_sec ? a.tv_nsec < b.tv_nsec : a.tv_sec < b.tv_sec;
31 }
33 namespace {
35 // Returns the amount of time since the user last interacted with the computer,
36 // in milliseconds
37 int UserIdleTime() {
38 #ifdef OS_WIN
39 LASTINPUTINFO last_input_info;
40 last_input_info.cbSize = sizeof(LASTINPUTINFO);
42 // Get time in windows ticks since system start of last activity.
43 BOOL b = ::GetLastInputInfo(&last_input_info);
44 if (b == TRUE)
45 return ::GetTickCount() - last_input_info.dwTime;
46 #elif defined(OS_MACOSX)
47 // It would be great to do something like:
48 //
49 // return 1000 *
50 // CGEventSourceSecondsSinceLastEventType(
51 // kCGEventSourceStateCombinedSessionState,
52 // kCGAnyInputEventType);
53 //
54 // Unfortunately, CGEvent* lives in ApplicationServices, and we're a daemon
55 // and can't link that high up the food chain. Thus this mucking in IOKit.
57 io_service_t hid_service =
58 IOServiceGetMatchingService(kIOMasterPortDefault,
59 IOServiceMatching("IOHIDSystem"));
60 if (!hid_service) {
61 LOG(WARNING) << "Could not obtain IOHIDSystem";
62 return 0;
63 }
65 CFTypeRef object = IORegistryEntryCreateCFProperty(hid_service,
66 CFSTR("HIDIdleTime"),
67 kCFAllocatorDefault,
68 0);
69 if (!object) {
70 LOG(WARNING) << "Could not get IOHIDSystem's HIDIdleTime property";
71 IOObjectRelease(hid_service);
72 return 0;
73 }
75 int64 idle_time; // in nanoseconds
76 Boolean success;
77 if (CFGetTypeID(object) == CFNumberGetTypeID()) {
78 success = CFNumberGetValue((CFNumberRef)object,
79 kCFNumberSInt64Type,
80 &idle_time);
81 } else {
82 LOG(WARNING) << "IOHIDSystem's HIDIdleTime property isn't a number!";
83 }
85 CFRelease(object);
86 IOObjectRelease(hid_service);
88 if (!success) {
89 LOG(WARNING) << "Could not get IOHIDSystem's HIDIdleTime property's value";
90 return 0;
91 } else {
92 return idle_time / 1000000; // nano to milli
93 }
94 #else
95 static bool was_logged = false;
96 if (!was_logged) {
97 was_logged = true;
98 LOG(INFO) << "UserIdleTime unimplemented on this platform, "
99 "synchronization will not throttle when user idle";
100 }
101 #endif
103 return 0;
104 }
106 } // namespace
108 namespace browser_sync {
110 SyncerThreadPthreads::SyncerThreadPthreads(
111 ClientCommandChannel* command_channel,
112 syncable::DirectoryManager* mgr,
113 ServerConnectionManager* connection_manager,
114 AllStatus* all_status, ModelSafeWorker* model_safe_worker)
115 : SyncerThread() {
116 impl_.reset(new SyncerThreadPthreadImpl(command_channel, mgr,
117 connection_manager, all_status, model_safe_worker));
118 }
120 bool SyncerThreadPthreadImpl::NudgeSyncer(int milliseconds_from_now,
121 SyncerThread::NudgeSource source) {
122 MutexLock lock(&mutex_);
123 if (syncer_ == NULL) {
124 return false;
125 }
126 NudgeSyncImpl(milliseconds_from_now, source);
127 return true;
128 }
130 void* RunSyncerThread(void* syncer_thread) {
131 return (reinterpret_cast<SyncerThreadPthreadImpl*>(
132 syncer_thread))->ThreadMain();
133 }
135 SyncerThreadPthreadImpl::SyncerThreadPthreadImpl(
136 ClientCommandChannel* command_channel,
137 syncable::DirectoryManager* mgr,
138 ServerConnectionManager* connection_manager,
139 AllStatus* all_status,
140 ModelSafeWorker* model_safe_worker)
141 : dirman_(mgr), scm_(connection_manager),
142 syncer_(NULL), syncer_events_(NULL), thread_running_(false),
143 syncer_short_poll_interval_seconds_(
144 SyncerThread::kDefaultShortPollIntervalSeconds),
145 syncer_long_poll_interval_seconds_(
146 SyncerThread::kDefaultLongPollIntervalSeconds),
147 syncer_polling_interval_(SyncerThread::kDefaultShortPollIntervalSeconds),
148 syncer_max_interval_(SyncerThread::kDefaultMaxPollIntervalMs),
149 stop_syncer_thread_(false), connected_(false), conn_mgr_hookup_(NULL),
150 p2p_authenticated_(false), p2p_subscribed_(false),
151 allstatus_(all_status), talk_mediator_hookup_(NULL),
152 command_channel_(command_channel), directory_manager_hookup_(NULL),
153 model_safe_worker_(model_safe_worker),
154 client_command_hookup_(NULL), disable_idle_detection_(false) {
156 SyncerEvent shutdown = { SyncerEvent::SHUTDOWN_USE_WITH_CARE };
157 syncer_event_channel_.reset(new SyncerEventChannel(shutdown));
159 if (dirman_) {
160 directory_manager_hookup_.reset(NewEventListenerHookup(
161 dirman_->channel(), this,
162 &SyncerThreadPthreadImpl::HandleDirectoryManagerEvent));
163 }
165 if (scm_) {
166 WatchConnectionManager(scm_);
167 }
169 if (command_channel_) {
170 WatchClientCommands(command_channel_);
171 }
172 }
174 SyncerThreadPthreadImpl::~SyncerThreadPthreadImpl() {
175 client_command_hookup_.reset();
176 conn_mgr_hookup_.reset();
177 syncer_event_channel_.reset();
178 directory_manager_hookup_.reset();
179 syncer_events_.reset();
180 delete syncer_;
181 talk_mediator_hookup_.reset();
182 CHECK(!thread_running_);
183 }
185 // Creates and starts a syncer thread.
186 // Returns true if it creates a thread or if there's currently a thread running
187 // and false otherwise.
188 bool SyncerThreadPthreadImpl::Start() {
189 MutexLock lock(&mutex_);
190 if (thread_running_) {
191 return true;
192 }
193 thread_running_ =
194 (0 == pthread_create(&thread_, NULL, RunSyncerThread, this));
195 if (thread_running_) {
196 pthread_detach(thread_);
197 }
198 return thread_running_;
199 }
201 // Stop processing. A max wait of at least 2*server RTT time is recommended.
202 // Returns true if we stopped, false otherwise.
203 bool SyncerThreadPthreadImpl::Stop(int max_wait) {
204 MutexLock lock(&mutex_);
205 if (!thread_running_)
206 return true;
207 stop_syncer_thread_ = true;
208 if (NULL != syncer_) {
209 // Try to early exit the syncer.
210 syncer_->RequestEarlyExit();
211 }
212 pthread_cond_broadcast(&changed_.condvar_);
213 timespec deadline = { time(NULL) + (max_wait / 1000), 0 };
214 do {
215 const int wait_result = max_wait < 0 ?
216 pthread_cond_wait(&changed_.condvar_, &mutex_.mutex_) :
217 pthread_cond_timedwait(&changed_.condvar_, &mutex_.mutex_,
218 &deadline);
219 if (ETIMEDOUT == wait_result) {
220 LOG(ERROR) << "SyncerThread::Stop timed out. Problems likely.";
221 return false;
222 }
223 } while (thread_running_);
224 return true;
225 }
227 void SyncerThreadPthreadImpl::WatchClientCommands(
228 ClientCommandChannel* channel) {
229 PThreadScopedLock<PThreadMutex> lock(&mutex_);
230 client_command_hookup_.reset(NewEventListenerHookup(channel, this,
231 &SyncerThreadPthreadImpl::HandleClientCommand));
232 }
234 void SyncerThreadPthreadImpl::HandleClientCommand(
235 ClientCommandChannel::EventType event) {
236 if (!event) {
237 return;
238 }
240 // Mutex not really necessary for these.
241 if (event->has_set_sync_poll_interval()) {
242 syncer_short_poll_interval_seconds_ = event->set_sync_poll_interval();
243 }
245 if (event->has_set_sync_long_poll_interval()) {
246 syncer_long_poll_interval_seconds_ = event->set_sync_long_poll_interval();
247 }
248 }
250 void SyncerThreadPthreadImpl::ThreadMainLoop() {
251 // Use the short poll value by default.
252 int poll_seconds = syncer_short_poll_interval_seconds_;
253 int user_idle_milliseconds = 0;
254 timespec last_sync_time = { 0 };
255 bool initial_sync_for_thread = true;
256 bool continue_sync_cycle = false;
258 while (!stop_syncer_thread_) {
259 if (!connected_) {
260 LOG(INFO) << "Syncer thread waiting for connection.";
261 while (!connected_ && !stop_syncer_thread_)
262 pthread_cond_wait(&changed_.condvar_, &mutex_.mutex_);
263 LOG_IF(INFO, connected_) << "Syncer thread found connection.";
264 continue;
265 }
267 if (syncer_ == NULL) {
268 LOG(INFO) << "Syncer thread waiting for database initialization.";
269 while (syncer_ == NULL && !stop_syncer_thread_)
270 pthread_cond_wait(&changed_.condvar_, &mutex_.mutex_);
271 LOG_IF(INFO, !(syncer_ == NULL)) << "Syncer was found after DB started.";
272 continue;
273 }
275 timespec const next_poll = { last_sync_time.tv_sec + poll_seconds,
276 last_sync_time.tv_nsec };
277 const timespec wake_time =
278 !nudge_queue_.empty() && < next_poll ?
279 : next_poll;
280 LOG(INFO) << "wake time is " << wake_time.tv_sec;
281 LOG(INFO) << "next poll is " << next_poll.tv_sec;
283 const int error = pthread_cond_timedwait(&changed_.condvar_, &mutex_.mutex_,
284 &wake_time);
285 if (ETIMEDOUT != error) {
286 continue; // Check all the conditions again.
287 }
289 const timespec now = GetPThreadAbsoluteTime(0);
291 // Handle a nudge, caused by either a notification or a local bookmark
292 // event. This will also update the source of the following SyncMain call.
293 UpdateNudgeSource(now, &continue_sync_cycle, &initial_sync_for_thread);
295 LOG(INFO) << "Calling Sync Main at time " << now.tv_sec;
296 SyncMain(syncer_);
297 last_sync_time = now;
299 LOG(INFO) << "Updating the next polling time after SyncMain";
300 poll_seconds = CalculatePollingWaitTime(allstatus_->status(),
301 poll_seconds,
302 &user_idle_milliseconds,
303 &continue_sync_cycle);
304 }
305 }
307 // We check how long the user's been idle and sync less often if the machine is
308 // not in use. The aim is to reduce server load.
309 int SyncerThreadPthreadImpl::CalculatePollingWaitTime(
310 const AllStatus::Status& status,
311 int last_poll_wait, // in s
312 int* user_idle_milliseconds,
313 bool* continue_sync_cycle) {
314 bool is_continuing_sync_cyle = *continue_sync_cycle;
315 *continue_sync_cycle = false;
317 // Determine if the syncer has unfinished work to do from allstatus_.
318 const bool syncer_has_work_to_do =
319 status.updates_available > status.updates_received
320 || status.unsynced_count > 0;
321 LOG(INFO) << "syncer_has_work_to_do is " << syncer_has_work_to_do;
323 // First calculate the expected wait time, figuring in any backoff because of
324 // user idle time. next_wait is in seconds
325 syncer_polling_interval_ = (!status.notifications_enabled) ?
326 syncer_short_poll_interval_seconds_ :
327 syncer_long_poll_interval_seconds_;
328 int default_next_wait = syncer_polling_interval_;
329 int actual_next_wait = default_next_wait;
331 if (syncer_has_work_to_do) {
332 // Provide exponential backoff due to consecutive errors, else attempt to
333 // complete the work as soon as possible.
334 if (!is_continuing_sync_cyle) {
335 actual_next_wait = AllStatus::GetRecommendedDelaySeconds(0);
336 } else {
337 actual_next_wait = AllStatus::GetRecommendedDelaySeconds(last_poll_wait);
338 }
339 *continue_sync_cycle = true;
340 } else if (!status.notifications_enabled) {
341 // Ensure that we start exponential backoff from our base polling
342 // interval when we are not continuing a sync cycle.
343 last_poll_wait = std::max(last_poll_wait, syncer_polling_interval_);
345 // Did the user start interacting with the computer again?
346 // If so, revise our idle time (and probably next_sync_time) downwards
347 int new_idle_time = disable_idle_detection_ ? 0 : UserIdleTime();
348 if (new_idle_time < *user_idle_milliseconds) {
349 *user_idle_milliseconds = new_idle_time;
350 }
351 actual_next_wait = CalculateSyncWaitTime(last_poll_wait * 1000,
352 *user_idle_milliseconds) / 1000;
353 DCHECK_GE(actual_next_wait, default_next_wait);
354 }
356 LOG(INFO) << "Sync wait: idle " << default_next_wait
357 << " non-idle or backoff " << actual_next_wait << ".";
359 return actual_next_wait;
360 }
362 void* SyncerThreadPthreadImpl::ThreadMain() {
363 NameCurrentThreadForDebugging("SyncEngine_SyncerThread");
364 mutex_.Lock();
365 ThreadMainLoop();
366 thread_running_ = false;
367 pthread_cond_broadcast(&changed_.condvar_);
368 mutex_.Unlock();
369 LOG(INFO) << "Syncer thread exiting.";
370 return 0;
371 }
373 void SyncerThreadPthreadImpl::SyncMain(Syncer* syncer) {
374 CHECK(syncer);
375 mutex_.Unlock();
376 while (syncer->SyncShare()) {
377 LOG(INFO) << "Looping in sync share";
378 }
379 LOG(INFO) << "Done looping in sync share";
381 mutex_.Lock();
382 }
384 void SyncerThreadPthreadImpl::UpdateNudgeSource(const timespec& now,
385 bool* continue_sync_cycle,
386 bool* initial_sync) {
387 bool nudged = false;
388 SyncerThread::NudgeSource nudge_source = SyncerThread::kUnknown;
389 // Has the previous sync cycle completed?
390 if (continue_sync_cycle) {
391 nudge_source = SyncerThread::kContinuation;
392 }
393 // Update the nudge source if a new nudge has come through during the
394 // previous sync cycle.
395 while (!nudge_queue_.empty() && !(now < {
396 if (!nudged) {
397 nudge_source =;
398 *continue_sync_cycle = false; // Reset the continuation token on nudge.
399 nudged = true;
400 }
401 nudge_queue_.pop();
402 }
403 SetUpdatesSource(nudged, nudge_source, initial_sync);
404 }
406 void SyncerThreadPthreadImpl::SetUpdatesSource(bool nudged,
407 SyncerThread::NudgeSource nudge_source, bool* initial_sync) {
408 sync_pb::GetUpdatesCallerInfo::GET_UPDATES_SOURCE updates_source =
409 sync_pb::GetUpdatesCallerInfo::UNKNOWN;
410 if (*initial_sync) {
411 updates_source = sync_pb::GetUpdatesCallerInfo::FIRST_UPDATE;
412 *initial_sync = false;
413 } else if (!nudged) {
414 updates_source = sync_pb::GetUpdatesCallerInfo::PERIODIC;
415 } else {
416 switch (nudge_source) {
417 case SyncerThread::kNotification:
418 updates_source = sync_pb::GetUpdatesCallerInfo::NOTIFICATION;
419 break;
420 case SyncerThread::kLocal:
421 updates_source = sync_pb::GetUpdatesCallerInfo::LOCAL;
422 break;
423 case SyncerThread::kContinuation:
424 updates_source = sync_pb::GetUpdatesCallerInfo::SYNC_CYCLE_CONTINUATION;
425 break;
426 case SyncerThread::kUnknown:
427 default:
428 updates_source = sync_pb::GetUpdatesCallerInfo::UNKNOWN;
429 break;
430 }
431 }
432 syncer_->set_updates_source(updates_source);
433 }
435 void SyncerThreadPthreadImpl::HandleSyncerEvent(const SyncerEvent& event) {
436 MutexLock lock(&mutex_);
437 channel()->NotifyListeners(event);
438 if (SyncerEvent::REQUEST_SYNC_NUDGE != event.what_happened) {
439 return;
440 }
441 NudgeSyncImpl(event.nudge_delay_milliseconds, SyncerThread::kUnknown);
442 }
444 void SyncerThreadPthreadImpl::HandleDirectoryManagerEvent(
445 const syncable::DirectoryManagerEvent& event) {
446 LOG(INFO) << "Handling a directory manager event";
447 if (syncable::DirectoryManagerEvent::OPENED == event.what_happened) {
448 MutexLock lock(&mutex_);
449 LOG(INFO) << "Syncer starting up for: " << event.dirname;
450 // The underlying database structure is ready, and we should create
451 // the syncer.
452 CHECK(syncer_ == NULL);
453 syncer_ =
454 new Syncer(dirman_, event.dirname, scm_, model_safe_worker_.get());
456 syncer_->set_command_channel(command_channel_);
457 syncer_events_.reset(NewEventListenerHookup(
458 syncer_->channel(), this, &SyncerThreadPthreadImpl::HandleSyncerEvent));
459 pthread_cond_broadcast(&changed_.condvar_);
460 }
461 }
463 static inline void CheckConnected(bool* connected,
464 HttpResponse::ServerConnectionCode code,
465 pthread_cond_t* condvar) {
466 if (*connected) {
467 if (HttpResponse::CONNECTION_UNAVAILABLE == code) {
468 *connected = false;
469 pthread_cond_broadcast(condvar);
470 }
471 } else {
472 if (HttpResponse::SERVER_CONNECTION_OK == code) {
473 *connected = true;
474 pthread_cond_broadcast(condvar);
475 }
476 }
477 }
479 void SyncerThreadPthreadImpl::WatchConnectionManager(
480 ServerConnectionManager* conn_mgr) {
481 conn_mgr_hookup_.reset(NewEventListenerHookup(conn_mgr->channel(), this,
482 &SyncerThreadPthreadImpl::HandleServerConnectionEvent));
483 CheckConnected(&connected_, conn_mgr->server_status(),
484 &changed_.condvar_);
485 }
487 void SyncerThreadPthreadImpl::HandleServerConnectionEvent(
488 const ServerConnectionEvent& event) {
489 if (ServerConnectionEvent::STATUS_CHANGED == event.what_happened) {
490 MutexLock lock(&mutex_);
491 CheckConnected(&connected_, event.connection_code,
492 &changed_.condvar_);
493 }
494 }
496 SyncerEventChannel* SyncerThreadPthreadImpl::channel() {
497 return syncer_event_channel_.get();
498 }
500 // Inputs and return value in milliseconds.
501 int SyncerThreadPthreadImpl::CalculateSyncWaitTime(int last_interval,
502 int user_idle_ms) {
503 // syncer_polling_interval_ is in seconds
504 int syncer_polling_interval_ms = syncer_polling_interval_ * 1000;
506 // This is our default and lower bound.
507 int next_wait = syncer_polling_interval_ms;
509 // Get idle time, bounded by max wait.
510 int idle = min(user_idle_ms, syncer_max_interval_);
512 // If the user has been idle for a while, we'll start decreasing the poll
513 // rate.
514 if (idle >= kPollBackoffThresholdMultiplier * syncer_polling_interval_ms) {
515 next_wait = std::min(AllStatus::GetRecommendedDelaySeconds(
516 last_interval / 1000), syncer_max_interval_ / 1000) * 1000;
517 }
519 return next_wait;
520 }
522 // Called with mutex_ already locked.
523 void SyncerThreadPthreadImpl::NudgeSyncImpl(int milliseconds_from_now,
524 SyncerThread::NudgeSource source) {
525 const timespec nudge_time = GetPThreadAbsoluteTime(milliseconds_from_now);
526 NudgeObject nudge_object(nudge_time, source);
527 nudge_queue_.push(nudge_object);
528 pthread_cond_broadcast(&changed_.condvar_);
529 }
531 void SyncerThreadPthreadImpl::WatchTalkMediator(TalkMediator* mediator) {
532 talk_mediator_hookup_.reset(
533 NewEventListenerHookup(
534 mediator->channel(),
535 this,
536 &SyncerThreadPthreadImpl::HandleTalkMediatorEvent));
537 }
539 void SyncerThreadPthreadImpl::HandleTalkMediatorEvent(
540 const TalkMediatorEvent& event) {
541 MutexLock lock(&mutex_);
542 switch (event.what_happened) {
543 case TalkMediatorEvent::LOGIN_SUCCEEDED:
544 LOG(INFO) << "P2P: Login succeeded.";
545 p2p_authenticated_ = true;
546 break;
547 case TalkMediatorEvent::LOGOUT_SUCCEEDED:
548 LOG(INFO) << "P2P: Login succeeded.";
549 p2p_authenticated_ = false;
550 break;
551 case TalkMediatorEvent::SUBSCRIPTIONS_ON:
552 LOG(INFO) << "P2P: Subscriptions successfully enabled.";
553 p2p_subscribed_ = true;
554 if (NULL != syncer_) {
555 LOG(INFO) << "Subscriptions on. Nudging syncer for initial push.";
556 NudgeSyncImpl(0, SyncerThread::kLocal);
557 }
558 break;
559 case TalkMediatorEvent::SUBSCRIPTIONS_OFF:
560 LOG(INFO) << "P2P: Subscriptions are not enabled.";
561 p2p_subscribed_ = false;
562 break;
563 case TalkMediatorEvent::NOTIFICATION_RECEIVED:
564 LOG(INFO) << "P2P: Updates on server, pushing syncer";
565 if (NULL != syncer_) {
566 NudgeSyncImpl(0, SyncerThread::kNotification);
567 }
568 break;
569 default:
570 break;
571 }
573 if (NULL != syncer_) {
574 syncer_->set_notifications_enabled(p2p_authenticated_ && p2p_subscribed_);
575 }
576 }
578 } // namespace browser_sync
« no previous file with comments | « chrome/browser/sync/engine/syncer_thread_pthreads.h ('k') | chrome/browser/sync/engine/syncer_thread_timed_stop.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698