Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(317)

Side by Side Diff: mojo/edk/system/core.cc

Issue 1585493002: [mojo] Ports EDK (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 4 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2013 The Chromium Authors. All rights reserved. 1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/edk/system/core.h" 5 #include "mojo/edk/system/core.h"
6 6
7 #include <stddef.h> 7 #include <string.h>
8 #include <stdint.h>
9 8
10 #include <utility> 9 #include <utility>
11 #include <vector>
12 10
11 #include "base/bind.h"
13 #include "base/containers/stack_container.h" 12 #include "base/containers/stack_container.h"
13 #include "base/location.h"
14 #include "base/logging.h" 14 #include "base/logging.h"
15 #include "base/macros.h"
16 #include "base/message_loop/message_loop.h"
15 #include "base/rand_util.h" 17 #include "base/rand_util.h"
18 #include "base/thread_task_runner_handle.h"
16 #include "base/time/time.h" 19 #include "base/time/time.h"
20 #include "crypto/random.h"
21 #include "mojo/edk/embedder/embedder.h"
17 #include "mojo/edk/embedder/embedder_internal.h" 22 #include "mojo/edk/embedder/embedder_internal.h"
18 #include "mojo/edk/embedder/platform_channel_pair.h"
19 #include "mojo/edk/embedder/platform_shared_buffer.h" 23 #include "mojo/edk/embedder/platform_shared_buffer.h"
20 #include "mojo/edk/embedder/platform_support.h" 24 #include "mojo/edk/embedder/platform_support.h"
21 #include "mojo/edk/system/async_waiter.h" 25 #include "mojo/edk/system/async_waiter.h"
22 #include "mojo/edk/system/broker.h" 26 #include "mojo/edk/system/channel.h"
23 #include "mojo/edk/system/configuration.h" 27 #include "mojo/edk/system/configuration.h"
24 #include "mojo/edk/system/data_pipe.h"
25 #include "mojo/edk/system/data_pipe_consumer_dispatcher.h" 28 #include "mojo/edk/system/data_pipe_consumer_dispatcher.h"
26 #include "mojo/edk/system/data_pipe_producer_dispatcher.h" 29 #include "mojo/edk/system/data_pipe_producer_dispatcher.h"
27 #include "mojo/edk/system/dispatcher.h"
28 #include "mojo/edk/system/handle_signals_state.h" 30 #include "mojo/edk/system/handle_signals_state.h"
29 #include "mojo/edk/system/message_pipe_dispatcher.h" 31 #include "mojo/edk/system/message_pipe_dispatcher.h"
32 #include "mojo/edk/system/platform_handle_dispatcher.h"
33 #include "mojo/edk/system/ports/node.h"
34 #include "mojo/edk/system/remote_message_pipe_bootstrap.h"
30 #include "mojo/edk/system/shared_buffer_dispatcher.h" 35 #include "mojo/edk/system/shared_buffer_dispatcher.h"
31 #include "mojo/edk/system/wait_set_dispatcher.h" 36 #include "mojo/edk/system/wait_set_dispatcher.h"
32 #include "mojo/edk/system/waiter.h" 37 #include "mojo/edk/system/waiter.h"
33 #include "mojo/public/c/system/macros.h"
34 #include "mojo/public/cpp/system/macros.h"
35 38
36 namespace mojo { 39 namespace mojo {
37 namespace edk { 40 namespace edk {
38 41
39 // Implementation notes 42 namespace {
40 //
41 // Mojo primitives are implemented by the singleton |Core| object. Most calls
42 // are for a "primary" handle (the first argument). |Core::GetDispatcher()| is
43 // used to look up a |Dispatcher| object for a given handle. That object
44 // implements most primitives for that object. The wait primitives are not
45 // attached to objects and are implemented by |Core| itself.
46 //
47 // Some objects have multiple handles associated to them, e.g., message pipes
48 // (which have two). In such a case, there is still a |Dispatcher| (e.g.,
49 // |MessagePipeDispatcher|) for each handle, with each handle having a strong
50 // reference to the common "secondary" object (e.g., |MessagePipe|). This
51 // secondary object does NOT have any references to the |Dispatcher|s (even if
52 // it did, it wouldn't be able to do anything with them due to lock order
53 // requirements -- see below).
54 //
55 // Waiting is implemented by having the thread that wants to wait call the
56 // |Dispatcher|s for the handles that it wants to wait on with a |Waiter|
57 // object; this |Waiter| object may be created on the stack of that thread or be
58 // kept in thread local storage for that thread (TODO(vtl): future improvement).
59 // The |Dispatcher| then adds the |Waiter| to an |AwakableList| that's either
60 // owned by that |Dispatcher| (see |SimpleDispatcher|) or by a secondary object
61 // (e.g., |MessagePipe|). To signal/wake a |Waiter|, the object in question --
62 // either a |SimpleDispatcher| or a secondary object -- talks to its
63 // |AwakableList|.
64 43
65 // Thread-safety notes 44 // This is an unnecessarily large limit that is relatively easy to enforce.
66 // 45 const uint32_t kMaxHandlesPerMessage = 1024 * 1024;
67 // Mojo primitives calls are thread-safe. We achieve this with relatively
68 // fine-grained locking. There is a global handle table lock. This lock should
69 // be held as briefly as possible (TODO(vtl): a future improvement would be to
70 // switch it to a reader-writer lock). Each |Dispatcher| object then has a lock
71 // (which subclasses can use to protect their data).
72 //
73 // The lock ordering is as follows:
74 // 1. global handle table lock, global mapping table lock
75 // 2. |Dispatcher| locks
76 // 3. secondary object locks
77 // ...
78 // INF. |Waiter| locks
79 //
80 // Notes:
81 // - While holding a |Dispatcher| lock, you may not unconditionally attempt
82 // to take another |Dispatcher| lock. (This has consequences on the
83 // concurrency semantics of |MojoWriteMessage()| when passing handles.)
84 // Doing so would lead to deadlock.
85 // - Locks at the "INF" level may not have any locks taken while they are
86 // held.
87 46
88 // TODO(vtl): This should take a |scoped_ptr<PlatformSupport>| as a parameter. 47 void OnPortConnected(
89 Core::Core(PlatformSupport* platform_support) 48 Core* core,
90 : platform_support_(platform_support) { 49 int endpoint,
50 const base::Callback<void(ScopedMessagePipeHandle)>& callback,
51 const ports::PortRef& port) {
52 // TODO: Maybe we could negotiate a pipe ID for cross-process pipes too;
53 // for now we just use 0x7F7F7F7F7F7F7F7F. In practice these are used for
54 // bootstrap and aren't passed around, so tracking them is less important.
55 MojoHandle handle = core->AddDispatcher(
56 new MessagePipeDispatcher(core->GetNodeController(), port,
57 0x7f7f7f7f7f7f7f7fUL, endpoint));
58 callback.Run(ScopedMessagePipeHandle(MessagePipeHandle(handle)));
91 } 59 }
92 60
61 } // namespace
62
63 Core::Core() {}
64
93 Core::~Core() { 65 Core::~Core() {
66 if (node_controller_ && node_controller_->io_task_runner()) {
67 // If this races with IO thread shutdown the callback will be dropped and
68 // the NodeController will be shutdown on this thread anyway, which is also
69 // just fine.
70 scoped_refptr<base::TaskRunner> io_task_runner =
71 node_controller_->io_task_runner();
72 io_task_runner->PostTask(FROM_HERE,
73 base::Bind(&Core::PassNodeControllerToIOThread,
74 base::Passed(&node_controller_)));
75 }
94 } 76 }
95 77
96 MojoHandle Core::AddDispatcher(const scoped_refptr<Dispatcher>& dispatcher) { 78 void Core::SetIOTaskRunner(scoped_refptr<base::TaskRunner> io_task_runner) {
97 base::AutoLock locker(handle_table_lock_); 79 GetNodeController()->SetIOTaskRunner(io_task_runner);
98 return handle_table_.AddDispatcher(dispatcher); 80 }
81
82 NodeController* Core::GetNodeController() {
83 if (!node_controller_)
84 node_controller_.reset(new NodeController(this));
85 return node_controller_.get();
99 } 86 }
100 87
101 scoped_refptr<Dispatcher> Core::GetDispatcher(MojoHandle handle) { 88 scoped_refptr<Dispatcher> Core::GetDispatcher(MojoHandle handle) {
102 if (handle == MOJO_HANDLE_INVALID) 89 base::AutoLock lock(handles_lock_);
103 return nullptr; 90 return handles_.GetDispatcher(handle);
91 }
104 92
105 base::AutoLock locker(handle_table_lock_); 93 void Core::AddChild(base::ProcessHandle process_handle,
106 return handle_table_.GetDispatcher(handle); 94 ScopedPlatformHandle platform_handle) {
95 GetNodeController()->ConnectToChild(process_handle,
96 std::move(platform_handle));
97 }
98
99 void Core::InitChild(ScopedPlatformHandle platform_handle) {
100 GetNodeController()->ConnectToParent(std::move(platform_handle));
101 }
102
103 MojoHandle Core::AddDispatcher(scoped_refptr<Dispatcher> dispatcher) {
104 base::AutoLock lock(handles_lock_);
105 return handles_.AddDispatcher(dispatcher);
106 }
107
108 bool Core::AddDispatchersFromTransit(
109 const std::vector<Dispatcher::DispatcherInTransit>& dispatchers,
110 MojoHandle* handles) {
111 bool failed = false;
112 {
113 base::AutoLock lock(handles_lock_);
114 if (!handles_.AddDispatchersFromTransit(dispatchers, handles))
115 failed = true;
116 }
117 if (failed) {
118 for (auto d : dispatchers)
119 d.dispatcher->Close();
120 return false;
121 }
122 return true;
123 }
124
125 MojoResult Core::CreatePlatformHandleWrapper(
126 ScopedPlatformHandle platform_handle,
127 MojoHandle* wrapper_handle) {
128 MojoHandle h = AddDispatcher(
129 PlatformHandleDispatcher::Create(std::move(platform_handle)));
130 if (h == MOJO_HANDLE_INVALID)
131 return MOJO_RESULT_RESOURCE_EXHAUSTED;
132 *wrapper_handle = h;
133 return MOJO_RESULT_OK;
134 }
135
136 MojoResult Core::PassWrappedPlatformHandle(
137 MojoHandle wrapper_handle,
138 ScopedPlatformHandle* platform_handle) {
139 base::AutoLock lock(handles_lock_);
140 scoped_refptr<Dispatcher> d;
141 MojoResult result = handles_.GetAndRemoveDispatcher(wrapper_handle, &d);
142 if (result != MOJO_RESULT_OK)
143 return result;
144 PlatformHandleDispatcher* phd =
145 static_cast<PlatformHandleDispatcher*>(d.get());
146 *platform_handle = phd->PassPlatformHandle();
147 phd->Close();
148 return MOJO_RESULT_OK;
149 }
150
151 void Core::RequestShutdown(const base::Closure& callback) {
152 base::Closure on_shutdown;
153 if (base::ThreadTaskRunnerHandle::IsSet()) {
154 on_shutdown = base::Bind(base::IgnoreResult(&base::TaskRunner::PostTask),
155 base::ThreadTaskRunnerHandle::Get(),
156 FROM_HERE, callback);
157 } else {
158 on_shutdown = callback;
159 }
160 GetNodeController()->RequestShutdown(on_shutdown);
161 }
162
163 void Core::CreateParentMessagePipe(
164 ScopedPlatformHandle platform_handle,
165 const base::Callback<void(ScopedMessagePipeHandle)>& callback) {
166 std::string token = GenerateRandomToken();
167 CreateParentMessagePipe(token, callback);
168 RemoteMessagePipeBootstrap::CreateForParent(
169 GetNodeController(), std::move(platform_handle), token);
170 }
171
172 void Core::CreateChildMessagePipe(
173 ScopedPlatformHandle platform_handle,
174 const base::Callback<void(ScopedMessagePipeHandle)>& callback) {
175 ports::PortRef port;
176 GetNodeController()->node()->CreateUninitializedPort(&port);
177 RemoteMessagePipeBootstrap::CreateForChild(
178 GetNodeController(), std::move(platform_handle), port,
179 base::Bind(&OnPortConnected, base::Unretained(this), 1, callback, port));
180 }
181
182 void Core::CreateParentMessagePipe(
183 const std::string& token,
184 const base::Callback<void(ScopedMessagePipeHandle)>& callback) {
185 GetNodeController()->ReservePort(
186 token,
187 base::Bind(&OnPortConnected, base::Unretained(this), 0, callback));
188 }
189
190 void Core::CreateChildMessagePipe(
191 const std::string& token,
192 const base::Callback<void(ScopedMessagePipeHandle)>& callback) {
193 ports::PortRef port;
194 GetNodeController()->node()->CreateUninitializedPort(&port);
195 GetNodeController()->ConnectToParentPort(
196 port, token,
197 base::Bind(&OnPortConnected, base::Unretained(this), 1, callback, port));
107 } 198 }
108 199
109 MojoResult Core::AsyncWait(MojoHandle handle, 200 MojoResult Core::AsyncWait(MojoHandle handle,
110 MojoHandleSignals signals, 201 MojoHandleSignals signals,
111 const base::Callback<void(MojoResult)>& callback) { 202 const base::Callback<void(MojoResult)>& callback) {
112 scoped_refptr<Dispatcher> dispatcher = GetDispatcher(handle); 203 scoped_refptr<Dispatcher> dispatcher = GetDispatcher(handle);
113 DCHECK(dispatcher); 204 DCHECK(dispatcher);
114 205
115 scoped_ptr<AsyncWaiter> waiter = make_scoped_ptr(new AsyncWaiter(callback)); 206 scoped_ptr<AsyncWaiter> waiter = make_scoped_ptr(new AsyncWaiter(callback));
116 MojoResult rv = dispatcher->AddAwakable(waiter.get(), signals, 0, nullptr); 207 MojoResult rv = dispatcher->AddAwakable(waiter.get(), signals, 0, nullptr);
117 if (rv == MOJO_RESULT_OK) 208 if (rv == MOJO_RESULT_OK)
118 ignore_result(waiter.release()); 209 ignore_result(waiter.release());
119 return rv; 210 return rv;
120 } 211 }
121 212
122 MojoTimeTicks Core::GetTimeTicksNow() { 213 MojoTimeTicks Core::GetTimeTicksNow() {
123 return base::TimeTicks::Now().ToInternalValue(); 214 return base::TimeTicks::Now().ToInternalValue();
124 } 215 }
125 216
126 MojoResult Core::Close(MojoHandle handle) { 217 MojoResult Core::Close(MojoHandle handle) {
127 if (handle == MOJO_HANDLE_INVALID)
128 return MOJO_RESULT_INVALID_ARGUMENT;
129
130 scoped_refptr<Dispatcher> dispatcher; 218 scoped_refptr<Dispatcher> dispatcher;
131 { 219 {
132 base::AutoLock locker(handle_table_lock_); 220 base::AutoLock lock(handles_lock_);
133 MojoResult result = 221 MojoResult rv = handles_.GetAndRemoveDispatcher(handle, &dispatcher);
134 handle_table_.GetAndRemoveDispatcher(handle, &dispatcher); 222 if (rv != MOJO_RESULT_OK)
135 if (result != MOJO_RESULT_OK) 223 return rv;
136 return result;
137 } 224 }
138 225 dispatcher->Close();
139 // The dispatcher doesn't have a say in being closed, but gets notified of it. 226 return MOJO_RESULT_OK;
140 // Note: This is done outside of |handle_table_lock_|. As a result, there's a
141 // race condition that the dispatcher must handle; see the comment in
142 // |Dispatcher| in dispatcher.h.
143 return dispatcher->Close();
144 } 227 }
145 228
146 MojoResult Core::Wait(MojoHandle handle, 229 MojoResult Core::Wait(MojoHandle handle,
147 MojoHandleSignals signals, 230 MojoHandleSignals signals,
148 MojoDeadline deadline, 231 MojoDeadline deadline,
149 MojoHandleSignalsState* signals_state) { 232 MojoHandleSignalsState* signals_state) {
150 uint32_t unused = static_cast<uint32_t>(-1); 233 uint32_t unused = static_cast<uint32_t>(-1);
151 HandleSignalsState hss; 234 HandleSignalsState hss;
152 MojoResult rv = WaitManyInternal(&handle, &signals, 1, deadline, &unused, 235 MojoResult rv = WaitManyInternal(&handle, &signals, 1, deadline, &unused,
153 signals_state ? &hss : nullptr); 236 signals_state ? &hss : nullptr);
(...skipping 69 matching lines...) Expand 10 before | Expand all | Expand 10 after
223 if (!dispatcher) 306 if (!dispatcher)
224 return MOJO_RESULT_INVALID_ARGUMENT; 307 return MOJO_RESULT_INVALID_ARGUMENT;
225 308
226 return wait_set_dispatcher->RemoveWaitingDispatcher(dispatcher); 309 return wait_set_dispatcher->RemoveWaitingDispatcher(dispatcher);
227 } 310 }
228 311
229 MojoResult Core::GetReadyHandles(MojoHandle wait_set_handle, 312 MojoResult Core::GetReadyHandles(MojoHandle wait_set_handle,
230 uint32_t* count, 313 uint32_t* count,
231 MojoHandle* handles, 314 MojoHandle* handles,
232 MojoResult* results, 315 MojoResult* results,
233 MojoHandleSignalsState* signals_state) { 316 MojoHandleSignalsState* signals_states) {
234 if (!handles || !count || !(*count) || !results) 317 if (!handles || !count || !(*count) || !results)
235 return MOJO_RESULT_INVALID_ARGUMENT; 318 return MOJO_RESULT_INVALID_ARGUMENT;
236 319
237 scoped_refptr<Dispatcher> wait_set_dispatcher(GetDispatcher(wait_set_handle)); 320 scoped_refptr<Dispatcher> wait_set_dispatcher(GetDispatcher(wait_set_handle));
238 if (!wait_set_dispatcher) 321 if (!wait_set_dispatcher)
239 return MOJO_RESULT_INVALID_ARGUMENT; 322 return MOJO_RESULT_INVALID_ARGUMENT;
240 323
241 DispatcherVector awoken_dispatchers; 324 DispatcherVector awoken_dispatchers;
242 base::StackVector<uintptr_t, 16> contexts; 325 base::StackVector<uintptr_t, 16> contexts;
243 contexts->assign(*count, MOJO_HANDLE_INVALID); 326 contexts->assign(*count, MOJO_HANDLE_INVALID);
244 327
245 MojoResult result = wait_set_dispatcher->GetReadyDispatchers( 328 MojoResult result = wait_set_dispatcher->GetReadyDispatchers(
246 count, &awoken_dispatchers, results, contexts->data()); 329 count, &awoken_dispatchers, results, contexts->data());
247 330
248 if (result == MOJO_RESULT_OK) { 331 if (result == MOJO_RESULT_OK) {
249 for (size_t i = 0; i < *count; i++) { 332 for (size_t i = 0; i < *count; i++) {
250 handles[i] = static_cast<MojoHandle>(contexts[i]); 333 handles[i] = static_cast<MojoHandle>(contexts[i]);
251 if (signals_state) 334 if (signals_states)
252 signals_state[i] = awoken_dispatchers[i]->GetHandleSignalsState(); 335 signals_states[i] = awoken_dispatchers[i]->GetHandleSignalsState();
253 } 336 }
254 } 337 }
255 338
256 return result; 339 return result;
257 } 340 }
258 341
259 MojoResult Core::CreateMessagePipe( 342 MojoResult Core::CreateMessagePipe(
260 const MojoCreateMessagePipeOptions* options, 343 const MojoCreateMessagePipeOptions* options,
261 MojoHandle* message_pipe_handle0, 344 MojoHandle* message_pipe_handle0,
262 MojoHandle* message_pipe_handle1) { 345 MojoHandle* message_pipe_handle1) {
346 ports::PortRef port0, port1;
347 GetNodeController()->node()->CreatePortPair(&port0, &port1);
348
263 CHECK(message_pipe_handle0); 349 CHECK(message_pipe_handle0);
264 CHECK(message_pipe_handle1); 350 CHECK(message_pipe_handle1);
265 MojoCreateMessagePipeOptions validated_options = {};
266 MojoResult result =
267 MessagePipeDispatcher::ValidateCreateOptions(options, &validated_options);
268 if (result != MOJO_RESULT_OK)
269 return result;
270 351
271 scoped_refptr<MessagePipeDispatcher> dispatcher0 = 352 uint64_t pipe_id = base::RandUint64();
272 MessagePipeDispatcher::Create(validated_options);
273 scoped_refptr<MessagePipeDispatcher> dispatcher1 =
274 MessagePipeDispatcher::Create(validated_options);
275 353
276 std::pair<MojoHandle, MojoHandle> handle_pair; 354 *message_pipe_handle0 = AddDispatcher(
277 { 355 new MessagePipeDispatcher(GetNodeController(), port0, pipe_id, 0));
278 base::AutoLock locker(handle_table_lock_); 356 if (*message_pipe_handle0 == MOJO_HANDLE_INVALID)
279 handle_pair = handle_table_.AddDispatcherPair(dispatcher0, dispatcher1); 357 return MOJO_RESULT_RESOURCE_EXHAUSTED;
280 } 358
281 if (handle_pair.first == MOJO_HANDLE_INVALID) { 359 *message_pipe_handle1 = AddDispatcher(
282 DCHECK_EQ(handle_pair.second, MOJO_HANDLE_INVALID); 360 new MessagePipeDispatcher(GetNodeController(), port1, pipe_id, 1));
283 LOG(ERROR) << "Handle table full"; 361 if (*message_pipe_handle1 == MOJO_HANDLE_INVALID) {
284 dispatcher0->Close(); 362 scoped_refptr<Dispatcher> unused;
285 dispatcher1->Close(); 363 unused->Close();
364 handles_.GetAndRemoveDispatcher(*message_pipe_handle0, &unused);
286 return MOJO_RESULT_RESOURCE_EXHAUSTED; 365 return MOJO_RESULT_RESOURCE_EXHAUSTED;
287 } 366 }
288 367
289 if (validated_options.flags &
290 MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_TRANSFERABLE) {
291 ScopedPlatformHandle server_handle, client_handle;
292 #if defined(OS_WIN)
293 internal::g_broker->CreatePlatformChannelPair(&server_handle,
294 &client_handle);
295 #else
296 PlatformChannelPair channel_pair;
297 server_handle = channel_pair.PassServerHandle();
298 client_handle = channel_pair.PassClientHandle();
299 #endif
300 dispatcher0->Init(std::move(server_handle), nullptr, 0u, nullptr, 0u,
301 nullptr, nullptr);
302 dispatcher1->Init(std::move(client_handle), nullptr, 0u, nullptr, 0u,
303 nullptr, nullptr);
304 } else {
305 uint64_t pipe_id = 0;
306 // route_id 0 is used internally in RoutedRawChannel. See kInternalRouteId
307 // in routed_raw_channel.cc.
308 // route_id 1 is used by broker communication. See kBrokerRouteId in
309 // broker_messages.h.
310 while (pipe_id < 2)
311 pipe_id = base::RandUint64();
312 dispatcher0->InitNonTransferable(pipe_id);
313 dispatcher1->InitNonTransferable(pipe_id);
314 }
315
316 *message_pipe_handle0 = handle_pair.first;
317 *message_pipe_handle1 = handle_pair.second;
318 return MOJO_RESULT_OK; 368 return MOJO_RESULT_OK;
319 } 369 }
320 370
321 // Implementation note: To properly cancel waiters and avoid other races, this
322 // does not transfer dispatchers from one handle to another, even when sending a
323 // message in-process. Instead, it must transfer the "contents" of the
324 // dispatcher to a new dispatcher, and then close the old dispatcher. If this
325 // isn't done, in the in-process case, calls on the old handle may complete
326 // after the the message has been received and a new handle created (and
327 // possibly even after calls have been made on the new handle).
328 MojoResult Core::WriteMessage(MojoHandle message_pipe_handle, 371 MojoResult Core::WriteMessage(MojoHandle message_pipe_handle,
329 const void* bytes, 372 const void* bytes,
330 uint32_t num_bytes, 373 uint32_t num_bytes,
331 const MojoHandle* handles, 374 const MojoHandle* handles,
332 uint32_t num_handles, 375 uint32_t num_handles,
333 MojoWriteMessageFlags flags) { 376 MojoWriteMessageFlags flags) {
334 scoped_refptr<Dispatcher> dispatcher(GetDispatcher(message_pipe_handle)); 377 auto dispatcher = GetDispatcher(message_pipe_handle);
335 if (!dispatcher) 378 if (!dispatcher)
336 return MOJO_RESULT_INVALID_ARGUMENT; 379 return MOJO_RESULT_INVALID_ARGUMENT;
337 380
338 // Easy case: not sending any handles. 381 if (num_handles == 0) // Fast path: no handles.
339 if (num_handles == 0) 382 return dispatcher->WriteMessage(bytes, num_bytes, nullptr, 0, flags);
340 return dispatcher->WriteMessage(bytes, num_bytes, nullptr, flags);
341 383
342 // We have to handle |handles| here, since we have to mark them busy in the 384 CHECK(handles);
343 // global handle table. We can't delegate this to the dispatcher, since the 385
344 // handle table lock must be acquired before the dispatcher lock. 386 if (num_handles > kMaxHandlesPerMessage)
345 //
346 // (This leads to an oddity: |handles|/|num_handles| are always verified for
347 // validity, even for dispatchers that don't support |WriteMessage()| and will
348 // simply return failure unconditionally. It also breaks the usual
349 // left-to-right verification order of arguments.)
350 if (num_handles > GetConfiguration().max_message_num_handles)
351 return MOJO_RESULT_RESOURCE_EXHAUSTED; 387 return MOJO_RESULT_RESOURCE_EXHAUSTED;
352 388
353 // We'll need to hold on to the dispatchers so that we can pass them on to 389 for (size_t i = 0; i < num_handles; ++i) {
354 // |WriteMessage()| and also so that we can unlock their locks afterwards 390 if (message_pipe_handle == handles[i])
355 // without accessing the handle table. These can be dumb pointers, since their 391 return MOJO_RESULT_BUSY;
356 // entries in the handle table won't get removed (since they'll be marked as
357 // busy).
358 std::vector<DispatcherTransport> transports(num_handles);
359
360 // When we pass handles, we have to try to take all their dispatchers' locks
361 // and mark the handles as busy. If the call succeeds, we then remove the
362 // handles from the handle table.
363 {
364 base::AutoLock locker(handle_table_lock_);
365 MojoResult result = handle_table_.MarkBusyAndStartTransport(
366 message_pipe_handle, handles, num_handles, &transports);
367 if (result != MOJO_RESULT_OK)
368 return result;
369 } 392 }
370 393
371 MojoResult rv = 394 std::vector<Dispatcher::DispatcherInTransit> dispatchers;
372 dispatcher->WriteMessage(bytes, num_bytes, &transports, flags); 395 {
396 base::AutoLock lock(handles_lock_);
397 MojoResult rv = handles_.BeginTransit(handles, num_handles, &dispatchers);
398 if (rv != MOJO_RESULT_OK) {
399 handles_.CancelTransit(dispatchers);
400 return rv;
401 }
402 }
403 DCHECK_EQ(num_handles, dispatchers.size());
373 404
374 // We need to release the dispatcher locks before we take the handle table 405 MojoResult rv = dispatcher->WriteMessage(
375 // lock. 406 bytes, num_bytes, dispatchers.data(), num_handles, flags);
376 for (uint32_t i = 0; i < num_handles; i++)
377 transports[i].End();
378 407
379 { 408 {
380 base::AutoLock locker(handle_table_lock_); 409 base::AutoLock lock(handles_lock_);
381 if (rv == MOJO_RESULT_OK) { 410 if (rv == MOJO_RESULT_OK) {
382 handle_table_.RemoveBusyHandles(handles, num_handles); 411 handles_.CompleteTransitAndClose(dispatchers);
383 } else { 412 } else {
384 handle_table_.RestoreBusyHandles(handles, num_handles); 413 handles_.CancelTransit(dispatchers);
385 } 414 }
386 } 415 }
387 416
388 return rv; 417 return rv;
389 } 418 }
390 419
391 MojoResult Core::ReadMessage(MojoHandle message_pipe_handle, 420 MojoResult Core::ReadMessage(MojoHandle message_pipe_handle,
392 void* bytes, 421 void* bytes,
393 uint32_t* num_bytes, 422 uint32_t* num_bytes,
394 MojoHandle* handles, 423 MojoHandle* handles,
395 uint32_t* num_handles, 424 uint32_t* num_handles,
396 MojoReadMessageFlags flags) { 425 MojoReadMessageFlags flags) {
397 scoped_refptr<Dispatcher> dispatcher(GetDispatcher(message_pipe_handle)); 426 CHECK((!num_handles || !*num_handles || handles) &&
427 (!num_bytes || !*num_bytes || bytes));
428 auto dispatcher = GetDispatcher(message_pipe_handle);
398 if (!dispatcher) 429 if (!dispatcher)
399 return MOJO_RESULT_INVALID_ARGUMENT; 430 return MOJO_RESULT_INVALID_ARGUMENT;
400 431 return dispatcher->ReadMessage(bytes, num_bytes, handles, num_handles, flags);
401 MojoResult rv;
402 uint32_t num_handles_value = num_handles ? *num_handles : 0;
403 if (num_handles_value == 0) {
404 // Easy case: won't receive any handles.
405 rv = dispatcher->ReadMessage(bytes, num_bytes, nullptr, &num_handles_value,
406 flags);
407 } else {
408 DispatcherVector dispatchers;
409 rv = dispatcher->ReadMessage(bytes, num_bytes, &dispatchers,
410 &num_handles_value, flags);
411 if (!dispatchers.empty()) {
412 DCHECK_EQ(rv, MOJO_RESULT_OK);
413 DCHECK(num_handles);
414 DCHECK_LE(dispatchers.size(), static_cast<size_t>(num_handles_value));
415
416 bool success;
417 {
418 base::AutoLock locker(handle_table_lock_);
419 success = handle_table_.AddDispatcherVector(dispatchers, handles);
420 }
421 if (!success) {
422 LOG(ERROR) << "Received message with " << dispatchers.size()
423 << " handles, but handle table full";
424 // Close dispatchers (outside the lock).
425 for (size_t i = 0; i < dispatchers.size(); i++) {
426 if (dispatchers[i])
427 dispatchers[i]->Close();
428 }
429 if (rv == MOJO_RESULT_OK)
430 rv = MOJO_RESULT_RESOURCE_EXHAUSTED;
431 }
432 }
433 }
434
435 if (num_handles)
436 *num_handles = num_handles_value;
437 return rv;
438 } 432 }
439 433
440 MojoResult Core::CreateDataPipe( 434 MojoResult Core::CreateDataPipe(
441 const MojoCreateDataPipeOptions* options, 435 const MojoCreateDataPipeOptions* options,
442 MojoHandle* data_pipe_producer_handle, 436 MojoHandle* data_pipe_producer_handle,
443 MojoHandle* data_pipe_consumer_handle) { 437 MojoHandle* data_pipe_consumer_handle) {
444 MojoCreateDataPipeOptions validated_options = {}; 438 if (options && options->struct_size != sizeof(MojoCreateDataPipeOptions))
445 MojoResult result = 439 return MOJO_RESULT_INVALID_ARGUMENT;
446 DataPipe::ValidateCreateOptions(options, &validated_options);
447 if (result != MOJO_RESULT_OK)
448 return result;
449 440
450 scoped_refptr<DataPipeProducerDispatcher> producer_dispatcher = 441 MojoCreateDataPipeOptions create_options;
451 DataPipeProducerDispatcher::Create(validated_options); 442 create_options.struct_size = sizeof(MojoCreateDataPipeOptions);
452 scoped_refptr<DataPipeConsumerDispatcher> consumer_dispatcher = 443 create_options.flags = options ? options->flags : 0;
453 DataPipeConsumerDispatcher::Create(validated_options); 444 create_options.element_num_bytes = options ? options->element_num_bytes : 1;
445 // TODO: Use Configuration to get default data pipe capacity.
446 create_options.capacity_num_bytes =
447 options && options->capacity_num_bytes ? options->capacity_num_bytes
448 : 64 * 1024;
454 449
455 std::pair<MojoHandle, MojoHandle> handle_pair; 450 // TODO: Broker through the parent when necessary.
456 { 451 scoped_refptr<PlatformSharedBuffer> ring_buffer =
457 base::AutoLock locker(handle_table_lock_); 452 GetNodeController()->CreateSharedBuffer(
458 handle_pair = handle_table_.AddDispatcherPair(producer_dispatcher, 453 create_options.capacity_num_bytes);
459 consumer_dispatcher); 454 if (!ring_buffer)
460 } 455 return MOJO_RESULT_RESOURCE_EXHAUSTED;
461 if (handle_pair.first == MOJO_HANDLE_INVALID) { 456
462 DCHECK_EQ(handle_pair.second, MOJO_HANDLE_INVALID); 457 ports::PortRef port0, port1;
463 LOG(ERROR) << "Handle table full"; 458 GetNodeController()->node()->CreatePortPair(&port0, &port1);
464 producer_dispatcher->Close(); 459
465 consumer_dispatcher->Close(); 460 CHECK(data_pipe_producer_handle);
461 CHECK(data_pipe_consumer_handle);
462
463 uint64_t pipe_id = base::RandUint64();
464
465 scoped_refptr<Dispatcher> producer = new DataPipeProducerDispatcher(
466 GetNodeController(), port0, ring_buffer, create_options,
467 true /* initialized */, pipe_id);
468 scoped_refptr<Dispatcher> consumer = new DataPipeConsumerDispatcher(
469 GetNodeController(), port1, ring_buffer, create_options,
470 true /* initialized */, pipe_id);
471
472 *data_pipe_producer_handle = AddDispatcher(producer);
473 *data_pipe_consumer_handle = AddDispatcher(consumer);
474 if (*data_pipe_producer_handle == MOJO_HANDLE_INVALID ||
475 *data_pipe_consumer_handle == MOJO_HANDLE_INVALID) {
476 if (*data_pipe_producer_handle != MOJO_HANDLE_INVALID) {
477 scoped_refptr<Dispatcher> unused;
478 handles_.GetAndRemoveDispatcher(*data_pipe_producer_handle, &unused);
479 }
480 producer->Close();
481 consumer->Close();
466 return MOJO_RESULT_RESOURCE_EXHAUSTED; 482 return MOJO_RESULT_RESOURCE_EXHAUSTED;
467 } 483 }
468 DCHECK_NE(handle_pair.second, MOJO_HANDLE_INVALID);
469 484
470 ScopedPlatformHandle server_handle, client_handle;
471 #if defined(OS_WIN)
472 internal::g_broker->CreatePlatformChannelPair(&server_handle, &client_handle);
473 #else
474 PlatformChannelPair channel_pair;
475 server_handle = channel_pair.PassServerHandle();
476 client_handle = channel_pair.PassClientHandle();
477 #endif
478 producer_dispatcher->Init(std::move(server_handle), nullptr, 0u);
479 consumer_dispatcher->Init(std::move(client_handle), nullptr, 0u);
480
481 *data_pipe_producer_handle = handle_pair.first;
482 *data_pipe_consumer_handle = handle_pair.second;
483 return MOJO_RESULT_OK; 485 return MOJO_RESULT_OK;
484 } 486 }
485 487
486 MojoResult Core::WriteData(MojoHandle data_pipe_producer_handle, 488 MojoResult Core::WriteData(MojoHandle data_pipe_producer_handle,
487 const void* elements, 489 const void* elements,
488 uint32_t* num_bytes, 490 uint32_t* num_bytes,
489 MojoWriteDataFlags flags) { 491 MojoWriteDataFlags flags) {
490 scoped_refptr<Dispatcher> dispatcher( 492 scoped_refptr<Dispatcher> dispatcher(
491 GetDispatcher(data_pipe_producer_handle)); 493 GetDispatcher(data_pipe_producer_handle));
492 if (!dispatcher) 494 if (!dispatcher)
(...skipping 62 matching lines...) Expand 10 before | Expand all | Expand 10 after
555 const MojoCreateSharedBufferOptions* options, 557 const MojoCreateSharedBufferOptions* options,
556 uint64_t num_bytes, 558 uint64_t num_bytes,
557 MojoHandle* shared_buffer_handle) { 559 MojoHandle* shared_buffer_handle) {
558 MojoCreateSharedBufferOptions validated_options = {}; 560 MojoCreateSharedBufferOptions validated_options = {};
559 MojoResult result = SharedBufferDispatcher::ValidateCreateOptions( 561 MojoResult result = SharedBufferDispatcher::ValidateCreateOptions(
560 options, &validated_options); 562 options, &validated_options);
561 if (result != MOJO_RESULT_OK) 563 if (result != MOJO_RESULT_OK)
562 return result; 564 return result;
563 565
564 scoped_refptr<SharedBufferDispatcher> dispatcher; 566 scoped_refptr<SharedBufferDispatcher> dispatcher;
565 result = SharedBufferDispatcher::Create(platform_support_, validated_options, 567 result = SharedBufferDispatcher::Create(
566 num_bytes, &dispatcher); 568 internal::g_platform_support, validated_options, num_bytes, &dispatcher);
567 if (result != MOJO_RESULT_OK) { 569 if (result != MOJO_RESULT_OK) {
568 DCHECK(!dispatcher); 570 DCHECK(!dispatcher);
569 return result; 571 return result;
570 } 572 }
571 573
572 *shared_buffer_handle = AddDispatcher(dispatcher); 574 *shared_buffer_handle = AddDispatcher(dispatcher);
573 if (*shared_buffer_handle == MOJO_HANDLE_INVALID) { 575 if (*shared_buffer_handle == MOJO_HANDLE_INVALID) {
574 LOG(ERROR) << "Handle table full"; 576 LOG(ERROR) << "Handle table full";
575 dispatcher->Close(); 577 dispatcher->Close();
576 return MOJO_RESULT_RESOURCE_EXHAUSTED; 578 return MOJO_RESULT_RESOURCE_EXHAUSTED;
(...skipping 48 matching lines...) Expand 10 before | Expand all | Expand 10 after
625 result = mapping_table_.AddMapping(std::move(mapping)); 627 result = mapping_table_.AddMapping(std::move(mapping));
626 } 628 }
627 if (result != MOJO_RESULT_OK) 629 if (result != MOJO_RESULT_OK)
628 return result; 630 return result;
629 631
630 *buffer = address; 632 *buffer = address;
631 return MOJO_RESULT_OK; 633 return MOJO_RESULT_OK;
632 } 634 }
633 635
634 MojoResult Core::UnmapBuffer(void* buffer) { 636 MojoResult Core::UnmapBuffer(void* buffer) {
635 base::AutoLock locker(mapping_table_lock_); 637 base::AutoLock lock(mapping_table_lock_);
636 return mapping_table_.RemoveMapping(buffer); 638 return mapping_table_.RemoveMapping(buffer);
637 } 639 }
638 640
639 // Note: We allow |handles| to repeat the same handle multiple times, since 641 void Core::GetActiveHandlesForTest(std::vector<MojoHandle>* handles) {
640 // different flags may be specified. 642 base::AutoLock lock(handles_lock_);
641 // TODO(vtl): This incurs a performance cost in |Remove()|. Analyze this 643 handles_.GetActiveHandlesForTest(handles);
642 // more carefully and address it if necessary. 644 }
645
643 MojoResult Core::WaitManyInternal(const MojoHandle* handles, 646 MojoResult Core::WaitManyInternal(const MojoHandle* handles,
644 const MojoHandleSignals* signals, 647 const MojoHandleSignals* signals,
645 uint32_t num_handles, 648 uint32_t num_handles,
646 MojoDeadline deadline, 649 MojoDeadline deadline,
647 uint32_t* result_index, 650 uint32_t *result_index,
648 HandleSignalsState* signals_states) { 651 HandleSignalsState* signals_states) {
649 CHECK(handles); 652 CHECK(handles);
650 CHECK(signals); 653 CHECK(signals);
651 DCHECK_GT(num_handles, 0u); 654 DCHECK_GT(num_handles, 0u);
652 if (result_index) { 655 if (result_index) {
653 DCHECK_EQ(*result_index, static_cast<uint32_t>(-1)); 656 DCHECK_EQ(*result_index, static_cast<uint32_t>(-1));
654 } 657 }
655 658
656 DispatcherVector dispatchers; 659 DispatcherVector dispatchers;
657 dispatchers.reserve(num_handles); 660 dispatchers.reserve(num_handles);
(...skipping 40 matching lines...) Expand 10 before | Expand all | Expand 10 after
698 &waiter, signals_states ? &signals_states[i] : nullptr); 701 &waiter, signals_states ? &signals_states[i] : nullptr);
699 } 702 }
700 if (signals_states) { 703 if (signals_states) {
701 for (; i < num_handles; i++) 704 for (; i < num_handles; i++)
702 signals_states[i] = dispatchers[i]->GetHandleSignalsState(); 705 signals_states[i] = dispatchers[i]->GetHandleSignalsState();
703 } 706 }
704 707
705 return rv; 708 return rv;
706 } 709 }
707 710
711 // static
712 void Core::PassNodeControllerToIOThread(
713 scoped_ptr<NodeController> node_controller) {
714 // It's OK to leak this reference. At this point we know the IO loop is still
715 // running, and we know the NodeController will observe its eventual
716 // destruction. This tells the NodeController to delete itself when that
717 // happens.
718 node_controller.release()->DestroyOnIOThreadShutdown();
719 }
720
708 } // namespace edk 721 } // namespace edk
709 } // namespace mojo 722 } // namespace mojo
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698