OLD | NEW |
1 // Copyright (c) 2009 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2009 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 #include "chrome/browser/sync/engine/syncer_thread_timed_stop.h" |
5 #include "chrome/browser/sync/engine/syncer_thread.h" | |
6 | 5 |
7 #include "build/build_config.h" | 6 #include "build/build_config.h" |
8 | 7 |
9 #ifdef OS_MACOSX | 8 #ifdef OS_MACOSX |
10 #include <CoreFoundation/CFNumber.h> | 9 #include <CoreFoundation/CFNumber.h> |
11 #include <IOKit/IOTypes.h> | 10 #include <IOKit/IOTypes.h> |
12 #include <IOKit/IOKitLib.h> | 11 #include <IOKit/IOKitLib.h> |
13 #endif | 12 #endif |
14 | 13 |
15 #include <algorithm> | 14 #include <algorithm> |
16 #include <map> | 15 #include <map> |
17 #include <queue> | 16 #include <queue> |
18 | 17 |
19 #include "chrome/browser/sync/engine/auth_watcher.h" | 18 #include "chrome/browser/sync/engine/auth_watcher.h" |
20 #include "chrome/browser/sync/engine/model_safe_worker.h" | 19 #include "chrome/browser/sync/engine/model_safe_worker.h" |
21 #include "chrome/browser/sync/engine/net/server_connection_manager.h" | 20 #include "chrome/browser/sync/engine/net/server_connection_manager.h" |
22 #include "chrome/browser/sync/engine/syncer.h" | 21 #include "chrome/browser/sync/engine/syncer.h" |
23 #include "chrome/browser/sync/notifier/listener/talk_mediator.h" | 22 #include "chrome/browser/sync/notifier/listener/talk_mediator.h" |
24 #include "chrome/browser/sync/notifier/listener/talk_mediator_impl.h" | 23 #include "chrome/browser/sync/notifier/listener/talk_mediator_impl.h" |
25 #include "chrome/browser/sync/syncable/directory_manager.h" | 24 #include "chrome/browser/sync/syncable/directory_manager.h" |
26 | 25 |
27 using std::priority_queue; | 26 using std::priority_queue; |
28 using std::min; | 27 using std::min; |
29 | 28 using base::Time; |
30 static inline bool operator < (const timespec& a, const timespec& b) { | 29 using base::TimeDelta; |
31 return a.tv_sec == b.tv_sec ? a.tv_nsec < b.tv_nsec : a.tv_sec < b.tv_sec; | 30 using base::TimeTicks; |
32 } | |
33 | |
34 namespace { | |
35 | |
36 // Returns the amount of time since the user last interacted with the computer, | |
37 // in milliseconds | |
38 int UserIdleTime() { | |
39 #ifdef OS_WIN | |
40 LASTINPUTINFO last_input_info; | |
41 last_input_info.cbSize = sizeof(LASTINPUTINFO); | |
42 | |
43 // Get time in windows ticks since system start of last activity. | |
44 BOOL b = ::GetLastInputInfo(&last_input_info); | |
45 if (b == TRUE) | |
46 return ::GetTickCount() - last_input_info.dwTime; | |
47 #elif defined(OS_MACOSX) | |
48 // It would be great to do something like: | |
49 // | |
50 // return 1000 * | |
51 // CGEventSourceSecondsSinceLastEventType( | |
52 // kCGEventSourceStateCombinedSessionState, | |
53 // kCGAnyInputEventType); | |
54 // | |
55 // Unfortunately, CGEvent* lives in ApplicationServices, and we're a daemon | |
56 // and can't link that high up the food chain. Thus this mucking in IOKit. | |
57 | |
58 io_service_t hid_service = | |
59 IOServiceGetMatchingService(kIOMasterPortDefault, | |
60 IOServiceMatching("IOHIDSystem")); | |
61 if (!hid_service) { | |
62 LOG(WARNING) << "Could not obtain IOHIDSystem"; | |
63 return 0; | |
64 } | |
65 | |
66 CFTypeRef object = IORegistryEntryCreateCFProperty(hid_service, | |
67 CFSTR("HIDIdleTime"), | |
68 kCFAllocatorDefault, | |
69 0); | |
70 if (!object) { | |
71 LOG(WARNING) << "Could not get IOHIDSystem's HIDIdleTime property"; | |
72 IOObjectRelease(hid_service); | |
73 return 0; | |
74 } | |
75 | |
76 int64 idle_time; // in nanoseconds | |
77 Boolean success; | |
78 if (CFGetTypeID(object) == CFNumberGetTypeID()) { | |
79 success = CFNumberGetValue((CFNumberRef)object, | |
80 kCFNumberSInt64Type, | |
81 &idle_time); | |
82 } else { | |
83 LOG(WARNING) << "IOHIDSystem's HIDIdleTime property isn't a number!"; | |
84 } | |
85 | |
86 CFRelease(object); | |
87 IOObjectRelease(hid_service); | |
88 | |
89 if (!success) { | |
90 LOG(WARNING) << "Could not get IOHIDSystem's HIDIdleTime property's value"; | |
91 return 0; | |
92 } else { | |
93 return idle_time / 1000000; // nano to milli | |
94 } | |
95 #else | |
96 static bool was_logged = false; | |
97 if (!was_logged) { | |
98 was_logged = true; | |
99 LOG(INFO) << "UserIdleTime unimplemented on this platform, " | |
100 "synchronization will not throttle when user idle"; | |
101 } | |
102 #endif | |
103 | |
104 return 0; | |
105 } | |
106 | |
107 } // namespace | |
108 | 31 |
109 namespace browser_sync { | 32 namespace browser_sync { |
110 | 33 |
111 bool SyncerThread::NudgeSyncer(int milliseconds_from_now, NudgeSource source) { | 34 SyncerThreadTimedStop::SyncerThreadTimedStop( |
112 MutexLock lock(&mutex_); | |
113 if (syncer_ == NULL) { | |
114 return false; | |
115 } | |
116 NudgeSyncImpl(milliseconds_from_now, source); | |
117 return true; | |
118 } | |
119 | |
120 void* RunSyncerThread(void* syncer_thread) { | |
121 return (reinterpret_cast<SyncerThread*>(syncer_thread))->ThreadMain(); | |
122 } | |
123 | |
124 SyncerThread::SyncerThread( | |
125 ClientCommandChannel* command_channel, | 35 ClientCommandChannel* command_channel, |
126 syncable::DirectoryManager* mgr, | 36 syncable::DirectoryManager* mgr, |
127 ServerConnectionManager* connection_manager, | 37 ServerConnectionManager* connection_manager, |
128 AllStatus* all_status, | 38 AllStatus* all_status, |
129 ModelSafeWorker* model_safe_worker) | 39 ModelSafeWorker* model_safe_worker) |
130 : dirman_(mgr), scm_(connection_manager), | 40 : SyncerThread(command_channel, mgr, connection_manager, all_status, |
131 syncer_(NULL), syncer_events_(NULL), thread_running_(false), | 41 model_safe_worker), |
132 syncer_short_poll_interval_seconds_(kDefaultShortPollIntervalSeconds), | 42 in_thread_main_loop_(false) { |
133 syncer_long_poll_interval_seconds_(kDefaultLongPollIntervalSeconds), | |
134 syncer_polling_interval_(kDefaultShortPollIntervalSeconds), | |
135 syncer_max_interval_(kDefaultMaxPollIntervalMs), | |
136 stop_syncer_thread_(false), connected_(false), conn_mgr_hookup_(NULL), | |
137 p2p_authenticated_(false), p2p_subscribed_(false), | |
138 allstatus_(all_status), talk_mediator_hookup_(NULL), | |
139 command_channel_(command_channel), directory_manager_hookup_(NULL), | |
140 model_safe_worker_(model_safe_worker), | |
141 client_command_hookup_(NULL), disable_idle_detection_(false) { | |
142 | |
143 SyncerEvent shutdown = { SyncerEvent::SHUTDOWN_USE_WITH_CARE }; | |
144 syncer_event_channel_.reset(new SyncerEventChannel(shutdown)); | |
145 | |
146 if (dirman_) { | |
147 directory_manager_hookup_.reset(NewEventListenerHookup( | |
148 dirman_->channel(), this, &SyncerThread::HandleDirectoryManagerEvent)); | |
149 } | |
150 | |
151 if (scm_) { | |
152 WatchConnectionManager(scm_); | |
153 } | |
154 | |
155 if (command_channel_) { | |
156 WatchClientCommands(command_channel_); | |
157 } | |
158 } | |
159 | |
160 SyncerThread::~SyncerThread() { | |
161 client_command_hookup_.reset(); | |
162 conn_mgr_hookup_.reset(); | |
163 syncer_event_channel_.reset(); | |
164 directory_manager_hookup_.reset(); | |
165 syncer_events_.reset(); | |
166 delete syncer_; | |
167 talk_mediator_hookup_.reset(); | |
168 CHECK(!thread_running_); | |
169 } | |
170 | |
171 // Creates and starts a syncer thread. | |
172 // Returns true if it creates a thread or if there's currently a thread running | |
173 // and false otherwise. | |
174 bool SyncerThread::Start() { | |
175 MutexLock lock(&mutex_); | |
176 if (thread_running_) { | |
177 return true; | |
178 } | |
179 thread_running_ = | |
180 (0 == pthread_create(&thread_, NULL, RunSyncerThread, this)); | |
181 if (thread_running_) { | |
182 pthread_detach(thread_); | |
183 } | |
184 return thread_running_; | |
185 } | 43 } |
186 | 44 |
187 // Stop processing. A max wait of at least 2*server RTT time is recommended. | 45 // Stop processing. A max wait of at least 2*server RTT time is recommended. |
188 // Returns true if we stopped, false otherwise. | 46 // Returns true if we stopped, false otherwise. |
189 bool SyncerThread::Stop(int max_wait) { | 47 bool SyncerThreadTimedStop::Stop(int max_wait) { |
190 MutexLock lock(&mutex_); | 48 AutoLock lock(lock_); |
191 if (!thread_running_) | 49 // If the thread has been started, then we either already have or are about to |
| 50 // enter ThreadMainLoop so we have to proceed with shutdown and wait for it to |
| 51 // finish. If the thread has not been started --and we now own the lock-- |
| 52 // then we can early out because the caller has not called Start(). |
| 53 if (!thread_.IsRunning()) |
192 return true; | 54 return true; |
193 stop_syncer_thread_ = true; | 55 |
194 if (NULL != syncer_) { | 56 LOG(INFO) << "SyncerThread::Stop - setting ThreadMain exit condition to " |
| 57 << "true (vault_.stop_syncer_thread_)"; |
| 58 // Exit the ThreadMainLoop once the syncer finishes (we tell it to exit |
| 59 // below). |
| 60 vault_.stop_syncer_thread_ = true; |
| 61 if (NULL != vault_.syncer_) { |
195 // Try to early exit the syncer. | 62 // Try to early exit the syncer. |
196 syncer_->RequestEarlyExit(); | 63 vault_.syncer_->RequestEarlyExit(); |
197 } | 64 } |
198 pthread_cond_broadcast(&changed_.condvar_); | 65 |
199 timespec deadline = { time(NULL) + (max_wait / 1000), 0 }; | 66 // stop_syncer_thread_ is now true and the Syncer has been told to exit. |
200 do { | 67 // We want to wake up all waiters so they can re-examine state. We signal, |
201 const int wait_result = max_wait < 0 ? | 68 // causing all waiters to try to re-acquire the lock, and then we atomically |
202 pthread_cond_wait(&changed_.condvar_, &mutex_.mutex_) : | 69 // release the lock and wait. Our wait can be spuriously signaled, so we |
203 pthread_cond_timedwait(&changed_.condvar_, &mutex_.mutex_, | 70 // recalculate the remaining sleep time each time through and re- |
204 &deadline); | 71 // check the condition before exiting the loop. |
205 if (ETIMEDOUT == wait_result) { | 72 vault_field_changed_.Broadcast(); |
206 LOG(ERROR) << "SyncerThread::Stop timed out. Problems likely."; | 73 TimeTicks start = TimeTicks::Now(); |
207 return false; | 74 TimeTicks end = start + TimeDelta::FromMilliseconds(max_wait); |
| 75 bool timed_out = false; |
| 76 // Eventually the combination of RequestEarlyExit and setting |
| 77 // stop_syncer_thread_ to true above will cause in_thread_main_loop_ to become |
| 78 // false. |
| 79 while (in_thread_main_loop_) { |
| 80 TimeDelta sleep_time = end - TimeTicks::Now(); |
| 81 if (sleep_time < TimeDelta::FromSeconds(0)) { |
| 82 timed_out = true; |
| 83 break; |
208 } | 84 } |
209 } while (thread_running_); | 85 LOG(INFO) << "Waiting in stop for " << sleep_time.InSeconds() << "s."; |
| 86 vault_field_changed_.TimedWait(sleep_time); |
| 87 } |
| 88 |
| 89 if (timed_out) { |
| 90 LOG(ERROR) << "SyncerThread::Stop timed out or error. Problems likely."; |
| 91 return false; |
| 92 } |
| 93 |
| 94 // Stop() should not block on anything at this point, given above madness. |
| 95 DLOG(INFO) << "Calling SyncerThread::thread_.Stop() at " |
| 96 << Time::Now().ToInternalValue(); |
| 97 thread_.Stop(); |
| 98 DLOG(INFO) << "SyncerThread::thread_.Stop() finished at " |
| 99 << Time::Now().ToInternalValue(); |
210 return true; | 100 return true; |
211 } | 101 } |
212 | 102 |
213 void SyncerThread::WatchClientCommands(ClientCommandChannel* channel) { | 103 void SyncerThreadTimedStop::ThreadMain() { |
214 PThreadScopedLock<PThreadMutex> lock(&mutex_); | 104 AutoLock lock(lock_); |
215 client_command_hookup_.reset(NewEventListenerHookup(channel, this, | 105 // Signal Start() to let it know we've made it safely are now running on the |
216 &SyncerThread::HandleClientCommand)); | 106 // message loop, and unblock it's caller. |
| 107 thread_main_started_.Signal(); |
| 108 |
| 109 // The only thing that could be waiting on this value is Stop, and we don't |
| 110 // release the lock until we're far enough along to Stop safely. |
| 111 in_thread_main_loop_ = true; |
| 112 vault_field_changed_.Broadcast(); |
| 113 ThreadMainLoop(); |
| 114 in_thread_main_loop_ = false; |
| 115 vault_field_changed_.Broadcast(); |
| 116 LOG(INFO) << "Syncer thread ThreadMain is done."; |
217 } | 117 } |
218 | 118 |
219 void SyncerThread::HandleClientCommand(ClientCommandChannel::EventType event) { | 119 } // namespace browser_sync |
220 if (!event) { | |
221 return; | |
222 } | |
223 | |
224 // Mutex not really necessary for these. | |
225 if (event->has_set_sync_poll_interval()) { | |
226 syncer_short_poll_interval_seconds_ = event->set_sync_poll_interval(); | |
227 } | |
228 | |
229 if (event->has_set_sync_long_poll_interval()) { | |
230 syncer_long_poll_interval_seconds_ = event->set_sync_long_poll_interval(); | |
231 } | |
232 } | |
233 | |
234 void SyncerThread::ThreadMainLoop() { | |
235 // Use the short poll value by default. | |
236 int poll_seconds = syncer_short_poll_interval_seconds_; | |
237 int user_idle_milliseconds = 0; | |
238 timespec last_sync_time = { 0 }; | |
239 bool initial_sync_for_thread = true; | |
240 bool continue_sync_cycle = false; | |
241 | |
242 while (!stop_syncer_thread_) { | |
243 if (!connected_) { | |
244 LOG(INFO) << "Syncer thread waiting for connection."; | |
245 while (!connected_ && !stop_syncer_thread_) | |
246 pthread_cond_wait(&changed_.condvar_, &mutex_.mutex_); | |
247 LOG_IF(INFO, connected_) << "Syncer thread found connection."; | |
248 continue; | |
249 } | |
250 | |
251 if (syncer_ == NULL) { | |
252 LOG(INFO) << "Syncer thread waiting for database initialization."; | |
253 while (syncer_ == NULL && !stop_syncer_thread_) | |
254 pthread_cond_wait(&changed_.condvar_, &mutex_.mutex_); | |
255 LOG_IF(INFO, !(syncer_ == NULL)) << "Syncer was found after DB started."; | |
256 continue; | |
257 } | |
258 | |
259 timespec const next_poll = { last_sync_time.tv_sec + poll_seconds, | |
260 last_sync_time.tv_nsec }; | |
261 const timespec wake_time = | |
262 !nudge_queue_.empty() && nudge_queue_.top().first < next_poll ? | |
263 nudge_queue_.top().first : next_poll; | |
264 LOG(INFO) << "wake time is " << wake_time.tv_sec; | |
265 LOG(INFO) << "next poll is " << next_poll.tv_sec; | |
266 | |
267 const int error = pthread_cond_timedwait(&changed_.condvar_, &mutex_.mutex_, | |
268 &wake_time); | |
269 if (ETIMEDOUT != error) { | |
270 continue; // Check all the conditions again. | |
271 } | |
272 | |
273 const timespec now = GetPThreadAbsoluteTime(0); | |
274 | |
275 // Handle a nudge, caused by either a notification or a local bookmark | |
276 // event. This will also update the source of the following SyncMain call. | |
277 UpdateNudgeSource(now, &continue_sync_cycle, &initial_sync_for_thread); | |
278 | |
279 LOG(INFO) << "Calling Sync Main at time " << now.tv_sec; | |
280 SyncMain(syncer_); | |
281 last_sync_time = now; | |
282 | |
283 LOG(INFO) << "Updating the next polling time after SyncMain"; | |
284 poll_seconds = CalculatePollingWaitTime(allstatus_->status(), | |
285 poll_seconds, | |
286 &user_idle_milliseconds, | |
287 &continue_sync_cycle); | |
288 } | |
289 } | |
290 | |
291 // We check how long the user's been idle and sync less often if the machine is | |
292 // not in use. The aim is to reduce server load. | |
293 int SyncerThread::CalculatePollingWaitTime( | |
294 const AllStatus::Status& status, | |
295 int last_poll_wait, // in s | |
296 int* user_idle_milliseconds, | |
297 bool* continue_sync_cycle) { | |
298 bool is_continuing_sync_cyle = *continue_sync_cycle; | |
299 *continue_sync_cycle = false; | |
300 | |
301 // Determine if the syncer has unfinished work to do from allstatus_. | |
302 const bool syncer_has_work_to_do = | |
303 status.updates_available > status.updates_received | |
304 || status.unsynced_count > 0; | |
305 LOG(INFO) << "syncer_has_work_to_do is " << syncer_has_work_to_do; | |
306 | |
307 // First calculate the expected wait time, figuring in any backoff because of | |
308 // user idle time. next_wait is in seconds | |
309 syncer_polling_interval_ = (!status.notifications_enabled) ? | |
310 syncer_short_poll_interval_seconds_ : | |
311 syncer_long_poll_interval_seconds_; | |
312 int default_next_wait = syncer_polling_interval_; | |
313 int actual_next_wait = default_next_wait; | |
314 | |
315 if (syncer_has_work_to_do) { | |
316 // Provide exponential backoff due to consecutive errors, else attempt to | |
317 // complete the work as soon as possible. | |
318 if (!is_continuing_sync_cyle) { | |
319 actual_next_wait = AllStatus::GetRecommendedDelaySeconds(0); | |
320 } else { | |
321 actual_next_wait = AllStatus::GetRecommendedDelaySeconds(last_poll_wait); | |
322 } | |
323 *continue_sync_cycle = true; | |
324 } else if (!status.notifications_enabled) { | |
325 // Ensure that we start exponential backoff from our base polling | |
326 // interval when we are not continuing a sync cycle. | |
327 last_poll_wait = std::max(last_poll_wait, syncer_polling_interval_); | |
328 | |
329 // Did the user start interacting with the computer again? | |
330 // If so, revise our idle time (and probably next_sync_time) downwards | |
331 int new_idle_time = disable_idle_detection_ ? 0 : UserIdleTime(); | |
332 if (new_idle_time < *user_idle_milliseconds) { | |
333 *user_idle_milliseconds = new_idle_time; | |
334 } | |
335 actual_next_wait = CalculateSyncWaitTime(last_poll_wait * 1000, | |
336 *user_idle_milliseconds) / 1000; | |
337 DCHECK_GE(actual_next_wait, default_next_wait); | |
338 } | |
339 | |
340 LOG(INFO) << "Sync wait: idle " << default_next_wait | |
341 << " non-idle or backoff " << actual_next_wait << "."; | |
342 | |
343 return actual_next_wait; | |
344 } | |
345 | |
346 void* SyncerThread::ThreadMain() { | |
347 NameCurrentThreadForDebugging("SyncEngine_SyncerThread"); | |
348 mutex_.Lock(); | |
349 ThreadMainLoop(); | |
350 thread_running_ = false; | |
351 pthread_cond_broadcast(&changed_.condvar_); | |
352 mutex_.Unlock(); | |
353 LOG(INFO) << "Syncer thread exiting."; | |
354 return 0; | |
355 } | |
356 | |
357 void SyncerThread::SyncMain(Syncer* syncer) { | |
358 CHECK(syncer); | |
359 mutex_.Unlock(); | |
360 while (syncer->SyncShare()) { | |
361 LOG(INFO) << "Looping in sync share"; | |
362 } | |
363 LOG(INFO) << "Done looping in sync share"; | |
364 | |
365 mutex_.Lock(); | |
366 } | |
367 | |
368 void SyncerThread::UpdateNudgeSource(const timespec& now, | |
369 bool* continue_sync_cycle, | |
370 bool* initial_sync) { | |
371 bool nudged = false; | |
372 NudgeSource nudge_source = kUnknown; | |
373 // Has the previous sync cycle completed? | |
374 if (continue_sync_cycle) { | |
375 nudge_source = kContinuation; | |
376 } | |
377 // Update the nudge source if a new nudge has come through during the | |
378 // previous sync cycle. | |
379 while (!nudge_queue_.empty() && !(now < nudge_queue_.top().first)) { | |
380 if (!nudged) { | |
381 nudge_source = nudge_queue_.top().second; | |
382 *continue_sync_cycle = false; // Reset the continuation token on nudge. | |
383 nudged = true; | |
384 } | |
385 nudge_queue_.pop(); | |
386 } | |
387 SetUpdatesSource(nudged, nudge_source, initial_sync); | |
388 } | |
389 | |
390 void SyncerThread::SetUpdatesSource(bool nudged, NudgeSource nudge_source, | |
391 bool* initial_sync) { | |
392 sync_pb::GetUpdatesCallerInfo::GET_UPDATES_SOURCE updates_source = | |
393 sync_pb::GetUpdatesCallerInfo::UNKNOWN; | |
394 if (*initial_sync) { | |
395 updates_source = sync_pb::GetUpdatesCallerInfo::FIRST_UPDATE; | |
396 *initial_sync = false; | |
397 } else if (!nudged) { | |
398 updates_source = sync_pb::GetUpdatesCallerInfo::PERIODIC; | |
399 } else { | |
400 switch (nudge_source) { | |
401 case kNotification: | |
402 updates_source = sync_pb::GetUpdatesCallerInfo::NOTIFICATION; | |
403 break; | |
404 case kLocal: | |
405 updates_source = sync_pb::GetUpdatesCallerInfo::LOCAL; | |
406 break; | |
407 case kContinuation: | |
408 updates_source = sync_pb::GetUpdatesCallerInfo::SYNC_CYCLE_CONTINUATION; | |
409 break; | |
410 case kUnknown: | |
411 default: | |
412 updates_source = sync_pb::GetUpdatesCallerInfo::UNKNOWN; | |
413 break; | |
414 } | |
415 } | |
416 syncer_->set_updates_source(updates_source); | |
417 } | |
418 | |
419 void SyncerThread::HandleSyncerEvent(const SyncerEvent& event) { | |
420 MutexLock lock(&mutex_); | |
421 channel()->NotifyListeners(event); | |
422 if (SyncerEvent::REQUEST_SYNC_NUDGE != event.what_happened) { | |
423 return; | |
424 } | |
425 NudgeSyncImpl(event.nudge_delay_milliseconds, kUnknown); | |
426 } | |
427 | |
428 void SyncerThread::HandleDirectoryManagerEvent( | |
429 const syncable::DirectoryManagerEvent& event) { | |
430 LOG(INFO) << "Handling a directory manager event"; | |
431 if (syncable::DirectoryManagerEvent::OPENED == event.what_happened) { | |
432 MutexLock lock(&mutex_); | |
433 LOG(INFO) << "Syncer starting up for: " << event.dirname; | |
434 // The underlying database structure is ready, and we should create | |
435 // the syncer. | |
436 CHECK(syncer_ == NULL); | |
437 syncer_ = | |
438 new Syncer(dirman_, event.dirname, scm_, model_safe_worker_.get()); | |
439 | |
440 syncer_->set_command_channel(command_channel_); | |
441 syncer_events_.reset(NewEventListenerHookup( | |
442 syncer_->channel(), this, &SyncerThread::HandleSyncerEvent)); | |
443 pthread_cond_broadcast(&changed_.condvar_); | |
444 } | |
445 } | |
446 | |
447 static inline void CheckConnected(bool* connected, | |
448 HttpResponse::ServerConnectionCode code, | |
449 pthread_cond_t* condvar) { | |
450 if (*connected) { | |
451 if (HttpResponse::CONNECTION_UNAVAILABLE == code) { | |
452 *connected = false; | |
453 pthread_cond_broadcast(condvar); | |
454 } | |
455 } else { | |
456 if (HttpResponse::SERVER_CONNECTION_OK == code) { | |
457 *connected = true; | |
458 pthread_cond_broadcast(condvar); | |
459 } | |
460 } | |
461 } | |
462 | |
463 void SyncerThread::WatchConnectionManager(ServerConnectionManager* conn_mgr) { | |
464 conn_mgr_hookup_.reset(NewEventListenerHookup(conn_mgr->channel(), this, | |
465 &SyncerThread::HandleServerConnectionEvent)); | |
466 CheckConnected(&connected_, conn_mgr->server_status(), | |
467 &changed_.condvar_); | |
468 } | |
469 | |
470 void SyncerThread::HandleServerConnectionEvent( | |
471 const ServerConnectionEvent& event) { | |
472 if (ServerConnectionEvent::STATUS_CHANGED == event.what_happened) { | |
473 MutexLock lock(&mutex_); | |
474 CheckConnected(&connected_, event.connection_code, | |
475 &changed_.condvar_); | |
476 } | |
477 } | |
478 | |
479 SyncerEventChannel* SyncerThread::channel() { | |
480 return syncer_event_channel_.get(); | |
481 } | |
482 | |
483 // Inputs and return value in milliseconds. | |
484 int SyncerThread::CalculateSyncWaitTime(int last_interval, int user_idle_ms) { | |
485 // syncer_polling_interval_ is in seconds | |
486 int syncer_polling_interval_ms = syncer_polling_interval_ * 1000; | |
487 | |
488 // This is our default and lower bound. | |
489 int next_wait = syncer_polling_interval_ms; | |
490 | |
491 // Get idle time, bounded by max wait. | |
492 int idle = min(user_idle_ms, syncer_max_interval_); | |
493 | |
494 // If the user has been idle for a while, we'll start decreasing the poll | |
495 // rate. | |
496 if (idle >= kPollBackoffThresholdMultiplier * syncer_polling_interval_ms) { | |
497 next_wait = std::min(AllStatus::GetRecommendedDelaySeconds( | |
498 last_interval / 1000), syncer_max_interval_ / 1000) * 1000; | |
499 } | |
500 | |
501 return next_wait; | |
502 } | |
503 | |
504 // Called with mutex_ already locked. | |
505 void SyncerThread::NudgeSyncImpl(int milliseconds_from_now, | |
506 NudgeSource source) { | |
507 const timespec nudge_time = GetPThreadAbsoluteTime(milliseconds_from_now); | |
508 NudgeObject nudge_object(nudge_time, source); | |
509 nudge_queue_.push(nudge_object); | |
510 pthread_cond_broadcast(&changed_.condvar_); | |
511 } | |
512 | |
513 void SyncerThread::WatchTalkMediator(TalkMediator* mediator) { | |
514 talk_mediator_hookup_.reset( | |
515 NewEventListenerHookup( | |
516 mediator->channel(), | |
517 this, | |
518 &SyncerThread::HandleTalkMediatorEvent)); | |
519 } | |
520 | |
521 void SyncerThread::HandleTalkMediatorEvent(const TalkMediatorEvent& event) { | |
522 MutexLock lock(&mutex_); | |
523 switch (event.what_happened) { | |
524 case TalkMediatorEvent::LOGIN_SUCCEEDED: | |
525 LOG(INFO) << "P2P: Login succeeded."; | |
526 p2p_authenticated_ = true; | |
527 break; | |
528 case TalkMediatorEvent::LOGOUT_SUCCEEDED: | |
529 LOG(INFO) << "P2P: Login succeeded."; | |
530 p2p_authenticated_ = false; | |
531 break; | |
532 case TalkMediatorEvent::SUBSCRIPTIONS_ON: | |
533 LOG(INFO) << "P2P: Subscriptions successfully enabled."; | |
534 p2p_subscribed_ = true; | |
535 if (NULL != syncer_) { | |
536 LOG(INFO) << "Subscriptions on. Nudging syncer for initial push."; | |
537 NudgeSyncImpl(0, kLocal); | |
538 } | |
539 break; | |
540 case TalkMediatorEvent::SUBSCRIPTIONS_OFF: | |
541 LOG(INFO) << "P2P: Subscriptions are not enabled."; | |
542 p2p_subscribed_ = false; | |
543 break; | |
544 case TalkMediatorEvent::NOTIFICATION_RECEIVED: | |
545 LOG(INFO) << "P2P: Updates on server, pushing syncer"; | |
546 if (NULL != syncer_) { | |
547 NudgeSyncImpl(0, kNotification); | |
548 } | |
549 break; | |
550 default: | |
551 break; | |
552 } | |
553 | |
554 if (NULL != syncer_) { | |
555 syncer_->set_notifications_enabled(p2p_authenticated_ && p2p_subscribed_); | |
556 } | |
557 } | |
558 | |
559 } // namespace browser_sync | |
OLD | NEW |