Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(402)

Side by Side Diff: mojo/system/channel.cc

Issue 60103005: Mojo: First stab at making MessagePipes work across OS pipes. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Created 7 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
(Empty)
1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "mojo/system/channel.h"
6
7 #include "base/basictypes.h"
8 #include "base/bind.h"
9 #include "base/compiler_specific.h"
10 #include "base/logging.h"
11 #include "base/message_loop/message_loop.h"
12 #include "base/strings/stringprintf.h"
13
14 namespace mojo {
15 namespace system {
16
17 COMPILE_ASSERT(Channel::kBootstrapEndpointId !=
18 MessageInTransit::kInvalidEndpointId,
19 kBootstrapEndpointId_is_invalid);
20
21 STATIC_CONST_MEMBER_DEFINITION const MessageInTransit::EndpointId
22 Channel::kBootstrapEndpointId;
23
24 Channel::EndpointInfo::EndpointInfo() {
25 }
26
27 Channel::EndpointInfo::EndpointInfo(scoped_refptr<MessagePipe> message_pipe,
28 unsigned port)
29 : message_pipe(message_pipe),
30 port(port) {
31 }
32
33 Channel::EndpointInfo::~EndpointInfo() {
34 }
35
36 Channel::Channel(const PlatformChannelHandle& handle)
37 : raw_channel_(RawChannel::Create(handle,
38 this,
39 base::MessageLoop::current())),
40 next_local_id_(kBootstrapEndpointId) {
41 #ifndef NDEBUG
42 creation_thread_message_loop_ = base::MessageLoop::current();
43 #endif
44
45 // TODO(vtl): Should there be an explicit |Init()| instead?
darin (slow to review) 2013/11/06 18:26:00 Do you mean Channel::Init()? That might be wise. I
viettrungluu 2013/11/06 21:13:39 Done.
46 raw_channel_->Init();
47 }
48
49 void Channel::Shutdown() {
50 AssertOnCreationThread();
51
52 base::AutoLock locker(lock_);
53 raw_channel_->Shutdown();
54 raw_channel_.reset();
55
56 // TODO(vtl): Should I clear |local_id_to_endpoint_info_map_|? Or assert that
57 // it's empty?
58 }
59
60 MessageInTransit::EndpointId Channel::AttachMessagePipeEndpoint(
61 scoped_refptr<MessagePipe> message_pipe, unsigned port) {
62 MessageInTransit::EndpointId local_id;
63 {
64 base::AutoLock locker(lock_);
65
66 while (next_local_id_ == MessageInTransit::kInvalidEndpointId ||
67 local_id_to_endpoint_info_map_.find(next_local_id_) !=
68 local_id_to_endpoint_info_map_.end())
69 next_local_id_++;
70
71 local_id = next_local_id_;
72 next_local_id_++;
73
74 // TODO(vtl): Use emplace when we move to C++11 unordered_maps. (It'll avoid
75 // some expensive reference count increment/decrements.) Once this is done,
76 // we should be able to delete |EndpointInfo|'s default constructor.
77 local_id_to_endpoint_info_map_[local_id] = EndpointInfo(message_pipe, port);
78 }
79
80 message_pipe->Attach(port, scoped_refptr<Channel>(this), local_id);
81 return local_id;
82 }
83
84 void Channel::RunMessagePipeEndpoint(MessageInTransit::EndpointId local_id,
85 MessageInTransit::EndpointId remote_id) {
86 EndpointInfo endpoint_info;
87 {
88 base::AutoLock locker(lock_);
89
90 IdToEndpointInfoMap::const_iterator it =
91 local_id_to_endpoint_info_map_.find(local_id);
92 CHECK(it != local_id_to_endpoint_info_map_.end());
93 endpoint_info = it->second;
94 }
95
96 endpoint_info.message_pipe->Run(endpoint_info.port, remote_id);
97 }
98
99 bool Channel::WriteMessage(MessageInTransit* message) {
100 base::AutoLock locker(lock_);
101 if (!raw_channel_.get()) {
102 // TODO(vtl): I think this is probably not an error condition, but I should
103 // think about it (and the shutdown sequence) more carefully.
104 LOG(INFO) << "WriteMessage() after shutdown";
105 return false;
106 }
107
108 return raw_channel_->WriteMessage(message);
109 }
110
111 void Channel::DetachMessagePipeEndpoint(MessageInTransit::EndpointId local_id) {
112 DCHECK_NE(local_id, MessageInTransit::kInvalidEndpointId);
113
114 base::AutoLock locker_(lock_);
115 local_id_to_endpoint_info_map_.erase(local_id);
116 }
117
118 Channel::~Channel() {
119 // The channel should have been shut down first.
120 DCHECK(!raw_channel_.get());
121 }
122
123 void Channel::OnReadMessage(const MessageInTransit& message) {
124 switch (message.type()) {
125 case MessageInTransit::TYPE_MESSAGE_PIPE_ENDPOINT:
126 case MessageInTransit::TYPE_MESSAGE_PIPE:
127 OnReadMessageForDownstream(message);
128 break;
129 case MessageInTransit::TYPE_CHANNEL:
130 OnReadMessageForChannel(message);
131 break;
132 default:
133 HandleRemoteError(base::StringPrintf(
134 "Received message of invalid type %u",
135 static_cast<unsigned>(message.type())));
136 break;
137 }
138 }
139
140 void Channel::OnFatalError(FatalError fatal_error) {
141 // TODO(vtl): IMPORTANT. Notify all our endpoints that they're dead.
142 NOTIMPLEMENTED();
143 }
144
145 void Channel::OnReadMessageForDownstream(const MessageInTransit& message) {
146 DCHECK(message.type() == MessageInTransit::TYPE_MESSAGE_PIPE_ENDPOINT ||
147 message.type() == MessageInTransit::TYPE_MESSAGE_PIPE);
148
149 MessageInTransit::EndpointId local_id = message.destination_id();
150 if (local_id == MessageInTransit::kInvalidEndpointId) {
151 HandleRemoteError("Received message with no destination ID");
152 return;
153 }
154
155 EndpointInfo endpoint_info;
156 {
157 base::AutoLock locker(lock_);
158
159 // Since we own |raw_channel_|, and this method and |Shutdown()| should only
160 // be called from the creation thread, |raw_channel_| should never be null
161 // here.
162 DCHECK(raw_channel_.get());
163
164 IdToEndpointInfoMap::const_iterator it =
165 local_id_to_endpoint_info_map_.find(local_id);
166 if (it == local_id_to_endpoint_info_map_.end()) {
167 HandleRemoteError(base::StringPrintf(
168 "Received a message for nonexistent local destination ID %u",
169 static_cast<unsigned>(local_id)));
170 return;
171 }
172 endpoint_info = it->second;
173 }
174
175 // We need to duplicate the message, because |EnqueueMessage()| will take
176 // ownership of it.
177 MessageInTransit* own_message = MessageInTransit::Create(
178 message.type(), message.subtype(), message.data(), message.data_size());
179 if (endpoint_info.message_pipe->EnqueueMessage(
180 MessagePipe::GetPeerPort(endpoint_info.port),
181 own_message) != MOJO_RESULT_OK) {
182 HandleLocalError(base::StringPrintf(
183 "Failed to enqueue message to local destination ID %u",
184 static_cast<unsigned>(local_id)));
185 return;
186 }
187 }
188
189 void Channel::OnReadMessageForChannel(const MessageInTransit& message) {
190 // TODO(vtl): Currently no channel-only messages yet.
191 HandleRemoteError("Received invalid channel message");
192 NOTREACHED();
193 }
194
195 void Channel::HandleRemoteError(const base::StringPiece& error_message) {
196 // TODO(vtl): Is this how we really want to handle this?
197 LOG(INFO) << error_message;
198 }
199
200 void Channel::HandleLocalError(const base::StringPiece& error_message) {
201 // TODO(vtl): Is this how we really want to handle this?
202 LOG(FATAL) << error_message;
203 }
204
205 #ifndef NDEBUG
206 void Channel::AssertOnCreationThread() {
207 DCHECK_EQ(base::MessageLoop::current(), creation_thread_message_loop_);
208 }
209 #endif
210
211 } // namespace system
212 } // namespace mojo
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698