Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(83)

Side by Side Diff: chrome/browser/sync/engine/syncer_thread.cc

Issue 214033: Use chrome/base synchronization primitives and threads instead of... (Closed) Base URL: svn://chrome-svn/chrome/trunk/src/
Patch Set: '' Created 11 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2009 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2009 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4
5 #include "chrome/browser/sync/engine/syncer_thread.h" 4 #include "chrome/browser/sync/engine/syncer_thread.h"
6 5
7 #include "build/build_config.h" 6 #include "build/build_config.h"
8 7
9 #ifdef OS_MACOSX 8 #ifdef OS_MACOSX
10 #include <CoreFoundation/CFNumber.h> 9 #include <CoreFoundation/CFNumber.h>
11 #include <IOKit/IOTypes.h> 10 #include <IOKit/IOTypes.h>
12 #include <IOKit/IOKitLib.h> 11 #include <IOKit/IOKitLib.h>
13 #endif 12 #endif
14 13
15 #include <algorithm> 14 #include <algorithm>
16 #include <map> 15 #include <map>
17 #include <queue> 16 #include <queue>
18 17
18 #include "base/command_line.h"
19 #include "chrome/browser/sync/engine/auth_watcher.h" 19 #include "chrome/browser/sync/engine/auth_watcher.h"
20 #include "chrome/browser/sync/engine/model_safe_worker.h" 20 #include "chrome/browser/sync/engine/model_safe_worker.h"
21 #include "chrome/browser/sync/engine/net/server_connection_manager.h" 21 #include "chrome/browser/sync/engine/net/server_connection_manager.h"
22 #include "chrome/browser/sync/engine/syncer.h" 22 #include "chrome/browser/sync/engine/syncer.h"
23 #include "chrome/browser/sync/engine/syncer_thread_pthreads.h"
24 #include "chrome/browser/sync/engine/syncer_thread_timed_stop.h"
23 #include "chrome/browser/sync/notifier/listener/talk_mediator.h" 25 #include "chrome/browser/sync/notifier/listener/talk_mediator.h"
24 #include "chrome/browser/sync/notifier/listener/talk_mediator_impl.h" 26 #include "chrome/browser/sync/notifier/listener/talk_mediator_impl.h"
25 #include "chrome/browser/sync/syncable/directory_manager.h" 27 #include "chrome/browser/sync/syncable/directory_manager.h"
28 #include "chrome/common/chrome_switches.h"
26 29
27 using std::priority_queue; 30 using std::priority_queue;
28 using std::min; 31 using std::min;
29 32 using base::Time;
30 static inline bool operator < (const timespec& a, const timespec& b) { 33 using base::TimeDelta;
31 return a.tv_sec == b.tv_sec ? a.tv_nsec < b.tv_nsec : a.tv_sec < b.tv_sec; 34 using base::TimeTicks;
32 }
33 35
34 namespace { 36 namespace {
35 37
36 // Returns the amount of time since the user last interacted with the computer, 38 // Returns the amount of time since the user last interacted with the computer,
37 // in milliseconds 39 // in milliseconds
38 int UserIdleTime() { 40 int UserIdleTime() {
39 #ifdef OS_WIN 41 #ifdef OS_WIN
40 LASTINPUTINFO last_input_info; 42 LASTINPUTINFO last_input_info;
41 last_input_info.cbSize = sizeof(LASTINPUTINFO); 43 last_input_info.cbSize = sizeof(LASTINPUTINFO);
42 44
(...skipping 58 matching lines...) Expand 10 before | Expand all | Expand 10 after
101 } 103 }
102 #endif 104 #endif
103 105
104 return 0; 106 return 0;
105 } 107 }
106 108
107 } // namespace 109 } // namespace
108 110
109 namespace browser_sync { 111 namespace browser_sync {
110 112
113 SyncerThread* SyncerThreadFactory::Create(
114 ClientCommandChannel* command_channel,
115 syncable::DirectoryManager* mgr,
116 ServerConnectionManager* connection_manager, AllStatus* all_status,
117 ModelSafeWorker* model_safe_worker) {
118 const CommandLine* cmd = CommandLine::ForCurrentProcess();
119 if (cmd->HasSwitch(switches::kSyncerThreadTimedStop)) {
120 return new SyncerThreadTimedStop(command_channel, mgr, connection_manager,
121 all_status, model_safe_worker);
122 } else if (cmd->HasSwitch(switches::kSyncerThreadPthreads)) {
123 return new SyncerThreadPthreads(command_channel, mgr, connection_manager,
124 all_status, model_safe_worker);
125 } else {
126 // The default SyncerThread implementation, which does not time-out when
127 // Stop is called.
128 return new SyncerThread(command_channel, mgr, connection_manager,
129 all_status, model_safe_worker);
130 }
131 }
132
111 bool SyncerThread::NudgeSyncer(int milliseconds_from_now, NudgeSource source) { 133 bool SyncerThread::NudgeSyncer(int milliseconds_from_now, NudgeSource source) {
112 MutexLock lock(&mutex_); 134 AutoLock lock(lock_);
113 if (syncer_ == NULL) { 135 if (vault_.syncer_ == NULL) {
114 return false; 136 return false;
115 } 137 }
116 NudgeSyncImpl(milliseconds_from_now, source); 138 NudgeSyncImpl(milliseconds_from_now, source);
117 return true; 139 return true;
118 } 140 }
119 141
120 void* RunSyncerThread(void* syncer_thread) { 142 SyncerThread::SyncerThread()
121 return (reinterpret_cast<SyncerThread*>(syncer_thread))->ThreadMain(); 143 : thread_main_started_(false, false),
144 thread_("SyncEngine_SyncerThread"),
145 vault_field_changed_(&lock_),
146 p2p_authenticated_(false),
147 p2p_subscribed_(false),
148 client_command_hookup_(NULL),
149 conn_mgr_hookup_(NULL),
150 allstatus_(NULL),
151 dirman_(NULL),
152 scm_(NULL),
153 syncer_short_poll_interval_seconds_(kDefaultShortPollIntervalSeconds),
154 syncer_long_poll_interval_seconds_(kDefaultLongPollIntervalSeconds),
155 syncer_polling_interval_(kDefaultShortPollIntervalSeconds),
156 syncer_max_interval_(kDefaultMaxPollIntervalMs),
157 talk_mediator_hookup_(NULL),
158 command_channel_(NULL),
159 directory_manager_hookup_(NULL),
160 syncer_events_(NULL),
161 model_safe_worker_(NULL),
162 disable_idle_detection_(false) {
122 } 163 }
123 164
124 SyncerThread::SyncerThread( 165 SyncerThread::SyncerThread(
125 ClientCommandChannel* command_channel, 166 ClientCommandChannel* command_channel,
126 syncable::DirectoryManager* mgr, 167 syncable::DirectoryManager* mgr,
127 ServerConnectionManager* connection_manager, 168 ServerConnectionManager* connection_manager,
128 AllStatus* all_status, 169 AllStatus* all_status,
129 ModelSafeWorker* model_safe_worker) 170 ModelSafeWorker* model_safe_worker)
130 : dirman_(mgr), scm_(connection_manager), 171 : thread_main_started_(false, false),
131 syncer_(NULL), syncer_events_(NULL), thread_running_(false), 172 thread_("SyncEngine_SyncerThread"),
173 vault_field_changed_(&lock_),
174 p2p_authenticated_(false),
175 p2p_subscribed_(false),
176 client_command_hookup_(NULL),
177 conn_mgr_hookup_(NULL),
178 allstatus_(all_status),
179 dirman_(mgr),
180 scm_(connection_manager),
132 syncer_short_poll_interval_seconds_(kDefaultShortPollIntervalSeconds), 181 syncer_short_poll_interval_seconds_(kDefaultShortPollIntervalSeconds),
133 syncer_long_poll_interval_seconds_(kDefaultLongPollIntervalSeconds), 182 syncer_long_poll_interval_seconds_(kDefaultLongPollIntervalSeconds),
134 syncer_polling_interval_(kDefaultShortPollIntervalSeconds), 183 syncer_polling_interval_(kDefaultShortPollIntervalSeconds),
135 syncer_max_interval_(kDefaultMaxPollIntervalMs), 184 syncer_max_interval_(kDefaultMaxPollIntervalMs),
136 stop_syncer_thread_(false), connected_(false), conn_mgr_hookup_(NULL), 185 talk_mediator_hookup_(NULL),
137 p2p_authenticated_(false), p2p_subscribed_(false), 186 command_channel_(command_channel),
138 allstatus_(all_status), talk_mediator_hookup_(NULL), 187 directory_manager_hookup_(NULL),
139 command_channel_(command_channel), directory_manager_hookup_(NULL), 188 syncer_events_(NULL),
140 model_safe_worker_(model_safe_worker), 189 model_safe_worker_(model_safe_worker),
141 client_command_hookup_(NULL), disable_idle_detection_(false) { 190 disable_idle_detection_(false) {
142 191
143 SyncerEvent shutdown = { SyncerEvent::SHUTDOWN_USE_WITH_CARE }; 192 SyncerEvent shutdown = { SyncerEvent::SHUTDOWN_USE_WITH_CARE };
144 syncer_event_channel_.reset(new SyncerEventChannel(shutdown)); 193 syncer_event_channel_.reset(new SyncerEventChannel(shutdown));
145 194
146 if (dirman_) { 195 if (dirman_) {
147 directory_manager_hookup_.reset(NewEventListenerHookup( 196 directory_manager_hookup_.reset(NewEventListenerHookup(
148 dirman_->channel(), this, &SyncerThread::HandleDirectoryManagerEvent)); 197 dirman_->channel(), this, &SyncerThread::HandleDirectoryManagerEvent));
149 } 198 }
150 199
151 if (scm_) { 200 if (scm_) {
152 WatchConnectionManager(scm_); 201 WatchConnectionManager(scm_);
153 } 202 }
154 203
155 if (command_channel_) { 204 if (command_channel_) {
156 WatchClientCommands(command_channel_); 205 WatchClientCommands(command_channel_);
157 } 206 }
158 } 207 }
159 208
160 SyncerThread::~SyncerThread() { 209 SyncerThread::~SyncerThread() {
161 client_command_hookup_.reset(); 210 client_command_hookup_.reset();
162 conn_mgr_hookup_.reset(); 211 conn_mgr_hookup_.reset();
163 syncer_event_channel_.reset(); 212 syncer_event_channel_.reset();
164 directory_manager_hookup_.reset(); 213 directory_manager_hookup_.reset();
165 syncer_events_.reset(); 214 syncer_events_.reset();
166 delete syncer_; 215 delete vault_.syncer_;
167 talk_mediator_hookup_.reset(); 216 talk_mediator_hookup_.reset();
168 CHECK(!thread_running_); 217 CHECK(!thread_.IsRunning());
169 } 218 }
170 219
171 // Creates and starts a syncer thread. 220 // Creates and starts a syncer thread.
172 // Returns true if it creates a thread or if there's currently a thread running 221 // Returns true if it creates a thread or if there's currently a thread running
173 // and false otherwise. 222 // and false otherwise.
174 bool SyncerThread::Start() { 223 bool SyncerThread::Start() {
175 MutexLock lock(&mutex_); 224 {
176 if (thread_running_) { 225 AutoLock lock(lock_);
177 return true; 226 if (thread_.IsRunning()) {
227 return true;
228 }
229
230 if (!thread_.Start()) {
231 return false;
232 }
178 } 233 }
179 thread_running_ = 234
180 (0 == pthread_create(&thread_, NULL, RunSyncerThread, this)); 235 thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod(this,
181 if (thread_running_) { 236 &SyncerThread::ThreadMain));
182 pthread_detach(thread_); 237
183 } 238 // Wait for notification that our task makes it safely onto the message
184 return thread_running_; 239 // loop before returning, so the caller can't call Stop before we're
240 // actually up and running. This is for consistency with the old pthread
241 // impl because pthread_create would do this in one step.
242 thread_main_started_.Wait();
243 LOG(INFO) << "SyncerThread started.";
244 return true;
185 } 245 }
186 246
187 // Stop processing. A max wait of at least 2*server RTT time is recommended. 247 // Stop processing. A max wait of at least 2*server RTT time is recommended.
188 // Returns true if we stopped, false otherwise. 248 // Returns true if we stopped, false otherwise.
189 bool SyncerThread::Stop(int max_wait) { 249 bool SyncerThread::Stop(int max_wait) {
190 MutexLock lock(&mutex_); 250 {
191 if (!thread_running_) 251 AutoLock lock(lock_);
192 return true; 252 // If the thread has been started, then we either already have or are about
193 stop_syncer_thread_ = true; 253 // to enter ThreadMainLoop so we have to proceed with shutdown and wait for
194 if (NULL != syncer_) { 254 // it to finish. If the thread has not been started --and we now own the
195 // Try to early exit the syncer. 255 // lock-- then we can early out because the caller has not called Start().
196 syncer_->RequestEarlyExit(); 256 if (!thread_.IsRunning())
257 return true;
258
259 LOG(INFO) << "SyncerThread::Stop - setting ThreadMain exit condition to "
260 << "true (vault_.stop_syncer_thread_)";
261 // Exit the ThreadMainLoop once the syncer finishes (we tell it to exit
262 // below).
263 vault_.stop_syncer_thread_ = true;
264 if (NULL != vault_.syncer_) {
265 // Try to early exit the syncer itself, which could be looping inside
266 // SyncShare.
267 vault_.syncer_->RequestEarlyExit();
268 }
269
270 // stop_syncer_thread_ is now true and the Syncer has been told to exit.
271 // We want to wake up all waiters so they can re-examine state. We signal,
272 // causing all waiters to try to re-acquire the lock, and then we release
273 // the lock, and join on our internal thread which should soon run off the
274 // end of ThreadMain.
275 vault_field_changed_.Broadcast();
197 } 276 }
198 pthread_cond_broadcast(&changed_.condvar_); 277
199 timespec deadline = { time(NULL) + (max_wait / 1000), 0 }; 278 // This will join, and finish when ThreadMain terminates.
200 do { 279 thread_.Stop();
201 const int wait_result = max_wait < 0 ?
202 pthread_cond_wait(&changed_.condvar_, &mutex_.mutex_) :
203 pthread_cond_timedwait(&changed_.condvar_, &mutex_.mutex_,
204 &deadline);
205 if (ETIMEDOUT == wait_result) {
206 LOG(ERROR) << "SyncerThread::Stop timed out. Problems likely.";
207 return false;
208 }
209 } while (thread_running_);
210 return true; 280 return true;
211 } 281 }
212 282
213 void SyncerThread::WatchClientCommands(ClientCommandChannel* channel) { 283 void SyncerThread::WatchClientCommands(ClientCommandChannel* channel) {
214 PThreadScopedLock<PThreadMutex> lock(&mutex_); 284 AutoLock lock(lock_);
215 client_command_hookup_.reset(NewEventListenerHookup(channel, this, 285 client_command_hookup_.reset(NewEventListenerHookup(channel, this,
216 &SyncerThread::HandleClientCommand)); 286 &SyncerThread::HandleClientCommand));
217 } 287 }
218 288
219 void SyncerThread::HandleClientCommand(ClientCommandChannel::EventType event) { 289 void SyncerThread::HandleClientCommand(ClientCommandChannel::EventType event) {
220 if (!event) { 290 if (!event) {
221 return; 291 return;
222 } 292 }
223 293
224 // Mutex not really necessary for these. 294 // Mutex not really necessary for these.
225 if (event->has_set_sync_poll_interval()) { 295 if (event->has_set_sync_poll_interval()) {
226 syncer_short_poll_interval_seconds_ = event->set_sync_poll_interval(); 296 syncer_short_poll_interval_seconds_ = event->set_sync_poll_interval();
227 } 297 }
228 298
229 if (event->has_set_sync_long_poll_interval()) { 299 if (event->has_set_sync_long_poll_interval()) {
230 syncer_long_poll_interval_seconds_ = event->set_sync_long_poll_interval(); 300 syncer_long_poll_interval_seconds_ = event->set_sync_long_poll_interval();
231 } 301 }
232 } 302 }
233 303
234 void SyncerThread::ThreadMainLoop() { 304 void SyncerThread::ThreadMainLoop() {
305 // This is called with lock_ acquired.
306 lock_.AssertAcquired();
307 LOG(INFO) << "In thread main loop.";
308
235 // Use the short poll value by default. 309 // Use the short poll value by default.
236 int poll_seconds = syncer_short_poll_interval_seconds_; 310 TimeDelta poll_seconds =
311 TimeDelta::FromSeconds(syncer_short_poll_interval_seconds_);
237 int user_idle_milliseconds = 0; 312 int user_idle_milliseconds = 0;
238 timespec last_sync_time = { 0 }; 313 TimeTicks last_sync_time;
239 bool initial_sync_for_thread = true; 314 bool initial_sync_for_thread = true;
240 bool continue_sync_cycle = false; 315 bool continue_sync_cycle = false;
241 316
242 while (!stop_syncer_thread_) { 317 while (!vault_.stop_syncer_thread_) {
243 if (!connected_) { 318 // The Wait()s in these conditionals using |vault_| are not TimedWait()s (as
319 // below) because we cannot poll until these conditions are met, so we wait
320 // indefinitely.
321 if (!vault_.connected_) {
244 LOG(INFO) << "Syncer thread waiting for connection."; 322 LOG(INFO) << "Syncer thread waiting for connection.";
245 while (!connected_ && !stop_syncer_thread_) 323 while (!vault_.connected_ && !vault_.stop_syncer_thread_)
246 pthread_cond_wait(&changed_.condvar_, &mutex_.mutex_); 324 vault_field_changed_.Wait();
247 LOG_IF(INFO, connected_) << "Syncer thread found connection."; 325 LOG_IF(INFO, vault_.connected_) << "Syncer thread found connection.";
248 continue; 326 continue;
249 } 327 }
250 328
251 if (syncer_ == NULL) { 329 if (vault_.syncer_ == NULL) {
252 LOG(INFO) << "Syncer thread waiting for database initialization."; 330 LOG(INFO) << "Syncer thread waiting for database initialization.";
253 while (syncer_ == NULL && !stop_syncer_thread_) 331 while (vault_.syncer_ == NULL && !vault_.stop_syncer_thread_)
254 pthread_cond_wait(&changed_.condvar_, &mutex_.mutex_); 332 vault_field_changed_.Wait();
255 LOG_IF(INFO, !(syncer_ == NULL)) << "Syncer was found after DB started."; 333 LOG_IF(INFO, !(vault_.syncer_ == NULL))
334 << "Syncer was found after DB started.";
256 continue; 335 continue;
257 } 336 }
258 337
259 timespec const next_poll = { last_sync_time.tv_sec + poll_seconds, 338 const TimeTicks next_poll = last_sync_time + poll_seconds;
260 last_sync_time.tv_nsec }; 339 const TimeTicks end_wait =
261 const timespec wake_time = 340 !vault_.nudge_queue_.empty() &&
262 !nudge_queue_.empty() && nudge_queue_.top().first < next_poll ? 341 vault_.nudge_queue_.top().first < next_poll ?
263 nudge_queue_.top().first : next_poll; 342 vault_.nudge_queue_.top().first : next_poll;
264 LOG(INFO) << "wake time is " << wake_time.tv_sec; 343 LOG(INFO) << "end_wait is " << end_wait.ToInternalValue();
265 LOG(INFO) << "next poll is " << next_poll.tv_sec; 344 LOG(INFO) << "next_poll is " << next_poll.ToInternalValue();
266 345
267 const int error = pthread_cond_timedwait(&changed_.condvar_, &mutex_.mutex_, 346 // We block until the CV is signaled (e.g a control field changed, loss of
268 &wake_time); 347 // network connection, nudge, spurious, etc), or the poll interval elapses.
269 if (ETIMEDOUT != error) { 348 TimeDelta sleep_time = end_wait - TimeTicks::Now();
270 continue; // Check all the conditions again. 349 if (sleep_time > TimeDelta::FromSeconds(0)) {
350 vault_field_changed_.TimedWait(sleep_time);
351
352 if (TimeTicks::Now() < end_wait) {
353 // Didn't timeout. Could be a spurious signal, or a signal corresponding
354 // to an actual change in one of our control fields. By continuing here
355 // we perform the typical "always recheck conditions when signaled",
356 // (typically handled by a while(condition_not_met) cv.wait() construct)
357 // because we jump to the top of the loop. The main difference is we
358 // recalculate the wait interval, but last_sync_time won't have changed.
359 // So if we were signaled by a nudge (for ex.) we'll grab the new nudge
360 // off the queue and wait for that delta. If it was a spurious signal,
361 // we'll keep waiting for the same moment in time as we just were.
362 continue;
363 }
271 } 364 }
272 365
273 const timespec now = GetPThreadAbsoluteTime(0);
274
275 // Handle a nudge, caused by either a notification or a local bookmark 366 // Handle a nudge, caused by either a notification or a local bookmark
276 // event. This will also update the source of the following SyncMain call. 367 // event. This will also update the source of the following SyncMain call.
277 UpdateNudgeSource(now, &continue_sync_cycle, &initial_sync_for_thread); 368 UpdateNudgeSource(&continue_sync_cycle, &initial_sync_for_thread);
278 369
279 LOG(INFO) << "Calling Sync Main at time " << now.tv_sec; 370 LOG(INFO) << "Calling Sync Main at time " << Time::Now().ToInternalValue();
280 SyncMain(syncer_); 371 SyncMain(vault_.syncer_);
281 last_sync_time = now; 372 last_sync_time = TimeTicks::Now();
282 373
283 LOG(INFO) << "Updating the next polling time after SyncMain"; 374 LOG(INFO) << "Updating the next polling time after SyncMain";
284 poll_seconds = CalculatePollingWaitTime(allstatus_->status(), 375 poll_seconds = TimeDelta::FromSeconds(CalculatePollingWaitTime(
285 poll_seconds, 376 allstatus_->status(), static_cast<int>(poll_seconds.InSeconds()),
286 &user_idle_milliseconds, 377 &user_idle_milliseconds, &continue_sync_cycle));
287 &continue_sync_cycle);
288 } 378 }
379
289 } 380 }
290 381
291 // We check how long the user's been idle and sync less often if the machine is 382 // We check how long the user's been idle and sync less often if the machine is
292 // not in use. The aim is to reduce server load. 383 // not in use. The aim is to reduce server load.
384 // TODO(timsteele): Should use Time(Delta).
293 int SyncerThread::CalculatePollingWaitTime( 385 int SyncerThread::CalculatePollingWaitTime(
294 const AllStatus::Status& status, 386 const AllStatus::Status& status,
295 int last_poll_wait, // in s 387 int last_poll_wait, // Time in seconds.
296 int* user_idle_milliseconds, 388 int* user_idle_milliseconds,
297 bool* continue_sync_cycle) { 389 bool* continue_sync_cycle) {
298 bool is_continuing_sync_cyle = *continue_sync_cycle; 390 bool is_continuing_sync_cyle = *continue_sync_cycle;
299 *continue_sync_cycle = false; 391 *continue_sync_cycle = false;
300 392
301 // Determine if the syncer has unfinished work to do from allstatus_. 393 // Determine if the syncer has unfinished work to do from allstatus_.
302 const bool syncer_has_work_to_do = 394 const bool syncer_has_work_to_do =
303 status.updates_available > status.updates_received 395 status.updates_available > status.updates_received
304 || status.unsynced_count > 0; 396 || status.unsynced_count > 0;
305 LOG(INFO) << "syncer_has_work_to_do is " << syncer_has_work_to_do; 397 LOG(INFO) << "syncer_has_work_to_do is " << syncer_has_work_to_do;
(...skipping 30 matching lines...) Expand all
336 *user_idle_milliseconds) / 1000; 428 *user_idle_milliseconds) / 1000;
337 DCHECK_GE(actual_next_wait, default_next_wait); 429 DCHECK_GE(actual_next_wait, default_next_wait);
338 } 430 }
339 431
340 LOG(INFO) << "Sync wait: idle " << default_next_wait 432 LOG(INFO) << "Sync wait: idle " << default_next_wait
341 << " non-idle or backoff " << actual_next_wait << "."; 433 << " non-idle or backoff " << actual_next_wait << ".";
342 434
343 return actual_next_wait; 435 return actual_next_wait;
344 } 436 }
345 437
346 void* SyncerThread::ThreadMain() { 438 void SyncerThread::ThreadMain() {
347 NameCurrentThreadForDebugging("SyncEngine_SyncerThread"); 439 AutoLock lock(lock_);
348 mutex_.Lock(); 440 // Signal Start() to let it know we've made it safely onto the message loop,
441 // and unblock it's caller.
442 thread_main_started_.Signal();
349 ThreadMainLoop(); 443 ThreadMainLoop();
350 thread_running_ = false; 444 LOG(INFO) << "Syncer thread ThreadMain is done.";
351 pthread_cond_broadcast(&changed_.condvar_);
352 mutex_.Unlock();
353 LOG(INFO) << "Syncer thread exiting.";
354 return 0;
355 } 445 }
356 446
357 void SyncerThread::SyncMain(Syncer* syncer) { 447 void SyncerThread::SyncMain(Syncer* syncer) {
358 CHECK(syncer); 448 CHECK(syncer);
359 mutex_.Unlock(); 449 AutoUnlock unlock(lock_);
360 while (syncer->SyncShare()) { 450 while (syncer->SyncShare()) {
361 LOG(INFO) << "Looping in sync share"; 451 LOG(INFO) << "Looping in sync share";
362 } 452 }
363 LOG(INFO) << "Done looping in sync share"; 453 LOG(INFO) << "Done looping in sync share";
364
365 mutex_.Lock();
366 } 454 }
367 455
368 void SyncerThread::UpdateNudgeSource(const timespec& now, 456 void SyncerThread::UpdateNudgeSource(bool* continue_sync_cycle,
369 bool* continue_sync_cycle,
370 bool* initial_sync) { 457 bool* initial_sync) {
371 bool nudged = false; 458 bool nudged = false;
372 NudgeSource nudge_source = kUnknown; 459 NudgeSource nudge_source = kUnknown;
373 // Has the previous sync cycle completed? 460 // Has the previous sync cycle completed?
374 if (continue_sync_cycle) { 461 if (continue_sync_cycle) {
375 nudge_source = kContinuation; 462 nudge_source = kContinuation;
376 } 463 }
377 // Update the nudge source if a new nudge has come through during the 464 // Update the nudge source if a new nudge has come through during the
378 // previous sync cycle. 465 // previous sync cycle.
379 while (!nudge_queue_.empty() && !(now < nudge_queue_.top().first)) { 466 while (!vault_.nudge_queue_.empty() &&
467 TimeTicks::Now() >= vault_.nudge_queue_.top().first) {
380 if (!nudged) { 468 if (!nudged) {
381 nudge_source = nudge_queue_.top().second; 469 nudge_source = vault_.nudge_queue_.top().second;
382 *continue_sync_cycle = false; // Reset the continuation token on nudge. 470 *continue_sync_cycle = false; // Reset the continuation token on nudge.
383 nudged = true; 471 nudged = true;
384 } 472 }
385 nudge_queue_.pop(); 473 vault_.nudge_queue_.pop();
386 } 474 }
387 SetUpdatesSource(nudged, nudge_source, initial_sync); 475 SetUpdatesSource(nudged, nudge_source, initial_sync);
388 } 476 }
389 477
390 void SyncerThread::SetUpdatesSource(bool nudged, NudgeSource nudge_source, 478 void SyncerThread::SetUpdatesSource(bool nudged, NudgeSource nudge_source,
391 bool* initial_sync) { 479 bool* initial_sync) {
392 sync_pb::GetUpdatesCallerInfo::GET_UPDATES_SOURCE updates_source = 480 sync_pb::GetUpdatesCallerInfo::GET_UPDATES_SOURCE updates_source =
393 sync_pb::GetUpdatesCallerInfo::UNKNOWN; 481 sync_pb::GetUpdatesCallerInfo::UNKNOWN;
394 if (*initial_sync) { 482 if (*initial_sync) {
395 updates_source = sync_pb::GetUpdatesCallerInfo::FIRST_UPDATE; 483 updates_source = sync_pb::GetUpdatesCallerInfo::FIRST_UPDATE;
(...skipping 10 matching lines...) Expand all
406 break; 494 break;
407 case kContinuation: 495 case kContinuation:
408 updates_source = sync_pb::GetUpdatesCallerInfo::SYNC_CYCLE_CONTINUATION; 496 updates_source = sync_pb::GetUpdatesCallerInfo::SYNC_CYCLE_CONTINUATION;
409 break; 497 break;
410 case kUnknown: 498 case kUnknown:
411 default: 499 default:
412 updates_source = sync_pb::GetUpdatesCallerInfo::UNKNOWN; 500 updates_source = sync_pb::GetUpdatesCallerInfo::UNKNOWN;
413 break; 501 break;
414 } 502 }
415 } 503 }
416 syncer_->set_updates_source(updates_source); 504 vault_.syncer_->set_updates_source(updates_source);
417 } 505 }
418 506
419 void SyncerThread::HandleSyncerEvent(const SyncerEvent& event) { 507 void SyncerThread::HandleSyncerEvent(const SyncerEvent& event) {
420 MutexLock lock(&mutex_); 508 AutoLock lock(lock_);
421 channel()->NotifyListeners(event); 509 channel()->NotifyListeners(event);
422 if (SyncerEvent::REQUEST_SYNC_NUDGE != event.what_happened) { 510 if (SyncerEvent::REQUEST_SYNC_NUDGE != event.what_happened) {
423 return; 511 return;
424 } 512 }
425 NudgeSyncImpl(event.nudge_delay_milliseconds, kUnknown); 513 NudgeSyncImpl(event.nudge_delay_milliseconds, kUnknown);
426 } 514 }
427 515
428 void SyncerThread::HandleDirectoryManagerEvent( 516 void SyncerThread::HandleDirectoryManagerEvent(
429 const syncable::DirectoryManagerEvent& event) { 517 const syncable::DirectoryManagerEvent& event) {
430 LOG(INFO) << "Handling a directory manager event"; 518 LOG(INFO) << "Handling a directory manager event";
431 if (syncable::DirectoryManagerEvent::OPENED == event.what_happened) { 519 if (syncable::DirectoryManagerEvent::OPENED == event.what_happened) {
432 MutexLock lock(&mutex_); 520 AutoLock lock(lock_);
433 LOG(INFO) << "Syncer starting up for: " << event.dirname; 521 LOG(INFO) << "Syncer starting up for: " << event.dirname;
434 // The underlying database structure is ready, and we should create 522 // The underlying database structure is ready, and we should create
435 // the syncer. 523 // the syncer.
436 CHECK(syncer_ == NULL); 524 CHECK(vault_.syncer_ == NULL);
437 syncer_ = 525 vault_.syncer_ =
438 new Syncer(dirman_, event.dirname, scm_, model_safe_worker_.get()); 526 new Syncer(dirman_, event.dirname, scm_, model_safe_worker_.get());
439 527
440 syncer_->set_command_channel(command_channel_); 528 vault_.syncer_->set_command_channel(command_channel_);
441 syncer_events_.reset(NewEventListenerHookup( 529 syncer_events_.reset(NewEventListenerHookup(
442 syncer_->channel(), this, &SyncerThread::HandleSyncerEvent)); 530 vault_.syncer_->channel(), this, &SyncerThread::HandleSyncerEvent));
443 pthread_cond_broadcast(&changed_.condvar_); 531 vault_field_changed_.Broadcast();
444 } 532 }
445 } 533 }
446 534
447 static inline void CheckConnected(bool* connected, 535 static inline void CheckConnected(bool* connected,
448 HttpResponse::ServerConnectionCode code, 536 HttpResponse::ServerConnectionCode code,
449 pthread_cond_t* condvar) { 537 ConditionVariable* condvar) {
450 if (*connected) { 538 if (*connected) {
451 if (HttpResponse::CONNECTION_UNAVAILABLE == code) { 539 if (HttpResponse::CONNECTION_UNAVAILABLE == code) {
452 *connected = false; 540 *connected = false;
453 pthread_cond_broadcast(condvar); 541 condvar->Broadcast();
454 } 542 }
455 } else { 543 } else {
456 if (HttpResponse::SERVER_CONNECTION_OK == code) { 544 if (HttpResponse::SERVER_CONNECTION_OK == code) {
457 *connected = true; 545 *connected = true;
458 pthread_cond_broadcast(condvar); 546 condvar->Broadcast();
459 } 547 }
460 } 548 }
461 } 549 }
462 550
463 void SyncerThread::WatchConnectionManager(ServerConnectionManager* conn_mgr) { 551 void SyncerThread::WatchConnectionManager(ServerConnectionManager* conn_mgr) {
464 conn_mgr_hookup_.reset(NewEventListenerHookup(conn_mgr->channel(), this, 552 conn_mgr_hookup_.reset(NewEventListenerHookup(conn_mgr->channel(), this,
465 &SyncerThread::HandleServerConnectionEvent)); 553 &SyncerThread::HandleServerConnectionEvent));
466 CheckConnected(&connected_, conn_mgr->server_status(), 554 CheckConnected(&vault_.connected_, conn_mgr->server_status(),
467 &changed_.condvar_); 555 &vault_field_changed_);
468 } 556 }
469 557
470 void SyncerThread::HandleServerConnectionEvent( 558 void SyncerThread::HandleServerConnectionEvent(
471 const ServerConnectionEvent& event) { 559 const ServerConnectionEvent& event) {
472 if (ServerConnectionEvent::STATUS_CHANGED == event.what_happened) { 560 if (ServerConnectionEvent::STATUS_CHANGED == event.what_happened) {
473 MutexLock lock(&mutex_); 561 AutoLock lock(lock_);
474 CheckConnected(&connected_, event.connection_code, 562 CheckConnected(&vault_.connected_, event.connection_code,
475 &changed_.condvar_); 563 &vault_field_changed_);
476 } 564 }
477 } 565 }
478 566
479 SyncerEventChannel* SyncerThread::channel() { 567 SyncerEventChannel* SyncerThread::channel() {
480 return syncer_event_channel_.get(); 568 return syncer_event_channel_.get();
481 } 569 }
482 570
483 // Inputs and return value in milliseconds. 571 // Inputs and return value in milliseconds.
484 int SyncerThread::CalculateSyncWaitTime(int last_interval, int user_idle_ms) { 572 int SyncerThread::CalculateSyncWaitTime(int last_interval, int user_idle_ms) {
485 // syncer_polling_interval_ is in seconds 573 // syncer_polling_interval_ is in seconds
(...skipping 11 matching lines...) Expand all
497 next_wait = std::min(AllStatus::GetRecommendedDelaySeconds( 585 next_wait = std::min(AllStatus::GetRecommendedDelaySeconds(
498 last_interval / 1000), syncer_max_interval_ / 1000) * 1000; 586 last_interval / 1000), syncer_max_interval_ / 1000) * 1000;
499 } 587 }
500 588
501 return next_wait; 589 return next_wait;
502 } 590 }
503 591
504 // Called with mutex_ already locked. 592 // Called with mutex_ already locked.
505 void SyncerThread::NudgeSyncImpl(int milliseconds_from_now, 593 void SyncerThread::NudgeSyncImpl(int milliseconds_from_now,
506 NudgeSource source) { 594 NudgeSource source) {
507 const timespec nudge_time = GetPThreadAbsoluteTime(milliseconds_from_now); 595 const TimeTicks nudge_time = TimeTicks::Now() +
596 TimeDelta::FromMilliseconds(milliseconds_from_now);
508 NudgeObject nudge_object(nudge_time, source); 597 NudgeObject nudge_object(nudge_time, source);
509 nudge_queue_.push(nudge_object); 598 vault_.nudge_queue_.push(nudge_object);
510 pthread_cond_broadcast(&changed_.condvar_); 599 vault_field_changed_.Broadcast();
511 } 600 }
512 601
513 void SyncerThread::WatchTalkMediator(TalkMediator* mediator) { 602 void SyncerThread::WatchTalkMediator(TalkMediator* mediator) {
514 talk_mediator_hookup_.reset( 603 talk_mediator_hookup_.reset(
515 NewEventListenerHookup( 604 NewEventListenerHookup(
516 mediator->channel(), 605 mediator->channel(),
517 this, 606 this,
518 &SyncerThread::HandleTalkMediatorEvent)); 607 &SyncerThread::HandleTalkMediatorEvent));
519 } 608 }
520 609
521 void SyncerThread::HandleTalkMediatorEvent(const TalkMediatorEvent& event) { 610 void SyncerThread::HandleTalkMediatorEvent(const TalkMediatorEvent& event) {
522 MutexLock lock(&mutex_); 611 AutoLock lock(lock_);
523 switch (event.what_happened) { 612 switch (event.what_happened) {
524 case TalkMediatorEvent::LOGIN_SUCCEEDED: 613 case TalkMediatorEvent::LOGIN_SUCCEEDED:
525 LOG(INFO) << "P2P: Login succeeded."; 614 LOG(INFO) << "P2P: Login succeeded.";
526 p2p_authenticated_ = true; 615 p2p_authenticated_ = true;
527 break; 616 break;
528 case TalkMediatorEvent::LOGOUT_SUCCEEDED: 617 case TalkMediatorEvent::LOGOUT_SUCCEEDED:
529 LOG(INFO) << "P2P: Login succeeded."; 618 LOG(INFO) << "P2P: Login succeeded.";
530 p2p_authenticated_ = false; 619 p2p_authenticated_ = false;
531 break; 620 break;
532 case TalkMediatorEvent::SUBSCRIPTIONS_ON: 621 case TalkMediatorEvent::SUBSCRIPTIONS_ON:
533 LOG(INFO) << "P2P: Subscriptions successfully enabled."; 622 LOG(INFO) << "P2P: Subscriptions successfully enabled.";
534 p2p_subscribed_ = true; 623 p2p_subscribed_ = true;
535 if (NULL != syncer_) { 624 if (NULL != vault_.syncer_) {
536 LOG(INFO) << "Subscriptions on. Nudging syncer for initial push."; 625 LOG(INFO) << "Subscriptions on. Nudging syncer for initial push.";
537 NudgeSyncImpl(0, kLocal); 626 NudgeSyncImpl(0, kLocal);
538 } 627 }
539 break; 628 break;
540 case TalkMediatorEvent::SUBSCRIPTIONS_OFF: 629 case TalkMediatorEvent::SUBSCRIPTIONS_OFF:
541 LOG(INFO) << "P2P: Subscriptions are not enabled."; 630 LOG(INFO) << "P2P: Subscriptions are not enabled.";
542 p2p_subscribed_ = false; 631 p2p_subscribed_ = false;
543 break; 632 break;
544 case TalkMediatorEvent::NOTIFICATION_RECEIVED: 633 case TalkMediatorEvent::NOTIFICATION_RECEIVED:
545 LOG(INFO) << "P2P: Updates on server, pushing syncer"; 634 LOG(INFO) << "P2P: Updates on server, pushing syncer";
546 if (NULL != syncer_) { 635 if (NULL != vault_.syncer_) {
547 NudgeSyncImpl(0, kNotification); 636 NudgeSyncImpl(0, kNotification);
548 } 637 }
549 break; 638 break;
550 default: 639 default:
551 break; 640 break;
552 } 641 }
553 642
554 if (NULL != syncer_) { 643 if (NULL != vault_.syncer_) {
555 syncer_->set_notifications_enabled(p2p_authenticated_ && p2p_subscribed_); 644 vault_.syncer_->set_notifications_enabled(
645 p2p_authenticated_ && p2p_subscribed_);
556 } 646 }
557 } 647 }
558 648
559 } // namespace browser_sync 649 } // namespace browser_sync
OLDNEW
« no previous file with comments | « chrome/browser/sync/engine/syncer_thread.h ('k') | chrome/browser/sync/engine/syncer_thread_pthreads.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698