Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(73)

Side by Side Diff: chrome/browser/sync/engine/syncer_thread_pthreads.cc

Issue 214033: Use chrome/base synchronization primitives and threads instead of... (Closed) Base URL: svn://chrome-svn/chrome/trunk/src/
Patch Set: '' Created 11 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2009 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2009 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4 #include "chrome/browser/sync/engine/syncer_thread_pthreads.h"
5 #include "chrome/browser/sync/engine/syncer_thread.h"
6 5
7 #include "build/build_config.h" 6 #include "build/build_config.h"
8 7
9 #ifdef OS_MACOSX 8 #ifdef OS_MACOSX
10 #include <CoreFoundation/CFNumber.h> 9 #include <CoreFoundation/CFNumber.h>
11 #include <IOKit/IOTypes.h> 10 #include <IOKit/IOTypes.h>
12 #include <IOKit/IOKitLib.h> 11 #include <IOKit/IOKitLib.h>
13 #endif 12 #endif
14 13
15 #include <algorithm> 14 #include <algorithm>
(...skipping 85 matching lines...) Expand 10 before | Expand all | Expand 10 after
101 } 100 }
102 #endif 101 #endif
103 102
104 return 0; 103 return 0;
105 } 104 }
106 105
107 } // namespace 106 } // namespace
108 107
109 namespace browser_sync { 108 namespace browser_sync {
110 109
111 bool SyncerThread::NudgeSyncer(int milliseconds_from_now, NudgeSource source) { 110 SyncerThreadPthreads::SyncerThreadPthreads(
111 ClientCommandChannel* command_channel,
112 syncable::DirectoryManager* mgr,
113 ServerConnectionManager* connection_manager,
114 AllStatus* all_status, ModelSafeWorker* model_safe_worker)
115 : SyncerThread() {
116 impl_.reset(new SyncerThreadPthreadImpl(command_channel, mgr,
117 connection_manager, all_status, model_safe_worker));
118 }
119
120 bool SyncerThreadPthreadImpl::NudgeSyncer(int milliseconds_from_now,
121 SyncerThread::NudgeSource source) {
112 MutexLock lock(&mutex_); 122 MutexLock lock(&mutex_);
113 if (syncer_ == NULL) { 123 if (syncer_ == NULL) {
114 return false; 124 return false;
115 } 125 }
116 NudgeSyncImpl(milliseconds_from_now, source); 126 NudgeSyncImpl(milliseconds_from_now, source);
117 return true; 127 return true;
118 } 128 }
119 129
120 void* RunSyncerThread(void* syncer_thread) { 130 void* RunSyncerThread(void* syncer_thread) {
121 return (reinterpret_cast<SyncerThread*>(syncer_thread))->ThreadMain(); 131 return (reinterpret_cast<SyncerThreadPthreadImpl*>(
132 syncer_thread))->ThreadMain();
122 } 133 }
123 134
124 SyncerThread::SyncerThread( 135 SyncerThreadPthreadImpl::SyncerThreadPthreadImpl(
125 ClientCommandChannel* command_channel, 136 ClientCommandChannel* command_channel,
126 syncable::DirectoryManager* mgr, 137 syncable::DirectoryManager* mgr,
127 ServerConnectionManager* connection_manager, 138 ServerConnectionManager* connection_manager,
128 AllStatus* all_status, 139 AllStatus* all_status,
129 ModelSafeWorker* model_safe_worker) 140 ModelSafeWorker* model_safe_worker)
130 : dirman_(mgr), scm_(connection_manager), 141 : dirman_(mgr), scm_(connection_manager),
131 syncer_(NULL), syncer_events_(NULL), thread_running_(false), 142 syncer_(NULL), syncer_events_(NULL), thread_running_(false),
132 syncer_short_poll_interval_seconds_(kDefaultShortPollIntervalSeconds), 143 syncer_short_poll_interval_seconds_(
133 syncer_long_poll_interval_seconds_(kDefaultLongPollIntervalSeconds), 144 SyncerThread::kDefaultShortPollIntervalSeconds),
134 syncer_polling_interval_(kDefaultShortPollIntervalSeconds), 145 syncer_long_poll_interval_seconds_(
135 syncer_max_interval_(kDefaultMaxPollIntervalMs), 146 SyncerThread::kDefaultLongPollIntervalSeconds),
147 syncer_polling_interval_(SyncerThread::kDefaultShortPollIntervalSeconds),
148 syncer_max_interval_(SyncerThread::kDefaultMaxPollIntervalMs),
136 stop_syncer_thread_(false), connected_(false), conn_mgr_hookup_(NULL), 149 stop_syncer_thread_(false), connected_(false), conn_mgr_hookup_(NULL),
137 p2p_authenticated_(false), p2p_subscribed_(false), 150 p2p_authenticated_(false), p2p_subscribed_(false),
138 allstatus_(all_status), talk_mediator_hookup_(NULL), 151 allstatus_(all_status), talk_mediator_hookup_(NULL),
139 command_channel_(command_channel), directory_manager_hookup_(NULL), 152 command_channel_(command_channel), directory_manager_hookup_(NULL),
140 model_safe_worker_(model_safe_worker), 153 model_safe_worker_(model_safe_worker),
141 client_command_hookup_(NULL), disable_idle_detection_(false) { 154 client_command_hookup_(NULL), disable_idle_detection_(false) {
142 155
143 SyncerEvent shutdown = { SyncerEvent::SHUTDOWN_USE_WITH_CARE }; 156 SyncerEvent shutdown = { SyncerEvent::SHUTDOWN_USE_WITH_CARE };
144 syncer_event_channel_.reset(new SyncerEventChannel(shutdown)); 157 syncer_event_channel_.reset(new SyncerEventChannel(shutdown));
145 158
146 if (dirman_) { 159 if (dirman_) {
147 directory_manager_hookup_.reset(NewEventListenerHookup( 160 directory_manager_hookup_.reset(NewEventListenerHookup(
148 dirman_->channel(), this, &SyncerThread::HandleDirectoryManagerEvent)); 161 dirman_->channel(), this,
162 &SyncerThreadPthreadImpl::HandleDirectoryManagerEvent));
149 } 163 }
150 164
151 if (scm_) { 165 if (scm_) {
152 WatchConnectionManager(scm_); 166 WatchConnectionManager(scm_);
153 } 167 }
154 168
155 if (command_channel_) { 169 if (command_channel_) {
156 WatchClientCommands(command_channel_); 170 WatchClientCommands(command_channel_);
157 } 171 }
158 } 172 }
159 173
160 SyncerThread::~SyncerThread() { 174 SyncerThreadPthreadImpl::~SyncerThreadPthreadImpl() {
161 client_command_hookup_.reset(); 175 client_command_hookup_.reset();
162 conn_mgr_hookup_.reset(); 176 conn_mgr_hookup_.reset();
163 syncer_event_channel_.reset(); 177 syncer_event_channel_.reset();
164 directory_manager_hookup_.reset(); 178 directory_manager_hookup_.reset();
165 syncer_events_.reset(); 179 syncer_events_.reset();
166 delete syncer_; 180 delete syncer_;
167 talk_mediator_hookup_.reset(); 181 talk_mediator_hookup_.reset();
168 CHECK(!thread_running_); 182 CHECK(!thread_running_);
169 } 183 }
170 184
171 // Creates and starts a syncer thread. 185 // Creates and starts a syncer thread.
172 // Returns true if it creates a thread or if there's currently a thread running 186 // Returns true if it creates a thread or if there's currently a thread running
173 // and false otherwise. 187 // and false otherwise.
174 bool SyncerThread::Start() { 188 bool SyncerThreadPthreadImpl::Start() {
175 MutexLock lock(&mutex_); 189 MutexLock lock(&mutex_);
176 if (thread_running_) { 190 if (thread_running_) {
177 return true; 191 return true;
178 } 192 }
179 thread_running_ = 193 thread_running_ =
180 (0 == pthread_create(&thread_, NULL, RunSyncerThread, this)); 194 (0 == pthread_create(&thread_, NULL, RunSyncerThread, this));
181 if (thread_running_) { 195 if (thread_running_) {
182 pthread_detach(thread_); 196 pthread_detach(thread_);
183 } 197 }
184 return thread_running_; 198 return thread_running_;
185 } 199 }
186 200
187 // Stop processing. A max wait of at least 2*server RTT time is recommended. 201 // Stop processing. A max wait of at least 2*server RTT time is recommended.
188 // Returns true if we stopped, false otherwise. 202 // Returns true if we stopped, false otherwise.
189 bool SyncerThread::Stop(int max_wait) { 203 bool SyncerThreadPthreadImpl::Stop(int max_wait) {
190 MutexLock lock(&mutex_); 204 MutexLock lock(&mutex_);
191 if (!thread_running_) 205 if (!thread_running_)
192 return true; 206 return true;
193 stop_syncer_thread_ = true; 207 stop_syncer_thread_ = true;
194 if (NULL != syncer_) { 208 if (NULL != syncer_) {
195 // Try to early exit the syncer. 209 // Try to early exit the syncer.
196 syncer_->RequestEarlyExit(); 210 syncer_->RequestEarlyExit();
197 } 211 }
198 pthread_cond_broadcast(&changed_.condvar_); 212 pthread_cond_broadcast(&changed_.condvar_);
199 timespec deadline = { time(NULL) + (max_wait / 1000), 0 }; 213 timespec deadline = { time(NULL) + (max_wait / 1000), 0 };
200 do { 214 do {
201 const int wait_result = max_wait < 0 ? 215 const int wait_result = max_wait < 0 ?
202 pthread_cond_wait(&changed_.condvar_, &mutex_.mutex_) : 216 pthread_cond_wait(&changed_.condvar_, &mutex_.mutex_) :
203 pthread_cond_timedwait(&changed_.condvar_, &mutex_.mutex_, 217 pthread_cond_timedwait(&changed_.condvar_, &mutex_.mutex_,
204 &deadline); 218 &deadline);
205 if (ETIMEDOUT == wait_result) { 219 if (ETIMEDOUT == wait_result) {
206 LOG(ERROR) << "SyncerThread::Stop timed out. Problems likely."; 220 LOG(ERROR) << "SyncerThread::Stop timed out. Problems likely.";
207 return false; 221 return false;
208 } 222 }
209 } while (thread_running_); 223 } while (thread_running_);
210 return true; 224 return true;
211 } 225 }
212 226
213 void SyncerThread::WatchClientCommands(ClientCommandChannel* channel) { 227 void SyncerThreadPthreadImpl::WatchClientCommands(
228 ClientCommandChannel* channel) {
214 PThreadScopedLock<PThreadMutex> lock(&mutex_); 229 PThreadScopedLock<PThreadMutex> lock(&mutex_);
215 client_command_hookup_.reset(NewEventListenerHookup(channel, this, 230 client_command_hookup_.reset(NewEventListenerHookup(channel, this,
216 &SyncerThread::HandleClientCommand)); 231 &SyncerThreadPthreadImpl::HandleClientCommand));
217 } 232 }
218 233
219 void SyncerThread::HandleClientCommand(ClientCommandChannel::EventType event) { 234 void SyncerThreadPthreadImpl::HandleClientCommand(
235 ClientCommandChannel::EventType event) {
220 if (!event) { 236 if (!event) {
221 return; 237 return;
222 } 238 }
223 239
224 // Mutex not really necessary for these. 240 // Mutex not really necessary for these.
225 if (event->has_set_sync_poll_interval()) { 241 if (event->has_set_sync_poll_interval()) {
226 syncer_short_poll_interval_seconds_ = event->set_sync_poll_interval(); 242 syncer_short_poll_interval_seconds_ = event->set_sync_poll_interval();
227 } 243 }
228 244
229 if (event->has_set_sync_long_poll_interval()) { 245 if (event->has_set_sync_long_poll_interval()) {
230 syncer_long_poll_interval_seconds_ = event->set_sync_long_poll_interval(); 246 syncer_long_poll_interval_seconds_ = event->set_sync_long_poll_interval();
231 } 247 }
232 } 248 }
233 249
234 void SyncerThread::ThreadMainLoop() { 250 void SyncerThreadPthreadImpl::ThreadMainLoop() {
235 // Use the short poll value by default. 251 // Use the short poll value by default.
236 int poll_seconds = syncer_short_poll_interval_seconds_; 252 int poll_seconds = syncer_short_poll_interval_seconds_;
237 int user_idle_milliseconds = 0; 253 int user_idle_milliseconds = 0;
238 timespec last_sync_time = { 0 }; 254 timespec last_sync_time = { 0 };
239 bool initial_sync_for_thread = true; 255 bool initial_sync_for_thread = true;
240 bool continue_sync_cycle = false; 256 bool continue_sync_cycle = false;
241 257
242 while (!stop_syncer_thread_) { 258 while (!stop_syncer_thread_) {
243 if (!connected_) { 259 if (!connected_) {
244 LOG(INFO) << "Syncer thread waiting for connection."; 260 LOG(INFO) << "Syncer thread waiting for connection.";
(...skipping 38 matching lines...) Expand 10 before | Expand all | Expand 10 after
283 LOG(INFO) << "Updating the next polling time after SyncMain"; 299 LOG(INFO) << "Updating the next polling time after SyncMain";
284 poll_seconds = CalculatePollingWaitTime(allstatus_->status(), 300 poll_seconds = CalculatePollingWaitTime(allstatus_->status(),
285 poll_seconds, 301 poll_seconds,
286 &user_idle_milliseconds, 302 &user_idle_milliseconds,
287 &continue_sync_cycle); 303 &continue_sync_cycle);
288 } 304 }
289 } 305 }
290 306
291 // We check how long the user's been idle and sync less often if the machine is 307 // We check how long the user's been idle and sync less often if the machine is
292 // not in use. The aim is to reduce server load. 308 // not in use. The aim is to reduce server load.
293 int SyncerThread::CalculatePollingWaitTime( 309 int SyncerThreadPthreadImpl::CalculatePollingWaitTime(
294 const AllStatus::Status& status, 310 const AllStatus::Status& status,
295 int last_poll_wait, // in s 311 int last_poll_wait, // in s
296 int* user_idle_milliseconds, 312 int* user_idle_milliseconds,
297 bool* continue_sync_cycle) { 313 bool* continue_sync_cycle) {
298 bool is_continuing_sync_cyle = *continue_sync_cycle; 314 bool is_continuing_sync_cyle = *continue_sync_cycle;
299 *continue_sync_cycle = false; 315 *continue_sync_cycle = false;
300 316
301 // Determine if the syncer has unfinished work to do from allstatus_. 317 // Determine if the syncer has unfinished work to do from allstatus_.
302 const bool syncer_has_work_to_do = 318 const bool syncer_has_work_to_do =
303 status.updates_available > status.updates_received 319 status.updates_available > status.updates_received
(...skipping 32 matching lines...) Expand 10 before | Expand all | Expand 10 after
336 *user_idle_milliseconds) / 1000; 352 *user_idle_milliseconds) / 1000;
337 DCHECK_GE(actual_next_wait, default_next_wait); 353 DCHECK_GE(actual_next_wait, default_next_wait);
338 } 354 }
339 355
340 LOG(INFO) << "Sync wait: idle " << default_next_wait 356 LOG(INFO) << "Sync wait: idle " << default_next_wait
341 << " non-idle or backoff " << actual_next_wait << "."; 357 << " non-idle or backoff " << actual_next_wait << ".";
342 358
343 return actual_next_wait; 359 return actual_next_wait;
344 } 360 }
345 361
346 void* SyncerThread::ThreadMain() { 362 void* SyncerThreadPthreadImpl::ThreadMain() {
347 NameCurrentThreadForDebugging("SyncEngine_SyncerThread"); 363 NameCurrentThreadForDebugging("SyncEngine_SyncerThread");
348 mutex_.Lock(); 364 mutex_.Lock();
349 ThreadMainLoop(); 365 ThreadMainLoop();
350 thread_running_ = false; 366 thread_running_ = false;
351 pthread_cond_broadcast(&changed_.condvar_); 367 pthread_cond_broadcast(&changed_.condvar_);
352 mutex_.Unlock(); 368 mutex_.Unlock();
353 LOG(INFO) << "Syncer thread exiting."; 369 LOG(INFO) << "Syncer thread exiting.";
354 return 0; 370 return 0;
355 } 371 }
356 372
357 void SyncerThread::SyncMain(Syncer* syncer) { 373 void SyncerThreadPthreadImpl::SyncMain(Syncer* syncer) {
358 CHECK(syncer); 374 CHECK(syncer);
359 mutex_.Unlock(); 375 mutex_.Unlock();
360 while (syncer->SyncShare()) { 376 while (syncer->SyncShare()) {
361 LOG(INFO) << "Looping in sync share"; 377 LOG(INFO) << "Looping in sync share";
362 } 378 }
363 LOG(INFO) << "Done looping in sync share"; 379 LOG(INFO) << "Done looping in sync share";
364 380
365 mutex_.Lock(); 381 mutex_.Lock();
366 } 382 }
367 383
368 void SyncerThread::UpdateNudgeSource(const timespec& now, 384 void SyncerThreadPthreadImpl::UpdateNudgeSource(const timespec& now,
369 bool* continue_sync_cycle, 385 bool* continue_sync_cycle,
370 bool* initial_sync) { 386 bool* initial_sync) {
371 bool nudged = false; 387 bool nudged = false;
372 NudgeSource nudge_source = kUnknown; 388 SyncerThread::NudgeSource nudge_source = SyncerThread::kUnknown;
373 // Has the previous sync cycle completed? 389 // Has the previous sync cycle completed?
374 if (continue_sync_cycle) { 390 if (continue_sync_cycle) {
375 nudge_source = kContinuation; 391 nudge_source = SyncerThread::kContinuation;
376 } 392 }
377 // Update the nudge source if a new nudge has come through during the 393 // Update the nudge source if a new nudge has come through during the
378 // previous sync cycle. 394 // previous sync cycle.
379 while (!nudge_queue_.empty() && !(now < nudge_queue_.top().first)) { 395 while (!nudge_queue_.empty() && !(now < nudge_queue_.top().first)) {
380 if (!nudged) { 396 if (!nudged) {
381 nudge_source = nudge_queue_.top().second; 397 nudge_source = nudge_queue_.top().second;
382 *continue_sync_cycle = false; // Reset the continuation token on nudge. 398 *continue_sync_cycle = false; // Reset the continuation token on nudge.
383 nudged = true; 399 nudged = true;
384 } 400 }
385 nudge_queue_.pop(); 401 nudge_queue_.pop();
386 } 402 }
387 SetUpdatesSource(nudged, nudge_source, initial_sync); 403 SetUpdatesSource(nudged, nudge_source, initial_sync);
388 } 404 }
389 405
390 void SyncerThread::SetUpdatesSource(bool nudged, NudgeSource nudge_source, 406 void SyncerThreadPthreadImpl::SetUpdatesSource(bool nudged,
391 bool* initial_sync) { 407 SyncerThread::NudgeSource nudge_source, bool* initial_sync) {
392 sync_pb::GetUpdatesCallerInfo::GET_UPDATES_SOURCE updates_source = 408 sync_pb::GetUpdatesCallerInfo::GET_UPDATES_SOURCE updates_source =
393 sync_pb::GetUpdatesCallerInfo::UNKNOWN; 409 sync_pb::GetUpdatesCallerInfo::UNKNOWN;
394 if (*initial_sync) { 410 if (*initial_sync) {
395 updates_source = sync_pb::GetUpdatesCallerInfo::FIRST_UPDATE; 411 updates_source = sync_pb::GetUpdatesCallerInfo::FIRST_UPDATE;
396 *initial_sync = false; 412 *initial_sync = false;
397 } else if (!nudged) { 413 } else if (!nudged) {
398 updates_source = sync_pb::GetUpdatesCallerInfo::PERIODIC; 414 updates_source = sync_pb::GetUpdatesCallerInfo::PERIODIC;
399 } else { 415 } else {
400 switch (nudge_source) { 416 switch (nudge_source) {
401 case kNotification: 417 case SyncerThread::kNotification:
402 updates_source = sync_pb::GetUpdatesCallerInfo::NOTIFICATION; 418 updates_source = sync_pb::GetUpdatesCallerInfo::NOTIFICATION;
403 break; 419 break;
404 case kLocal: 420 case SyncerThread::kLocal:
405 updates_source = sync_pb::GetUpdatesCallerInfo::LOCAL; 421 updates_source = sync_pb::GetUpdatesCallerInfo::LOCAL;
406 break; 422 break;
407 case kContinuation: 423 case SyncerThread::kContinuation:
408 updates_source = sync_pb::GetUpdatesCallerInfo::SYNC_CYCLE_CONTINUATION; 424 updates_source = sync_pb::GetUpdatesCallerInfo::SYNC_CYCLE_CONTINUATION;
409 break; 425 break;
410 case kUnknown: 426 case SyncerThread::kUnknown:
411 default: 427 default:
412 updates_source = sync_pb::GetUpdatesCallerInfo::UNKNOWN; 428 updates_source = sync_pb::GetUpdatesCallerInfo::UNKNOWN;
413 break; 429 break;
414 } 430 }
415 } 431 }
416 syncer_->set_updates_source(updates_source); 432 syncer_->set_updates_source(updates_source);
417 } 433 }
418 434
419 void SyncerThread::HandleSyncerEvent(const SyncerEvent& event) { 435 void SyncerThreadPthreadImpl::HandleSyncerEvent(const SyncerEvent& event) {
420 MutexLock lock(&mutex_); 436 MutexLock lock(&mutex_);
421 channel()->NotifyListeners(event); 437 channel()->NotifyListeners(event);
422 if (SyncerEvent::REQUEST_SYNC_NUDGE != event.what_happened) { 438 if (SyncerEvent::REQUEST_SYNC_NUDGE != event.what_happened) {
423 return; 439 return;
424 } 440 }
425 NudgeSyncImpl(event.nudge_delay_milliseconds, kUnknown); 441 NudgeSyncImpl(event.nudge_delay_milliseconds, SyncerThread::kUnknown);
426 } 442 }
427 443
428 void SyncerThread::HandleDirectoryManagerEvent( 444 void SyncerThreadPthreadImpl::HandleDirectoryManagerEvent(
429 const syncable::DirectoryManagerEvent& event) { 445 const syncable::DirectoryManagerEvent& event) {
430 LOG(INFO) << "Handling a directory manager event"; 446 LOG(INFO) << "Handling a directory manager event";
431 if (syncable::DirectoryManagerEvent::OPENED == event.what_happened) { 447 if (syncable::DirectoryManagerEvent::OPENED == event.what_happened) {
432 MutexLock lock(&mutex_); 448 MutexLock lock(&mutex_);
433 LOG(INFO) << "Syncer starting up for: " << event.dirname; 449 LOG(INFO) << "Syncer starting up for: " << event.dirname;
434 // The underlying database structure is ready, and we should create 450 // The underlying database structure is ready, and we should create
435 // the syncer. 451 // the syncer.
436 CHECK(syncer_ == NULL); 452 CHECK(syncer_ == NULL);
437 syncer_ = 453 syncer_ =
438 new Syncer(dirman_, event.dirname, scm_, model_safe_worker_.get()); 454 new Syncer(dirman_, event.dirname, scm_, model_safe_worker_.get());
439 455
440 syncer_->set_command_channel(command_channel_); 456 syncer_->set_command_channel(command_channel_);
441 syncer_events_.reset(NewEventListenerHookup( 457 syncer_events_.reset(NewEventListenerHookup(
442 syncer_->channel(), this, &SyncerThread::HandleSyncerEvent)); 458 syncer_->channel(), this, &SyncerThreadPthreadImpl::HandleSyncerEvent));
443 pthread_cond_broadcast(&changed_.condvar_); 459 pthread_cond_broadcast(&changed_.condvar_);
444 } 460 }
445 } 461 }
446 462
447 static inline void CheckConnected(bool* connected, 463 static inline void CheckConnected(bool* connected,
448 HttpResponse::ServerConnectionCode code, 464 HttpResponse::ServerConnectionCode code,
449 pthread_cond_t* condvar) { 465 pthread_cond_t* condvar) {
450 if (*connected) { 466 if (*connected) {
451 if (HttpResponse::CONNECTION_UNAVAILABLE == code) { 467 if (HttpResponse::CONNECTION_UNAVAILABLE == code) {
452 *connected = false; 468 *connected = false;
453 pthread_cond_broadcast(condvar); 469 pthread_cond_broadcast(condvar);
454 } 470 }
455 } else { 471 } else {
456 if (HttpResponse::SERVER_CONNECTION_OK == code) { 472 if (HttpResponse::SERVER_CONNECTION_OK == code) {
457 *connected = true; 473 *connected = true;
458 pthread_cond_broadcast(condvar); 474 pthread_cond_broadcast(condvar);
459 } 475 }
460 } 476 }
461 } 477 }
462 478
463 void SyncerThread::WatchConnectionManager(ServerConnectionManager* conn_mgr) { 479 void SyncerThreadPthreadImpl::WatchConnectionManager(
480 ServerConnectionManager* conn_mgr) {
464 conn_mgr_hookup_.reset(NewEventListenerHookup(conn_mgr->channel(), this, 481 conn_mgr_hookup_.reset(NewEventListenerHookup(conn_mgr->channel(), this,
465 &SyncerThread::HandleServerConnectionEvent)); 482 &SyncerThreadPthreadImpl::HandleServerConnectionEvent));
466 CheckConnected(&connected_, conn_mgr->server_status(), 483 CheckConnected(&connected_, conn_mgr->server_status(),
467 &changed_.condvar_); 484 &changed_.condvar_);
468 } 485 }
469 486
470 void SyncerThread::HandleServerConnectionEvent( 487 void SyncerThreadPthreadImpl::HandleServerConnectionEvent(
471 const ServerConnectionEvent& event) { 488 const ServerConnectionEvent& event) {
472 if (ServerConnectionEvent::STATUS_CHANGED == event.what_happened) { 489 if (ServerConnectionEvent::STATUS_CHANGED == event.what_happened) {
473 MutexLock lock(&mutex_); 490 MutexLock lock(&mutex_);
474 CheckConnected(&connected_, event.connection_code, 491 CheckConnected(&connected_, event.connection_code,
475 &changed_.condvar_); 492 &changed_.condvar_);
476 } 493 }
477 } 494 }
478 495
479 SyncerEventChannel* SyncerThread::channel() { 496 SyncerEventChannel* SyncerThreadPthreadImpl::channel() {
480 return syncer_event_channel_.get(); 497 return syncer_event_channel_.get();
481 } 498 }
482 499
483 // Inputs and return value in milliseconds. 500 // Inputs and return value in milliseconds.
484 int SyncerThread::CalculateSyncWaitTime(int last_interval, int user_idle_ms) { 501 int SyncerThreadPthreadImpl::CalculateSyncWaitTime(int last_interval,
502 int user_idle_ms) {
485 // syncer_polling_interval_ is in seconds 503 // syncer_polling_interval_ is in seconds
486 int syncer_polling_interval_ms = syncer_polling_interval_ * 1000; 504 int syncer_polling_interval_ms = syncer_polling_interval_ * 1000;
487 505
488 // This is our default and lower bound. 506 // This is our default and lower bound.
489 int next_wait = syncer_polling_interval_ms; 507 int next_wait = syncer_polling_interval_ms;
490 508
491 // Get idle time, bounded by max wait. 509 // Get idle time, bounded by max wait.
492 int idle = min(user_idle_ms, syncer_max_interval_); 510 int idle = min(user_idle_ms, syncer_max_interval_);
493 511
494 // If the user has been idle for a while, we'll start decreasing the poll 512 // If the user has been idle for a while, we'll start decreasing the poll
495 // rate. 513 // rate.
496 if (idle >= kPollBackoffThresholdMultiplier * syncer_polling_interval_ms) { 514 if (idle >= kPollBackoffThresholdMultiplier * syncer_polling_interval_ms) {
497 next_wait = std::min(AllStatus::GetRecommendedDelaySeconds( 515 next_wait = std::min(AllStatus::GetRecommendedDelaySeconds(
498 last_interval / 1000), syncer_max_interval_ / 1000) * 1000; 516 last_interval / 1000), syncer_max_interval_ / 1000) * 1000;
499 } 517 }
500 518
501 return next_wait; 519 return next_wait;
502 } 520 }
503 521
504 // Called with mutex_ already locked. 522 // Called with mutex_ already locked.
505 void SyncerThread::NudgeSyncImpl(int milliseconds_from_now, 523 void SyncerThreadPthreadImpl::NudgeSyncImpl(int milliseconds_from_now,
506 NudgeSource source) { 524 SyncerThread::NudgeSource source) {
507 const timespec nudge_time = GetPThreadAbsoluteTime(milliseconds_from_now); 525 const timespec nudge_time = GetPThreadAbsoluteTime(milliseconds_from_now);
508 NudgeObject nudge_object(nudge_time, source); 526 NudgeObject nudge_object(nudge_time, source);
509 nudge_queue_.push(nudge_object); 527 nudge_queue_.push(nudge_object);
510 pthread_cond_broadcast(&changed_.condvar_); 528 pthread_cond_broadcast(&changed_.condvar_);
511 } 529 }
512 530
513 void SyncerThread::WatchTalkMediator(TalkMediator* mediator) { 531 void SyncerThreadPthreadImpl::WatchTalkMediator(TalkMediator* mediator) {
514 talk_mediator_hookup_.reset( 532 talk_mediator_hookup_.reset(
515 NewEventListenerHookup( 533 NewEventListenerHookup(
516 mediator->channel(), 534 mediator->channel(),
517 this, 535 this,
518 &SyncerThread::HandleTalkMediatorEvent)); 536 &SyncerThreadPthreadImpl::HandleTalkMediatorEvent));
519 } 537 }
520 538
521 void SyncerThread::HandleTalkMediatorEvent(const TalkMediatorEvent& event) { 539 void SyncerThreadPthreadImpl::HandleTalkMediatorEvent(
540 const TalkMediatorEvent& event) {
522 MutexLock lock(&mutex_); 541 MutexLock lock(&mutex_);
523 switch (event.what_happened) { 542 switch (event.what_happened) {
524 case TalkMediatorEvent::LOGIN_SUCCEEDED: 543 case TalkMediatorEvent::LOGIN_SUCCEEDED:
525 LOG(INFO) << "P2P: Login succeeded."; 544 LOG(INFO) << "P2P: Login succeeded.";
526 p2p_authenticated_ = true; 545 p2p_authenticated_ = true;
527 break; 546 break;
528 case TalkMediatorEvent::LOGOUT_SUCCEEDED: 547 case TalkMediatorEvent::LOGOUT_SUCCEEDED:
529 LOG(INFO) << "P2P: Login succeeded."; 548 LOG(INFO) << "P2P: Login succeeded.";
530 p2p_authenticated_ = false; 549 p2p_authenticated_ = false;
531 break; 550 break;
532 case TalkMediatorEvent::SUBSCRIPTIONS_ON: 551 case TalkMediatorEvent::SUBSCRIPTIONS_ON:
533 LOG(INFO) << "P2P: Subscriptions successfully enabled."; 552 LOG(INFO) << "P2P: Subscriptions successfully enabled.";
534 p2p_subscribed_ = true; 553 p2p_subscribed_ = true;
535 if (NULL != syncer_) { 554 if (NULL != syncer_) {
536 LOG(INFO) << "Subscriptions on. Nudging syncer for initial push."; 555 LOG(INFO) << "Subscriptions on. Nudging syncer for initial push.";
537 NudgeSyncImpl(0, kLocal); 556 NudgeSyncImpl(0, SyncerThread::kLocal);
538 } 557 }
539 break; 558 break;
540 case TalkMediatorEvent::SUBSCRIPTIONS_OFF: 559 case TalkMediatorEvent::SUBSCRIPTIONS_OFF:
541 LOG(INFO) << "P2P: Subscriptions are not enabled."; 560 LOG(INFO) << "P2P: Subscriptions are not enabled.";
542 p2p_subscribed_ = false; 561 p2p_subscribed_ = false;
543 break; 562 break;
544 case TalkMediatorEvent::NOTIFICATION_RECEIVED: 563 case TalkMediatorEvent::NOTIFICATION_RECEIVED:
545 LOG(INFO) << "P2P: Updates on server, pushing syncer"; 564 LOG(INFO) << "P2P: Updates on server, pushing syncer";
546 if (NULL != syncer_) { 565 if (NULL != syncer_) {
547 NudgeSyncImpl(0, kNotification); 566 NudgeSyncImpl(0, SyncerThread::kNotification);
548 } 567 }
549 break; 568 break;
550 default: 569 default:
551 break; 570 break;
552 } 571 }
553 572
554 if (NULL != syncer_) { 573 if (NULL != syncer_) {
555 syncer_->set_notifications_enabled(p2p_authenticated_ && p2p_subscribed_); 574 syncer_->set_notifications_enabled(p2p_authenticated_ && p2p_subscribed_);
556 } 575 }
557 } 576 }
558 577
559 } // namespace browser_sync 578 } // namespace browser_sync
OLDNEW
« no previous file with comments | « chrome/browser/sync/engine/syncer_thread_pthreads.h ('k') | chrome/browser/sync/engine/syncer_thread_timed_stop.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698