Index: ipc/mojo/async_handle_waiter.cc |
diff --git a/ipc/mojo/async_handle_waiter.cc b/ipc/mojo/async_handle_waiter.cc |
index 7b199ed95e835eec7dae4533ef4474f400030bf6..a9eddb1b48a1f938076ee0823b8245557c544d23 100644 |
--- a/ipc/mojo/async_handle_waiter.cc |
+++ b/ipc/mojo/async_handle_waiter.cc |
@@ -33,10 +33,24 @@ class AsyncHandleWaiter::Context |
waiter_(waiter), |
last_result_(MOJO_RESULT_INTERNAL), |
processing_(false), |
- should_invoke_callback_(false) { |
+ should_invoke_callback_(false), |
+ should_invoke_message_callback_(false) { |
base::MessageLoopForIO::current()->AddIOObserver(this); |
} |
+ bool MessageWasReceived(const void* bytes, uint32_t num_bytes) { |
+ DCHECK(IsCalledFromIOHandler()); |
+ |
+ // For simplicity, we don't steal more than one message at a time. |
+ if (!message_body_.empty()) |
+ return false; |
+ |
+ should_invoke_message_callback_ = true; |
+ message_body_.resize(num_bytes); |
+ memcpy(&message_body_[0], bytes, num_bytes); |
+ return true; |
+ } |
+ |
void HandleIsReady(MojoResult result) { |
last_result_ = result; |
@@ -78,6 +92,16 @@ class AsyncHandleWaiter::Context |
waiter_->InvokeCallback(result); |
} |
+ void InvokeWaiterMessageCallback() { |
+ if (waiter_) { |
+ DCHECK(!message_body_.empty()); |
+ waiter_->InvokeMessageCallback( |
+ &message_body_[0], static_cast<uint32_t>(message_body_.size())); |
+ } |
+ |
+ message_body_.clear(); |
+ } |
+ |
// IOObserver implementation: |
void WillProcessIOEvent() override { |
@@ -92,7 +116,7 @@ class AsyncHandleWaiter::Context |
// The zero |waiter_| indicates that |this| have lost the owner and can be |
// under destruction. So we cannot wrap it with a |scoped_refptr| anymore. |
if (!waiter_) { |
- should_invoke_callback_ = false; |
+ should_invoke_callback_ = should_invoke_message_callback_ = false; |
processing_ = false; |
return; |
} |
@@ -100,9 +124,16 @@ class AsyncHandleWaiter::Context |
// We have to protect |this| because |AsyncHandleWaiter| can be |
// deleted during the callback. |
scoped_refptr<Context> protect(this); |
- while (should_invoke_callback_) { |
- should_invoke_callback_ = false; |
- InvokeWaiterCallback(); |
+ while (should_invoke_callback_ || should_invoke_message_callback_) { |
+ if (should_invoke_callback_) { |
+ should_invoke_callback_ = false; |
+ InvokeWaiterCallback(); |
+ } |
+ |
+ if (should_invoke_message_callback_) { |
+ should_invoke_message_callback_ = false; |
+ InvokeWaiterMessageCallback(); |
+ } |
} |
processing_ = false; |
@@ -114,29 +145,49 @@ class AsyncHandleWaiter::Context |
const base::WeakPtr<AsyncHandleWaiter> waiter_; |
MojoResult last_result_; |
+ std::vector<unsigned char> message_body_; |
bool processing_; |
- bool should_invoke_callback_; |
+ bool should_invoke_callback_; // XXX: Rename |
+ bool should_invoke_message_callback_; // XXX: Rename |
DISALLOW_COPY_AND_ASSIGN(Context); |
}; |
-AsyncHandleWaiter::AsyncHandleWaiter(base::Callback<void(MojoResult)> callback) |
- : callback_(callback), |
- weak_factory_(this) { |
+AsyncHandleWaiter::AsyncHandleWaiter(AsyncHandleWaiter::Delegate* delegate) |
+ : delegate_(delegate), weak_factory_(this) { |
context_ = new Context(weak_factory_.GetWeakPtr()); |
+ context_callback_ = base::Bind(&Context::HandleIsReady, context_); |
+ context_message_callback_ = |
+ base::Bind(&Context::MessageWasReceived, context_); |
} |
AsyncHandleWaiter::~AsyncHandleWaiter() { |
} |
+void AsyncHandleWaiter::ClearMessageCallback(MojoHandle handle) { |
+ MojoResult rv = mojo::embedder::SetAsyncMessageCallback( |
+ handle, base::Callback<bool(const void*, uint32_t)>()); |
+ DCHECK_EQ(rv, MOJO_RESULT_OK); |
+} |
+ |
+void AsyncHandleWaiter::SetMessageCallback(MojoHandle handle) { |
+ MojoResult rv = mojo::embedder::SetAsyncMessageCallback( |
+ handle, context_message_callback_); |
+ DCHECK_EQ(rv, MOJO_RESULT_OK); |
+} |
+ |
MojoResult AsyncHandleWaiter::Wait(MojoHandle handle, |
MojoHandleSignals signals) { |
- return mojo::embedder::AsyncWait( |
- handle, signals, base::Bind(&Context::HandleIsReady, context_)); |
+ return mojo::embedder::AsyncWait(handle, signals, context_callback_); |
} |
void AsyncHandleWaiter::InvokeCallback(MojoResult result) { |
- callback_.Run(result); |
+ delegate_->PipeIsReady(result); |
+} |
+ |
+void AsyncHandleWaiter::InvokeMessageCallback(const void* bytes, |
+ uint32_t num_bytes) { |
+ delegate_->MessageWasArrived(bytes, num_bytes); |
} |
// static |