OLD | NEW |
1 // Copyright (c) 2009 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2009 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | |
5 #include "chrome/browser/sync/engine/syncer_thread.h" | 4 #include "chrome/browser/sync/engine/syncer_thread.h" |
6 | 5 |
7 #include "build/build_config.h" | 6 #include "build/build_config.h" |
8 | 7 |
9 #ifdef OS_MACOSX | 8 #ifdef OS_MACOSX |
10 #include <CoreFoundation/CFNumber.h> | 9 #include <CoreFoundation/CFNumber.h> |
11 #include <IOKit/IOTypes.h> | 10 #include <IOKit/IOTypes.h> |
12 #include <IOKit/IOKitLib.h> | 11 #include <IOKit/IOKitLib.h> |
13 #endif | 12 #endif |
14 | 13 |
15 #include <algorithm> | 14 #include <algorithm> |
16 #include <map> | 15 #include <map> |
17 #include <queue> | 16 #include <queue> |
18 | 17 |
| 18 #include "base/command_line.h" |
19 #include "chrome/browser/sync/engine/auth_watcher.h" | 19 #include "chrome/browser/sync/engine/auth_watcher.h" |
20 #include "chrome/browser/sync/engine/model_safe_worker.h" | 20 #include "chrome/browser/sync/engine/model_safe_worker.h" |
21 #include "chrome/browser/sync/engine/net/server_connection_manager.h" | 21 #include "chrome/browser/sync/engine/net/server_connection_manager.h" |
22 #include "chrome/browser/sync/engine/syncer.h" | 22 #include "chrome/browser/sync/engine/syncer.h" |
| 23 #include "chrome/browser/sync/engine/syncer_thread_pthreads.h" |
| 24 #include "chrome/browser/sync/engine/syncer_thread_timed_stop.h" |
23 #include "chrome/browser/sync/notifier/listener/talk_mediator.h" | 25 #include "chrome/browser/sync/notifier/listener/talk_mediator.h" |
24 #include "chrome/browser/sync/notifier/listener/talk_mediator_impl.h" | 26 #include "chrome/browser/sync/notifier/listener/talk_mediator_impl.h" |
25 #include "chrome/browser/sync/syncable/directory_manager.h" | 27 #include "chrome/browser/sync/syncable/directory_manager.h" |
| 28 #include "chrome/common/chrome_switches.h" |
26 | 29 |
27 using std::priority_queue; | 30 using std::priority_queue; |
28 using std::min; | 31 using std::min; |
29 | 32 using base::Time; |
30 static inline bool operator < (const timespec& a, const timespec& b) { | 33 using base::TimeDelta; |
31 return a.tv_sec == b.tv_sec ? a.tv_nsec < b.tv_nsec : a.tv_sec < b.tv_sec; | 34 using base::TimeTicks; |
32 } | |
33 | 35 |
34 namespace { | 36 namespace { |
35 | 37 |
36 // Returns the amount of time since the user last interacted with the computer, | 38 // Returns the amount of time since the user last interacted with the computer, |
37 // in milliseconds | 39 // in milliseconds |
38 int UserIdleTime() { | 40 int UserIdleTime() { |
39 #ifdef OS_WIN | 41 #ifdef OS_WIN |
40 LASTINPUTINFO last_input_info; | 42 LASTINPUTINFO last_input_info; |
41 last_input_info.cbSize = sizeof(LASTINPUTINFO); | 43 last_input_info.cbSize = sizeof(LASTINPUTINFO); |
42 | 44 |
(...skipping 58 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
101 } | 103 } |
102 #endif | 104 #endif |
103 | 105 |
104 return 0; | 106 return 0; |
105 } | 107 } |
106 | 108 |
107 } // namespace | 109 } // namespace |
108 | 110 |
109 namespace browser_sync { | 111 namespace browser_sync { |
110 | 112 |
| 113 SyncerThread* SyncerThreadFactory::Create( |
| 114 ClientCommandChannel* command_channel, |
| 115 syncable::DirectoryManager* mgr, |
| 116 ServerConnectionManager* connection_manager, AllStatus* all_status, |
| 117 ModelSafeWorker* model_safe_worker) { |
| 118 const CommandLine* cmd = CommandLine::ForCurrentProcess(); |
| 119 if (cmd->HasSwitch(switches::kSyncerThreadTimedStop)) { |
| 120 return new SyncerThreadTimedStop(command_channel, mgr, connection_manager, |
| 121 all_status, model_safe_worker); |
| 122 } else if (cmd->HasSwitch(switches::kSyncerThreadPthreads)) { |
| 123 return new SyncerThreadPthreads(command_channel, mgr, connection_manager, |
| 124 all_status, model_safe_worker); |
| 125 } else { |
| 126 // The default SyncerThread implementation, which does not time-out when |
| 127 // Stop is called. |
| 128 return new SyncerThread(command_channel, mgr, connection_manager, |
| 129 all_status, model_safe_worker); |
| 130 } |
| 131 } |
| 132 |
111 bool SyncerThread::NudgeSyncer(int milliseconds_from_now, NudgeSource source) { | 133 bool SyncerThread::NudgeSyncer(int milliseconds_from_now, NudgeSource source) { |
112 MutexLock lock(&mutex_); | 134 AutoLock lock(lock_); |
113 if (syncer_ == NULL) { | 135 if (vault_.syncer_ == NULL) { |
114 return false; | 136 return false; |
115 } | 137 } |
116 NudgeSyncImpl(milliseconds_from_now, source); | 138 NudgeSyncImpl(milliseconds_from_now, source); |
117 return true; | 139 return true; |
118 } | 140 } |
119 | 141 |
120 void* RunSyncerThread(void* syncer_thread) { | 142 SyncerThread::SyncerThread() |
121 return (reinterpret_cast<SyncerThread*>(syncer_thread))->ThreadMain(); | 143 : thread_main_started_(false, false), |
| 144 thread_("SyncEngine_SyncerThread"), |
| 145 vault_field_changed_(&lock_), |
| 146 p2p_authenticated_(false), |
| 147 p2p_subscribed_(false), |
| 148 client_command_hookup_(NULL), |
| 149 conn_mgr_hookup_(NULL), |
| 150 allstatus_(NULL), |
| 151 dirman_(NULL), |
| 152 scm_(NULL), |
| 153 syncer_short_poll_interval_seconds_(kDefaultShortPollIntervalSeconds), |
| 154 syncer_long_poll_interval_seconds_(kDefaultLongPollIntervalSeconds), |
| 155 syncer_polling_interval_(kDefaultShortPollIntervalSeconds), |
| 156 syncer_max_interval_(kDefaultMaxPollIntervalMs), |
| 157 talk_mediator_hookup_(NULL), |
| 158 command_channel_(NULL), |
| 159 directory_manager_hookup_(NULL), |
| 160 syncer_events_(NULL), |
| 161 model_safe_worker_(NULL), |
| 162 disable_idle_detection_(false) { |
122 } | 163 } |
123 | 164 |
124 SyncerThread::SyncerThread( | 165 SyncerThread::SyncerThread( |
125 ClientCommandChannel* command_channel, | 166 ClientCommandChannel* command_channel, |
126 syncable::DirectoryManager* mgr, | 167 syncable::DirectoryManager* mgr, |
127 ServerConnectionManager* connection_manager, | 168 ServerConnectionManager* connection_manager, |
128 AllStatus* all_status, | 169 AllStatus* all_status, |
129 ModelSafeWorker* model_safe_worker) | 170 ModelSafeWorker* model_safe_worker) |
130 : dirman_(mgr), scm_(connection_manager), | 171 : thread_main_started_(false, false), |
131 syncer_(NULL), syncer_events_(NULL), thread_running_(false), | 172 thread_("SyncEngine_SyncerThread"), |
| 173 vault_field_changed_(&lock_), |
| 174 p2p_authenticated_(false), |
| 175 p2p_subscribed_(false), |
| 176 client_command_hookup_(NULL), |
| 177 conn_mgr_hookup_(NULL), |
| 178 allstatus_(all_status), |
| 179 dirman_(mgr), |
| 180 scm_(connection_manager), |
132 syncer_short_poll_interval_seconds_(kDefaultShortPollIntervalSeconds), | 181 syncer_short_poll_interval_seconds_(kDefaultShortPollIntervalSeconds), |
133 syncer_long_poll_interval_seconds_(kDefaultLongPollIntervalSeconds), | 182 syncer_long_poll_interval_seconds_(kDefaultLongPollIntervalSeconds), |
134 syncer_polling_interval_(kDefaultShortPollIntervalSeconds), | 183 syncer_polling_interval_(kDefaultShortPollIntervalSeconds), |
135 syncer_max_interval_(kDefaultMaxPollIntervalMs), | 184 syncer_max_interval_(kDefaultMaxPollIntervalMs), |
136 stop_syncer_thread_(false), connected_(false), conn_mgr_hookup_(NULL), | 185 talk_mediator_hookup_(NULL), |
137 p2p_authenticated_(false), p2p_subscribed_(false), | 186 command_channel_(command_channel), |
138 allstatus_(all_status), talk_mediator_hookup_(NULL), | 187 directory_manager_hookup_(NULL), |
139 command_channel_(command_channel), directory_manager_hookup_(NULL), | 188 syncer_events_(NULL), |
140 model_safe_worker_(model_safe_worker), | 189 model_safe_worker_(model_safe_worker), |
141 client_command_hookup_(NULL), disable_idle_detection_(false) { | 190 disable_idle_detection_(false) { |
142 | 191 |
143 SyncerEvent shutdown = { SyncerEvent::SHUTDOWN_USE_WITH_CARE }; | 192 SyncerEvent shutdown = { SyncerEvent::SHUTDOWN_USE_WITH_CARE }; |
144 syncer_event_channel_.reset(new SyncerEventChannel(shutdown)); | 193 syncer_event_channel_.reset(new SyncerEventChannel(shutdown)); |
145 | 194 |
146 if (dirman_) { | 195 if (dirman_) { |
147 directory_manager_hookup_.reset(NewEventListenerHookup( | 196 directory_manager_hookup_.reset(NewEventListenerHookup( |
148 dirman_->channel(), this, &SyncerThread::HandleDirectoryManagerEvent)); | 197 dirman_->channel(), this, &SyncerThread::HandleDirectoryManagerEvent)); |
149 } | 198 } |
150 | 199 |
151 if (scm_) { | 200 if (scm_) { |
152 WatchConnectionManager(scm_); | 201 WatchConnectionManager(scm_); |
153 } | 202 } |
154 | 203 |
155 if (command_channel_) { | 204 if (command_channel_) { |
156 WatchClientCommands(command_channel_); | 205 WatchClientCommands(command_channel_); |
157 } | 206 } |
158 } | 207 } |
159 | 208 |
160 SyncerThread::~SyncerThread() { | 209 SyncerThread::~SyncerThread() { |
161 client_command_hookup_.reset(); | 210 client_command_hookup_.reset(); |
162 conn_mgr_hookup_.reset(); | 211 conn_mgr_hookup_.reset(); |
163 syncer_event_channel_.reset(); | 212 syncer_event_channel_.reset(); |
164 directory_manager_hookup_.reset(); | 213 directory_manager_hookup_.reset(); |
165 syncer_events_.reset(); | 214 syncer_events_.reset(); |
166 delete syncer_; | 215 delete vault_.syncer_; |
167 talk_mediator_hookup_.reset(); | 216 talk_mediator_hookup_.reset(); |
168 CHECK(!thread_running_); | 217 CHECK(!thread_.IsRunning()); |
169 } | 218 } |
170 | 219 |
171 // Creates and starts a syncer thread. | 220 // Creates and starts a syncer thread. |
172 // Returns true if it creates a thread or if there's currently a thread running | 221 // Returns true if it creates a thread or if there's currently a thread running |
173 // and false otherwise. | 222 // and false otherwise. |
174 bool SyncerThread::Start() { | 223 bool SyncerThread::Start() { |
175 MutexLock lock(&mutex_); | 224 { |
176 if (thread_running_) { | 225 AutoLock lock(lock_); |
177 return true; | 226 if (thread_.IsRunning()) { |
| 227 return true; |
| 228 } |
| 229 |
| 230 if (!thread_.Start()) { |
| 231 return false; |
| 232 } |
178 } | 233 } |
179 thread_running_ = | 234 |
180 (0 == pthread_create(&thread_, NULL, RunSyncerThread, this)); | 235 thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod(this, |
181 if (thread_running_) { | 236 &SyncerThread::ThreadMain)); |
182 pthread_detach(thread_); | 237 |
183 } | 238 // Wait for notification that our task makes it safely onto the message |
184 return thread_running_; | 239 // loop before returning, so the caller can't call Stop before we're |
| 240 // actually up and running. This is for consistency with the old pthread |
| 241 // impl because pthread_create would do this in one step. |
| 242 thread_main_started_.Wait(); |
| 243 LOG(INFO) << "SyncerThread started."; |
| 244 return true; |
185 } | 245 } |
186 | 246 |
187 // Stop processing. A max wait of at least 2*server RTT time is recommended. | 247 // Stop processing. A max wait of at least 2*server RTT time is recommended. |
188 // Returns true if we stopped, false otherwise. | 248 // Returns true if we stopped, false otherwise. |
189 bool SyncerThread::Stop(int max_wait) { | 249 bool SyncerThread::Stop(int max_wait) { |
190 MutexLock lock(&mutex_); | 250 { |
191 if (!thread_running_) | 251 AutoLock lock(lock_); |
192 return true; | 252 // If the thread has been started, then we either already have or are about |
193 stop_syncer_thread_ = true; | 253 // to enter ThreadMainLoop so we have to proceed with shutdown and wait for |
194 if (NULL != syncer_) { | 254 // it to finish. If the thread has not been started --and we now own the |
195 // Try to early exit the syncer. | 255 // lock-- then we can early out because the caller has not called Start(). |
196 syncer_->RequestEarlyExit(); | 256 if (!thread_.IsRunning()) |
| 257 return true; |
| 258 |
| 259 LOG(INFO) << "SyncerThread::Stop - setting ThreadMain exit condition to " |
| 260 << "true (vault_.stop_syncer_thread_)"; |
| 261 // Exit the ThreadMainLoop once the syncer finishes (we tell it to exit |
| 262 // below). |
| 263 vault_.stop_syncer_thread_ = true; |
| 264 if (NULL != vault_.syncer_) { |
| 265 // Try to early exit the syncer itself, which could be looping inside |
| 266 // SyncShare. |
| 267 vault_.syncer_->RequestEarlyExit(); |
| 268 } |
| 269 |
| 270 // stop_syncer_thread_ is now true and the Syncer has been told to exit. |
| 271 // We want to wake up all waiters so they can re-examine state. We signal, |
| 272 // causing all waiters to try to re-acquire the lock, and then we release |
| 273 // the lock, and join on our internal thread which should soon run off the |
| 274 // end of ThreadMain. |
| 275 vault_field_changed_.Broadcast(); |
197 } | 276 } |
198 pthread_cond_broadcast(&changed_.condvar_); | 277 |
199 timespec deadline = { time(NULL) + (max_wait / 1000), 0 }; | 278 // This will join, and finish when ThreadMain terminates. |
200 do { | 279 thread_.Stop(); |
201 const int wait_result = max_wait < 0 ? | |
202 pthread_cond_wait(&changed_.condvar_, &mutex_.mutex_) : | |
203 pthread_cond_timedwait(&changed_.condvar_, &mutex_.mutex_, | |
204 &deadline); | |
205 if (ETIMEDOUT == wait_result) { | |
206 LOG(ERROR) << "SyncerThread::Stop timed out. Problems likely."; | |
207 return false; | |
208 } | |
209 } while (thread_running_); | |
210 return true; | 280 return true; |
211 } | 281 } |
212 | 282 |
213 void SyncerThread::WatchClientCommands(ClientCommandChannel* channel) { | 283 void SyncerThread::WatchClientCommands(ClientCommandChannel* channel) { |
214 PThreadScopedLock<PThreadMutex> lock(&mutex_); | 284 AutoLock lock(lock_); |
215 client_command_hookup_.reset(NewEventListenerHookup(channel, this, | 285 client_command_hookup_.reset(NewEventListenerHookup(channel, this, |
216 &SyncerThread::HandleClientCommand)); | 286 &SyncerThread::HandleClientCommand)); |
217 } | 287 } |
218 | 288 |
219 void SyncerThread::HandleClientCommand(ClientCommandChannel::EventType event) { | 289 void SyncerThread::HandleClientCommand(ClientCommandChannel::EventType event) { |
220 if (!event) { | 290 if (!event) { |
221 return; | 291 return; |
222 } | 292 } |
223 | 293 |
224 // Mutex not really necessary for these. | 294 // Mutex not really necessary for these. |
225 if (event->has_set_sync_poll_interval()) { | 295 if (event->has_set_sync_poll_interval()) { |
226 syncer_short_poll_interval_seconds_ = event->set_sync_poll_interval(); | 296 syncer_short_poll_interval_seconds_ = event->set_sync_poll_interval(); |
227 } | 297 } |
228 | 298 |
229 if (event->has_set_sync_long_poll_interval()) { | 299 if (event->has_set_sync_long_poll_interval()) { |
230 syncer_long_poll_interval_seconds_ = event->set_sync_long_poll_interval(); | 300 syncer_long_poll_interval_seconds_ = event->set_sync_long_poll_interval(); |
231 } | 301 } |
232 } | 302 } |
233 | 303 |
234 void SyncerThread::ThreadMainLoop() { | 304 void SyncerThread::ThreadMainLoop() { |
| 305 // This is called with lock_ acquired. |
| 306 lock_.AssertAcquired(); |
| 307 LOG(INFO) << "In thread main loop."; |
| 308 |
235 // Use the short poll value by default. | 309 // Use the short poll value by default. |
236 int poll_seconds = syncer_short_poll_interval_seconds_; | 310 TimeDelta poll_seconds = |
| 311 TimeDelta::FromSeconds(syncer_short_poll_interval_seconds_); |
237 int user_idle_milliseconds = 0; | 312 int user_idle_milliseconds = 0; |
238 timespec last_sync_time = { 0 }; | 313 TimeTicks last_sync_time; |
239 bool initial_sync_for_thread = true; | 314 bool initial_sync_for_thread = true; |
240 bool continue_sync_cycle = false; | 315 bool continue_sync_cycle = false; |
241 | 316 |
242 while (!stop_syncer_thread_) { | 317 while (!vault_.stop_syncer_thread_) { |
243 if (!connected_) { | 318 // The Wait()s in these conditionals using |vault_| are not TimedWait()s (as |
| 319 // below) because we cannot poll until these conditions are met, so we wait |
| 320 // indefinitely. |
| 321 if (!vault_.connected_) { |
244 LOG(INFO) << "Syncer thread waiting for connection."; | 322 LOG(INFO) << "Syncer thread waiting for connection."; |
245 while (!connected_ && !stop_syncer_thread_) | 323 while (!vault_.connected_ && !vault_.stop_syncer_thread_) |
246 pthread_cond_wait(&changed_.condvar_, &mutex_.mutex_); | 324 vault_field_changed_.Wait(); |
247 LOG_IF(INFO, connected_) << "Syncer thread found connection."; | 325 LOG_IF(INFO, vault_.connected_) << "Syncer thread found connection."; |
248 continue; | 326 continue; |
249 } | 327 } |
250 | 328 |
251 if (syncer_ == NULL) { | 329 if (vault_.syncer_ == NULL) { |
252 LOG(INFO) << "Syncer thread waiting for database initialization."; | 330 LOG(INFO) << "Syncer thread waiting for database initialization."; |
253 while (syncer_ == NULL && !stop_syncer_thread_) | 331 while (vault_.syncer_ == NULL && !vault_.stop_syncer_thread_) |
254 pthread_cond_wait(&changed_.condvar_, &mutex_.mutex_); | 332 vault_field_changed_.Wait(); |
255 LOG_IF(INFO, !(syncer_ == NULL)) << "Syncer was found after DB started."; | 333 LOG_IF(INFO, !(vault_.syncer_ == NULL)) |
| 334 << "Syncer was found after DB started."; |
256 continue; | 335 continue; |
257 } | 336 } |
258 | 337 |
259 timespec const next_poll = { last_sync_time.tv_sec + poll_seconds, | 338 const TimeTicks next_poll = last_sync_time + poll_seconds; |
260 last_sync_time.tv_nsec }; | 339 const TimeTicks end_wait = |
261 const timespec wake_time = | 340 !vault_.nudge_queue_.empty() && |
262 !nudge_queue_.empty() && nudge_queue_.top().first < next_poll ? | 341 vault_.nudge_queue_.top().first < next_poll ? |
263 nudge_queue_.top().first : next_poll; | 342 vault_.nudge_queue_.top().first : next_poll; |
264 LOG(INFO) << "wake time is " << wake_time.tv_sec; | 343 LOG(INFO) << "end_wait is " << end_wait.ToInternalValue(); |
265 LOG(INFO) << "next poll is " << next_poll.tv_sec; | 344 LOG(INFO) << "next_poll is " << next_poll.ToInternalValue(); |
266 | 345 |
267 const int error = pthread_cond_timedwait(&changed_.condvar_, &mutex_.mutex_, | 346 // We block until the CV is signaled (e.g a control field changed, loss of |
268 &wake_time); | 347 // network connection, nudge, spurious, etc), or the poll interval elapses. |
269 if (ETIMEDOUT != error) { | 348 TimeDelta sleep_time = end_wait - TimeTicks::Now(); |
270 continue; // Check all the conditions again. | 349 if (sleep_time > TimeDelta::FromSeconds(0)) { |
| 350 vault_field_changed_.TimedWait(sleep_time); |
| 351 |
| 352 if (TimeTicks::Now() < end_wait) { |
| 353 // Didn't timeout. Could be a spurious signal, or a signal corresponding |
| 354 // to an actual change in one of our control fields. By continuing here |
| 355 // we perform the typical "always recheck conditions when signaled", |
| 356 // (typically handled by a while(condition_not_met) cv.wait() construct) |
| 357 // because we jump to the top of the loop. The main difference is we |
| 358 // recalculate the wait interval, but last_sync_time won't have changed. |
| 359 // So if we were signaled by a nudge (for ex.) we'll grab the new nudge |
| 360 // off the queue and wait for that delta. If it was a spurious signal, |
| 361 // we'll keep waiting for the same moment in time as we just were. |
| 362 continue; |
| 363 } |
271 } | 364 } |
272 | 365 |
273 const timespec now = GetPThreadAbsoluteTime(0); | |
274 | |
275 // Handle a nudge, caused by either a notification or a local bookmark | 366 // Handle a nudge, caused by either a notification or a local bookmark |
276 // event. This will also update the source of the following SyncMain call. | 367 // event. This will also update the source of the following SyncMain call. |
277 UpdateNudgeSource(now, &continue_sync_cycle, &initial_sync_for_thread); | 368 UpdateNudgeSource(&continue_sync_cycle, &initial_sync_for_thread); |
278 | 369 |
279 LOG(INFO) << "Calling Sync Main at time " << now.tv_sec; | 370 LOG(INFO) << "Calling Sync Main at time " << Time::Now().ToInternalValue(); |
280 SyncMain(syncer_); | 371 SyncMain(vault_.syncer_); |
281 last_sync_time = now; | 372 last_sync_time = TimeTicks::Now(); |
282 | 373 |
283 LOG(INFO) << "Updating the next polling time after SyncMain"; | 374 LOG(INFO) << "Updating the next polling time after SyncMain"; |
284 poll_seconds = CalculatePollingWaitTime(allstatus_->status(), | 375 poll_seconds = TimeDelta::FromSeconds(CalculatePollingWaitTime( |
285 poll_seconds, | 376 allstatus_->status(), static_cast<int>(poll_seconds.InSeconds()), |
286 &user_idle_milliseconds, | 377 &user_idle_milliseconds, &continue_sync_cycle)); |
287 &continue_sync_cycle); | |
288 } | 378 } |
| 379 |
289 } | 380 } |
290 | 381 |
291 // We check how long the user's been idle and sync less often if the machine is | 382 // We check how long the user's been idle and sync less often if the machine is |
292 // not in use. The aim is to reduce server load. | 383 // not in use. The aim is to reduce server load. |
| 384 // TODO(timsteele): Should use Time(Delta). |
293 int SyncerThread::CalculatePollingWaitTime( | 385 int SyncerThread::CalculatePollingWaitTime( |
294 const AllStatus::Status& status, | 386 const AllStatus::Status& status, |
295 int last_poll_wait, // in s | 387 int last_poll_wait, // Time in seconds. |
296 int* user_idle_milliseconds, | 388 int* user_idle_milliseconds, |
297 bool* continue_sync_cycle) { | 389 bool* continue_sync_cycle) { |
298 bool is_continuing_sync_cyle = *continue_sync_cycle; | 390 bool is_continuing_sync_cyle = *continue_sync_cycle; |
299 *continue_sync_cycle = false; | 391 *continue_sync_cycle = false; |
300 | 392 |
301 // Determine if the syncer has unfinished work to do from allstatus_. | 393 // Determine if the syncer has unfinished work to do from allstatus_. |
302 const bool syncer_has_work_to_do = | 394 const bool syncer_has_work_to_do = |
303 status.updates_available > status.updates_received | 395 status.updates_available > status.updates_received |
304 || status.unsynced_count > 0; | 396 || status.unsynced_count > 0; |
305 LOG(INFO) << "syncer_has_work_to_do is " << syncer_has_work_to_do; | 397 LOG(INFO) << "syncer_has_work_to_do is " << syncer_has_work_to_do; |
(...skipping 30 matching lines...) Expand all Loading... |
336 *user_idle_milliseconds) / 1000; | 428 *user_idle_milliseconds) / 1000; |
337 DCHECK_GE(actual_next_wait, default_next_wait); | 429 DCHECK_GE(actual_next_wait, default_next_wait); |
338 } | 430 } |
339 | 431 |
340 LOG(INFO) << "Sync wait: idle " << default_next_wait | 432 LOG(INFO) << "Sync wait: idle " << default_next_wait |
341 << " non-idle or backoff " << actual_next_wait << "."; | 433 << " non-idle or backoff " << actual_next_wait << "."; |
342 | 434 |
343 return actual_next_wait; | 435 return actual_next_wait; |
344 } | 436 } |
345 | 437 |
346 void* SyncerThread::ThreadMain() { | 438 void SyncerThread::ThreadMain() { |
347 NameCurrentThreadForDebugging("SyncEngine_SyncerThread"); | 439 AutoLock lock(lock_); |
348 mutex_.Lock(); | 440 // Signal Start() to let it know we've made it safely onto the message loop, |
| 441 // and unblock it's caller. |
| 442 thread_main_started_.Signal(); |
349 ThreadMainLoop(); | 443 ThreadMainLoop(); |
350 thread_running_ = false; | 444 LOG(INFO) << "Syncer thread ThreadMain is done."; |
351 pthread_cond_broadcast(&changed_.condvar_); | |
352 mutex_.Unlock(); | |
353 LOG(INFO) << "Syncer thread exiting."; | |
354 return 0; | |
355 } | 445 } |
356 | 446 |
357 void SyncerThread::SyncMain(Syncer* syncer) { | 447 void SyncerThread::SyncMain(Syncer* syncer) { |
358 CHECK(syncer); | 448 CHECK(syncer); |
359 mutex_.Unlock(); | 449 AutoUnlock unlock(lock_); |
360 while (syncer->SyncShare()) { | 450 while (syncer->SyncShare()) { |
361 LOG(INFO) << "Looping in sync share"; | 451 LOG(INFO) << "Looping in sync share"; |
362 } | 452 } |
363 LOG(INFO) << "Done looping in sync share"; | 453 LOG(INFO) << "Done looping in sync share"; |
364 | |
365 mutex_.Lock(); | |
366 } | 454 } |
367 | 455 |
368 void SyncerThread::UpdateNudgeSource(const timespec& now, | 456 void SyncerThread::UpdateNudgeSource(bool* continue_sync_cycle, |
369 bool* continue_sync_cycle, | |
370 bool* initial_sync) { | 457 bool* initial_sync) { |
371 bool nudged = false; | 458 bool nudged = false; |
372 NudgeSource nudge_source = kUnknown; | 459 NudgeSource nudge_source = kUnknown; |
373 // Has the previous sync cycle completed? | 460 // Has the previous sync cycle completed? |
374 if (continue_sync_cycle) { | 461 if (continue_sync_cycle) { |
375 nudge_source = kContinuation; | 462 nudge_source = kContinuation; |
376 } | 463 } |
377 // Update the nudge source if a new nudge has come through during the | 464 // Update the nudge source if a new nudge has come through during the |
378 // previous sync cycle. | 465 // previous sync cycle. |
379 while (!nudge_queue_.empty() && !(now < nudge_queue_.top().first)) { | 466 while (!vault_.nudge_queue_.empty() && |
| 467 TimeTicks::Now() >= vault_.nudge_queue_.top().first) { |
380 if (!nudged) { | 468 if (!nudged) { |
381 nudge_source = nudge_queue_.top().second; | 469 nudge_source = vault_.nudge_queue_.top().second; |
382 *continue_sync_cycle = false; // Reset the continuation token on nudge. | 470 *continue_sync_cycle = false; // Reset the continuation token on nudge. |
383 nudged = true; | 471 nudged = true; |
384 } | 472 } |
385 nudge_queue_.pop(); | 473 vault_.nudge_queue_.pop(); |
386 } | 474 } |
387 SetUpdatesSource(nudged, nudge_source, initial_sync); | 475 SetUpdatesSource(nudged, nudge_source, initial_sync); |
388 } | 476 } |
389 | 477 |
390 void SyncerThread::SetUpdatesSource(bool nudged, NudgeSource nudge_source, | 478 void SyncerThread::SetUpdatesSource(bool nudged, NudgeSource nudge_source, |
391 bool* initial_sync) { | 479 bool* initial_sync) { |
392 sync_pb::GetUpdatesCallerInfo::GET_UPDATES_SOURCE updates_source = | 480 sync_pb::GetUpdatesCallerInfo::GET_UPDATES_SOURCE updates_source = |
393 sync_pb::GetUpdatesCallerInfo::UNKNOWN; | 481 sync_pb::GetUpdatesCallerInfo::UNKNOWN; |
394 if (*initial_sync) { | 482 if (*initial_sync) { |
395 updates_source = sync_pb::GetUpdatesCallerInfo::FIRST_UPDATE; | 483 updates_source = sync_pb::GetUpdatesCallerInfo::FIRST_UPDATE; |
(...skipping 10 matching lines...) Expand all Loading... |
406 break; | 494 break; |
407 case kContinuation: | 495 case kContinuation: |
408 updates_source = sync_pb::GetUpdatesCallerInfo::SYNC_CYCLE_CONTINUATION; | 496 updates_source = sync_pb::GetUpdatesCallerInfo::SYNC_CYCLE_CONTINUATION; |
409 break; | 497 break; |
410 case kUnknown: | 498 case kUnknown: |
411 default: | 499 default: |
412 updates_source = sync_pb::GetUpdatesCallerInfo::UNKNOWN; | 500 updates_source = sync_pb::GetUpdatesCallerInfo::UNKNOWN; |
413 break; | 501 break; |
414 } | 502 } |
415 } | 503 } |
416 syncer_->set_updates_source(updates_source); | 504 vault_.syncer_->set_updates_source(updates_source); |
417 } | 505 } |
418 | 506 |
419 void SyncerThread::HandleSyncerEvent(const SyncerEvent& event) { | 507 void SyncerThread::HandleSyncerEvent(const SyncerEvent& event) { |
420 MutexLock lock(&mutex_); | 508 AutoLock lock(lock_); |
421 channel()->NotifyListeners(event); | 509 channel()->NotifyListeners(event); |
422 if (SyncerEvent::REQUEST_SYNC_NUDGE != event.what_happened) { | 510 if (SyncerEvent::REQUEST_SYNC_NUDGE != event.what_happened) { |
423 return; | 511 return; |
424 } | 512 } |
425 NudgeSyncImpl(event.nudge_delay_milliseconds, kUnknown); | 513 NudgeSyncImpl(event.nudge_delay_milliseconds, kUnknown); |
426 } | 514 } |
427 | 515 |
428 void SyncerThread::HandleDirectoryManagerEvent( | 516 void SyncerThread::HandleDirectoryManagerEvent( |
429 const syncable::DirectoryManagerEvent& event) { | 517 const syncable::DirectoryManagerEvent& event) { |
430 LOG(INFO) << "Handling a directory manager event"; | 518 LOG(INFO) << "Handling a directory manager event"; |
431 if (syncable::DirectoryManagerEvent::OPENED == event.what_happened) { | 519 if (syncable::DirectoryManagerEvent::OPENED == event.what_happened) { |
432 MutexLock lock(&mutex_); | 520 AutoLock lock(lock_); |
433 LOG(INFO) << "Syncer starting up for: " << event.dirname; | 521 LOG(INFO) << "Syncer starting up for: " << event.dirname; |
434 // The underlying database structure is ready, and we should create | 522 // The underlying database structure is ready, and we should create |
435 // the syncer. | 523 // the syncer. |
436 CHECK(syncer_ == NULL); | 524 CHECK(vault_.syncer_ == NULL); |
437 syncer_ = | 525 vault_.syncer_ = |
438 new Syncer(dirman_, event.dirname, scm_, model_safe_worker_.get()); | 526 new Syncer(dirman_, event.dirname, scm_, model_safe_worker_.get()); |
439 | 527 |
440 syncer_->set_command_channel(command_channel_); | 528 vault_.syncer_->set_command_channel(command_channel_); |
441 syncer_events_.reset(NewEventListenerHookup( | 529 syncer_events_.reset(NewEventListenerHookup( |
442 syncer_->channel(), this, &SyncerThread::HandleSyncerEvent)); | 530 vault_.syncer_->channel(), this, &SyncerThread::HandleSyncerEvent)); |
443 pthread_cond_broadcast(&changed_.condvar_); | 531 vault_field_changed_.Broadcast(); |
444 } | 532 } |
445 } | 533 } |
446 | 534 |
447 static inline void CheckConnected(bool* connected, | 535 static inline void CheckConnected(bool* connected, |
448 HttpResponse::ServerConnectionCode code, | 536 HttpResponse::ServerConnectionCode code, |
449 pthread_cond_t* condvar) { | 537 ConditionVariable* condvar) { |
450 if (*connected) { | 538 if (*connected) { |
451 if (HttpResponse::CONNECTION_UNAVAILABLE == code) { | 539 if (HttpResponse::CONNECTION_UNAVAILABLE == code) { |
452 *connected = false; | 540 *connected = false; |
453 pthread_cond_broadcast(condvar); | 541 condvar->Broadcast(); |
454 } | 542 } |
455 } else { | 543 } else { |
456 if (HttpResponse::SERVER_CONNECTION_OK == code) { | 544 if (HttpResponse::SERVER_CONNECTION_OK == code) { |
457 *connected = true; | 545 *connected = true; |
458 pthread_cond_broadcast(condvar); | 546 condvar->Broadcast(); |
459 } | 547 } |
460 } | 548 } |
461 } | 549 } |
462 | 550 |
463 void SyncerThread::WatchConnectionManager(ServerConnectionManager* conn_mgr) { | 551 void SyncerThread::WatchConnectionManager(ServerConnectionManager* conn_mgr) { |
464 conn_mgr_hookup_.reset(NewEventListenerHookup(conn_mgr->channel(), this, | 552 conn_mgr_hookup_.reset(NewEventListenerHookup(conn_mgr->channel(), this, |
465 &SyncerThread::HandleServerConnectionEvent)); | 553 &SyncerThread::HandleServerConnectionEvent)); |
466 CheckConnected(&connected_, conn_mgr->server_status(), | 554 CheckConnected(&vault_.connected_, conn_mgr->server_status(), |
467 &changed_.condvar_); | 555 &vault_field_changed_); |
468 } | 556 } |
469 | 557 |
470 void SyncerThread::HandleServerConnectionEvent( | 558 void SyncerThread::HandleServerConnectionEvent( |
471 const ServerConnectionEvent& event) { | 559 const ServerConnectionEvent& event) { |
472 if (ServerConnectionEvent::STATUS_CHANGED == event.what_happened) { | 560 if (ServerConnectionEvent::STATUS_CHANGED == event.what_happened) { |
473 MutexLock lock(&mutex_); | 561 AutoLock lock(lock_); |
474 CheckConnected(&connected_, event.connection_code, | 562 CheckConnected(&vault_.connected_, event.connection_code, |
475 &changed_.condvar_); | 563 &vault_field_changed_); |
476 } | 564 } |
477 } | 565 } |
478 | 566 |
479 SyncerEventChannel* SyncerThread::channel() { | 567 SyncerEventChannel* SyncerThread::channel() { |
480 return syncer_event_channel_.get(); | 568 return syncer_event_channel_.get(); |
481 } | 569 } |
482 | 570 |
483 // Inputs and return value in milliseconds. | 571 // Inputs and return value in milliseconds. |
484 int SyncerThread::CalculateSyncWaitTime(int last_interval, int user_idle_ms) { | 572 int SyncerThread::CalculateSyncWaitTime(int last_interval, int user_idle_ms) { |
485 // syncer_polling_interval_ is in seconds | 573 // syncer_polling_interval_ is in seconds |
(...skipping 11 matching lines...) Expand all Loading... |
497 next_wait = std::min(AllStatus::GetRecommendedDelaySeconds( | 585 next_wait = std::min(AllStatus::GetRecommendedDelaySeconds( |
498 last_interval / 1000), syncer_max_interval_ / 1000) * 1000; | 586 last_interval / 1000), syncer_max_interval_ / 1000) * 1000; |
499 } | 587 } |
500 | 588 |
501 return next_wait; | 589 return next_wait; |
502 } | 590 } |
503 | 591 |
504 // Called with mutex_ already locked. | 592 // Called with mutex_ already locked. |
505 void SyncerThread::NudgeSyncImpl(int milliseconds_from_now, | 593 void SyncerThread::NudgeSyncImpl(int milliseconds_from_now, |
506 NudgeSource source) { | 594 NudgeSource source) { |
507 const timespec nudge_time = GetPThreadAbsoluteTime(milliseconds_from_now); | 595 const TimeTicks nudge_time = TimeTicks::Now() + |
| 596 TimeDelta::FromMilliseconds(milliseconds_from_now); |
508 NudgeObject nudge_object(nudge_time, source); | 597 NudgeObject nudge_object(nudge_time, source); |
509 nudge_queue_.push(nudge_object); | 598 vault_.nudge_queue_.push(nudge_object); |
510 pthread_cond_broadcast(&changed_.condvar_); | 599 vault_field_changed_.Broadcast(); |
511 } | 600 } |
512 | 601 |
513 void SyncerThread::WatchTalkMediator(TalkMediator* mediator) { | 602 void SyncerThread::WatchTalkMediator(TalkMediator* mediator) { |
514 talk_mediator_hookup_.reset( | 603 talk_mediator_hookup_.reset( |
515 NewEventListenerHookup( | 604 NewEventListenerHookup( |
516 mediator->channel(), | 605 mediator->channel(), |
517 this, | 606 this, |
518 &SyncerThread::HandleTalkMediatorEvent)); | 607 &SyncerThread::HandleTalkMediatorEvent)); |
519 } | 608 } |
520 | 609 |
521 void SyncerThread::HandleTalkMediatorEvent(const TalkMediatorEvent& event) { | 610 void SyncerThread::HandleTalkMediatorEvent(const TalkMediatorEvent& event) { |
522 MutexLock lock(&mutex_); | 611 AutoLock lock(lock_); |
523 switch (event.what_happened) { | 612 switch (event.what_happened) { |
524 case TalkMediatorEvent::LOGIN_SUCCEEDED: | 613 case TalkMediatorEvent::LOGIN_SUCCEEDED: |
525 LOG(INFO) << "P2P: Login succeeded."; | 614 LOG(INFO) << "P2P: Login succeeded."; |
526 p2p_authenticated_ = true; | 615 p2p_authenticated_ = true; |
527 break; | 616 break; |
528 case TalkMediatorEvent::LOGOUT_SUCCEEDED: | 617 case TalkMediatorEvent::LOGOUT_SUCCEEDED: |
529 LOG(INFO) << "P2P: Login succeeded."; | 618 LOG(INFO) << "P2P: Login succeeded."; |
530 p2p_authenticated_ = false; | 619 p2p_authenticated_ = false; |
531 break; | 620 break; |
532 case TalkMediatorEvent::SUBSCRIPTIONS_ON: | 621 case TalkMediatorEvent::SUBSCRIPTIONS_ON: |
533 LOG(INFO) << "P2P: Subscriptions successfully enabled."; | 622 LOG(INFO) << "P2P: Subscriptions successfully enabled."; |
534 p2p_subscribed_ = true; | 623 p2p_subscribed_ = true; |
535 if (NULL != syncer_) { | 624 if (NULL != vault_.syncer_) { |
536 LOG(INFO) << "Subscriptions on. Nudging syncer for initial push."; | 625 LOG(INFO) << "Subscriptions on. Nudging syncer for initial push."; |
537 NudgeSyncImpl(0, kLocal); | 626 NudgeSyncImpl(0, kLocal); |
538 } | 627 } |
539 break; | 628 break; |
540 case TalkMediatorEvent::SUBSCRIPTIONS_OFF: | 629 case TalkMediatorEvent::SUBSCRIPTIONS_OFF: |
541 LOG(INFO) << "P2P: Subscriptions are not enabled."; | 630 LOG(INFO) << "P2P: Subscriptions are not enabled."; |
542 p2p_subscribed_ = false; | 631 p2p_subscribed_ = false; |
543 break; | 632 break; |
544 case TalkMediatorEvent::NOTIFICATION_RECEIVED: | 633 case TalkMediatorEvent::NOTIFICATION_RECEIVED: |
545 LOG(INFO) << "P2P: Updates on server, pushing syncer"; | 634 LOG(INFO) << "P2P: Updates on server, pushing syncer"; |
546 if (NULL != syncer_) { | 635 if (NULL != vault_.syncer_) { |
547 NudgeSyncImpl(0, kNotification); | 636 NudgeSyncImpl(0, kNotification); |
548 } | 637 } |
549 break; | 638 break; |
550 default: | 639 default: |
551 break; | 640 break; |
552 } | 641 } |
553 | 642 |
554 if (NULL != syncer_) { | 643 if (NULL != vault_.syncer_) { |
555 syncer_->set_notifications_enabled(p2p_authenticated_ && p2p_subscribed_); | 644 vault_.syncer_->set_notifications_enabled( |
| 645 p2p_authenticated_ && p2p_subscribed_); |
556 } | 646 } |
557 } | 647 } |
558 | 648 |
559 } // namespace browser_sync | 649 } // namespace browser_sync |
OLD | NEW |