Chromium Code Reviews| Index: mojo/public/bindings/lib/connector.cc |
| diff --git a/mojo/public/bindings/lib/connector.cc b/mojo/public/bindings/lib/connector.cc |
| new file mode 100644 |
| index 0000000000000000000000000000000000000000..8a1144489aaf4e4cd9ade11115e08e28756e99a0 |
| --- /dev/null |
| +++ b/mojo/public/bindings/lib/connector.cc |
| @@ -0,0 +1,161 @@ |
| +// Copyright 2013 The Chromium Authors. All rights reserved. |
| +// Use of this source code is governed by a BSD-style license that can be |
| +// found in the LICENSE file. |
| + |
| +#include "mojo/public/bindings/lib/connector.h" |
| + |
| +#include <assert.h> |
| +#include <stdlib.h> |
| + |
| +#include <algorithm> |
| + |
| +namespace mojo { |
| + |
| +// ---------------------------------------------------------------------------- |
| + |
| +Connector::Connector(Handle message_pipe) |
| + : message_pipe_(message_pipe), |
| + incoming_receiver_(NULL), |
| + error_(false) { |
| +} |
| + |
| +Connector::~Connector() { |
| + if (read_callback_.IsPending()) |
| + read_callback_.Cancel(); |
| + if (write_callback_.IsPending()) |
| + write_callback_.Cancel(); |
| +} |
| + |
| +void Connector::SetIncomingReceiver(MessageReceiver* receiver) { |
| + assert(!incoming_receiver_); |
| + incoming_receiver_ = receiver; |
| + if (incoming_receiver_) |
| + WaitToReadMore(); |
| +} |
| + |
| +bool Connector::Accept(Message* message) { |
| + if (error_) |
| + return false; |
| + |
| + write_queue_.push(Message()); |
| + write_queue_.back().Swap(message); |
| + WriteMore(); |
| + return !error_; |
| +} |
| + |
| +void Connector::OnHandleReady(Callback* callback, MojoResult result) { |
| + if (callback == &read_callback_) |
| + ReadMore(); |
| + if (callback == &write_callback_) |
| + WriteMore(); |
| +} |
| + |
| +void Connector::WaitToReadMore() { |
| + read_callback_.SetOwnerToNotify(this); |
| + |
| + bool ok = BindingsSupport::Get()->AsyncWait(message_pipe_, |
| + MOJO_WAIT_FLAG_READABLE, |
| + MOJO_DEADLINE_INDEFINITE, |
| + &read_callback_); |
| + if (!ok) |
| + error_ = true; |
| +} |
| + |
| +void Connector::WaitToWriteMore() { |
| + write_callback_.SetOwnerToNotify(this); |
| + |
| + bool ok = BindingsSupport::Get()->AsyncWait(message_pipe_, |
| + MOJO_WAIT_FLAG_WRITABLE, |
| + MOJO_DEADLINE_INDEFINITE, |
| + &write_callback_); |
| + if (!ok) |
| + error_ = true; |
| +} |
| + |
| +void Connector::ReadMore() { |
| + for (;;) { |
| + MojoResult rv; |
| + |
| + uint32_t num_bytes = 0, num_handles = 0; |
| + rv = ReadMessage(message_pipe_, |
|
viettrungluu
2013/11/05 22:45:29
I wonder if it wouldn't be better (mostly for perf
|
| + NULL, |
| + &num_bytes, |
| + NULL, |
| + &num_handles, |
| + MOJO_READ_MESSAGE_FLAG_NONE); |
| + if (rv == MOJO_RESULT_NOT_FOUND) { |
| + WaitToReadMore(); |
| + break; |
| + } |
| + if (rv != MOJO_RESULT_RESOURCE_EXHAUSTED) { |
| + error_ = true; |
| + break; |
| + } |
| + |
| + Message message; |
| + message.data = static_cast<MessageData*>(malloc(num_bytes)); |
| + message.handles.resize(num_handles); |
| + |
| + rv = ReadMessage(message_pipe_, |
| + message.data, |
| + &num_bytes, |
| + &message.handles[0], |
| + &num_handles, |
| + MOJO_READ_MESSAGE_FLAG_NONE); |
| + if (rv != MOJO_RESULT_OK) { |
| + error_ = true; |
| + break; |
| + } |
| + |
| + incoming_receiver_->Accept(&message); |
| + } |
| +} |
| + |
| +void Connector::WriteMore() { |
| + while (!write_queue_.empty()) { |
| + const Message& message = write_queue_.back(); |
| + |
| + MojoResult rv = WriteMessage(message_pipe_, |
| + message.data, |
| + message.data->header.num_bytes, |
| + message.handles.data(), |
| + message.handles.size(), |
| + MOJO_WRITE_MESSAGE_FLAG_NONE); |
| + if (rv == MOJO_RESULT_OK) { |
| + write_queue_.pop(); |
| + continue; // Write another message. |
| + } |
| + |
| + error_ = true; |
| + break; |
| + } |
| +} |
| + |
| +// ---------------------------------------------------------------------------- |
| + |
| +Connector::Callback::Callback() |
| + : owner_(NULL) { |
| +} |
| + |
| +void Connector::Callback::Cancel() { |
| + owner_ = NULL; |
| + BindingsSupport::Get()->CancelWait(this); |
| +} |
| + |
| +void Connector::Callback::SetOwnerToNotify(Connector* owner) { |
| + assert(!owner_); |
| + owner_ = owner; |
| +} |
| + |
| +bool Connector::Callback::IsPending() const { |
| + return owner_ != NULL; |
| +} |
| + |
| +void Connector::Callback::OnHandleReady(MojoResult result) { |
| + assert(owner_); |
| + Connector* owner = NULL; |
| + std::swap(owner, owner_); |
| + owner->OnHandleReady(this, result); |
| +} |
| + |
| +} // namespace mojo |