| Index: third_party/mojo/src/mojo/edk/embedder/embedder.cc
|
| diff --git a/third_party/mojo/src/mojo/edk/embedder/embedder.cc b/third_party/mojo/src/mojo/edk/embedder/embedder.cc
|
| index d0a5135e8d0e03cb9647e9f177c45b488f7d416f..a4997e7da72094fd8b400c02adc65b3fe5714585 100644
|
| --- a/third_party/mojo/src/mojo/edk/embedder/embedder.cc
|
| +++ b/third_party/mojo/src/mojo/edk/embedder/embedder.cc
|
| @@ -6,25 +6,60 @@
|
|
|
| #include "base/atomicops.h"
|
| #include "base/bind.h"
|
| +#include "base/bind_helpers.h"
|
| #include "base/location.h"
|
| #include "base/logging.h"
|
| #include "base/memory/scoped_ptr.h"
|
| #include "base/message_loop/message_loop_proxy.h"
|
| +#include "base/task_runner.h"
|
| #include "mojo/edk/embedder/embedder_internal.h"
|
| +#include "mojo/edk/embedder/master_process_delegate.h"
|
| #include "mojo/edk/embedder/platform_support.h"
|
| +#include "mojo/edk/embedder/process_delegate.h"
|
| +#include "mojo/edk/embedder/slave_process_delegate.h"
|
| #include "mojo/edk/system/channel.h"
|
| #include "mojo/edk/system/channel_manager.h"
|
| #include "mojo/edk/system/configuration.h"
|
| +#include "mojo/edk/system/connection_manager.h"
|
| #include "mojo/edk/system/core.h"
|
| +#include "mojo/edk/system/master_connection_manager.h"
|
| #include "mojo/edk/system/message_pipe_dispatcher.h"
|
| #include "mojo/edk/system/platform_handle_dispatcher.h"
|
| #include "mojo/edk/system/raw_channel.h"
|
| +#include "mojo/edk/system/slave_connection_manager.h"
|
|
|
| namespace mojo {
|
| namespace embedder {
|
|
|
| +namespace internal {
|
| +
|
| +// Declared in embedder_internal.h.
|
| +PlatformSupport* g_platform_support = nullptr;
|
| +system::Core* g_core = nullptr;
|
| +ProcessType g_process_type = ProcessType::UNINITIALIZED;
|
| +
|
| +} // namespace internal
|
| +
|
| namespace {
|
|
|
| +// The following global variables are set in |InitIPCSupport()| and reset by
|
| +// |ShutdownIPCSupport()|/|ShutdownIPCSupportOnIOThread()|.
|
| +
|
| +// Note: This needs to be |AddRef()|ed/|Release()|d.
|
| +base::TaskRunner* g_delegate_thread_task_runner = nullptr;
|
| +
|
| +ProcessDelegate* g_process_delegate = nullptr;
|
| +
|
| +// Note: This needs to be |AddRef()|ed/|Release()|d.
|
| +base::TaskRunner* g_io_thread_task_runner = nullptr;
|
| +
|
| +// Instance of |ConnectionManager| used by the channel manager (below).
|
| +system::ConnectionManager* g_connection_manager = nullptr;
|
| +
|
| +// Instance of |ChannelManager| used by the channel management functions
|
| +// (|CreateChannel()|, etc.).
|
| +system::ChannelManager* g_channel_manager = nullptr;
|
| +
|
| // TODO(vtl): For now, we need this to be thread-safe (since theoretically we
|
| // currently support multiple channel creation threads -- possibly one per
|
| // channel). Eventually, we won't need it to be thread-safe (we'll require a
|
| @@ -46,18 +81,22 @@ system::ChannelId MakeChannelId() {
|
| return static_cast<system::ChannelId>(-new_counter_value);
|
| }
|
|
|
| -} // namespace
|
| +// Note: Called on the I/O thread.
|
| +void ShutdownIPCSupportHelper() {
|
| + // Save these before nuking them using |ShutdownChannelOnIOThread()|.
|
| + scoped_refptr<base::TaskRunner> delegate_thread_task_runner(
|
| + g_delegate_thread_task_runner);
|
| + ProcessDelegate* process_delegate = g_process_delegate;
|
|
|
| -namespace internal {
|
| + ShutdownIPCSupportOnIOThread();
|
|
|
| -// Declared in embedder_internal.h.
|
| -PlatformSupport* g_platform_support = nullptr;
|
| -system::Core* g_core = nullptr;
|
| -system::ChannelManager* g_channel_manager = nullptr;
|
| -MasterProcessDelegate* g_master_process_delegate = nullptr;
|
| -SlaveProcessDelegate* g_slave_process_delegate = nullptr;
|
| + bool ok = delegate_thread_task_runner->PostTask(
|
| + FROM_HERE, base::Bind(&ProcessDelegate::OnShutdownComplete,
|
| + base::Unretained(process_delegate)));
|
| + DCHECK(ok);
|
| +}
|
|
|
| -} // namespace internal
|
| +} // namespace
|
|
|
| Configuration* GetConfiguration() {
|
| return system::GetMutableConfiguration();
|
| @@ -71,33 +110,150 @@ void Init(scoped_ptr<PlatformSupport> platform_support) {
|
|
|
| DCHECK(!internal::g_core);
|
| internal::g_core = new system::Core(internal::g_platform_support);
|
| +}
|
|
|
| - DCHECK(!internal::g_channel_manager);
|
| - internal::g_channel_manager =
|
| - new system::ChannelManager(internal::g_platform_support);
|
| +MojoResult AsyncWait(MojoHandle handle,
|
| + MojoHandleSignals signals,
|
| + const base::Callback<void(MojoResult)>& callback) {
|
| + return internal::g_core->AsyncWait(handle, signals, callback);
|
| }
|
|
|
| -void InitMaster(scoped_refptr<base::TaskRunner> delegate_thread_task_runner,
|
| - MasterProcessDelegate* master_process_delegate,
|
| - scoped_refptr<base::TaskRunner> io_thread_task_runner) {
|
| - // |Init()| must have already been called.
|
| +MojoResult CreatePlatformHandleWrapper(
|
| + ScopedPlatformHandle platform_handle,
|
| + MojoHandle* platform_handle_wrapper_handle) {
|
| + DCHECK(platform_handle_wrapper_handle);
|
| +
|
| + scoped_refptr<system::Dispatcher> dispatcher(
|
| + new system::PlatformHandleDispatcher(platform_handle.Pass()));
|
| +
|
| + DCHECK(internal::g_core);
|
| + MojoHandle h = internal::g_core->AddDispatcher(dispatcher);
|
| + if (h == MOJO_HANDLE_INVALID) {
|
| + LOG(ERROR) << "Handle table full";
|
| + dispatcher->Close();
|
| + return MOJO_RESULT_RESOURCE_EXHAUSTED;
|
| + }
|
| +
|
| + *platform_handle_wrapper_handle = h;
|
| + return MOJO_RESULT_OK;
|
| +}
|
| +
|
| +MojoResult PassWrappedPlatformHandle(MojoHandle platform_handle_wrapper_handle,
|
| + ScopedPlatformHandle* platform_handle) {
|
| + DCHECK(platform_handle);
|
| +
|
| DCHECK(internal::g_core);
|
| + scoped_refptr<system::Dispatcher> dispatcher(
|
| + internal::g_core->GetDispatcher(platform_handle_wrapper_handle));
|
| + if (!dispatcher)
|
| + return MOJO_RESULT_INVALID_ARGUMENT;
|
|
|
| - // TODO(vtl): This is temporary. We really want to construct a
|
| - // |MasterConnectionManager| here, which will in turn hold on to the delegate.
|
| - internal::g_master_process_delegate = master_process_delegate;
|
| + if (dispatcher->GetType() != system::Dispatcher::kTypePlatformHandle)
|
| + return MOJO_RESULT_INVALID_ARGUMENT;
|
| +
|
| + *platform_handle =
|
| + static_cast<system::PlatformHandleDispatcher*>(dispatcher.get())
|
| + ->PassPlatformHandle()
|
| + .Pass();
|
| + return MOJO_RESULT_OK;
|
| }
|
|
|
| -void InitSlave(scoped_refptr<base::TaskRunner> delegate_thread_task_runner,
|
| - SlaveProcessDelegate* slave_process_delegate,
|
| - scoped_refptr<base::TaskRunner> io_thread_task_runner,
|
| - ScopedPlatformHandle platform_handle) {
|
| +void InitIPCSupport(ProcessType process_type,
|
| + scoped_refptr<base::TaskRunner> delegate_thread_task_runner,
|
| + ProcessDelegate* process_delegate,
|
| + scoped_refptr<base::TaskRunner> io_thread_task_runner,
|
| + ScopedPlatformHandle platform_handle) {
|
| // |Init()| must have already been called.
|
| DCHECK(internal::g_core);
|
| + // And not |InitIPCSupport()| (without |ShutdownIPCSupport()|).
|
| + DCHECK(internal::g_process_type == ProcessType::UNINITIALIZED);
|
|
|
| - // TODO(vtl): This is temporary. We really want to construct a
|
| - // |SlaveConnectionManager| here, which will in turn hold on to the delegate.
|
| - internal::g_slave_process_delegate = slave_process_delegate;
|
| + internal::g_process_type = process_type;
|
| +
|
| + DCHECK(delegate_thread_task_runner);
|
| + DCHECK(!g_delegate_thread_task_runner);
|
| + g_delegate_thread_task_runner = delegate_thread_task_runner.get();
|
| + g_delegate_thread_task_runner->AddRef();
|
| +
|
| + DCHECK(process_delegate->GetType() == process_type);
|
| + DCHECK(!g_process_delegate);
|
| + g_process_delegate = process_delegate;
|
| +
|
| + DCHECK(io_thread_task_runner);
|
| + DCHECK(!g_io_thread_task_runner);
|
| + g_io_thread_task_runner = io_thread_task_runner.get();
|
| + g_io_thread_task_runner->AddRef();
|
| +
|
| + DCHECK(!g_connection_manager);
|
| + switch (process_type) {
|
| + case ProcessType::UNINITIALIZED:
|
| + CHECK(false);
|
| + break;
|
| + case ProcessType::NONE:
|
| + DCHECK(!platform_handle.is_valid()); // We wouldn't do anything with it.
|
| + // Nothing to do.
|
| + break;
|
| + case ProcessType::MASTER:
|
| + DCHECK(!platform_handle.is_valid()); // We wouldn't do anything with it.
|
| + g_connection_manager = new system::MasterConnectionManager();
|
| + static_cast<system::MasterConnectionManager*>(g_connection_manager)
|
| + ->Init(g_delegate_thread_task_runner,
|
| + static_cast<MasterProcessDelegate*>(g_process_delegate));
|
| + break;
|
| + case ProcessType::SLAVE:
|
| + DCHECK(platform_handle.is_valid());
|
| + g_connection_manager = new system::SlaveConnectionManager();
|
| + static_cast<system::SlaveConnectionManager*>(g_connection_manager)
|
| + ->Init(g_delegate_thread_task_runner,
|
| + static_cast<SlaveProcessDelegate*>(g_process_delegate),
|
| + platform_handle.Pass());
|
| + break;
|
| + }
|
| +
|
| + DCHECK(!g_channel_manager);
|
| + g_channel_manager =
|
| + new system::ChannelManager(internal::g_platform_support,
|
| + io_thread_task_runner, g_connection_manager);
|
| +}
|
| +
|
| +void ShutdownIPCSupportOnIOThread() {
|
| + DCHECK(internal::g_process_type != ProcessType::UNINITIALIZED);
|
| +
|
| + g_channel_manager->ShutdownOnIOThread();
|
| + delete g_channel_manager;
|
| + g_channel_manager = nullptr;
|
| +
|
| + if (g_connection_manager) {
|
| + g_connection_manager->Shutdown();
|
| + delete g_connection_manager;
|
| + g_connection_manager = nullptr;
|
| + }
|
| +
|
| + g_io_thread_task_runner->Release();
|
| + g_io_thread_task_runner = nullptr;
|
| +
|
| + g_delegate_thread_task_runner->Release();
|
| + g_delegate_thread_task_runner = nullptr;
|
| +
|
| + g_process_delegate = nullptr;
|
| +
|
| + internal::g_process_type = ProcessType::UNINITIALIZED;
|
| +}
|
| +
|
| +void ShutdownIPCSupport() {
|
| + DCHECK(internal::g_process_type != ProcessType::UNINITIALIZED);
|
| +
|
| + bool ok = g_io_thread_task_runner->PostTask(
|
| + FROM_HERE, base::Bind(&ShutdownIPCSupportHelper));
|
| + DCHECK(ok);
|
| +}
|
| +
|
| +void ConnectToSlave(SlaveInfo slave_info,
|
| + ScopedPlatformHandle platform_handle) {
|
| + DCHECK(platform_handle.is_valid());
|
| + DCHECK(internal::g_process_type == ProcessType::MASTER);
|
| + static_cast<system::MasterConnectionManager*>(g_connection_manager)
|
| + ->AddSlave(slave_info, platform_handle.Pass());
|
| }
|
|
|
| // TODO(vtl): Write tests for this.
|
| @@ -109,8 +265,8 @@ ScopedMessagePipeHandle CreateChannelOnIOThread(
|
|
|
| *channel_info = new ChannelInfo(MakeChannelId());
|
| scoped_refptr<system::MessagePipeDispatcher> dispatcher =
|
| - internal::g_channel_manager->CreateChannelOnIOThread(
|
| - (*channel_info)->channel_id, platform_handle.Pass());
|
| + g_channel_manager->CreateChannelOnIOThread((*channel_info)->channel_id,
|
| + platform_handle.Pass());
|
|
|
| ScopedMessagePipeHandle rv(
|
| MessagePipeHandle(internal::g_core->AddDispatcher(dispatcher)));
|
| @@ -123,7 +279,7 @@ ScopedMessagePipeHandle CreateChannelOnIOThread(
|
| ScopedMessagePipeHandle CreateChannel(
|
| ScopedPlatformHandle platform_handle,
|
| scoped_refptr<base::TaskRunner> io_thread_task_runner,
|
| - DidCreateChannelCallback callback,
|
| + const DidCreateChannelCallback& callback,
|
| scoped_refptr<base::TaskRunner> callback_thread_task_runner) {
|
| DCHECK(platform_handle.is_valid());
|
| DCHECK(io_thread_task_runner);
|
| @@ -132,7 +288,7 @@ ScopedMessagePipeHandle CreateChannel(
|
| system::ChannelId channel_id = MakeChannelId();
|
| scoped_ptr<ChannelInfo> channel_info(new ChannelInfo(channel_id));
|
| scoped_refptr<system::MessagePipeDispatcher> dispatcher =
|
| - internal::g_channel_manager->CreateChannel(
|
| + g_channel_manager->CreateChannel(
|
| channel_id, platform_handle.Pass(), io_thread_task_runner,
|
| base::Bind(callback, base::Unretained(channel_info.release())),
|
| callback_thread_task_runner);
|
| @@ -146,66 +302,32 @@ ScopedMessagePipeHandle CreateChannel(
|
| }
|
|
|
| // TODO(vtl): Write tests for this.
|
| -void DestroyChannel(ChannelInfo* channel_info) {
|
| +void DestroyChannelOnIOThread(ChannelInfo* channel_info) {
|
| DCHECK(channel_info);
|
| DCHECK(channel_info->channel_id);
|
| - DCHECK(internal::g_channel_manager);
|
| - // This will destroy the channel synchronously if called from the channel
|
| - // thread.
|
| - internal::g_channel_manager->ShutdownChannel(channel_info->channel_id);
|
| + DCHECK(g_channel_manager);
|
| + g_channel_manager->ShutdownChannelOnIOThread(channel_info->channel_id);
|
| delete channel_info;
|
| }
|
|
|
| -void WillDestroyChannelSoon(ChannelInfo* channel_info) {
|
| +// TODO(vtl): Write tests for this.
|
| +void DestroyChannel(
|
| + ChannelInfo* channel_info,
|
| + const DidDestroyChannelCallback& callback,
|
| + scoped_refptr<base::TaskRunner> callback_thread_task_runner) {
|
| DCHECK(channel_info);
|
| - DCHECK(internal::g_channel_manager);
|
| - internal::g_channel_manager->WillShutdownChannel(channel_info->channel_id);
|
| -}
|
| -
|
| -MojoResult CreatePlatformHandleWrapper(
|
| - ScopedPlatformHandle platform_handle,
|
| - MojoHandle* platform_handle_wrapper_handle) {
|
| - DCHECK(platform_handle_wrapper_handle);
|
| -
|
| - scoped_refptr<system::Dispatcher> dispatcher(
|
| - new system::PlatformHandleDispatcher(platform_handle.Pass()));
|
| -
|
| - DCHECK(internal::g_core);
|
| - MojoHandle h = internal::g_core->AddDispatcher(dispatcher);
|
| - if (h == MOJO_HANDLE_INVALID) {
|
| - LOG(ERROR) << "Handle table full";
|
| - dispatcher->Close();
|
| - return MOJO_RESULT_RESOURCE_EXHAUSTED;
|
| - }
|
| -
|
| - *platform_handle_wrapper_handle = h;
|
| - return MOJO_RESULT_OK;
|
| -}
|
| -
|
| -MojoResult PassWrappedPlatformHandle(MojoHandle platform_handle_wrapper_handle,
|
| - ScopedPlatformHandle* platform_handle) {
|
| - DCHECK(platform_handle);
|
| -
|
| - DCHECK(internal::g_core);
|
| - scoped_refptr<system::Dispatcher> dispatcher(
|
| - internal::g_core->GetDispatcher(platform_handle_wrapper_handle));
|
| - if (!dispatcher)
|
| - return MOJO_RESULT_INVALID_ARGUMENT;
|
| -
|
| - if (dispatcher->GetType() != system::Dispatcher::kTypePlatformHandle)
|
| - return MOJO_RESULT_INVALID_ARGUMENT;
|
| -
|
| - *platform_handle =
|
| - static_cast<system::PlatformHandleDispatcher*>(dispatcher.get())
|
| - ->PassPlatformHandle()
|
| - .Pass();
|
| - return MOJO_RESULT_OK;
|
| + DCHECK(channel_info->channel_id);
|
| + DCHECK(!callback.is_null());
|
| + DCHECK(g_channel_manager);
|
| + g_channel_manager->ShutdownChannel(channel_info->channel_id, callback,
|
| + callback_thread_task_runner);
|
| + delete channel_info;
|
| }
|
|
|
| -MojoResult AsyncWait(MojoHandle handle,
|
| - MojoHandleSignals signals,
|
| - base::Callback<void(MojoResult)> callback) {
|
| - return internal::g_core->AsyncWait(handle, signals, callback);
|
| +void WillDestroyChannelSoon(ChannelInfo* channel_info) {
|
| + DCHECK(channel_info);
|
| + DCHECK(g_channel_manager);
|
| + g_channel_manager->WillShutdownChannel(channel_info->channel_id);
|
| }
|
|
|
| } // namespace embedder
|
|
|