Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(18)

Side by Side Diff: content/common/message_port.cc

Issue 2725133002: Mojo: Armed Watchers (Closed)
Patch Set: . Created 3 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2017 The Chromium Authors. All rights reserved. 1 // Copyright 2017 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "content/common/message_port.h" 5 #include "content/common/message_port.h"
6 6
7 #include "base/bind.h"
7 #include "base/logging.h" 8 #include "base/logging.h"
9 #include "base/threading/thread_task_runner_handle.h"
8 10
9 namespace content { 11 namespace content {
10 12
11 MessagePort::~MessagePort() { 13 MessagePort::~MessagePort() {
12 } 14 }
13 15
14 MessagePort::MessagePort() : state_(new State()) { 16 MessagePort::MessagePort() : state_(new State()) {
15 } 17 }
16 18
17 MessagePort::MessagePort(const MessagePort& other) : state_(other.state_) { 19 MessagePort::MessagePort(const MessagePort& other) : state_(other.state_) {
18 } 20 }
19 21
20 MessagePort& MessagePort::operator=(const MessagePort& other) { 22 MessagePort& MessagePort::operator=(const MessagePort& other) {
21 state_ = other.state_; 23 state_ = other.state_;
22 return *this; 24 return *this;
23 } 25 }
24 26
25 MessagePort::MessagePort(mojo::ScopedMessagePipeHandle handle) 27 MessagePort::MessagePort(mojo::ScopedMessagePipeHandle handle)
26 : state_(new State(std::move(handle))) { 28 : state_(new State(std::move(handle))) {
27 } 29 }
28 30
29 const mojo::ScopedMessagePipeHandle& MessagePort::GetHandle() const { 31 const mojo::ScopedMessagePipeHandle& MessagePort::GetHandle() const {
30 return state_->handle_; 32 return state_->handle_;
31 } 33 }
32 34
33 mojo::ScopedMessagePipeHandle MessagePort::ReleaseHandle() const { 35 mojo::ScopedMessagePipeHandle MessagePort::ReleaseHandle() const {
34 state_->CancelWatch(); 36 state_->UnregisterWatcher();
35 return std::move(state_->handle_); 37 return std::move(state_->handle_);
36 } 38 }
37 39
38 // static 40 // static
39 std::vector<mojo::ScopedMessagePipeHandle> MessagePort::ReleaseHandles( 41 std::vector<mojo::ScopedMessagePipeHandle> MessagePort::ReleaseHandles(
40 const std::vector<MessagePort>& ports) { 42 const std::vector<MessagePort>& ports) {
41 std::vector<mojo::ScopedMessagePipeHandle> handles(ports.size()); 43 std::vector<mojo::ScopedMessagePipeHandle> handles(ports.size());
42 for (size_t i = 0; i < ports.size(); ++i) 44 for (size_t i = 0; i < ports.size(); ++i)
43 handles[i] = ports[i].ReleaseHandle(); 45 handles[i] = ports[i].ReleaseHandle();
44 return handles; 46 return handles;
(...skipping 74 matching lines...) Expand 10 before | Expand all | Expand 10 after
119 ports->resize(static_cast<size_t>(num_handles)); 121 ports->resize(static_cast<size_t>(num_handles));
120 for (uint32_t i = 0; i < num_handles; ++i) { 122 for (uint32_t i = 0; i < num_handles; ++i) {
121 ports->at(i) = MessagePort( 123 ports->at(i) = MessagePort(
122 mojo::ScopedMessagePipeHandle(mojo::MessagePipeHandle(handles[i]))); 124 mojo::ScopedMessagePipeHandle(mojo::MessagePipeHandle(handles[i])));
123 } 125 }
124 } 126 }
125 return true; 127 return true;
126 } 128 }
127 129
128 void MessagePort::SetCallback(const base::Closure& callback) { 130 void MessagePort::SetCallback(const base::Closure& callback) {
129 state_->CancelWatch(); 131 state_->UnregisterWatcher();
130 state_->callback_ = callback; 132 state_->callback_ = callback;
131 state_->AddWatch(); 133 state_->RegisterWatcher();
132 } 134 }
133 135
134 void MessagePort::ClearCallback() { 136 void MessagePort::ClearCallback() {
135 state_->CancelWatch(); 137 state_->UnregisterWatcher();
136 state_->callback_.Reset(); 138 state_->callback_.Reset();
137 } 139 }
138 140
139 MessagePort::State::State() { 141 MessagePort::State::State() {
140 } 142 }
141 143
142 MessagePort::State::State(mojo::ScopedMessagePipeHandle handle) 144 MessagePort::State::State(mojo::ScopedMessagePipeHandle handle)
143 : handle_(std::move(handle)) { 145 : handle_(std::move(handle)) {
144 } 146 }
145 147
146 void MessagePort::State::AddWatch() { 148 void MessagePort::State::RegisterWatcher() {
147 if (!callback_) 149 if (!callback_)
148 return; 150 return;
149 151
150 // NOTE: An HTML MessagePort does not receive an event to tell it when the 152 // NOTE: An HTML MessagePort does not receive an event to tell it when the
151 // peer has gone away, so we only watch for readability here. 153 // peer has gone away, so we only watch for readability here.
152 MojoResult rv = MojoWatch(handle_.get().value(), 154 MojoResult rv =
153 MOJO_HANDLE_SIGNAL_READABLE, 155 MojoRegisterWatcher(handle_.get().value(), MOJO_HANDLE_SIGNAL_READABLE,
154 &MessagePort::State::OnHandleReady, 156 &MessagePort::State::CallOnHandleReady,
155 reinterpret_cast<uintptr_t>(this)); 157 reinterpret_cast<uintptr_t>(this));
156 if (rv != MOJO_RESULT_OK) 158 if (rv != MOJO_RESULT_OK) {
157 DVLOG(1) << this << " MojoWatch failed: " << rv; 159 DVLOG(1) << this << " MojoRegisterWatcher failed: " << rv;
160 return;
161 }
162
163 ArmWatcher();
158 } 164 }
159 165
160 void MessagePort::State::CancelWatch() { 166 void MessagePort::State::UnregisterWatcher() {
161 if (!callback_) 167 if (!callback_)
162 return; 168 return;
163 169
164 // NOTE: This synchronizes with the thread where OnHandleReady runs so we are 170 // NOTE: This synchronizes with the thread where OnHandleReady runs so we are
165 // sure to not be racing with it. 171 // sure to not be racing with it.
166 MojoCancelWatch(handle_.get().value(), reinterpret_cast<uintptr_t>(this)); 172 MojoUnregisterWatcher(handle_.get().value(),
173 reinterpret_cast<uintptr_t>(this));
167 } 174 }
168 175
169 // static 176 MessagePort::State::~State() {
170 void MessagePort::State::OnHandleReady( 177 UnregisterWatcher();
171 uintptr_t context, 178 }
172 MojoResult result, 179
173 MojoHandleSignalsState signals_state, 180 void MessagePort::State::ArmWatcher() {
174 MojoWatchNotificationFlags flags) { 181 if (!callback_)
182 return;
183
184 MojoResult rv =
185 MojoArmWatcher(handle_.get().value(), reinterpret_cast<uintptr_t>(this));
186 if (rv == MOJO_RESULT_OK)
187 return;
188
189 if (rv == MOJO_RESULT_ALREADY_EXISTS) {
190 // The handle is already signalled, so we trigger a callback immediately.
191 base::ThreadTaskRunnerHandle::Get()->PostTask(
192 FROM_HERE, base::Bind(&State::OnHandleReady, this, MOJO_RESULT_OK));
193 return;
194 }
195
196 if (rv == MOJO_RESULT_FAILED_PRECONDITION) {
197 DVLOG(1) << this << " MojoArmWatcher failed because of a broken pipe.";
198 return;
199 }
200
201 NOTREACHED();
202 }
203
204 void MessagePort::State::OnHandleReady(MojoResult result) {
175 if (result == MOJO_RESULT_OK) { 205 if (result == MOJO_RESULT_OK) {
176 reinterpret_cast<MessagePort::State*>(context)->callback_.Run(); 206 callback_.Run();
207 ArmWatcher();
177 } else { 208 } else {
178 // And now his watch is ended. 209 // And now his watch is ended.
179 } 210 }
180 } 211 }
181 212
182 MessagePort::State::~State() { 213 // static
183 CancelWatch(); 214 void MessagePort::State::CallOnHandleReady(uintptr_t context,
215 MojoResult result,
216 MojoHandleSignalsState signals_state,
217 MojoWatchNotificationFlags flags) {
218 reinterpret_cast<State*>(context)->OnHandleReady(result);
184 } 219 }
185 220
186 } // namespace content 221 } // namespace content
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698