| Index: mojo/edk/system/channel_win.cc | 
| diff --git a/mojo/edk/system/channel_win.cc b/mojo/edk/system/channel_win.cc | 
| new file mode 100644 | 
| index 0000000000000000000000000000000000000000..00d06e1f03a50e4289f6b5d906d64260ae4862f0 | 
| --- /dev/null | 
| +++ b/mojo/edk/system/channel_win.cc | 
| @@ -0,0 +1,306 @@ | 
| +// Copyright 2016 The Chromium Authors. All rights reserved. | 
| +// Use of this source code is governed by a BSD-style license that can be | 
| +// found in the LICENSE file. | 
| + | 
| +#include "mojo/edk/system/channel.h" | 
| + | 
| +#include <windows.h> | 
| + | 
| +#include <algorithm> | 
| +#include <deque> | 
| + | 
| +#include "base/bind.h" | 
| +#include "base/location.h" | 
| +#include "base/macros.h" | 
| +#include "base/memory/ref_counted.h" | 
| +#include "base/memory/scoped_ptr.h" | 
| +#include "base/message_loop/message_loop.h" | 
| +#include "base/synchronization/lock.h" | 
| +#include "base/task_runner.h" | 
| +#include "mojo/edk/embedder/platform_handle_vector.h" | 
| + | 
| +namespace mojo { | 
| +namespace edk { | 
| + | 
| +namespace { | 
| + | 
| +const size_t kMaxBatchReadCapacity = 256 * 1024; | 
| + | 
| +// A view over a Channel::Message object. The write queue uses these since | 
| +// large messages may need to be sent in chunks. | 
| +class MessageView { | 
| + public: | 
| +  // Owns |message|. |offset| indexes the first unsent byte in the message. | 
| +  MessageView(Channel::MessagePtr message, size_t offset) | 
| +      : message_(std::move(message)), | 
| +        offset_(offset) { | 
| +    DCHECK_GT(message_->data_num_bytes(), offset_); | 
| +  } | 
| + | 
| +  MessageView(MessageView&& other) { *this = std::move(other); } | 
| + | 
| +  MessageView& operator=(MessageView&& other) { | 
| +    message_ = std::move(other.message_); | 
| +    offset_ = other.offset_; | 
| +    return *this; | 
| +  } | 
| + | 
| +  ~MessageView() {} | 
| + | 
| +  const void* data() const { | 
| +    return static_cast<const char*>(message_->data()) + offset_; | 
| +  } | 
| + | 
| +  size_t data_num_bytes() const { return message_->data_num_bytes() - offset_; } | 
| + | 
| +  size_t data_offset() const { return offset_; } | 
| +  void advance_data_offset(size_t num_bytes) { | 
| +    DCHECK_GE(message_->data_num_bytes(), offset_ + num_bytes); | 
| +    offset_ += num_bytes; | 
| +  } | 
| + | 
| +  Channel::MessagePtr TakeChannelMessage() { return std::move(message_); } | 
| + | 
| + private: | 
| +  Channel::MessagePtr message_; | 
| +  size_t offset_; | 
| + | 
| +  DISALLOW_COPY_AND_ASSIGN(MessageView); | 
| +}; | 
| + | 
| +class ChannelWin : public Channel, | 
| +                   public base::MessageLoop::DestructionObserver, | 
| +                   public base::MessageLoopForIO::IOHandler { | 
| + public: | 
| +  ChannelWin(Delegate* delegate, | 
| +             ScopedPlatformHandle handle, | 
| +             scoped_refptr<base::TaskRunner> io_task_runner) | 
| +      : Channel(delegate), | 
| +        self_(this), | 
| +        handle_(std::move(handle)), | 
| +        io_task_runner_(io_task_runner) { | 
| +    memset(&read_context_, 0, sizeof(read_context_)); | 
| +    read_context_.handler = this; | 
| + | 
| +    memset(&write_context_, 0, sizeof(write_context_)); | 
| +    write_context_.handler = this; | 
| +  } | 
| + | 
| +  void Start() override { | 
| +    io_task_runner_->PostTask( | 
| +        FROM_HERE, base::Bind(&ChannelWin::StartOnIOThread, this)); | 
| +  } | 
| + | 
| +  void ShutDownImpl() override { | 
| +    // Always shut down asynchronously when called through the public interface. | 
| +    io_task_runner_->PostTask( | 
| +        FROM_HERE, base::Bind(&ChannelWin::ShutDownOnIOThread, this)); | 
| +  } | 
| + | 
| +  void Write(MessagePtr message) override { | 
| +    bool write_error = false; | 
| +    { | 
| +      base::AutoLock lock(write_lock_); | 
| +      if (reject_writes_) | 
| +        return; | 
| + | 
| +      bool write_now = !delay_writes_ && outgoing_messages_.empty(); | 
| +      outgoing_messages_.emplace_back(std::move(message), 0); | 
| + | 
| +      if (write_now && !WriteNoLock(outgoing_messages_.front())) | 
| +        reject_writes_ = write_error = true; | 
| +    } | 
| +    if (write_error) { | 
| +      // Do not synchronously invoke OnError(). Write() may have been called by | 
| +      // the delegate and we don't want to re-enter it. | 
| +      io_task_runner_->PostTask(FROM_HERE, | 
| +                                base::Bind(&ChannelWin::OnError, this)); | 
| +    } | 
| +  } | 
| + | 
| +  ScopedPlatformHandleVectorPtr GetReadPlatformHandles( | 
| +      size_t num_handles, | 
| +      void** payload, | 
| +      size_t* payload_size) override { | 
| +    size_t handles_size = sizeof(PlatformHandle) * num_handles; | 
| +    if (handles_size > *payload_size) | 
| +      return nullptr; | 
| + | 
| +    *payload_size -= handles_size; | 
| +    ScopedPlatformHandleVectorPtr handles( | 
| +        new PlatformHandleVector(num_handles)); | 
| +    memcpy(handles->data(), | 
| +           static_cast<const char*>(*payload) + *payload_size, handles_size); | 
| +    return handles; | 
| +  } | 
| + | 
| + private: | 
| +  // May run on any thread. | 
| +  ~ChannelWin() override {} | 
| + | 
| +  void StartOnIOThread() { | 
| +    base::MessageLoop::current()->AddDestructionObserver(this); | 
| +    base::MessageLoopForIO::current()->RegisterIOHandler( | 
| +        handle_.get().handle, this); | 
| + | 
| +    // Now that we have registered our IOHandler, we can start writing. | 
| +    { | 
| +      base::AutoLock lock(write_lock_); | 
| +      if (delay_writes_) { | 
| +        delay_writes_ = false; | 
| +        WriteNextNoLock(); | 
| +      } | 
| +    } | 
| + | 
| +    // Keep this alive in case we synchronously run shutdown. | 
| +    scoped_refptr<ChannelWin> keep_alive(this); | 
| +    ReadMore(0); | 
| +  } | 
| + | 
| +  void ShutDownOnIOThread() { | 
| +    base::MessageLoop::current()->RemoveDestructionObserver(this); | 
| + | 
| +    CancelIo(handle_.get().handle); | 
| +    handle_.reset(); | 
| + | 
| +    // May destroy the |this| if it was the last reference. | 
| +    self_ = nullptr; | 
| +  } | 
| + | 
| +  // base::MessageLoop::DestructionObserver: | 
| +  void WillDestroyCurrentMessageLoop() override { | 
| +    DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); | 
| +    if (self_) | 
| +      ShutDownOnIOThread(); | 
| +  } | 
| + | 
| +  // base::MessageLoop::IOHandler: | 
| +  void OnIOCompleted(base::MessageLoopForIO::IOContext* context, | 
| +                     DWORD bytes_transfered, | 
| +                     DWORD error) override { | 
| +    if (error != ERROR_SUCCESS) { | 
| +      OnError(); | 
| +    } else if (context == &read_context_) { | 
| +      OnReadDone(static_cast<size_t>(bytes_transfered)); | 
| +    } else { | 
| +      CHECK(context == &write_context_); | 
| +      OnWriteDone(static_cast<size_t>(bytes_transfered)); | 
| +    } | 
| +    Release();  // Balancing reference taken after ReadFile / WriteFile. | 
| +  } | 
| + | 
| +  void OnReadDone(size_t bytes_read) { | 
| +    if (bytes_read > 0) { | 
| +      size_t next_read_size = 0; | 
| +      if (OnReadComplete(bytes_read, &next_read_size)) { | 
| +        ReadMore(next_read_size); | 
| +      } else { | 
| +        OnError(); | 
| +      } | 
| +    } else if (bytes_read == 0) { | 
| +      OnError(); | 
| +    } | 
| +  } | 
| + | 
| +  void OnWriteDone(size_t bytes_written) { | 
| +    if (bytes_written == 0) | 
| +      return; | 
| + | 
| +    bool write_error = false; | 
| +    { | 
| +      base::AutoLock lock(write_lock_); | 
| + | 
| +      DCHECK(!outgoing_messages_.empty()); | 
| + | 
| +      MessageView& message_view = outgoing_messages_.front(); | 
| +      message_view.advance_data_offset(bytes_written); | 
| +      if (message_view.data_num_bytes() == 0) { | 
| +        Channel::MessagePtr message = message_view.TakeChannelMessage(); | 
| +        outgoing_messages_.pop_front(); | 
| + | 
| +        // Clear any handles so they don't get closed on destruction. | 
| +        ScopedPlatformHandleVectorPtr handles = message->TakeHandles(); | 
| +        if (handles) | 
| +          handles->clear(); | 
| +      } | 
| + | 
| +      if (!WriteNextNoLock()) | 
| +        reject_writes_ = write_error = true; | 
| +    } | 
| +    if (write_error) | 
| +      OnError(); | 
| +  } | 
| + | 
| +  void ReadMore(size_t next_read_size_hint) { | 
| +    size_t buffer_capacity = next_read_size_hint; | 
| +    char* buffer = GetReadBuffer(&buffer_capacity); | 
| +    DCHECK_GT(buffer_capacity, 0u); | 
| + | 
| +    BOOL ok = ReadFile(handle_.get().handle, | 
| +                       buffer, | 
| +                       static_cast<DWORD>(buffer_capacity), | 
| +                       NULL, | 
| +                       &read_context_.overlapped); | 
| + | 
| +    if (ok || GetLastError() == ERROR_IO_PENDING) { | 
| +      AddRef();  // Will be balanced in OnIOCompleted | 
| +    } else { | 
| +      OnError(); | 
| +    } | 
| +  } | 
| + | 
| +  // Attempts to write a message directly to the channel. If the full message | 
| +  // cannot be written, it's queued and a wait is initiated to write the message | 
| +  // ASAP on the I/O thread. | 
| +  bool WriteNoLock(const MessageView& message_view) { | 
| +    BOOL ok = WriteFile(handle_.get().handle, | 
| +                        message_view.data(), | 
| +                        static_cast<DWORD>(message_view.data_num_bytes()), | 
| +                        NULL, | 
| +                        &write_context_.overlapped); | 
| + | 
| +    if (ok || GetLastError() == ERROR_IO_PENDING) { | 
| +      AddRef();  // Will be balanced in OnIOCompleted. | 
| +      return true; | 
| +    } | 
| +    return false; | 
| +  } | 
| + | 
| +  bool WriteNextNoLock() { | 
| +    if (outgoing_messages_.empty()) | 
| +      return true; | 
| +    return WriteNoLock(outgoing_messages_.front()); | 
| +  } | 
| + | 
| +  // Keeps the Channel alive at least until explicit shutdown on the IO thread. | 
| +  scoped_refptr<Channel> self_; | 
| + | 
| +  ScopedPlatformHandle handle_; | 
| +  scoped_refptr<base::TaskRunner> io_task_runner_; | 
| + | 
| +  base::MessageLoopForIO::IOContext read_context_; | 
| +  base::MessageLoopForIO::IOContext write_context_; | 
| + | 
| +  // Protects |reject_writes_| and |outgoing_messages_|. | 
| +  base::Lock write_lock_; | 
| + | 
| +  bool delay_writes_ = true; | 
| + | 
| +  bool reject_writes_ = false; | 
| +  std::deque<MessageView> outgoing_messages_; | 
| + | 
| +  DISALLOW_COPY_AND_ASSIGN(ChannelWin); | 
| +}; | 
| + | 
| +}  // namespace | 
| + | 
| +// static | 
| +scoped_refptr<Channel> Channel::Create( | 
| +    Delegate* delegate, | 
| +    ScopedPlatformHandle platform_handle, | 
| +    scoped_refptr<base::TaskRunner> io_task_runner) { | 
| +  return new ChannelWin(delegate, std::move(platform_handle), io_task_runner); | 
| +} | 
| + | 
| +}  // namespace edk | 
| +}  // namespace mojo | 
|  |