Index: dbus/bus.cc |
diff --git a/dbus/bus.cc b/dbus/bus.cc |
new file mode 100644 |
index 0000000000000000000000000000000000000000..2e8fb4789f81cb3b9726520151a99afb742bbd2f |
--- /dev/null |
+++ b/dbus/bus.cc |
@@ -0,0 +1,613 @@ |
+// Copyright (c) 2011 The Chromium Authors. All rights reserved. |
+// Use of this source code is governed by a BSD-style license that can be |
+// found in the LICENSE file. |
+// |
+// TODO(satorux): |
+// - Handle "disconnected" signal. |
+// - Add support for signal sending |
+// - Add support for signal monitoring |
+// - Collect metrics (ex. # of method calls, method call time, etc.) |
+ |
+#include "dbus/bus.h" |
+ |
+#include "base/bind.h" |
+#include "base/logging.h" |
+#include "base/message_loop.h" |
+#include "base/stl_util.h" |
+#include "base/threading/thread.h" |
+#include "base/threading/thread_restrictions.h" |
+#include "dbus/exported_object.h" |
+#include "dbus/object_proxy.h" |
+#include "dbus/scoped_dbus_error.h" |
+ |
+namespace dbus { |
+ |
+namespace { |
+ |
+// The class is used for watching the file descriptor used for D-Bus |
+// communication. |
+class Watch : public base::MessagePumpLibevent::Watcher { |
+ public: |
+ Watch(DBusWatch* watch) |
+ : raw_watch_(watch) { |
+ dbus_watch_set_data(raw_watch_, this, NULL); |
+ } |
+ |
+ ~Watch() { |
+ dbus_watch_set_data(raw_watch_, NULL, NULL); |
+ } |
+ |
+ // Returns true if the underlying file descriptor is ready to be watched. |
+ bool IsReadyToBeWatched() { |
+ return dbus_watch_get_enabled(raw_watch_); |
+ } |
+ |
+ // Starts watching the underlying file descriptor. |
+ void StartWatching() { |
+ const int file_descriptor = dbus_watch_get_unix_fd(raw_watch_); |
+ const int flags = dbus_watch_get_flags(raw_watch_); |
+ |
+ MessageLoopForIO::Mode mode = MessageLoopForIO::WATCH_READ; |
+ if ((flags & DBUS_WATCH_READABLE) && (flags & DBUS_WATCH_WRITABLE)) |
+ mode = MessageLoopForIO::WATCH_READ_WRITE; |
+ else if (flags & DBUS_WATCH_READABLE) |
+ mode = MessageLoopForIO::WATCH_READ; |
+ else if (flags & DBUS_WATCH_WRITABLE) |
+ mode = MessageLoopForIO::WATCH_WRITE; |
+ else |
+ NOTREACHED(); |
+ |
+ const bool persistent = true; // Watch persistently. |
+ const bool success = MessageLoopForIO::current()->WatchFileDescriptor( |
+ file_descriptor, |
+ persistent, |
+ mode, |
+ &file_descriptor_watcher_, |
+ this); |
+ CHECK(success) << "Unable to allocate memory"; |
+ } |
+ |
+ // Stops watching the underlying file descriptor. |
+ void StopWatching() { |
+ file_descriptor_watcher_.StopWatchingFileDescriptor(); |
+ } |
+ |
+ private: |
+ // Implement MessagePumpLibevent::Watcher. |
+ virtual void OnFileCanReadWithoutBlocking(int file_descriptor) { |
+ const bool success = dbus_watch_handle(raw_watch_, DBUS_WATCH_READABLE); |
+ CHECK(success) << "Unable to allocate memory"; |
+ } |
+ |
+ // Implement MessagePumpLibevent::Watcher. |
+ virtual void OnFileCanWriteWithoutBlocking(int file_descriptor) { |
+ const bool success = dbus_watch_handle(raw_watch_, DBUS_WATCH_WRITABLE); |
+ CHECK(success) << "Unable to allocate memory"; |
+ } |
+ |
+ DBusWatch* raw_watch_; |
+ base::MessagePumpLibevent::FileDescriptorWatcher file_descriptor_watcher_; |
+}; |
+ |
+// The class is used for monitoring the timeout used for D-Bus method |
+// calls. |
+// |
+// Unlike Watch, Timeout is a ref counted object, to ensure that |this| of |
+// the object is is alive when HandleTimeout() is called. It's unlikely |
+// but it may be possible that HandleTimeout() is called after |
+// Bus::OnRemoveTimeout(). That's why we don't simply delete the object in |
+// Bus::OnRemoveTimeout(). |
+class Timeout : public base::RefCountedThreadSafe<Timeout> { |
+ public: |
+ Timeout(DBusTimeout* timeout) |
+ : raw_timeout_(timeout), |
+ monitoring_is_active_(false), |
+ is_completed(false) { |
+ dbus_timeout_set_data(raw_timeout_, this, NULL); |
+ AddRef(); // Balanced on Complete(). |
+ } |
+ |
+ // Returns true if the timeout is ready to be monitored. |
+ bool IsReadyToBeMonitored() { |
+ return dbus_timeout_get_enabled(raw_timeout_); |
+ } |
+ |
+ // Starts monitoring the timeout. |
+ void StartMonitoring(dbus::Bus* bus) { |
+ bus->PostDelayedTaskToDBusThread(FROM_HERE, |
+ base::Bind(&Timeout::HandleTimeout, |
+ this), |
+ GetIntervalInMs()); |
+ monitoring_is_active_ = true; |
+ } |
+ |
+ // Stops monitoring the timeout. |
+ void StopMonitoring() { |
+ // We cannot take back the delayed task we posted in |
+ // StartMonitoring(), so we just mark the monitoring is inactive now. |
+ monitoring_is_active_ = false; |
+ } |
+ |
+ // Returns the interval in milliseconds. |
+ int GetIntervalInMs() { |
+ return dbus_timeout_get_interval(raw_timeout_); |
+ } |
+ |
+ // Cleans up the raw_timeout and marks that timeout is completed. |
+ // See the class comment above for why we are doing this. |
+ void Complete() { |
+ dbus_timeout_set_data(raw_timeout_, NULL, NULL); |
+ is_completed = true; |
+ Release(); |
+ } |
+ |
+ private: |
+ friend class base::RefCountedThreadSafe<Timeout>; |
+ ~Timeout() { |
+ } |
+ |
+ // Handles the timeout. |
+ void HandleTimeout() { |
+ // If the timeout is marked completed, we should do nothing. This can |
+ // occur if this function is called after Bus::OnRemoveTimeout(). |
+ if (is_completed) |
+ return; |
+ // Skip if monitoring is cancled. |
+ if (!monitoring_is_active_) |
+ return; |
+ |
+ const bool success = dbus_timeout_handle(raw_timeout_); |
+ CHECK(success) << "Unable to allocate memory"; |
+ } |
+ |
+ DBusTimeout* raw_timeout_; |
+ bool monitoring_is_active_; |
+ bool is_completed; |
+}; |
+ |
+} // namespace |
+ |
+Bus::Options::Options() |
+ : bus_type(SESSION), |
+ connection_type(PRIVATE), |
+ dbus_thread(NULL) { |
+} |
+ |
+Bus::Options::~Options() { |
+} |
+ |
+Bus::Bus(const Options& options) |
+ : bus_type_(options.bus_type), |
+ connection_type_(options.connection_type), |
+ dbus_thread_(options.dbus_thread), |
+ connection_(NULL), |
+ origin_loop_(MessageLoop::current()), |
+ origin_thread_id_(base::PlatformThread::CurrentId()), |
+ dbus_thread_id_(base::kInvalidThreadId), |
+ async_operations_are_set_up_(false), |
+ num_pending_watches_(0), |
+ num_pending_timeouts_(0) { |
+ if (dbus_thread_) { |
+ dbus_thread_id_ = dbus_thread_->thread_id(); |
+ DCHECK(dbus_thread_->IsRunning()) |
+ << "The D-Bus thread should be running"; |
+ DCHECK_EQ(MessageLoop::TYPE_IO, |
+ dbus_thread_->message_loop()->type()) |
+ << "The D-Bus thread should have an MessageLoopForIO attached"; |
+ } |
+ |
+ // This is safe to call multiple times. |
+ dbus_threads_init_default(); |
+} |
+ |
+Bus::~Bus() { |
+ DCHECK(!connection_); |
+ DCHECK(owned_service_names_.empty()); |
+ DCHECK_EQ(0, num_pending_watches_); |
+ DCHECK_EQ(0, num_pending_timeouts_); |
+} |
+ |
+ObjectProxy* Bus::GetObjectProxy(const std::string& service_name, |
+ const std::string& object_path) { |
+ AssertOnOriginThread(); |
+ |
+ scoped_refptr<ObjectProxy> object_proxy = |
+ new ObjectProxy(this, service_name, object_path); |
+ object_proxies_.push_back(object_proxy); |
+ |
+ return object_proxy; |
+} |
+ |
+ExportedObject* Bus::GetExportedObject(const std::string& service_name, |
+ const std::string& object_path) { |
+ AssertOnOriginThread(); |
+ |
+ scoped_refptr<ExportedObject> exported_object = |
+ new ExportedObject(this, service_name, object_path); |
+ exported_objects_.push_back(exported_object); |
+ |
+ return exported_object; |
+} |
+ |
+bool Bus::Connect() { |
+ // dbus_bus_get_private() and dbus_bus_get() are blocking calls. |
+ AssertOnDBusThread(); |
+ |
+ // Check if it's already initialized. |
+ if (connection_) |
+ return true; |
+ |
+ ScopedDBusError error; |
+ const DBusBusType dbus_bus_type = static_cast<DBusBusType>(bus_type_); |
+ if (connection_type_ == PRIVATE) { |
+ connection_ = dbus_bus_get_private(dbus_bus_type, error.get()); |
+ } else { |
+ connection_ = dbus_bus_get(dbus_bus_type, error.get()); |
+ } |
+ if (!connection_) { |
+ LOG(ERROR) << "Failed to connect to the bus: " |
+ << (dbus_error_is_set(error.get()) ? error.message() : ""); |
+ return false; |
+ } |
+ // We shouldn't exit on the disconnected signal. |
+ dbus_connection_set_exit_on_disconnect(connection_, false); |
+ |
+ return true; |
+} |
+ |
+void Bus::ShutdownAndBlock() { |
+ AssertOnDBusThread(); |
+ |
+ // Unregister the exported objects. |
+ for (size_t i = 0; i < exported_objects_.size(); ++i) { |
+ exported_objects_[i]->Unregister(); |
+ } |
+ |
+ // Release all service names. |
+ for (std::set<std::string>::iterator iter = owned_service_names_.begin(); |
+ iter != owned_service_names_.end();) { |
+ // This is a bit tricky but we should increment the iter here as |
+ // ReleaseOwnership() may remove |service_name| from the set. |
+ const std::string& service_name = *iter++; |
+ ReleaseOwnership(service_name); |
+ } |
+ if (!owned_service_names_.empty()) { |
+ LOG(ERROR) << "Failed to release all service names. # of services left: " |
+ << owned_service_names_.size(); |
+ } |
+ |
+ // Private connection should be closed. |
+ if (connection_ && connection_type_ == PRIVATE) { |
+ dbus_connection_close(connection_); |
+ } |
+ // dbus_connection_close() won't unref. |
+ dbus_connection_unref(connection_); |
+ |
+ connection_ = NULL; |
+} |
+ |
+void Bus::Shutdown(OnShutdownCallback callback) { |
+ AssertOnOriginThread(); |
+ |
+ PostTaskToDBusThread(FROM_HERE, base::Bind(&Bus::ShutdownInternal, |
+ this, |
+ callback)); |
+} |
+ |
+bool Bus::RequestOwnership(const std::string& service_name) { |
+ DCHECK(connection_); |
+ // dbus_bus_request_name() is a blocking call. |
+ AssertOnDBusThread(); |
+ |
+ // Check if we already own the service name. |
+ if (owned_service_names_.find(service_name) != owned_service_names_.end()) { |
+ return true; |
+ } |
+ |
+ ScopedDBusError error; |
+ const int result = dbus_bus_request_name(connection_, |
+ service_name.c_str(), |
+ DBUS_NAME_FLAG_DO_NOT_QUEUE, |
+ error.get()); |
+ if (result != DBUS_REQUEST_NAME_REPLY_PRIMARY_OWNER) { |
+ LOG(ERROR) << "Failed to get the onwership of " << service_name << ": " |
+ << (dbus_error_is_set(error.get()) ? error.message() : ""); |
+ return false; |
+ } |
+ owned_service_names_.insert(service_name); |
+ return true; |
+} |
+ |
+bool Bus::ReleaseOwnership(const std::string& service_name) { |
+ DCHECK(connection_); |
+ // dbus_bus_request_name() is a blocking call. |
+ AssertOnDBusThread(); |
+ |
+ // Check if we already own the service name. |
+ std::set<std::string>::iterator found = |
+ owned_service_names_.find(service_name); |
+ if (found == owned_service_names_.end()) { |
+ LOG(ERROR) << service_name << " is not owned by the bus"; |
+ return false; |
+ } |
+ |
+ ScopedDBusError error; |
+ const int result = dbus_bus_release_name(connection_, service_name.c_str(), |
+ error.get()); |
+ if (result == DBUS_RELEASE_NAME_REPLY_RELEASED) { |
+ owned_service_names_.erase(found); |
+ return true; |
+ } else { |
+ LOG(ERROR) << "Failed to release the onwership of " << service_name << ": " |
+ << (error.is_set() ? error.message() : ""); |
+ return false; |
+ } |
+} |
+ |
+bool Bus::SetUpAsyncOperations() { |
+ DCHECK(connection_); |
+ AssertOnDBusThread(); |
+ |
+ if (async_operations_are_set_up_) |
+ return true; |
+ |
+ // Process all the incoming data if any, so that OnDispatchStatus() will |
+ // be called when the incoming data is ready. |
+ ProcessAllIncomingDataIfAny(); |
+ |
+ bool success = dbus_connection_set_watch_functions(connection_, |
+ &Bus::OnAddWatchThunk, |
+ &Bus::OnRemoveWatchThunk, |
+ &Bus::OnToggleWatchThunk, |
+ this, |
+ NULL); |
+ CHECK(success) << "Unable to allocate memory"; |
+ |
+ // TODO(satorux): Timeout is not yet implemented. |
+ success = dbus_connection_set_timeout_functions(connection_, |
+ &Bus::OnAddTimeoutThunk, |
+ &Bus::OnRemoveTimeoutThunk, |
+ &Bus::OnToggleTimeoutThunk, |
+ this, |
+ NULL); |
+ CHECK(success) << "Unable to allocate memory"; |
+ |
+ dbus_connection_set_dispatch_status_function( |
+ connection_, |
+ &Bus::OnDispatchStatusChangedThunk, |
+ this, |
+ NULL); |
+ |
+ async_operations_are_set_up_ = true; |
+ |
+ return true; |
+} |
+ |
+DBusMessage* Bus::SendWithReplyAndBlock(DBusMessage* request, |
+ int timeout_ms, |
+ DBusError* error) { |
+ DCHECK(connection_); |
+ AssertOnDBusThread(); |
+ |
+ return dbus_connection_send_with_reply_and_block( |
+ connection_, request, timeout_ms, error); |
+} |
+ |
+void Bus::SendWithReply(DBusMessage* request, |
+ DBusPendingCall** pending_call, |
+ int timeout_ms) { |
+ DCHECK(connection_); |
+ AssertOnDBusThread(); |
+ |
+ const bool success = dbus_connection_send_with_reply( |
+ connection_, request, pending_call, timeout_ms); |
+ CHECK(success) << "Unable to allocate memory"; |
+} |
+ |
+bool Bus::TryRegisterObjectPath(const std::string& object_path, |
+ const DBusObjectPathVTable* vtable, |
+ void* user_data, |
+ DBusError* error) { |
+ DCHECK(connection_); |
+ AssertOnDBusThread(); |
+ |
+ return dbus_connection_try_register_object_path( |
+ connection_, |
+ object_path.c_str(), |
+ vtable, |
+ user_data, |
+ error); |
+} |
+ |
+void Bus::UnregisterObjectPath(const std::string& object_path) { |
+ DCHECK(connection_); |
+ AssertOnDBusThread(); |
+ |
+ const bool success = dbus_connection_unregister_object_path( |
+ connection_, |
+ object_path.c_str()); |
+ CHECK(success) << "Unable to allocate memory"; |
+} |
+ |
+void Bus::ShutdownInternal(OnShutdownCallback callback) { |
+ AssertOnDBusThread(); |
+ |
+ ShutdownAndBlock(); |
+ PostTaskToOriginThread(FROM_HERE, callback); |
+} |
+ |
+void Bus::ProcessAllIncomingDataIfAny() { |
+ AssertOnDBusThread(); |
+ |
+ // As mentioned at the class comment in .h file, connection_ can be NULL. |
+ if (!connection_ || !dbus_connection_get_is_connected(connection_)) |
+ return; |
+ |
+ if (dbus_connection_get_dispatch_status(connection_) == |
+ DBUS_DISPATCH_DATA_REMAINS) { |
+ while (dbus_connection_dispatch(connection_) == |
+ DBUS_DISPATCH_DATA_REMAINS); |
+ } |
+} |
+ |
+void Bus::PostTaskToOriginThread(const tracked_objects::Location& from_here, |
+ const base::Closure& task) { |
+ origin_loop_->PostTask(from_here, task); |
+} |
+ |
+void Bus::PostTaskToDBusThread(const tracked_objects::Location& from_here, |
+ const base::Closure& task) { |
+ if (dbus_thread_) |
+ dbus_thread_->message_loop()->PostTask(from_here, task); |
+ else |
+ origin_loop_->PostTask(from_here, task); |
+} |
+ |
+void Bus::PostDelayedTaskToDBusThread( |
+ const tracked_objects::Location& from_here, |
+ const base::Closure& task, |
+ int delay_ms) { |
+ if (dbus_thread_) |
+ dbus_thread_->message_loop()->PostDelayedTask(from_here, task, delay_ms); |
+ else |
+ origin_loop_->PostDelayedTask(from_here, task, delay_ms); |
+} |
+ |
+bool Bus::HasDBusThread() { |
+ return dbus_thread_ != NULL; |
+} |
+ |
+void Bus::AssertOnOriginThread() { |
+ DCHECK_EQ(origin_thread_id_, base::PlatformThread::CurrentId()); |
+} |
+ |
+void Bus::AssertOnDBusThread() { |
+ base::ThreadRestrictions::AssertIOAllowed(); |
+ |
+ if (dbus_thread_) { |
+ DCHECK_EQ(dbus_thread_id_, base::PlatformThread::CurrentId()); |
+ } else { |
+ AssertOnOriginThread(); |
+ } |
+} |
+ |
+dbus_bool_t Bus::OnAddWatch(DBusWatch* raw_watch) { |
+ AssertOnDBusThread(); |
+ |
+ // watch will be deleted when raw_watch is removed in OnRemoveWatch(). |
+ Watch* watch = new Watch(raw_watch); |
+ if (watch->IsReadyToBeWatched()) { |
+ watch->StartWatching(); |
+ } |
+ ++num_pending_watches_; |
+ return true; |
+} |
+ |
+void Bus::OnRemoveWatch(DBusWatch* raw_watch) { |
+ AssertOnDBusThread(); |
+ |
+ Watch* watch = static_cast<Watch*>(dbus_watch_get_data(raw_watch)); |
+ delete watch; |
+ --num_pending_watches_; |
+} |
+ |
+void Bus::OnToggleWatch(DBusWatch* raw_watch) { |
+ AssertOnDBusThread(); |
+ |
+ Watch* watch = static_cast<Watch*>(dbus_watch_get_data(raw_watch)); |
+ if (watch->IsReadyToBeWatched()) { |
+ watch->StartWatching(); |
+ } else { |
+ // It's safe to call this if StartWatching() wasn't called, per |
+ // message_pump_libevent.h. |
+ watch->StopWatching(); |
+ } |
+} |
+ |
+dbus_bool_t Bus::OnAddTimeout(DBusTimeout* raw_timeout) { |
+ AssertOnDBusThread(); |
+ |
+ // timeout will be deleted when raw_timeout is removed in |
+ // OnRemoveTimeoutThunk(). |
+ Timeout* timeout = new Timeout(raw_timeout); |
+ if (timeout->IsReadyToBeMonitored()) { |
+ timeout->StartMonitoring(this); |
+ } |
+ ++num_pending_timeouts_; |
+ return true; |
+} |
+ |
+void Bus::OnRemoveTimeout(DBusTimeout* raw_timeout) { |
+ AssertOnDBusThread(); |
+ |
+ Timeout* timeout = static_cast<Timeout*>(dbus_timeout_get_data(raw_timeout)); |
+ timeout->Complete(); |
+ --num_pending_timeouts_; |
+} |
+ |
+void Bus::OnToggleTimeout(DBusTimeout* raw_timeout) { |
+ AssertOnDBusThread(); |
+ |
+ Timeout* timeout = static_cast<Timeout*>(dbus_timeout_get_data(raw_timeout)); |
+ if (timeout->IsReadyToBeMonitored()) { |
+ timeout->StartMonitoring(this); |
+ } else { |
+ timeout->StopMonitoring(); |
+ } |
+} |
+ |
+void Bus::OnDispatchStatusChanged(DBusConnection* connection, |
+ DBusDispatchStatus status) { |
+ DCHECK_EQ(connection, connection_); |
+ AssertOnDBusThread(); |
+ |
+ if (!dbus_connection_get_is_connected(connection)) |
+ return; |
+ |
+ // We cannot call ProcessAllIncomingDataIfAny() here, as calling |
+ // dbus_connection_dispatch() inside DBusDispatchStatusFunction is |
+ // prohibited by the D-Bus library. Hence, we post a task here instead. |
+ // See comments for dbus_connection_set_dispatch_status_function(). |
+ PostTaskToDBusThread(FROM_HERE, |
+ base::Bind(&Bus::ProcessAllIncomingDataIfAny, |
+ this)); |
+} |
+ |
+dbus_bool_t Bus::OnAddWatchThunk(DBusWatch* raw_watch, void* data) { |
+ Bus* self = static_cast<Bus*>(data); |
+ return self->OnAddWatch(raw_watch); |
+} |
+ |
+void Bus::OnRemoveWatchThunk(DBusWatch* raw_watch, void* data) { |
+ Bus* self = static_cast<Bus*>(data); |
+ return self->OnRemoveWatch(raw_watch); |
+} |
+ |
+void Bus::OnToggleWatchThunk(DBusWatch* raw_watch, void* data) { |
+ Bus* self = static_cast<Bus*>(data); |
+ return self->OnToggleWatch(raw_watch); |
+} |
+ |
+dbus_bool_t Bus::OnAddTimeoutThunk(DBusTimeout* raw_timeout, void* data) { |
+ Bus* self = static_cast<Bus*>(data); |
+ return self->OnAddTimeout(raw_timeout); |
+} |
+ |
+void Bus::OnRemoveTimeoutThunk(DBusTimeout* raw_timeout, void* data) { |
+ Bus* self = static_cast<Bus*>(data); |
+ return self->OnRemoveTimeout(raw_timeout); |
+} |
+ |
+void Bus::OnToggleTimeoutThunk(DBusTimeout* raw_timeout, void* data) { |
+ Bus* self = static_cast<Bus*>(data); |
+ return self->OnToggleTimeout(raw_timeout); |
+} |
+ |
+void Bus::OnDispatchStatusChangedThunk(DBusConnection* connection, |
+ DBusDispatchStatus status, |
+ void* data) { |
+ Bus* self = static_cast<Bus*>(data); |
+ return self->OnDispatchStatusChanged(connection, status); |
+} |
+ |
+} // namespace dbus |