| Index: mojo/system/channel.cc
|
| diff --git a/mojo/system/channel.cc b/mojo/system/channel.cc
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..095e44cab1feddb6be2c662d0d734bc13fa1c23d
|
| --- /dev/null
|
| +++ b/mojo/system/channel.cc
|
| @@ -0,0 +1,215 @@
|
| +// Copyright 2013 The Chromium Authors. All rights reserved.
|
| +// Use of this source code is governed by a BSD-style license that can be
|
| +// found in the LICENSE file.
|
| +
|
| +#include "mojo/system/channel.h"
|
| +
|
| +#include "base/basictypes.h"
|
| +#include "base/bind.h"
|
| +#include "base/compiler_specific.h"
|
| +#include "base/logging.h"
|
| +#include "base/message_loop/message_loop.h"
|
| +#include "base/strings/stringprintf.h"
|
| +
|
| +namespace mojo {
|
| +namespace system {
|
| +
|
| +COMPILE_ASSERT(Channel::kBootstrapEndpointId !=
|
| + MessageInTransit::kInvalidEndpointId,
|
| + kBootstrapEndpointId_is_invalid);
|
| +
|
| +STATIC_CONST_MEMBER_DEFINITION const MessageInTransit::EndpointId
|
| + Channel::kBootstrapEndpointId;
|
| +
|
| +Channel::EndpointInfo::EndpointInfo() {
|
| +}
|
| +
|
| +Channel::EndpointInfo::EndpointInfo(scoped_refptr<MessagePipe> message_pipe,
|
| + unsigned port)
|
| + : message_pipe(message_pipe),
|
| + port(port) {
|
| +}
|
| +
|
| +Channel::EndpointInfo::~EndpointInfo() {
|
| +}
|
| +
|
| +Channel::Channel()
|
| + : next_local_id_(kBootstrapEndpointId) {
|
| +}
|
| +
|
| +bool Channel::Init(const PlatformChannelHandle& handle) {
|
| + DCHECK(creation_thread_checker_.CalledOnValidThread());
|
| +
|
| + // No need to take |lock_|, since this must be called before this object
|
| + // becomes thread-safe.
|
| + DCHECK(!raw_channel_.get());
|
| +
|
| + raw_channel_.reset(
|
| + RawChannel::Create(handle, this, base::MessageLoop::current()));
|
| + if (!raw_channel_->Init()) {
|
| + raw_channel_.reset();
|
| + return false;
|
| + }
|
| +
|
| + return true;
|
| +}
|
| +
|
| +void Channel::Shutdown() {
|
| + DCHECK(creation_thread_checker_.CalledOnValidThread());
|
| +
|
| + base::AutoLock locker(lock_);
|
| + DCHECK(raw_channel_.get());
|
| + raw_channel_->Shutdown();
|
| + raw_channel_.reset();
|
| +
|
| + // TODO(vtl): Should I clear |local_id_to_endpoint_info_map_|? Or assert that
|
| + // it's empty?
|
| +}
|
| +
|
| +MessageInTransit::EndpointId Channel::AttachMessagePipeEndpoint(
|
| + scoped_refptr<MessagePipe> message_pipe, unsigned port) {
|
| + MessageInTransit::EndpointId local_id;
|
| + {
|
| + base::AutoLock locker(lock_);
|
| +
|
| + while (next_local_id_ == MessageInTransit::kInvalidEndpointId ||
|
| + local_id_to_endpoint_info_map_.find(next_local_id_) !=
|
| + local_id_to_endpoint_info_map_.end())
|
| + next_local_id_++;
|
| +
|
| + local_id = next_local_id_;
|
| + next_local_id_++;
|
| +
|
| + // TODO(vtl): Use emplace when we move to C++11 unordered_maps. (It'll avoid
|
| + // some expensive reference count increment/decrements.) Once this is done,
|
| + // we should be able to delete |EndpointInfo|'s default constructor.
|
| + local_id_to_endpoint_info_map_[local_id] = EndpointInfo(message_pipe, port);
|
| + }
|
| +
|
| + message_pipe->Attach(port, scoped_refptr<Channel>(this), local_id);
|
| + return local_id;
|
| +}
|
| +
|
| +void Channel::RunMessagePipeEndpoint(MessageInTransit::EndpointId local_id,
|
| + MessageInTransit::EndpointId remote_id) {
|
| + EndpointInfo endpoint_info;
|
| + {
|
| + base::AutoLock locker(lock_);
|
| +
|
| + IdToEndpointInfoMap::const_iterator it =
|
| + local_id_to_endpoint_info_map_.find(local_id);
|
| + CHECK(it != local_id_to_endpoint_info_map_.end());
|
| + endpoint_info = it->second;
|
| + }
|
| +
|
| + endpoint_info.message_pipe->Run(endpoint_info.port, remote_id);
|
| +}
|
| +
|
| +bool Channel::WriteMessage(MessageInTransit* message) {
|
| + base::AutoLock locker(lock_);
|
| + if (!raw_channel_.get()) {
|
| + // TODO(vtl): I think this is probably not an error condition, but I should
|
| + // think about it (and the shutdown sequence) more carefully.
|
| + LOG(INFO) << "WriteMessage() after shutdown";
|
| + return false;
|
| + }
|
| +
|
| + return raw_channel_->WriteMessage(message);
|
| +}
|
| +
|
| +void Channel::DetachMessagePipeEndpoint(MessageInTransit::EndpointId local_id) {
|
| + DCHECK_NE(local_id, MessageInTransit::kInvalidEndpointId);
|
| +
|
| + base::AutoLock locker_(lock_);
|
| + local_id_to_endpoint_info_map_.erase(local_id);
|
| +}
|
| +
|
| +Channel::~Channel() {
|
| + // The channel should have been shut down first.
|
| + DCHECK(!raw_channel_.get());
|
| +}
|
| +
|
| +void Channel::OnReadMessage(const MessageInTransit& message) {
|
| + switch (message.type()) {
|
| + case MessageInTransit::kTypeMessagePipeEndpoint:
|
| + case MessageInTransit::kTypeMessagePipe:
|
| + OnReadMessageForDownstream(message);
|
| + break;
|
| + case MessageInTransit::TYPE_CHANNEL:
|
| + OnReadMessageForChannel(message);
|
| + break;
|
| + default:
|
| + HandleRemoteError(base::StringPrintf(
|
| + "Received message of invalid type %u",
|
| + static_cast<unsigned>(message.type())));
|
| + break;
|
| + }
|
| +}
|
| +
|
| +void Channel::OnFatalError(FatalError fatal_error) {
|
| + // TODO(vtl): IMPORTANT. Notify all our endpoints that they're dead.
|
| + NOTIMPLEMENTED();
|
| +}
|
| +
|
| +void Channel::OnReadMessageForDownstream(const MessageInTransit& message) {
|
| + DCHECK(message.type() == MessageInTransit::kTypeMessagePipeEndpoint ||
|
| + message.type() == MessageInTransit::kTypeMessagePipe);
|
| +
|
| + MessageInTransit::EndpointId local_id = message.destination_id();
|
| + if (local_id == MessageInTransit::kInvalidEndpointId) {
|
| + HandleRemoteError("Received message with no destination ID");
|
| + return;
|
| + }
|
| +
|
| + EndpointInfo endpoint_info;
|
| + {
|
| + base::AutoLock locker(lock_);
|
| +
|
| + // Since we own |raw_channel_|, and this method and |Shutdown()| should only
|
| + // be called from the creation thread, |raw_channel_| should never be null
|
| + // here.
|
| + DCHECK(raw_channel_.get());
|
| +
|
| + IdToEndpointInfoMap::const_iterator it =
|
| + local_id_to_endpoint_info_map_.find(local_id);
|
| + if (it == local_id_to_endpoint_info_map_.end()) {
|
| + HandleRemoteError(base::StringPrintf(
|
| + "Received a message for nonexistent local destination ID %u",
|
| + static_cast<unsigned>(local_id)));
|
| + return;
|
| + }
|
| + endpoint_info = it->second;
|
| + }
|
| +
|
| + // We need to duplicate the message, because |EnqueueMessage()| will take
|
| + // ownership of it.
|
| + MessageInTransit* own_message = MessageInTransit::Create(
|
| + message.type(), message.subtype(), message.data(), message.data_size());
|
| + if (endpoint_info.message_pipe->EnqueueMessage(
|
| + MessagePipe::GetPeerPort(endpoint_info.port),
|
| + own_message) != MOJO_RESULT_OK) {
|
| + HandleLocalError(base::StringPrintf(
|
| + "Failed to enqueue message to local destination ID %u",
|
| + static_cast<unsigned>(local_id)));
|
| + return;
|
| + }
|
| +}
|
| +
|
| +void Channel::OnReadMessageForChannel(const MessageInTransit& message) {
|
| + // TODO(vtl): Currently no channel-only messages yet.
|
| + HandleRemoteError("Received invalid channel message");
|
| + NOTREACHED();
|
| +}
|
| +
|
| +void Channel::HandleRemoteError(const base::StringPiece& error_message) {
|
| + // TODO(vtl): Is this how we really want to handle this?
|
| + LOG(INFO) << error_message;
|
| +}
|
| +
|
| +void Channel::HandleLocalError(const base::StringPiece& error_message) {
|
| + // TODO(vtl): Is this how we really want to handle this?
|
| + LOG(FATAL) << error_message;
|
| +}
|
| +
|
| +} // namespace system
|
| +} // namespace mojo
|
|
|