Chromium Code Reviews| Index: mojo/system/channel.cc |
| diff --git a/mojo/system/channel.cc b/mojo/system/channel.cc |
| new file mode 100644 |
| index 0000000000000000000000000000000000000000..d6d090f976e966b8e0914ea4ad196acd0faac080 |
| --- /dev/null |
| +++ b/mojo/system/channel.cc |
| @@ -0,0 +1,212 @@ |
| +// Copyright 2013 The Chromium Authors. All rights reserved. |
| +// Use of this source code is governed by a BSD-style license that can be |
| +// found in the LICENSE file. |
| + |
| +#include "mojo/system/channel.h" |
| + |
| +#include "base/basictypes.h" |
| +#include "base/bind.h" |
| +#include "base/compiler_specific.h" |
| +#include "base/logging.h" |
| +#include "base/message_loop/message_loop.h" |
| +#include "base/strings/stringprintf.h" |
| + |
| +namespace mojo { |
| +namespace system { |
| + |
| +COMPILE_ASSERT(Channel::kBootstrapEndpointId != |
| + MessageInTransit::kInvalidEndpointId, |
| + kBootstrapEndpointId_is_invalid); |
| + |
| +STATIC_CONST_MEMBER_DEFINITION const MessageInTransit::EndpointId |
| + Channel::kBootstrapEndpointId; |
| + |
| +Channel::EndpointInfo::EndpointInfo() { |
| +} |
| + |
| +Channel::EndpointInfo::EndpointInfo(scoped_refptr<MessagePipe> message_pipe, |
| + unsigned port) |
| + : message_pipe(message_pipe), |
| + port(port) { |
| +} |
| + |
| +Channel::EndpointInfo::~EndpointInfo() { |
| +} |
| + |
| +Channel::Channel(const PlatformChannelHandle& handle) |
| + : raw_channel_(RawChannel::Create(handle, |
| + this, |
| + base::MessageLoop::current())), |
| + next_local_id_(kBootstrapEndpointId) { |
| +#ifndef NDEBUG |
| + creation_thread_message_loop_ = base::MessageLoop::current(); |
| +#endif |
| + |
| + // TODO(vtl): Should there be an explicit |Init()| instead? |
|
darin (slow to review)
2013/11/06 18:26:00
Do you mean Channel::Init()? That might be wise. I
viettrungluu
2013/11/06 21:13:39
Done.
|
| + raw_channel_->Init(); |
| +} |
| + |
| +void Channel::Shutdown() { |
| + AssertOnCreationThread(); |
| + |
| + base::AutoLock locker(lock_); |
| + raw_channel_->Shutdown(); |
| + raw_channel_.reset(); |
| + |
| + // TODO(vtl): Should I clear |local_id_to_endpoint_info_map_|? Or assert that |
| + // it's empty? |
| +} |
| + |
| +MessageInTransit::EndpointId Channel::AttachMessagePipeEndpoint( |
| + scoped_refptr<MessagePipe> message_pipe, unsigned port) { |
| + MessageInTransit::EndpointId local_id; |
| + { |
| + base::AutoLock locker(lock_); |
| + |
| + while (next_local_id_ == MessageInTransit::kInvalidEndpointId || |
| + local_id_to_endpoint_info_map_.find(next_local_id_) != |
| + local_id_to_endpoint_info_map_.end()) |
| + next_local_id_++; |
| + |
| + local_id = next_local_id_; |
| + next_local_id_++; |
| + |
| + // TODO(vtl): Use emplace when we move to C++11 unordered_maps. (It'll avoid |
| + // some expensive reference count increment/decrements.) Once this is done, |
| + // we should be able to delete |EndpointInfo|'s default constructor. |
| + local_id_to_endpoint_info_map_[local_id] = EndpointInfo(message_pipe, port); |
| + } |
| + |
| + message_pipe->Attach(port, scoped_refptr<Channel>(this), local_id); |
| + return local_id; |
| +} |
| + |
| +void Channel::RunMessagePipeEndpoint(MessageInTransit::EndpointId local_id, |
| + MessageInTransit::EndpointId remote_id) { |
| + EndpointInfo endpoint_info; |
| + { |
| + base::AutoLock locker(lock_); |
| + |
| + IdToEndpointInfoMap::const_iterator it = |
| + local_id_to_endpoint_info_map_.find(local_id); |
| + CHECK(it != local_id_to_endpoint_info_map_.end()); |
| + endpoint_info = it->second; |
| + } |
| + |
| + endpoint_info.message_pipe->Run(endpoint_info.port, remote_id); |
| +} |
| + |
| +bool Channel::WriteMessage(MessageInTransit* message) { |
| + base::AutoLock locker(lock_); |
| + if (!raw_channel_.get()) { |
| + // TODO(vtl): I think this is probably not an error condition, but I should |
| + // think about it (and the shutdown sequence) more carefully. |
| + LOG(INFO) << "WriteMessage() after shutdown"; |
| + return false; |
| + } |
| + |
| + return raw_channel_->WriteMessage(message); |
| +} |
| + |
| +void Channel::DetachMessagePipeEndpoint(MessageInTransit::EndpointId local_id) { |
| + DCHECK_NE(local_id, MessageInTransit::kInvalidEndpointId); |
| + |
| + base::AutoLock locker_(lock_); |
| + local_id_to_endpoint_info_map_.erase(local_id); |
| +} |
| + |
| +Channel::~Channel() { |
| + // The channel should have been shut down first. |
| + DCHECK(!raw_channel_.get()); |
| +} |
| + |
| +void Channel::OnReadMessage(const MessageInTransit& message) { |
| + switch (message.type()) { |
| + case MessageInTransit::TYPE_MESSAGE_PIPE_ENDPOINT: |
| + case MessageInTransit::TYPE_MESSAGE_PIPE: |
| + OnReadMessageForDownstream(message); |
| + break; |
| + case MessageInTransit::TYPE_CHANNEL: |
| + OnReadMessageForChannel(message); |
| + break; |
| + default: |
| + HandleRemoteError(base::StringPrintf( |
| + "Received message of invalid type %u", |
| + static_cast<unsigned>(message.type()))); |
| + break; |
| + } |
| +} |
| + |
| +void Channel::OnFatalError(FatalError fatal_error) { |
| + // TODO(vtl): IMPORTANT. Notify all our endpoints that they're dead. |
| + NOTIMPLEMENTED(); |
| +} |
| + |
| +void Channel::OnReadMessageForDownstream(const MessageInTransit& message) { |
| + DCHECK(message.type() == MessageInTransit::TYPE_MESSAGE_PIPE_ENDPOINT || |
| + message.type() == MessageInTransit::TYPE_MESSAGE_PIPE); |
| + |
| + MessageInTransit::EndpointId local_id = message.destination_id(); |
| + if (local_id == MessageInTransit::kInvalidEndpointId) { |
| + HandleRemoteError("Received message with no destination ID"); |
| + return; |
| + } |
| + |
| + EndpointInfo endpoint_info; |
| + { |
| + base::AutoLock locker(lock_); |
| + |
| + // Since we own |raw_channel_|, and this method and |Shutdown()| should only |
| + // be called from the creation thread, |raw_channel_| should never be null |
| + // here. |
| + DCHECK(raw_channel_.get()); |
| + |
| + IdToEndpointInfoMap::const_iterator it = |
| + local_id_to_endpoint_info_map_.find(local_id); |
| + if (it == local_id_to_endpoint_info_map_.end()) { |
| + HandleRemoteError(base::StringPrintf( |
| + "Received a message for nonexistent local destination ID %u", |
| + static_cast<unsigned>(local_id))); |
| + return; |
| + } |
| + endpoint_info = it->second; |
| + } |
| + |
| + // We need to duplicate the message, because |EnqueueMessage()| will take |
| + // ownership of it. |
| + MessageInTransit* own_message = MessageInTransit::Create( |
| + message.type(), message.subtype(), message.data(), message.data_size()); |
| + if (endpoint_info.message_pipe->EnqueueMessage( |
| + MessagePipe::GetPeerPort(endpoint_info.port), |
| + own_message) != MOJO_RESULT_OK) { |
| + HandleLocalError(base::StringPrintf( |
| + "Failed to enqueue message to local destination ID %u", |
| + static_cast<unsigned>(local_id))); |
| + return; |
| + } |
| +} |
| + |
| +void Channel::OnReadMessageForChannel(const MessageInTransit& message) { |
| + // TODO(vtl): Currently no channel-only messages yet. |
| + HandleRemoteError("Received invalid channel message"); |
| + NOTREACHED(); |
| +} |
| + |
| +void Channel::HandleRemoteError(const base::StringPiece& error_message) { |
| + // TODO(vtl): Is this how we really want to handle this? |
| + LOG(INFO) << error_message; |
| +} |
| + |
| +void Channel::HandleLocalError(const base::StringPiece& error_message) { |
| + // TODO(vtl): Is this how we really want to handle this? |
| + LOG(FATAL) << error_message; |
| +} |
| + |
| +#ifndef NDEBUG |
| +void Channel::AssertOnCreationThread() { |
| + DCHECK_EQ(base::MessageLoop::current(), creation_thread_message_loop_); |
| +} |
| +#endif |
| + |
| +} // namespace system |
| +} // namespace mojo |