Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(194)

Side by Side Diff: chrome/browser/sync/engine/syncer_thread_pthreads.cc

Issue 261042: Remove pthreads from build and installer files. (Closed) Base URL: svn://chrome-svn/chrome/trunk/src/
Patch Set: '' Created 11 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
(Empty)
1 // Copyright (c) 2009 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 #include "chrome/browser/sync/engine/syncer_thread_pthreads.h"
5
6 #include "build/build_config.h"
7
8 #ifdef OS_MACOSX
9 #include <CoreFoundation/CFNumber.h>
10 #include <IOKit/IOTypes.h>
11 #include <IOKit/IOKitLib.h>
12 #endif
13
14 #include <algorithm>
15 #include <map>
16 #include <queue>
17
18 #include "chrome/browser/sync/engine/auth_watcher.h"
19 #include "chrome/browser/sync/engine/model_safe_worker.h"
20 #include "chrome/browser/sync/engine/net/server_connection_manager.h"
21 #include "chrome/browser/sync/engine/syncer.h"
22 #include "chrome/browser/sync/notifier/listener/talk_mediator.h"
23 #include "chrome/browser/sync/notifier/listener/talk_mediator_impl.h"
24 #include "chrome/browser/sync/syncable/directory_manager.h"
25
26 using std::priority_queue;
27 using std::min;
28
29 static inline bool operator < (const timespec& a, const timespec& b) {
30 return a.tv_sec == b.tv_sec ? a.tv_nsec < b.tv_nsec : a.tv_sec < b.tv_sec;
31 }
32
33 namespace {
34
35 // Returns the amount of time since the user last interacted with the computer,
36 // in milliseconds
37 int UserIdleTime() {
38 #ifdef OS_WIN
39 LASTINPUTINFO last_input_info;
40 last_input_info.cbSize = sizeof(LASTINPUTINFO);
41
42 // Get time in windows ticks since system start of last activity.
43 BOOL b = ::GetLastInputInfo(&last_input_info);
44 if (b == TRUE)
45 return ::GetTickCount() - last_input_info.dwTime;
46 #elif defined(OS_MACOSX)
47 // It would be great to do something like:
48 //
49 // return 1000 *
50 // CGEventSourceSecondsSinceLastEventType(
51 // kCGEventSourceStateCombinedSessionState,
52 // kCGAnyInputEventType);
53 //
54 // Unfortunately, CGEvent* lives in ApplicationServices, and we're a daemon
55 // and can't link that high up the food chain. Thus this mucking in IOKit.
56
57 io_service_t hid_service =
58 IOServiceGetMatchingService(kIOMasterPortDefault,
59 IOServiceMatching("IOHIDSystem"));
60 if (!hid_service) {
61 LOG(WARNING) << "Could not obtain IOHIDSystem";
62 return 0;
63 }
64
65 CFTypeRef object = IORegistryEntryCreateCFProperty(hid_service,
66 CFSTR("HIDIdleTime"),
67 kCFAllocatorDefault,
68 0);
69 if (!object) {
70 LOG(WARNING) << "Could not get IOHIDSystem's HIDIdleTime property";
71 IOObjectRelease(hid_service);
72 return 0;
73 }
74
75 int64 idle_time; // in nanoseconds
76 Boolean success;
77 if (CFGetTypeID(object) == CFNumberGetTypeID()) {
78 success = CFNumberGetValue((CFNumberRef)object,
79 kCFNumberSInt64Type,
80 &idle_time);
81 } else {
82 LOG(WARNING) << "IOHIDSystem's HIDIdleTime property isn't a number!";
83 }
84
85 CFRelease(object);
86 IOObjectRelease(hid_service);
87
88 if (!success) {
89 LOG(WARNING) << "Could not get IOHIDSystem's HIDIdleTime property's value";
90 return 0;
91 } else {
92 return idle_time / 1000000; // nano to milli
93 }
94 #else
95 static bool was_logged = false;
96 if (!was_logged) {
97 was_logged = true;
98 LOG(INFO) << "UserIdleTime unimplemented on this platform, "
99 "synchronization will not throttle when user idle";
100 }
101 #endif
102
103 return 0;
104 }
105
106 } // namespace
107
108 namespace browser_sync {
109
110 SyncerThreadPthreads::SyncerThreadPthreads(
111 ClientCommandChannel* command_channel,
112 syncable::DirectoryManager* mgr,
113 ServerConnectionManager* connection_manager,
114 AllStatus* all_status, ModelSafeWorker* model_safe_worker)
115 : SyncerThread() {
116 impl_.reset(new SyncerThreadPthreadImpl(command_channel, mgr,
117 connection_manager, all_status, model_safe_worker));
118 }
119
120 bool SyncerThreadPthreadImpl::NudgeSyncer(int milliseconds_from_now,
121 SyncerThread::NudgeSource source) {
122 MutexLock lock(&mutex_);
123 if (syncer_ == NULL) {
124 return false;
125 }
126 NudgeSyncImpl(milliseconds_from_now, source);
127 return true;
128 }
129
130 void* RunSyncerThread(void* syncer_thread) {
131 return (reinterpret_cast<SyncerThreadPthreadImpl*>(
132 syncer_thread))->ThreadMain();
133 }
134
135 SyncerThreadPthreadImpl::SyncerThreadPthreadImpl(
136 ClientCommandChannel* command_channel,
137 syncable::DirectoryManager* mgr,
138 ServerConnectionManager* connection_manager,
139 AllStatus* all_status,
140 ModelSafeWorker* model_safe_worker)
141 : stop_syncer_thread_(false),
142 thread_running_(false),
143 connected_(false),
144 p2p_authenticated_(false),
145 p2p_subscribed_(false),
146 allstatus_(all_status),
147 syncer_(NULL),
148 dirman_(mgr),
149 scm_(connection_manager),
150 syncer_short_poll_interval_seconds_(
151 SyncerThread::kDefaultShortPollIntervalSeconds),
152 syncer_long_poll_interval_seconds_(
153 SyncerThread::kDefaultLongPollIntervalSeconds),
154 syncer_polling_interval_(SyncerThread::kDefaultShortPollIntervalSeconds),
155 syncer_max_interval_(SyncerThread::kDefaultMaxPollIntervalMs),
156 command_channel_(command_channel),
157 model_safe_worker_(model_safe_worker),
158 disable_idle_detection_(false) {
159
160 SyncerEvent shutdown = { SyncerEvent::SHUTDOWN_USE_WITH_CARE };
161 syncer_event_channel_.reset(new SyncerEventChannel(shutdown));
162
163 if (dirman_) {
164 directory_manager_hookup_.reset(NewEventListenerHookup(
165 dirman_->channel(), this,
166 &SyncerThreadPthreadImpl::HandleDirectoryManagerEvent));
167 }
168
169 if (scm_) {
170 WatchConnectionManager(scm_);
171 }
172
173 if (command_channel_) {
174 WatchClientCommands(command_channel_);
175 }
176 }
177
178 SyncerThreadPthreadImpl::~SyncerThreadPthreadImpl() {
179 client_command_hookup_.reset();
180 conn_mgr_hookup_.reset();
181 syncer_event_channel_.reset();
182 directory_manager_hookup_.reset();
183 syncer_events_.reset();
184 delete syncer_;
185 talk_mediator_hookup_.reset();
186 CHECK(!thread_running_);
187 }
188
189 // Creates and starts a syncer thread.
190 // Returns true if it creates a thread or if there's currently a thread running
191 // and false otherwise.
192 bool SyncerThreadPthreadImpl::Start() {
193 MutexLock lock(&mutex_);
194 if (thread_running_) {
195 return true;
196 }
197 thread_running_ =
198 (0 == pthread_create(&thread_, NULL, RunSyncerThread, this));
199 if (thread_running_) {
200 pthread_detach(thread_);
201 }
202 return thread_running_;
203 }
204
205 // Stop processing. A max wait of at least 2*server RTT time is recommended.
206 // Returns true if we stopped, false otherwise.
207 bool SyncerThreadPthreadImpl::Stop(int max_wait) {
208 MutexLock lock(&mutex_);
209 if (!thread_running_)
210 return true;
211 stop_syncer_thread_ = true;
212 if (NULL != syncer_) {
213 // Try to early exit the syncer.
214 syncer_->RequestEarlyExit();
215 }
216 pthread_cond_broadcast(&changed_.condvar_);
217 timespec deadline = { time(NULL) + (max_wait / 1000), 0 };
218 do {
219 const int wait_result = max_wait < 0 ?
220 pthread_cond_wait(&changed_.condvar_, &mutex_.mutex_) :
221 pthread_cond_timedwait(&changed_.condvar_, &mutex_.mutex_,
222 &deadline);
223 if (ETIMEDOUT == wait_result) {
224 LOG(ERROR) << "SyncerThread::Stop timed out. Problems likely.";
225 return false;
226 }
227 } while (thread_running_);
228 return true;
229 }
230
231 void SyncerThreadPthreadImpl::WatchClientCommands(
232 ClientCommandChannel* channel) {
233 PThreadScopedLock<PThreadMutex> lock(&mutex_);
234 client_command_hookup_.reset(NewEventListenerHookup(channel, this,
235 &SyncerThreadPthreadImpl::HandleClientCommand));
236 }
237
238 void SyncerThreadPthreadImpl::HandleClientCommand(
239 ClientCommandChannel::EventType event) {
240 if (!event) {
241 return;
242 }
243
244 // Mutex not really necessary for these.
245 if (event->has_set_sync_poll_interval()) {
246 syncer_short_poll_interval_seconds_ = event->set_sync_poll_interval();
247 }
248
249 if (event->has_set_sync_long_poll_interval()) {
250 syncer_long_poll_interval_seconds_ = event->set_sync_long_poll_interval();
251 }
252 }
253
254 void SyncerThreadPthreadImpl::ThreadMainLoop() {
255 // Use the short poll value by default.
256 int poll_seconds = syncer_short_poll_interval_seconds_;
257 int user_idle_milliseconds = 0;
258 timespec last_sync_time = { 0 };
259 bool initial_sync_for_thread = true;
260 bool continue_sync_cycle = false;
261
262 while (!stop_syncer_thread_) {
263 if (!connected_) {
264 LOG(INFO) << "Syncer thread waiting for connection.";
265 while (!connected_ && !stop_syncer_thread_)
266 pthread_cond_wait(&changed_.condvar_, &mutex_.mutex_);
267 LOG_IF(INFO, connected_) << "Syncer thread found connection.";
268 continue;
269 }
270
271 if (syncer_ == NULL) {
272 LOG(INFO) << "Syncer thread waiting for database initialization.";
273 while (syncer_ == NULL && !stop_syncer_thread_)
274 pthread_cond_wait(&changed_.condvar_, &mutex_.mutex_);
275 LOG_IF(INFO, !(syncer_ == NULL)) << "Syncer was found after DB started.";
276 continue;
277 }
278
279 timespec const next_poll = { last_sync_time.tv_sec + poll_seconds,
280 last_sync_time.tv_nsec };
281 const timespec wake_time =
282 !nudge_queue_.empty() && nudge_queue_.top().first < next_poll ?
283 nudge_queue_.top().first : next_poll;
284 LOG(INFO) << "wake time is " << wake_time.tv_sec;
285 LOG(INFO) << "next poll is " << next_poll.tv_sec;
286
287 const int error = pthread_cond_timedwait(&changed_.condvar_, &mutex_.mutex_,
288 &wake_time);
289 if (ETIMEDOUT != error) {
290 continue; // Check all the conditions again.
291 }
292
293 const timespec now = GetPThreadAbsoluteTime(0);
294
295 // Handle a nudge, caused by either a notification or a local bookmark
296 // event. This will also update the source of the following SyncMain call.
297 UpdateNudgeSource(now, &continue_sync_cycle, &initial_sync_for_thread);
298
299 LOG(INFO) << "Calling Sync Main at time " << now.tv_sec;
300 SyncMain(syncer_);
301 last_sync_time = now;
302
303 LOG(INFO) << "Updating the next polling time after SyncMain";
304 poll_seconds = CalculatePollingWaitTime(allstatus_->status(),
305 poll_seconds,
306 &user_idle_milliseconds,
307 &continue_sync_cycle);
308 }
309 }
310
311 // We check how long the user's been idle and sync less often if the machine is
312 // not in use. The aim is to reduce server load.
313 int SyncerThreadPthreadImpl::CalculatePollingWaitTime(
314 const AllStatus::Status& status,
315 int last_poll_wait, // in s
316 int* user_idle_milliseconds,
317 bool* continue_sync_cycle) {
318 bool is_continuing_sync_cyle = *continue_sync_cycle;
319 *continue_sync_cycle = false;
320
321 // Determine if the syncer has unfinished work to do from allstatus_.
322 const bool syncer_has_work_to_do =
323 status.updates_available > status.updates_received
324 || status.unsynced_count > 0;
325 LOG(INFO) << "syncer_has_work_to_do is " << syncer_has_work_to_do;
326
327 // First calculate the expected wait time, figuring in any backoff because of
328 // user idle time. next_wait is in seconds
329 syncer_polling_interval_ = (!status.notifications_enabled) ?
330 syncer_short_poll_interval_seconds_ :
331 syncer_long_poll_interval_seconds_;
332 int default_next_wait = syncer_polling_interval_;
333 int actual_next_wait = default_next_wait;
334
335 if (syncer_has_work_to_do) {
336 // Provide exponential backoff due to consecutive errors, else attempt to
337 // complete the work as soon as possible.
338 if (!is_continuing_sync_cyle) {
339 actual_next_wait = AllStatus::GetRecommendedDelaySeconds(0);
340 } else {
341 actual_next_wait = AllStatus::GetRecommendedDelaySeconds(last_poll_wait);
342 }
343 *continue_sync_cycle = true;
344 } else if (!status.notifications_enabled) {
345 // Ensure that we start exponential backoff from our base polling
346 // interval when we are not continuing a sync cycle.
347 last_poll_wait = std::max(last_poll_wait, syncer_polling_interval_);
348
349 // Did the user start interacting with the computer again?
350 // If so, revise our idle time (and probably next_sync_time) downwards
351 int new_idle_time = disable_idle_detection_ ? 0 : UserIdleTime();
352 if (new_idle_time < *user_idle_milliseconds) {
353 *user_idle_milliseconds = new_idle_time;
354 }
355 actual_next_wait = CalculateSyncWaitTime(last_poll_wait * 1000,
356 *user_idle_milliseconds) / 1000;
357 DCHECK_GE(actual_next_wait, default_next_wait);
358 }
359
360 LOG(INFO) << "Sync wait: idle " << default_next_wait
361 << " non-idle or backoff " << actual_next_wait << ".";
362
363 return actual_next_wait;
364 }
365
366 void* SyncerThreadPthreadImpl::ThreadMain() {
367 NameCurrentThreadForDebugging("SyncEngine_SyncerThread");
368 mutex_.Lock();
369 ThreadMainLoop();
370 thread_running_ = false;
371 pthread_cond_broadcast(&changed_.condvar_);
372 mutex_.Unlock();
373 LOG(INFO) << "Syncer thread exiting.";
374 return 0;
375 }
376
377 void SyncerThreadPthreadImpl::SyncMain(Syncer* syncer) {
378 CHECK(syncer);
379 mutex_.Unlock();
380 while (syncer->SyncShare()) {
381 LOG(INFO) << "Looping in sync share";
382 }
383 LOG(INFO) << "Done looping in sync share";
384
385 mutex_.Lock();
386 }
387
388 void SyncerThreadPthreadImpl::UpdateNudgeSource(const timespec& now,
389 bool* continue_sync_cycle,
390 bool* initial_sync) {
391 bool nudged = false;
392 SyncerThread::NudgeSource nudge_source = SyncerThread::kUnknown;
393 // Has the previous sync cycle completed?
394 if (continue_sync_cycle) {
395 nudge_source = SyncerThread::kContinuation;
396 }
397 // Update the nudge source if a new nudge has come through during the
398 // previous sync cycle.
399 while (!nudge_queue_.empty() && !(now < nudge_queue_.top().first)) {
400 if (!nudged) {
401 nudge_source = nudge_queue_.top().second;
402 *continue_sync_cycle = false; // Reset the continuation token on nudge.
403 nudged = true;
404 }
405 nudge_queue_.pop();
406 }
407 SetUpdatesSource(nudged, nudge_source, initial_sync);
408 }
409
410 void SyncerThreadPthreadImpl::SetUpdatesSource(bool nudged,
411 SyncerThread::NudgeSource nudge_source, bool* initial_sync) {
412 sync_pb::GetUpdatesCallerInfo::GET_UPDATES_SOURCE updates_source =
413 sync_pb::GetUpdatesCallerInfo::UNKNOWN;
414 if (*initial_sync) {
415 updates_source = sync_pb::GetUpdatesCallerInfo::FIRST_UPDATE;
416 *initial_sync = false;
417 } else if (!nudged) {
418 updates_source = sync_pb::GetUpdatesCallerInfo::PERIODIC;
419 } else {
420 switch (nudge_source) {
421 case SyncerThread::kNotification:
422 updates_source = sync_pb::GetUpdatesCallerInfo::NOTIFICATION;
423 break;
424 case SyncerThread::kLocal:
425 updates_source = sync_pb::GetUpdatesCallerInfo::LOCAL;
426 break;
427 case SyncerThread::kContinuation:
428 updates_source = sync_pb::GetUpdatesCallerInfo::SYNC_CYCLE_CONTINUATION;
429 break;
430 case SyncerThread::kUnknown:
431 default:
432 updates_source = sync_pb::GetUpdatesCallerInfo::UNKNOWN;
433 break;
434 }
435 }
436 syncer_->set_updates_source(updates_source);
437 }
438
439 void SyncerThreadPthreadImpl::HandleSyncerEvent(const SyncerEvent& event) {
440 MutexLock lock(&mutex_);
441 channel()->NotifyListeners(event);
442 if (SyncerEvent::REQUEST_SYNC_NUDGE != event.what_happened) {
443 return;
444 }
445 NudgeSyncImpl(event.nudge_delay_milliseconds, SyncerThread::kUnknown);
446 }
447
448 void SyncerThreadPthreadImpl::HandleDirectoryManagerEvent(
449 const syncable::DirectoryManagerEvent& event) {
450 LOG(INFO) << "Handling a directory manager event";
451 if (syncable::DirectoryManagerEvent::OPENED == event.what_happened) {
452 MutexLock lock(&mutex_);
453 LOG(INFO) << "Syncer starting up for: " << event.dirname;
454 // The underlying database structure is ready, and we should create
455 // the syncer.
456 CHECK(syncer_ == NULL);
457 syncer_ =
458 new Syncer(dirman_, event.dirname, scm_, model_safe_worker_.get());
459
460 syncer_->set_command_channel(command_channel_);
461 syncer_events_.reset(NewEventListenerHookup(
462 syncer_->channel(), this, &SyncerThreadPthreadImpl::HandleSyncerEvent));
463 pthread_cond_broadcast(&changed_.condvar_);
464 }
465 }
466
467 static inline void CheckConnected(bool* connected,
468 HttpResponse::ServerConnectionCode code,
469 pthread_cond_t* condvar) {
470 if (*connected) {
471 if (HttpResponse::CONNECTION_UNAVAILABLE == code) {
472 *connected = false;
473 pthread_cond_broadcast(condvar);
474 }
475 } else {
476 if (HttpResponse::SERVER_CONNECTION_OK == code) {
477 *connected = true;
478 pthread_cond_broadcast(condvar);
479 }
480 }
481 }
482
483 void SyncerThreadPthreadImpl::WatchConnectionManager(
484 ServerConnectionManager* conn_mgr) {
485 conn_mgr_hookup_.reset(NewEventListenerHookup(conn_mgr->channel(), this,
486 &SyncerThreadPthreadImpl::HandleServerConnectionEvent));
487 CheckConnected(&connected_, conn_mgr->server_status(),
488 &changed_.condvar_);
489 }
490
491 void SyncerThreadPthreadImpl::HandleServerConnectionEvent(
492 const ServerConnectionEvent& event) {
493 if (ServerConnectionEvent::STATUS_CHANGED == event.what_happened) {
494 MutexLock lock(&mutex_);
495 CheckConnected(&connected_, event.connection_code,
496 &changed_.condvar_);
497 }
498 }
499
500 SyncerEventChannel* SyncerThreadPthreadImpl::channel() {
501 return syncer_event_channel_.get();
502 }
503
504 // Inputs and return value in milliseconds.
505 int SyncerThreadPthreadImpl::CalculateSyncWaitTime(int last_interval,
506 int user_idle_ms) {
507 // syncer_polling_interval_ is in seconds
508 int syncer_polling_interval_ms = syncer_polling_interval_ * 1000;
509
510 // This is our default and lower bound.
511 int next_wait = syncer_polling_interval_ms;
512
513 // Get idle time, bounded by max wait.
514 int idle = min(user_idle_ms, syncer_max_interval_);
515
516 // If the user has been idle for a while, we'll start decreasing the poll
517 // rate.
518 if (idle >= kPollBackoffThresholdMultiplier * syncer_polling_interval_ms) {
519 next_wait = std::min(AllStatus::GetRecommendedDelaySeconds(
520 last_interval / 1000), syncer_max_interval_ / 1000) * 1000;
521 }
522
523 return next_wait;
524 }
525
526 // Called with mutex_ already locked.
527 void SyncerThreadPthreadImpl::NudgeSyncImpl(int milliseconds_from_now,
528 SyncerThread::NudgeSource source) {
529 const timespec nudge_time = GetPThreadAbsoluteTime(milliseconds_from_now);
530 NudgeObject nudge_object(nudge_time, source);
531 nudge_queue_.push(nudge_object);
532 pthread_cond_broadcast(&changed_.condvar_);
533 }
534
535 void SyncerThreadPthreadImpl::WatchTalkMediator(TalkMediator* mediator) {
536 talk_mediator_hookup_.reset(
537 NewEventListenerHookup(
538 mediator->channel(),
539 this,
540 &SyncerThreadPthreadImpl::HandleTalkMediatorEvent));
541 }
542
543 void SyncerThreadPthreadImpl::HandleTalkMediatorEvent(
544 const TalkMediatorEvent& event) {
545 MutexLock lock(&mutex_);
546 switch (event.what_happened) {
547 case TalkMediatorEvent::LOGIN_SUCCEEDED:
548 LOG(INFO) << "P2P: Login succeeded.";
549 p2p_authenticated_ = true;
550 break;
551 case TalkMediatorEvent::LOGOUT_SUCCEEDED:
552 LOG(INFO) << "P2P: Login succeeded.";
553 p2p_authenticated_ = false;
554 break;
555 case TalkMediatorEvent::SUBSCRIPTIONS_ON:
556 LOG(INFO) << "P2P: Subscriptions successfully enabled.";
557 p2p_subscribed_ = true;
558 if (NULL != syncer_) {
559 LOG(INFO) << "Subscriptions on. Nudging syncer for initial push.";
560 NudgeSyncImpl(0, SyncerThread::kLocal);
561 }
562 break;
563 case TalkMediatorEvent::SUBSCRIPTIONS_OFF:
564 LOG(INFO) << "P2P: Subscriptions are not enabled.";
565 p2p_subscribed_ = false;
566 break;
567 case TalkMediatorEvent::NOTIFICATION_RECEIVED:
568 LOG(INFO) << "P2P: Updates on server, pushing syncer";
569 if (NULL != syncer_) {
570 NudgeSyncImpl(0, SyncerThread::kNotification);
571 }
572 break;
573 default:
574 break;
575 }
576
577 if (NULL != syncer_) {
578 syncer_->set_notifications_enabled(p2p_authenticated_ && p2p_subscribed_);
579 }
580 }
581
582 } // namespace browser_sync
OLDNEW
« no previous file with comments | « chrome/browser/sync/engine/syncer_thread_pthreads.h ('k') | chrome/browser/sync/util/compat_pthread.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698