OLD | NEW |
| (Empty) |
1 // Copyright 2013 The Chromium Authors. All rights reserved. | |
2 // Use of this source code is governed by a BSD-style license that can be | |
3 // found in the LICENSE file. | |
4 | |
5 #include "mojo/system/message_pipe_dispatcher.h" | |
6 | |
7 #include "base/logging.h" | |
8 #include "mojo/system/channel.h" | |
9 #include "mojo/system/channel_endpoint.h" | |
10 #include "mojo/system/constants.h" | |
11 #include "mojo/system/local_message_pipe_endpoint.h" | |
12 #include "mojo/system/memory.h" | |
13 #include "mojo/system/message_in_transit.h" | |
14 #include "mojo/system/message_pipe.h" | |
15 #include "mojo/system/options_validation.h" | |
16 #include "mojo/system/proxy_message_pipe_endpoint.h" | |
17 | |
18 namespace mojo { | |
19 namespace system { | |
20 | |
21 namespace { | |
22 | |
23 const unsigned kInvalidPort = static_cast<unsigned>(-1); | |
24 | |
25 struct SerializedMessagePipeDispatcher { | |
26 MessageInTransit::EndpointId endpoint_id; | |
27 }; | |
28 | |
29 } // namespace | |
30 | |
31 // MessagePipeDispatcher ------------------------------------------------------- | |
32 | |
33 // static | |
34 const MojoCreateMessagePipeOptions | |
35 MessagePipeDispatcher::kDefaultCreateOptions = { | |
36 static_cast<uint32_t>(sizeof(MojoCreateMessagePipeOptions)), | |
37 MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_NONE}; | |
38 | |
39 MessagePipeDispatcher::MessagePipeDispatcher( | |
40 const MojoCreateMessagePipeOptions& /*validated_options*/) | |
41 : port_(kInvalidPort) { | |
42 } | |
43 | |
44 // static | |
45 MojoResult MessagePipeDispatcher::ValidateCreateOptions( | |
46 UserPointer<const MojoCreateMessagePipeOptions> in_options, | |
47 MojoCreateMessagePipeOptions* out_options) { | |
48 const MojoCreateMessagePipeOptionsFlags kKnownFlags = | |
49 MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_NONE; | |
50 | |
51 *out_options = kDefaultCreateOptions; | |
52 if (in_options.IsNull()) | |
53 return MOJO_RESULT_OK; | |
54 | |
55 UserOptionsReader<MojoCreateMessagePipeOptions> reader(in_options); | |
56 if (!reader.is_valid()) | |
57 return MOJO_RESULT_INVALID_ARGUMENT; | |
58 | |
59 if (!OPTIONS_STRUCT_HAS_MEMBER(MojoCreateMessagePipeOptions, flags, reader)) | |
60 return MOJO_RESULT_OK; | |
61 if ((reader.options().flags & ~kKnownFlags)) | |
62 return MOJO_RESULT_UNIMPLEMENTED; | |
63 out_options->flags = reader.options().flags; | |
64 | |
65 // Checks for fields beyond |flags|: | |
66 | |
67 // (Nothing here yet.) | |
68 | |
69 return MOJO_RESULT_OK; | |
70 } | |
71 | |
72 void MessagePipeDispatcher::Init(scoped_refptr<MessagePipe> message_pipe, | |
73 unsigned port) { | |
74 DCHECK(message_pipe.get()); | |
75 DCHECK(port == 0 || port == 1); | |
76 | |
77 message_pipe_ = message_pipe; | |
78 port_ = port; | |
79 } | |
80 | |
81 Dispatcher::Type MessagePipeDispatcher::GetType() const { | |
82 return kTypeMessagePipe; | |
83 } | |
84 | |
85 // static | |
86 scoped_refptr<MessagePipeDispatcher> | |
87 MessagePipeDispatcher::CreateRemoteMessagePipe( | |
88 scoped_refptr<ChannelEndpoint>* channel_endpoint) { | |
89 scoped_refptr<MessagePipe> message_pipe( | |
90 MessagePipe::CreateLocalProxy(channel_endpoint)); | |
91 scoped_refptr<MessagePipeDispatcher> dispatcher( | |
92 new MessagePipeDispatcher(MessagePipeDispatcher::kDefaultCreateOptions)); | |
93 dispatcher->Init(message_pipe, 0); | |
94 return dispatcher; | |
95 } | |
96 | |
97 // static | |
98 scoped_refptr<MessagePipeDispatcher> MessagePipeDispatcher::Deserialize( | |
99 Channel* channel, | |
100 const void* source, | |
101 size_t size) { | |
102 if (size != sizeof(SerializedMessagePipeDispatcher)) { | |
103 LOG(ERROR) << "Invalid serialized message pipe dispatcher"; | |
104 return scoped_refptr<MessagePipeDispatcher>(); | |
105 } | |
106 | |
107 scoped_refptr<ChannelEndpoint> channel_endpoint; | |
108 scoped_refptr<MessagePipeDispatcher> dispatcher = | |
109 CreateRemoteMessagePipe(&channel_endpoint); | |
110 | |
111 MessageInTransit::EndpointId remote_id = | |
112 static_cast<const SerializedMessagePipeDispatcher*>(source)->endpoint_id; | |
113 if (remote_id == MessageInTransit::kInvalidEndpointId) { | |
114 // This means that the other end was closed, and there were no messages | |
115 // enqueued for us. | |
116 // TODO(vtl): This is wrong. We should produce a "dead" message pipe | |
117 // dispatcher. | |
118 NOTIMPLEMENTED(); | |
119 return scoped_refptr<MessagePipeDispatcher>(); | |
120 } | |
121 MessageInTransit::EndpointId local_id = | |
122 channel->AttachEndpoint(channel_endpoint); | |
123 if (local_id == MessageInTransit::kInvalidEndpointId) { | |
124 LOG(ERROR) << "Failed to deserialize message pipe dispatcher (failed to " | |
125 "attach; remote ID = " << remote_id << ")"; | |
126 return scoped_refptr<MessagePipeDispatcher>(); | |
127 } | |
128 DVLOG(2) << "Deserializing message pipe dispatcher (remote ID = " << remote_id | |
129 << ", new local ID = " << local_id << ")"; | |
130 | |
131 if (!channel->RunMessagePipeEndpoint(local_id, remote_id)) { | |
132 // In general, this shouldn't fail, since we generated |local_id| locally. | |
133 NOTREACHED(); | |
134 return scoped_refptr<MessagePipeDispatcher>(); | |
135 } | |
136 | |
137 // TODO(vtl): FIXME -- Need some error handling here. | |
138 channel->RunRemoteMessagePipeEndpoint(local_id, remote_id); | |
139 return dispatcher; | |
140 } | |
141 | |
142 MessagePipeDispatcher::~MessagePipeDispatcher() { | |
143 // |Close()|/|CloseImplNoLock()| should have taken care of the pipe. | |
144 DCHECK(!message_pipe_.get()); | |
145 } | |
146 | |
147 MessagePipe* MessagePipeDispatcher::GetMessagePipeNoLock() const { | |
148 lock().AssertAcquired(); | |
149 return message_pipe_.get(); | |
150 } | |
151 | |
152 unsigned MessagePipeDispatcher::GetPortNoLock() const { | |
153 lock().AssertAcquired(); | |
154 return port_; | |
155 } | |
156 | |
157 void MessagePipeDispatcher::CancelAllWaitersNoLock() { | |
158 lock().AssertAcquired(); | |
159 message_pipe_->CancelAllWaiters(port_); | |
160 } | |
161 | |
162 void MessagePipeDispatcher::CloseImplNoLock() { | |
163 lock().AssertAcquired(); | |
164 message_pipe_->Close(port_); | |
165 message_pipe_ = nullptr; | |
166 port_ = kInvalidPort; | |
167 } | |
168 | |
169 scoped_refptr<Dispatcher> | |
170 MessagePipeDispatcher::CreateEquivalentDispatcherAndCloseImplNoLock() { | |
171 lock().AssertAcquired(); | |
172 | |
173 // TODO(vtl): Currently, there are no options, so we just use | |
174 // |kDefaultCreateOptions|. Eventually, we'll have to duplicate the options | |
175 // too. | |
176 scoped_refptr<MessagePipeDispatcher> rv = | |
177 new MessagePipeDispatcher(kDefaultCreateOptions); | |
178 rv->Init(message_pipe_, port_); | |
179 message_pipe_ = nullptr; | |
180 port_ = kInvalidPort; | |
181 return scoped_refptr<Dispatcher>(rv.get()); | |
182 } | |
183 | |
184 MojoResult MessagePipeDispatcher::WriteMessageImplNoLock( | |
185 UserPointer<const void> bytes, | |
186 uint32_t num_bytes, | |
187 std::vector<DispatcherTransport>* transports, | |
188 MojoWriteMessageFlags flags) { | |
189 DCHECK(!transports || (transports->size() > 0 && | |
190 transports->size() <= kMaxMessageNumHandles)); | |
191 | |
192 lock().AssertAcquired(); | |
193 | |
194 if (num_bytes > kMaxMessageNumBytes) | |
195 return MOJO_RESULT_RESOURCE_EXHAUSTED; | |
196 | |
197 return message_pipe_->WriteMessage( | |
198 port_, bytes, num_bytes, transports, flags); | |
199 } | |
200 | |
201 MojoResult MessagePipeDispatcher::ReadMessageImplNoLock( | |
202 UserPointer<void> bytes, | |
203 UserPointer<uint32_t> num_bytes, | |
204 DispatcherVector* dispatchers, | |
205 uint32_t* num_dispatchers, | |
206 MojoReadMessageFlags flags) { | |
207 lock().AssertAcquired(); | |
208 return message_pipe_->ReadMessage( | |
209 port_, bytes, num_bytes, dispatchers, num_dispatchers, flags); | |
210 } | |
211 | |
212 HandleSignalsState MessagePipeDispatcher::GetHandleSignalsStateImplNoLock() | |
213 const { | |
214 lock().AssertAcquired(); | |
215 return message_pipe_->GetHandleSignalsState(port_); | |
216 } | |
217 | |
218 MojoResult MessagePipeDispatcher::AddWaiterImplNoLock( | |
219 Waiter* waiter, | |
220 MojoHandleSignals signals, | |
221 uint32_t context, | |
222 HandleSignalsState* signals_state) { | |
223 lock().AssertAcquired(); | |
224 return message_pipe_->AddWaiter( | |
225 port_, waiter, signals, context, signals_state); | |
226 } | |
227 | |
228 void MessagePipeDispatcher::RemoveWaiterImplNoLock( | |
229 Waiter* waiter, | |
230 HandleSignalsState* signals_state) { | |
231 lock().AssertAcquired(); | |
232 message_pipe_->RemoveWaiter(port_, waiter, signals_state); | |
233 } | |
234 | |
235 void MessagePipeDispatcher::StartSerializeImplNoLock( | |
236 Channel* /*channel*/, | |
237 size_t* max_size, | |
238 size_t* max_platform_handles) { | |
239 DCHECK(HasOneRef()); // Only one ref => no need to take the lock. | |
240 *max_size = sizeof(SerializedMessagePipeDispatcher); | |
241 *max_platform_handles = 0; | |
242 } | |
243 | |
244 bool MessagePipeDispatcher::EndSerializeAndCloseImplNoLock( | |
245 Channel* channel, | |
246 void* destination, | |
247 size_t* actual_size, | |
248 embedder::PlatformHandleVector* /*platform_handles*/) { | |
249 DCHECK(HasOneRef()); // Only one ref => no need to take the lock. | |
250 | |
251 // Convert the local endpoint to a proxy endpoint (moving the message queue) | |
252 // and attach it to the channel. | |
253 MessageInTransit::EndpointId endpoint_id = | |
254 channel->AttachEndpoint(message_pipe_->ConvertLocalToProxy(port_)); | |
255 // Note: It's okay to get an endpoint ID of |kInvalidEndpointId|. (It's | |
256 // possible that the other endpoint -- the one that we're not sending -- was | |
257 // closed in the intervening time.) In that case, we need to deserialize a | |
258 // "dead" message pipe dispatcher on the other end. (Note that this is | |
259 // different from just producing |MOJO_HANDLE_INVALID|.) | |
260 DVLOG(2) << "Serializing message pipe dispatcher (local ID = " << endpoint_id | |
261 << ")"; | |
262 | |
263 // We now have a local ID. Before we can run the proxy endpoint, we need to | |
264 // get an ack back from the other side with the remote ID. | |
265 static_cast<SerializedMessagePipeDispatcher*>(destination)->endpoint_id = | |
266 endpoint_id; | |
267 | |
268 message_pipe_ = nullptr; | |
269 port_ = kInvalidPort; | |
270 | |
271 *actual_size = sizeof(SerializedMessagePipeDispatcher); | |
272 return true; | |
273 } | |
274 | |
275 // MessagePipeDispatcherTransport ---------------------------------------------- | |
276 | |
277 MessagePipeDispatcherTransport::MessagePipeDispatcherTransport( | |
278 DispatcherTransport transport) | |
279 : DispatcherTransport(transport) { | |
280 DCHECK_EQ(message_pipe_dispatcher()->GetType(), Dispatcher::kTypeMessagePipe); | |
281 } | |
282 | |
283 } // namespace system | |
284 } // namespace mojo | |
OLD | NEW |