Index: mojo/system/channel.cc |
diff --git a/mojo/system/channel.cc b/mojo/system/channel.cc |
new file mode 100644 |
index 0000000000000000000000000000000000000000..095e44cab1feddb6be2c662d0d734bc13fa1c23d |
--- /dev/null |
+++ b/mojo/system/channel.cc |
@@ -0,0 +1,215 @@ |
+// Copyright 2013 The Chromium Authors. All rights reserved. |
+// Use of this source code is governed by a BSD-style license that can be |
+// found in the LICENSE file. |
+ |
+#include "mojo/system/channel.h" |
+ |
+#include "base/basictypes.h" |
+#include "base/bind.h" |
+#include "base/compiler_specific.h" |
+#include "base/logging.h" |
+#include "base/message_loop/message_loop.h" |
+#include "base/strings/stringprintf.h" |
+ |
+namespace mojo { |
+namespace system { |
+ |
+COMPILE_ASSERT(Channel::kBootstrapEndpointId != |
+ MessageInTransit::kInvalidEndpointId, |
+ kBootstrapEndpointId_is_invalid); |
+ |
+STATIC_CONST_MEMBER_DEFINITION const MessageInTransit::EndpointId |
+ Channel::kBootstrapEndpointId; |
+ |
+Channel::EndpointInfo::EndpointInfo() { |
+} |
+ |
+Channel::EndpointInfo::EndpointInfo(scoped_refptr<MessagePipe> message_pipe, |
+ unsigned port) |
+ : message_pipe(message_pipe), |
+ port(port) { |
+} |
+ |
+Channel::EndpointInfo::~EndpointInfo() { |
+} |
+ |
+Channel::Channel() |
+ : next_local_id_(kBootstrapEndpointId) { |
+} |
+ |
+bool Channel::Init(const PlatformChannelHandle& handle) { |
+ DCHECK(creation_thread_checker_.CalledOnValidThread()); |
+ |
+ // No need to take |lock_|, since this must be called before this object |
+ // becomes thread-safe. |
+ DCHECK(!raw_channel_.get()); |
+ |
+ raw_channel_.reset( |
+ RawChannel::Create(handle, this, base::MessageLoop::current())); |
+ if (!raw_channel_->Init()) { |
+ raw_channel_.reset(); |
+ return false; |
+ } |
+ |
+ return true; |
+} |
+ |
+void Channel::Shutdown() { |
+ DCHECK(creation_thread_checker_.CalledOnValidThread()); |
+ |
+ base::AutoLock locker(lock_); |
+ DCHECK(raw_channel_.get()); |
+ raw_channel_->Shutdown(); |
+ raw_channel_.reset(); |
+ |
+ // TODO(vtl): Should I clear |local_id_to_endpoint_info_map_|? Or assert that |
+ // it's empty? |
+} |
+ |
+MessageInTransit::EndpointId Channel::AttachMessagePipeEndpoint( |
+ scoped_refptr<MessagePipe> message_pipe, unsigned port) { |
+ MessageInTransit::EndpointId local_id; |
+ { |
+ base::AutoLock locker(lock_); |
+ |
+ while (next_local_id_ == MessageInTransit::kInvalidEndpointId || |
+ local_id_to_endpoint_info_map_.find(next_local_id_) != |
+ local_id_to_endpoint_info_map_.end()) |
+ next_local_id_++; |
+ |
+ local_id = next_local_id_; |
+ next_local_id_++; |
+ |
+ // TODO(vtl): Use emplace when we move to C++11 unordered_maps. (It'll avoid |
+ // some expensive reference count increment/decrements.) Once this is done, |
+ // we should be able to delete |EndpointInfo|'s default constructor. |
+ local_id_to_endpoint_info_map_[local_id] = EndpointInfo(message_pipe, port); |
+ } |
+ |
+ message_pipe->Attach(port, scoped_refptr<Channel>(this), local_id); |
+ return local_id; |
+} |
+ |
+void Channel::RunMessagePipeEndpoint(MessageInTransit::EndpointId local_id, |
+ MessageInTransit::EndpointId remote_id) { |
+ EndpointInfo endpoint_info; |
+ { |
+ base::AutoLock locker(lock_); |
+ |
+ IdToEndpointInfoMap::const_iterator it = |
+ local_id_to_endpoint_info_map_.find(local_id); |
+ CHECK(it != local_id_to_endpoint_info_map_.end()); |
+ endpoint_info = it->second; |
+ } |
+ |
+ endpoint_info.message_pipe->Run(endpoint_info.port, remote_id); |
+} |
+ |
+bool Channel::WriteMessage(MessageInTransit* message) { |
+ base::AutoLock locker(lock_); |
+ if (!raw_channel_.get()) { |
+ // TODO(vtl): I think this is probably not an error condition, but I should |
+ // think about it (and the shutdown sequence) more carefully. |
+ LOG(INFO) << "WriteMessage() after shutdown"; |
+ return false; |
+ } |
+ |
+ return raw_channel_->WriteMessage(message); |
+} |
+ |
+void Channel::DetachMessagePipeEndpoint(MessageInTransit::EndpointId local_id) { |
+ DCHECK_NE(local_id, MessageInTransit::kInvalidEndpointId); |
+ |
+ base::AutoLock locker_(lock_); |
+ local_id_to_endpoint_info_map_.erase(local_id); |
+} |
+ |
+Channel::~Channel() { |
+ // The channel should have been shut down first. |
+ DCHECK(!raw_channel_.get()); |
+} |
+ |
+void Channel::OnReadMessage(const MessageInTransit& message) { |
+ switch (message.type()) { |
+ case MessageInTransit::kTypeMessagePipeEndpoint: |
+ case MessageInTransit::kTypeMessagePipe: |
+ OnReadMessageForDownstream(message); |
+ break; |
+ case MessageInTransit::TYPE_CHANNEL: |
+ OnReadMessageForChannel(message); |
+ break; |
+ default: |
+ HandleRemoteError(base::StringPrintf( |
+ "Received message of invalid type %u", |
+ static_cast<unsigned>(message.type()))); |
+ break; |
+ } |
+} |
+ |
+void Channel::OnFatalError(FatalError fatal_error) { |
+ // TODO(vtl): IMPORTANT. Notify all our endpoints that they're dead. |
+ NOTIMPLEMENTED(); |
+} |
+ |
+void Channel::OnReadMessageForDownstream(const MessageInTransit& message) { |
+ DCHECK(message.type() == MessageInTransit::kTypeMessagePipeEndpoint || |
+ message.type() == MessageInTransit::kTypeMessagePipe); |
+ |
+ MessageInTransit::EndpointId local_id = message.destination_id(); |
+ if (local_id == MessageInTransit::kInvalidEndpointId) { |
+ HandleRemoteError("Received message with no destination ID"); |
+ return; |
+ } |
+ |
+ EndpointInfo endpoint_info; |
+ { |
+ base::AutoLock locker(lock_); |
+ |
+ // Since we own |raw_channel_|, and this method and |Shutdown()| should only |
+ // be called from the creation thread, |raw_channel_| should never be null |
+ // here. |
+ DCHECK(raw_channel_.get()); |
+ |
+ IdToEndpointInfoMap::const_iterator it = |
+ local_id_to_endpoint_info_map_.find(local_id); |
+ if (it == local_id_to_endpoint_info_map_.end()) { |
+ HandleRemoteError(base::StringPrintf( |
+ "Received a message for nonexistent local destination ID %u", |
+ static_cast<unsigned>(local_id))); |
+ return; |
+ } |
+ endpoint_info = it->second; |
+ } |
+ |
+ // We need to duplicate the message, because |EnqueueMessage()| will take |
+ // ownership of it. |
+ MessageInTransit* own_message = MessageInTransit::Create( |
+ message.type(), message.subtype(), message.data(), message.data_size()); |
+ if (endpoint_info.message_pipe->EnqueueMessage( |
+ MessagePipe::GetPeerPort(endpoint_info.port), |
+ own_message) != MOJO_RESULT_OK) { |
+ HandleLocalError(base::StringPrintf( |
+ "Failed to enqueue message to local destination ID %u", |
+ static_cast<unsigned>(local_id))); |
+ return; |
+ } |
+} |
+ |
+void Channel::OnReadMessageForChannel(const MessageInTransit& message) { |
+ // TODO(vtl): Currently no channel-only messages yet. |
+ HandleRemoteError("Received invalid channel message"); |
+ NOTREACHED(); |
+} |
+ |
+void Channel::HandleRemoteError(const base::StringPiece& error_message) { |
+ // TODO(vtl): Is this how we really want to handle this? |
+ LOG(INFO) << error_message; |
+} |
+ |
+void Channel::HandleLocalError(const base::StringPiece& error_message) { |
+ // TODO(vtl): Is this how we really want to handle this? |
+ LOG(FATAL) << error_message; |
+} |
+ |
+} // namespace system |
+} // namespace mojo |