Index: mojo/edk/system/core.cc |
diff --git a/mojo/edk/system/core.cc b/mojo/edk/system/core.cc |
index 80e9815ac5f21808f7a401330ddbd90d65679a12..fa164751cf786265d56d72609cb5f03762ade61e 100644 |
--- a/mojo/edk/system/core.cc |
+++ b/mojo/edk/system/core.cc |
@@ -4,106 +4,197 @@ |
#include "mojo/edk/system/core.h" |
-#include <stddef.h> |
-#include <stdint.h> |
+#include <string.h> |
#include <utility> |
-#include <vector> |
+#include "base/bind.h" |
#include "base/containers/stack_container.h" |
+#include "base/location.h" |
#include "base/logging.h" |
+#include "base/macros.h" |
+#include "base/message_loop/message_loop.h" |
#include "base/rand_util.h" |
+#include "base/thread_task_runner_handle.h" |
#include "base/time/time.h" |
+#include "crypto/random.h" |
+#include "mojo/edk/embedder/embedder.h" |
#include "mojo/edk/embedder/embedder_internal.h" |
-#include "mojo/edk/embedder/platform_channel_pair.h" |
#include "mojo/edk/embedder/platform_shared_buffer.h" |
#include "mojo/edk/embedder/platform_support.h" |
#include "mojo/edk/system/async_waiter.h" |
-#include "mojo/edk/system/broker.h" |
+#include "mojo/edk/system/channel.h" |
#include "mojo/edk/system/configuration.h" |
-#include "mojo/edk/system/data_pipe.h" |
#include "mojo/edk/system/data_pipe_consumer_dispatcher.h" |
#include "mojo/edk/system/data_pipe_producer_dispatcher.h" |
-#include "mojo/edk/system/dispatcher.h" |
#include "mojo/edk/system/handle_signals_state.h" |
#include "mojo/edk/system/message_pipe_dispatcher.h" |
+#include "mojo/edk/system/platform_handle_dispatcher.h" |
+#include "mojo/edk/system/ports/node.h" |
+#include "mojo/edk/system/remote_message_pipe_bootstrap.h" |
#include "mojo/edk/system/shared_buffer_dispatcher.h" |
#include "mojo/edk/system/wait_set_dispatcher.h" |
#include "mojo/edk/system/waiter.h" |
-#include "mojo/public/c/system/macros.h" |
-#include "mojo/public/cpp/system/macros.h" |
namespace mojo { |
namespace edk { |
-// Implementation notes |
-// |
-// Mojo primitives are implemented by the singleton |Core| object. Most calls |
-// are for a "primary" handle (the first argument). |Core::GetDispatcher()| is |
-// used to look up a |Dispatcher| object for a given handle. That object |
-// implements most primitives for that object. The wait primitives are not |
-// attached to objects and are implemented by |Core| itself. |
-// |
-// Some objects have multiple handles associated to them, e.g., message pipes |
-// (which have two). In such a case, there is still a |Dispatcher| (e.g., |
-// |MessagePipeDispatcher|) for each handle, with each handle having a strong |
-// reference to the common "secondary" object (e.g., |MessagePipe|). This |
-// secondary object does NOT have any references to the |Dispatcher|s (even if |
-// it did, it wouldn't be able to do anything with them due to lock order |
-// requirements -- see below). |
-// |
-// Waiting is implemented by having the thread that wants to wait call the |
-// |Dispatcher|s for the handles that it wants to wait on with a |Waiter| |
-// object; this |Waiter| object may be created on the stack of that thread or be |
-// kept in thread local storage for that thread (TODO(vtl): future improvement). |
-// The |Dispatcher| then adds the |Waiter| to an |AwakableList| that's either |
-// owned by that |Dispatcher| (see |SimpleDispatcher|) or by a secondary object |
-// (e.g., |MessagePipe|). To signal/wake a |Waiter|, the object in question -- |
-// either a |SimpleDispatcher| or a secondary object -- talks to its |
-// |AwakableList|. |
- |
-// Thread-safety notes |
-// |
-// Mojo primitives calls are thread-safe. We achieve this with relatively |
-// fine-grained locking. There is a global handle table lock. This lock should |
-// be held as briefly as possible (TODO(vtl): a future improvement would be to |
-// switch it to a reader-writer lock). Each |Dispatcher| object then has a lock |
-// (which subclasses can use to protect their data). |
-// |
-// The lock ordering is as follows: |
-// 1. global handle table lock, global mapping table lock |
-// 2. |Dispatcher| locks |
-// 3. secondary object locks |
-// ... |
-// INF. |Waiter| locks |
-// |
-// Notes: |
-// - While holding a |Dispatcher| lock, you may not unconditionally attempt |
-// to take another |Dispatcher| lock. (This has consequences on the |
-// concurrency semantics of |MojoWriteMessage()| when passing handles.) |
-// Doing so would lead to deadlock. |
-// - Locks at the "INF" level may not have any locks taken while they are |
-// held. |
- |
-// TODO(vtl): This should take a |scoped_ptr<PlatformSupport>| as a parameter. |
-Core::Core(PlatformSupport* platform_support) |
- : platform_support_(platform_support) { |
+namespace { |
+ |
+// This is an unnecessarily large limit that is relatively easy to enforce. |
+const uint32_t kMaxHandlesPerMessage = 1024 * 1024; |
+ |
+void OnPortConnected( |
+ Core* core, |
+ int endpoint, |
+ const base::Callback<void(ScopedMessagePipeHandle)>& callback, |
+ const ports::PortRef& port) { |
+ // TODO: Maybe we could negotiate a pipe ID for cross-process pipes too; |
+ // for now we just use 0x7F7F7F7F7F7F7F7F. In practice these are used for |
+ // bootstrap and aren't passed around, so tracking them is less important. |
+ MojoHandle handle = core->AddDispatcher( |
+ new MessagePipeDispatcher(core->GetNodeController(), port, |
+ 0x7f7f7f7f7f7f7f7fUL, endpoint)); |
+ callback.Run(ScopedMessagePipeHandle(MessagePipeHandle(handle))); |
} |
+} // namespace |
+ |
+Core::Core() {} |
+ |
Core::~Core() { |
+ if (node_controller_ && node_controller_->io_task_runner()) { |
+ // If this races with IO thread shutdown the callback will be dropped and |
+ // the NodeController will be shutdown on this thread anyway, which is also |
+ // just fine. |
+ scoped_refptr<base::TaskRunner> io_task_runner = |
+ node_controller_->io_task_runner(); |
+ io_task_runner->PostTask(FROM_HERE, |
+ base::Bind(&Core::PassNodeControllerToIOThread, |
+ base::Passed(&node_controller_))); |
+ } |
} |
-MojoHandle Core::AddDispatcher(const scoped_refptr<Dispatcher>& dispatcher) { |
- base::AutoLock locker(handle_table_lock_); |
- return handle_table_.AddDispatcher(dispatcher); |
+void Core::SetIOTaskRunner(scoped_refptr<base::TaskRunner> io_task_runner) { |
+ GetNodeController()->SetIOTaskRunner(io_task_runner); |
+} |
+ |
+NodeController* Core::GetNodeController() { |
+ if (!node_controller_) |
+ node_controller_.reset(new NodeController(this)); |
+ return node_controller_.get(); |
} |
scoped_refptr<Dispatcher> Core::GetDispatcher(MojoHandle handle) { |
- if (handle == MOJO_HANDLE_INVALID) |
- return nullptr; |
+ base::AutoLock lock(handles_lock_); |
+ return handles_.GetDispatcher(handle); |
+} |
- base::AutoLock locker(handle_table_lock_); |
- return handle_table_.GetDispatcher(handle); |
+void Core::AddChild(base::ProcessHandle process_handle, |
+ ScopedPlatformHandle platform_handle) { |
+ GetNodeController()->ConnectToChild(process_handle, |
+ std::move(platform_handle)); |
+} |
+ |
+void Core::InitChild(ScopedPlatformHandle platform_handle) { |
+ GetNodeController()->ConnectToParent(std::move(platform_handle)); |
+} |
+ |
+MojoHandle Core::AddDispatcher(scoped_refptr<Dispatcher> dispatcher) { |
+ base::AutoLock lock(handles_lock_); |
+ return handles_.AddDispatcher(dispatcher); |
+} |
+ |
+bool Core::AddDispatchersFromTransit( |
+ const std::vector<Dispatcher::DispatcherInTransit>& dispatchers, |
+ MojoHandle* handles) { |
+ bool failed = false; |
+ { |
+ base::AutoLock lock(handles_lock_); |
+ if (!handles_.AddDispatchersFromTransit(dispatchers, handles)) |
+ failed = true; |
+ } |
+ if (failed) { |
+ for (auto d : dispatchers) |
+ d.dispatcher->Close(); |
+ return false; |
+ } |
+ return true; |
+} |
+ |
+MojoResult Core::CreatePlatformHandleWrapper( |
+ ScopedPlatformHandle platform_handle, |
+ MojoHandle* wrapper_handle) { |
+ MojoHandle h = AddDispatcher( |
+ PlatformHandleDispatcher::Create(std::move(platform_handle))); |
+ if (h == MOJO_HANDLE_INVALID) |
+ return MOJO_RESULT_RESOURCE_EXHAUSTED; |
+ *wrapper_handle = h; |
+ return MOJO_RESULT_OK; |
+} |
+ |
+MojoResult Core::PassWrappedPlatformHandle( |
+ MojoHandle wrapper_handle, |
+ ScopedPlatformHandle* platform_handle) { |
+ base::AutoLock lock(handles_lock_); |
+ scoped_refptr<Dispatcher> d; |
+ MojoResult result = handles_.GetAndRemoveDispatcher(wrapper_handle, &d); |
+ if (result != MOJO_RESULT_OK) |
+ return result; |
+ PlatformHandleDispatcher* phd = |
+ static_cast<PlatformHandleDispatcher*>(d.get()); |
+ *platform_handle = phd->PassPlatformHandle(); |
+ phd->Close(); |
+ return MOJO_RESULT_OK; |
+} |
+ |
+void Core::RequestShutdown(const base::Closure& callback) { |
+ base::Closure on_shutdown; |
+ if (base::ThreadTaskRunnerHandle::IsSet()) { |
+ on_shutdown = base::Bind(base::IgnoreResult(&base::TaskRunner::PostTask), |
+ base::ThreadTaskRunnerHandle::Get(), |
+ FROM_HERE, callback); |
+ } else { |
+ on_shutdown = callback; |
+ } |
+ GetNodeController()->RequestShutdown(on_shutdown); |
+} |
+ |
+void Core::CreateParentMessagePipe( |
+ ScopedPlatformHandle platform_handle, |
+ const base::Callback<void(ScopedMessagePipeHandle)>& callback) { |
+ std::string token = GenerateRandomToken(); |
+ CreateParentMessagePipe(token, callback); |
+ RemoteMessagePipeBootstrap::CreateForParent( |
+ GetNodeController(), std::move(platform_handle), token); |
+} |
+ |
+void Core::CreateChildMessagePipe( |
+ ScopedPlatformHandle platform_handle, |
+ const base::Callback<void(ScopedMessagePipeHandle)>& callback) { |
+ ports::PortRef port; |
+ GetNodeController()->node()->CreateUninitializedPort(&port); |
+ RemoteMessagePipeBootstrap::CreateForChild( |
+ GetNodeController(), std::move(platform_handle), port, |
+ base::Bind(&OnPortConnected, base::Unretained(this), 1, callback, port)); |
+} |
+ |
+void Core::CreateParentMessagePipe( |
+ const std::string& token, |
+ const base::Callback<void(ScopedMessagePipeHandle)>& callback) { |
+ GetNodeController()->ReservePort( |
+ token, |
+ base::Bind(&OnPortConnected, base::Unretained(this), 0, callback)); |
+} |
+ |
+void Core::CreateChildMessagePipe( |
+ const std::string& token, |
+ const base::Callback<void(ScopedMessagePipeHandle)>& callback) { |
+ ports::PortRef port; |
+ GetNodeController()->node()->CreateUninitializedPort(&port); |
+ GetNodeController()->ConnectToParentPort( |
+ port, token, |
+ base::Bind(&OnPortConnected, base::Unretained(this), 1, callback, port)); |
} |
MojoResult Core::AsyncWait(MojoHandle handle, |
@@ -124,23 +215,15 @@ MojoTimeTicks Core::GetTimeTicksNow() { |
} |
MojoResult Core::Close(MojoHandle handle) { |
- if (handle == MOJO_HANDLE_INVALID) |
- return MOJO_RESULT_INVALID_ARGUMENT; |
- |
scoped_refptr<Dispatcher> dispatcher; |
{ |
- base::AutoLock locker(handle_table_lock_); |
- MojoResult result = |
- handle_table_.GetAndRemoveDispatcher(handle, &dispatcher); |
- if (result != MOJO_RESULT_OK) |
- return result; |
+ base::AutoLock lock(handles_lock_); |
+ MojoResult rv = handles_.GetAndRemoveDispatcher(handle, &dispatcher); |
+ if (rv != MOJO_RESULT_OK) |
+ return rv; |
} |
- |
- // The dispatcher doesn't have a say in being closed, but gets notified of it. |
- // Note: This is done outside of |handle_table_lock_|. As a result, there's a |
- // race condition that the dispatcher must handle; see the comment in |
- // |Dispatcher| in dispatcher.h. |
- return dispatcher->Close(); |
+ dispatcher->Close(); |
+ return MOJO_RESULT_OK; |
} |
MojoResult Core::Wait(MojoHandle handle, |
@@ -230,7 +313,7 @@ MojoResult Core::GetReadyHandles(MojoHandle wait_set_handle, |
uint32_t* count, |
MojoHandle* handles, |
MojoResult* results, |
- MojoHandleSignalsState* signals_state) { |
+ MojoHandleSignalsState* signals_states) { |
if (!handles || !count || !(*count) || !results) |
return MOJO_RESULT_INVALID_ARGUMENT; |
@@ -248,8 +331,8 @@ MojoResult Core::GetReadyHandles(MojoHandle wait_set_handle, |
if (result == MOJO_RESULT_OK) { |
for (size_t i = 0; i < *count; i++) { |
handles[i] = static_cast<MojoHandle>(contexts[i]); |
- if (signals_state) |
- signals_state[i] = awoken_dispatchers[i]->GetHandleSignalsState(); |
+ if (signals_states) |
+ signals_states[i] = awoken_dispatchers[i]->GetHandleSignalsState(); |
} |
} |
@@ -260,128 +343,74 @@ MojoResult Core::CreateMessagePipe( |
const MojoCreateMessagePipeOptions* options, |
MojoHandle* message_pipe_handle0, |
MojoHandle* message_pipe_handle1) { |
+ ports::PortRef port0, port1; |
+ GetNodeController()->node()->CreatePortPair(&port0, &port1); |
+ |
CHECK(message_pipe_handle0); |
CHECK(message_pipe_handle1); |
- MojoCreateMessagePipeOptions validated_options = {}; |
- MojoResult result = |
- MessagePipeDispatcher::ValidateCreateOptions(options, &validated_options); |
- if (result != MOJO_RESULT_OK) |
- return result; |
- scoped_refptr<MessagePipeDispatcher> dispatcher0 = |
- MessagePipeDispatcher::Create(validated_options); |
- scoped_refptr<MessagePipeDispatcher> dispatcher1 = |
- MessagePipeDispatcher::Create(validated_options); |
+ uint64_t pipe_id = base::RandUint64(); |
- std::pair<MojoHandle, MojoHandle> handle_pair; |
- { |
- base::AutoLock locker(handle_table_lock_); |
- handle_pair = handle_table_.AddDispatcherPair(dispatcher0, dispatcher1); |
- } |
- if (handle_pair.first == MOJO_HANDLE_INVALID) { |
- DCHECK_EQ(handle_pair.second, MOJO_HANDLE_INVALID); |
- LOG(ERROR) << "Handle table full"; |
- dispatcher0->Close(); |
- dispatcher1->Close(); |
+ *message_pipe_handle0 = AddDispatcher( |
+ new MessagePipeDispatcher(GetNodeController(), port0, pipe_id, 0)); |
+ if (*message_pipe_handle0 == MOJO_HANDLE_INVALID) |
return MOJO_RESULT_RESOURCE_EXHAUSTED; |
- } |
- if (validated_options.flags & |
- MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_TRANSFERABLE) { |
- ScopedPlatformHandle server_handle, client_handle; |
-#if defined(OS_WIN) |
- internal::g_broker->CreatePlatformChannelPair(&server_handle, |
- &client_handle); |
-#else |
- PlatformChannelPair channel_pair; |
- server_handle = channel_pair.PassServerHandle(); |
- client_handle = channel_pair.PassClientHandle(); |
-#endif |
- dispatcher0->Init(std::move(server_handle), nullptr, 0u, nullptr, 0u, |
- nullptr, nullptr); |
- dispatcher1->Init(std::move(client_handle), nullptr, 0u, nullptr, 0u, |
- nullptr, nullptr); |
- } else { |
- uint64_t pipe_id = 0; |
- // route_id 0 is used internally in RoutedRawChannel. See kInternalRouteId |
- // in routed_raw_channel.cc. |
- // route_id 1 is used by broker communication. See kBrokerRouteId in |
- // broker_messages.h. |
- while (pipe_id < 2) |
- pipe_id = base::RandUint64(); |
- dispatcher0->InitNonTransferable(pipe_id); |
- dispatcher1->InitNonTransferable(pipe_id); |
+ *message_pipe_handle1 = AddDispatcher( |
+ new MessagePipeDispatcher(GetNodeController(), port1, pipe_id, 1)); |
+ if (*message_pipe_handle1 == MOJO_HANDLE_INVALID) { |
+ scoped_refptr<Dispatcher> unused; |
+ unused->Close(); |
+ handles_.GetAndRemoveDispatcher(*message_pipe_handle0, &unused); |
+ return MOJO_RESULT_RESOURCE_EXHAUSTED; |
} |
- *message_pipe_handle0 = handle_pair.first; |
- *message_pipe_handle1 = handle_pair.second; |
return MOJO_RESULT_OK; |
} |
-// Implementation note: To properly cancel waiters and avoid other races, this |
-// does not transfer dispatchers from one handle to another, even when sending a |
-// message in-process. Instead, it must transfer the "contents" of the |
-// dispatcher to a new dispatcher, and then close the old dispatcher. If this |
-// isn't done, in the in-process case, calls on the old handle may complete |
-// after the the message has been received and a new handle created (and |
-// possibly even after calls have been made on the new handle). |
MojoResult Core::WriteMessage(MojoHandle message_pipe_handle, |
const void* bytes, |
uint32_t num_bytes, |
const MojoHandle* handles, |
uint32_t num_handles, |
MojoWriteMessageFlags flags) { |
- scoped_refptr<Dispatcher> dispatcher(GetDispatcher(message_pipe_handle)); |
+ auto dispatcher = GetDispatcher(message_pipe_handle); |
if (!dispatcher) |
return MOJO_RESULT_INVALID_ARGUMENT; |
- // Easy case: not sending any handles. |
- if (num_handles == 0) |
- return dispatcher->WriteMessage(bytes, num_bytes, nullptr, flags); |
- |
- // We have to handle |handles| here, since we have to mark them busy in the |
- // global handle table. We can't delegate this to the dispatcher, since the |
- // handle table lock must be acquired before the dispatcher lock. |
- // |
- // (This leads to an oddity: |handles|/|num_handles| are always verified for |
- // validity, even for dispatchers that don't support |WriteMessage()| and will |
- // simply return failure unconditionally. It also breaks the usual |
- // left-to-right verification order of arguments.) |
- if (num_handles > GetConfiguration().max_message_num_handles) |
+ if (num_handles == 0) // Fast path: no handles. |
+ return dispatcher->WriteMessage(bytes, num_bytes, nullptr, 0, flags); |
+ |
+ CHECK(handles); |
+ |
+ if (num_handles > kMaxHandlesPerMessage) |
return MOJO_RESULT_RESOURCE_EXHAUSTED; |
- // We'll need to hold on to the dispatchers so that we can pass them on to |
- // |WriteMessage()| and also so that we can unlock their locks afterwards |
- // without accessing the handle table. These can be dumb pointers, since their |
- // entries in the handle table won't get removed (since they'll be marked as |
- // busy). |
- std::vector<DispatcherTransport> transports(num_handles); |
+ for (size_t i = 0; i < num_handles; ++i) { |
+ if (message_pipe_handle == handles[i]) |
+ return MOJO_RESULT_BUSY; |
+ } |
- // When we pass handles, we have to try to take all their dispatchers' locks |
- // and mark the handles as busy. If the call succeeds, we then remove the |
- // handles from the handle table. |
+ std::vector<Dispatcher::DispatcherInTransit> dispatchers; |
{ |
- base::AutoLock locker(handle_table_lock_); |
- MojoResult result = handle_table_.MarkBusyAndStartTransport( |
- message_pipe_handle, handles, num_handles, &transports); |
- if (result != MOJO_RESULT_OK) |
- return result; |
+ base::AutoLock lock(handles_lock_); |
+ MojoResult rv = handles_.BeginTransit(handles, num_handles, &dispatchers); |
+ if (rv != MOJO_RESULT_OK) { |
+ handles_.CancelTransit(dispatchers); |
+ return rv; |
+ } |
} |
+ DCHECK_EQ(num_handles, dispatchers.size()); |
- MojoResult rv = |
- dispatcher->WriteMessage(bytes, num_bytes, &transports, flags); |
- |
- // We need to release the dispatcher locks before we take the handle table |
- // lock. |
- for (uint32_t i = 0; i < num_handles; i++) |
- transports[i].End(); |
+ MojoResult rv = dispatcher->WriteMessage( |
+ bytes, num_bytes, dispatchers.data(), num_handles, flags); |
{ |
- base::AutoLock locker(handle_table_lock_); |
+ base::AutoLock lock(handles_lock_); |
if (rv == MOJO_RESULT_OK) { |
- handle_table_.RemoveBusyHandles(handles, num_handles); |
+ handles_.CompleteTransitAndClose(dispatchers); |
} else { |
- handle_table_.RestoreBusyHandles(handles, num_handles); |
+ handles_.CancelTransit(dispatchers); |
} |
} |
@@ -394,92 +423,65 @@ MojoResult Core::ReadMessage(MojoHandle message_pipe_handle, |
MojoHandle* handles, |
uint32_t* num_handles, |
MojoReadMessageFlags flags) { |
- scoped_refptr<Dispatcher> dispatcher(GetDispatcher(message_pipe_handle)); |
+ CHECK((!num_handles || !*num_handles || handles) && |
+ (!num_bytes || !*num_bytes || bytes)); |
+ auto dispatcher = GetDispatcher(message_pipe_handle); |
if (!dispatcher) |
return MOJO_RESULT_INVALID_ARGUMENT; |
- |
- MojoResult rv; |
- uint32_t num_handles_value = num_handles ? *num_handles : 0; |
- if (num_handles_value == 0) { |
- // Easy case: won't receive any handles. |
- rv = dispatcher->ReadMessage(bytes, num_bytes, nullptr, &num_handles_value, |
- flags); |
- } else { |
- DispatcherVector dispatchers; |
- rv = dispatcher->ReadMessage(bytes, num_bytes, &dispatchers, |
- &num_handles_value, flags); |
- if (!dispatchers.empty()) { |
- DCHECK_EQ(rv, MOJO_RESULT_OK); |
- DCHECK(num_handles); |
- DCHECK_LE(dispatchers.size(), static_cast<size_t>(num_handles_value)); |
- |
- bool success; |
- { |
- base::AutoLock locker(handle_table_lock_); |
- success = handle_table_.AddDispatcherVector(dispatchers, handles); |
- } |
- if (!success) { |
- LOG(ERROR) << "Received message with " << dispatchers.size() |
- << " handles, but handle table full"; |
- // Close dispatchers (outside the lock). |
- for (size_t i = 0; i < dispatchers.size(); i++) { |
- if (dispatchers[i]) |
- dispatchers[i]->Close(); |
- } |
- if (rv == MOJO_RESULT_OK) |
- rv = MOJO_RESULT_RESOURCE_EXHAUSTED; |
- } |
- } |
- } |
- |
- if (num_handles) |
- *num_handles = num_handles_value; |
- return rv; |
+ return dispatcher->ReadMessage(bytes, num_bytes, handles, num_handles, flags); |
} |
MojoResult Core::CreateDataPipe( |
const MojoCreateDataPipeOptions* options, |
MojoHandle* data_pipe_producer_handle, |
MojoHandle* data_pipe_consumer_handle) { |
- MojoCreateDataPipeOptions validated_options = {}; |
- MojoResult result = |
- DataPipe::ValidateCreateOptions(options, &validated_options); |
- if (result != MOJO_RESULT_OK) |
- return result; |
+ if (options && options->struct_size != sizeof(MojoCreateDataPipeOptions)) |
+ return MOJO_RESULT_INVALID_ARGUMENT; |
- scoped_refptr<DataPipeProducerDispatcher> producer_dispatcher = |
- DataPipeProducerDispatcher::Create(validated_options); |
- scoped_refptr<DataPipeConsumerDispatcher> consumer_dispatcher = |
- DataPipeConsumerDispatcher::Create(validated_options); |
+ MojoCreateDataPipeOptions create_options; |
+ create_options.struct_size = sizeof(MojoCreateDataPipeOptions); |
+ create_options.flags = options ? options->flags : 0; |
+ create_options.element_num_bytes = options ? options->element_num_bytes : 1; |
+ // TODO: Use Configuration to get default data pipe capacity. |
+ create_options.capacity_num_bytes = |
+ options && options->capacity_num_bytes ? options->capacity_num_bytes |
+ : 64 * 1024; |
+ |
+ // TODO: Broker through the parent when necessary. |
+ scoped_refptr<PlatformSharedBuffer> ring_buffer = |
+ GetNodeController()->CreateSharedBuffer( |
+ create_options.capacity_num_bytes); |
+ if (!ring_buffer) |
+ return MOJO_RESULT_RESOURCE_EXHAUSTED; |
- std::pair<MojoHandle, MojoHandle> handle_pair; |
- { |
- base::AutoLock locker(handle_table_lock_); |
- handle_pair = handle_table_.AddDispatcherPair(producer_dispatcher, |
- consumer_dispatcher); |
- } |
- if (handle_pair.first == MOJO_HANDLE_INVALID) { |
- DCHECK_EQ(handle_pair.second, MOJO_HANDLE_INVALID); |
- LOG(ERROR) << "Handle table full"; |
- producer_dispatcher->Close(); |
- consumer_dispatcher->Close(); |
+ ports::PortRef port0, port1; |
+ GetNodeController()->node()->CreatePortPair(&port0, &port1); |
+ |
+ CHECK(data_pipe_producer_handle); |
+ CHECK(data_pipe_consumer_handle); |
+ |
+ uint64_t pipe_id = base::RandUint64(); |
+ |
+ scoped_refptr<Dispatcher> producer = new DataPipeProducerDispatcher( |
+ GetNodeController(), port0, ring_buffer, create_options, |
+ true /* initialized */, pipe_id); |
+ scoped_refptr<Dispatcher> consumer = new DataPipeConsumerDispatcher( |
+ GetNodeController(), port1, ring_buffer, create_options, |
+ true /* initialized */, pipe_id); |
+ |
+ *data_pipe_producer_handle = AddDispatcher(producer); |
+ *data_pipe_consumer_handle = AddDispatcher(consumer); |
+ if (*data_pipe_producer_handle == MOJO_HANDLE_INVALID || |
+ *data_pipe_consumer_handle == MOJO_HANDLE_INVALID) { |
+ if (*data_pipe_producer_handle != MOJO_HANDLE_INVALID) { |
+ scoped_refptr<Dispatcher> unused; |
+ handles_.GetAndRemoveDispatcher(*data_pipe_producer_handle, &unused); |
+ } |
+ producer->Close(); |
+ consumer->Close(); |
return MOJO_RESULT_RESOURCE_EXHAUSTED; |
} |
- DCHECK_NE(handle_pair.second, MOJO_HANDLE_INVALID); |
- |
- ScopedPlatformHandle server_handle, client_handle; |
-#if defined(OS_WIN) |
- internal::g_broker->CreatePlatformChannelPair(&server_handle, &client_handle); |
-#else |
- PlatformChannelPair channel_pair; |
- server_handle = channel_pair.PassServerHandle(); |
- client_handle = channel_pair.PassClientHandle(); |
-#endif |
- producer_dispatcher->Init(std::move(server_handle), nullptr, 0u); |
- consumer_dispatcher->Init(std::move(client_handle), nullptr, 0u); |
- |
- *data_pipe_producer_handle = handle_pair.first; |
- *data_pipe_consumer_handle = handle_pair.second; |
+ |
return MOJO_RESULT_OK; |
} |
@@ -562,8 +564,8 @@ MojoResult Core::CreateSharedBuffer( |
return result; |
scoped_refptr<SharedBufferDispatcher> dispatcher; |
- result = SharedBufferDispatcher::Create(platform_support_, validated_options, |
- num_bytes, &dispatcher); |
+ result = SharedBufferDispatcher::Create( |
+ internal::g_platform_support, validated_options, num_bytes, &dispatcher); |
if (result != MOJO_RESULT_OK) { |
DCHECK(!dispatcher); |
return result; |
@@ -632,19 +634,20 @@ MojoResult Core::MapBuffer(MojoHandle buffer_handle, |
} |
MojoResult Core::UnmapBuffer(void* buffer) { |
- base::AutoLock locker(mapping_table_lock_); |
+ base::AutoLock lock(mapping_table_lock_); |
return mapping_table_.RemoveMapping(buffer); |
} |
-// Note: We allow |handles| to repeat the same handle multiple times, since |
-// different flags may be specified. |
-// TODO(vtl): This incurs a performance cost in |Remove()|. Analyze this |
-// more carefully and address it if necessary. |
+void Core::GetActiveHandlesForTest(std::vector<MojoHandle>* handles) { |
+ base::AutoLock lock(handles_lock_); |
+ handles_.GetActiveHandlesForTest(handles); |
+} |
+ |
MojoResult Core::WaitManyInternal(const MojoHandle* handles, |
const MojoHandleSignals* signals, |
uint32_t num_handles, |
MojoDeadline deadline, |
- uint32_t* result_index, |
+ uint32_t *result_index, |
HandleSignalsState* signals_states) { |
CHECK(handles); |
CHECK(signals); |
@@ -705,5 +708,15 @@ MojoResult Core::WaitManyInternal(const MojoHandle* handles, |
return rv; |
} |
+// static |
+void Core::PassNodeControllerToIOThread( |
+ scoped_ptr<NodeController> node_controller) { |
+ // It's OK to leak this reference. At this point we know the IO loop is still |
+ // running, and we know the NodeController will observe its eventual |
+ // destruction. This tells the NodeController to delete itself when that |
+ // happens. |
+ node_controller.release()->DestroyOnIOThreadShutdown(); |
+} |
+ |
} // namespace edk |
} // namespace mojo |