OLD | NEW |
1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "mojo/edk/system/message_pipe_dispatcher.h" | 5 #include "mojo/edk/system/message_pipe_dispatcher.h" |
6 | 6 |
7 #include <limits> | 7 #include <limits> |
8 #include <memory> | 8 #include <memory> |
9 | 9 |
10 #include "base/logging.h" | 10 #include "base/logging.h" |
(...skipping 165 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
176 bool MessagePipeDispatcher::Fuse(MessagePipeDispatcher* other) { | 176 bool MessagePipeDispatcher::Fuse(MessagePipeDispatcher* other) { |
177 node_controller_->SetPortObserver(port_, nullptr); | 177 node_controller_->SetPortObserver(port_, nullptr); |
178 node_controller_->SetPortObserver(other->port_, nullptr); | 178 node_controller_->SetPortObserver(other->port_, nullptr); |
179 | 179 |
180 ports::PortRef port0; | 180 ports::PortRef port0; |
181 { | 181 { |
182 base::AutoLock lock(signal_lock_); | 182 base::AutoLock lock(signal_lock_); |
183 port0 = port_; | 183 port0 = port_; |
184 port_closed_.Set(true); | 184 port_closed_.Set(true); |
185 awakables_.CancelAll(); | 185 awakables_.CancelAll(); |
| 186 watchers_.NotifyClosed(); |
186 } | 187 } |
187 | 188 |
188 ports::PortRef port1; | 189 ports::PortRef port1; |
189 { | 190 { |
190 base::AutoLock lock(other->signal_lock_); | 191 base::AutoLock lock(other->signal_lock_); |
191 port1 = other->port_; | 192 port1 = other->port_; |
192 other->port_closed_.Set(true); | 193 other->port_closed_.Set(true); |
193 other->awakables_.CancelAll(); | 194 other->awakables_.CancelAll(); |
| 195 other->watchers_.NotifyClosed(); |
194 } | 196 } |
195 | 197 |
196 // Both ports are always closed by this call. | 198 // Both ports are always closed by this call. |
197 int rv = node_controller_->MergeLocalPorts(port0, port1); | 199 int rv = node_controller_->MergeLocalPorts(port0, port1); |
198 return rv == ports::OK; | 200 return rv == ports::OK; |
199 } | 201 } |
200 | 202 |
201 Dispatcher::Type MessagePipeDispatcher::GetType() const { | 203 Dispatcher::Type MessagePipeDispatcher::GetType() const { |
202 return Type::MESSAGE_PIPE; | 204 return Type::MESSAGE_PIPE; |
203 } | 205 } |
204 | 206 |
205 MojoResult MessagePipeDispatcher::Close() { | 207 MojoResult MessagePipeDispatcher::Close() { |
206 base::AutoLock lock(signal_lock_); | 208 base::AutoLock lock(signal_lock_); |
207 DVLOG(2) << "Closing message pipe " << pipe_id_ << " endpoint " << endpoint_ | 209 DVLOG(2) << "Closing message pipe " << pipe_id_ << " endpoint " << endpoint_ |
208 << " [port=" << port_.name() << "]"; | 210 << " [port=" << port_.name() << "]"; |
209 return CloseNoLock(); | 211 return CloseNoLock(); |
210 } | 212 } |
211 | 213 |
212 MojoResult MessagePipeDispatcher::Watch(MojoHandleSignals signals, | 214 MojoResult MessagePipeDispatcher::RegisterWatcher( |
213 const Watcher::WatchCallback& callback, | 215 MojoHandleSignals signals, |
214 uintptr_t context) { | 216 const Watcher::WatchCallback& callback, |
| 217 uintptr_t context) { |
215 base::AutoLock lock(signal_lock_); | 218 base::AutoLock lock(signal_lock_); |
216 | 219 |
217 if (port_closed_ || in_transit_) | 220 if (port_closed_ || in_transit_) |
218 return MOJO_RESULT_INVALID_ARGUMENT; | 221 return MOJO_RESULT_INVALID_ARGUMENT; |
219 | 222 |
220 return awakables_.AddWatcher( | 223 return watchers_.Add(signals, callback, context, |
221 signals, callback, context, GetHandleSignalsStateNoLock()); | 224 GetHandleSignalsStateNoLock()); |
222 } | 225 } |
223 | 226 |
224 MojoResult MessagePipeDispatcher::CancelWatch(uintptr_t context) { | 227 MojoResult MessagePipeDispatcher::ArmWatcher(uintptr_t context) { |
225 base::AutoLock lock(signal_lock_); | 228 base::AutoLock lock(signal_lock_); |
226 | 229 |
227 if (port_closed_ || in_transit_) | 230 if (port_closed_ || in_transit_) |
228 return MOJO_RESULT_INVALID_ARGUMENT; | 231 return MOJO_RESULT_INVALID_ARGUMENT; |
229 | 232 |
230 return awakables_.RemoveWatcher(context); | 233 return watchers_.Arm(context, GetHandleSignalsStateNoLock()); |
| 234 } |
| 235 |
| 236 MojoResult MessagePipeDispatcher::UnregisterWatcher(uintptr_t context) { |
| 237 base::AutoLock lock(signal_lock_); |
| 238 |
| 239 if (port_closed_ || in_transit_) |
| 240 return MOJO_RESULT_INVALID_ARGUMENT; |
| 241 |
| 242 return watchers_.Remove(context); |
231 } | 243 } |
232 | 244 |
233 MojoResult MessagePipeDispatcher::WriteMessage( | 245 MojoResult MessagePipeDispatcher::WriteMessage( |
234 std::unique_ptr<MessageForTransit> message, | 246 std::unique_ptr<MessageForTransit> message, |
235 MojoWriteMessageFlags flags) { | 247 MojoWriteMessageFlags flags) { |
236 if (port_closed_ || in_transit_) | 248 if (port_closed_ || in_transit_) |
237 return MOJO_RESULT_INVALID_ARGUMENT; | 249 return MOJO_RESULT_INVALID_ARGUMENT; |
238 | 250 |
239 size_t num_bytes = message->num_bytes(); | 251 size_t num_bytes = message->num_bytes(); |
240 int rv = node_controller_->SendMessage(port_, message->TakePortsMessage()); | 252 int rv = node_controller_->SendMessage(port_, message->TakePortsMessage()); |
(...skipping 38 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
279 // that it specifies a size at least as large as the next available payload. | 291 // that it specifies a size at least as large as the next available payload. |
280 // | 292 // |
281 // If |read_any_size| is true, the input value of |*num_bytes| is ignored. | 293 // If |read_any_size| is true, the input value of |*num_bytes| is ignored. |
282 // This flag exists to support both new and old API behavior. | 294 // This flag exists to support both new and old API behavior. |
283 | 295 |
284 ports::ScopedMessage ports_message; | 296 ports::ScopedMessage ports_message; |
285 ReadMessageFilter filter(read_any_size, may_discard, num_bytes, num_handles, | 297 ReadMessageFilter filter(read_any_size, may_discard, num_bytes, num_handles, |
286 &no_space, &invalid_message); | 298 &no_space, &invalid_message); |
287 int rv = node_controller_->node()->GetMessage(port_, &ports_message, &filter); | 299 int rv = node_controller_->node()->GetMessage(port_, &ports_message, &filter); |
288 | 300 |
| 301 { |
| 302 base::AutoLock lock(signal_lock_); |
| 303 watchers_.NotifyState(GetHandleSignalsStateNoLock()); |
| 304 } |
| 305 |
289 if (invalid_message) | 306 if (invalid_message) |
290 return MOJO_RESULT_UNKNOWN; | 307 return MOJO_RESULT_UNKNOWN; |
291 | 308 |
292 if (rv != ports::OK && rv != ports::ERROR_PORT_PEER_CLOSED) { | 309 if (rv != ports::OK && rv != ports::ERROR_PORT_PEER_CLOSED) { |
293 if (rv == ports::ERROR_PORT_UNKNOWN || | 310 if (rv == ports::ERROR_PORT_UNKNOWN || |
294 rv == ports::ERROR_PORT_STATE_UNEXPECTED) | 311 rv == ports::ERROR_PORT_STATE_UNEXPECTED) |
295 return MOJO_RESULT_INVALID_ARGUMENT; | 312 return MOJO_RESULT_INVALID_ARGUMENT; |
296 | 313 |
297 NOTREACHED(); | 314 NOTREACHED(); |
298 return MOJO_RESULT_UNKNOWN; // TODO: Add a better error code here? | 315 return MOJO_RESULT_UNKNOWN; // TODO: Add a better error code here? |
(...skipping 190 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
489 port_transferred_ = true; | 506 port_transferred_ = true; |
490 in_transit_.Set(false); | 507 in_transit_.Set(false); |
491 CloseNoLock(); | 508 CloseNoLock(); |
492 } | 509 } |
493 | 510 |
494 void MessagePipeDispatcher::CancelTransit() { | 511 void MessagePipeDispatcher::CancelTransit() { |
495 base::AutoLock lock(signal_lock_); | 512 base::AutoLock lock(signal_lock_); |
496 in_transit_.Set(false); | 513 in_transit_.Set(false); |
497 | 514 |
498 // Something may have happened while we were waiting for potential transit. | 515 // Something may have happened while we were waiting for potential transit. |
499 awakables_.AwakeForStateChange(GetHandleSignalsStateNoLock()); | 516 HandleSignalsState state = GetHandleSignalsStateNoLock(); |
| 517 awakables_.AwakeForStateChange(state); |
| 518 watchers_.NotifyState(state); |
500 } | 519 } |
501 | 520 |
502 // static | 521 // static |
503 scoped_refptr<Dispatcher> MessagePipeDispatcher::Deserialize( | 522 scoped_refptr<Dispatcher> MessagePipeDispatcher::Deserialize( |
504 const void* data, | 523 const void* data, |
505 size_t num_bytes, | 524 size_t num_bytes, |
506 const ports::PortName* ports, | 525 const ports::PortName* ports, |
507 size_t num_ports, | 526 size_t num_ports, |
508 PlatformHandle* handles, | 527 PlatformHandle* handles, |
509 size_t num_handles) { | 528 size_t num_handles) { |
(...skipping 15 matching lines...) Expand all Loading... |
525 DCHECK(port_closed_ && !in_transit_); | 544 DCHECK(port_closed_ && !in_transit_); |
526 } | 545 } |
527 | 546 |
528 MojoResult MessagePipeDispatcher::CloseNoLock() { | 547 MojoResult MessagePipeDispatcher::CloseNoLock() { |
529 signal_lock_.AssertAcquired(); | 548 signal_lock_.AssertAcquired(); |
530 if (port_closed_ || in_transit_) | 549 if (port_closed_ || in_transit_) |
531 return MOJO_RESULT_INVALID_ARGUMENT; | 550 return MOJO_RESULT_INVALID_ARGUMENT; |
532 | 551 |
533 port_closed_.Set(true); | 552 port_closed_.Set(true); |
534 awakables_.CancelAll(); | 553 awakables_.CancelAll(); |
| 554 watchers_.NotifyClosed(); |
535 | 555 |
536 if (!port_transferred_) { | 556 if (!port_transferred_) { |
537 base::AutoUnlock unlock(signal_lock_); | 557 base::AutoUnlock unlock(signal_lock_); |
538 node_controller_->ClosePort(port_); | 558 node_controller_->ClosePort(port_); |
539 } | 559 } |
540 | 560 |
541 return MOJO_RESULT_OK; | 561 return MOJO_RESULT_OK; |
542 } | 562 } |
543 | 563 |
544 HandleSignalsState MessagePipeDispatcher::GetHandleSignalsStateNoLock() const { | 564 HandleSignalsState MessagePipeDispatcher::GetHandleSignalsStateNoLock() const { |
(...skipping 44 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
589 << " endpoint " << endpoint_ << " [port=" << port_.name() | 609 << " endpoint " << endpoint_ << " [port=" << port_.name() |
590 << "; size=" << filter.message_size() << "]"; | 610 << "; size=" << filter.message_size() << "]"; |
591 } | 611 } |
592 if (port_status.peer_closed) { | 612 if (port_status.peer_closed) { |
593 DVLOG(2) << "Peer closure detected on message pipe " << pipe_id_ | 613 DVLOG(2) << "Peer closure detected on message pipe " << pipe_id_ |
594 << " endpoint " << endpoint_ << " [port=" << port_.name() << "]"; | 614 << " endpoint " << endpoint_ << " [port=" << port_.name() << "]"; |
595 } | 615 } |
596 } | 616 } |
597 #endif | 617 #endif |
598 | 618 |
599 awakables_.AwakeForStateChange(GetHandleSignalsStateNoLock()); | 619 HandleSignalsState state = GetHandleSignalsStateNoLock(); |
| 620 awakables_.AwakeForStateChange(state); |
| 621 watchers_.NotifyState(state); |
600 } | 622 } |
601 | 623 |
602 } // namespace edk | 624 } // namespace edk |
603 } // namespace mojo | 625 } // namespace mojo |
OLD | NEW |