Chromium Code Reviews| Index: mojo/edk/system/raw_channel_win.cc |
| diff --git a/mojo/edk/system/raw_channel_win.cc b/mojo/edk/system/raw_channel_win.cc |
| new file mode 100644 |
| index 0000000000000000000000000000000000000000..da1d17ca6709f02026d4d623becd6d998261eb3a |
| --- /dev/null |
| +++ b/mojo/edk/system/raw_channel_win.cc |
| @@ -0,0 +1,885 @@ |
| +// Copyright 2013 The Chromium Authors. All rights reserved. |
| +// Use of this source code is governed by a BSD-style license that can be |
| +// found in the LICENSE file. |
| + |
| +#include "mojo/edk/system/raw_channel.h" |
| + |
| +#include <windows.h> |
| + |
| +#include "base/bind.h" |
| +#include "base/lazy_instance.h" |
| +#include "base/location.h" |
| +#include "base/logging.h" |
| +#include "base/memory/scoped_ptr.h" |
| +#include "base/message_loop/message_loop.h" |
| +#include "base/process/process.h" |
| +#include "base/synchronization/lock.h" |
| +#include "base/win/object_watcher.h" |
| +#include "base/win/windows_version.h" |
| +#include "mojo/edk/embedder/platform_handle.h" |
| +#include "mojo/public/cpp/system/macros.h" |
| + |
| +#define STATUS_CANCELLED 0xC0000120 |
| +#define STATUS_PIPE_BROKEN 0xC000014B |
| + |
| +// We can't use IO completion ports if we send a message pipe. The reason is |
| +// that the only way to stop an existing IOCP is by closing the pipe handle. |
| +// See https://msdn.microsoft.com/en-us/library/windows/hardware/ff545834(v=vs.85).aspx |
| +bool g_use_iocp = false; |
| + |
| +// Manual reset per |
| +// Doc for overlapped I/O says use manual per |
| +// https://msdn.microsoft.com/en-us/library/windows/desktop/ms684342(v=vs.85).aspx |
| +// However using an auto-reset event makes the perf test 5x faster and also |
| +// works since we don't wait on the event elsewhere or call GetOverlappedResult |
| +// before it fires. |
| +bool g_use_autoreset_event = true; |
| + |
| +namespace mojo { |
| +namespace system { |
| + |
| +namespace { |
| + |
| +class VistaOrHigherFunctions { |
| + public: |
| + VistaOrHigherFunctions() |
| + : is_vista_or_higher_( |
| + base::win::GetVersion() >= base::win::VERSION_VISTA), |
| + set_file_completion_notification_modes_(nullptr), |
| + cancel_io_ex_(nullptr) { |
| + if (!is_vista_or_higher_) |
| + return; |
| + |
| + HMODULE module = GetModuleHandleW(L"kernel32.dll"); |
| + set_file_completion_notification_modes_ = |
| + reinterpret_cast<SetFileCompletionNotificationModesFunc>( |
| + GetProcAddress(module, "SetFileCompletionNotificationModes")); |
| + DCHECK(set_file_completion_notification_modes_); |
| + |
| + cancel_io_ex_ = |
| + reinterpret_cast<CancelIoExFunc>(GetProcAddress(module, "CancelIoEx")); |
| + DCHECK(cancel_io_ex_); |
| + } |
| + |
| + bool is_vista_or_higher() const { return is_vista_or_higher_; } |
| + |
| + BOOL SetFileCompletionNotificationModes(HANDLE handle, UCHAR flags) { |
| + return set_file_completion_notification_modes_(handle, flags); |
| + } |
| + |
| + BOOL CancelIoEx(HANDLE handle, LPOVERLAPPED overlapped) { |
| + return cancel_io_ex_(handle, overlapped); |
| + } |
| + |
| + private: |
| + using SetFileCompletionNotificationModesFunc = BOOL(WINAPI*)(HANDLE, UCHAR); |
| + using CancelIoExFunc = BOOL(WINAPI*)(HANDLE, LPOVERLAPPED); |
| + |
| + bool is_vista_or_higher_; |
| + SetFileCompletionNotificationModesFunc |
| + set_file_completion_notification_modes_; |
| + CancelIoExFunc cancel_io_ex_; |
| +}; |
| + |
| +base::LazyInstance<VistaOrHigherFunctions> g_vista_or_higher_functions = |
| + LAZY_INSTANCE_INITIALIZER; |
| + |
| +class RawChannelWin final : public RawChannel { |
| + public: |
| + RawChannelWin(embedder::ScopedPlatformHandle handle) |
| + : handle_(handle.Pass()), |
| + io_handler_(nullptr), |
| + skip_completion_port_on_success_( |
| + g_use_iocp && |
| + g_vista_or_higher_functions.Get().is_vista_or_higher()) { |
| + DCHECK(handle_.is_valid()); |
| + } |
| + ~RawChannelWin() override { |
| + DCHECK(!io_handler_); |
| + } |
| + |
| + private: |
| + // RawChannelIOHandler receives OS notifications for I/O completion. It must |
| + // be created on the I/O thread. |
| + // |
| + // It manages its own destruction. Destruction happens on the I/O thread when |
| + // all the following conditions are satisfied: |
| + // - |DetachFromOwnerNoLock()| has been called; |
| + // - there is no pending read; |
| + // - there is no pending write. |
| + class RawChannelIOHandler : public base::MessageLoopForIO::IOHandler, |
| + public base::win::ObjectWatcher::Delegate { |
| + public: |
| + RawChannelIOHandler(RawChannelWin* owner, |
| + embedder::ScopedPlatformHandle handle) |
| + : handle_(handle.Pass()), |
| + owner_(owner), |
| + suppress_self_destruct_(false), |
| + pending_read_(false), |
| + pending_write_(false), |
| + platform_handles_written_(0), |
| + pipe_disconnected_(false) { |
| + memset(&read_context_.overlapped, 0, sizeof(read_context_.overlapped)); |
| + memset(&write_context_.overlapped, 0, sizeof(write_context_.overlapped)); |
| + if (g_use_iocp) { |
| + owner_->message_loop_for_io()->RegisterIOHandler( |
| + handle_.get().handle, this); |
| + read_context_.handler = this; |
| + write_context_.handler = this; |
| + } else { |
| + read_event = CreateEvent( |
| + NULL, g_use_autoreset_event ? FALSE : TRUE, FALSE, NULL); |
| + write_event = CreateEvent( |
| + NULL, g_use_autoreset_event ? FALSE : TRUE, FALSE, NULL); |
| + read_context_.overlapped.hEvent = read_event; |
| + write_context_.overlapped.hEvent = write_event; |
| + |
| + |
| + if (g_use_autoreset_event) { |
| + read_watcher_.StartWatching(read_event, this, true); |
| + write_watcher_.StartWatching(write_event, this, true); |
| + } |
| + } |
| + } |
| + |
| + ~RawChannelIOHandler() override { |
| + DCHECK(ShouldSelfDestruct()); |
| + } |
| + |
| + HANDLE handle() const { return handle_.get().handle; } |
| + |
| + // The following methods are only called by the owner on the I/O thread. |
| + bool pending_read() const { |
| + DCHECK(owner_); |
| + DCHECK_EQ(base::MessageLoop::current(), owner_->message_loop_for_io()); |
| + return pending_read_; |
| + } |
| + |
| + base::MessageLoopForIO::IOContext* read_context() { |
| + DCHECK(owner_); |
| + DCHECK_EQ(base::MessageLoop::current(), owner_->message_loop_for_io()); |
| + return &read_context_; |
| + } |
| + |
| + // Instructs the object to wait for an |OnIOCompleted()| notification. |
| + void OnPendingReadStarted() { |
| + DCHECK(owner_); |
| + DCHECK_EQ(base::MessageLoop::current(), owner_->message_loop_for_io()); |
| + DCHECK(!pending_read_); |
| + pending_read_ = true; |
| + } |
| + |
| + // The following methods are only called by the owner under |
| + // |owner_->write_lock()|. |
| + bool pending_write_no_lock() const { |
| + DCHECK(owner_); |
| + owner_->write_lock().AssertAcquired(); |
| + return pending_write_; |
| + } |
| + |
| + base::MessageLoopForIO::IOContext* write_context_no_lock() { |
| + DCHECK(owner_); |
| + owner_->write_lock().AssertAcquired(); |
| + return &write_context_; |
| + } |
| + // Instructs the object to wait for an |OnIOCompleted()| notification. |
| + void OnPendingWriteStartedNoLock(size_t platform_handles_written) { |
| + DCHECK(owner_); |
| + owner_->write_lock().AssertAcquired(); |
| + DCHECK(!pending_write_); |
| + pending_write_ = true; |
| + platform_handles_written_ = platform_handles_written; |
| + } |
| + |
| + // |base::MessageLoopForIO::IOHandler| implementation: |
| + // Must be called on the I/O thread. It could be called before or after |
| + // detached from the owner. |
| + void OnIOCompleted(base::MessageLoopForIO::IOContext* context, |
| + DWORD bytes_transferred, |
| + DWORD error) override { |
| + DCHECK(owner_); |
| + DCHECK(base::MessageLoop::current() == owner_->message_loop_for_io()); |
| + |
| + // Suppress self-destruction inside |OnReadCompleted()|, etc. (in case |
| + // they result in a call to |Shutdown()|). |
| + bool old_suppress_self_destruct = suppress_self_destruct_; |
| + suppress_self_destruct_ = true; |
| + |
| + if (context == &read_context_) |
| + OnReadCompleted(bytes_transferred, error); |
| + else if (context == &write_context_) |
| + OnWriteCompleted(bytes_transferred, error); |
| + else |
| + NOTREACHED(); |
| + |
| + // Maybe allow self-destruction again. |
| + suppress_self_destruct_ = old_suppress_self_destruct; |
| + |
| + if (ShouldSelfDestruct()) |
| + delete this; |
| + } |
| + |
| + // Must be called on the I/O thread under |owner_->write_lock()|. |
| + // After this call, the owner must not make any further calls on this |
| + // object, and therefore the object is used on the I/O thread exclusively |
| + // (if it stays alive). |
| + void DetachFromOwnerNoLock(scoped_ptr<ReadBuffer> read_buffer, |
| + scoped_ptr<WriteBuffer> write_buffer) { |
| + DCHECK(owner_); |
| + DCHECK_EQ(base::MessageLoop::current(), owner_->message_loop_for_io()); |
| + //owner_->write_lock().AssertAcquired(); |
| + |
| + // If read/write is pending, we have to retain the corresponding buffer. |
| + if (pending_read_) |
| + preserved_read_buffer_after_detach_ = read_buffer.Pass(); |
| + if (pending_write_) |
| + preserved_write_buffer_after_detach_ = write_buffer.Pass(); |
| + |
| + owner_ = nullptr; |
| + read_watcher_.StopWatching(); |
| + write_watcher_.StopWatching(); |
| + bool read_signalled = |
| + WaitForSingleObjectEx(read_event, 0, true) != WAIT_TIMEOUT; |
| + bool write_signalled = |
| + WaitForSingleObjectEx(write_event, 0, true) != WAIT_TIMEOUT; |
| + |
| + if (read_signalled) { |
| + if (read_context_.overlapped.Internal == STATUS_PIPE_BROKEN) { |
| + pipe_disconnected_ = true; |
| + } else if (read_context_.overlapped.Internal == STATUS_CANCELLED) { |
| + // fine condition |
| + } else { |
| + NOTREACHED() << "TODO(jam)"; |
| + } |
| + } |
| + |
| + // Otherwise we will never quit since both endpoints of the channel could |
| + // be waiting for their Read calls. |
| + pending_read_ = false; |
| + |
| + if (write_signalled) { |
| + NOTREACHED() << "TODO(jam)"; |
| + } |
| + |
| + if (ShouldSelfDestruct()) |
| + delete this; |
| + } |
| + |
| + embedder::ScopedPlatformHandle ReleaseHandle( |
| + std::vector<char>* read_buffer) { |
| + // TODO(jam): handle XP |
| + CancelIoEx(handle(), NULL); |
| + // NOTE!!!! |
| + // The above call will cancel pending IO calls. |
| + // HOWEVER, some could have already finished and posted task to IO thread |
| + // that will execute |
| + |
| + |
| + size_t read_buffer_byte_size = owner_->read_buffer()->num_valid_bytes(); |
| + |
| + if (pending_read_) { |
| + DWORD bytes_read_dword = 0; |
| + |
| + DWORD old_bytes = read_context_.overlapped.InternalHigh; |
| + |
| + //TODO(jam): for XP, can return TRUE here to wait. also below. |
| + BOOL rv = GetOverlappedResult( |
| + handle(), &read_context_.overlapped, &bytes_read_dword, FALSE); |
| + DCHECK_EQ(old_bytes, bytes_read_dword); |
| + if (rv) { |
| + if (read_context_.overlapped.Internal != STATUS_CANCELLED) { |
| + read_buffer_byte_size += read_context_.overlapped.InternalHigh; |
| + } |
| + } |
| + pending_read_ = false; |
| + } |
| + |
| + RawChannel::WriteBuffer* write_buffer = owner_->write_buffer_no_lock(); |
| + |
| + if (pending_write_) { |
| + DWORD bytes_written_dword = 0; |
| + DWORD old_bytes = write_context_.overlapped.InternalHigh; |
| + |
| + |
| + BOOL rv = GetOverlappedResult( |
| + handle(), &write_context_.overlapped, &bytes_written_dword, FALSE); |
| + |
| + if (old_bytes != bytes_written_dword) { |
| + NOTREACHED(); |
| + } |
| + |
| + if (rv) { |
| + if (write_context_.overlapped.Internal != STATUS_CANCELLED) { |
| + CHECK(write_buffer->queue_size() != 0); |
| + |
| + // TODO(jam) |
| + DCHECK(!write_buffer->HavePlatformHandlesToSend()); |
| + |
| + write_buffer->data_offset_ += bytes_written_dword; |
| + |
| + // TODO(jam): copied from OnWriteCompletedNoLock |
| + MessageInTransit* message = |
| + write_buffer->message_queue()->PeekMessage(); |
| + if (write_buffer->data_offset_ >= message->total_size()) { |
| + // Complete write. |
| + CHECK_EQ(write_buffer->data_offset_, message->total_size()); |
| + write_buffer->message_queue_.DiscardMessage(); |
| + write_buffer->platform_handles_offset_ = 0; |
| + write_buffer->data_offset_ = 0; |
| + } |
| + |
| + |
| + //TODO(jam): handle more write msgs |
| + DCHECK(write_buffer->message_queue_.IsEmpty()); |
| + } |
| + } |
| + pending_write_ = false; |
| + } |
| + |
| + if (read_buffer_byte_size) { |
| + read_buffer->resize(read_buffer_byte_size); |
| + memcpy(&(*read_buffer)[0], owner_->read_buffer()->buffer(), |
| + read_buffer_byte_size); |
| + owner_->read_buffer()->Reset(); |
| + } |
| + |
| + return embedder::ScopedPlatformHandle(handle_.release()); |
| + } |
| + |
| + void OnObjectSignaled(HANDLE object) override { |
| + // TODO(jam): need to call out to owner? |
| + |
| + |
| + //not needed per doc, since reset by readfile & writefile |
| + //ResetEvent(object); |
| + if (object == read_event) { |
| + if (read_context_.overlapped.Internal == STATUS_CANCELLED) |
| + return; |
| + |
| + { |
| + // TODO(jam): improve this locking |
| + base::AutoLock locker(owner_->read_lock()); |
| + if (!handle_.is_valid()) { |
| + |
| + |
| + |
| + |
| + // TODO(jam): have way to cancel objectwatcher posttask so that we |
| + // can have a dcheck here and catch any erronous conditions. right |
| + // now this fires for non-problems. |
| + return; |
| + |
| + |
| + |
| + } |
| + // this isn't covered with above lock |
| + // DCHECK(!owner_->debug_started_sending()); |
| + DCHECK(handle_.is_valid()); |
| + } |
| + OnIOCompleted(&read_context_, read_context_.overlapped.InternalHigh, |
| + read_context_.overlapped.Internal); |
| + |
| + } else { |
| + if (write_context_.overlapped.Internal == STATUS_CANCELLED) |
| + return; |
| + DCHECK(!owner_->debug_started_sending()); |
| + DCHECK(handle_.is_valid()); |
| + CHECK(object == write_event); |
| + OnIOCompleted(&write_context_, write_context_.overlapped.InternalHigh, |
|
yzshen1
2015/09/23 22:47:09
Since write could happen on any thread, and we sta
|
| + write_context_.overlapped.Internal); |
| + } |
| + } |
| + HANDLE read_event, write_event; |
| + base::win::ObjectWatcher read_watcher_, write_watcher_; |
| + |
| + bool pipe_disconnected() { return pipe_disconnected_; } |
| + |
| + void QuitNow() { |
| + pipe_disconnected_ = true; |
| + owner_ = nullptr; |
| + base::MessageLoop::current()->DeleteSoon(FROM_HERE, this); |
| + } |
| + |
| + private: |
| + // Returns true if |owner_| has been reset and there is not pending read or |
| + // write. |
| + // Must be called on the I/O thread. |
| + bool ShouldSelfDestruct() const { |
| + if (owner_ || suppress_self_destruct_) |
| + return false; |
| + |
| + if (pipe_disconnected_) |
| + return true; |
| + |
| + // Note: Detached, hence no lock needed for |pending_write_|. |
| + return !pending_read_ && !pending_write_; |
| + } |
| + |
| + // Must be called on the I/O thread. It may be called before or after |
| + // detaching from the owner. |
| + void OnReadCompleted(DWORD bytes_read, DWORD error) { |
| + DCHECK(owner_); |
| + DCHECK(base::MessageLoop::current() == owner_->message_loop_for_io()); |
| + DCHECK(suppress_self_destruct_); |
| + |
| + if (g_use_autoreset_event && !pending_read_) |
| + return; |
| + |
| + CHECK(pending_read_); |
| + pending_read_ = false; |
| + if (!owner_) |
| + return; |
| + |
| + // Note: |OnReadCompleted()| may detach us from |owner_|. |
| + if (error == ERROR_SUCCESS) { |
| + DCHECK_GT(bytes_read, 0u); |
| + owner_->OnReadCompleted(IO_SUCCEEDED, bytes_read); |
| + } else if (error == ERROR_BROKEN_PIPE || |
| + (g_use_autoreset_event && error == STATUS_PIPE_BROKEN)) { |
| + DCHECK_EQ(bytes_read, 0u); |
| + pipe_disconnected_ = true; |
| + owner_->OnReadCompleted(IO_FAILED_SHUTDOWN, 0); |
| + } else if (error == ERROR_NO_MORE_ITEMS && g_use_autoreset_event) { |
| + return owner_->OnReadCompleted(IO_SUCCEEDED, bytes_read); |
| + } else { |
| + DCHECK_EQ(bytes_read, 0u); |
| + LOG(WARNING) << "ReadFile: " << logging::SystemErrorCodeToString(error); |
| + owner_->OnReadCompleted(IO_FAILED_UNKNOWN, 0); |
| + } |
| + } |
| + |
| + // Must be called on the I/O thread. It may be called before or after |
| + // detaching from the owner. |
| + void OnWriteCompleted(DWORD bytes_written, DWORD error) { |
| + DCHECK(base::MessageLoop::current() == owner_->message_loop_for_io()); |
| + DCHECK(suppress_self_destruct_); |
| + |
| + if (!owner_) { |
| + // No lock needed. |
| + CHECK(pending_write_); |
| + pending_write_ = false; |
| + return; |
| + } |
| + |
| + { |
| + base::AutoLock locker(owner_->write_lock()); |
| + if (g_use_autoreset_event && !pending_write_) |
| + return; |
| + |
| + CHECK(pending_write_); |
| + pending_write_ = false; |
| + } |
| + |
| + // Note: |OnWriteCompleted()| may detach us from |owner_|. |
| + if (error == ERROR_SUCCESS) { |
| + // Reset |platform_handles_written_| before calling |OnWriteCompleted()| |
| + // since that function may call back to this class and set it again. |
| + size_t local_platform_handles_written_ = platform_handles_written_; |
| + platform_handles_written_ = 0; |
| + owner_->OnWriteCompleted(IO_SUCCEEDED, local_platform_handles_written_, |
| + bytes_written); |
| + } else if (error == ERROR_BROKEN_PIPE || |
| + (g_use_autoreset_event && error ==STATUS_PIPE_BROKEN)) { |
|
yzshen1
2015/09/23 22:47:09
It seems nice to unify the error code used.
|
| + pipe_disconnected_ = true; |
| + owner_->OnWriteCompleted(IO_FAILED_SHUTDOWN, 0, 0); |
| + } else if (error == ERROR_NO_MORE_ITEMS && g_use_autoreset_event) { |
| + size_t local_platform_handles_written_ = platform_handles_written_; |
| + platform_handles_written_ = 0; |
| + owner_->OnWriteCompleted(IO_SUCCEEDED, local_platform_handles_written_, |
| + bytes_written); |
| + } else { |
| + LOG(WARNING) << "WriteFile: " |
| + << logging::SystemErrorCodeToString(error); |
| + owner_->OnWriteCompleted(IO_FAILED_UNKNOWN, 0, 0); |
| + } |
| + } |
| + |
| + embedder::ScopedPlatformHandle handle_; |
| + |
| + // |owner_| is reset on the I/O thread under |owner_->write_lock()|. |
| + // Therefore, it may be used on any thread under lock; or on the I/O thread |
| + // without locking. |
| + RawChannelWin* owner_; |
| + |
| + // The following members must be used on the I/O thread. |
| + scoped_ptr<ReadBuffer> preserved_read_buffer_after_detach_; |
| + scoped_ptr<WriteBuffer> preserved_write_buffer_after_detach_; |
| + bool suppress_self_destruct_; |
| + |
| + bool pending_read_; |
| + base::MessageLoopForIO::IOContext read_context_; |
| + |
| + // The following members must be used under |owner_->write_lock()| while the |
| + // object is still attached to the owner, and only on the I/O thread |
| + // afterwards. |
| + bool pending_write_; |
| + size_t platform_handles_written_; |
| + base::MessageLoopForIO::IOContext write_context_; |
| + |
| + bool pipe_disconnected_; |
| + |
| + MOJO_DISALLOW_COPY_AND_ASSIGN(RawChannelIOHandler); |
| + }; |
| + |
| + embedder::ScopedPlatformHandle ReleaseHandleNoLock( |
| + std::vector<char>* read_buffer_out) override { |
| + std::vector<WriteBuffer::Buffer> buffers; |
| + write_buffer_no_lock()->GetBuffers(&buffers); |
| + if (!buffers.empty()) { |
| + // TODO(jam): copy code in OnShutdownNoLock |
| + NOTREACHED() << "releasing handle with pending write buffer"; |
| + } |
| + |
| + |
| + if( handle_.is_valid()) { |
| + // SetInitialBuffer could have been called on main thread before OnInit |
| + // is called on Io thread. and in meantime releasehandle called. |
| + //DCHECK(read_buffer()->num_valid_bytes() == 0); |
| + if (read_buffer()->num_valid_bytes()) { |
| + read_buffer_out->resize(read_buffer()->num_valid_bytes()); |
| + memcpy(&(*read_buffer_out)[0], read_buffer()->buffer(), |
| + read_buffer()->num_valid_bytes()); |
| + read_buffer()->Reset(); |
| + } |
| + DCHECK(write_buffer_no_lock()->queue_size() == 0); |
| + return embedder::ScopedPlatformHandle( |
| + embedder::PlatformHandle(handle_.release().handle)); |
| + } |
| + |
| + return io_handler_->ReleaseHandle(read_buffer_out); |
| + } |
| + embedder::PlatformHandle HandleForDebuggingNoLock() override { |
| + if (handle_.is_valid()) |
| + return handle_.get(); |
| + |
| + if (!io_handler_) |
| + return embedder::PlatformHandle(); |
| + |
| + return embedder::PlatformHandle(io_handler_->handle()); |
| + } |
| + |
| + IOResult Read(size_t* bytes_read) override { |
| + DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); |
| + |
| + char* buffer = nullptr; |
| + size_t bytes_to_read = 0; |
| + read_buffer()->GetBuffer(&buffer, &bytes_to_read); |
| + |
| + DCHECK(io_handler_); |
| + DCHECK(!io_handler_->pending_read()); |
| + BOOL result = ReadFile( |
| + io_handler_->handle(), buffer, static_cast<DWORD>(bytes_to_read), |
| + nullptr, &io_handler_->read_context()->overlapped); |
| + if (!result) { |
| + DWORD error = GetLastError(); |
| + if (error == ERROR_BROKEN_PIPE) |
| + return IO_FAILED_SHUTDOWN; |
| + if (error != ERROR_IO_PENDING) { |
| + LOG(WARNING) << "ReadFile: " << logging::SystemErrorCodeToString(error); |
| + return IO_FAILED_UNKNOWN; |
| + } |
| + } |
| + |
| + if (result && skip_completion_port_on_success_) { |
| + DWORD bytes_read_dword = 0; |
| + BOOL get_size_result = GetOverlappedResult( |
| + io_handler_->handle(), &io_handler_->read_context()->overlapped, |
| + &bytes_read_dword, FALSE); |
| + DPCHECK(get_size_result); |
| + *bytes_read = bytes_read_dword; |
| + return IO_SUCCEEDED; |
| + } |
| + |
| + if (!g_use_autoreset_event) { |
| + if (!g_use_iocp) { |
| + io_handler_->read_watcher_.StartWatching( |
| + io_handler_->read_event, io_handler_, false); |
| + } |
| + } |
| + // If the read is pending or the read has succeeded but we don't skip |
| + // completion port on success, instruct |io_handler_| to wait for the |
| + // completion packet. |
| + // |
| + // TODO(yzshen): It seems there isn't document saying that all error cases |
| + // (other than ERROR_IO_PENDING) are guaranteed to *not* queue a completion |
| + // packet. If we do get one for errors, |
| + // |RawChannelIOHandler::OnIOCompleted()| will crash so we will learn about |
| + // it. |
| + |
| + io_handler_->OnPendingReadStarted(); |
| + return IO_PENDING; |
| + } |
| + |
| + IOResult ScheduleRead() override { |
| + if (!io_handler_) |
| + return IO_PENDING; // OnInit could have earlied out. |
| + |
| + DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); |
| + DCHECK(io_handler_); |
| + DCHECK(!io_handler_->pending_read()); |
| + |
| + size_t bytes_read = 0; |
| + IOResult io_result = Read(&bytes_read); |
| + if (io_result == IO_SUCCEEDED) { |
| + DCHECK(skip_completion_port_on_success_); |
| + |
| + // We have finished reading successfully. Queue a notification manually. |
| + io_handler_->OnPendingReadStarted(); |
| + // |io_handler_| won't go away before the task is run, so it is safe to |
| + // use |base::Unretained()|. |
| + message_loop_for_io()->PostTask( |
| + FROM_HERE, base::Bind(&RawChannelIOHandler::OnIOCompleted, |
| + base::Unretained(io_handler_), |
| + base::Unretained(io_handler_->read_context()), |
| + static_cast<DWORD>(bytes_read), ERROR_SUCCESS)); |
| + return IO_PENDING; |
| + } |
| + |
| + return io_result; |
| + } |
| + embedder::ScopedPlatformHandleVectorPtr GetReadPlatformHandles( |
| + size_t num_platform_handles, |
| + const void* platform_handle_table) override { |
| + // TODO(jam): this code will have to be updated once it's used in a sandbox |
| + // and the receiving process doesn't have duplicate permission for the |
| + // receiver. Once there's a broker and we have a connection to it (possibly |
| + // through ConnectionManager), then we can make a sync IPC to it here to get |
| + // a token for this handle, and it will duplicate the handle to is process. |
| + // Then we pass the token to the receiver, which will then make a sync call |
| + // to the broker to get a duplicated handle. This will also allow us to |
| + // avoid leaks of the handle if the receiver dies, since the broker can |
| + // notice that. |
| + DCHECK_GT(num_platform_handles, 0u); |
| + embedder::ScopedPlatformHandleVectorPtr rv( |
| + new embedder::PlatformHandleVector()); |
| + |
| + const char* serialization_data = |
| + static_cast<const char*>(platform_handle_table); |
| + for (size_t i = 0; i < num_platform_handles; i++) { |
| + DWORD pid = *reinterpret_cast<const DWORD*>(serialization_data); |
| + serialization_data += sizeof(DWORD); |
| + HANDLE source_handle = |
| + *reinterpret_cast<const HANDLE*>(serialization_data); |
| + serialization_data += sizeof(HANDLE); |
| + base::Process sender = |
| + base::Process::OpenWithAccess(pid, PROCESS_DUP_HANDLE); |
| + DCHECK(sender.IsValid()); |
| + HANDLE target_handle = NULL; |
| + BOOL dup_result = DuplicateHandle( |
| + sender.Handle(), source_handle, |
| + base::GetCurrentProcessHandle(), &target_handle, 0, |
| + FALSE, DUPLICATE_SAME_ACCESS | DUPLICATE_CLOSE_SOURCE); |
| + DCHECK(dup_result); |
| + rv->push_back(embedder::PlatformHandle(target_handle)); |
| + } |
| + return rv.Pass(); |
| + } |
| + |
| + IOResult WriteNoLock(size_t* platform_handles_written, |
| + size_t* bytes_written) override { |
| + write_lock().AssertAcquired(); |
| + |
| + DCHECK(io_handler_); |
| + DCHECK(!io_handler_->pending_write_no_lock()); |
| + |
| + size_t num_platform_handles = 0; |
| + if (write_buffer_no_lock()->HavePlatformHandlesToSend()) { |
| + // Since we're not sure which process might ultimately deserialize this |
| + // message, we can't duplicate the handle now. Instead, write the process |
| + // ID and handle now and let the receiver duplicate it. |
| + embedder::PlatformHandle* platform_handles; |
| + void* serialization_data_temp; |
| + write_buffer_no_lock()->GetPlatformHandlesToSend( |
| + &num_platform_handles, &platform_handles, &serialization_data_temp); |
| + char* serialization_data = static_cast<char*>(serialization_data_temp); |
| + DCHECK_GT(num_platform_handles, 0u); |
| + DCHECK(platform_handles); |
| + |
| + DWORD current_process_id = base::GetCurrentProcId(); |
| + for (size_t i = 0; i < num_platform_handles; i++) { |
| + *reinterpret_cast<DWORD*>(serialization_data) = current_process_id; |
| + serialization_data += sizeof(DWORD); |
| + *reinterpret_cast<HANDLE*>(serialization_data) = |
| + platform_handles[i].handle; |
| + serialization_data += sizeof(HANDLE); |
| + platform_handles[i] = embedder::PlatformHandle(); |
| + } |
| + } |
| + |
| + std::vector<WriteBuffer::Buffer> buffers; |
| + write_buffer_no_lock()->GetBuffers(&buffers); |
| + DCHECK(!buffers.empty()); |
| + |
| + // TODO(yzshen): Handle multi-segment writes more efficiently. |
| + DWORD bytes_written_dword = 0; |
| + |
| + |
| + |
| + |
| + // TODO(jam): right now we get in bad situation where we might first write |
|
yzshen1
2015/09/23 22:47:09
I haven't understood: I thought we synchronously f
|
| + // the main buffer and then the MP gets sent before we write the transport |
| + // buffer. We can fix this by sending information about partially written |
| + // messages, or by teaching transport buffer how to grow the main buffer and |
| + // write its data there. |
| + // Until that's done, for now make another copy. |
| + |
| + size_t total_size = buffers[0].size; |
| + if (buffers.size() > 1) |
| + total_size+=buffers[1].size; |
| + char* buf = new char[total_size]; |
| + memcpy(buf, buffers[0].addr, buffers[0].size); |
| + if (buffers.size() > 1) |
| + memcpy(buf + buffers[0].size, buffers[1].addr, buffers[1].size); |
| + |
| + BOOL result = WriteFile( |
| + io_handler_->handle(), buf, |
| + static_cast<DWORD>(total_size), |
| + &bytes_written_dword, |
| + &io_handler_->write_context_no_lock()->overlapped); |
| + delete [] buf; |
|
yzshen1
2015/09/23 22:47:09
I think we need to keep the buffer valid until the
|
| + |
| + if (!result) { |
| + DWORD error = GetLastError(); |
| + if (error == ERROR_BROKEN_PIPE) |
| + return IO_FAILED_SHUTDOWN; |
| + if (error != ERROR_IO_PENDING) { |
| + LOG(WARNING) << "WriteFile: " |
| + << logging::SystemErrorCodeToString(error); |
| + return IO_FAILED_UNKNOWN; |
| + } |
| + } |
| + |
| + if (result && skip_completion_port_on_success_) { |
| + *platform_handles_written = num_platform_handles; |
| + *bytes_written = bytes_written_dword; |
| + return IO_SUCCEEDED; |
| + } |
| + |
| + if (!g_use_autoreset_event) { |
| + if (!g_use_iocp) { |
| + io_handler_->write_watcher_.StartWatching( |
| + io_handler_->write_event, io_handler_, false); |
| + } |
| + } |
| + // If the write is pending or the write has succeeded but we don't skip |
| + // completion port on success, instruct |io_handler_| to wait for the |
| + // completion packet. |
| + // |
| + // TODO(yzshen): it seems there isn't document saying that all error cases |
| + // (other than ERROR_IO_PENDING) are guaranteed to *not* queue a completion |
| + // packet. If we do get one for errors, |
| + // |RawChannelIOHandler::OnIOCompleted()| will crash so we will learn about |
| + // it. |
| + |
| + io_handler_->OnPendingWriteStartedNoLock(num_platform_handles); |
| + return IO_PENDING; |
| + } |
| + |
| + IOResult ScheduleWriteNoLock() override { |
| + write_lock().AssertAcquired(); |
| + |
| + DCHECK(io_handler_); |
| + DCHECK(!io_handler_->pending_write_no_lock()); |
| + |
| + size_t platform_handles_written = 0; |
| + size_t bytes_written = 0; |
| + IOResult io_result = WriteNoLock(&platform_handles_written, &bytes_written); |
| + if (io_result == IO_SUCCEEDED) { |
| + DCHECK(skip_completion_port_on_success_); |
| + |
| + // We have finished writing successfully. Queue a notification manually. |
| + io_handler_->OnPendingWriteStartedNoLock(platform_handles_written); |
| + // |io_handler_| won't go away before that task is run, so it is safe to |
| + // use |base::Unretained()|. |
| + message_loop_for_io()->PostTask( |
| + FROM_HERE, |
| + base::Bind(&RawChannelIOHandler::OnIOCompleted, |
| + base::Unretained(io_handler_), |
| + base::Unretained(io_handler_->write_context_no_lock()), |
| + static_cast<DWORD>(bytes_written), ERROR_SUCCESS)); |
| + return IO_PENDING; |
| + } |
| + |
| + return io_result; |
| + } |
| + |
| + void OnInit() override { |
| + DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); |
| + |
| + if (!handle_.is_valid()) { |
| + LOG(ERROR) << "Note: RawChannelWin " << this |
| + << " early exiting in OnInit because no handle"; |
| + return; |
| + } |
| + |
| + DCHECK(handle_.is_valid()); |
| + if (skip_completion_port_on_success_) { |
| + // I don't know how this can fail (unless |handle_| is bad, in which case |
| + // it's a bug in our code). |
| + CHECK(g_vista_or_higher_functions.Get(). |
| + SetFileCompletionNotificationModes( |
| + handle_.get().handle, FILE_SKIP_COMPLETION_PORT_ON_SUCCESS)); |
| + } |
| + |
| + DCHECK(!io_handler_); |
| + io_handler_ = new RawChannelIOHandler(this, handle_.Pass()); |
| + } |
| + |
| + void OnShutdownNoLock(scoped_ptr<ReadBuffer> read_buffer, |
| + scoped_ptr<WriteBuffer> write_buffer) override { |
| + // happens on shutdown if didn't call init when doing createduplicate |
| + if (message_loop_for_io()) { |
| + DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); |
| + } |
| + |
| + if (!io_handler_) { |
| + // This is hit when creating a duplicate dispatcher since we don't call |
|
yzshen1
2015/09/23 22:47:09
Before this CL, Init() is restricted to be called
|
| + // Init on it. |
| + DCHECK_EQ(read_buffer->num_valid_bytes(), 0U); |
| + DCHECK_EQ(write_buffer->queue_size(), 0U); |
| + return; |
| + } |
| + |
| + if (io_handler_->pending_read() || io_handler_->pending_write_no_lock()) { |
| + // |io_handler_| will be alive until pending read/write (if any) |
| + // completes. Call |CancelIoEx()| or |CancelIo()| so that resources can be |
| + // freed up as soon as possible. |
| + // Note: |CancelIo()| only cancels read/write requests started from this |
| + // thread. |
| + if (g_vista_or_higher_functions.Get().is_vista_or_higher()) { |
| + g_vista_or_higher_functions.Get().CancelIoEx(io_handler_->handle(), |
| + nullptr); |
| + } else { |
| + CancelIo(io_handler_->handle()); |
| + } |
| + } |
| + |
| + io_handler_->DetachFromOwnerNoLock(read_buffer.Pass(), write_buffer.Pass()); |
| + io_handler_ = nullptr; |
| + } |
| + |
| + // Passed to |io_handler_| during initialization. |
| + embedder::ScopedPlatformHandle handle_; |
| + |
| + RawChannelIOHandler* io_handler_; |
| + |
| + const bool skip_completion_port_on_success_; |
| + |
| + MOJO_DISALLOW_COPY_AND_ASSIGN(RawChannelWin); |
| +}; |
| + |
| + |
| +} // namespace |
| + |
| +// ----------------------------------------------------------------------------- |
| + |
| +RawChannel* RawChannel::Create(embedder::ScopedPlatformHandle handle) { |
| + return new RawChannelWin(handle.Pass()); |
| +} |
| + |
| +size_t RawChannel::GetSerializedPlatformHandleSize() { |
| + return sizeof(DWORD) + sizeof(HANDLE); |
| +} |
| + |
| +} // namespace system |
| +} // namespace mojo |