OLD | NEW |
1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "ipc/mojo/async_handle_waiter.h" | 5 #include "ipc/mojo/async_handle_waiter.h" |
6 | 6 |
7 #include "base/atomic_ref_count.h" | 7 #include "base/atomic_ref_count.h" |
8 #include "base/bind.h" | 8 #include "base/bind.h" |
9 #include "base/bind_helpers.h" | 9 #include "base/bind_helpers.h" |
10 #include "base/location.h" | 10 #include "base/location.h" |
(...skipping 15 matching lines...) Expand all Loading... |
26 class AsyncHandleWaiter::Context | 26 class AsyncHandleWaiter::Context |
27 : public base::RefCountedThreadSafe<AsyncHandleWaiter::Context, | 27 : public base::RefCountedThreadSafe<AsyncHandleWaiter::Context, |
28 AsyncHandleWaiterContextTraits>, | 28 AsyncHandleWaiterContextTraits>, |
29 public base::MessageLoopForIO::IOObserver { | 29 public base::MessageLoopForIO::IOObserver { |
30 public: | 30 public: |
31 Context(base::WeakPtr<AsyncHandleWaiter> waiter) | 31 Context(base::WeakPtr<AsyncHandleWaiter> waiter) |
32 : io_runner_(base::MessageLoopForIO::current()->task_runner()), | 32 : io_runner_(base::MessageLoopForIO::current()->task_runner()), |
33 waiter_(waiter), | 33 waiter_(waiter), |
34 last_result_(MOJO_RESULT_INTERNAL), | 34 last_result_(MOJO_RESULT_INTERNAL), |
35 processing_(false), | 35 processing_(false), |
36 should_invoke_callback_(false) { | 36 should_invoke_callback_(false), |
| 37 should_invoke_message_callback_(false) { |
37 base::MessageLoopForIO::current()->AddIOObserver(this); | 38 base::MessageLoopForIO::current()->AddIOObserver(this); |
38 } | 39 } |
39 | 40 |
| 41 bool MessageWasReceived(const void* bytes, uint32_t num_bytes) { |
| 42 DCHECK(IsCalledFromIOHandler()); |
| 43 |
| 44 // For simplicity, we don't steal more than one message at a time. |
| 45 if (!message_body_.empty()) |
| 46 return false; |
| 47 |
| 48 should_invoke_message_callback_ = true; |
| 49 message_body_.resize(num_bytes); |
| 50 memcpy(&message_body_[0], bytes, num_bytes); |
| 51 return true; |
| 52 } |
| 53 |
40 void HandleIsReady(MojoResult result) { | 54 void HandleIsReady(MojoResult result) { |
41 last_result_ = result; | 55 last_result_ = result; |
42 | 56 |
43 // If the signaling happens in the IO handler, use |IOObserver| callback | 57 // If the signaling happens in the IO handler, use |IOObserver| callback |
44 // to invoke the callback. | 58 // to invoke the callback. |
45 if (IsCalledFromIOHandler()) { | 59 if (IsCalledFromIOHandler()) { |
46 should_invoke_callback_ = true; | 60 should_invoke_callback_ = true; |
47 return; | 61 return; |
48 } | 62 } |
49 | 63 |
(...skipping 21 matching lines...) Expand all Loading... |
71 } | 85 } |
72 | 86 |
73 // Called from |io_runner_| thus safe to touch |waiter_|. | 87 // Called from |io_runner_| thus safe to touch |waiter_|. |
74 void InvokeWaiterCallback() { | 88 void InvokeWaiterCallback() { |
75 MojoResult result = last_result_; | 89 MojoResult result = last_result_; |
76 last_result_ = MOJO_RESULT_INTERNAL; | 90 last_result_ = MOJO_RESULT_INTERNAL; |
77 if (waiter_) | 91 if (waiter_) |
78 waiter_->InvokeCallback(result); | 92 waiter_->InvokeCallback(result); |
79 } | 93 } |
80 | 94 |
| 95 void InvokeWaiterMessageCallback() { |
| 96 if (waiter_) { |
| 97 DCHECK(!message_body_.empty()); |
| 98 waiter_->InvokeMessageCallback( |
| 99 &message_body_[0], static_cast<uint32_t>(message_body_.size())); |
| 100 } |
| 101 |
| 102 message_body_.clear(); |
| 103 } |
| 104 |
81 // IOObserver implementation: | 105 // IOObserver implementation: |
82 | 106 |
83 void WillProcessIOEvent() override { | 107 void WillProcessIOEvent() override { |
84 DCHECK(!should_invoke_callback_); | 108 DCHECK(!should_invoke_callback_); |
85 DCHECK(!processing_); | 109 DCHECK(!processing_); |
86 processing_ = true; | 110 processing_ = true; |
87 } | 111 } |
88 | 112 |
89 void DidProcessIOEvent() override { | 113 void DidProcessIOEvent() override { |
90 DCHECK(processing_); | 114 DCHECK(processing_); |
91 | 115 |
92 // The zero |waiter_| indicates that |this| have lost the owner and can be | 116 // The zero |waiter_| indicates that |this| have lost the owner and can be |
93 // under destruction. So we cannot wrap it with a |scoped_refptr| anymore. | 117 // under destruction. So we cannot wrap it with a |scoped_refptr| anymore. |
94 if (!waiter_) { | 118 if (!waiter_) { |
95 should_invoke_callback_ = false; | 119 should_invoke_callback_ = should_invoke_message_callback_ = false; |
96 processing_ = false; | 120 processing_ = false; |
97 return; | 121 return; |
98 } | 122 } |
99 | 123 |
100 // We have to protect |this| because |AsyncHandleWaiter| can be | 124 // We have to protect |this| because |AsyncHandleWaiter| can be |
101 // deleted during the callback. | 125 // deleted during the callback. |
102 scoped_refptr<Context> protect(this); | 126 scoped_refptr<Context> protect(this); |
103 while (should_invoke_callback_) { | 127 while (should_invoke_callback_ || should_invoke_message_callback_) { |
104 should_invoke_callback_ = false; | 128 if (should_invoke_callback_) { |
105 InvokeWaiterCallback(); | 129 should_invoke_callback_ = false; |
| 130 InvokeWaiterCallback(); |
| 131 } |
| 132 |
| 133 if (should_invoke_message_callback_) { |
| 134 should_invoke_message_callback_ = false; |
| 135 InvokeWaiterMessageCallback(); |
| 136 } |
106 } | 137 } |
107 | 138 |
108 processing_ = false; | 139 processing_ = false; |
109 } | 140 } |
110 | 141 |
111 // Only |io_runner_| is accessed from arbitrary threads. Others are touched | 142 // Only |io_runner_| is accessed from arbitrary threads. Others are touched |
112 // only from the IO thread. | 143 // only from the IO thread. |
113 const scoped_refptr<base::TaskRunner> io_runner_; | 144 const scoped_refptr<base::TaskRunner> io_runner_; |
114 | 145 |
115 const base::WeakPtr<AsyncHandleWaiter> waiter_; | 146 const base::WeakPtr<AsyncHandleWaiter> waiter_; |
116 MojoResult last_result_; | 147 MojoResult last_result_; |
| 148 std::vector<unsigned char> message_body_; |
117 bool processing_; | 149 bool processing_; |
118 bool should_invoke_callback_; | 150 bool should_invoke_callback_; // XXX: Rename |
| 151 bool should_invoke_message_callback_; // XXX: Rename |
119 | 152 |
120 DISALLOW_COPY_AND_ASSIGN(Context); | 153 DISALLOW_COPY_AND_ASSIGN(Context); |
121 }; | 154 }; |
122 | 155 |
123 AsyncHandleWaiter::AsyncHandleWaiter(base::Callback<void(MojoResult)> callback) | 156 AsyncHandleWaiter::AsyncHandleWaiter(AsyncHandleWaiter::Delegate* delegate) |
124 : callback_(callback), | 157 : delegate_(delegate), weak_factory_(this) { |
125 weak_factory_(this) { | |
126 context_ = new Context(weak_factory_.GetWeakPtr()); | 158 context_ = new Context(weak_factory_.GetWeakPtr()); |
| 159 context_callback_ = base::Bind(&Context::HandleIsReady, context_); |
| 160 context_message_callback_ = |
| 161 base::Bind(&Context::MessageWasReceived, context_); |
127 } | 162 } |
128 | 163 |
129 AsyncHandleWaiter::~AsyncHandleWaiter() { | 164 AsyncHandleWaiter::~AsyncHandleWaiter() { |
130 } | 165 } |
131 | 166 |
| 167 void AsyncHandleWaiter::ClearMessageCallback(MojoHandle handle) { |
| 168 MojoResult rv = mojo::embedder::SetAsyncMessageCallback( |
| 169 handle, base::Callback<bool(const void*, uint32_t)>()); |
| 170 DCHECK_EQ(rv, MOJO_RESULT_OK); |
| 171 } |
| 172 |
| 173 void AsyncHandleWaiter::SetMessageCallback(MojoHandle handle) { |
| 174 MojoResult rv = mojo::embedder::SetAsyncMessageCallback( |
| 175 handle, context_message_callback_); |
| 176 DCHECK_EQ(rv, MOJO_RESULT_OK); |
| 177 } |
| 178 |
132 MojoResult AsyncHandleWaiter::Wait(MojoHandle handle, | 179 MojoResult AsyncHandleWaiter::Wait(MojoHandle handle, |
133 MojoHandleSignals signals) { | 180 MojoHandleSignals signals) { |
134 return mojo::embedder::AsyncWait( | 181 return mojo::embedder::AsyncWait(handle, signals, context_callback_); |
135 handle, signals, base::Bind(&Context::HandleIsReady, context_)); | |
136 } | 182 } |
137 | 183 |
138 void AsyncHandleWaiter::InvokeCallback(MojoResult result) { | 184 void AsyncHandleWaiter::InvokeCallback(MojoResult result) { |
139 callback_.Run(result); | 185 delegate_->PipeIsReady(result); |
| 186 } |
| 187 |
| 188 void AsyncHandleWaiter::InvokeMessageCallback(const void* bytes, |
| 189 uint32_t num_bytes) { |
| 190 delegate_->MessageWasArrived(bytes, num_bytes); |
140 } | 191 } |
141 | 192 |
142 // static | 193 // static |
143 void AsyncHandleWaiterContextTraits::Destruct( | 194 void AsyncHandleWaiterContextTraits::Destruct( |
144 const AsyncHandleWaiter::Context* context) { | 195 const AsyncHandleWaiter::Context* context) { |
145 context->io_runner_->PostTask( | 196 context->io_runner_->PostTask( |
146 FROM_HERE, | 197 FROM_HERE, |
147 base::Bind(&base::DeletePointer<const AsyncHandleWaiter::Context>, | 198 base::Bind(&base::DeletePointer<const AsyncHandleWaiter::Context>, |
148 base::Unretained(context))); | 199 base::Unretained(context))); |
149 } | 200 } |
150 | 201 |
151 } // namespace internal | 202 } // namespace internal |
152 } // namespace IPC | 203 } // namespace IPC |
OLD | NEW |