Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(240)

Side by Side Diff: chrome/browser/sync/engine/syncer_thread.cc

Issue 250001: Second attempt at the new syncer thread impl, now with less crashes!... (Closed) Base URL: svn://chrome-svn/chrome/trunk/src/
Patch Set: '' Created 11 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2009 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2009 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4
5 #include "chrome/browser/sync/engine/syncer_thread.h" 4 #include "chrome/browser/sync/engine/syncer_thread.h"
6 5
7 #include "build/build_config.h" 6 #include "build/build_config.h"
8 7
9 #ifdef OS_MACOSX 8 #ifdef OS_MACOSX
10 #include <CoreFoundation/CFNumber.h> 9 #include <CoreFoundation/CFNumber.h>
11 #include <IOKit/IOTypes.h> 10 #include <IOKit/IOTypes.h>
12 #include <IOKit/IOKitLib.h> 11 #include <IOKit/IOKitLib.h>
13 #endif 12 #endif
14 13
15 #include <algorithm> 14 #include <algorithm>
16 #include <map> 15 #include <map>
17 #include <queue> 16 #include <queue>
18 17
18 #include "base/command_line.h"
19 #include "chrome/browser/sync/engine/auth_watcher.h" 19 #include "chrome/browser/sync/engine/auth_watcher.h"
20 #include "chrome/browser/sync/engine/model_safe_worker.h" 20 #include "chrome/browser/sync/engine/model_safe_worker.h"
21 #include "chrome/browser/sync/engine/net/server_connection_manager.h" 21 #include "chrome/browser/sync/engine/net/server_connection_manager.h"
22 #include "chrome/browser/sync/engine/syncer.h" 22 #include "chrome/browser/sync/engine/syncer.h"
23 #include "chrome/browser/sync/engine/syncer_thread_pthreads.h"
24 #include "chrome/browser/sync/engine/syncer_thread_timed_stop.h"
23 #include "chrome/browser/sync/notifier/listener/talk_mediator.h" 25 #include "chrome/browser/sync/notifier/listener/talk_mediator.h"
24 #include "chrome/browser/sync/notifier/listener/talk_mediator_impl.h" 26 #include "chrome/browser/sync/notifier/listener/talk_mediator_impl.h"
25 #include "chrome/browser/sync/syncable/directory_manager.h" 27 #include "chrome/browser/sync/syncable/directory_manager.h"
28 #include "chrome/common/chrome_switches.h"
26 29
27 using std::priority_queue; 30 using std::priority_queue;
28 using std::min; 31 using std::min;
29 32 using base::Time;
30 static inline bool operator < (const timespec& a, const timespec& b) { 33 using base::TimeDelta;
31 return a.tv_sec == b.tv_sec ? a.tv_nsec < b.tv_nsec : a.tv_sec < b.tv_sec; 34 using base::TimeTicks;
32 }
33 35
34 namespace { 36 namespace {
35 37
36 // Returns the amount of time since the user last interacted with the computer, 38 // Returns the amount of time since the user last interacted with the computer,
37 // in milliseconds 39 // in milliseconds
38 int UserIdleTime() { 40 int UserIdleTime() {
39 #ifdef OS_WIN 41 #ifdef OS_WIN
40 LASTINPUTINFO last_input_info; 42 LASTINPUTINFO last_input_info;
41 last_input_info.cbSize = sizeof(LASTINPUTINFO); 43 last_input_info.cbSize = sizeof(LASTINPUTINFO);
42 44
(...skipping 58 matching lines...) Expand 10 before | Expand all | Expand 10 after
101 } 103 }
102 #endif 104 #endif
103 105
104 return 0; 106 return 0;
105 } 107 }
106 108
107 } // namespace 109 } // namespace
108 110
109 namespace browser_sync { 111 namespace browser_sync {
110 112
113 SyncerThread* SyncerThreadFactory::Create(
114 ClientCommandChannel* command_channel,
115 syncable::DirectoryManager* mgr,
116 ServerConnectionManager* connection_manager, AllStatus* all_status,
117 ModelSafeWorker* model_safe_worker) {
118 const CommandLine* cmd = CommandLine::ForCurrentProcess();
119 if (cmd->HasSwitch(switches::kSyncerThreadTimedStop)) {
120 return new SyncerThreadTimedStop(command_channel, mgr, connection_manager,
121 all_status, model_safe_worker);
122 } else if (cmd->HasSwitch(switches::kSyncerThreadPthreads)) {
123 return new SyncerThreadPthreads(command_channel, mgr, connection_manager,
124 all_status, model_safe_worker);
125 } else {
126 // The default SyncerThread implementation, which does not time-out when
127 // Stop is called.
128 return new SyncerThread(command_channel, mgr, connection_manager,
129 all_status, model_safe_worker);
130 }
131 }
132
111 bool SyncerThread::NudgeSyncer(int milliseconds_from_now, NudgeSource source) { 133 bool SyncerThread::NudgeSyncer(int milliseconds_from_now, NudgeSource source) {
112 MutexLock lock(&mutex_); 134 AutoLock lock(lock_);
113 if (syncer_ == NULL) { 135 if (vault_.syncer_ == NULL) {
114 return false; 136 return false;
115 } 137 }
116 NudgeSyncImpl(milliseconds_from_now, source); 138 NudgeSyncImpl(milliseconds_from_now, source);
117 return true; 139 return true;
118 } 140 }
119 141
120 void* RunSyncerThread(void* syncer_thread) { 142 SyncerThread::SyncerThread()
121 return (reinterpret_cast<SyncerThread*>(syncer_thread))->ThreadMain(); 143 : thread_main_started_(false, false),
144 thread_("SyncEngine_SyncerThread"),
145 vault_field_changed_(&lock_),
146 p2p_authenticated_(false),
147 p2p_subscribed_(false),
148 client_command_hookup_(NULL),
149 conn_mgr_hookup_(NULL),
150 allstatus_(NULL),
151 dirman_(NULL),
152 scm_(NULL),
153 syncer_short_poll_interval_seconds_(kDefaultShortPollIntervalSeconds),
154 syncer_long_poll_interval_seconds_(kDefaultLongPollIntervalSeconds),
155 syncer_polling_interval_(kDefaultShortPollIntervalSeconds),
156 syncer_max_interval_(kDefaultMaxPollIntervalMs),
157 talk_mediator_hookup_(NULL),
158 command_channel_(NULL),
159 directory_manager_hookup_(NULL),
160 syncer_events_(NULL),
161 model_safe_worker_(NULL),
162 disable_idle_detection_(false) {
122 } 163 }
123 164
124 SyncerThread::SyncerThread( 165 SyncerThread::SyncerThread(
125 ClientCommandChannel* command_channel, 166 ClientCommandChannel* command_channel,
126 syncable::DirectoryManager* mgr, 167 syncable::DirectoryManager* mgr,
127 ServerConnectionManager* connection_manager, 168 ServerConnectionManager* connection_manager,
128 AllStatus* all_status, 169 AllStatus* all_status,
129 ModelSafeWorker* model_safe_worker) 170 ModelSafeWorker* model_safe_worker)
130 : stop_syncer_thread_(false), 171 : thread_main_started_(false, false),
131 thread_running_(false), 172 thread_("SyncEngine_SyncerThread"),
132 connected_(false), 173 vault_field_changed_(&lock_),
133 p2p_authenticated_(false), 174 p2p_authenticated_(false),
134 p2p_subscribed_(false), 175 p2p_subscribed_(false),
135 client_command_hookup_(NULL), 176 client_command_hookup_(NULL),
136 conn_mgr_hookup_(NULL), 177 conn_mgr_hookup_(NULL),
137 allstatus_(all_status), 178 allstatus_(all_status),
138 syncer_(NULL),
139 dirman_(mgr), 179 dirman_(mgr),
140 scm_(connection_manager), 180 scm_(connection_manager),
141 syncer_short_poll_interval_seconds_(kDefaultShortPollIntervalSeconds), 181 syncer_short_poll_interval_seconds_(kDefaultShortPollIntervalSeconds),
142 syncer_long_poll_interval_seconds_(kDefaultLongPollIntervalSeconds), 182 syncer_long_poll_interval_seconds_(kDefaultLongPollIntervalSeconds),
143 syncer_polling_interval_(kDefaultShortPollIntervalSeconds), 183 syncer_polling_interval_(kDefaultShortPollIntervalSeconds),
144 syncer_max_interval_(kDefaultMaxPollIntervalMs), 184 syncer_max_interval_(kDefaultMaxPollIntervalMs),
145 talk_mediator_hookup_(NULL), 185 talk_mediator_hookup_(NULL),
146 command_channel_(command_channel), 186 command_channel_(command_channel),
147 directory_manager_hookup_(NULL), 187 directory_manager_hookup_(NULL),
148 syncer_events_(NULL), 188 syncer_events_(NULL),
(...skipping 16 matching lines...) Expand all
165 WatchClientCommands(command_channel_); 205 WatchClientCommands(command_channel_);
166 } 206 }
167 } 207 }
168 208
169 SyncerThread::~SyncerThread() { 209 SyncerThread::~SyncerThread() {
170 client_command_hookup_.reset(); 210 client_command_hookup_.reset();
171 conn_mgr_hookup_.reset(); 211 conn_mgr_hookup_.reset();
172 syncer_event_channel_.reset(); 212 syncer_event_channel_.reset();
173 directory_manager_hookup_.reset(); 213 directory_manager_hookup_.reset();
174 syncer_events_.reset(); 214 syncer_events_.reset();
175 delete syncer_; 215 delete vault_.syncer_;
176 talk_mediator_hookup_.reset(); 216 talk_mediator_hookup_.reset();
177 CHECK(!thread_running_); 217 CHECK(!thread_.IsRunning());
178 } 218 }
179 219
180 // Creates and starts a syncer thread. 220 // Creates and starts a syncer thread.
181 // Returns true if it creates a thread or if there's currently a thread running 221 // Returns true if it creates a thread or if there's currently a thread running
182 // and false otherwise. 222 // and false otherwise.
183 bool SyncerThread::Start() { 223 bool SyncerThread::Start() {
184 MutexLock lock(&mutex_); 224 {
185 if (thread_running_) { 225 AutoLock lock(lock_);
186 return true; 226 if (thread_.IsRunning()) {
227 return true;
228 }
229
230 if (!thread_.Start()) {
231 return false;
232 }
187 } 233 }
188 thread_running_ = 234
189 (0 == pthread_create(&thread_, NULL, RunSyncerThread, this)); 235 thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod(this,
190 if (thread_running_) { 236 &SyncerThread::ThreadMain));
191 pthread_detach(thread_); 237
192 } 238 // Wait for notification that our task makes it safely onto the message
193 return thread_running_; 239 // loop before returning, so the caller can't call Stop before we're
240 // actually up and running. This is for consistency with the old pthread
241 // impl because pthread_create would do this in one step.
242 thread_main_started_.Wait();
243 LOG(INFO) << "SyncerThread started.";
244 return true;
194 } 245 }
195 246
196 // Stop processing. A max wait of at least 2*server RTT time is recommended. 247 // Stop processing. A max wait of at least 2*server RTT time is recommended.
197 // Returns true if we stopped, false otherwise. 248 // Returns true if we stopped, false otherwise.
198 bool SyncerThread::Stop(int max_wait) { 249 bool SyncerThread::Stop(int max_wait) {
199 MutexLock lock(&mutex_); 250 {
200 if (!thread_running_) 251 AutoLock lock(lock_);
201 return true; 252 // If the thread has been started, then we either already have or are about
202 stop_syncer_thread_ = true; 253 // to enter ThreadMainLoop so we have to proceed with shutdown and wait for
203 if (NULL != syncer_) { 254 // it to finish. If the thread has not been started --and we now own the
204 // Try to early exit the syncer. 255 // lock-- then we can early out because the caller has not called Start().
205 syncer_->RequestEarlyExit(); 256 if (!thread_.IsRunning())
257 return true;
258
259 LOG(INFO) << "SyncerThread::Stop - setting ThreadMain exit condition to "
260 << "true (vault_.stop_syncer_thread_)";
261 // Exit the ThreadMainLoop once the syncer finishes (we tell it to exit
262 // below).
263 vault_.stop_syncer_thread_ = true;
264 if (NULL != vault_.syncer_) {
265 // Try to early exit the syncer itself, which could be looping inside
266 // SyncShare.
267 vault_.syncer_->RequestEarlyExit();
268 }
269
270 // stop_syncer_thread_ is now true and the Syncer has been told to exit.
271 // We want to wake up all waiters so they can re-examine state. We signal,
272 // causing all waiters to try to re-acquire the lock, and then we release
273 // the lock, and join on our internal thread which should soon run off the
274 // end of ThreadMain.
275 vault_field_changed_.Broadcast();
206 } 276 }
207 pthread_cond_broadcast(&changed_.condvar_); 277
208 timespec deadline = { time(NULL) + (max_wait / 1000), 0 }; 278 // This will join, and finish when ThreadMain terminates.
209 do { 279 thread_.Stop();
210 const int wait_result = max_wait < 0 ?
211 pthread_cond_wait(&changed_.condvar_, &mutex_.mutex_) :
212 pthread_cond_timedwait(&changed_.condvar_, &mutex_.mutex_,
213 &deadline);
214 if (ETIMEDOUT == wait_result) {
215 LOG(ERROR) << "SyncerThread::Stop timed out. Problems likely.";
216 return false;
217 }
218 } while (thread_running_);
219 return true; 280 return true;
220 } 281 }
221 282
222 void SyncerThread::WatchClientCommands(ClientCommandChannel* channel) { 283 void SyncerThread::WatchClientCommands(ClientCommandChannel* channel) {
223 PThreadScopedLock<PThreadMutex> lock(&mutex_); 284 AutoLock lock(lock_);
224 client_command_hookup_.reset(NewEventListenerHookup(channel, this, 285 client_command_hookup_.reset(NewEventListenerHookup(channel, this,
225 &SyncerThread::HandleClientCommand)); 286 &SyncerThread::HandleClientCommand));
226 } 287 }
227 288
228 void SyncerThread::HandleClientCommand(ClientCommandChannel::EventType event) { 289 void SyncerThread::HandleClientCommand(ClientCommandChannel::EventType event) {
229 if (!event) { 290 if (!event) {
230 return; 291 return;
231 } 292 }
232 293
233 // Mutex not really necessary for these. 294 // Mutex not really necessary for these.
234 if (event->has_set_sync_poll_interval()) { 295 if (event->has_set_sync_poll_interval()) {
235 syncer_short_poll_interval_seconds_ = event->set_sync_poll_interval(); 296 syncer_short_poll_interval_seconds_ = event->set_sync_poll_interval();
236 } 297 }
237 298
238 if (event->has_set_sync_long_poll_interval()) { 299 if (event->has_set_sync_long_poll_interval()) {
239 syncer_long_poll_interval_seconds_ = event->set_sync_long_poll_interval(); 300 syncer_long_poll_interval_seconds_ = event->set_sync_long_poll_interval();
240 } 301 }
241 } 302 }
242 303
243 void SyncerThread::ThreadMainLoop() { 304 void SyncerThread::ThreadMainLoop() {
305 // This is called with lock_ acquired.
306 lock_.AssertAcquired();
307 LOG(INFO) << "In thread main loop.";
308
244 // Use the short poll value by default. 309 // Use the short poll value by default.
245 int poll_seconds = syncer_short_poll_interval_seconds_; 310 TimeDelta poll_seconds =
311 TimeDelta::FromSeconds(syncer_short_poll_interval_seconds_);
246 int user_idle_milliseconds = 0; 312 int user_idle_milliseconds = 0;
247 timespec last_sync_time = { 0 }; 313 TimeTicks last_sync_time;
248 bool initial_sync_for_thread = true; 314 bool initial_sync_for_thread = true;
249 bool continue_sync_cycle = false; 315 bool continue_sync_cycle = false;
250 316
251 while (!stop_syncer_thread_) { 317 while (!vault_.stop_syncer_thread_) {
252 if (!connected_) { 318 // The Wait()s in these conditionals using |vault_| are not TimedWait()s (as
319 // below) because we cannot poll until these conditions are met, so we wait
320 // indefinitely.
321 if (!vault_.connected_) {
253 LOG(INFO) << "Syncer thread waiting for connection."; 322 LOG(INFO) << "Syncer thread waiting for connection.";
254 while (!connected_ && !stop_syncer_thread_) 323 while (!vault_.connected_ && !vault_.stop_syncer_thread_)
255 pthread_cond_wait(&changed_.condvar_, &mutex_.mutex_); 324 vault_field_changed_.Wait();
256 LOG_IF(INFO, connected_) << "Syncer thread found connection."; 325 LOG_IF(INFO, vault_.connected_) << "Syncer thread found connection.";
257 continue; 326 continue;
258 } 327 }
259 328
260 if (syncer_ == NULL) { 329 if (vault_.syncer_ == NULL) {
261 LOG(INFO) << "Syncer thread waiting for database initialization."; 330 LOG(INFO) << "Syncer thread waiting for database initialization.";
262 while (syncer_ == NULL && !stop_syncer_thread_) 331 while (vault_.syncer_ == NULL && !vault_.stop_syncer_thread_)
263 pthread_cond_wait(&changed_.condvar_, &mutex_.mutex_); 332 vault_field_changed_.Wait();
264 LOG_IF(INFO, !(syncer_ == NULL)) << "Syncer was found after DB started."; 333 LOG_IF(INFO, !(vault_.syncer_ == NULL))
334 << "Syncer was found after DB started.";
265 continue; 335 continue;
266 } 336 }
267 337
268 timespec const next_poll = { last_sync_time.tv_sec + poll_seconds, 338 const TimeTicks next_poll = last_sync_time + poll_seconds;
269 last_sync_time.tv_nsec }; 339 const TimeTicks end_wait =
270 const timespec wake_time = 340 !vault_.nudge_queue_.empty() &&
271 !nudge_queue_.empty() && nudge_queue_.top().first < next_poll ? 341 vault_.nudge_queue_.top().first < next_poll ?
272 nudge_queue_.top().first : next_poll; 342 vault_.nudge_queue_.top().first : next_poll;
273 LOG(INFO) << "wake time is " << wake_time.tv_sec; 343 LOG(INFO) << "end_wait is " << end_wait.ToInternalValue();
274 LOG(INFO) << "next poll is " << next_poll.tv_sec; 344 LOG(INFO) << "next_poll is " << next_poll.ToInternalValue();
275 345
276 const int error = pthread_cond_timedwait(&changed_.condvar_, &mutex_.mutex_, 346 // We block until the CV is signaled (e.g a control field changed, loss of
277 &wake_time); 347 // network connection, nudge, spurious, etc), or the poll interval elapses.
278 if (ETIMEDOUT != error) { 348 TimeDelta sleep_time = end_wait - TimeTicks::Now();
279 continue; // Check all the conditions again. 349 if (sleep_time > TimeDelta::FromSeconds(0)) {
350 vault_field_changed_.TimedWait(sleep_time);
351
352 if (TimeTicks::Now() < end_wait) {
353 // Didn't timeout. Could be a spurious signal, or a signal corresponding
354 // to an actual change in one of our control fields. By continuing here
355 // we perform the typical "always recheck conditions when signaled",
356 // (typically handled by a while(condition_not_met) cv.wait() construct)
357 // because we jump to the top of the loop. The main difference is we
358 // recalculate the wait interval, but last_sync_time won't have changed.
359 // So if we were signaled by a nudge (for ex.) we'll grab the new nudge
360 // off the queue and wait for that delta. If it was a spurious signal,
361 // we'll keep waiting for the same moment in time as we just were.
362 continue;
363 }
280 } 364 }
281 365
282 const timespec now = GetPThreadAbsoluteTime(0);
283
284 // Handle a nudge, caused by either a notification or a local bookmark 366 // Handle a nudge, caused by either a notification or a local bookmark
285 // event. This will also update the source of the following SyncMain call. 367 // event. This will also update the source of the following SyncMain call.
286 UpdateNudgeSource(now, &continue_sync_cycle, &initial_sync_for_thread); 368 UpdateNudgeSource(&continue_sync_cycle, &initial_sync_for_thread);
287 369
288 LOG(INFO) << "Calling Sync Main at time " << now.tv_sec; 370 LOG(INFO) << "Calling Sync Main at time " << Time::Now().ToInternalValue();
289 SyncMain(syncer_); 371 SyncMain(vault_.syncer_);
290 last_sync_time = now; 372 last_sync_time = TimeTicks::Now();
291 373
292 LOG(INFO) << "Updating the next polling time after SyncMain"; 374 LOG(INFO) << "Updating the next polling time after SyncMain";
293 poll_seconds = CalculatePollingWaitTime(allstatus_->status(), 375 poll_seconds = TimeDelta::FromSeconds(CalculatePollingWaitTime(
294 poll_seconds, 376 allstatus_->status(), static_cast<int>(poll_seconds.InSeconds()),
295 &user_idle_milliseconds, 377 &user_idle_milliseconds, &continue_sync_cycle));
296 &continue_sync_cycle);
297 } 378 }
379
298 } 380 }
299 381
300 // We check how long the user's been idle and sync less often if the machine is 382 // We check how long the user's been idle and sync less often if the machine is
301 // not in use. The aim is to reduce server load. 383 // not in use. The aim is to reduce server load.
384 // TODO(timsteele): Should use Time(Delta).
302 int SyncerThread::CalculatePollingWaitTime( 385 int SyncerThread::CalculatePollingWaitTime(
303 const AllStatus::Status& status, 386 const AllStatus::Status& status,
304 int last_poll_wait, // in s 387 int last_poll_wait, // Time in seconds.
305 int* user_idle_milliseconds, 388 int* user_idle_milliseconds,
306 bool* continue_sync_cycle) { 389 bool* continue_sync_cycle) {
307 bool is_continuing_sync_cyle = *continue_sync_cycle; 390 bool is_continuing_sync_cyle = *continue_sync_cycle;
308 *continue_sync_cycle = false; 391 *continue_sync_cycle = false;
309 392
310 // Determine if the syncer has unfinished work to do from allstatus_. 393 // Determine if the syncer has unfinished work to do from allstatus_.
311 const bool syncer_has_work_to_do = 394 const bool syncer_has_work_to_do =
312 status.updates_available > status.updates_received 395 status.updates_available > status.updates_received
313 || status.unsynced_count > 0; 396 || status.unsynced_count > 0;
314 LOG(INFO) << "syncer_has_work_to_do is " << syncer_has_work_to_do; 397 LOG(INFO) << "syncer_has_work_to_do is " << syncer_has_work_to_do;
(...skipping 30 matching lines...) Expand all
345 *user_idle_milliseconds) / 1000; 428 *user_idle_milliseconds) / 1000;
346 DCHECK_GE(actual_next_wait, default_next_wait); 429 DCHECK_GE(actual_next_wait, default_next_wait);
347 } 430 }
348 431
349 LOG(INFO) << "Sync wait: idle " << default_next_wait 432 LOG(INFO) << "Sync wait: idle " << default_next_wait
350 << " non-idle or backoff " << actual_next_wait << "."; 433 << " non-idle or backoff " << actual_next_wait << ".";
351 434
352 return actual_next_wait; 435 return actual_next_wait;
353 } 436 }
354 437
355 void* SyncerThread::ThreadMain() { 438 void SyncerThread::ThreadMain() {
356 NameCurrentThreadForDebugging("SyncEngine_SyncerThread"); 439 AutoLock lock(lock_);
357 mutex_.Lock(); 440 // Signal Start() to let it know we've made it safely onto the message loop,
441 // and unblock it's caller.
442 thread_main_started_.Signal();
358 ThreadMainLoop(); 443 ThreadMainLoop();
359 thread_running_ = false; 444 LOG(INFO) << "Syncer thread ThreadMain is done.";
360 pthread_cond_broadcast(&changed_.condvar_);
361 mutex_.Unlock();
362 LOG(INFO) << "Syncer thread exiting.";
363 return 0;
364 } 445 }
365 446
366 void SyncerThread::SyncMain(Syncer* syncer) { 447 void SyncerThread::SyncMain(Syncer* syncer) {
367 CHECK(syncer); 448 CHECK(syncer);
368 mutex_.Unlock(); 449 AutoUnlock unlock(lock_);
369 while (syncer->SyncShare()) { 450 while (syncer->SyncShare()) {
370 LOG(INFO) << "Looping in sync share"; 451 LOG(INFO) << "Looping in sync share";
371 } 452 }
372 LOG(INFO) << "Done looping in sync share"; 453 LOG(INFO) << "Done looping in sync share";
373
374 mutex_.Lock();
375 } 454 }
376 455
377 void SyncerThread::UpdateNudgeSource(const timespec& now, 456 void SyncerThread::UpdateNudgeSource(bool* continue_sync_cycle,
378 bool* continue_sync_cycle,
379 bool* initial_sync) { 457 bool* initial_sync) {
380 bool nudged = false; 458 bool nudged = false;
381 NudgeSource nudge_source = kUnknown; 459 NudgeSource nudge_source = kUnknown;
382 // Has the previous sync cycle completed? 460 // Has the previous sync cycle completed?
383 if (continue_sync_cycle) { 461 if (continue_sync_cycle) {
384 nudge_source = kContinuation; 462 nudge_source = kContinuation;
385 } 463 }
386 // Update the nudge source if a new nudge has come through during the 464 // Update the nudge source if a new nudge has come through during the
387 // previous sync cycle. 465 // previous sync cycle.
388 while (!nudge_queue_.empty() && !(now < nudge_queue_.top().first)) { 466 while (!vault_.nudge_queue_.empty() &&
467 TimeTicks::Now() >= vault_.nudge_queue_.top().first) {
389 if (!nudged) { 468 if (!nudged) {
390 nudge_source = nudge_queue_.top().second; 469 nudge_source = vault_.nudge_queue_.top().second;
391 *continue_sync_cycle = false; // Reset the continuation token on nudge. 470 *continue_sync_cycle = false; // Reset the continuation token on nudge.
392 nudged = true; 471 nudged = true;
393 } 472 }
394 nudge_queue_.pop(); 473 vault_.nudge_queue_.pop();
395 } 474 }
396 SetUpdatesSource(nudged, nudge_source, initial_sync); 475 SetUpdatesSource(nudged, nudge_source, initial_sync);
397 } 476 }
398 477
399 void SyncerThread::SetUpdatesSource(bool nudged, NudgeSource nudge_source, 478 void SyncerThread::SetUpdatesSource(bool nudged, NudgeSource nudge_source,
400 bool* initial_sync) { 479 bool* initial_sync) {
401 sync_pb::GetUpdatesCallerInfo::GET_UPDATES_SOURCE updates_source = 480 sync_pb::GetUpdatesCallerInfo::GET_UPDATES_SOURCE updates_source =
402 sync_pb::GetUpdatesCallerInfo::UNKNOWN; 481 sync_pb::GetUpdatesCallerInfo::UNKNOWN;
403 if (*initial_sync) { 482 if (*initial_sync) {
404 updates_source = sync_pb::GetUpdatesCallerInfo::FIRST_UPDATE; 483 updates_source = sync_pb::GetUpdatesCallerInfo::FIRST_UPDATE;
(...skipping 10 matching lines...) Expand all
415 break; 494 break;
416 case kContinuation: 495 case kContinuation:
417 updates_source = sync_pb::GetUpdatesCallerInfo::SYNC_CYCLE_CONTINUATION; 496 updates_source = sync_pb::GetUpdatesCallerInfo::SYNC_CYCLE_CONTINUATION;
418 break; 497 break;
419 case kUnknown: 498 case kUnknown:
420 default: 499 default:
421 updates_source = sync_pb::GetUpdatesCallerInfo::UNKNOWN; 500 updates_source = sync_pb::GetUpdatesCallerInfo::UNKNOWN;
422 break; 501 break;
423 } 502 }
424 } 503 }
425 syncer_->set_updates_source(updates_source); 504 vault_.syncer_->set_updates_source(updates_source);
426 } 505 }
427 506
428 void SyncerThread::HandleSyncerEvent(const SyncerEvent& event) { 507 void SyncerThread::HandleSyncerEvent(const SyncerEvent& event) {
429 MutexLock lock(&mutex_); 508 AutoLock lock(lock_);
430 channel()->NotifyListeners(event); 509 channel()->NotifyListeners(event);
431 if (SyncerEvent::REQUEST_SYNC_NUDGE != event.what_happened) { 510 if (SyncerEvent::REQUEST_SYNC_NUDGE != event.what_happened) {
432 return; 511 return;
433 } 512 }
434 NudgeSyncImpl(event.nudge_delay_milliseconds, kUnknown); 513 NudgeSyncImpl(event.nudge_delay_milliseconds, kUnknown);
435 } 514 }
436 515
437 void SyncerThread::HandleDirectoryManagerEvent( 516 void SyncerThread::HandleDirectoryManagerEvent(
438 const syncable::DirectoryManagerEvent& event) { 517 const syncable::DirectoryManagerEvent& event) {
439 LOG(INFO) << "Handling a directory manager event"; 518 LOG(INFO) << "Handling a directory manager event";
440 if (syncable::DirectoryManagerEvent::OPENED == event.what_happened) { 519 if (syncable::DirectoryManagerEvent::OPENED == event.what_happened) {
441 MutexLock lock(&mutex_); 520 AutoLock lock(lock_);
442 LOG(INFO) << "Syncer starting up for: " << event.dirname; 521 LOG(INFO) << "Syncer starting up for: " << event.dirname;
443 // The underlying database structure is ready, and we should create 522 // The underlying database structure is ready, and we should create
444 // the syncer. 523 // the syncer.
445 CHECK(syncer_ == NULL); 524 CHECK(vault_.syncer_ == NULL);
446 syncer_ = 525 vault_.syncer_ =
447 new Syncer(dirman_, event.dirname, scm_, model_safe_worker_.get()); 526 new Syncer(dirman_, event.dirname, scm_, model_safe_worker_.get());
448 527
449 syncer_->set_command_channel(command_channel_); 528 vault_.syncer_->set_command_channel(command_channel_);
450 syncer_events_.reset(NewEventListenerHookup( 529 syncer_events_.reset(NewEventListenerHookup(
451 syncer_->channel(), this, &SyncerThread::HandleSyncerEvent)); 530 vault_.syncer_->channel(), this, &SyncerThread::HandleSyncerEvent));
452 pthread_cond_broadcast(&changed_.condvar_); 531 vault_field_changed_.Broadcast();
453 } 532 }
454 } 533 }
455 534
456 static inline void CheckConnected(bool* connected, 535 static inline void CheckConnected(bool* connected,
457 HttpResponse::ServerConnectionCode code, 536 HttpResponse::ServerConnectionCode code,
458 pthread_cond_t* condvar) { 537 ConditionVariable* condvar) {
459 if (*connected) { 538 if (*connected) {
460 if (HttpResponse::CONNECTION_UNAVAILABLE == code) { 539 if (HttpResponse::CONNECTION_UNAVAILABLE == code) {
461 *connected = false; 540 *connected = false;
462 pthread_cond_broadcast(condvar); 541 condvar->Broadcast();
463 } 542 }
464 } else { 543 } else {
465 if (HttpResponse::SERVER_CONNECTION_OK == code) { 544 if (HttpResponse::SERVER_CONNECTION_OK == code) {
466 *connected = true; 545 *connected = true;
467 pthread_cond_broadcast(condvar); 546 condvar->Broadcast();
468 } 547 }
469 } 548 }
470 } 549 }
471 550
472 void SyncerThread::WatchConnectionManager(ServerConnectionManager* conn_mgr) { 551 void SyncerThread::WatchConnectionManager(ServerConnectionManager* conn_mgr) {
473 conn_mgr_hookup_.reset(NewEventListenerHookup(conn_mgr->channel(), this, 552 conn_mgr_hookup_.reset(NewEventListenerHookup(conn_mgr->channel(), this,
474 &SyncerThread::HandleServerConnectionEvent)); 553 &SyncerThread::HandleServerConnectionEvent));
475 CheckConnected(&connected_, conn_mgr->server_status(), 554 CheckConnected(&vault_.connected_, conn_mgr->server_status(),
476 &changed_.condvar_); 555 &vault_field_changed_);
477 } 556 }
478 557
479 void SyncerThread::HandleServerConnectionEvent( 558 void SyncerThread::HandleServerConnectionEvent(
480 const ServerConnectionEvent& event) { 559 const ServerConnectionEvent& event) {
481 if (ServerConnectionEvent::STATUS_CHANGED == event.what_happened) { 560 if (ServerConnectionEvent::STATUS_CHANGED == event.what_happened) {
482 MutexLock lock(&mutex_); 561 AutoLock lock(lock_);
483 CheckConnected(&connected_, event.connection_code, 562 CheckConnected(&vault_.connected_, event.connection_code,
484 &changed_.condvar_); 563 &vault_field_changed_);
485 } 564 }
486 } 565 }
487 566
488 SyncerEventChannel* SyncerThread::channel() { 567 SyncerEventChannel* SyncerThread::channel() {
489 return syncer_event_channel_.get(); 568 return syncer_event_channel_.get();
490 } 569 }
491 570
492 // Inputs and return value in milliseconds. 571 // Inputs and return value in milliseconds.
493 int SyncerThread::CalculateSyncWaitTime(int last_interval, int user_idle_ms) { 572 int SyncerThread::CalculateSyncWaitTime(int last_interval, int user_idle_ms) {
494 // syncer_polling_interval_ is in seconds 573 // syncer_polling_interval_ is in seconds
(...skipping 11 matching lines...) Expand all
506 next_wait = std::min(AllStatus::GetRecommendedDelaySeconds( 585 next_wait = std::min(AllStatus::GetRecommendedDelaySeconds(
507 last_interval / 1000), syncer_max_interval_ / 1000) * 1000; 586 last_interval / 1000), syncer_max_interval_ / 1000) * 1000;
508 } 587 }
509 588
510 return next_wait; 589 return next_wait;
511 } 590 }
512 591
513 // Called with mutex_ already locked. 592 // Called with mutex_ already locked.
514 void SyncerThread::NudgeSyncImpl(int milliseconds_from_now, 593 void SyncerThread::NudgeSyncImpl(int milliseconds_from_now,
515 NudgeSource source) { 594 NudgeSource source) {
516 const timespec nudge_time = GetPThreadAbsoluteTime(milliseconds_from_now); 595 const TimeTicks nudge_time = TimeTicks::Now() +
596 TimeDelta::FromMilliseconds(milliseconds_from_now);
517 NudgeObject nudge_object(nudge_time, source); 597 NudgeObject nudge_object(nudge_time, source);
518 nudge_queue_.push(nudge_object); 598 vault_.nudge_queue_.push(nudge_object);
519 pthread_cond_broadcast(&changed_.condvar_); 599 vault_field_changed_.Broadcast();
520 } 600 }
521 601
522 void SyncerThread::WatchTalkMediator(TalkMediator* mediator) { 602 void SyncerThread::WatchTalkMediator(TalkMediator* mediator) {
523 talk_mediator_hookup_.reset( 603 talk_mediator_hookup_.reset(
524 NewEventListenerHookup( 604 NewEventListenerHookup(
525 mediator->channel(), 605 mediator->channel(),
526 this, 606 this,
527 &SyncerThread::HandleTalkMediatorEvent)); 607 &SyncerThread::HandleTalkMediatorEvent));
528 } 608 }
529 609
530 void SyncerThread::HandleTalkMediatorEvent(const TalkMediatorEvent& event) { 610 void SyncerThread::HandleTalkMediatorEvent(const TalkMediatorEvent& event) {
531 MutexLock lock(&mutex_); 611 AutoLock lock(lock_);
532 switch (event.what_happened) { 612 switch (event.what_happened) {
533 case TalkMediatorEvent::LOGIN_SUCCEEDED: 613 case TalkMediatorEvent::LOGIN_SUCCEEDED:
534 LOG(INFO) << "P2P: Login succeeded."; 614 LOG(INFO) << "P2P: Login succeeded.";
535 p2p_authenticated_ = true; 615 p2p_authenticated_ = true;
536 break; 616 break;
537 case TalkMediatorEvent::LOGOUT_SUCCEEDED: 617 case TalkMediatorEvent::LOGOUT_SUCCEEDED:
538 LOG(INFO) << "P2P: Login succeeded."; 618 LOG(INFO) << "P2P: Login succeeded.";
539 p2p_authenticated_ = false; 619 p2p_authenticated_ = false;
540 break; 620 break;
541 case TalkMediatorEvent::SUBSCRIPTIONS_ON: 621 case TalkMediatorEvent::SUBSCRIPTIONS_ON:
542 LOG(INFO) << "P2P: Subscriptions successfully enabled."; 622 LOG(INFO) << "P2P: Subscriptions successfully enabled.";
543 p2p_subscribed_ = true; 623 p2p_subscribed_ = true;
544 if (NULL != syncer_) { 624 if (NULL != vault_.syncer_) {
545 LOG(INFO) << "Subscriptions on. Nudging syncer for initial push."; 625 LOG(INFO) << "Subscriptions on. Nudging syncer for initial push.";
546 NudgeSyncImpl(0, kLocal); 626 NudgeSyncImpl(0, kLocal);
547 } 627 }
548 break; 628 break;
549 case TalkMediatorEvent::SUBSCRIPTIONS_OFF: 629 case TalkMediatorEvent::SUBSCRIPTIONS_OFF:
550 LOG(INFO) << "P2P: Subscriptions are not enabled."; 630 LOG(INFO) << "P2P: Subscriptions are not enabled.";
551 p2p_subscribed_ = false; 631 p2p_subscribed_ = false;
552 break; 632 break;
553 case TalkMediatorEvent::NOTIFICATION_RECEIVED: 633 case TalkMediatorEvent::NOTIFICATION_RECEIVED:
554 LOG(INFO) << "P2P: Updates on server, pushing syncer"; 634 LOG(INFO) << "P2P: Updates on server, pushing syncer";
555 if (NULL != syncer_) { 635 if (NULL != vault_.syncer_) {
556 NudgeSyncImpl(0, kNotification); 636 NudgeSyncImpl(0, kNotification);
557 } 637 }
558 break; 638 break;
559 default: 639 default:
560 break; 640 break;
561 } 641 }
562 642
563 if (NULL != syncer_) { 643 if (NULL != vault_.syncer_) {
564 syncer_->set_notifications_enabled(p2p_authenticated_ && p2p_subscribed_); 644 vault_.syncer_->set_notifications_enabled(
645 p2p_authenticated_ && p2p_subscribed_);
565 } 646 }
566 } 647 }
567 648
568 } // namespace browser_sync 649 } // namespace browser_sync
OLDNEW
« no previous file with comments | « chrome/browser/sync/engine/syncer_thread.h ('k') | chrome/browser/sync/engine/syncer_thread_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698