Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(539)

Side by Side Diff: mojo/edk/system/message_pipe_dispatcher.cc

Issue 2725133002: Mojo: Armed Watchers (Closed)
Patch Set: . Created 3 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/edk/system/message_pipe_dispatcher.h" 5 #include "mojo/edk/system/message_pipe_dispatcher.h"
6 6
7 #include <limits> 7 #include <limits>
8 #include <memory> 8 #include <memory>
9 9
10 #include "base/logging.h" 10 #include "base/logging.h"
(...skipping 165 matching lines...) Expand 10 before | Expand all | Expand 10 after
176 bool MessagePipeDispatcher::Fuse(MessagePipeDispatcher* other) { 176 bool MessagePipeDispatcher::Fuse(MessagePipeDispatcher* other) {
177 node_controller_->SetPortObserver(port_, nullptr); 177 node_controller_->SetPortObserver(port_, nullptr);
178 node_controller_->SetPortObserver(other->port_, nullptr); 178 node_controller_->SetPortObserver(other->port_, nullptr);
179 179
180 ports::PortRef port0; 180 ports::PortRef port0;
181 { 181 {
182 base::AutoLock lock(signal_lock_); 182 base::AutoLock lock(signal_lock_);
183 port0 = port_; 183 port0 = port_;
184 port_closed_.Set(true); 184 port_closed_.Set(true);
185 awakables_.CancelAll(); 185 awakables_.CancelAll();
186 watchers_.NotifyClosed();
186 } 187 }
187 188
188 ports::PortRef port1; 189 ports::PortRef port1;
189 { 190 {
190 base::AutoLock lock(other->signal_lock_); 191 base::AutoLock lock(other->signal_lock_);
191 port1 = other->port_; 192 port1 = other->port_;
192 other->port_closed_.Set(true); 193 other->port_closed_.Set(true);
193 other->awakables_.CancelAll(); 194 other->awakables_.CancelAll();
195 other->watchers_.NotifyClosed();
194 } 196 }
195 197
196 // Both ports are always closed by this call. 198 // Both ports are always closed by this call.
197 int rv = node_controller_->MergeLocalPorts(port0, port1); 199 int rv = node_controller_->MergeLocalPorts(port0, port1);
198 return rv == ports::OK; 200 return rv == ports::OK;
199 } 201 }
200 202
201 Dispatcher::Type MessagePipeDispatcher::GetType() const { 203 Dispatcher::Type MessagePipeDispatcher::GetType() const {
202 return Type::MESSAGE_PIPE; 204 return Type::MESSAGE_PIPE;
203 } 205 }
204 206
205 MojoResult MessagePipeDispatcher::Close() { 207 MojoResult MessagePipeDispatcher::Close() {
206 base::AutoLock lock(signal_lock_); 208 base::AutoLock lock(signal_lock_);
207 DVLOG(2) << "Closing message pipe " << pipe_id_ << " endpoint " << endpoint_ 209 DVLOG(2) << "Closing message pipe " << pipe_id_ << " endpoint " << endpoint_
208 << " [port=" << port_.name() << "]"; 210 << " [port=" << port_.name() << "]";
209 return CloseNoLock(); 211 return CloseNoLock();
210 } 212 }
211 213
212 MojoResult MessagePipeDispatcher::Watch(MojoHandleSignals signals, 214 MojoResult MessagePipeDispatcher::RegisterWatcher(
213 const Watcher::WatchCallback& callback, 215 MojoHandleSignals signals,
214 uintptr_t context) { 216 const Watcher::WatchCallback& callback,
217 uintptr_t context) {
215 base::AutoLock lock(signal_lock_); 218 base::AutoLock lock(signal_lock_);
216 219
217 if (port_closed_ || in_transit_) 220 if (port_closed_ || in_transit_)
218 return MOJO_RESULT_INVALID_ARGUMENT; 221 return MOJO_RESULT_INVALID_ARGUMENT;
219 222
220 return awakables_.AddWatcher( 223 return watchers_.Add(signals, callback, context,
221 signals, callback, context, GetHandleSignalsStateNoLock()); 224 GetHandleSignalsStateNoLock());
222 } 225 }
223 226
224 MojoResult MessagePipeDispatcher::CancelWatch(uintptr_t context) { 227 MojoResult MessagePipeDispatcher::ArmWatcher(uintptr_t context) {
225 base::AutoLock lock(signal_lock_); 228 base::AutoLock lock(signal_lock_);
226 229
227 if (port_closed_ || in_transit_) 230 if (port_closed_ || in_transit_)
228 return MOJO_RESULT_INVALID_ARGUMENT; 231 return MOJO_RESULT_INVALID_ARGUMENT;
229 232
230 return awakables_.RemoveWatcher(context); 233 return watchers_.Arm(context, GetHandleSignalsStateNoLock());
234 }
235
236 MojoResult MessagePipeDispatcher::UnregisterWatcher(uintptr_t context) {
237 base::AutoLock lock(signal_lock_);
238
239 if (port_closed_ || in_transit_)
240 return MOJO_RESULT_INVALID_ARGUMENT;
241
242 return watchers_.Remove(context);
231 } 243 }
232 244
233 MojoResult MessagePipeDispatcher::WriteMessage( 245 MojoResult MessagePipeDispatcher::WriteMessage(
234 std::unique_ptr<MessageForTransit> message, 246 std::unique_ptr<MessageForTransit> message,
235 MojoWriteMessageFlags flags) { 247 MojoWriteMessageFlags flags) {
236 if (port_closed_ || in_transit_) 248 if (port_closed_ || in_transit_)
237 return MOJO_RESULT_INVALID_ARGUMENT; 249 return MOJO_RESULT_INVALID_ARGUMENT;
238 250
239 size_t num_bytes = message->num_bytes(); 251 size_t num_bytes = message->num_bytes();
240 int rv = node_controller_->SendMessage(port_, message->TakePortsMessage()); 252 int rv = node_controller_->SendMessage(port_, message->TakePortsMessage());
(...skipping 38 matching lines...) Expand 10 before | Expand all | Expand 10 after
279 // that it specifies a size at least as large as the next available payload. 291 // that it specifies a size at least as large as the next available payload.
280 // 292 //
281 // If |read_any_size| is true, the input value of |*num_bytes| is ignored. 293 // If |read_any_size| is true, the input value of |*num_bytes| is ignored.
282 // This flag exists to support both new and old API behavior. 294 // This flag exists to support both new and old API behavior.
283 295
284 ports::ScopedMessage ports_message; 296 ports::ScopedMessage ports_message;
285 ReadMessageFilter filter(read_any_size, may_discard, num_bytes, num_handles, 297 ReadMessageFilter filter(read_any_size, may_discard, num_bytes, num_handles,
286 &no_space, &invalid_message); 298 &no_space, &invalid_message);
287 int rv = node_controller_->node()->GetMessage(port_, &ports_message, &filter); 299 int rv = node_controller_->node()->GetMessage(port_, &ports_message, &filter);
288 300
301 {
302 base::AutoLock lock(signal_lock_);
303 watchers_.NotifyState(GetHandleSignalsStateNoLock());
304 }
305
289 if (invalid_message) 306 if (invalid_message)
290 return MOJO_RESULT_UNKNOWN; 307 return MOJO_RESULT_UNKNOWN;
291 308
292 if (rv != ports::OK && rv != ports::ERROR_PORT_PEER_CLOSED) { 309 if (rv != ports::OK && rv != ports::ERROR_PORT_PEER_CLOSED) {
293 if (rv == ports::ERROR_PORT_UNKNOWN || 310 if (rv == ports::ERROR_PORT_UNKNOWN ||
294 rv == ports::ERROR_PORT_STATE_UNEXPECTED) 311 rv == ports::ERROR_PORT_STATE_UNEXPECTED)
295 return MOJO_RESULT_INVALID_ARGUMENT; 312 return MOJO_RESULT_INVALID_ARGUMENT;
296 313
297 NOTREACHED(); 314 NOTREACHED();
298 return MOJO_RESULT_UNKNOWN; // TODO: Add a better error code here? 315 return MOJO_RESULT_UNKNOWN; // TODO: Add a better error code here?
(...skipping 190 matching lines...) Expand 10 before | Expand all | Expand 10 after
489 port_transferred_ = true; 506 port_transferred_ = true;
490 in_transit_.Set(false); 507 in_transit_.Set(false);
491 CloseNoLock(); 508 CloseNoLock();
492 } 509 }
493 510
494 void MessagePipeDispatcher::CancelTransit() { 511 void MessagePipeDispatcher::CancelTransit() {
495 base::AutoLock lock(signal_lock_); 512 base::AutoLock lock(signal_lock_);
496 in_transit_.Set(false); 513 in_transit_.Set(false);
497 514
498 // Something may have happened while we were waiting for potential transit. 515 // Something may have happened while we were waiting for potential transit.
499 awakables_.AwakeForStateChange(GetHandleSignalsStateNoLock()); 516 HandleSignalsState state = GetHandleSignalsStateNoLock();
517 awakables_.AwakeForStateChange(state);
518 watchers_.NotifyState(state);
500 } 519 }
501 520
502 // static 521 // static
503 scoped_refptr<Dispatcher> MessagePipeDispatcher::Deserialize( 522 scoped_refptr<Dispatcher> MessagePipeDispatcher::Deserialize(
504 const void* data, 523 const void* data,
505 size_t num_bytes, 524 size_t num_bytes,
506 const ports::PortName* ports, 525 const ports::PortName* ports,
507 size_t num_ports, 526 size_t num_ports,
508 PlatformHandle* handles, 527 PlatformHandle* handles,
509 size_t num_handles) { 528 size_t num_handles) {
(...skipping 15 matching lines...) Expand all
525 DCHECK(port_closed_ && !in_transit_); 544 DCHECK(port_closed_ && !in_transit_);
526 } 545 }
527 546
528 MojoResult MessagePipeDispatcher::CloseNoLock() { 547 MojoResult MessagePipeDispatcher::CloseNoLock() {
529 signal_lock_.AssertAcquired(); 548 signal_lock_.AssertAcquired();
530 if (port_closed_ || in_transit_) 549 if (port_closed_ || in_transit_)
531 return MOJO_RESULT_INVALID_ARGUMENT; 550 return MOJO_RESULT_INVALID_ARGUMENT;
532 551
533 port_closed_.Set(true); 552 port_closed_.Set(true);
534 awakables_.CancelAll(); 553 awakables_.CancelAll();
554 watchers_.NotifyClosed();
535 555
536 if (!port_transferred_) { 556 if (!port_transferred_) {
537 base::AutoUnlock unlock(signal_lock_); 557 base::AutoUnlock unlock(signal_lock_);
538 node_controller_->ClosePort(port_); 558 node_controller_->ClosePort(port_);
539 } 559 }
540 560
541 return MOJO_RESULT_OK; 561 return MOJO_RESULT_OK;
542 } 562 }
543 563
544 HandleSignalsState MessagePipeDispatcher::GetHandleSignalsStateNoLock() const { 564 HandleSignalsState MessagePipeDispatcher::GetHandleSignalsStateNoLock() const {
(...skipping 44 matching lines...) Expand 10 before | Expand all | Expand 10 after
589 << " endpoint " << endpoint_ << " [port=" << port_.name() 609 << " endpoint " << endpoint_ << " [port=" << port_.name()
590 << "; size=" << filter.message_size() << "]"; 610 << "; size=" << filter.message_size() << "]";
591 } 611 }
592 if (port_status.peer_closed) { 612 if (port_status.peer_closed) {
593 DVLOG(2) << "Peer closure detected on message pipe " << pipe_id_ 613 DVLOG(2) << "Peer closure detected on message pipe " << pipe_id_
594 << " endpoint " << endpoint_ << " [port=" << port_.name() << "]"; 614 << " endpoint " << endpoint_ << " [port=" << port_.name() << "]";
595 } 615 }
596 } 616 }
597 #endif 617 #endif
598 618
599 awakables_.AwakeForStateChange(GetHandleSignalsStateNoLock()); 619 HandleSignalsState state = GetHandleSignalsStateNoLock();
620 awakables_.AwakeForStateChange(state);
621 watchers_.NotifyState(state);
600 } 622 }
601 623
602 } // namespace edk 624 } // namespace edk
603 } // namespace mojo 625 } // namespace mojo
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698