OLD | NEW |
---|---|
(Empty) | |
1 // Copyright (c) 2011 The Chromium Authors. All rights reserved. | |
2 // Use of this source code is governed by a BSD-style license that can be | |
3 // found in the LICENSE file. | |
4 // | |
5 // TODO(satorux): | |
6 // - Handle "disconnected" signal. | |
7 // - Add support for signal sending | |
8 // - Add support for signal monitoring | |
9 // - Collect metrics (ex. # of method calls, method call time, etc.) | |
10 | |
11 #include "dbus/bus.h" | |
12 | |
13 #include "base/bind.h" | |
14 #include "base/logging.h" | |
15 #include "base/message_loop.h" | |
16 #include "base/stl_util.h" | |
17 #include "base/threading/thread.h" | |
18 #include "base/threading/thread_restrictions.h" | |
19 #include "dbus/error.h" | |
20 #include "dbus/exported_object.h" | |
21 #include "dbus/object_proxy.h" | |
22 | |
23 namespace dbus { | |
24 | |
25 namespace { | |
26 | |
27 // The class is used for watching the file descriptor used for D-Bus | |
28 // communication. | |
29 class Watch : public base::MessagePumpLibevent::Watcher { | |
30 public: | |
31 Watch(DBusWatch* watch) | |
32 : raw_watch_(watch) { | |
33 dbus_watch_set_data(raw_watch_, this, NULL); | |
34 } | |
35 | |
36 ~Watch() { | |
37 dbus_watch_set_data(raw_watch_, NULL, NULL); | |
38 } | |
39 | |
40 // Returns true if the underlying file descriptor is ready to be watched. | |
41 bool IsReadyToBeWatched() { | |
42 return dbus_watch_get_enabled(raw_watch_); | |
43 } | |
44 | |
45 // Starts watching the underlying file descriptor. | |
46 void StartWatching() { | |
47 const int file_descriptor = dbus_watch_get_unix_fd(raw_watch_); | |
48 const int flags = dbus_watch_get_flags(raw_watch_); | |
49 | |
50 MessageLoopForIO::Mode mode = MessageLoopForIO::WATCH_READ; | |
51 if ((flags & DBUS_WATCH_READABLE) && (flags & DBUS_WATCH_WRITABLE)) | |
52 mode = MessageLoopForIO::WATCH_READ_WRITE; | |
53 else if (flags & DBUS_WATCH_READABLE) | |
54 mode = MessageLoopForIO::WATCH_READ; | |
55 else if (flags & DBUS_WATCH_WRITABLE) | |
56 mode = MessageLoopForIO::WATCH_WRITE; | |
57 else | |
58 NOTREACHED(); | |
59 | |
60 const bool persistent = true; // Watch persistently. | |
61 const bool success = MessageLoopForIO::current()->WatchFileDescriptor( | |
62 file_descriptor, | |
63 persistent, | |
64 mode, | |
65 &file_descriptor_watcher_, | |
66 this); | |
67 CHECK(success) << "Unable to allocate memory"; | |
68 } | |
69 | |
70 // Stops watching the underlying file descriptor. | |
71 void StopWatching() { | |
72 file_descriptor_watcher_.StopWatchingFileDescriptor(); | |
73 } | |
74 | |
75 private: | |
76 // Implement MessagePumpLibevent::Watcher. | |
77 virtual void OnFileCanReadWithoutBlocking(int file_descriptor) { | |
78 const bool success = dbus_watch_handle(raw_watch_, DBUS_WATCH_READABLE); | |
79 CHECK(success) << "Unable to allocate memory"; | |
80 } | |
81 | |
82 // Implement MessagePumpLibevent::Watcher. | |
83 virtual void OnFileCanWriteWithoutBlocking(int file_descriptor) { | |
84 const bool success = dbus_watch_handle(raw_watch_, DBUS_WATCH_WRITABLE); | |
85 CHECK(success) << "Unable to allocate memory"; | |
86 } | |
87 | |
88 DBusWatch* raw_watch_; | |
89 base::MessagePumpLibevent::FileDescriptorWatcher file_descriptor_watcher_; | |
90 }; | |
91 | |
92 // The class is used for monitoring the timeout used for D-Bus method | |
93 // calls. | |
94 // | |
95 // Unlike Watch, Timeout is a ref counted object, to ensure that |this| of | |
96 // the object is is alive when HandleTimeout() is called. It's unlikely | |
97 // but it may be possible that HandleTimeout() is called after | |
98 // Bus::OnRemoveTimeout(). Hence why we don't simply delete the object in | |
99 // Bus::OnRemoveTimeout(). | |
100 class Timeout : public base::RefCountedThreadSafe<Timeout> { | |
101 public: | |
102 Timeout(DBusTimeout* timeout) | |
103 : raw_timeout_(timeout), | |
104 monitoring_is_active_(false), | |
105 is_destroyed_(false) { | |
106 dbus_timeout_set_data(raw_timeout_, this, NULL); | |
107 } | |
108 | |
109 // Returns true if the timeout is ready to be monitored. | |
110 bool IsReadyToBeMonitored() { | |
111 return dbus_timeout_get_enabled(raw_timeout_); | |
112 } | |
113 | |
114 // Starts monitoring the timeout. | |
115 void StartMonitoring(dbus::Bus* bus) { | |
116 bus->PostDelayedTaskToDBusThread(FROM_HERE, | |
117 base::Bind(&Timeout::HandleTimeout, | |
118 this), | |
119 GetIntervalInMs()); | |
120 monitoring_is_active_ = true; | |
121 } | |
122 | |
123 // Stops monitoring the timeout. | |
124 void StopMonitoring() { | |
125 // We cannot take back the delayed task we posted in | |
126 // StartMonitoring(), so we just mark the monitoring is inactive now. | |
127 monitoring_is_active_ = false; | |
128 } | |
129 | |
130 // Returns the interval in milliseconds. | |
131 int GetIntervalInMs() { | |
132 return dbus_timeout_get_interval(raw_timeout_); | |
133 } | |
134 | |
135 // Cleans up the raw_timeout and marks that this object is destroyed. | |
136 // See the class comment above for why we are doing this. | |
137 void Destroy() { | |
138 dbus_timeout_set_data(raw_timeout_, NULL, NULL); | |
139 is_destroyed_ = true; | |
140 } | |
141 | |
142 private: | |
143 friend class base::RefCountedThreadSafe<Timeout>; | |
144 ~Timeout() { | |
145 } | |
146 | |
147 // Handles the timeout. | |
148 void HandleTimeout() { | |
149 // If the object is marked destroyed, we should do nothing. This can | |
150 // occur if this function is called after Bus::OnRemoveTimeout(). | |
151 if (is_destroyed_) | |
152 return; | |
153 // Skip if monitoring is cancled. | |
154 if (!monitoring_is_active_) | |
155 return; | |
156 | |
157 const bool success = dbus_timeout_handle(raw_timeout_); | |
158 CHECK(success) << "Unable to allocate memory"; | |
159 } | |
160 | |
161 DBusTimeout* raw_timeout_; | |
162 bool monitoring_is_active_; | |
163 bool is_destroyed_; | |
164 }; | |
165 | |
166 } // namespace | |
167 | |
168 Bus::Options::Options() | |
169 : bus_type(SESSION), | |
170 connection_type(PRIVATE), | |
171 dbus_thread(NULL) { | |
172 } | |
173 | |
174 Bus::Options::~Options() { | |
175 } | |
176 | |
177 Bus::Bus(const Options& options) | |
178 : bus_type_(options.bus_type), | |
179 connection_type_(options.connection_type), | |
180 dbus_thread_(options.dbus_thread), | |
181 connection_(NULL), | |
182 origin_loop_(MessageLoop::current()), | |
183 origin_thread_id_(base::PlatformThread::CurrentId()), | |
184 dbus_thread_id_(base::kInvalidThreadId), | |
185 async_operations_are_set_up_(false) { | |
186 if (dbus_thread_) { | |
187 dbus_thread_id_ = dbus_thread_->thread_id(); | |
188 DCHECK(dbus_thread_->IsRunning()) | |
189 << "The D-Bus thread should be running"; | |
190 DCHECK_EQ(MessageLoop::TYPE_IO, | |
191 dbus_thread_->message_loop()->type()) | |
192 << "The D-Bus thread should have an MessageLoopForIO attached"; | |
193 } | |
194 | |
195 // This is safe to call multiple times. | |
196 dbus_threads_init_default(); | |
197 } | |
198 | |
199 Bus::~Bus() { | |
200 DCHECK(!connection_); | |
201 DCHECK(owned_service_names_.empty()); | |
202 for (size_t i = 0; i < exported_objects_.size(); ++i) | |
203 exported_objects_[i]->Release(); | |
204 exported_objects_.clear(); | |
205 | |
206 for (size_t i = 0; i < object_proxies_.size(); ++i) | |
207 object_proxies_[i]->Release(); | |
208 object_proxies_.clear(); | |
209 } | |
210 | |
211 ObjectProxy* Bus::GetObjectProxy(const std::string& service_name, | |
212 const std::string& object_path) { | |
213 AssertOnOriginThread(); | |
214 | |
215 ObjectProxy* object_proxy = | |
216 new ObjectProxy(this, service_name, object_path); | |
217 object_proxy->AddRef(); | |
218 object_proxies_.push_back(object_proxy); | |
219 | |
220 return object_proxy; | |
221 } | |
222 | |
223 ExportedObject* Bus::GetExportedObject(const std::string& service_name, | |
224 const std::string& object_path) { | |
225 AssertOnOriginThread(); | |
226 | |
227 ExportedObject* exported_object = | |
228 new ExportedObject(this, service_name, object_path); | |
229 exported_object->AddRef(); | |
230 exported_objects_.push_back(exported_object); | |
231 | |
232 return exported_object; | |
233 } | |
234 | |
235 bool Bus::Connect() { | |
236 // dbus_bus_get_private() and dbus_bus_get() are blocking calls. | |
237 AssertOnDBusThread(); | |
238 | |
239 // Check if it's already initialized. | |
240 if (connection_) | |
241 return true; | |
242 | |
243 ScopedDBusError error; | |
244 const DBusBusType dbus_bus_type = static_cast<DBusBusType>(bus_type_); | |
245 if (connection_type_ == PRIVATE) { | |
246 connection_ = dbus_bus_get_private(dbus_bus_type, error.get()); | |
247 } else { | |
248 connection_ = dbus_bus_get(dbus_bus_type, error.get()); | |
249 } | |
250 if (!connection_) { | |
251 LOG(ERROR) << "Failed to connect to the bus: " | |
252 << (dbus_error_is_set(error.get()) ? error.message() : ""); | |
253 return false; | |
254 } | |
255 // We shouldn't exit on the disconnected signal. | |
256 dbus_connection_set_exit_on_disconnect(connection_, false); | |
257 | |
258 return true; | |
259 } | |
260 | |
261 void Bus::ShutdownAndBlock() { | |
262 AssertOnDBusThread(); | |
263 | |
264 // Delete exported objects. Need to unregister them beforehand. | |
265 for (size_t i = 0; i < exported_objects_.size(); ++i) { | |
266 exported_objects_[i]->Unregister(); | |
267 } | |
268 | |
269 // Release all service names. We should not use an iterator here, as | |
270 // we'll modify the set in ReleaseOwnership(). | |
271 while (!owned_service_names_.empty()) { | |
272 ReleaseOwnership(*owned_service_names_.begin()); | |
273 } | |
274 | |
275 // Private connection should be closed. | |
276 if (connection_ && connection_type_ == PRIVATE) { | |
277 dbus_connection_close(connection_); | |
278 } | |
279 // dbus_connection_close() won't unref. | |
280 dbus_connection_unref(connection_); | |
281 | |
282 connection_ = NULL; | |
283 } | |
284 | |
285 void Bus::Shutdown(OnShutdownCallback callback) { | |
286 AssertOnOriginThread(); | |
287 | |
288 PostTaskToDBusThread(FROM_HERE, base::Bind(&Bus::ShutdownInternal, | |
289 this, | |
290 callback)); | |
291 } | |
292 | |
293 bool Bus::RequestOwnership(const std::string& service_name) { | |
294 DCHECK(connection_); | |
295 // dbus_bus_request_name() is a blocking call. | |
296 AssertOnDBusThread(); | |
297 | |
298 // Check if we already own the service name. | |
299 if (owned_service_names_.find(service_name) != owned_service_names_.end()) { | |
300 return true; | |
301 } | |
302 | |
303 ScopedDBusError error; | |
304 const int result = dbus_bus_request_name(connection_, | |
305 service_name.c_str(), | |
306 DBUS_NAME_FLAG_DO_NOT_QUEUE, | |
307 error.get()); | |
308 if (result != DBUS_REQUEST_NAME_REPLY_PRIMARY_OWNER) { | |
309 LOG(ERROR) << "failed to get the onwership of " << service_name << ": " | |
310 << (dbus_error_is_set(error.get()) ? error.message() : ""); | |
311 return false; | |
312 } | |
313 owned_service_names_.insert(service_name); | |
314 return true; | |
315 } | |
316 | |
317 bool Bus::ReleaseOwnership(const std::string& service_name) { | |
318 DCHECK(connection_); | |
319 // dbus_bus_request_name() is a blocking call. | |
320 AssertOnDBusThread(); | |
321 | |
322 // Check if we already own the service name. | |
323 if (owned_service_names_.find(service_name) == owned_service_names_.end()) { | |
324 LOG(ERROR) << service_name << "is not owned by the bus"; | |
325 return false; | |
326 } | |
327 | |
328 ScopedDBusError error; | |
329 const int result = dbus_bus_release_name(connection_, service_name.c_str(), | |
330 error.get()); | |
331 if (result == DBUS_RELEASE_NAME_REPLY_RELEASED) { | |
332 owned_service_names_.erase(service_name); | |
333 return true; | |
334 } else { | |
335 LOG(ERROR) << "failed to release the onwership of " << service_name << ": " | |
336 << (error.is_set() ? error.message() : ""); | |
337 return false; | |
338 } | |
339 } | |
340 | |
341 bool Bus::SetUpAsyncOperations() { | |
342 DCHECK(connection_); | |
343 AssertOnDBusThread(); | |
344 | |
345 if (async_operations_are_set_up_) | |
346 return true; | |
347 | |
348 // Process all the incoming data if any, so that OnDispatchStatus() will | |
349 // be called when the incoming data is ready. | |
350 ProcessAllIncomingDataIfAny(); | |
351 | |
352 bool success = dbus_connection_set_watch_functions(connection_, | |
353 &Bus::OnAddWatchThunk, | |
354 &Bus::OnRemoveWatchThunk, | |
355 &Bus::OnToggleWatchThunk, | |
356 this, | |
357 NULL); | |
358 CHECK(success) << "Unable to allocate memory"; | |
359 | |
360 // TODO(satorux): Timeout is not yet implemented. | |
361 success = dbus_connection_set_timeout_functions(connection_, | |
362 &Bus::OnAddTimeoutThunk, | |
363 &Bus::OnRemoveTimeoutThunk, | |
364 &Bus::OnToggleTimeoutThunk, | |
365 this, | |
366 NULL); | |
367 CHECK(success) << "Unable to allocate memory"; | |
368 | |
369 dbus_connection_set_dispatch_status_function( | |
370 connection_, | |
371 &Bus::OnDispatchStatusChangedThunk, | |
372 this, | |
373 NULL); | |
374 | |
375 async_operations_are_set_up_ = true; | |
376 | |
377 return true; | |
378 } | |
379 | |
380 DBusMessage* Bus::SendWithReplyAndBlock(DBusMessage* request, | |
381 int timeout_ms, | |
382 DBusError* error) { | |
383 DCHECK(connection_); | |
384 AssertOnDBusThread(); | |
385 | |
386 return dbus_connection_send_with_reply_and_block( | |
387 connection_, request, timeout_ms, error); | |
388 } | |
389 | |
390 void Bus::SendWithReply(DBusMessage* request, | |
391 DBusPendingCall** pending_call, | |
392 int timeout_ms) { | |
393 DCHECK(connection_); | |
394 AssertOnDBusThread(); | |
395 | |
396 const bool success = dbus_connection_send_with_reply( | |
397 connection_, request, pending_call, timeout_ms); | |
398 CHECK(success) << "Unable to allocate memory"; | |
399 } | |
400 | |
401 bool Bus::TryRegisterObjectPath(const std::string& object_path, | |
402 const DBusObjectPathVTable* vtable, | |
403 void* user_data, | |
404 DBusError* error) { | |
405 DCHECK(connection_); | |
406 AssertOnDBusThread(); | |
407 | |
408 return dbus_connection_try_register_object_path( | |
409 connection_, | |
410 object_path.c_str(), | |
411 vtable, | |
412 user_data, | |
413 error); | |
414 } | |
415 | |
416 bool Bus::UnregisterObjectPath(const std::string& object_path) { | |
417 DCHECK(connection_); | |
418 AssertOnDBusThread(); | |
419 | |
420 return dbus_connection_unregister_object_path( | |
421 connection_, | |
422 object_path.c_str()); | |
stevenjb
2011/08/10 19:40:07
Since this will only fail on out of memory, and th
satorux1
2011/08/10 21:40:55
You are right. Done.
| |
423 } | |
424 | |
425 void Bus::ShutdownInternal(OnShutdownCallback callback) { | |
426 AssertOnDBusThread(); | |
427 | |
428 ShutdownAndBlock(); | |
429 PostTaskToOriginThread(FROM_HERE, callback); | |
430 } | |
431 | |
432 void Bus::ProcessAllIncomingDataIfAny() { | |
433 AssertOnDBusThread(); | |
434 | |
435 // As mentioned at the class comment in .h file, connection_ can be NULL. | |
436 if (!connection_ || !dbus_connection_get_is_connected(connection_)) | |
437 return; | |
438 | |
439 if (dbus_connection_get_dispatch_status(connection_) == | |
440 DBUS_DISPATCH_DATA_REMAINS) { | |
441 while (dbus_connection_dispatch(connection_) == | |
442 DBUS_DISPATCH_DATA_REMAINS); | |
443 } | |
444 } | |
445 | |
446 void Bus::PostTaskToOriginThread(const tracked_objects::Location& from_here, | |
447 const base::Closure& task) { | |
448 origin_loop_->PostTask(from_here, task); | |
449 } | |
450 | |
451 void Bus::PostTaskToDBusThread(const tracked_objects::Location& from_here, | |
452 const base::Closure& task) { | |
453 if (dbus_thread_) | |
454 dbus_thread_->message_loop()->PostTask(from_here, task); | |
455 else | |
456 origin_loop_->PostTask(from_here, task); | |
457 } | |
458 | |
459 void Bus::PostDelayedTaskToDBusThread( | |
460 const tracked_objects::Location& from_here, | |
461 const base::Closure& task, | |
462 int delay_ms) { | |
463 if (dbus_thread_) | |
464 dbus_thread_->message_loop()->PostDelayedTask(from_here, task, delay_ms); | |
465 else | |
466 origin_loop_->PostDelayedTask(from_here, task, delay_ms); | |
467 } | |
468 | |
469 void Bus::AssertOnOriginThread() { | |
470 DCHECK_EQ(origin_thread_id_, base::PlatformThread::CurrentId()); | |
471 } | |
472 | |
473 void Bus::AssertOnDBusThread() { | |
474 base::ThreadRestrictions::AssertIOAllowed(); | |
475 | |
476 if (dbus_thread_) { | |
477 DCHECK_EQ(dbus_thread_id_, base::PlatformThread::CurrentId()); | |
478 } else { | |
479 AssertOnOriginThread(); | |
480 } | |
481 } | |
482 | |
483 dbus_bool_t Bus::OnAddWatch(DBusWatch* raw_watch) { | |
484 AssertOnDBusThread(); | |
485 | |
486 // watch will be deleted when raw_watch is removed in OnRemoveWatch(). | |
487 Watch* watch = new Watch(raw_watch); | |
488 if (watch->IsReadyToBeWatched()) { | |
489 watch->StartWatching(); | |
490 } | |
491 return true; | |
492 } | |
493 | |
494 void Bus::OnRemoveWatch(DBusWatch* raw_watch) { | |
495 AssertOnDBusThread(); | |
496 | |
497 Watch* watch = static_cast<Watch*>(dbus_watch_get_data(raw_watch)); | |
498 delete watch; | |
499 } | |
500 | |
501 void Bus::OnToggleWatch(DBusWatch* raw_watch) { | |
502 AssertOnDBusThread(); | |
503 | |
504 Watch* watch = static_cast<Watch*>(dbus_watch_get_data(raw_watch)); | |
505 if (watch->IsReadyToBeWatched()) { | |
506 watch->StartWatching(); | |
507 } else { | |
508 // It's safe to call this if StartWatching() wasn't called, per | |
509 // message_pump_libevent.h. | |
510 watch->StopWatching(); | |
511 } | |
512 } | |
513 | |
514 dbus_bool_t Bus::OnAddTimeout(DBusTimeout* raw_timeout) { | |
515 AssertOnDBusThread(); | |
516 | |
517 // timeout will be deleted when raw_timeout is removed in | |
518 // OnRemoveTimeoutThunk(). | |
519 Timeout* timeout = new Timeout(raw_timeout); | |
520 timeout->AddRef(); // Balanced on OnRemoveTimeout(). | |
521 if (timeout->IsReadyToBeMonitored()) { | |
522 timeout->StartMonitoring(this); | |
523 } | |
524 return true; | |
525 } | |
526 | |
527 void Bus::OnRemoveTimeout(DBusTimeout* raw_timeout) { | |
528 AssertOnDBusThread(); | |
529 | |
530 Timeout* timeout = static_cast<Timeout*>(dbus_timeout_get_data(raw_timeout)); | |
531 timeout->Destroy(); | |
532 timeout->Release(); | |
533 } | |
534 | |
535 void Bus::OnToggleTimeout(DBusTimeout* raw_timeout) { | |
536 AssertOnDBusThread(); | |
537 | |
538 Timeout* timeout = static_cast<Timeout*>(dbus_timeout_get_data(raw_timeout)); | |
539 if (timeout->IsReadyToBeMonitored()) { | |
540 timeout->StartMonitoring(this); | |
541 } else { | |
542 timeout->StopMonitoring(); | |
543 } | |
544 } | |
545 | |
546 void Bus::OnDispatchStatusChanged(DBusConnection* connection, | |
547 DBusDispatchStatus status) { | |
548 DCHECK_EQ(connection, connection_); | |
549 AssertOnDBusThread(); | |
550 | |
551 if (!dbus_connection_get_is_connected(connection)) | |
552 return; | |
553 | |
554 // We cannot call ProcessAllIncomingDataIfAny() here, as calling | |
555 // dbus_connection_dispatch() inside DBusDispatchStatusFunction is | |
556 // prohibited by the D-Bus library. Hence, we post a task here instead. | |
557 // See comments for dbus_connection_set_dispatch_status_function(). | |
558 PostTaskToDBusThread(FROM_HERE, | |
559 base::Bind(&Bus::ProcessAllIncomingDataIfAny, | |
560 this)); | |
561 } | |
562 | |
563 dbus_bool_t Bus::OnAddWatchThunk(DBusWatch* raw_watch, void* data) { | |
564 Bus* self = static_cast<Bus*>(data); | |
565 return self->OnAddWatch(raw_watch); | |
566 } | |
567 | |
568 void Bus::OnRemoveWatchThunk(DBusWatch* raw_watch, void* data) { | |
569 Bus* self = static_cast<Bus*>(data); | |
570 return self->OnRemoveWatch(raw_watch); | |
571 } | |
572 | |
573 void Bus::OnToggleWatchThunk(DBusWatch* raw_watch, void* data) { | |
574 Bus* self = static_cast<Bus*>(data); | |
575 return self->OnToggleWatch(raw_watch); | |
576 } | |
577 | |
578 dbus_bool_t Bus::OnAddTimeoutThunk(DBusTimeout* raw_timeout, void* data) { | |
579 Bus* self = static_cast<Bus*>(data); | |
580 return self->OnAddTimeout(raw_timeout); | |
581 } | |
582 | |
583 void Bus::OnRemoveTimeoutThunk(DBusTimeout* raw_timeout, void* data) { | |
584 Bus* self = static_cast<Bus*>(data); | |
585 return self->OnRemoveTimeout(raw_timeout); | |
586 } | |
587 | |
588 void Bus::OnToggleTimeoutThunk(DBusTimeout* raw_timeout, void* data) { | |
589 Bus* self = static_cast<Bus*>(data); | |
590 return self->OnToggleTimeout(raw_timeout); | |
591 } | |
592 | |
593 void Bus::OnDispatchStatusChangedThunk(DBusConnection* connection, | |
594 DBusDispatchStatus status, | |
595 void* data) { | |
596 Bus* self = static_cast<Bus*>(data); | |
597 return self->OnDispatchStatusChanged(connection, status); | |
598 } | |
599 | |
600 } // namespace dbus | |
OLD | NEW |