Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(901)

Unified Diff: mojo/edk/system/raw_channel_win.cc

Issue 1350023003: Add a Mojo EDK for Chrome that uses one OS pipe per message pipe. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 5 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: mojo/edk/system/raw_channel_win.cc
diff --git a/mojo/edk/system/raw_channel_win.cc b/mojo/edk/system/raw_channel_win.cc
new file mode 100644
index 0000000000000000000000000000000000000000..da1d17ca6709f02026d4d623becd6d998261eb3a
--- /dev/null
+++ b/mojo/edk/system/raw_channel_win.cc
@@ -0,0 +1,885 @@
+// Copyright 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "mojo/edk/system/raw_channel.h"
+
+#include <windows.h>
+
+#include "base/bind.h"
+#include "base/lazy_instance.h"
+#include "base/location.h"
+#include "base/logging.h"
+#include "base/memory/scoped_ptr.h"
+#include "base/message_loop/message_loop.h"
+#include "base/process/process.h"
+#include "base/synchronization/lock.h"
+#include "base/win/object_watcher.h"
+#include "base/win/windows_version.h"
+#include "mojo/edk/embedder/platform_handle.h"
+#include "mojo/public/cpp/system/macros.h"
+
+#define STATUS_CANCELLED 0xC0000120
+#define STATUS_PIPE_BROKEN 0xC000014B
+
+// We can't use IO completion ports if we send a message pipe. The reason is
+// that the only way to stop an existing IOCP is by closing the pipe handle.
+// See https://msdn.microsoft.com/en-us/library/windows/hardware/ff545834(v=vs.85).aspx
+bool g_use_iocp = false;
+
+// Manual reset per
+// Doc for overlapped I/O says use manual per
+// https://msdn.microsoft.com/en-us/library/windows/desktop/ms684342(v=vs.85).aspx
+// However using an auto-reset event makes the perf test 5x faster and also
+// works since we don't wait on the event elsewhere or call GetOverlappedResult
+// before it fires.
+bool g_use_autoreset_event = true;
+
+namespace mojo {
+namespace system {
+
+namespace {
+
+class VistaOrHigherFunctions {
+ public:
+ VistaOrHigherFunctions()
+ : is_vista_or_higher_(
+ base::win::GetVersion() >= base::win::VERSION_VISTA),
+ set_file_completion_notification_modes_(nullptr),
+ cancel_io_ex_(nullptr) {
+ if (!is_vista_or_higher_)
+ return;
+
+ HMODULE module = GetModuleHandleW(L"kernel32.dll");
+ set_file_completion_notification_modes_ =
+ reinterpret_cast<SetFileCompletionNotificationModesFunc>(
+ GetProcAddress(module, "SetFileCompletionNotificationModes"));
+ DCHECK(set_file_completion_notification_modes_);
+
+ cancel_io_ex_ =
+ reinterpret_cast<CancelIoExFunc>(GetProcAddress(module, "CancelIoEx"));
+ DCHECK(cancel_io_ex_);
+ }
+
+ bool is_vista_or_higher() const { return is_vista_or_higher_; }
+
+ BOOL SetFileCompletionNotificationModes(HANDLE handle, UCHAR flags) {
+ return set_file_completion_notification_modes_(handle, flags);
+ }
+
+ BOOL CancelIoEx(HANDLE handle, LPOVERLAPPED overlapped) {
+ return cancel_io_ex_(handle, overlapped);
+ }
+
+ private:
+ using SetFileCompletionNotificationModesFunc = BOOL(WINAPI*)(HANDLE, UCHAR);
+ using CancelIoExFunc = BOOL(WINAPI*)(HANDLE, LPOVERLAPPED);
+
+ bool is_vista_or_higher_;
+ SetFileCompletionNotificationModesFunc
+ set_file_completion_notification_modes_;
+ CancelIoExFunc cancel_io_ex_;
+};
+
+base::LazyInstance<VistaOrHigherFunctions> g_vista_or_higher_functions =
+ LAZY_INSTANCE_INITIALIZER;
+
+class RawChannelWin final : public RawChannel {
+ public:
+ RawChannelWin(embedder::ScopedPlatformHandle handle)
+ : handle_(handle.Pass()),
+ io_handler_(nullptr),
+ skip_completion_port_on_success_(
+ g_use_iocp &&
+ g_vista_or_higher_functions.Get().is_vista_or_higher()) {
+ DCHECK(handle_.is_valid());
+ }
+ ~RawChannelWin() override {
+ DCHECK(!io_handler_);
+ }
+
+ private:
+ // RawChannelIOHandler receives OS notifications for I/O completion. It must
+ // be created on the I/O thread.
+ //
+ // It manages its own destruction. Destruction happens on the I/O thread when
+ // all the following conditions are satisfied:
+ // - |DetachFromOwnerNoLock()| has been called;
+ // - there is no pending read;
+ // - there is no pending write.
+ class RawChannelIOHandler : public base::MessageLoopForIO::IOHandler,
+ public base::win::ObjectWatcher::Delegate {
+ public:
+ RawChannelIOHandler(RawChannelWin* owner,
+ embedder::ScopedPlatformHandle handle)
+ : handle_(handle.Pass()),
+ owner_(owner),
+ suppress_self_destruct_(false),
+ pending_read_(false),
+ pending_write_(false),
+ platform_handles_written_(0),
+ pipe_disconnected_(false) {
+ memset(&read_context_.overlapped, 0, sizeof(read_context_.overlapped));
+ memset(&write_context_.overlapped, 0, sizeof(write_context_.overlapped));
+ if (g_use_iocp) {
+ owner_->message_loop_for_io()->RegisterIOHandler(
+ handle_.get().handle, this);
+ read_context_.handler = this;
+ write_context_.handler = this;
+ } else {
+ read_event = CreateEvent(
+ NULL, g_use_autoreset_event ? FALSE : TRUE, FALSE, NULL);
+ write_event = CreateEvent(
+ NULL, g_use_autoreset_event ? FALSE : TRUE, FALSE, NULL);
+ read_context_.overlapped.hEvent = read_event;
+ write_context_.overlapped.hEvent = write_event;
+
+
+ if (g_use_autoreset_event) {
+ read_watcher_.StartWatching(read_event, this, true);
+ write_watcher_.StartWatching(write_event, this, true);
+ }
+ }
+ }
+
+ ~RawChannelIOHandler() override {
+ DCHECK(ShouldSelfDestruct());
+ }
+
+ HANDLE handle() const { return handle_.get().handle; }
+
+ // The following methods are only called by the owner on the I/O thread.
+ bool pending_read() const {
+ DCHECK(owner_);
+ DCHECK_EQ(base::MessageLoop::current(), owner_->message_loop_for_io());
+ return pending_read_;
+ }
+
+ base::MessageLoopForIO::IOContext* read_context() {
+ DCHECK(owner_);
+ DCHECK_EQ(base::MessageLoop::current(), owner_->message_loop_for_io());
+ return &read_context_;
+ }
+
+ // Instructs the object to wait for an |OnIOCompleted()| notification.
+ void OnPendingReadStarted() {
+ DCHECK(owner_);
+ DCHECK_EQ(base::MessageLoop::current(), owner_->message_loop_for_io());
+ DCHECK(!pending_read_);
+ pending_read_ = true;
+ }
+
+ // The following methods are only called by the owner under
+ // |owner_->write_lock()|.
+ bool pending_write_no_lock() const {
+ DCHECK(owner_);
+ owner_->write_lock().AssertAcquired();
+ return pending_write_;
+ }
+
+ base::MessageLoopForIO::IOContext* write_context_no_lock() {
+ DCHECK(owner_);
+ owner_->write_lock().AssertAcquired();
+ return &write_context_;
+ }
+ // Instructs the object to wait for an |OnIOCompleted()| notification.
+ void OnPendingWriteStartedNoLock(size_t platform_handles_written) {
+ DCHECK(owner_);
+ owner_->write_lock().AssertAcquired();
+ DCHECK(!pending_write_);
+ pending_write_ = true;
+ platform_handles_written_ = platform_handles_written;
+ }
+
+ // |base::MessageLoopForIO::IOHandler| implementation:
+ // Must be called on the I/O thread. It could be called before or after
+ // detached from the owner.
+ void OnIOCompleted(base::MessageLoopForIO::IOContext* context,
+ DWORD bytes_transferred,
+ DWORD error) override {
+ DCHECK(owner_);
+ DCHECK(base::MessageLoop::current() == owner_->message_loop_for_io());
+
+ // Suppress self-destruction inside |OnReadCompleted()|, etc. (in case
+ // they result in a call to |Shutdown()|).
+ bool old_suppress_self_destruct = suppress_self_destruct_;
+ suppress_self_destruct_ = true;
+
+ if (context == &read_context_)
+ OnReadCompleted(bytes_transferred, error);
+ else if (context == &write_context_)
+ OnWriteCompleted(bytes_transferred, error);
+ else
+ NOTREACHED();
+
+ // Maybe allow self-destruction again.
+ suppress_self_destruct_ = old_suppress_self_destruct;
+
+ if (ShouldSelfDestruct())
+ delete this;
+ }
+
+ // Must be called on the I/O thread under |owner_->write_lock()|.
+ // After this call, the owner must not make any further calls on this
+ // object, and therefore the object is used on the I/O thread exclusively
+ // (if it stays alive).
+ void DetachFromOwnerNoLock(scoped_ptr<ReadBuffer> read_buffer,
+ scoped_ptr<WriteBuffer> write_buffer) {
+ DCHECK(owner_);
+ DCHECK_EQ(base::MessageLoop::current(), owner_->message_loop_for_io());
+ //owner_->write_lock().AssertAcquired();
+
+ // If read/write is pending, we have to retain the corresponding buffer.
+ if (pending_read_)
+ preserved_read_buffer_after_detach_ = read_buffer.Pass();
+ if (pending_write_)
+ preserved_write_buffer_after_detach_ = write_buffer.Pass();
+
+ owner_ = nullptr;
+ read_watcher_.StopWatching();
+ write_watcher_.StopWatching();
+ bool read_signalled =
+ WaitForSingleObjectEx(read_event, 0, true) != WAIT_TIMEOUT;
+ bool write_signalled =
+ WaitForSingleObjectEx(write_event, 0, true) != WAIT_TIMEOUT;
+
+ if (read_signalled) {
+ if (read_context_.overlapped.Internal == STATUS_PIPE_BROKEN) {
+ pipe_disconnected_ = true;
+ } else if (read_context_.overlapped.Internal == STATUS_CANCELLED) {
+ // fine condition
+ } else {
+ NOTREACHED() << "TODO(jam)";
+ }
+ }
+
+ // Otherwise we will never quit since both endpoints of the channel could
+ // be waiting for their Read calls.
+ pending_read_ = false;
+
+ if (write_signalled) {
+ NOTREACHED() << "TODO(jam)";
+ }
+
+ if (ShouldSelfDestruct())
+ delete this;
+ }
+
+ embedder::ScopedPlatformHandle ReleaseHandle(
+ std::vector<char>* read_buffer) {
+ // TODO(jam): handle XP
+ CancelIoEx(handle(), NULL);
+ // NOTE!!!!
+ // The above call will cancel pending IO calls.
+ // HOWEVER, some could have already finished and posted task to IO thread
+ // that will execute
+
+
+ size_t read_buffer_byte_size = owner_->read_buffer()->num_valid_bytes();
+
+ if (pending_read_) {
+ DWORD bytes_read_dword = 0;
+
+ DWORD old_bytes = read_context_.overlapped.InternalHigh;
+
+ //TODO(jam): for XP, can return TRUE here to wait. also below.
+ BOOL rv = GetOverlappedResult(
+ handle(), &read_context_.overlapped, &bytes_read_dword, FALSE);
+ DCHECK_EQ(old_bytes, bytes_read_dword);
+ if (rv) {
+ if (read_context_.overlapped.Internal != STATUS_CANCELLED) {
+ read_buffer_byte_size += read_context_.overlapped.InternalHigh;
+ }
+ }
+ pending_read_ = false;
+ }
+
+ RawChannel::WriteBuffer* write_buffer = owner_->write_buffer_no_lock();
+
+ if (pending_write_) {
+ DWORD bytes_written_dword = 0;
+ DWORD old_bytes = write_context_.overlapped.InternalHigh;
+
+
+ BOOL rv = GetOverlappedResult(
+ handle(), &write_context_.overlapped, &bytes_written_dword, FALSE);
+
+ if (old_bytes != bytes_written_dword) {
+ NOTREACHED();
+ }
+
+ if (rv) {
+ if (write_context_.overlapped.Internal != STATUS_CANCELLED) {
+ CHECK(write_buffer->queue_size() != 0);
+
+ // TODO(jam)
+ DCHECK(!write_buffer->HavePlatformHandlesToSend());
+
+ write_buffer->data_offset_ += bytes_written_dword;
+
+ // TODO(jam): copied from OnWriteCompletedNoLock
+ MessageInTransit* message =
+ write_buffer->message_queue()->PeekMessage();
+ if (write_buffer->data_offset_ >= message->total_size()) {
+ // Complete write.
+ CHECK_EQ(write_buffer->data_offset_, message->total_size());
+ write_buffer->message_queue_.DiscardMessage();
+ write_buffer->platform_handles_offset_ = 0;
+ write_buffer->data_offset_ = 0;
+ }
+
+
+ //TODO(jam): handle more write msgs
+ DCHECK(write_buffer->message_queue_.IsEmpty());
+ }
+ }
+ pending_write_ = false;
+ }
+
+ if (read_buffer_byte_size) {
+ read_buffer->resize(read_buffer_byte_size);
+ memcpy(&(*read_buffer)[0], owner_->read_buffer()->buffer(),
+ read_buffer_byte_size);
+ owner_->read_buffer()->Reset();
+ }
+
+ return embedder::ScopedPlatformHandle(handle_.release());
+ }
+
+ void OnObjectSignaled(HANDLE object) override {
+ // TODO(jam): need to call out to owner?
+
+
+ //not needed per doc, since reset by readfile & writefile
+ //ResetEvent(object);
+ if (object == read_event) {
+ if (read_context_.overlapped.Internal == STATUS_CANCELLED)
+ return;
+
+ {
+ // TODO(jam): improve this locking
+ base::AutoLock locker(owner_->read_lock());
+ if (!handle_.is_valid()) {
+
+
+
+
+ // TODO(jam): have way to cancel objectwatcher posttask so that we
+ // can have a dcheck here and catch any erronous conditions. right
+ // now this fires for non-problems.
+ return;
+
+
+
+ }
+ // this isn't covered with above lock
+ // DCHECK(!owner_->debug_started_sending());
+ DCHECK(handle_.is_valid());
+ }
+ OnIOCompleted(&read_context_, read_context_.overlapped.InternalHigh,
+ read_context_.overlapped.Internal);
+
+ } else {
+ if (write_context_.overlapped.Internal == STATUS_CANCELLED)
+ return;
+ DCHECK(!owner_->debug_started_sending());
+ DCHECK(handle_.is_valid());
+ CHECK(object == write_event);
+ OnIOCompleted(&write_context_, write_context_.overlapped.InternalHigh,
yzshen1 2015/09/23 22:47:09 Since write could happen on any thread, and we sta
+ write_context_.overlapped.Internal);
+ }
+ }
+ HANDLE read_event, write_event;
+ base::win::ObjectWatcher read_watcher_, write_watcher_;
+
+ bool pipe_disconnected() { return pipe_disconnected_; }
+
+ void QuitNow() {
+ pipe_disconnected_ = true;
+ owner_ = nullptr;
+ base::MessageLoop::current()->DeleteSoon(FROM_HERE, this);
+ }
+
+ private:
+ // Returns true if |owner_| has been reset and there is not pending read or
+ // write.
+ // Must be called on the I/O thread.
+ bool ShouldSelfDestruct() const {
+ if (owner_ || suppress_self_destruct_)
+ return false;
+
+ if (pipe_disconnected_)
+ return true;
+
+ // Note: Detached, hence no lock needed for |pending_write_|.
+ return !pending_read_ && !pending_write_;
+ }
+
+ // Must be called on the I/O thread. It may be called before or after
+ // detaching from the owner.
+ void OnReadCompleted(DWORD bytes_read, DWORD error) {
+ DCHECK(owner_);
+ DCHECK(base::MessageLoop::current() == owner_->message_loop_for_io());
+ DCHECK(suppress_self_destruct_);
+
+ if (g_use_autoreset_event && !pending_read_)
+ return;
+
+ CHECK(pending_read_);
+ pending_read_ = false;
+ if (!owner_)
+ return;
+
+ // Note: |OnReadCompleted()| may detach us from |owner_|.
+ if (error == ERROR_SUCCESS) {
+ DCHECK_GT(bytes_read, 0u);
+ owner_->OnReadCompleted(IO_SUCCEEDED, bytes_read);
+ } else if (error == ERROR_BROKEN_PIPE ||
+ (g_use_autoreset_event && error == STATUS_PIPE_BROKEN)) {
+ DCHECK_EQ(bytes_read, 0u);
+ pipe_disconnected_ = true;
+ owner_->OnReadCompleted(IO_FAILED_SHUTDOWN, 0);
+ } else if (error == ERROR_NO_MORE_ITEMS && g_use_autoreset_event) {
+ return owner_->OnReadCompleted(IO_SUCCEEDED, bytes_read);
+ } else {
+ DCHECK_EQ(bytes_read, 0u);
+ LOG(WARNING) << "ReadFile: " << logging::SystemErrorCodeToString(error);
+ owner_->OnReadCompleted(IO_FAILED_UNKNOWN, 0);
+ }
+ }
+
+ // Must be called on the I/O thread. It may be called before or after
+ // detaching from the owner.
+ void OnWriteCompleted(DWORD bytes_written, DWORD error) {
+ DCHECK(base::MessageLoop::current() == owner_->message_loop_for_io());
+ DCHECK(suppress_self_destruct_);
+
+ if (!owner_) {
+ // No lock needed.
+ CHECK(pending_write_);
+ pending_write_ = false;
+ return;
+ }
+
+ {
+ base::AutoLock locker(owner_->write_lock());
+ if (g_use_autoreset_event && !pending_write_)
+ return;
+
+ CHECK(pending_write_);
+ pending_write_ = false;
+ }
+
+ // Note: |OnWriteCompleted()| may detach us from |owner_|.
+ if (error == ERROR_SUCCESS) {
+ // Reset |platform_handles_written_| before calling |OnWriteCompleted()|
+ // since that function may call back to this class and set it again.
+ size_t local_platform_handles_written_ = platform_handles_written_;
+ platform_handles_written_ = 0;
+ owner_->OnWriteCompleted(IO_SUCCEEDED, local_platform_handles_written_,
+ bytes_written);
+ } else if (error == ERROR_BROKEN_PIPE ||
+ (g_use_autoreset_event && error ==STATUS_PIPE_BROKEN)) {
yzshen1 2015/09/23 22:47:09 It seems nice to unify the error code used.
+ pipe_disconnected_ = true;
+ owner_->OnWriteCompleted(IO_FAILED_SHUTDOWN, 0, 0);
+ } else if (error == ERROR_NO_MORE_ITEMS && g_use_autoreset_event) {
+ size_t local_platform_handles_written_ = platform_handles_written_;
+ platform_handles_written_ = 0;
+ owner_->OnWriteCompleted(IO_SUCCEEDED, local_platform_handles_written_,
+ bytes_written);
+ } else {
+ LOG(WARNING) << "WriteFile: "
+ << logging::SystemErrorCodeToString(error);
+ owner_->OnWriteCompleted(IO_FAILED_UNKNOWN, 0, 0);
+ }
+ }
+
+ embedder::ScopedPlatformHandle handle_;
+
+ // |owner_| is reset on the I/O thread under |owner_->write_lock()|.
+ // Therefore, it may be used on any thread under lock; or on the I/O thread
+ // without locking.
+ RawChannelWin* owner_;
+
+ // The following members must be used on the I/O thread.
+ scoped_ptr<ReadBuffer> preserved_read_buffer_after_detach_;
+ scoped_ptr<WriteBuffer> preserved_write_buffer_after_detach_;
+ bool suppress_self_destruct_;
+
+ bool pending_read_;
+ base::MessageLoopForIO::IOContext read_context_;
+
+ // The following members must be used under |owner_->write_lock()| while the
+ // object is still attached to the owner, and only on the I/O thread
+ // afterwards.
+ bool pending_write_;
+ size_t platform_handles_written_;
+ base::MessageLoopForIO::IOContext write_context_;
+
+ bool pipe_disconnected_;
+
+ MOJO_DISALLOW_COPY_AND_ASSIGN(RawChannelIOHandler);
+ };
+
+ embedder::ScopedPlatformHandle ReleaseHandleNoLock(
+ std::vector<char>* read_buffer_out) override {
+ std::vector<WriteBuffer::Buffer> buffers;
+ write_buffer_no_lock()->GetBuffers(&buffers);
+ if (!buffers.empty()) {
+ // TODO(jam): copy code in OnShutdownNoLock
+ NOTREACHED() << "releasing handle with pending write buffer";
+ }
+
+
+ if( handle_.is_valid()) {
+ // SetInitialBuffer could have been called on main thread before OnInit
+ // is called on Io thread. and in meantime releasehandle called.
+ //DCHECK(read_buffer()->num_valid_bytes() == 0);
+ if (read_buffer()->num_valid_bytes()) {
+ read_buffer_out->resize(read_buffer()->num_valid_bytes());
+ memcpy(&(*read_buffer_out)[0], read_buffer()->buffer(),
+ read_buffer()->num_valid_bytes());
+ read_buffer()->Reset();
+ }
+ DCHECK(write_buffer_no_lock()->queue_size() == 0);
+ return embedder::ScopedPlatformHandle(
+ embedder::PlatformHandle(handle_.release().handle));
+ }
+
+ return io_handler_->ReleaseHandle(read_buffer_out);
+ }
+ embedder::PlatformHandle HandleForDebuggingNoLock() override {
+ if (handle_.is_valid())
+ return handle_.get();
+
+ if (!io_handler_)
+ return embedder::PlatformHandle();
+
+ return embedder::PlatformHandle(io_handler_->handle());
+ }
+
+ IOResult Read(size_t* bytes_read) override {
+ DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
+
+ char* buffer = nullptr;
+ size_t bytes_to_read = 0;
+ read_buffer()->GetBuffer(&buffer, &bytes_to_read);
+
+ DCHECK(io_handler_);
+ DCHECK(!io_handler_->pending_read());
+ BOOL result = ReadFile(
+ io_handler_->handle(), buffer, static_cast<DWORD>(bytes_to_read),
+ nullptr, &io_handler_->read_context()->overlapped);
+ if (!result) {
+ DWORD error = GetLastError();
+ if (error == ERROR_BROKEN_PIPE)
+ return IO_FAILED_SHUTDOWN;
+ if (error != ERROR_IO_PENDING) {
+ LOG(WARNING) << "ReadFile: " << logging::SystemErrorCodeToString(error);
+ return IO_FAILED_UNKNOWN;
+ }
+ }
+
+ if (result && skip_completion_port_on_success_) {
+ DWORD bytes_read_dword = 0;
+ BOOL get_size_result = GetOverlappedResult(
+ io_handler_->handle(), &io_handler_->read_context()->overlapped,
+ &bytes_read_dword, FALSE);
+ DPCHECK(get_size_result);
+ *bytes_read = bytes_read_dword;
+ return IO_SUCCEEDED;
+ }
+
+ if (!g_use_autoreset_event) {
+ if (!g_use_iocp) {
+ io_handler_->read_watcher_.StartWatching(
+ io_handler_->read_event, io_handler_, false);
+ }
+ }
+ // If the read is pending or the read has succeeded but we don't skip
+ // completion port on success, instruct |io_handler_| to wait for the
+ // completion packet.
+ //
+ // TODO(yzshen): It seems there isn't document saying that all error cases
+ // (other than ERROR_IO_PENDING) are guaranteed to *not* queue a completion
+ // packet. If we do get one for errors,
+ // |RawChannelIOHandler::OnIOCompleted()| will crash so we will learn about
+ // it.
+
+ io_handler_->OnPendingReadStarted();
+ return IO_PENDING;
+ }
+
+ IOResult ScheduleRead() override {
+ if (!io_handler_)
+ return IO_PENDING; // OnInit could have earlied out.
+
+ DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
+ DCHECK(io_handler_);
+ DCHECK(!io_handler_->pending_read());
+
+ size_t bytes_read = 0;
+ IOResult io_result = Read(&bytes_read);
+ if (io_result == IO_SUCCEEDED) {
+ DCHECK(skip_completion_port_on_success_);
+
+ // We have finished reading successfully. Queue a notification manually.
+ io_handler_->OnPendingReadStarted();
+ // |io_handler_| won't go away before the task is run, so it is safe to
+ // use |base::Unretained()|.
+ message_loop_for_io()->PostTask(
+ FROM_HERE, base::Bind(&RawChannelIOHandler::OnIOCompleted,
+ base::Unretained(io_handler_),
+ base::Unretained(io_handler_->read_context()),
+ static_cast<DWORD>(bytes_read), ERROR_SUCCESS));
+ return IO_PENDING;
+ }
+
+ return io_result;
+ }
+ embedder::ScopedPlatformHandleVectorPtr GetReadPlatformHandles(
+ size_t num_platform_handles,
+ const void* platform_handle_table) override {
+ // TODO(jam): this code will have to be updated once it's used in a sandbox
+ // and the receiving process doesn't have duplicate permission for the
+ // receiver. Once there's a broker and we have a connection to it (possibly
+ // through ConnectionManager), then we can make a sync IPC to it here to get
+ // a token for this handle, and it will duplicate the handle to is process.
+ // Then we pass the token to the receiver, which will then make a sync call
+ // to the broker to get a duplicated handle. This will also allow us to
+ // avoid leaks of the handle if the receiver dies, since the broker can
+ // notice that.
+ DCHECK_GT(num_platform_handles, 0u);
+ embedder::ScopedPlatformHandleVectorPtr rv(
+ new embedder::PlatformHandleVector());
+
+ const char* serialization_data =
+ static_cast<const char*>(platform_handle_table);
+ for (size_t i = 0; i < num_platform_handles; i++) {
+ DWORD pid = *reinterpret_cast<const DWORD*>(serialization_data);
+ serialization_data += sizeof(DWORD);
+ HANDLE source_handle =
+ *reinterpret_cast<const HANDLE*>(serialization_data);
+ serialization_data += sizeof(HANDLE);
+ base::Process sender =
+ base::Process::OpenWithAccess(pid, PROCESS_DUP_HANDLE);
+ DCHECK(sender.IsValid());
+ HANDLE target_handle = NULL;
+ BOOL dup_result = DuplicateHandle(
+ sender.Handle(), source_handle,
+ base::GetCurrentProcessHandle(), &target_handle, 0,
+ FALSE, DUPLICATE_SAME_ACCESS | DUPLICATE_CLOSE_SOURCE);
+ DCHECK(dup_result);
+ rv->push_back(embedder::PlatformHandle(target_handle));
+ }
+ return rv.Pass();
+ }
+
+ IOResult WriteNoLock(size_t* platform_handles_written,
+ size_t* bytes_written) override {
+ write_lock().AssertAcquired();
+
+ DCHECK(io_handler_);
+ DCHECK(!io_handler_->pending_write_no_lock());
+
+ size_t num_platform_handles = 0;
+ if (write_buffer_no_lock()->HavePlatformHandlesToSend()) {
+ // Since we're not sure which process might ultimately deserialize this
+ // message, we can't duplicate the handle now. Instead, write the process
+ // ID and handle now and let the receiver duplicate it.
+ embedder::PlatformHandle* platform_handles;
+ void* serialization_data_temp;
+ write_buffer_no_lock()->GetPlatformHandlesToSend(
+ &num_platform_handles, &platform_handles, &serialization_data_temp);
+ char* serialization_data = static_cast<char*>(serialization_data_temp);
+ DCHECK_GT(num_platform_handles, 0u);
+ DCHECK(platform_handles);
+
+ DWORD current_process_id = base::GetCurrentProcId();
+ for (size_t i = 0; i < num_platform_handles; i++) {
+ *reinterpret_cast<DWORD*>(serialization_data) = current_process_id;
+ serialization_data += sizeof(DWORD);
+ *reinterpret_cast<HANDLE*>(serialization_data) =
+ platform_handles[i].handle;
+ serialization_data += sizeof(HANDLE);
+ platform_handles[i] = embedder::PlatformHandle();
+ }
+ }
+
+ std::vector<WriteBuffer::Buffer> buffers;
+ write_buffer_no_lock()->GetBuffers(&buffers);
+ DCHECK(!buffers.empty());
+
+ // TODO(yzshen): Handle multi-segment writes more efficiently.
+ DWORD bytes_written_dword = 0;
+
+
+
+
+ // TODO(jam): right now we get in bad situation where we might first write
yzshen1 2015/09/23 22:47:09 I haven't understood: I thought we synchronously f
+ // the main buffer and then the MP gets sent before we write the transport
+ // buffer. We can fix this by sending information about partially written
+ // messages, or by teaching transport buffer how to grow the main buffer and
+ // write its data there.
+ // Until that's done, for now make another copy.
+
+ size_t total_size = buffers[0].size;
+ if (buffers.size() > 1)
+ total_size+=buffers[1].size;
+ char* buf = new char[total_size];
+ memcpy(buf, buffers[0].addr, buffers[0].size);
+ if (buffers.size() > 1)
+ memcpy(buf + buffers[0].size, buffers[1].addr, buffers[1].size);
+
+ BOOL result = WriteFile(
+ io_handler_->handle(), buf,
+ static_cast<DWORD>(total_size),
+ &bytes_written_dword,
+ &io_handler_->write_context_no_lock()->overlapped);
+ delete [] buf;
yzshen1 2015/09/23 22:47:09 I think we need to keep the buffer valid until the
+
+ if (!result) {
+ DWORD error = GetLastError();
+ if (error == ERROR_BROKEN_PIPE)
+ return IO_FAILED_SHUTDOWN;
+ if (error != ERROR_IO_PENDING) {
+ LOG(WARNING) << "WriteFile: "
+ << logging::SystemErrorCodeToString(error);
+ return IO_FAILED_UNKNOWN;
+ }
+ }
+
+ if (result && skip_completion_port_on_success_) {
+ *platform_handles_written = num_platform_handles;
+ *bytes_written = bytes_written_dword;
+ return IO_SUCCEEDED;
+ }
+
+ if (!g_use_autoreset_event) {
+ if (!g_use_iocp) {
+ io_handler_->write_watcher_.StartWatching(
+ io_handler_->write_event, io_handler_, false);
+ }
+ }
+ // If the write is pending or the write has succeeded but we don't skip
+ // completion port on success, instruct |io_handler_| to wait for the
+ // completion packet.
+ //
+ // TODO(yzshen): it seems there isn't document saying that all error cases
+ // (other than ERROR_IO_PENDING) are guaranteed to *not* queue a completion
+ // packet. If we do get one for errors,
+ // |RawChannelIOHandler::OnIOCompleted()| will crash so we will learn about
+ // it.
+
+ io_handler_->OnPendingWriteStartedNoLock(num_platform_handles);
+ return IO_PENDING;
+ }
+
+ IOResult ScheduleWriteNoLock() override {
+ write_lock().AssertAcquired();
+
+ DCHECK(io_handler_);
+ DCHECK(!io_handler_->pending_write_no_lock());
+
+ size_t platform_handles_written = 0;
+ size_t bytes_written = 0;
+ IOResult io_result = WriteNoLock(&platform_handles_written, &bytes_written);
+ if (io_result == IO_SUCCEEDED) {
+ DCHECK(skip_completion_port_on_success_);
+
+ // We have finished writing successfully. Queue a notification manually.
+ io_handler_->OnPendingWriteStartedNoLock(platform_handles_written);
+ // |io_handler_| won't go away before that task is run, so it is safe to
+ // use |base::Unretained()|.
+ message_loop_for_io()->PostTask(
+ FROM_HERE,
+ base::Bind(&RawChannelIOHandler::OnIOCompleted,
+ base::Unretained(io_handler_),
+ base::Unretained(io_handler_->write_context_no_lock()),
+ static_cast<DWORD>(bytes_written), ERROR_SUCCESS));
+ return IO_PENDING;
+ }
+
+ return io_result;
+ }
+
+ void OnInit() override {
+ DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
+
+ if (!handle_.is_valid()) {
+ LOG(ERROR) << "Note: RawChannelWin " << this
+ << " early exiting in OnInit because no handle";
+ return;
+ }
+
+ DCHECK(handle_.is_valid());
+ if (skip_completion_port_on_success_) {
+ // I don't know how this can fail (unless |handle_| is bad, in which case
+ // it's a bug in our code).
+ CHECK(g_vista_or_higher_functions.Get().
+ SetFileCompletionNotificationModes(
+ handle_.get().handle, FILE_SKIP_COMPLETION_PORT_ON_SUCCESS));
+ }
+
+ DCHECK(!io_handler_);
+ io_handler_ = new RawChannelIOHandler(this, handle_.Pass());
+ }
+
+ void OnShutdownNoLock(scoped_ptr<ReadBuffer> read_buffer,
+ scoped_ptr<WriteBuffer> write_buffer) override {
+ // happens on shutdown if didn't call init when doing createduplicate
+ if (message_loop_for_io()) {
+ DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
+ }
+
+ if (!io_handler_) {
+ // This is hit when creating a duplicate dispatcher since we don't call
yzshen1 2015/09/23 22:47:09 Before this CL, Init() is restricted to be called
+ // Init on it.
+ DCHECK_EQ(read_buffer->num_valid_bytes(), 0U);
+ DCHECK_EQ(write_buffer->queue_size(), 0U);
+ return;
+ }
+
+ if (io_handler_->pending_read() || io_handler_->pending_write_no_lock()) {
+ // |io_handler_| will be alive until pending read/write (if any)
+ // completes. Call |CancelIoEx()| or |CancelIo()| so that resources can be
+ // freed up as soon as possible.
+ // Note: |CancelIo()| only cancels read/write requests started from this
+ // thread.
+ if (g_vista_or_higher_functions.Get().is_vista_or_higher()) {
+ g_vista_or_higher_functions.Get().CancelIoEx(io_handler_->handle(),
+ nullptr);
+ } else {
+ CancelIo(io_handler_->handle());
+ }
+ }
+
+ io_handler_->DetachFromOwnerNoLock(read_buffer.Pass(), write_buffer.Pass());
+ io_handler_ = nullptr;
+ }
+
+ // Passed to |io_handler_| during initialization.
+ embedder::ScopedPlatformHandle handle_;
+
+ RawChannelIOHandler* io_handler_;
+
+ const bool skip_completion_port_on_success_;
+
+ MOJO_DISALLOW_COPY_AND_ASSIGN(RawChannelWin);
+};
+
+
+} // namespace
+
+// -----------------------------------------------------------------------------
+
+RawChannel* RawChannel::Create(embedder::ScopedPlatformHandle handle) {
+ return new RawChannelWin(handle.Pass());
+}
+
+size_t RawChannel::GetSerializedPlatformHandleSize() {
+ return sizeof(DWORD) + sizeof(HANDLE);
+}
+
+} // namespace system
+} // namespace mojo

Powered by Google App Engine
This is Rietveld 408576698