Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(617)

Unified Diff: mojo/system/channel.cc

Issue 60103005: Mojo: First stab at making MessagePipes work across OS pipes. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: rebased Created 7 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: mojo/system/channel.cc
diff --git a/mojo/system/channel.cc b/mojo/system/channel.cc
new file mode 100644
index 0000000000000000000000000000000000000000..095e44cab1feddb6be2c662d0d734bc13fa1c23d
--- /dev/null
+++ b/mojo/system/channel.cc
@@ -0,0 +1,215 @@
+// Copyright 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "mojo/system/channel.h"
+
+#include "base/basictypes.h"
+#include "base/bind.h"
+#include "base/compiler_specific.h"
+#include "base/logging.h"
+#include "base/message_loop/message_loop.h"
+#include "base/strings/stringprintf.h"
+
+namespace mojo {
+namespace system {
+
+COMPILE_ASSERT(Channel::kBootstrapEndpointId !=
+ MessageInTransit::kInvalidEndpointId,
+ kBootstrapEndpointId_is_invalid);
+
+STATIC_CONST_MEMBER_DEFINITION const MessageInTransit::EndpointId
+ Channel::kBootstrapEndpointId;
+
+Channel::EndpointInfo::EndpointInfo() {
+}
+
+Channel::EndpointInfo::EndpointInfo(scoped_refptr<MessagePipe> message_pipe,
+ unsigned port)
+ : message_pipe(message_pipe),
+ port(port) {
+}
+
+Channel::EndpointInfo::~EndpointInfo() {
+}
+
+Channel::Channel()
+ : next_local_id_(kBootstrapEndpointId) {
+}
+
+bool Channel::Init(const PlatformChannelHandle& handle) {
+ DCHECK(creation_thread_checker_.CalledOnValidThread());
+
+ // No need to take |lock_|, since this must be called before this object
+ // becomes thread-safe.
+ DCHECK(!raw_channel_.get());
+
+ raw_channel_.reset(
+ RawChannel::Create(handle, this, base::MessageLoop::current()));
+ if (!raw_channel_->Init()) {
+ raw_channel_.reset();
+ return false;
+ }
+
+ return true;
+}
+
+void Channel::Shutdown() {
+ DCHECK(creation_thread_checker_.CalledOnValidThread());
+
+ base::AutoLock locker(lock_);
+ DCHECK(raw_channel_.get());
+ raw_channel_->Shutdown();
+ raw_channel_.reset();
+
+ // TODO(vtl): Should I clear |local_id_to_endpoint_info_map_|? Or assert that
+ // it's empty?
+}
+
+MessageInTransit::EndpointId Channel::AttachMessagePipeEndpoint(
+ scoped_refptr<MessagePipe> message_pipe, unsigned port) {
+ MessageInTransit::EndpointId local_id;
+ {
+ base::AutoLock locker(lock_);
+
+ while (next_local_id_ == MessageInTransit::kInvalidEndpointId ||
+ local_id_to_endpoint_info_map_.find(next_local_id_) !=
+ local_id_to_endpoint_info_map_.end())
+ next_local_id_++;
+
+ local_id = next_local_id_;
+ next_local_id_++;
+
+ // TODO(vtl): Use emplace when we move to C++11 unordered_maps. (It'll avoid
+ // some expensive reference count increment/decrements.) Once this is done,
+ // we should be able to delete |EndpointInfo|'s default constructor.
+ local_id_to_endpoint_info_map_[local_id] = EndpointInfo(message_pipe, port);
+ }
+
+ message_pipe->Attach(port, scoped_refptr<Channel>(this), local_id);
+ return local_id;
+}
+
+void Channel::RunMessagePipeEndpoint(MessageInTransit::EndpointId local_id,
+ MessageInTransit::EndpointId remote_id) {
+ EndpointInfo endpoint_info;
+ {
+ base::AutoLock locker(lock_);
+
+ IdToEndpointInfoMap::const_iterator it =
+ local_id_to_endpoint_info_map_.find(local_id);
+ CHECK(it != local_id_to_endpoint_info_map_.end());
+ endpoint_info = it->second;
+ }
+
+ endpoint_info.message_pipe->Run(endpoint_info.port, remote_id);
+}
+
+bool Channel::WriteMessage(MessageInTransit* message) {
+ base::AutoLock locker(lock_);
+ if (!raw_channel_.get()) {
+ // TODO(vtl): I think this is probably not an error condition, but I should
+ // think about it (and the shutdown sequence) more carefully.
+ LOG(INFO) << "WriteMessage() after shutdown";
+ return false;
+ }
+
+ return raw_channel_->WriteMessage(message);
+}
+
+void Channel::DetachMessagePipeEndpoint(MessageInTransit::EndpointId local_id) {
+ DCHECK_NE(local_id, MessageInTransit::kInvalidEndpointId);
+
+ base::AutoLock locker_(lock_);
+ local_id_to_endpoint_info_map_.erase(local_id);
+}
+
+Channel::~Channel() {
+ // The channel should have been shut down first.
+ DCHECK(!raw_channel_.get());
+}
+
+void Channel::OnReadMessage(const MessageInTransit& message) {
+ switch (message.type()) {
+ case MessageInTransit::kTypeMessagePipeEndpoint:
+ case MessageInTransit::kTypeMessagePipe:
+ OnReadMessageForDownstream(message);
+ break;
+ case MessageInTransit::TYPE_CHANNEL:
+ OnReadMessageForChannel(message);
+ break;
+ default:
+ HandleRemoteError(base::StringPrintf(
+ "Received message of invalid type %u",
+ static_cast<unsigned>(message.type())));
+ break;
+ }
+}
+
+void Channel::OnFatalError(FatalError fatal_error) {
+ // TODO(vtl): IMPORTANT. Notify all our endpoints that they're dead.
+ NOTIMPLEMENTED();
+}
+
+void Channel::OnReadMessageForDownstream(const MessageInTransit& message) {
+ DCHECK(message.type() == MessageInTransit::kTypeMessagePipeEndpoint ||
+ message.type() == MessageInTransit::kTypeMessagePipe);
+
+ MessageInTransit::EndpointId local_id = message.destination_id();
+ if (local_id == MessageInTransit::kInvalidEndpointId) {
+ HandleRemoteError("Received message with no destination ID");
+ return;
+ }
+
+ EndpointInfo endpoint_info;
+ {
+ base::AutoLock locker(lock_);
+
+ // Since we own |raw_channel_|, and this method and |Shutdown()| should only
+ // be called from the creation thread, |raw_channel_| should never be null
+ // here.
+ DCHECK(raw_channel_.get());
+
+ IdToEndpointInfoMap::const_iterator it =
+ local_id_to_endpoint_info_map_.find(local_id);
+ if (it == local_id_to_endpoint_info_map_.end()) {
+ HandleRemoteError(base::StringPrintf(
+ "Received a message for nonexistent local destination ID %u",
+ static_cast<unsigned>(local_id)));
+ return;
+ }
+ endpoint_info = it->second;
+ }
+
+ // We need to duplicate the message, because |EnqueueMessage()| will take
+ // ownership of it.
+ MessageInTransit* own_message = MessageInTransit::Create(
+ message.type(), message.subtype(), message.data(), message.data_size());
+ if (endpoint_info.message_pipe->EnqueueMessage(
+ MessagePipe::GetPeerPort(endpoint_info.port),
+ own_message) != MOJO_RESULT_OK) {
+ HandleLocalError(base::StringPrintf(
+ "Failed to enqueue message to local destination ID %u",
+ static_cast<unsigned>(local_id)));
+ return;
+ }
+}
+
+void Channel::OnReadMessageForChannel(const MessageInTransit& message) {
+ // TODO(vtl): Currently no channel-only messages yet.
+ HandleRemoteError("Received invalid channel message");
+ NOTREACHED();
+}
+
+void Channel::HandleRemoteError(const base::StringPiece& error_message) {
+ // TODO(vtl): Is this how we really want to handle this?
+ LOG(INFO) << error_message;
+}
+
+void Channel::HandleLocalError(const base::StringPiece& error_message) {
+ // TODO(vtl): Is this how we really want to handle this?
+ LOG(FATAL) << error_message;
+}
+
+} // namespace system
+} // namespace mojo

Powered by Google App Engine
This is Rietveld 408576698