Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(171)

Side by Side Diff: dbus/bus.cc

Issue 7491029: Implement Bus and ObjectProxy classes for our D-Bus library. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: minor change Created 9 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
(Empty)
1 // Copyright (c) 2011 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 //
5 // TODO(satorux):
6 // - Handle "disconnected" signal.
7 // - Add support for signal sending
8 // - Add support for signal monitoring
9 // - Collect metrics (ex. # of method calls, method call time, etc.)
10
11 #include "dbus/bus.h"
12
13 #include "base/bind.h"
14 #include "base/logging.h"
15 #include "base/message_loop.h"
16 #include "base/stl_util.h"
17 #include "base/threading/thread.h"
18 #include "base/threading/thread_restrictions.h"
19 #include "dbus/error.h"
20 #include "dbus/exported_object.h"
21 #include "dbus/object_proxy.h"
22
23 namespace dbus {
24
25 namespace {
26
27 // The class is used for watching the file descriptor used for D-Bus
28 // communication.
29 class Watch : public base::MessagePumpLibevent::Watcher {
30 public:
31 Watch(DBusWatch* watch)
32 : raw_watch_(watch) {
33 dbus_watch_set_data(raw_watch_, this, NULL);
34 }
35
36 ~Watch() {
37 dbus_watch_set_data(raw_watch_, NULL, NULL);
38 }
39
40 // Returns true if the underlying file descriptor is ready to be watched.
41 bool IsReadyToBeWatched() {
42 return dbus_watch_get_enabled(raw_watch_);
43 }
44
45 // Starts watching the underlying file descriptor.
46 void StartWatching() {
47 const int file_descriptor = dbus_watch_get_unix_fd(raw_watch_);
48 const int flags = dbus_watch_get_flags(raw_watch_);
49
50 MessageLoopForIO::Mode mode = MessageLoopForIO::WATCH_READ;
51 if ((flags & DBUS_WATCH_READABLE) && (flags & DBUS_WATCH_WRITABLE))
52 mode = MessageLoopForIO::WATCH_READ_WRITE;
53 else if (flags & DBUS_WATCH_READABLE)
54 mode = MessageLoopForIO::WATCH_READ;
55 else if (flags & DBUS_WATCH_WRITABLE)
56 mode = MessageLoopForIO::WATCH_WRITE;
57 else
58 NOTREACHED();
59
60 const bool persistent = true; // Watch persistently.
61 const bool success = MessageLoopForIO::current()->WatchFileDescriptor(
62 file_descriptor,
63 persistent,
64 mode,
65 &file_descriptor_watcher_,
66 this);
67 CHECK(success) << "Unable to allocate memory";
68 }
69
70 // Stops watching the underlying file descriptor.
71 void StopWatching() {
72 file_descriptor_watcher_.StopWatchingFileDescriptor();
73 }
74
75 private:
76 // Implement MessagePumpLibevent::Watcher.
77 virtual void OnFileCanReadWithoutBlocking(int file_descriptor) {
78 const bool success = dbus_watch_handle(raw_watch_, DBUS_WATCH_READABLE);
79 CHECK(success) << "Unable to allocate memory";
80 }
81
82 // Implement MessagePumpLibevent::Watcher.
83 virtual void OnFileCanWriteWithoutBlocking(int file_descriptor) {
84 const bool success = dbus_watch_handle(raw_watch_, DBUS_WATCH_WRITABLE);
85 CHECK(success) << "Unable to allocate memory";
86 }
87
88 DBusWatch* raw_watch_;
89 base::MessagePumpLibevent::FileDescriptorWatcher file_descriptor_watcher_;
90 };
91
92 // The class is used for monitoring the timeout used for D-Bus method
93 // calls.
94 //
95 // Unlike Watch, Timeout is a ref counted object, to ensure that |this| of
96 // the object is is alive when HandleTimeout() is called. It's unlikely
97 // but it may be possible that HandleTimeout() is called after
98 // Bus::OnRemoveTimeout(). Hence why we don't simply delete the object in
99 // Bus::OnRemoveTimeout().
100 class Timeout : public base::RefCountedThreadSafe<Timeout> {
101 public:
102 Timeout(DBusTimeout* timeout)
103 : raw_timeout_(timeout),
104 monitoring_is_active_(false),
105 is_destroyed_(false) {
106 dbus_timeout_set_data(raw_timeout_, this, NULL);
107 }
108
109 // Returns true if the timeout is ready to be monitored.
110 bool IsReadyToBeMonitored() {
111 return dbus_timeout_get_enabled(raw_timeout_);
112 }
113
114 // Starts monitoring the timeout.
115 void StartMonitoring(dbus::Bus* bus) {
116 bus->PostDelayedTaskToDBusThread(FROM_HERE,
117 base::Bind(&Timeout::HandleTimeout,
118 this),
119 GetIntervalInMs());
120 monitoring_is_active_ = true;
121 }
122
123 // Stops monitoring the timeout.
124 void StopMonitoring() {
125 // We cannot take back the delayed task we posted in
126 // StartMonitoring(), so we just mark the monitoring is inactive now.
127 monitoring_is_active_ = false;
128 }
129
130 // Returns the interval in milliseconds.
131 int GetIntervalInMs() {
132 return dbus_timeout_get_interval(raw_timeout_);
133 }
134
135 // Cleans up the raw_timeout and marks that this object is destroyed.
136 // See the class comment above for why we are doing this.
137 void Destroy() {
138 dbus_timeout_set_data(raw_timeout_, NULL, NULL);
139 is_destroyed_ = true;
140 }
141
142 private:
143 friend class base::RefCountedThreadSafe<Timeout>;
144 ~Timeout() {
145 }
146
147 // Handles the timeout.
148 void HandleTimeout() {
149 // If the object is marked destroyed, we should do nothing. This can
150 // occur if this function is called after Bus::OnRemoveTimeout().
151 if (is_destroyed_)
152 return;
153 // Skip if monitoring is cancled.
154 if (!monitoring_is_active_)
155 return;
156
157 const bool success = dbus_timeout_handle(raw_timeout_);
158 CHECK(success) << "Unable to allocate memory";
159 }
160
161 DBusTimeout* raw_timeout_;
162 bool monitoring_is_active_;
163 bool is_destroyed_;
164 };
165
166 } // namespace
167
168 Bus::Options::Options()
169 : bus_type(SESSION),
170 connection_type(PRIVATE),
171 dbus_thread(NULL) {
172 }
173
174 Bus::Options::~Options() {
175 }
176
177 Bus::Bus(const Options& options)
178 : bus_type_(options.bus_type),
179 connection_type_(options.connection_type),
180 dbus_thread_(options.dbus_thread),
181 connection_(NULL),
182 origin_loop_(MessageLoop::current()),
183 origin_thread_id_(base::PlatformThread::CurrentId()),
184 dbus_thread_id_(base::kInvalidThreadId),
185 async_operations_are_set_up_(false) {
186 if (dbus_thread_) {
187 dbus_thread_id_ = dbus_thread_->thread_id();
188 DCHECK(dbus_thread_->IsRunning())
189 << "The D-Bus thread should be running";
190 DCHECK_EQ(MessageLoop::TYPE_IO,
191 dbus_thread_->message_loop()->type())
192 << "The D-Bus thread should have an MessageLoopForIO attached";
193 }
194
195 // This is safe to call multiple times.
196 dbus_threads_init_default();
197 }
198
199 Bus::~Bus() {
200 DCHECK(!connection_);
201 DCHECK(owned_service_names_.empty());
202 for (size_t i = 0; i < exported_objects_.size(); ++i)
203 exported_objects_[i]->Release();
204 exported_objects_.clear();
205
206 for (size_t i = 0; i < object_proxies_.size(); ++i)
207 object_proxies_[i]->Release();
208 object_proxies_.clear();
209 }
210
211 ObjectProxy* Bus::GetObjectProxy(const std::string& service_name,
212 const std::string& object_path) {
213 AssertOnOriginThread();
214
215 ObjectProxy* object_proxy =
216 new ObjectProxy(this, service_name, object_path);
217 object_proxy->AddRef();
218 object_proxies_.push_back(object_proxy);
219
220 return object_proxy;
221 }
222
223 ExportedObject* Bus::GetExportedObject(const std::string& service_name,
224 const std::string& object_path) {
225 AssertOnOriginThread();
226
227 ExportedObject* exported_object =
228 new ExportedObject(this, service_name, object_path);
229 exported_object->AddRef();
230 exported_objects_.push_back(exported_object);
231
232 return exported_object;
233 }
234
235 bool Bus::Connect() {
236 // dbus_bus_get_private() and dbus_bus_get() are blocking calls.
237 AssertOnDBusThread();
238
239 // Check if it's already initialized.
240 if (connection_)
241 return true;
242
243 ScopedDBusError error;
244 const DBusBusType dbus_bus_type = static_cast<DBusBusType>(bus_type_);
245 if (connection_type_ == PRIVATE) {
246 connection_ = dbus_bus_get_private(dbus_bus_type, error.get());
247 } else {
248 connection_ = dbus_bus_get(dbus_bus_type, error.get());
249 }
250 if (!connection_) {
251 LOG(ERROR) << "Failed to connect to the bus: "
252 << (dbus_error_is_set(error.get()) ? error.message() : "");
253 return false;
254 }
255 // We shouldn't exit on the disconnected signal.
256 dbus_connection_set_exit_on_disconnect(connection_, false);
257
258 return true;
259 }
260
261 void Bus::ShutdownAndBlock() {
262 AssertOnDBusThread();
263
264 // Delete exported objects. Need to unregister them beforehand.
265 for (size_t i = 0; i < exported_objects_.size(); ++i) {
266 exported_objects_[i]->Unregister();
267 }
268
269 // Release all service names. We should not use an iterator here, as
270 // we'll modify the set in ReleaseOwnership().
271 while (!owned_service_names_.empty()) {
272 ReleaseOwnership(*owned_service_names_.begin());
273 }
274
275 // Private connection should be closed.
276 if (connection_ && connection_type_ == PRIVATE) {
277 dbus_connection_close(connection_);
278 }
279 // dbus_connection_close() won't unref.
280 dbus_connection_unref(connection_);
281
282 connection_ = NULL;
283 }
284
285 void Bus::Shutdown(OnShutdownCallback callback) {
286 AssertOnOriginThread();
287
288 PostTaskToDBusThread(FROM_HERE, base::Bind(&Bus::ShutdownInternal,
289 this,
290 callback));
291 }
292
293 bool Bus::RequestOwnership(const std::string& service_name) {
294 DCHECK(connection_);
295 // dbus_bus_request_name() is a blocking call.
296 AssertOnDBusThread();
297
298 // Check if we already own the service name.
299 if (owned_service_names_.find(service_name) != owned_service_names_.end()) {
300 return true;
301 }
302
303 ScopedDBusError error;
304 const int result = dbus_bus_request_name(connection_,
305 service_name.c_str(),
306 DBUS_NAME_FLAG_DO_NOT_QUEUE,
307 error.get());
308 if (result != DBUS_REQUEST_NAME_REPLY_PRIMARY_OWNER) {
309 LOG(ERROR) << "failed to get the onwership of " << service_name << ": "
310 << (dbus_error_is_set(error.get()) ? error.message() : "");
311 return false;
312 }
313 owned_service_names_.insert(service_name);
314 return true;
315 }
316
317 bool Bus::ReleaseOwnership(const std::string& service_name) {
318 DCHECK(connection_);
319 // dbus_bus_request_name() is a blocking call.
320 AssertOnDBusThread();
321
322 // Check if we already own the service name.
323 if (owned_service_names_.find(service_name) == owned_service_names_.end()) {
324 LOG(ERROR) << service_name << "is not owned by the bus";
325 return false;
326 }
327
328 ScopedDBusError error;
329 const int result = dbus_bus_release_name(connection_, service_name.c_str(),
330 error.get());
331 if (result == DBUS_RELEASE_NAME_REPLY_RELEASED) {
332 owned_service_names_.erase(service_name);
333 return true;
334 } else {
335 LOG(ERROR) << "failed to release the onwership of " << service_name << ": "
336 << (error.is_set() ? error.message() : "");
337 return false;
338 }
339 }
340
341 bool Bus::SetUpAsyncOperations() {
342 DCHECK(connection_);
343 AssertOnDBusThread();
344
345 if (async_operations_are_set_up_)
346 return true;
347
348 // Process all the incoming data if any, so that OnDispatchStatus() will
349 // be called when the incoming data is ready.
350 ProcessAllIncomingDataIfAny();
351
352 bool success = dbus_connection_set_watch_functions(connection_,
353 &Bus::OnAddWatchThunk,
354 &Bus::OnRemoveWatchThunk,
355 &Bus::OnToggleWatchThunk,
356 this,
357 NULL);
358 CHECK(success) << "Unable to allocate memory";
359
360 // TODO(satorux): Timeout is not yet implemented.
361 success = dbus_connection_set_timeout_functions(connection_,
362 &Bus::OnAddTimeoutThunk,
363 &Bus::OnRemoveTimeoutThunk,
364 &Bus::OnToggleTimeoutThunk,
365 this,
366 NULL);
367 CHECK(success) << "Unable to allocate memory";
368
369 dbus_connection_set_dispatch_status_function(
370 connection_,
371 &Bus::OnDispatchStatusChangedThunk,
372 this,
373 NULL);
374
375 async_operations_are_set_up_ = true;
376
377 return true;
378 }
379
380 DBusMessage* Bus::SendWithReplyAndBlock(DBusMessage* request,
381 int timeout_ms,
382 DBusError* error) {
383 DCHECK(connection_);
384 AssertOnDBusThread();
385
386 return dbus_connection_send_with_reply_and_block(
387 connection_, request, timeout_ms, error);
388 }
389
390 void Bus::SendWithReply(DBusMessage* request,
391 DBusPendingCall** pending_call,
392 int timeout_ms) {
393 DCHECK(connection_);
394 AssertOnDBusThread();
395
396 const bool success = dbus_connection_send_with_reply(
397 connection_, request, pending_call, timeout_ms);
398 CHECK(success) << "Unable to allocate memory";
399 }
400
401 bool Bus::TryRegisterObjectPath(const std::string& object_path,
402 const DBusObjectPathVTable* vtable,
403 void* user_data,
404 DBusError* error) {
405 DCHECK(connection_);
406 AssertOnDBusThread();
407
408 return dbus_connection_try_register_object_path(
409 connection_,
410 object_path.c_str(),
411 vtable,
412 user_data,
413 error);
414 }
415
416 bool Bus::UnregisterObjectPath(const std::string& object_path) {
417 DCHECK(connection_);
418 AssertOnDBusThread();
419
420 return dbus_connection_unregister_object_path(
421 connection_,
422 object_path.c_str());
stevenjb 2011/08/10 19:40:07 Since this will only fail on out of memory, and th
satorux1 2011/08/10 21:40:55 You are right. Done.
423 }
424
425 void Bus::ShutdownInternal(OnShutdownCallback callback) {
426 AssertOnDBusThread();
427
428 ShutdownAndBlock();
429 PostTaskToOriginThread(FROM_HERE, callback);
430 }
431
432 void Bus::ProcessAllIncomingDataIfAny() {
433 AssertOnDBusThread();
434
435 // As mentioned at the class comment in .h file, connection_ can be NULL.
436 if (!connection_ || !dbus_connection_get_is_connected(connection_))
437 return;
438
439 if (dbus_connection_get_dispatch_status(connection_) ==
440 DBUS_DISPATCH_DATA_REMAINS) {
441 while (dbus_connection_dispatch(connection_) ==
442 DBUS_DISPATCH_DATA_REMAINS);
443 }
444 }
445
446 void Bus::PostTaskToOriginThread(const tracked_objects::Location& from_here,
447 const base::Closure& task) {
448 origin_loop_->PostTask(from_here, task);
449 }
450
451 void Bus::PostTaskToDBusThread(const tracked_objects::Location& from_here,
452 const base::Closure& task) {
453 if (dbus_thread_)
454 dbus_thread_->message_loop()->PostTask(from_here, task);
455 else
456 origin_loop_->PostTask(from_here, task);
457 }
458
459 void Bus::PostDelayedTaskToDBusThread(
460 const tracked_objects::Location& from_here,
461 const base::Closure& task,
462 int delay_ms) {
463 if (dbus_thread_)
464 dbus_thread_->message_loop()->PostDelayedTask(from_here, task, delay_ms);
465 else
466 origin_loop_->PostDelayedTask(from_here, task, delay_ms);
467 }
468
469 void Bus::AssertOnOriginThread() {
470 DCHECK_EQ(origin_thread_id_, base::PlatformThread::CurrentId());
471 }
472
473 void Bus::AssertOnDBusThread() {
474 base::ThreadRestrictions::AssertIOAllowed();
475
476 if (dbus_thread_) {
477 DCHECK_EQ(dbus_thread_id_, base::PlatformThread::CurrentId());
478 } else {
479 AssertOnOriginThread();
480 }
481 }
482
483 dbus_bool_t Bus::OnAddWatch(DBusWatch* raw_watch) {
484 AssertOnDBusThread();
485
486 // watch will be deleted when raw_watch is removed in OnRemoveWatch().
487 Watch* watch = new Watch(raw_watch);
488 if (watch->IsReadyToBeWatched()) {
489 watch->StartWatching();
490 }
491 return true;
492 }
493
494 void Bus::OnRemoveWatch(DBusWatch* raw_watch) {
495 AssertOnDBusThread();
496
497 Watch* watch = static_cast<Watch*>(dbus_watch_get_data(raw_watch));
498 delete watch;
499 }
500
501 void Bus::OnToggleWatch(DBusWatch* raw_watch) {
502 AssertOnDBusThread();
503
504 Watch* watch = static_cast<Watch*>(dbus_watch_get_data(raw_watch));
505 if (watch->IsReadyToBeWatched()) {
506 watch->StartWatching();
507 } else {
508 // It's safe to call this if StartWatching() wasn't called, per
509 // message_pump_libevent.h.
510 watch->StopWatching();
511 }
512 }
513
514 dbus_bool_t Bus::OnAddTimeout(DBusTimeout* raw_timeout) {
515 AssertOnDBusThread();
516
517 // timeout will be deleted when raw_timeout is removed in
518 // OnRemoveTimeoutThunk().
519 Timeout* timeout = new Timeout(raw_timeout);
520 timeout->AddRef(); // Balanced on OnRemoveTimeout().
521 if (timeout->IsReadyToBeMonitored()) {
522 timeout->StartMonitoring(this);
523 }
524 return true;
525 }
526
527 void Bus::OnRemoveTimeout(DBusTimeout* raw_timeout) {
528 AssertOnDBusThread();
529
530 Timeout* timeout = static_cast<Timeout*>(dbus_timeout_get_data(raw_timeout));
531 timeout->Destroy();
532 timeout->Release();
533 }
534
535 void Bus::OnToggleTimeout(DBusTimeout* raw_timeout) {
536 AssertOnDBusThread();
537
538 Timeout* timeout = static_cast<Timeout*>(dbus_timeout_get_data(raw_timeout));
539 if (timeout->IsReadyToBeMonitored()) {
540 timeout->StartMonitoring(this);
541 } else {
542 timeout->StopMonitoring();
543 }
544 }
545
546 void Bus::OnDispatchStatusChanged(DBusConnection* connection,
547 DBusDispatchStatus status) {
548 DCHECK_EQ(connection, connection_);
549 AssertOnDBusThread();
550
551 if (!dbus_connection_get_is_connected(connection))
552 return;
553
554 // We cannot call ProcessAllIncomingDataIfAny() here, as calling
555 // dbus_connection_dispatch() inside DBusDispatchStatusFunction is
556 // prohibited by the D-Bus library. Hence, we post a task here instead.
557 // See comments for dbus_connection_set_dispatch_status_function().
558 PostTaskToDBusThread(FROM_HERE,
559 base::Bind(&Bus::ProcessAllIncomingDataIfAny,
560 this));
561 }
562
563 dbus_bool_t Bus::OnAddWatchThunk(DBusWatch* raw_watch, void* data) {
564 Bus* self = static_cast<Bus*>(data);
565 return self->OnAddWatch(raw_watch);
566 }
567
568 void Bus::OnRemoveWatchThunk(DBusWatch* raw_watch, void* data) {
569 Bus* self = static_cast<Bus*>(data);
570 return self->OnRemoveWatch(raw_watch);
571 }
572
573 void Bus::OnToggleWatchThunk(DBusWatch* raw_watch, void* data) {
574 Bus* self = static_cast<Bus*>(data);
575 return self->OnToggleWatch(raw_watch);
576 }
577
578 dbus_bool_t Bus::OnAddTimeoutThunk(DBusTimeout* raw_timeout, void* data) {
579 Bus* self = static_cast<Bus*>(data);
580 return self->OnAddTimeout(raw_timeout);
581 }
582
583 void Bus::OnRemoveTimeoutThunk(DBusTimeout* raw_timeout, void* data) {
584 Bus* self = static_cast<Bus*>(data);
585 return self->OnRemoveTimeout(raw_timeout);
586 }
587
588 void Bus::OnToggleTimeoutThunk(DBusTimeout* raw_timeout, void* data) {
589 Bus* self = static_cast<Bus*>(data);
590 return self->OnToggleTimeout(raw_timeout);
591 }
592
593 void Bus::OnDispatchStatusChangedThunk(DBusConnection* connection,
594 DBusDispatchStatus status,
595 void* data) {
596 Bus* self = static_cast<Bus*>(data);
597 return self->OnDispatchStatusChanged(connection, status);
598 }
599
600 } // namespace dbus
OLDNEW
« no previous file with comments | « dbus/bus.h ('k') | dbus/dbus.gyp » ('j') | dbus/exported_object.cc » ('J')

Powered by Google App Engine
This is Rietveld 408576698