| Index: mojo/edk/system/core.cc | 
| diff --git a/mojo/edk/system/core.cc b/mojo/edk/system/core.cc | 
| index 80e9815ac5f21808f7a401330ddbd90d65679a12..fa164751cf786265d56d72609cb5f03762ade61e 100644 | 
| --- a/mojo/edk/system/core.cc | 
| +++ b/mojo/edk/system/core.cc | 
| @@ -4,106 +4,197 @@ | 
|  | 
| #include "mojo/edk/system/core.h" | 
|  | 
| -#include <stddef.h> | 
| -#include <stdint.h> | 
| +#include <string.h> | 
|  | 
| #include <utility> | 
| -#include <vector> | 
|  | 
| +#include "base/bind.h" | 
| #include "base/containers/stack_container.h" | 
| +#include "base/location.h" | 
| #include "base/logging.h" | 
| +#include "base/macros.h" | 
| +#include "base/message_loop/message_loop.h" | 
| #include "base/rand_util.h" | 
| +#include "base/thread_task_runner_handle.h" | 
| #include "base/time/time.h" | 
| +#include "crypto/random.h" | 
| +#include "mojo/edk/embedder/embedder.h" | 
| #include "mojo/edk/embedder/embedder_internal.h" | 
| -#include "mojo/edk/embedder/platform_channel_pair.h" | 
| #include "mojo/edk/embedder/platform_shared_buffer.h" | 
| #include "mojo/edk/embedder/platform_support.h" | 
| #include "mojo/edk/system/async_waiter.h" | 
| -#include "mojo/edk/system/broker.h" | 
| +#include "mojo/edk/system/channel.h" | 
| #include "mojo/edk/system/configuration.h" | 
| -#include "mojo/edk/system/data_pipe.h" | 
| #include "mojo/edk/system/data_pipe_consumer_dispatcher.h" | 
| #include "mojo/edk/system/data_pipe_producer_dispatcher.h" | 
| -#include "mojo/edk/system/dispatcher.h" | 
| #include "mojo/edk/system/handle_signals_state.h" | 
| #include "mojo/edk/system/message_pipe_dispatcher.h" | 
| +#include "mojo/edk/system/platform_handle_dispatcher.h" | 
| +#include "mojo/edk/system/ports/node.h" | 
| +#include "mojo/edk/system/remote_message_pipe_bootstrap.h" | 
| #include "mojo/edk/system/shared_buffer_dispatcher.h" | 
| #include "mojo/edk/system/wait_set_dispatcher.h" | 
| #include "mojo/edk/system/waiter.h" | 
| -#include "mojo/public/c/system/macros.h" | 
| -#include "mojo/public/cpp/system/macros.h" | 
|  | 
| namespace mojo { | 
| namespace edk { | 
|  | 
| -// Implementation notes | 
| -// | 
| -// Mojo primitives are implemented by the singleton |Core| object. Most calls | 
| -// are for a "primary" handle (the first argument). |Core::GetDispatcher()| is | 
| -// used to look up a |Dispatcher| object for a given handle. That object | 
| -// implements most primitives for that object. The wait primitives are not | 
| -// attached to objects and are implemented by |Core| itself. | 
| -// | 
| -// Some objects have multiple handles associated to them, e.g., message pipes | 
| -// (which have two). In such a case, there is still a |Dispatcher| (e.g., | 
| -// |MessagePipeDispatcher|) for each handle, with each handle having a strong | 
| -// reference to the common "secondary" object (e.g., |MessagePipe|). This | 
| -// secondary object does NOT have any references to the |Dispatcher|s (even if | 
| -// it did, it wouldn't be able to do anything with them due to lock order | 
| -// requirements -- see below). | 
| -// | 
| -// Waiting is implemented by having the thread that wants to wait call the | 
| -// |Dispatcher|s for the handles that it wants to wait on with a |Waiter| | 
| -// object; this |Waiter| object may be created on the stack of that thread or be | 
| -// kept in thread local storage for that thread (TODO(vtl): future improvement). | 
| -// The |Dispatcher| then adds the |Waiter| to an |AwakableList| that's either | 
| -// owned by that |Dispatcher| (see |SimpleDispatcher|) or by a secondary object | 
| -// (e.g., |MessagePipe|). To signal/wake a |Waiter|, the object in question -- | 
| -// either a |SimpleDispatcher| or a secondary object -- talks to its | 
| -// |AwakableList|. | 
| - | 
| -// Thread-safety notes | 
| -// | 
| -// Mojo primitives calls are thread-safe. We achieve this with relatively | 
| -// fine-grained locking. There is a global handle table lock. This lock should | 
| -// be held as briefly as possible (TODO(vtl): a future improvement would be to | 
| -// switch it to a reader-writer lock). Each |Dispatcher| object then has a lock | 
| -// (which subclasses can use to protect their data). | 
| -// | 
| -// The lock ordering is as follows: | 
| -//   1. global handle table lock, global mapping table lock | 
| -//   2. |Dispatcher| locks | 
| -//   3. secondary object locks | 
| -//   ... | 
| -//   INF. |Waiter| locks | 
| -// | 
| -// Notes: | 
| -//    - While holding a |Dispatcher| lock, you may not unconditionally attempt | 
| -//      to take another |Dispatcher| lock. (This has consequences on the | 
| -//      concurrency semantics of |MojoWriteMessage()| when passing handles.) | 
| -//      Doing so would lead to deadlock. | 
| -//    - Locks at the "INF" level may not have any locks taken while they are | 
| -//      held. | 
| - | 
| -// TODO(vtl): This should take a |scoped_ptr<PlatformSupport>| as a parameter. | 
| -Core::Core(PlatformSupport* platform_support) | 
| -    : platform_support_(platform_support) { | 
| +namespace { | 
| + | 
| +// This is an unnecessarily large limit that is relatively easy to enforce. | 
| +const uint32_t kMaxHandlesPerMessage = 1024 * 1024; | 
| + | 
| +void OnPortConnected( | 
| +    Core* core, | 
| +    int endpoint, | 
| +    const base::Callback<void(ScopedMessagePipeHandle)>& callback, | 
| +    const ports::PortRef& port) { | 
| +  // TODO: Maybe we could negotiate a pipe ID for cross-process pipes too; | 
| +  // for now we just use 0x7F7F7F7F7F7F7F7F. In practice these are used for | 
| +  // bootstrap and aren't passed around, so tracking them is less important. | 
| +  MojoHandle handle = core->AddDispatcher( | 
| +      new MessagePipeDispatcher(core->GetNodeController(), port, | 
| +                                0x7f7f7f7f7f7f7f7fUL, endpoint)); | 
| +  callback.Run(ScopedMessagePipeHandle(MessagePipeHandle(handle))); | 
| } | 
|  | 
| +}  // namespace | 
| + | 
| +Core::Core() {} | 
| + | 
| Core::~Core() { | 
| +  if (node_controller_ && node_controller_->io_task_runner()) { | 
| +    // If this races with IO thread shutdown the callback will be dropped and | 
| +    // the NodeController will be shutdown on this thread anyway, which is also | 
| +    // just fine. | 
| +    scoped_refptr<base::TaskRunner> io_task_runner = | 
| +        node_controller_->io_task_runner(); | 
| +    io_task_runner->PostTask(FROM_HERE, | 
| +                             base::Bind(&Core::PassNodeControllerToIOThread, | 
| +                                        base::Passed(&node_controller_))); | 
| +  } | 
| } | 
|  | 
| -MojoHandle Core::AddDispatcher(const scoped_refptr<Dispatcher>& dispatcher) { | 
| -  base::AutoLock locker(handle_table_lock_); | 
| -  return handle_table_.AddDispatcher(dispatcher); | 
| +void Core::SetIOTaskRunner(scoped_refptr<base::TaskRunner> io_task_runner) { | 
| +  GetNodeController()->SetIOTaskRunner(io_task_runner); | 
| +} | 
| + | 
| +NodeController* Core::GetNodeController() { | 
| +  if (!node_controller_) | 
| +    node_controller_.reset(new NodeController(this)); | 
| +  return node_controller_.get(); | 
| } | 
|  | 
| scoped_refptr<Dispatcher> Core::GetDispatcher(MojoHandle handle) { | 
| -  if (handle == MOJO_HANDLE_INVALID) | 
| -    return nullptr; | 
| +  base::AutoLock lock(handles_lock_); | 
| +  return handles_.GetDispatcher(handle); | 
| +} | 
|  | 
| -  base::AutoLock locker(handle_table_lock_); | 
| -  return handle_table_.GetDispatcher(handle); | 
| +void Core::AddChild(base::ProcessHandle process_handle, | 
| +                    ScopedPlatformHandle platform_handle) { | 
| +  GetNodeController()->ConnectToChild(process_handle, | 
| +                                      std::move(platform_handle)); | 
| +} | 
| + | 
| +void Core::InitChild(ScopedPlatformHandle platform_handle) { | 
| +  GetNodeController()->ConnectToParent(std::move(platform_handle)); | 
| +} | 
| + | 
| +MojoHandle Core::AddDispatcher(scoped_refptr<Dispatcher> dispatcher) { | 
| +  base::AutoLock lock(handles_lock_); | 
| +  return handles_.AddDispatcher(dispatcher); | 
| +} | 
| + | 
| +bool Core::AddDispatchersFromTransit( | 
| +    const std::vector<Dispatcher::DispatcherInTransit>& dispatchers, | 
| +    MojoHandle* handles) { | 
| +  bool failed = false; | 
| +  { | 
| +    base::AutoLock lock(handles_lock_); | 
| +    if (!handles_.AddDispatchersFromTransit(dispatchers, handles)) | 
| +      failed = true; | 
| +  } | 
| +  if (failed) { | 
| +    for (auto d : dispatchers) | 
| +      d.dispatcher->Close(); | 
| +    return false; | 
| +  } | 
| +  return true; | 
| +} | 
| + | 
| +MojoResult Core::CreatePlatformHandleWrapper( | 
| +    ScopedPlatformHandle platform_handle, | 
| +    MojoHandle* wrapper_handle) { | 
| +  MojoHandle h = AddDispatcher( | 
| +      PlatformHandleDispatcher::Create(std::move(platform_handle))); | 
| +  if (h == MOJO_HANDLE_INVALID) | 
| +    return MOJO_RESULT_RESOURCE_EXHAUSTED; | 
| +  *wrapper_handle = h; | 
| +  return MOJO_RESULT_OK; | 
| +} | 
| + | 
| +MojoResult Core::PassWrappedPlatformHandle( | 
| +    MojoHandle wrapper_handle, | 
| +    ScopedPlatformHandle* platform_handle) { | 
| +  base::AutoLock lock(handles_lock_); | 
| +  scoped_refptr<Dispatcher> d; | 
| +  MojoResult result = handles_.GetAndRemoveDispatcher(wrapper_handle, &d); | 
| +  if (result != MOJO_RESULT_OK) | 
| +    return result; | 
| +  PlatformHandleDispatcher* phd = | 
| +      static_cast<PlatformHandleDispatcher*>(d.get()); | 
| +  *platform_handle = phd->PassPlatformHandle(); | 
| +  phd->Close(); | 
| +  return MOJO_RESULT_OK; | 
| +} | 
| + | 
| +void Core::RequestShutdown(const base::Closure& callback) { | 
| +  base::Closure on_shutdown; | 
| +  if (base::ThreadTaskRunnerHandle::IsSet()) { | 
| +    on_shutdown = base::Bind(base::IgnoreResult(&base::TaskRunner::PostTask), | 
| +                             base::ThreadTaskRunnerHandle::Get(), | 
| +                             FROM_HERE, callback); | 
| +  } else { | 
| +    on_shutdown = callback; | 
| +  } | 
| +  GetNodeController()->RequestShutdown(on_shutdown); | 
| +} | 
| + | 
| +void Core::CreateParentMessagePipe( | 
| +    ScopedPlatformHandle platform_handle, | 
| +    const base::Callback<void(ScopedMessagePipeHandle)>& callback) { | 
| +  std::string token = GenerateRandomToken(); | 
| +  CreateParentMessagePipe(token, callback); | 
| +  RemoteMessagePipeBootstrap::CreateForParent( | 
| +      GetNodeController(), std::move(platform_handle), token); | 
| +} | 
| + | 
| +void Core::CreateChildMessagePipe( | 
| +    ScopedPlatformHandle platform_handle, | 
| +    const base::Callback<void(ScopedMessagePipeHandle)>& callback) { | 
| +  ports::PortRef port; | 
| +  GetNodeController()->node()->CreateUninitializedPort(&port); | 
| +  RemoteMessagePipeBootstrap::CreateForChild( | 
| +      GetNodeController(), std::move(platform_handle), port, | 
| +      base::Bind(&OnPortConnected, base::Unretained(this), 1, callback, port)); | 
| +} | 
| + | 
| +void Core::CreateParentMessagePipe( | 
| +    const std::string& token, | 
| +    const base::Callback<void(ScopedMessagePipeHandle)>& callback) { | 
| +  GetNodeController()->ReservePort( | 
| +      token, | 
| +      base::Bind(&OnPortConnected, base::Unretained(this), 0, callback)); | 
| +} | 
| + | 
| +void Core::CreateChildMessagePipe( | 
| +    const std::string& token, | 
| +    const base::Callback<void(ScopedMessagePipeHandle)>& callback) { | 
| +  ports::PortRef port; | 
| +  GetNodeController()->node()->CreateUninitializedPort(&port); | 
| +  GetNodeController()->ConnectToParentPort( | 
| +      port, token, | 
| +      base::Bind(&OnPortConnected, base::Unretained(this), 1, callback, port)); | 
| } | 
|  | 
| MojoResult Core::AsyncWait(MojoHandle handle, | 
| @@ -124,23 +215,15 @@ MojoTimeTicks Core::GetTimeTicksNow() { | 
| } | 
|  | 
| MojoResult Core::Close(MojoHandle handle) { | 
| -  if (handle == MOJO_HANDLE_INVALID) | 
| -    return MOJO_RESULT_INVALID_ARGUMENT; | 
| - | 
| scoped_refptr<Dispatcher> dispatcher; | 
| { | 
| -    base::AutoLock locker(handle_table_lock_); | 
| -    MojoResult result = | 
| -        handle_table_.GetAndRemoveDispatcher(handle, &dispatcher); | 
| -    if (result != MOJO_RESULT_OK) | 
| -      return result; | 
| +    base::AutoLock lock(handles_lock_); | 
| +    MojoResult rv = handles_.GetAndRemoveDispatcher(handle, &dispatcher); | 
| +    if (rv != MOJO_RESULT_OK) | 
| +      return rv; | 
| } | 
| - | 
| -  // The dispatcher doesn't have a say in being closed, but gets notified of it. | 
| -  // Note: This is done outside of |handle_table_lock_|. As a result, there's a | 
| -  // race condition that the dispatcher must handle; see the comment in | 
| -  // |Dispatcher| in dispatcher.h. | 
| -  return dispatcher->Close(); | 
| +  dispatcher->Close(); | 
| +  return MOJO_RESULT_OK; | 
| } | 
|  | 
| MojoResult Core::Wait(MojoHandle handle, | 
| @@ -230,7 +313,7 @@ MojoResult Core::GetReadyHandles(MojoHandle wait_set_handle, | 
| uint32_t* count, | 
| MojoHandle* handles, | 
| MojoResult* results, | 
| -                                 MojoHandleSignalsState* signals_state) { | 
| +                                 MojoHandleSignalsState* signals_states) { | 
| if (!handles || !count || !(*count) || !results) | 
| return MOJO_RESULT_INVALID_ARGUMENT; | 
|  | 
| @@ -248,8 +331,8 @@ MojoResult Core::GetReadyHandles(MojoHandle wait_set_handle, | 
| if (result == MOJO_RESULT_OK) { | 
| for (size_t i = 0; i < *count; i++) { | 
| handles[i] = static_cast<MojoHandle>(contexts[i]); | 
| -      if (signals_state) | 
| -        signals_state[i] = awoken_dispatchers[i]->GetHandleSignalsState(); | 
| +      if (signals_states) | 
| +        signals_states[i] = awoken_dispatchers[i]->GetHandleSignalsState(); | 
| } | 
| } | 
|  | 
| @@ -260,128 +343,74 @@ MojoResult Core::CreateMessagePipe( | 
| const MojoCreateMessagePipeOptions* options, | 
| MojoHandle* message_pipe_handle0, | 
| MojoHandle* message_pipe_handle1) { | 
| +  ports::PortRef port0, port1; | 
| +  GetNodeController()->node()->CreatePortPair(&port0, &port1); | 
| + | 
| CHECK(message_pipe_handle0); | 
| CHECK(message_pipe_handle1); | 
| -  MojoCreateMessagePipeOptions validated_options = {}; | 
| -  MojoResult result = | 
| -      MessagePipeDispatcher::ValidateCreateOptions(options, &validated_options); | 
| -  if (result != MOJO_RESULT_OK) | 
| -    return result; | 
|  | 
| -  scoped_refptr<MessagePipeDispatcher> dispatcher0 = | 
| -      MessagePipeDispatcher::Create(validated_options); | 
| -  scoped_refptr<MessagePipeDispatcher> dispatcher1 = | 
| -      MessagePipeDispatcher::Create(validated_options); | 
| +  uint64_t pipe_id = base::RandUint64(); | 
|  | 
| -  std::pair<MojoHandle, MojoHandle> handle_pair; | 
| -  { | 
| -    base::AutoLock locker(handle_table_lock_); | 
| -    handle_pair = handle_table_.AddDispatcherPair(dispatcher0, dispatcher1); | 
| -  } | 
| -  if (handle_pair.first == MOJO_HANDLE_INVALID) { | 
| -    DCHECK_EQ(handle_pair.second, MOJO_HANDLE_INVALID); | 
| -    LOG(ERROR) << "Handle table full"; | 
| -    dispatcher0->Close(); | 
| -    dispatcher1->Close(); | 
| +  *message_pipe_handle0 = AddDispatcher( | 
| +      new MessagePipeDispatcher(GetNodeController(), port0, pipe_id, 0)); | 
| +  if (*message_pipe_handle0 == MOJO_HANDLE_INVALID) | 
| return MOJO_RESULT_RESOURCE_EXHAUSTED; | 
| -  } | 
|  | 
| -  if (validated_options.flags & | 
| -          MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_TRANSFERABLE) { | 
| -    ScopedPlatformHandle server_handle, client_handle; | 
| -#if defined(OS_WIN) | 
| -    internal::g_broker->CreatePlatformChannelPair(&server_handle, | 
| -                                                  &client_handle); | 
| -#else | 
| -    PlatformChannelPair channel_pair; | 
| -    server_handle = channel_pair.PassServerHandle(); | 
| -    client_handle = channel_pair.PassClientHandle(); | 
| -#endif | 
| -    dispatcher0->Init(std::move(server_handle), nullptr, 0u, nullptr, 0u, | 
| -                      nullptr, nullptr); | 
| -    dispatcher1->Init(std::move(client_handle), nullptr, 0u, nullptr, 0u, | 
| -                      nullptr, nullptr); | 
| -  } else { | 
| -    uint64_t pipe_id = 0; | 
| -    // route_id 0 is used internally in RoutedRawChannel. See kInternalRouteId | 
| -    // in routed_raw_channel.cc. | 
| -    // route_id 1 is used by broker communication. See kBrokerRouteId in | 
| -    // broker_messages.h. | 
| -    while (pipe_id < 2) | 
| -      pipe_id = base::RandUint64(); | 
| -    dispatcher0->InitNonTransferable(pipe_id); | 
| -    dispatcher1->InitNonTransferable(pipe_id); | 
| +  *message_pipe_handle1 = AddDispatcher( | 
| +      new MessagePipeDispatcher(GetNodeController(), port1, pipe_id, 1)); | 
| +  if (*message_pipe_handle1 == MOJO_HANDLE_INVALID) { | 
| +    scoped_refptr<Dispatcher> unused; | 
| +    unused->Close(); | 
| +    handles_.GetAndRemoveDispatcher(*message_pipe_handle0, &unused); | 
| +    return MOJO_RESULT_RESOURCE_EXHAUSTED; | 
| } | 
|  | 
| -  *message_pipe_handle0 = handle_pair.first; | 
| -  *message_pipe_handle1 = handle_pair.second; | 
| return MOJO_RESULT_OK; | 
| } | 
|  | 
| -// Implementation note: To properly cancel waiters and avoid other races, this | 
| -// does not transfer dispatchers from one handle to another, even when sending a | 
| -// message in-process. Instead, it must transfer the "contents" of the | 
| -// dispatcher to a new dispatcher, and then close the old dispatcher. If this | 
| -// isn't done, in the in-process case, calls on the old handle may complete | 
| -// after the the message has been received and a new handle created (and | 
| -// possibly even after calls have been made on the new handle). | 
| MojoResult Core::WriteMessage(MojoHandle message_pipe_handle, | 
| const void* bytes, | 
| uint32_t num_bytes, | 
| const MojoHandle* handles, | 
| uint32_t num_handles, | 
| MojoWriteMessageFlags flags) { | 
| -  scoped_refptr<Dispatcher> dispatcher(GetDispatcher(message_pipe_handle)); | 
| +  auto dispatcher = GetDispatcher(message_pipe_handle); | 
| if (!dispatcher) | 
| return MOJO_RESULT_INVALID_ARGUMENT; | 
|  | 
| -  // Easy case: not sending any handles. | 
| -  if (num_handles == 0) | 
| -    return dispatcher->WriteMessage(bytes, num_bytes, nullptr, flags); | 
| - | 
| -  // We have to handle |handles| here, since we have to mark them busy in the | 
| -  // global handle table. We can't delegate this to the dispatcher, since the | 
| -  // handle table lock must be acquired before the dispatcher lock. | 
| -  // | 
| -  // (This leads to an oddity: |handles|/|num_handles| are always verified for | 
| -  // validity, even for dispatchers that don't support |WriteMessage()| and will | 
| -  // simply return failure unconditionally. It also breaks the usual | 
| -  // left-to-right verification order of arguments.) | 
| -  if (num_handles > GetConfiguration().max_message_num_handles) | 
| +  if (num_handles == 0)  // Fast path: no handles. | 
| +    return dispatcher->WriteMessage(bytes, num_bytes, nullptr, 0, flags); | 
| + | 
| +  CHECK(handles); | 
| + | 
| +  if (num_handles > kMaxHandlesPerMessage) | 
| return MOJO_RESULT_RESOURCE_EXHAUSTED; | 
|  | 
| -  // We'll need to hold on to the dispatchers so that we can pass them on to | 
| -  // |WriteMessage()| and also so that we can unlock their locks afterwards | 
| -  // without accessing the handle table. These can be dumb pointers, since their | 
| -  // entries in the handle table won't get removed (since they'll be marked as | 
| -  // busy). | 
| -  std::vector<DispatcherTransport> transports(num_handles); | 
| +  for (size_t i = 0; i < num_handles; ++i) { | 
| +    if (message_pipe_handle == handles[i]) | 
| +      return MOJO_RESULT_BUSY; | 
| +  } | 
|  | 
| -  // When we pass handles, we have to try to take all their dispatchers' locks | 
| -  // and mark the handles as busy. If the call succeeds, we then remove the | 
| -  // handles from the handle table. | 
| +  std::vector<Dispatcher::DispatcherInTransit> dispatchers; | 
| { | 
| -    base::AutoLock locker(handle_table_lock_); | 
| -    MojoResult result = handle_table_.MarkBusyAndStartTransport( | 
| -        message_pipe_handle, handles, num_handles, &transports); | 
| -    if (result != MOJO_RESULT_OK) | 
| -      return result; | 
| +    base::AutoLock lock(handles_lock_); | 
| +    MojoResult rv = handles_.BeginTransit(handles, num_handles, &dispatchers); | 
| +    if (rv != MOJO_RESULT_OK) { | 
| +      handles_.CancelTransit(dispatchers); | 
| +      return rv; | 
| +    } | 
| } | 
| +  DCHECK_EQ(num_handles, dispatchers.size()); | 
|  | 
| -  MojoResult rv = | 
| -      dispatcher->WriteMessage(bytes, num_bytes, &transports, flags); | 
| - | 
| -  // We need to release the dispatcher locks before we take the handle table | 
| -  // lock. | 
| -  for (uint32_t i = 0; i < num_handles; i++) | 
| -    transports[i].End(); | 
| +  MojoResult rv = dispatcher->WriteMessage( | 
| +      bytes, num_bytes, dispatchers.data(), num_handles, flags); | 
|  | 
| { | 
| -    base::AutoLock locker(handle_table_lock_); | 
| +    base::AutoLock lock(handles_lock_); | 
| if (rv == MOJO_RESULT_OK) { | 
| -      handle_table_.RemoveBusyHandles(handles, num_handles); | 
| +      handles_.CompleteTransitAndClose(dispatchers); | 
| } else { | 
| -      handle_table_.RestoreBusyHandles(handles, num_handles); | 
| +      handles_.CancelTransit(dispatchers); | 
| } | 
| } | 
|  | 
| @@ -394,92 +423,65 @@ MojoResult Core::ReadMessage(MojoHandle message_pipe_handle, | 
| MojoHandle* handles, | 
| uint32_t* num_handles, | 
| MojoReadMessageFlags flags) { | 
| -  scoped_refptr<Dispatcher> dispatcher(GetDispatcher(message_pipe_handle)); | 
| +  CHECK((!num_handles || !*num_handles || handles) && | 
| +        (!num_bytes || !*num_bytes || bytes)); | 
| +  auto dispatcher = GetDispatcher(message_pipe_handle); | 
| if (!dispatcher) | 
| return MOJO_RESULT_INVALID_ARGUMENT; | 
| - | 
| -  MojoResult rv; | 
| -  uint32_t num_handles_value = num_handles ? *num_handles : 0; | 
| -  if (num_handles_value == 0) { | 
| -    // Easy case: won't receive any handles. | 
| -    rv = dispatcher->ReadMessage(bytes, num_bytes, nullptr, &num_handles_value, | 
| -                                 flags); | 
| -  } else { | 
| -    DispatcherVector dispatchers; | 
| -    rv = dispatcher->ReadMessage(bytes, num_bytes, &dispatchers, | 
| -                                 &num_handles_value, flags); | 
| -    if (!dispatchers.empty()) { | 
| -      DCHECK_EQ(rv, MOJO_RESULT_OK); | 
| -      DCHECK(num_handles); | 
| -      DCHECK_LE(dispatchers.size(), static_cast<size_t>(num_handles_value)); | 
| - | 
| -      bool success; | 
| -      { | 
| -        base::AutoLock locker(handle_table_lock_); | 
| -        success = handle_table_.AddDispatcherVector(dispatchers, handles); | 
| -      } | 
| -      if (!success) { | 
| -        LOG(ERROR) << "Received message with " << dispatchers.size() | 
| -                   << " handles, but handle table full"; | 
| -        // Close dispatchers (outside the lock). | 
| -        for (size_t i = 0; i < dispatchers.size(); i++) { | 
| -          if (dispatchers[i]) | 
| -            dispatchers[i]->Close(); | 
| -        } | 
| -        if (rv == MOJO_RESULT_OK) | 
| -          rv = MOJO_RESULT_RESOURCE_EXHAUSTED; | 
| -      } | 
| -    } | 
| -  } | 
| - | 
| -  if (num_handles) | 
| -    *num_handles = num_handles_value; | 
| -  return rv; | 
| +  return dispatcher->ReadMessage(bytes, num_bytes, handles, num_handles, flags); | 
| } | 
|  | 
| MojoResult Core::CreateDataPipe( | 
| const MojoCreateDataPipeOptions* options, | 
| MojoHandle* data_pipe_producer_handle, | 
| MojoHandle* data_pipe_consumer_handle) { | 
| -  MojoCreateDataPipeOptions validated_options = {}; | 
| -  MojoResult result = | 
| -      DataPipe::ValidateCreateOptions(options, &validated_options); | 
| -  if (result != MOJO_RESULT_OK) | 
| -    return result; | 
| +  if (options && options->struct_size != sizeof(MojoCreateDataPipeOptions)) | 
| +    return MOJO_RESULT_INVALID_ARGUMENT; | 
|  | 
| -  scoped_refptr<DataPipeProducerDispatcher> producer_dispatcher = | 
| -      DataPipeProducerDispatcher::Create(validated_options); | 
| -  scoped_refptr<DataPipeConsumerDispatcher> consumer_dispatcher = | 
| -      DataPipeConsumerDispatcher::Create(validated_options); | 
| +  MojoCreateDataPipeOptions create_options; | 
| +  create_options.struct_size = sizeof(MojoCreateDataPipeOptions); | 
| +  create_options.flags = options ? options->flags : 0; | 
| +  create_options.element_num_bytes = options ? options->element_num_bytes : 1; | 
| +  // TODO: Use Configuration to get default data pipe capacity. | 
| +  create_options.capacity_num_bytes = | 
| +      options && options->capacity_num_bytes ? options->capacity_num_bytes | 
| +                                             : 64 * 1024; | 
| + | 
| +  // TODO: Broker through the parent when necessary. | 
| +  scoped_refptr<PlatformSharedBuffer> ring_buffer = | 
| +      GetNodeController()->CreateSharedBuffer( | 
| +          create_options.capacity_num_bytes); | 
| +  if (!ring_buffer) | 
| +    return MOJO_RESULT_RESOURCE_EXHAUSTED; | 
|  | 
| -  std::pair<MojoHandle, MojoHandle> handle_pair; | 
| -  { | 
| -    base::AutoLock locker(handle_table_lock_); | 
| -    handle_pair = handle_table_.AddDispatcherPair(producer_dispatcher, | 
| -                                                  consumer_dispatcher); | 
| -  } | 
| -  if (handle_pair.first == MOJO_HANDLE_INVALID) { | 
| -    DCHECK_EQ(handle_pair.second, MOJO_HANDLE_INVALID); | 
| -    LOG(ERROR) << "Handle table full"; | 
| -    producer_dispatcher->Close(); | 
| -    consumer_dispatcher->Close(); | 
| +  ports::PortRef port0, port1; | 
| +  GetNodeController()->node()->CreatePortPair(&port0, &port1); | 
| + | 
| +  CHECK(data_pipe_producer_handle); | 
| +  CHECK(data_pipe_consumer_handle); | 
| + | 
| +  uint64_t pipe_id = base::RandUint64(); | 
| + | 
| +  scoped_refptr<Dispatcher> producer = new DataPipeProducerDispatcher( | 
| +      GetNodeController(), port0, ring_buffer, create_options, | 
| +      true /* initialized */, pipe_id); | 
| +  scoped_refptr<Dispatcher> consumer = new DataPipeConsumerDispatcher( | 
| +      GetNodeController(), port1, ring_buffer, create_options, | 
| +      true /* initialized */, pipe_id); | 
| + | 
| +  *data_pipe_producer_handle = AddDispatcher(producer); | 
| +  *data_pipe_consumer_handle = AddDispatcher(consumer); | 
| +  if (*data_pipe_producer_handle == MOJO_HANDLE_INVALID || | 
| +      *data_pipe_consumer_handle == MOJO_HANDLE_INVALID) { | 
| +    if (*data_pipe_producer_handle != MOJO_HANDLE_INVALID) { | 
| +      scoped_refptr<Dispatcher> unused; | 
| +      handles_.GetAndRemoveDispatcher(*data_pipe_producer_handle, &unused); | 
| +    } | 
| +    producer->Close(); | 
| +    consumer->Close(); | 
| return MOJO_RESULT_RESOURCE_EXHAUSTED; | 
| } | 
| -  DCHECK_NE(handle_pair.second, MOJO_HANDLE_INVALID); | 
| - | 
| -  ScopedPlatformHandle server_handle, client_handle; | 
| -#if defined(OS_WIN) | 
| -  internal::g_broker->CreatePlatformChannelPair(&server_handle, &client_handle); | 
| -#else | 
| -  PlatformChannelPair channel_pair; | 
| -  server_handle = channel_pair.PassServerHandle(); | 
| -  client_handle = channel_pair.PassClientHandle(); | 
| -#endif | 
| -  producer_dispatcher->Init(std::move(server_handle), nullptr, 0u); | 
| -  consumer_dispatcher->Init(std::move(client_handle), nullptr, 0u); | 
| - | 
| -  *data_pipe_producer_handle = handle_pair.first; | 
| -  *data_pipe_consumer_handle = handle_pair.second; | 
| + | 
| return MOJO_RESULT_OK; | 
| } | 
|  | 
| @@ -562,8 +564,8 @@ MojoResult Core::CreateSharedBuffer( | 
| return result; | 
|  | 
| scoped_refptr<SharedBufferDispatcher> dispatcher; | 
| -  result = SharedBufferDispatcher::Create(platform_support_, validated_options, | 
| -                                          num_bytes, &dispatcher); | 
| +  result = SharedBufferDispatcher::Create( | 
| +      internal::g_platform_support, validated_options, num_bytes, &dispatcher); | 
| if (result != MOJO_RESULT_OK) { | 
| DCHECK(!dispatcher); | 
| return result; | 
| @@ -632,19 +634,20 @@ MojoResult Core::MapBuffer(MojoHandle buffer_handle, | 
| } | 
|  | 
| MojoResult Core::UnmapBuffer(void* buffer) { | 
| -  base::AutoLock locker(mapping_table_lock_); | 
| +  base::AutoLock lock(mapping_table_lock_); | 
| return mapping_table_.RemoveMapping(buffer); | 
| } | 
|  | 
| -// Note: We allow |handles| to repeat the same handle multiple times, since | 
| -// different flags may be specified. | 
| -// TODO(vtl): This incurs a performance cost in |Remove()|. Analyze this | 
| -// more carefully and address it if necessary. | 
| +void Core::GetActiveHandlesForTest(std::vector<MojoHandle>* handles) { | 
| +  base::AutoLock lock(handles_lock_); | 
| +  handles_.GetActiveHandlesForTest(handles); | 
| +} | 
| + | 
| MojoResult Core::WaitManyInternal(const MojoHandle* handles, | 
| const MojoHandleSignals* signals, | 
| uint32_t num_handles, | 
| MojoDeadline deadline, | 
| -                                  uint32_t* result_index, | 
| +                                  uint32_t *result_index, | 
| HandleSignalsState* signals_states) { | 
| CHECK(handles); | 
| CHECK(signals); | 
| @@ -705,5 +708,15 @@ MojoResult Core::WaitManyInternal(const MojoHandle* handles, | 
| return rv; | 
| } | 
|  | 
| +// static | 
| +void Core::PassNodeControllerToIOThread( | 
| +    scoped_ptr<NodeController> node_controller) { | 
| +  // It's OK to leak this reference. At this point we know the IO loop is still | 
| +  // running, and we know the NodeController will observe its eventual | 
| +  // destruction. This tells the NodeController to delete itself when that | 
| +  // happens. | 
| +  node_controller.release()->DestroyOnIOThreadShutdown(); | 
| +} | 
| + | 
| }  // namespace edk | 
| }  // namespace mojo | 
|  |