Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(248)

Side by Side Diff: mojo/system/channel.cc

Issue 60103005: Mojo: First stab at making MessagePipes work across OS pipes. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: rebased Created 7 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
(Empty)
1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "mojo/system/channel.h"
6
7 #include "base/basictypes.h"
8 #include "base/bind.h"
9 #include "base/compiler_specific.h"
10 #include "base/logging.h"
11 #include "base/message_loop/message_loop.h"
12 #include "base/strings/stringprintf.h"
13
14 namespace mojo {
15 namespace system {
16
17 COMPILE_ASSERT(Channel::kBootstrapEndpointId !=
18 MessageInTransit::kInvalidEndpointId,
19 kBootstrapEndpointId_is_invalid);
20
21 STATIC_CONST_MEMBER_DEFINITION const MessageInTransit::EndpointId
22 Channel::kBootstrapEndpointId;
23
24 Channel::EndpointInfo::EndpointInfo() {
25 }
26
27 Channel::EndpointInfo::EndpointInfo(scoped_refptr<MessagePipe> message_pipe,
28 unsigned port)
29 : message_pipe(message_pipe),
30 port(port) {
31 }
32
33 Channel::EndpointInfo::~EndpointInfo() {
34 }
35
36 Channel::Channel()
37 : next_local_id_(kBootstrapEndpointId) {
38 }
39
40 bool Channel::Init(const PlatformChannelHandle& handle) {
41 DCHECK(creation_thread_checker_.CalledOnValidThread());
42
43 // No need to take |lock_|, since this must be called before this object
44 // becomes thread-safe.
45 DCHECK(!raw_channel_.get());
46
47 raw_channel_.reset(
48 RawChannel::Create(handle, this, base::MessageLoop::current()));
49 if (!raw_channel_->Init()) {
50 raw_channel_.reset();
51 return false;
52 }
53
54 return true;
55 }
56
57 void Channel::Shutdown() {
58 DCHECK(creation_thread_checker_.CalledOnValidThread());
59
60 base::AutoLock locker(lock_);
61 DCHECK(raw_channel_.get());
62 raw_channel_->Shutdown();
63 raw_channel_.reset();
64
65 // TODO(vtl): Should I clear |local_id_to_endpoint_info_map_|? Or assert that
66 // it's empty?
67 }
68
69 MessageInTransit::EndpointId Channel::AttachMessagePipeEndpoint(
70 scoped_refptr<MessagePipe> message_pipe, unsigned port) {
71 MessageInTransit::EndpointId local_id;
72 {
73 base::AutoLock locker(lock_);
74
75 while (next_local_id_ == MessageInTransit::kInvalidEndpointId ||
76 local_id_to_endpoint_info_map_.find(next_local_id_) !=
77 local_id_to_endpoint_info_map_.end())
78 next_local_id_++;
79
80 local_id = next_local_id_;
81 next_local_id_++;
82
83 // TODO(vtl): Use emplace when we move to C++11 unordered_maps. (It'll avoid
84 // some expensive reference count increment/decrements.) Once this is done,
85 // we should be able to delete |EndpointInfo|'s default constructor.
86 local_id_to_endpoint_info_map_[local_id] = EndpointInfo(message_pipe, port);
87 }
88
89 message_pipe->Attach(port, scoped_refptr<Channel>(this), local_id);
90 return local_id;
91 }
92
93 void Channel::RunMessagePipeEndpoint(MessageInTransit::EndpointId local_id,
94 MessageInTransit::EndpointId remote_id) {
95 EndpointInfo endpoint_info;
96 {
97 base::AutoLock locker(lock_);
98
99 IdToEndpointInfoMap::const_iterator it =
100 local_id_to_endpoint_info_map_.find(local_id);
101 CHECK(it != local_id_to_endpoint_info_map_.end());
102 endpoint_info = it->second;
103 }
104
105 endpoint_info.message_pipe->Run(endpoint_info.port, remote_id);
106 }
107
108 bool Channel::WriteMessage(MessageInTransit* message) {
109 base::AutoLock locker(lock_);
110 if (!raw_channel_.get()) {
111 // TODO(vtl): I think this is probably not an error condition, but I should
112 // think about it (and the shutdown sequence) more carefully.
113 LOG(INFO) << "WriteMessage() after shutdown";
114 return false;
115 }
116
117 return raw_channel_->WriteMessage(message);
118 }
119
120 void Channel::DetachMessagePipeEndpoint(MessageInTransit::EndpointId local_id) {
121 DCHECK_NE(local_id, MessageInTransit::kInvalidEndpointId);
122
123 base::AutoLock locker_(lock_);
124 local_id_to_endpoint_info_map_.erase(local_id);
125 }
126
127 Channel::~Channel() {
128 // The channel should have been shut down first.
129 DCHECK(!raw_channel_.get());
130 }
131
132 void Channel::OnReadMessage(const MessageInTransit& message) {
133 switch (message.type()) {
134 case MessageInTransit::kTypeMessagePipeEndpoint:
135 case MessageInTransit::kTypeMessagePipe:
136 OnReadMessageForDownstream(message);
137 break;
138 case MessageInTransit::TYPE_CHANNEL:
139 OnReadMessageForChannel(message);
140 break;
141 default:
142 HandleRemoteError(base::StringPrintf(
143 "Received message of invalid type %u",
144 static_cast<unsigned>(message.type())));
145 break;
146 }
147 }
148
149 void Channel::OnFatalError(FatalError fatal_error) {
150 // TODO(vtl): IMPORTANT. Notify all our endpoints that they're dead.
151 NOTIMPLEMENTED();
152 }
153
154 void Channel::OnReadMessageForDownstream(const MessageInTransit& message) {
155 DCHECK(message.type() == MessageInTransit::kTypeMessagePipeEndpoint ||
156 message.type() == MessageInTransit::kTypeMessagePipe);
157
158 MessageInTransit::EndpointId local_id = message.destination_id();
159 if (local_id == MessageInTransit::kInvalidEndpointId) {
160 HandleRemoteError("Received message with no destination ID");
161 return;
162 }
163
164 EndpointInfo endpoint_info;
165 {
166 base::AutoLock locker(lock_);
167
168 // Since we own |raw_channel_|, and this method and |Shutdown()| should only
169 // be called from the creation thread, |raw_channel_| should never be null
170 // here.
171 DCHECK(raw_channel_.get());
172
173 IdToEndpointInfoMap::const_iterator it =
174 local_id_to_endpoint_info_map_.find(local_id);
175 if (it == local_id_to_endpoint_info_map_.end()) {
176 HandleRemoteError(base::StringPrintf(
177 "Received a message for nonexistent local destination ID %u",
178 static_cast<unsigned>(local_id)));
179 return;
180 }
181 endpoint_info = it->second;
182 }
183
184 // We need to duplicate the message, because |EnqueueMessage()| will take
185 // ownership of it.
186 MessageInTransit* own_message = MessageInTransit::Create(
187 message.type(), message.subtype(), message.data(), message.data_size());
188 if (endpoint_info.message_pipe->EnqueueMessage(
189 MessagePipe::GetPeerPort(endpoint_info.port),
190 own_message) != MOJO_RESULT_OK) {
191 HandleLocalError(base::StringPrintf(
192 "Failed to enqueue message to local destination ID %u",
193 static_cast<unsigned>(local_id)));
194 return;
195 }
196 }
197
198 void Channel::OnReadMessageForChannel(const MessageInTransit& message) {
199 // TODO(vtl): Currently no channel-only messages yet.
200 HandleRemoteError("Received invalid channel message");
201 NOTREACHED();
202 }
203
204 void Channel::HandleRemoteError(const base::StringPiece& error_message) {
205 // TODO(vtl): Is this how we really want to handle this?
206 LOG(INFO) << error_message;
207 }
208
209 void Channel::HandleLocalError(const base::StringPiece& error_message) {
210 // TODO(vtl): Is this how we really want to handle this?
211 LOG(FATAL) << error_message;
212 }
213
214 } // namespace system
215 } // namespace mojo
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698