Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1733)

Unified Diff: dbus/bus.cc

Issue 7491029: Implement Bus and ObjectProxy classes for our D-Bus library. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: another clang challenge Created 9 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « dbus/bus.h ('k') | dbus/dbus.gyp » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: dbus/bus.cc
diff --git a/dbus/bus.cc b/dbus/bus.cc
new file mode 100644
index 0000000000000000000000000000000000000000..2e8fb4789f81cb3b9726520151a99afb742bbd2f
--- /dev/null
+++ b/dbus/bus.cc
@@ -0,0 +1,613 @@
+// Copyright (c) 2011 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+//
+// TODO(satorux):
+// - Handle "disconnected" signal.
+// - Add support for signal sending
+// - Add support for signal monitoring
+// - Collect metrics (ex. # of method calls, method call time, etc.)
+
+#include "dbus/bus.h"
+
+#include "base/bind.h"
+#include "base/logging.h"
+#include "base/message_loop.h"
+#include "base/stl_util.h"
+#include "base/threading/thread.h"
+#include "base/threading/thread_restrictions.h"
+#include "dbus/exported_object.h"
+#include "dbus/object_proxy.h"
+#include "dbus/scoped_dbus_error.h"
+
+namespace dbus {
+
+namespace {
+
+// The class is used for watching the file descriptor used for D-Bus
+// communication.
+class Watch : public base::MessagePumpLibevent::Watcher {
+ public:
+ Watch(DBusWatch* watch)
+ : raw_watch_(watch) {
+ dbus_watch_set_data(raw_watch_, this, NULL);
+ }
+
+ ~Watch() {
+ dbus_watch_set_data(raw_watch_, NULL, NULL);
+ }
+
+ // Returns true if the underlying file descriptor is ready to be watched.
+ bool IsReadyToBeWatched() {
+ return dbus_watch_get_enabled(raw_watch_);
+ }
+
+ // Starts watching the underlying file descriptor.
+ void StartWatching() {
+ const int file_descriptor = dbus_watch_get_unix_fd(raw_watch_);
+ const int flags = dbus_watch_get_flags(raw_watch_);
+
+ MessageLoopForIO::Mode mode = MessageLoopForIO::WATCH_READ;
+ if ((flags & DBUS_WATCH_READABLE) && (flags & DBUS_WATCH_WRITABLE))
+ mode = MessageLoopForIO::WATCH_READ_WRITE;
+ else if (flags & DBUS_WATCH_READABLE)
+ mode = MessageLoopForIO::WATCH_READ;
+ else if (flags & DBUS_WATCH_WRITABLE)
+ mode = MessageLoopForIO::WATCH_WRITE;
+ else
+ NOTREACHED();
+
+ const bool persistent = true; // Watch persistently.
+ const bool success = MessageLoopForIO::current()->WatchFileDescriptor(
+ file_descriptor,
+ persistent,
+ mode,
+ &file_descriptor_watcher_,
+ this);
+ CHECK(success) << "Unable to allocate memory";
+ }
+
+ // Stops watching the underlying file descriptor.
+ void StopWatching() {
+ file_descriptor_watcher_.StopWatchingFileDescriptor();
+ }
+
+ private:
+ // Implement MessagePumpLibevent::Watcher.
+ virtual void OnFileCanReadWithoutBlocking(int file_descriptor) {
+ const bool success = dbus_watch_handle(raw_watch_, DBUS_WATCH_READABLE);
+ CHECK(success) << "Unable to allocate memory";
+ }
+
+ // Implement MessagePumpLibevent::Watcher.
+ virtual void OnFileCanWriteWithoutBlocking(int file_descriptor) {
+ const bool success = dbus_watch_handle(raw_watch_, DBUS_WATCH_WRITABLE);
+ CHECK(success) << "Unable to allocate memory";
+ }
+
+ DBusWatch* raw_watch_;
+ base::MessagePumpLibevent::FileDescriptorWatcher file_descriptor_watcher_;
+};
+
+// The class is used for monitoring the timeout used for D-Bus method
+// calls.
+//
+// Unlike Watch, Timeout is a ref counted object, to ensure that |this| of
+// the object is is alive when HandleTimeout() is called. It's unlikely
+// but it may be possible that HandleTimeout() is called after
+// Bus::OnRemoveTimeout(). That's why we don't simply delete the object in
+// Bus::OnRemoveTimeout().
+class Timeout : public base::RefCountedThreadSafe<Timeout> {
+ public:
+ Timeout(DBusTimeout* timeout)
+ : raw_timeout_(timeout),
+ monitoring_is_active_(false),
+ is_completed(false) {
+ dbus_timeout_set_data(raw_timeout_, this, NULL);
+ AddRef(); // Balanced on Complete().
+ }
+
+ // Returns true if the timeout is ready to be monitored.
+ bool IsReadyToBeMonitored() {
+ return dbus_timeout_get_enabled(raw_timeout_);
+ }
+
+ // Starts monitoring the timeout.
+ void StartMonitoring(dbus::Bus* bus) {
+ bus->PostDelayedTaskToDBusThread(FROM_HERE,
+ base::Bind(&Timeout::HandleTimeout,
+ this),
+ GetIntervalInMs());
+ monitoring_is_active_ = true;
+ }
+
+ // Stops monitoring the timeout.
+ void StopMonitoring() {
+ // We cannot take back the delayed task we posted in
+ // StartMonitoring(), so we just mark the monitoring is inactive now.
+ monitoring_is_active_ = false;
+ }
+
+ // Returns the interval in milliseconds.
+ int GetIntervalInMs() {
+ return dbus_timeout_get_interval(raw_timeout_);
+ }
+
+ // Cleans up the raw_timeout and marks that timeout is completed.
+ // See the class comment above for why we are doing this.
+ void Complete() {
+ dbus_timeout_set_data(raw_timeout_, NULL, NULL);
+ is_completed = true;
+ Release();
+ }
+
+ private:
+ friend class base::RefCountedThreadSafe<Timeout>;
+ ~Timeout() {
+ }
+
+ // Handles the timeout.
+ void HandleTimeout() {
+ // If the timeout is marked completed, we should do nothing. This can
+ // occur if this function is called after Bus::OnRemoveTimeout().
+ if (is_completed)
+ return;
+ // Skip if monitoring is cancled.
+ if (!monitoring_is_active_)
+ return;
+
+ const bool success = dbus_timeout_handle(raw_timeout_);
+ CHECK(success) << "Unable to allocate memory";
+ }
+
+ DBusTimeout* raw_timeout_;
+ bool monitoring_is_active_;
+ bool is_completed;
+};
+
+} // namespace
+
+Bus::Options::Options()
+ : bus_type(SESSION),
+ connection_type(PRIVATE),
+ dbus_thread(NULL) {
+}
+
+Bus::Options::~Options() {
+}
+
+Bus::Bus(const Options& options)
+ : bus_type_(options.bus_type),
+ connection_type_(options.connection_type),
+ dbus_thread_(options.dbus_thread),
+ connection_(NULL),
+ origin_loop_(MessageLoop::current()),
+ origin_thread_id_(base::PlatformThread::CurrentId()),
+ dbus_thread_id_(base::kInvalidThreadId),
+ async_operations_are_set_up_(false),
+ num_pending_watches_(0),
+ num_pending_timeouts_(0) {
+ if (dbus_thread_) {
+ dbus_thread_id_ = dbus_thread_->thread_id();
+ DCHECK(dbus_thread_->IsRunning())
+ << "The D-Bus thread should be running";
+ DCHECK_EQ(MessageLoop::TYPE_IO,
+ dbus_thread_->message_loop()->type())
+ << "The D-Bus thread should have an MessageLoopForIO attached";
+ }
+
+ // This is safe to call multiple times.
+ dbus_threads_init_default();
+}
+
+Bus::~Bus() {
+ DCHECK(!connection_);
+ DCHECK(owned_service_names_.empty());
+ DCHECK_EQ(0, num_pending_watches_);
+ DCHECK_EQ(0, num_pending_timeouts_);
+}
+
+ObjectProxy* Bus::GetObjectProxy(const std::string& service_name,
+ const std::string& object_path) {
+ AssertOnOriginThread();
+
+ scoped_refptr<ObjectProxy> object_proxy =
+ new ObjectProxy(this, service_name, object_path);
+ object_proxies_.push_back(object_proxy);
+
+ return object_proxy;
+}
+
+ExportedObject* Bus::GetExportedObject(const std::string& service_name,
+ const std::string& object_path) {
+ AssertOnOriginThread();
+
+ scoped_refptr<ExportedObject> exported_object =
+ new ExportedObject(this, service_name, object_path);
+ exported_objects_.push_back(exported_object);
+
+ return exported_object;
+}
+
+bool Bus::Connect() {
+ // dbus_bus_get_private() and dbus_bus_get() are blocking calls.
+ AssertOnDBusThread();
+
+ // Check if it's already initialized.
+ if (connection_)
+ return true;
+
+ ScopedDBusError error;
+ const DBusBusType dbus_bus_type = static_cast<DBusBusType>(bus_type_);
+ if (connection_type_ == PRIVATE) {
+ connection_ = dbus_bus_get_private(dbus_bus_type, error.get());
+ } else {
+ connection_ = dbus_bus_get(dbus_bus_type, error.get());
+ }
+ if (!connection_) {
+ LOG(ERROR) << "Failed to connect to the bus: "
+ << (dbus_error_is_set(error.get()) ? error.message() : "");
+ return false;
+ }
+ // We shouldn't exit on the disconnected signal.
+ dbus_connection_set_exit_on_disconnect(connection_, false);
+
+ return true;
+}
+
+void Bus::ShutdownAndBlock() {
+ AssertOnDBusThread();
+
+ // Unregister the exported objects.
+ for (size_t i = 0; i < exported_objects_.size(); ++i) {
+ exported_objects_[i]->Unregister();
+ }
+
+ // Release all service names.
+ for (std::set<std::string>::iterator iter = owned_service_names_.begin();
+ iter != owned_service_names_.end();) {
+ // This is a bit tricky but we should increment the iter here as
+ // ReleaseOwnership() may remove |service_name| from the set.
+ const std::string& service_name = *iter++;
+ ReleaseOwnership(service_name);
+ }
+ if (!owned_service_names_.empty()) {
+ LOG(ERROR) << "Failed to release all service names. # of services left: "
+ << owned_service_names_.size();
+ }
+
+ // Private connection should be closed.
+ if (connection_ && connection_type_ == PRIVATE) {
+ dbus_connection_close(connection_);
+ }
+ // dbus_connection_close() won't unref.
+ dbus_connection_unref(connection_);
+
+ connection_ = NULL;
+}
+
+void Bus::Shutdown(OnShutdownCallback callback) {
+ AssertOnOriginThread();
+
+ PostTaskToDBusThread(FROM_HERE, base::Bind(&Bus::ShutdownInternal,
+ this,
+ callback));
+}
+
+bool Bus::RequestOwnership(const std::string& service_name) {
+ DCHECK(connection_);
+ // dbus_bus_request_name() is a blocking call.
+ AssertOnDBusThread();
+
+ // Check if we already own the service name.
+ if (owned_service_names_.find(service_name) != owned_service_names_.end()) {
+ return true;
+ }
+
+ ScopedDBusError error;
+ const int result = dbus_bus_request_name(connection_,
+ service_name.c_str(),
+ DBUS_NAME_FLAG_DO_NOT_QUEUE,
+ error.get());
+ if (result != DBUS_REQUEST_NAME_REPLY_PRIMARY_OWNER) {
+ LOG(ERROR) << "Failed to get the onwership of " << service_name << ": "
+ << (dbus_error_is_set(error.get()) ? error.message() : "");
+ return false;
+ }
+ owned_service_names_.insert(service_name);
+ return true;
+}
+
+bool Bus::ReleaseOwnership(const std::string& service_name) {
+ DCHECK(connection_);
+ // dbus_bus_request_name() is a blocking call.
+ AssertOnDBusThread();
+
+ // Check if we already own the service name.
+ std::set<std::string>::iterator found =
+ owned_service_names_.find(service_name);
+ if (found == owned_service_names_.end()) {
+ LOG(ERROR) << service_name << " is not owned by the bus";
+ return false;
+ }
+
+ ScopedDBusError error;
+ const int result = dbus_bus_release_name(connection_, service_name.c_str(),
+ error.get());
+ if (result == DBUS_RELEASE_NAME_REPLY_RELEASED) {
+ owned_service_names_.erase(found);
+ return true;
+ } else {
+ LOG(ERROR) << "Failed to release the onwership of " << service_name << ": "
+ << (error.is_set() ? error.message() : "");
+ return false;
+ }
+}
+
+bool Bus::SetUpAsyncOperations() {
+ DCHECK(connection_);
+ AssertOnDBusThread();
+
+ if (async_operations_are_set_up_)
+ return true;
+
+ // Process all the incoming data if any, so that OnDispatchStatus() will
+ // be called when the incoming data is ready.
+ ProcessAllIncomingDataIfAny();
+
+ bool success = dbus_connection_set_watch_functions(connection_,
+ &Bus::OnAddWatchThunk,
+ &Bus::OnRemoveWatchThunk,
+ &Bus::OnToggleWatchThunk,
+ this,
+ NULL);
+ CHECK(success) << "Unable to allocate memory";
+
+ // TODO(satorux): Timeout is not yet implemented.
+ success = dbus_connection_set_timeout_functions(connection_,
+ &Bus::OnAddTimeoutThunk,
+ &Bus::OnRemoveTimeoutThunk,
+ &Bus::OnToggleTimeoutThunk,
+ this,
+ NULL);
+ CHECK(success) << "Unable to allocate memory";
+
+ dbus_connection_set_dispatch_status_function(
+ connection_,
+ &Bus::OnDispatchStatusChangedThunk,
+ this,
+ NULL);
+
+ async_operations_are_set_up_ = true;
+
+ return true;
+}
+
+DBusMessage* Bus::SendWithReplyAndBlock(DBusMessage* request,
+ int timeout_ms,
+ DBusError* error) {
+ DCHECK(connection_);
+ AssertOnDBusThread();
+
+ return dbus_connection_send_with_reply_and_block(
+ connection_, request, timeout_ms, error);
+}
+
+void Bus::SendWithReply(DBusMessage* request,
+ DBusPendingCall** pending_call,
+ int timeout_ms) {
+ DCHECK(connection_);
+ AssertOnDBusThread();
+
+ const bool success = dbus_connection_send_with_reply(
+ connection_, request, pending_call, timeout_ms);
+ CHECK(success) << "Unable to allocate memory";
+}
+
+bool Bus::TryRegisterObjectPath(const std::string& object_path,
+ const DBusObjectPathVTable* vtable,
+ void* user_data,
+ DBusError* error) {
+ DCHECK(connection_);
+ AssertOnDBusThread();
+
+ return dbus_connection_try_register_object_path(
+ connection_,
+ object_path.c_str(),
+ vtable,
+ user_data,
+ error);
+}
+
+void Bus::UnregisterObjectPath(const std::string& object_path) {
+ DCHECK(connection_);
+ AssertOnDBusThread();
+
+ const bool success = dbus_connection_unregister_object_path(
+ connection_,
+ object_path.c_str());
+ CHECK(success) << "Unable to allocate memory";
+}
+
+void Bus::ShutdownInternal(OnShutdownCallback callback) {
+ AssertOnDBusThread();
+
+ ShutdownAndBlock();
+ PostTaskToOriginThread(FROM_HERE, callback);
+}
+
+void Bus::ProcessAllIncomingDataIfAny() {
+ AssertOnDBusThread();
+
+ // As mentioned at the class comment in .h file, connection_ can be NULL.
+ if (!connection_ || !dbus_connection_get_is_connected(connection_))
+ return;
+
+ if (dbus_connection_get_dispatch_status(connection_) ==
+ DBUS_DISPATCH_DATA_REMAINS) {
+ while (dbus_connection_dispatch(connection_) ==
+ DBUS_DISPATCH_DATA_REMAINS);
+ }
+}
+
+void Bus::PostTaskToOriginThread(const tracked_objects::Location& from_here,
+ const base::Closure& task) {
+ origin_loop_->PostTask(from_here, task);
+}
+
+void Bus::PostTaskToDBusThread(const tracked_objects::Location& from_here,
+ const base::Closure& task) {
+ if (dbus_thread_)
+ dbus_thread_->message_loop()->PostTask(from_here, task);
+ else
+ origin_loop_->PostTask(from_here, task);
+}
+
+void Bus::PostDelayedTaskToDBusThread(
+ const tracked_objects::Location& from_here,
+ const base::Closure& task,
+ int delay_ms) {
+ if (dbus_thread_)
+ dbus_thread_->message_loop()->PostDelayedTask(from_here, task, delay_ms);
+ else
+ origin_loop_->PostDelayedTask(from_here, task, delay_ms);
+}
+
+bool Bus::HasDBusThread() {
+ return dbus_thread_ != NULL;
+}
+
+void Bus::AssertOnOriginThread() {
+ DCHECK_EQ(origin_thread_id_, base::PlatformThread::CurrentId());
+}
+
+void Bus::AssertOnDBusThread() {
+ base::ThreadRestrictions::AssertIOAllowed();
+
+ if (dbus_thread_) {
+ DCHECK_EQ(dbus_thread_id_, base::PlatformThread::CurrentId());
+ } else {
+ AssertOnOriginThread();
+ }
+}
+
+dbus_bool_t Bus::OnAddWatch(DBusWatch* raw_watch) {
+ AssertOnDBusThread();
+
+ // watch will be deleted when raw_watch is removed in OnRemoveWatch().
+ Watch* watch = new Watch(raw_watch);
+ if (watch->IsReadyToBeWatched()) {
+ watch->StartWatching();
+ }
+ ++num_pending_watches_;
+ return true;
+}
+
+void Bus::OnRemoveWatch(DBusWatch* raw_watch) {
+ AssertOnDBusThread();
+
+ Watch* watch = static_cast<Watch*>(dbus_watch_get_data(raw_watch));
+ delete watch;
+ --num_pending_watches_;
+}
+
+void Bus::OnToggleWatch(DBusWatch* raw_watch) {
+ AssertOnDBusThread();
+
+ Watch* watch = static_cast<Watch*>(dbus_watch_get_data(raw_watch));
+ if (watch->IsReadyToBeWatched()) {
+ watch->StartWatching();
+ } else {
+ // It's safe to call this if StartWatching() wasn't called, per
+ // message_pump_libevent.h.
+ watch->StopWatching();
+ }
+}
+
+dbus_bool_t Bus::OnAddTimeout(DBusTimeout* raw_timeout) {
+ AssertOnDBusThread();
+
+ // timeout will be deleted when raw_timeout is removed in
+ // OnRemoveTimeoutThunk().
+ Timeout* timeout = new Timeout(raw_timeout);
+ if (timeout->IsReadyToBeMonitored()) {
+ timeout->StartMonitoring(this);
+ }
+ ++num_pending_timeouts_;
+ return true;
+}
+
+void Bus::OnRemoveTimeout(DBusTimeout* raw_timeout) {
+ AssertOnDBusThread();
+
+ Timeout* timeout = static_cast<Timeout*>(dbus_timeout_get_data(raw_timeout));
+ timeout->Complete();
+ --num_pending_timeouts_;
+}
+
+void Bus::OnToggleTimeout(DBusTimeout* raw_timeout) {
+ AssertOnDBusThread();
+
+ Timeout* timeout = static_cast<Timeout*>(dbus_timeout_get_data(raw_timeout));
+ if (timeout->IsReadyToBeMonitored()) {
+ timeout->StartMonitoring(this);
+ } else {
+ timeout->StopMonitoring();
+ }
+}
+
+void Bus::OnDispatchStatusChanged(DBusConnection* connection,
+ DBusDispatchStatus status) {
+ DCHECK_EQ(connection, connection_);
+ AssertOnDBusThread();
+
+ if (!dbus_connection_get_is_connected(connection))
+ return;
+
+ // We cannot call ProcessAllIncomingDataIfAny() here, as calling
+ // dbus_connection_dispatch() inside DBusDispatchStatusFunction is
+ // prohibited by the D-Bus library. Hence, we post a task here instead.
+ // See comments for dbus_connection_set_dispatch_status_function().
+ PostTaskToDBusThread(FROM_HERE,
+ base::Bind(&Bus::ProcessAllIncomingDataIfAny,
+ this));
+}
+
+dbus_bool_t Bus::OnAddWatchThunk(DBusWatch* raw_watch, void* data) {
+ Bus* self = static_cast<Bus*>(data);
+ return self->OnAddWatch(raw_watch);
+}
+
+void Bus::OnRemoveWatchThunk(DBusWatch* raw_watch, void* data) {
+ Bus* self = static_cast<Bus*>(data);
+ return self->OnRemoveWatch(raw_watch);
+}
+
+void Bus::OnToggleWatchThunk(DBusWatch* raw_watch, void* data) {
+ Bus* self = static_cast<Bus*>(data);
+ return self->OnToggleWatch(raw_watch);
+}
+
+dbus_bool_t Bus::OnAddTimeoutThunk(DBusTimeout* raw_timeout, void* data) {
+ Bus* self = static_cast<Bus*>(data);
+ return self->OnAddTimeout(raw_timeout);
+}
+
+void Bus::OnRemoveTimeoutThunk(DBusTimeout* raw_timeout, void* data) {
+ Bus* self = static_cast<Bus*>(data);
+ return self->OnRemoveTimeout(raw_timeout);
+}
+
+void Bus::OnToggleTimeoutThunk(DBusTimeout* raw_timeout, void* data) {
+ Bus* self = static_cast<Bus*>(data);
+ return self->OnToggleTimeout(raw_timeout);
+}
+
+void Bus::OnDispatchStatusChangedThunk(DBusConnection* connection,
+ DBusDispatchStatus status,
+ void* data) {
+ Bus* self = static_cast<Bus*>(data);
+ return self->OnDispatchStatusChanged(connection, status);
+}
+
+} // namespace dbus
« no previous file with comments | « dbus/bus.h ('k') | dbus/dbus.gyp » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698