| OLD | NEW |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "ipc/mojo/async_handle_waiter.h" | 5 #include "ipc/mojo/async_handle_waiter.h" |
| 6 | 6 |
| 7 #include "base/atomic_ref_count.h" | 7 #include "base/atomic_ref_count.h" |
| 8 #include "base/bind.h" | 8 #include "base/bind.h" |
| 9 #include "base/bind_helpers.h" | 9 #include "base/bind_helpers.h" |
| 10 #include "base/location.h" | 10 #include "base/location.h" |
| (...skipping 15 matching lines...) Expand all Loading... |
| 26 class AsyncHandleWaiter::Context | 26 class AsyncHandleWaiter::Context |
| 27 : public base::RefCountedThreadSafe<AsyncHandleWaiter::Context, | 27 : public base::RefCountedThreadSafe<AsyncHandleWaiter::Context, |
| 28 AsyncHandleWaiterContextTraits>, | 28 AsyncHandleWaiterContextTraits>, |
| 29 public base::MessageLoopForIO::IOObserver { | 29 public base::MessageLoopForIO::IOObserver { |
| 30 public: | 30 public: |
| 31 Context(base::WeakPtr<AsyncHandleWaiter> waiter) | 31 Context(base::WeakPtr<AsyncHandleWaiter> waiter) |
| 32 : io_runner_(base::MessageLoopForIO::current()->task_runner()), | 32 : io_runner_(base::MessageLoopForIO::current()->task_runner()), |
| 33 waiter_(waiter), | 33 waiter_(waiter), |
| 34 last_result_(MOJO_RESULT_INTERNAL), | 34 last_result_(MOJO_RESULT_INTERNAL), |
| 35 processing_(false), | 35 processing_(false), |
| 36 should_invoke_callback_(false) { | 36 should_invoke_callback_(false), |
| 37 should_invoke_message_callback_(false) { |
| 37 base::MessageLoopForIO::current()->AddIOObserver(this); | 38 base::MessageLoopForIO::current()->AddIOObserver(this); |
| 38 } | 39 } |
| 39 | 40 |
| 41 bool MessageWasReceived(const void* bytes, uint32_t num_bytes) { |
| 42 DCHECK(IsCalledFromIOHandler()); |
| 43 |
| 44 // For simplicity, we don't steal more than one message at a time. |
| 45 if (!message_body_.empty()) |
| 46 return false; |
| 47 |
| 48 should_invoke_message_callback_ = true; |
| 49 message_body_.resize(num_bytes); |
| 50 memcpy(&message_body_[0], bytes, num_bytes); |
| 51 return true; |
| 52 } |
| 53 |
| 40 void HandleIsReady(MojoResult result) { | 54 void HandleIsReady(MojoResult result) { |
| 41 last_result_ = result; | 55 last_result_ = result; |
| 42 | 56 |
| 43 // If the signaling happens in the IO handler, use |IOObserver| callback | 57 // If the signaling happens in the IO handler, use |IOObserver| callback |
| 44 // to invoke the callback. | 58 // to invoke the callback. |
| 45 if (IsCalledFromIOHandler()) { | 59 if (IsCalledFromIOHandler()) { |
| 46 should_invoke_callback_ = true; | 60 should_invoke_callback_ = true; |
| 47 return; | 61 return; |
| 48 } | 62 } |
| 49 | 63 |
| (...skipping 21 matching lines...) Expand all Loading... |
| 71 } | 85 } |
| 72 | 86 |
| 73 // Called from |io_runner_| thus safe to touch |waiter_|. | 87 // Called from |io_runner_| thus safe to touch |waiter_|. |
| 74 void InvokeWaiterCallback() { | 88 void InvokeWaiterCallback() { |
| 75 MojoResult result = last_result_; | 89 MojoResult result = last_result_; |
| 76 last_result_ = MOJO_RESULT_INTERNAL; | 90 last_result_ = MOJO_RESULT_INTERNAL; |
| 77 if (waiter_) | 91 if (waiter_) |
| 78 waiter_->InvokeCallback(result); | 92 waiter_->InvokeCallback(result); |
| 79 } | 93 } |
| 80 | 94 |
| 95 void InvokeWaiterMessageCallback() { |
| 96 if (waiter_) { |
| 97 DCHECK(!message_body_.empty()); |
| 98 waiter_->InvokeMessageCallback( |
| 99 &message_body_[0], static_cast<uint32_t>(message_body_.size())); |
| 100 } |
| 101 |
| 102 message_body_.clear(); |
| 103 } |
| 104 |
| 81 // IOObserver implementation: | 105 // IOObserver implementation: |
| 82 | 106 |
| 83 void WillProcessIOEvent() override { | 107 void WillProcessIOEvent() override { |
| 84 DCHECK(!should_invoke_callback_); | 108 DCHECK(!should_invoke_callback_); |
| 85 DCHECK(!processing_); | 109 DCHECK(!processing_); |
| 86 processing_ = true; | 110 processing_ = true; |
| 87 } | 111 } |
| 88 | 112 |
| 89 void DidProcessIOEvent() override { | 113 void DidProcessIOEvent() override { |
| 90 DCHECK(processing_); | 114 DCHECK(processing_); |
| 91 | 115 |
| 92 // The zero |waiter_| indicates that |this| have lost the owner and can be | 116 // The zero |waiter_| indicates that |this| have lost the owner and can be |
| 93 // under destruction. So we cannot wrap it with a |scoped_refptr| anymore. | 117 // under destruction. So we cannot wrap it with a |scoped_refptr| anymore. |
| 94 if (!waiter_) { | 118 if (!waiter_) { |
| 95 should_invoke_callback_ = false; | 119 should_invoke_callback_ = should_invoke_message_callback_ = false; |
| 96 processing_ = false; | 120 processing_ = false; |
| 97 return; | 121 return; |
| 98 } | 122 } |
| 99 | 123 |
| 100 // We have to protect |this| because |AsyncHandleWaiter| can be | 124 // We have to protect |this| because |AsyncHandleWaiter| can be |
| 101 // deleted during the callback. | 125 // deleted during the callback. |
| 102 scoped_refptr<Context> protect(this); | 126 scoped_refptr<Context> protect(this); |
| 103 while (should_invoke_callback_) { | 127 while (should_invoke_callback_ || should_invoke_message_callback_) { |
| 104 should_invoke_callback_ = false; | 128 if (should_invoke_callback_) { |
| 105 InvokeWaiterCallback(); | 129 should_invoke_callback_ = false; |
| 130 InvokeWaiterCallback(); |
| 131 } |
| 132 |
| 133 if (should_invoke_message_callback_) { |
| 134 should_invoke_message_callback_ = false; |
| 135 InvokeWaiterMessageCallback(); |
| 136 } |
| 106 } | 137 } |
| 107 | 138 |
| 108 processing_ = false; | 139 processing_ = false; |
| 109 } | 140 } |
| 110 | 141 |
| 111 // Only |io_runner_| is accessed from arbitrary threads. Others are touched | 142 // Only |io_runner_| is accessed from arbitrary threads. Others are touched |
| 112 // only from the IO thread. | 143 // only from the IO thread. |
| 113 const scoped_refptr<base::TaskRunner> io_runner_; | 144 const scoped_refptr<base::TaskRunner> io_runner_; |
| 114 | 145 |
| 115 const base::WeakPtr<AsyncHandleWaiter> waiter_; | 146 const base::WeakPtr<AsyncHandleWaiter> waiter_; |
| 116 MojoResult last_result_; | 147 MojoResult last_result_; |
| 148 std::vector<unsigned char> message_body_; |
| 117 bool processing_; | 149 bool processing_; |
| 118 bool should_invoke_callback_; | 150 bool should_invoke_callback_; // XXX: Rename |
| 151 bool should_invoke_message_callback_; // XXX: Rename |
| 119 | 152 |
| 120 DISALLOW_COPY_AND_ASSIGN(Context); | 153 DISALLOW_COPY_AND_ASSIGN(Context); |
| 121 }; | 154 }; |
| 122 | 155 |
| 123 AsyncHandleWaiter::AsyncHandleWaiter(base::Callback<void(MojoResult)> callback) | 156 AsyncHandleWaiter::AsyncHandleWaiter(AsyncHandleWaiter::Delegate* delegate) |
| 124 : callback_(callback), | 157 : delegate_(delegate), weak_factory_(this) { |
| 125 weak_factory_(this) { | |
| 126 context_ = new Context(weak_factory_.GetWeakPtr()); | 158 context_ = new Context(weak_factory_.GetWeakPtr()); |
| 159 context_callback_ = base::Bind(&Context::HandleIsReady, context_); |
| 160 context_message_callback_ = |
| 161 base::Bind(&Context::MessageWasReceived, context_); |
| 127 } | 162 } |
| 128 | 163 |
| 129 AsyncHandleWaiter::~AsyncHandleWaiter() { | 164 AsyncHandleWaiter::~AsyncHandleWaiter() { |
| 130 } | 165 } |
| 131 | 166 |
| 167 void AsyncHandleWaiter::ClearMessageCallback(MojoHandle handle) { |
| 168 MojoResult rv = mojo::embedder::SetAsyncMessageCallback( |
| 169 handle, base::Callback<bool(const void*, uint32_t)>()); |
| 170 DCHECK_EQ(rv, MOJO_RESULT_OK); |
| 171 } |
| 172 |
| 173 void AsyncHandleWaiter::SetMessageCallback(MojoHandle handle) { |
| 174 MojoResult rv = mojo::embedder::SetAsyncMessageCallback( |
| 175 handle, context_message_callback_); |
| 176 DCHECK_EQ(rv, MOJO_RESULT_OK); |
| 177 } |
| 178 |
| 132 MojoResult AsyncHandleWaiter::Wait(MojoHandle handle, | 179 MojoResult AsyncHandleWaiter::Wait(MojoHandle handle, |
| 133 MojoHandleSignals signals) { | 180 MojoHandleSignals signals) { |
| 134 return mojo::embedder::AsyncWait( | 181 return mojo::embedder::AsyncWait(handle, signals, context_callback_); |
| 135 handle, signals, base::Bind(&Context::HandleIsReady, context_)); | |
| 136 } | 182 } |
| 137 | 183 |
| 138 void AsyncHandleWaiter::InvokeCallback(MojoResult result) { | 184 void AsyncHandleWaiter::InvokeCallback(MojoResult result) { |
| 139 callback_.Run(result); | 185 delegate_->PipeIsReady(result); |
| 186 } |
| 187 |
| 188 void AsyncHandleWaiter::InvokeMessageCallback(const void* bytes, |
| 189 uint32_t num_bytes) { |
| 190 delegate_->MessageWasArrived(bytes, num_bytes); |
| 140 } | 191 } |
| 141 | 192 |
| 142 // static | 193 // static |
| 143 void AsyncHandleWaiterContextTraits::Destruct( | 194 void AsyncHandleWaiterContextTraits::Destruct( |
| 144 const AsyncHandleWaiter::Context* context) { | 195 const AsyncHandleWaiter::Context* context) { |
| 145 context->io_runner_->PostTask( | 196 context->io_runner_->PostTask( |
| 146 FROM_HERE, | 197 FROM_HERE, |
| 147 base::Bind(&base::DeletePointer<const AsyncHandleWaiter::Context>, | 198 base::Bind(&base::DeletePointer<const AsyncHandleWaiter::Context>, |
| 148 base::Unretained(context))); | 199 base::Unretained(context))); |
| 149 } | 200 } |
| 150 | 201 |
| 151 } // namespace internal | 202 } // namespace internal |
| 152 } // namespace IPC | 203 } // namespace IPC |
| OLD | NEW |