Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(130)

Side by Side Diff: chrome/browser/sync/engine/syncer_thread.cc

Issue 194065: Initial commit of sync engine code to browser/sync.... (Closed) Base URL: svn://chrome-svn/chrome/trunk/src/
Patch Set: Fixes to gtest include path, reverted syncapi. Created 11 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
Property Changes:
Added: svn:eol-style
+ LF
OLDNEW
(Empty)
1 // Copyright (c) 2009 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "chrome/browser/sync/engine/syncer_thread.h"
6
7 #ifdef OS_MACOSX
8 #include <CoreFoundation/CFNumber.h>
9 #include <IOKit/IOTypes.h>
10 #include <IOKit/IOKitLib.h>
11 #endif
12
13 #include <algorithm>
14 #include <map>
15 #include <queue>
16
17 #include "chrome/browser/sync/engine/auth_watcher.h"
18 #include "chrome/browser/sync/engine/model_safe_worker.h"
19 #include "chrome/browser/sync/engine/net/server_connection_manager.h"
20 #include "chrome/browser/sync/engine/syncer.h"
21 #include "chrome/browser/sync/notifier/listener/talk_mediator.h"
22 #include "chrome/browser/sync/notifier/listener/talk_mediator_impl.h"
23 #include "chrome/browser/sync/syncable/directory_manager.h"
24
25 using std::priority_queue;
26 using std::min;
27
28 static inline bool operator < (const timespec& a, const timespec& b) {
29 return a.tv_sec == b.tv_sec ? a.tv_nsec < b.tv_nsec : a.tv_sec < b.tv_sec;
30 }
31
32 namespace {
33
34 // returns the amount of time since the user last interacted with
35 // the computer, in milliseconds
36 int UserIdleTime() {
37 #ifdef OS_WINDOWS
38 LASTINPUTINFO last_input_info;
39 last_input_info.cbSize = sizeof(LASTINPUTINFO);
40
41 // get time in windows ticks since system start of last activity
42 BOOL b = ::GetLastInputInfo(&last_input_info);
43 if (b == TRUE)
44 return ::GetTickCount() - last_input_info.dwTime;
45 #elif defined(OS_MACOSX)
46 // It would be great to do something like:
47 //
48 // return 1000 *
49 // CGEventSourceSecondsSinceLastEventType(
50 // kCGEventSourceStateCombinedSessionState,
51 // kCGAnyInputEventType);
52 //
53 // Unfortunately, CGEvent* lives in ApplicationServices, and we're a daemon
54 // and can't link that high up the food chain. Thus this mucking in IOKit.
55
56 io_service_t hid_service =
57 IOServiceGetMatchingService(kIOMasterPortDefault,
58 IOServiceMatching("IOHIDSystem"));
59 if (!hid_service) {
60 LOG(WARNING) << "Could not obtain IOHIDSystem";
61 return 0;
62 }
63
64 CFTypeRef object = IORegistryEntryCreateCFProperty(hid_service,
65 CFSTR("HIDIdleTime"),
66 kCFAllocatorDefault,
67 0);
68 if (!object) {
69 LOG(WARNING) << "Could not get IOHIDSystem's HIDIdleTime property";
70 IOObjectRelease(hid_service);
71 return 0;
72 }
73
74 int64 idle_time; // in nanoseconds
75 Boolean success;
76 if (CFGetTypeID(object) == CFNumberGetTypeID()) {
77 success = CFNumberGetValue((CFNumberRef)object,
78 kCFNumberSInt64Type,
79 &idle_time);
80 } else {
81 LOG(WARNING) << "IOHIDSystem's HIDIdleTime property isn't a number!";
82 }
83
84 CFRelease(object);
85 IOObjectRelease(hid_service);
86
87 if (!success) {
88 LOG(WARNING) << "Could not get IOHIDSystem's HIDIdleTime property's value";
89 return 0;
90 } else {
91 return idle_time / 1000000; // nano to milli
92 }
93 #else
94 static bool was_logged = false;
95 if (!was_logged) {
96 was_logged = true;
97 LOG(INFO) << "UserIdleTime unimplemented on this platform, "
98 "synchronization will not throttle when user idle";
99 }
100 #endif
101
102 return 0;
103 }
104
105 } // namespace
106
107 namespace browser_sync {
108
109 bool SyncerThread::NudgeSyncer(int milliseconds_from_now, NudgeSource source) {
110 MutexLock lock(&mutex_);
111 if (syncer_ == NULL) {
112 return false;
113 }
114 NudgeSyncImpl(milliseconds_from_now, source);
115 return true;
116 }
117
118 void* RunSyncerThread(void* syncer_thread) {
119 return (reinterpret_cast<SyncerThread*>(syncer_thread))->ThreadMain();
120 }
121
122 SyncerThread::SyncerThread(
123 ClientCommandChannel* command_channel,
124 syncable::DirectoryManager* mgr,
125 ServerConnectionManager* connection_manager,
126 AllStatus* all_status,
127 ModelSafeWorker* model_safe_worker)
128 : dirman_(mgr), scm_(connection_manager),
129 syncer_(NULL), syncer_events_(NULL), thread_running_(false),
130 syncer_short_poll_interval_seconds_(kDefaultShortPollIntervalSeconds),
131 syncer_long_poll_interval_seconds_(kDefaultLongPollIntervalSeconds),
132 syncer_polling_interval_(kDefaultShortPollIntervalSeconds),
133 syncer_max_interval_(kDefaultMaxPollIntervalMs),
134 stop_syncer_thread_(false), connected_(false), conn_mgr_hookup_(NULL),
135 p2p_authenticated_(false), p2p_subscribed_(false),
136 allstatus_(all_status), talk_mediator_hookup_(NULL),
137 command_channel_(command_channel), directory_manager_hookup_(NULL),
138 model_safe_worker_(model_safe_worker),
139 client_command_hookup_(NULL), disable_idle_detection_(false) {
140
141 SyncerEvent shutdown = { SyncerEvent::SHUTDOWN_USE_WITH_CARE };
142 syncer_event_channel_.reset(new SyncerEventChannel(shutdown));
143
144 if (dirman_) {
145 directory_manager_hookup_.reset(NewEventListenerHookup(
146 dirman_->channel(), this, &SyncerThread::HandleDirectoryManagerEvent));
147 }
148
149 if (scm_) {
150 WatchConnectionManager(scm_);
151 }
152
153 if (command_channel_) {
154 WatchClientCommands(command_channel_);
155 }
156 }
157
158 SyncerThread::~SyncerThread() {
159 client_command_hookup_.reset();
160 conn_mgr_hookup_.reset();
161 syncer_event_channel_.reset();
162 directory_manager_hookup_.reset();
163 syncer_events_.reset();
164 delete syncer_;
165 talk_mediator_hookup_.reset();
166 CHECK(!thread_running_);
167 }
168
169 // Creates and starts a syncer thread.
170 // Returns true if it creates a thread or if there's currently a thread
171 // running and false otherwise.
172 bool SyncerThread::Start() {
173 MutexLock lock(&mutex_);
174 if (thread_running_) {
175 return true;
176 }
177 thread_running_ =
178 (0 == pthread_create(&thread_, NULL, RunSyncerThread, this));
179 if (thread_running_) {
180 pthread_detach(thread_);
181 }
182 return thread_running_;
183 }
184
185 // Stop processing. A max wait of at least 2*server RTT time is recommended.
186 // returns true if we stopped, false otherwise.
187 bool SyncerThread::Stop(int max_wait) {
188 MutexLock lock(&mutex_);
189 if (!thread_running_)
190 return true;
191 stop_syncer_thread_ = true;
192 if (NULL != syncer_) {
193 // try to early exit the syncer
194 syncer_->RequestEarlyExit();
195 }
196 pthread_cond_broadcast(&changed_.condvar_);
197 timespec deadline = { time(NULL) + (max_wait / 1000), 0 };
198 do {
199 const int wait_result = max_wait < 0 ?
200 pthread_cond_wait(&changed_.condvar_, &mutex_.mutex_) :
201 pthread_cond_timedwait(&changed_.condvar_, &mutex_.mutex_,
202 &deadline);
203 if (ETIMEDOUT == wait_result) {
204 LOG(ERROR) << "SyncerThread::Stop timed out. Problems likely.";
205 return false;
206 }
207 } while (thread_running_);
208 return true;
209 }
210
211 void SyncerThread::WatchClientCommands(ClientCommandChannel* channel) {
212 PThreadScopedLock<PThreadMutex> lock(&mutex_);
213 client_command_hookup_.reset(NewEventListenerHookup(channel, this,
214 &SyncerThread::HandleClientCommand));
215 }
216
217 void SyncerThread::HandleClientCommand(ClientCommandChannel::EventType
218 event) {
219 if (!event) {
220 return;
221 }
222
223 // mutex not really necessary for these
224 if (event->has_set_sync_poll_interval()) {
225 syncer_short_poll_interval_seconds_ = event->set_sync_poll_interval();
226 }
227
228 if (event->has_set_sync_long_poll_interval()) {
229 syncer_long_poll_interval_seconds_ = event->set_sync_long_poll_interval();
230 }
231 }
232
233 void SyncerThread::ThreadMainLoop() {
234 // Use the short poll value by default.
235 int poll_seconds = syncer_short_poll_interval_seconds_;
236 int user_idle_milliseconds = 0;
237 timespec last_sync_time = { 0 };
238 bool initial_sync_for_thread = true;
239 bool continue_sync_cycle = false;
240
241 while (!stop_syncer_thread_) {
242 if (!connected_) {
243 LOG(INFO) << "Syncer thread waiting for connection.";
244 while (!connected_ && !stop_syncer_thread_)
245 pthread_cond_wait(&changed_.condvar_, &mutex_.mutex_);
246 LOG_IF(INFO, connected_) << "Syncer thread found connection.";
247 continue;
248 }
249
250 if (syncer_ == NULL) {
251 LOG(INFO) << "Syncer thread waiting for database initialization.";
252 while (syncer_ == NULL && !stop_syncer_thread_)
253 pthread_cond_wait(&changed_.condvar_, &mutex_.mutex_);
254 LOG_IF(INFO, !(syncer_ == NULL)) << "Syncer was found after DB started.";
255 continue;
256 }
257
258 timespec const next_poll = { last_sync_time.tv_sec + poll_seconds,
259 last_sync_time.tv_nsec };
260 const timespec wake_time =
261 !nudge_queue_.empty() && nudge_queue_.top().first < next_poll ?
262 nudge_queue_.top().first : next_poll;
263 LOG(INFO) << "wake time is " << wake_time.tv_sec;
264 LOG(INFO) << "next poll is " << next_poll.tv_sec;
265
266 const int error = pthread_cond_timedwait(&changed_.condvar_, &mutex_.mutex_,
267 &wake_time);
268 if (ETIMEDOUT != error) {
269 continue; // Check all the conditions again.
270 }
271
272 const timespec now = GetPThreadAbsoluteTime(0);
273
274 // Handle a nudge, caused by either a notification or a local bookmark
275 // event. This will also update the source of the following SyncMain call.
276 UpdateNudgeSource(now, &continue_sync_cycle, &initial_sync_for_thread);
277
278 LOG(INFO) << "Calling Sync Main at time " << now.tv_sec;
279 SyncMain(syncer_);
280 last_sync_time = now;
281
282 LOG(INFO) << "Updating the next polling time after SyncMain";
283 poll_seconds = CalculatePollingWaitTime(allstatus_->status(),
284 poll_seconds,
285 &user_idle_milliseconds,
286 &continue_sync_cycle);
287 }
288 }
289
290 // We check how long the user's been idle and sync less often if the
291 // machine is not in use. The aim is to reduce server load.
292 int SyncerThread::CalculatePollingWaitTime(
293 const AllStatus::Status& status,
294 int last_poll_wait, // in s
295 int* user_idle_milliseconds,
296 bool* continue_sync_cycle) {
297 bool is_continuing_sync_cyle = *continue_sync_cycle;
298 *continue_sync_cycle = false;
299
300 // Determine if the syncer has unfinished work to do from allstatus_.
301 const bool syncer_has_work_to_do =
302 status.updates_available > status.updates_received
303 || status.unsynced_count > 0;
304 LOG(INFO) << "syncer_has_work_to_do is " << syncer_has_work_to_do;
305
306 // First calculate the expected wait time, figuring in any backoff because of
307 // user idle time. next_wait is in seconds
308 syncer_polling_interval_ = (!status.notifications_enabled) ?
309 syncer_short_poll_interval_seconds_ :
310 syncer_long_poll_interval_seconds_;
311 int default_next_wait = syncer_polling_interval_;
312 int actual_next_wait = default_next_wait;
313
314 if (syncer_has_work_to_do) {
315 // Provide exponential backoff due to consecutive errors, else attempt to
316 // complete the work as soon as possible.
317 if (!is_continuing_sync_cyle) {
318 actual_next_wait = AllStatus::GetRecommendedDelaySeconds(0);
319 } else {
320 actual_next_wait = AllStatus::GetRecommendedDelaySeconds(last_poll_wait);
321 }
322 *continue_sync_cycle = true;
323 } else if (!status.notifications_enabled) {
324 // Ensure that we start exponential backoff from our base polling
325 // interval when we are not continuing a sync cycle.
326 last_poll_wait = std::max(last_poll_wait, syncer_polling_interval_);
327
328 // Did the user start interacting with the computer again?
329 // If so, revise our idle time (and probably next_sync_time) downwards
330 int new_idle_time = disable_idle_detection_ ? 0 : UserIdleTime();
331 if (new_idle_time < *user_idle_milliseconds) {
332 *user_idle_milliseconds = new_idle_time;
333 }
334 actual_next_wait = CalculateSyncWaitTime(last_poll_wait * 1000,
335 *user_idle_milliseconds) / 1000;
336 DCHECK_GE(actual_next_wait, default_next_wait);
337 }
338
339 LOG(INFO) << "Sync wait: idle " << default_next_wait
340 << " non-idle or backoff " << actual_next_wait << ".";
341
342 return actual_next_wait;
343 }
344
345 void* SyncerThread::ThreadMain() {
346 NameCurrentThreadForDebugging("SyncEngine_SyncerThread");
347 mutex_.Lock();
348 ThreadMainLoop();
349 thread_running_ = false;
350 pthread_cond_broadcast(&changed_.condvar_);
351 mutex_.Unlock();
352 LOG(INFO) << "Syncer thread exiting.";
353 return 0;
354 }
355
356 void SyncerThread::SyncMain(Syncer* syncer) {
357 CHECK(syncer);
358 mutex_.Unlock();
359 while (syncer->SyncShare()) {
360 LOG(INFO) << "Looping in sync share";
361 }
362 LOG(INFO) << "Done looping in sync share";
363
364 mutex_.Lock();
365 }
366
367 void SyncerThread::UpdateNudgeSource(const timespec& now,
368 bool* continue_sync_cycle,
369 bool* initial_sync) {
370 bool nudged = false;
371 NudgeSource nudge_source = kUnknown;
372 // Has the previous sync cycle completed?
373 if (continue_sync_cycle) {
374 nudge_source = kContinuation;
375 }
376 // Update the nudge source if a new nudge has come through during the
377 // previous sync cycle.
378 while (!nudge_queue_.empty() && !(now < nudge_queue_.top().first)) {
379 if (!nudged) {
380 nudge_source = nudge_queue_.top().second;
381 *continue_sync_cycle = false; // Reset the continuation token on nudge.
382 nudged = true;
383 }
384 nudge_queue_.pop();
385 }
386 SetUpdatesSource(nudged, nudge_source, initial_sync);
387 }
388
389 void SyncerThread::SetUpdatesSource(bool nudged, NudgeSource nudge_source,
390 bool* initial_sync) {
391 sync_pb::GetUpdatesCallerInfo::GET_UPDATES_SOURCE updates_source =
392 sync_pb::GetUpdatesCallerInfo::UNKNOWN;
393 if (*initial_sync) {
394 updates_source = sync_pb::GetUpdatesCallerInfo::FIRST_UPDATE;
395 *initial_sync = false;
396 } else if (!nudged) {
397 updates_source = sync_pb::GetUpdatesCallerInfo::PERIODIC;
398 } else {
399 switch (nudge_source) {
400 case kNotification:
401 updates_source = sync_pb::GetUpdatesCallerInfo::NOTIFICATION;
402 break;
403 case kLocal:
404 updates_source = sync_pb::GetUpdatesCallerInfo::LOCAL;
405 break;
406 case kContinuation:
407 updates_source = sync_pb::GetUpdatesCallerInfo::SYNC_CYCLE_CONTINUATION;
408 break;
409 case kUnknown:
410 default:
411 updates_source = sync_pb::GetUpdatesCallerInfo::UNKNOWN;
412 break;
413 }
414 }
415 syncer_->set_updates_source(updates_source);
416 }
417
418 void SyncerThread::HandleSyncerEvent(const SyncerEvent& event) {
419 MutexLock lock(&mutex_);
420 channel()->NotifyListeners(event);
421 if (SyncerEvent::REQUEST_SYNC_NUDGE != event.what_happened) {
422 return;
423 }
424 NudgeSyncImpl(event.nudge_delay_milliseconds, kUnknown);
425 }
426
427 void SyncerThread::HandleDirectoryManagerEvent(
428 const syncable::DirectoryManagerEvent& event) {
429 LOG(INFO) << "Handling a directory manager event";
430 if (syncable::DirectoryManagerEvent::OPENED == event.what_happened) {
431 MutexLock lock(&mutex_);
432 LOG(INFO) << "Syncer starting up for: " << event.dirname;
433 // The underlying database structure is ready, and we should create
434 // the syncer.
435 CHECK(syncer_ == NULL);
436 syncer_ =
437 new Syncer(dirman_, event.dirname, scm_, model_safe_worker_.get());
438
439 syncer_->set_command_channel(command_channel_);
440 syncer_events_.reset(NewEventListenerHookup(
441 syncer_->channel(), this, &SyncerThread::HandleSyncerEvent));
442 pthread_cond_broadcast(&changed_.condvar_);
443 }
444 }
445
446 static inline void CheckConnected(bool* connected,
447 HttpResponse::ServerConnectionCode code,
448 pthread_cond_t* condvar) {
449 if (*connected) {
450 if (HttpResponse::CONNECTION_UNAVAILABLE == code) {
451 *connected = false;
452 pthread_cond_broadcast(condvar);
453 }
454 } else {
455 if (HttpResponse::SERVER_CONNECTION_OK == code) {
456 *connected = true;
457 pthread_cond_broadcast(condvar);
458 }
459 }
460 }
461
462 void SyncerThread::WatchConnectionManager(ServerConnectionManager* conn_mgr) {
463 conn_mgr_hookup_.reset(NewEventListenerHookup(conn_mgr->channel(), this,
464 &SyncerThread::HandleServerConnectionEvent));
465 CheckConnected(&connected_, conn_mgr->server_status(),
466 &changed_.condvar_);
467 }
468
469 void SyncerThread::HandleServerConnectionEvent(
470 const ServerConnectionEvent& event) {
471 if (ServerConnectionEvent::STATUS_CHANGED == event.what_happened) {
472 MutexLock lock(&mutex_);
473 CheckConnected(&connected_, event.connection_code,
474 &changed_.condvar_);
475 }
476 }
477
478 SyncerEventChannel* SyncerThread::channel() {
479 return syncer_event_channel_.get();
480 }
481
482 // inputs and return value in milliseconds
483 int SyncerThread::CalculateSyncWaitTime(int last_interval, int user_idle_ms) {
484 // syncer_polling_interval_ is in seconds
485 int syncer_polling_interval_ms = syncer_polling_interval_ * 1000;
486
487 // This is our default and lower bound.
488 int next_wait = syncer_polling_interval_ms;
489
490 // Get idle time, bounded by max wait.
491 int idle = min(user_idle_ms, syncer_max_interval_);
492
493 // If the user has been idle for a while,
494 // we'll start decreasing the poll rate.
495 if (idle >= kPollBackoffThresholdMultiplier * syncer_polling_interval_ms) {
496 next_wait = std::min(AllStatus::GetRecommendedDelaySeconds(
497 last_interval / 1000), syncer_max_interval_ / 1000) * 1000;
498 }
499
500 return next_wait;
501 }
502
503 // Called with mutex_ already locked
504 void SyncerThread::NudgeSyncImpl(int milliseconds_from_now,
505 NudgeSource source) {
506 const timespec nudge_time = GetPThreadAbsoluteTime(milliseconds_from_now);
507 NudgeObject nudge_object(nudge_time, source);
508 nudge_queue_.push(nudge_object);
509 pthread_cond_broadcast(&changed_.condvar_);
510 }
511
512 void SyncerThread::WatchTalkMediator(TalkMediator* mediator) {
513 talk_mediator_hookup_.reset(
514 NewEventListenerHookup(
515 mediator->channel(),
516 this,
517 &SyncerThread::HandleTalkMediatorEvent));
518 }
519
520 void SyncerThread::HandleTalkMediatorEvent(const TalkMediatorEvent& event) {
521 MutexLock lock(&mutex_);
522 switch (event.what_happened) {
523 case TalkMediatorEvent::LOGIN_SUCCEEDED:
524 LOG(INFO) << "P2P: Login succeeded.";
525 p2p_authenticated_ = true;
526 break;
527 case TalkMediatorEvent::LOGOUT_SUCCEEDED:
528 LOG(INFO) << "P2P: Login succeeded.";
529 p2p_authenticated_ = false;
530 break;
531 case TalkMediatorEvent::SUBSCRIPTIONS_ON:
532 LOG(INFO) << "P2P: Subscriptions successfully enabled.";
533 p2p_subscribed_ = true;
534 if (NULL != syncer_) {
535 LOG(INFO) << "Subscriptions on. Nudging syncer for initial push.";
536 NudgeSyncImpl(0, kLocal);
537 }
538 break;
539 case TalkMediatorEvent::SUBSCRIPTIONS_OFF:
540 LOG(INFO) << "P2P: Subscriptions are not enabled.";
541 p2p_subscribed_ = false;
542 break;
543 case TalkMediatorEvent::NOTIFICATION_RECEIVED:
544 LOG(INFO) << "P2P: Updates on server, pushing syncer";
545 if (NULL != syncer_) {
546 NudgeSyncImpl(0, kNotification);
547 }
548 break;
549 default:
550 break;
551 }
552
553 if (NULL != syncer_) {
554 syncer_->set_notifications_enabled(p2p_authenticated_ && p2p_subscribed_);
555 }
556 }
557
558 } // namespace browser_sync
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698