Index: mojo/public/bindings/lib/connector.cc |
diff --git a/mojo/public/bindings/lib/connector.cc b/mojo/public/bindings/lib/connector.cc |
new file mode 100644 |
index 0000000000000000000000000000000000000000..8a1144489aaf4e4cd9ade11115e08e28756e99a0 |
--- /dev/null |
+++ b/mojo/public/bindings/lib/connector.cc |
@@ -0,0 +1,161 @@ |
+// Copyright 2013 The Chromium Authors. All rights reserved. |
+// Use of this source code is governed by a BSD-style license that can be |
+// found in the LICENSE file. |
+ |
+#include "mojo/public/bindings/lib/connector.h" |
+ |
+#include <assert.h> |
+#include <stdlib.h> |
+ |
+#include <algorithm> |
+ |
+namespace mojo { |
+ |
+// ---------------------------------------------------------------------------- |
+ |
+Connector::Connector(Handle message_pipe) |
+ : message_pipe_(message_pipe), |
+ incoming_receiver_(NULL), |
+ error_(false) { |
+} |
+ |
+Connector::~Connector() { |
+ if (read_callback_.IsPending()) |
+ read_callback_.Cancel(); |
+ if (write_callback_.IsPending()) |
+ write_callback_.Cancel(); |
+} |
+ |
+void Connector::SetIncomingReceiver(MessageReceiver* receiver) { |
+ assert(!incoming_receiver_); |
+ incoming_receiver_ = receiver; |
+ if (incoming_receiver_) |
+ WaitToReadMore(); |
+} |
+ |
+bool Connector::Accept(Message* message) { |
+ if (error_) |
+ return false; |
+ |
+ write_queue_.push(Message()); |
+ write_queue_.back().Swap(message); |
+ WriteMore(); |
+ return !error_; |
+} |
+ |
+void Connector::OnHandleReady(Callback* callback, MojoResult result) { |
+ if (callback == &read_callback_) |
+ ReadMore(); |
+ if (callback == &write_callback_) |
+ WriteMore(); |
+} |
+ |
+void Connector::WaitToReadMore() { |
+ read_callback_.SetOwnerToNotify(this); |
+ |
+ bool ok = BindingsSupport::Get()->AsyncWait(message_pipe_, |
+ MOJO_WAIT_FLAG_READABLE, |
+ MOJO_DEADLINE_INDEFINITE, |
+ &read_callback_); |
+ if (!ok) |
+ error_ = true; |
+} |
+ |
+void Connector::WaitToWriteMore() { |
+ write_callback_.SetOwnerToNotify(this); |
+ |
+ bool ok = BindingsSupport::Get()->AsyncWait(message_pipe_, |
+ MOJO_WAIT_FLAG_WRITABLE, |
+ MOJO_DEADLINE_INDEFINITE, |
+ &write_callback_); |
+ if (!ok) |
+ error_ = true; |
+} |
+ |
+void Connector::ReadMore() { |
+ for (;;) { |
+ MojoResult rv; |
+ |
+ uint32_t num_bytes = 0, num_handles = 0; |
+ rv = ReadMessage(message_pipe_, |
viettrungluu
2013/11/05 22:45:29
I wonder if it wouldn't be better (mostly for perf
|
+ NULL, |
+ &num_bytes, |
+ NULL, |
+ &num_handles, |
+ MOJO_READ_MESSAGE_FLAG_NONE); |
+ if (rv == MOJO_RESULT_NOT_FOUND) { |
+ WaitToReadMore(); |
+ break; |
+ } |
+ if (rv != MOJO_RESULT_RESOURCE_EXHAUSTED) { |
+ error_ = true; |
+ break; |
+ } |
+ |
+ Message message; |
+ message.data = static_cast<MessageData*>(malloc(num_bytes)); |
+ message.handles.resize(num_handles); |
+ |
+ rv = ReadMessage(message_pipe_, |
+ message.data, |
+ &num_bytes, |
+ &message.handles[0], |
+ &num_handles, |
+ MOJO_READ_MESSAGE_FLAG_NONE); |
+ if (rv != MOJO_RESULT_OK) { |
+ error_ = true; |
+ break; |
+ } |
+ |
+ incoming_receiver_->Accept(&message); |
+ } |
+} |
+ |
+void Connector::WriteMore() { |
+ while (!write_queue_.empty()) { |
+ const Message& message = write_queue_.back(); |
+ |
+ MojoResult rv = WriteMessage(message_pipe_, |
+ message.data, |
+ message.data->header.num_bytes, |
+ message.handles.data(), |
+ message.handles.size(), |
+ MOJO_WRITE_MESSAGE_FLAG_NONE); |
+ if (rv == MOJO_RESULT_OK) { |
+ write_queue_.pop(); |
+ continue; // Write another message. |
+ } |
+ |
+ error_ = true; |
+ break; |
+ } |
+} |
+ |
+// ---------------------------------------------------------------------------- |
+ |
+Connector::Callback::Callback() |
+ : owner_(NULL) { |
+} |
+ |
+void Connector::Callback::Cancel() { |
+ owner_ = NULL; |
+ BindingsSupport::Get()->CancelWait(this); |
+} |
+ |
+void Connector::Callback::SetOwnerToNotify(Connector* owner) { |
+ assert(!owner_); |
+ owner_ = owner; |
+} |
+ |
+bool Connector::Callback::IsPending() const { |
+ return owner_ != NULL; |
+} |
+ |
+void Connector::Callback::OnHandleReady(MojoResult result) { |
+ assert(owner_); |
+ Connector* owner = NULL; |
+ std::swap(owner, owner_); |
+ owner->OnHandleReady(this, result); |
+} |
+ |
+} // namespace mojo |