Chromium Code Reviews| Index: mojo/edk/system/child_broker_host.cc |
| diff --git a/mojo/edk/system/child_broker_host.cc b/mojo/edk/system/child_broker_host.cc |
| index 20a73d9589e7812da2cf5348c5dece9b682269bd..916c521f76a49dbf546d86772c0df768e77bdf2f 100644 |
| --- a/mojo/edk/system/child_broker_host.cc |
| +++ b/mojo/edk/system/child_broker_host.cc |
| @@ -12,12 +12,20 @@ |
| #include "base/lazy_instance.h" |
| #include "mojo/edk/embedder/embedder_internal.h" |
| #include "mojo/edk/embedder/platform_channel_pair.h" |
| +#include "mojo/edk/embedder/platform_shared_buffer.h" |
| +#include "mojo/edk/embedder/platform_support.h" |
| #include "mojo/edk/system/broker_messages.h" |
| #include "mojo/edk/system/broker_state.h" |
| #include "mojo/edk/system/configuration.h" |
| #include "mojo/edk/system/core.h" |
| #include "mojo/edk/system/platform_handle_dispatcher.h" |
| +#if defined(OS_POSIX) |
| +#include <sys/uio.h> |
| + |
| +#include "mojo/edk/embedder/platform_channel_utils_posix.h" |
| +#endif |
| + |
| namespace mojo { |
| namespace edk { |
| @@ -29,27 +37,33 @@ static const int kDefaultReadBufferSize = 256; |
| ChildBrokerHost::ChildBrokerHost(base::ProcessHandle child_process, |
| ScopedPlatformHandle pipe) |
| - : process_id_(base::GetProcId(child_process)), child_channel_(nullptr) { |
| - ScopedPlatformHandle parent_async_channel_handle; |
| -#if defined(OS_POSIX) |
| - parent_async_channel_handle = std::move(pipe); |
| -#else |
| + : process_id_(base::GetProcId(child_process)), |
| + child_channel_(nullptr), |
| + num_bytes_read_(0) { |
| + // First set up the synchronous pipe. |
| + sync_channel_ = std::move(pipe); |
| + |
| + // See comment in ChildBroker::SetChildBrokerHostHandle. Summary is we need |
| + // two pipes, so send the second one over the first one. |
| + PlatformChannelPair parent_pipe; |
| + |
| + ScopedPlatformHandle parent_async_channel_handle = |
| + parent_pipe.PassServerHandle(); |
| + |
| + num_bytes_read_ = 0; |
| + |
| +// Send over the async pipe. |
| +#if defined(OS_WIN) |
| DuplicateHandle(GetCurrentProcess(), child_process, |
| GetCurrentProcess(), &child_process, |
| 0, FALSE, DUPLICATE_SAME_ACCESS); |
| child_process_ = base::Process(child_process); |
| - sync_channel_ = pipe.Pass(); |
| + |
| memset(&read_context_.overlapped, 0, sizeof(read_context_.overlapped)); |
| read_context_.handler = this; |
| memset(&write_context_.overlapped, 0, sizeof(write_context_.overlapped)); |
| write_context_.handler = this; |
| read_data_.resize(kDefaultReadBufferSize); |
| - num_bytes_read_ = 0; |
| - |
| - // See comment in ChildBroker::SetChildBrokerHostHandle. Summary is we need |
| - // two pipes on Windows, so send the second one over the first one. |
| - PlatformChannelPair parent_pipe; |
| - parent_async_channel_handle = parent_pipe.PassServerHandle(); |
| HANDLE duplicated_child_handle = |
| DuplicateToChild(parent_pipe.PassClientHandle().release().handle); |
| @@ -57,6 +71,13 @@ ChildBrokerHost::ChildBrokerHost(base::ProcessHandle child_process, |
| &duplicated_child_handle, sizeof(duplicated_child_handle), |
| NULL, &write_context_.overlapped); |
| DCHECK(rv || GetLastError() == ERROR_IO_PENDING); |
| +#else |
| + // Send just one null byte. |
| + struct iovec iov = {const_cast<char*>(""), 1}; |
| + PlatformHandle child_handle = parent_pipe.PassClientHandle().release(); |
| + ssize_t result = PlatformChannelSendmsgWithHandles(sync_channel_.get(), &iov, |
|
Eliot Courtney
2016/01/05 04:45:47
Should this be done in a loop to make sure it send
|
| + 1, &child_handle, 1); |
| + CHECK_NE(-1, result); |
| #endif |
| internal::g_io_thread_task_runner->PostTask( |
| @@ -128,6 +149,11 @@ void ChildBrokerHost::InitOnIO( |
| base::MessageLoopForIO::current()->RegisterIOHandler( |
| sync_channel_.get().handle, this); |
| BeginRead(); |
| +#else |
| + base::MessageLoopForIO::current()->WatchFileDescriptor( |
| + sync_channel_.get().handle, true, base::MessageLoopForIO::WATCH_READ, |
| + &fd_controller_, this); |
| + DoRead(); |
| #endif |
| } |
| @@ -166,11 +192,8 @@ void ChildBrokerHost::OnError(Error error) { |
| } |
| void ChildBrokerHost::ChannelDestructed(RoutedRawChannel* channel) { |
| - // On Windows, we have two pipes to the child process. It's easier to wait |
| + // We have two pipes to the child process. It's easier to wait |
| // until we get the error from the pipe that is used for synchronous I/O. |
| -#if !defined(OS_WIN) |
| - delete this; |
| -#endif |
| } |
| #if defined(OS_WIN) |
| @@ -310,6 +333,100 @@ HANDLE ChildBrokerHost::DuplicateFromChild(HANDLE handle) { |
| DCHECK(result); |
| return rv; |
| } |
| +#else |
| +void ChildBrokerHost::DoRead() { |
| + // Grab as many messages as we can before we would start blocking. |
| + while (1) { |
| + // Try to at least read the size of the message. |
| + if (read_data_.size() < sizeof(uint32_t)) |
| + read_data_.resize(sizeof(uint32_t)); |
| + DCHECK_LT(num_bytes_read_, read_data_.size()); |
| + |
| + std::deque<PlatformHandle> dummy; |
| + ssize_t bytes_read = PlatformChannelRecvmsg( |
| + sync_channel_.get(), &read_data_[num_bytes_read_], |
| + static_cast<int>(read_data_.size() - num_bytes_read_), &dummy); |
| + DCHECK(dummy.empty()); |
| + |
| + // Connection closed. |
| + if (bytes_read == 0 || |
| + (bytes_read == -1 && errno != EAGAIN && errno != EWOULDBLOCK)) { |
| + delete this; |
| + return; |
| + } |
| + |
| + // No more data. |
| + if (bytes_read == -1) |
| + break; |
| + |
| + num_bytes_read_ += bytes_read; |
| + |
| + // We don't know how big the message is yet. |
| + if (num_bytes_read_ < sizeof(uint32_t)) |
| + continue; |
| + |
| + BrokerMessage* message = reinterpret_cast<BrokerMessage*>(&read_data_[0]); |
| + |
| + // Message not done yet. |
| + if (num_bytes_read_ < message->size) { |
| + read_data_.resize(message->size); |
| + continue; |
| + } |
| + |
| + if (num_bytes_read_ >= message->size) { |
| + if (message->id == CREATE_SHARED_BUFFER) { |
| + scoped_refptr<PlatformSharedBuffer> shared_buffer = |
| + internal::g_platform_support->CreateSharedBuffer( |
| + message->num_bytes); |
| + if (shared_buffer) { |
| + write_handles_.push_back( |
| + shared_buffer->PassPlatformHandle().release()); |
| + } else { |
| + LOG(ERROR) |
| + << "ChildBrokerHost failed to create shared buffer of size " |
| + << message->num_bytes; |
| + write_handles_.emplace_back(); |
| + } |
| + |
| + } else { |
| + NOTREACHED() << "Unknown command. Stopping reading."; |
| + delete this; |
| + return; |
| + } |
| + |
| + num_bytes_read_ -= message->size; |
| + } |
| + } |
| + |
| + DoWrite(); |
| +} |
| + |
| +void ChildBrokerHost::DoWrite() { |
| + while (!write_handles_.empty()) { |
| + // Send just one null byte. |
| + struct iovec iov = {const_cast<char*>(""), 1}; |
| + PlatformHandle handle = write_handles_.front(); |
| + ssize_t result = PlatformChannelSendmsgWithHandles(sync_channel_.get(), |
| + &iov, 1, &handle, 1); |
| + if (result == -1) |
| + break; |
| + write_handles_.pop_front(); |
| + } |
| +} |
| + |
| +void ChildBrokerHost::OnFileCanReadWithoutBlocking(int fd) { |
| + DCHECK(internal::g_io_thread_task_runner->RunsTasksOnCurrentThread()); |
| + if (fd != sync_channel_.get().handle) { |
| + NOTREACHED() << "ChildBrokerHost shouldn't get notifications about file " |
| + "descriptors other than sync_channel_'s"; |
| + delete this; |
| + return; |
| + } |
| + |
| + DoRead(); |
| +} |
| + |
| +void ChildBrokerHost::OnFileCanWriteWithoutBlocking(int fd) {} |
| #endif |
| } // namespace edk |