| OLD | NEW |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "mojo/edk/system/message_pipe_dispatcher.h" | 5 #include "mojo/edk/system/message_pipe_dispatcher.h" |
| 6 | 6 |
| 7 #include <limits> | 7 #include <limits> |
| 8 #include <memory> | 8 #include <memory> |
| 9 | 9 |
| 10 #include "base/logging.h" | 10 #include "base/logging.h" |
| (...skipping 146 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 157 | 157 |
| 158 #endif // DCHECK_IS_ON() | 158 #endif // DCHECK_IS_ON() |
| 159 | 159 |
| 160 MessagePipeDispatcher::MessagePipeDispatcher(NodeController* node_controller, | 160 MessagePipeDispatcher::MessagePipeDispatcher(NodeController* node_controller, |
| 161 const ports::PortRef& port, | 161 const ports::PortRef& port, |
| 162 uint64_t pipe_id, | 162 uint64_t pipe_id, |
| 163 int endpoint) | 163 int endpoint) |
| 164 : node_controller_(node_controller), | 164 : node_controller_(node_controller), |
| 165 port_(port), | 165 port_(port), |
| 166 pipe_id_(pipe_id), | 166 pipe_id_(pipe_id), |
| 167 endpoint_(endpoint) { | 167 endpoint_(endpoint), |
| 168 watchers_(this) { |
| 168 DVLOG(2) << "Creating new MessagePipeDispatcher for port " << port.name() | 169 DVLOG(2) << "Creating new MessagePipeDispatcher for port " << port.name() |
| 169 << " [pipe_id=" << pipe_id << "; endpoint=" << endpoint << "]"; | 170 << " [pipe_id=" << pipe_id << "; endpoint=" << endpoint << "]"; |
| 170 | 171 |
| 171 node_controller_->SetPortObserver( | 172 node_controller_->SetPortObserver( |
| 172 port_, | 173 port_, |
| 173 make_scoped_refptr(new PortObserverThunk(this))); | 174 make_scoped_refptr(new PortObserverThunk(this))); |
| 174 } | 175 } |
| 175 | 176 |
| 176 bool MessagePipeDispatcher::Fuse(MessagePipeDispatcher* other) { | 177 bool MessagePipeDispatcher::Fuse(MessagePipeDispatcher* other) { |
| 177 node_controller_->SetPortObserver(port_, nullptr); | 178 node_controller_->SetPortObserver(port_, nullptr); |
| 178 node_controller_->SetPortObserver(other->port_, nullptr); | 179 node_controller_->SetPortObserver(other->port_, nullptr); |
| 179 | 180 |
| 180 ports::PortRef port0; | 181 ports::PortRef port0; |
| 181 { | 182 { |
| 182 base::AutoLock lock(signal_lock_); | 183 base::AutoLock lock(signal_lock_); |
| 183 port0 = port_; | 184 port0 = port_; |
| 184 port_closed_.Set(true); | 185 port_closed_.Set(true); |
| 185 awakables_.CancelAll(); | 186 awakables_.CancelAll(); |
| 187 watchers_.NotifyClosed(); |
| 186 } | 188 } |
| 187 | 189 |
| 188 ports::PortRef port1; | 190 ports::PortRef port1; |
| 189 { | 191 { |
| 190 base::AutoLock lock(other->signal_lock_); | 192 base::AutoLock lock(other->signal_lock_); |
| 191 port1 = other->port_; | 193 port1 = other->port_; |
| 192 other->port_closed_.Set(true); | 194 other->port_closed_.Set(true); |
| 193 other->awakables_.CancelAll(); | 195 other->awakables_.CancelAll(); |
| 196 other->watchers_.NotifyClosed(); |
| 194 } | 197 } |
| 195 | 198 |
| 196 // Both ports are always closed by this call. | 199 // Both ports are always closed by this call. |
| 197 int rv = node_controller_->MergeLocalPorts(port0, port1); | 200 int rv = node_controller_->MergeLocalPorts(port0, port1); |
| 198 return rv == ports::OK; | 201 return rv == ports::OK; |
| 199 } | 202 } |
| 200 | 203 |
| 201 Dispatcher::Type MessagePipeDispatcher::GetType() const { | 204 Dispatcher::Type MessagePipeDispatcher::GetType() const { |
| 202 return Type::MESSAGE_PIPE; | 205 return Type::MESSAGE_PIPE; |
| 203 } | 206 } |
| 204 | 207 |
| 205 MojoResult MessagePipeDispatcher::Close() { | 208 MojoResult MessagePipeDispatcher::Close() { |
| 206 base::AutoLock lock(signal_lock_); | 209 base::AutoLock lock(signal_lock_); |
| 207 DVLOG(2) << "Closing message pipe " << pipe_id_ << " endpoint " << endpoint_ | 210 DVLOG(2) << "Closing message pipe " << pipe_id_ << " endpoint " << endpoint_ |
| 208 << " [port=" << port_.name() << "]"; | 211 << " [port=" << port_.name() << "]"; |
| 209 return CloseNoLock(); | 212 return CloseNoLock(); |
| 210 } | 213 } |
| 211 | 214 |
| 212 MojoResult MessagePipeDispatcher::Watch(MojoHandleSignals signals, | |
| 213 const Watcher::WatchCallback& callback, | |
| 214 uintptr_t context) { | |
| 215 base::AutoLock lock(signal_lock_); | |
| 216 | |
| 217 if (port_closed_ || in_transit_) | |
| 218 return MOJO_RESULT_INVALID_ARGUMENT; | |
| 219 | |
| 220 return awakables_.AddWatcher( | |
| 221 signals, callback, context, GetHandleSignalsStateNoLock()); | |
| 222 } | |
| 223 | |
| 224 MojoResult MessagePipeDispatcher::CancelWatch(uintptr_t context) { | |
| 225 base::AutoLock lock(signal_lock_); | |
| 226 | |
| 227 if (port_closed_ || in_transit_) | |
| 228 return MOJO_RESULT_INVALID_ARGUMENT; | |
| 229 | |
| 230 return awakables_.RemoveWatcher(context); | |
| 231 } | |
| 232 | |
| 233 MojoResult MessagePipeDispatcher::WriteMessage( | 215 MojoResult MessagePipeDispatcher::WriteMessage( |
| 234 std::unique_ptr<MessageForTransit> message, | 216 std::unique_ptr<MessageForTransit> message, |
| 235 MojoWriteMessageFlags flags) { | 217 MojoWriteMessageFlags flags) { |
| 236 if (port_closed_ || in_transit_) | 218 if (port_closed_ || in_transit_) |
| 237 return MOJO_RESULT_INVALID_ARGUMENT; | 219 return MOJO_RESULT_INVALID_ARGUMENT; |
| 238 | 220 |
| 239 size_t num_bytes = message->num_bytes(); | 221 size_t num_bytes = message->num_bytes(); |
| 240 int rv = node_controller_->SendMessage(port_, message->TakePortsMessage()); | 222 int rv = node_controller_->SendMessage(port_, message->TakePortsMessage()); |
| 241 | 223 |
| 242 DVLOG(4) << "Sent message on pipe " << pipe_id_ << " endpoint " << endpoint_ | 224 DVLOG(4) << "Sent message on pipe " << pipe_id_ << " endpoint " << endpoint_ |
| (...skipping 49 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 292 if (rv != ports::OK && rv != ports::ERROR_PORT_PEER_CLOSED) { | 274 if (rv != ports::OK && rv != ports::ERROR_PORT_PEER_CLOSED) { |
| 293 if (rv == ports::ERROR_PORT_UNKNOWN || | 275 if (rv == ports::ERROR_PORT_UNKNOWN || |
| 294 rv == ports::ERROR_PORT_STATE_UNEXPECTED) | 276 rv == ports::ERROR_PORT_STATE_UNEXPECTED) |
| 295 return MOJO_RESULT_INVALID_ARGUMENT; | 277 return MOJO_RESULT_INVALID_ARGUMENT; |
| 296 | 278 |
| 297 NOTREACHED(); | 279 NOTREACHED(); |
| 298 return MOJO_RESULT_UNKNOWN; // TODO: Add a better error code here? | 280 return MOJO_RESULT_UNKNOWN; // TODO: Add a better error code here? |
| 299 } | 281 } |
| 300 | 282 |
| 301 if (no_space) { | 283 if (no_space) { |
| 284 if (may_discard) { |
| 285 // May have been the last message on the pipe. Need to update signals just |
| 286 // in case. |
| 287 base::AutoLock lock(signal_lock_); |
| 288 watchers_.NotifyState(GetHandleSignalsStateNoLock()); |
| 289 } |
| 302 // |*num_handles| (and/or |*num_bytes| if |read_any_size| is false) wasn't | 290 // |*num_handles| (and/or |*num_bytes| if |read_any_size| is false) wasn't |
| 303 // sufficient to hold this message's data. The message will still be in | 291 // sufficient to hold this message's data. The message will still be in |
| 304 // queue unless MOJO_READ_MESSAGE_FLAG_MAY_DISCARD was set. | 292 // queue unless MOJO_READ_MESSAGE_FLAG_MAY_DISCARD was set. |
| 305 return MOJO_RESULT_RESOURCE_EXHAUSTED; | 293 return MOJO_RESULT_RESOURCE_EXHAUSTED; |
| 306 } | 294 } |
| 307 | 295 |
| 308 if (!ports_message) { | 296 if (!ports_message) { |
| 309 // No message was available in queue. | 297 // No message was available in queue. |
| 310 | 298 |
| 311 if (rv == ports::OK) | 299 if (rv == ports::OK) |
| 312 return MOJO_RESULT_SHOULD_WAIT; | 300 return MOJO_RESULT_SHOULD_WAIT; |
| 313 | 301 |
| 314 // Peer is closed and there are no more messages to read. | 302 // Peer is closed and there are no more messages to read. |
| 315 DCHECK_EQ(rv, ports::ERROR_PORT_PEER_CLOSED); | 303 DCHECK_EQ(rv, ports::ERROR_PORT_PEER_CLOSED); |
| 316 return MOJO_RESULT_FAILED_PRECONDITION; | 304 return MOJO_RESULT_FAILED_PRECONDITION; |
| 317 } | 305 } |
| 318 | 306 |
| 319 // Alright! We have a message and the caller has provided sufficient storage | 307 // Alright! We have a message and the caller has provided sufficient storage |
| 320 // in which to receive it. | 308 // in which to receive it. |
| 321 | 309 |
| 310 { |
| 311 // We need to update anyone watching our signals in case that was the last |
| 312 // available message. |
| 313 base::AutoLock lock(signal_lock_); |
| 314 watchers_.NotifyState(GetHandleSignalsStateNoLock()); |
| 315 } |
| 316 |
| 322 std::unique_ptr<PortsMessage> msg( | 317 std::unique_ptr<PortsMessage> msg( |
| 323 static_cast<PortsMessage*>(ports_message.release())); | 318 static_cast<PortsMessage*>(ports_message.release())); |
| 324 | 319 |
| 325 const MessageHeader* header = | 320 const MessageHeader* header = |
| 326 static_cast<const MessageHeader*>(msg->payload_bytes()); | 321 static_cast<const MessageHeader*>(msg->payload_bytes()); |
| 327 const DispatcherHeader* dispatcher_headers = | 322 const DispatcherHeader* dispatcher_headers = |
| 328 reinterpret_cast<const DispatcherHeader*>(header + 1); | 323 reinterpret_cast<const DispatcherHeader*>(header + 1); |
| 329 | 324 |
| 330 if (header->num_dispatchers > std::numeric_limits<uint16_t>::max()) | 325 if (header->num_dispatchers > std::numeric_limits<uint16_t>::max()) |
| 331 return MOJO_RESULT_UNKNOWN; | 326 return MOJO_RESULT_UNKNOWN; |
| (...skipping 57 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 389 *message = MessageForTransit::WrapPortsMessage(std::move(msg)); | 384 *message = MessageForTransit::WrapPortsMessage(std::move(msg)); |
| 390 return MOJO_RESULT_OK; | 385 return MOJO_RESULT_OK; |
| 391 } | 386 } |
| 392 | 387 |
| 393 HandleSignalsState | 388 HandleSignalsState |
| 394 MessagePipeDispatcher::GetHandleSignalsState() const { | 389 MessagePipeDispatcher::GetHandleSignalsState() const { |
| 395 base::AutoLock lock(signal_lock_); | 390 base::AutoLock lock(signal_lock_); |
| 396 return GetHandleSignalsStateNoLock(); | 391 return GetHandleSignalsStateNoLock(); |
| 397 } | 392 } |
| 398 | 393 |
| 394 MojoResult MessagePipeDispatcher::AddWatcherRef( |
| 395 const scoped_refptr<WatcherDispatcher>& watcher, |
| 396 uintptr_t context) { |
| 397 base::AutoLock lock(signal_lock_); |
| 398 if (port_closed_ || in_transit_) |
| 399 return MOJO_RESULT_INVALID_ARGUMENT; |
| 400 return watchers_.Add(watcher, context, GetHandleSignalsStateNoLock()); |
| 401 } |
| 402 |
| 403 MojoResult MessagePipeDispatcher::RemoveWatcherRef(WatcherDispatcher* watcher, |
| 404 uintptr_t context) { |
| 405 base::AutoLock lock(signal_lock_); |
| 406 if (port_closed_ || in_transit_) |
| 407 return MOJO_RESULT_INVALID_ARGUMENT; |
| 408 return watchers_.Remove(watcher, context); |
| 409 } |
| 410 |
| 399 MojoResult MessagePipeDispatcher::AddAwakable( | 411 MojoResult MessagePipeDispatcher::AddAwakable( |
| 400 Awakable* awakable, | 412 Awakable* awakable, |
| 401 MojoHandleSignals signals, | 413 MojoHandleSignals signals, |
| 402 uintptr_t context, | 414 uintptr_t context, |
| 403 HandleSignalsState* signals_state) { | 415 HandleSignalsState* signals_state) { |
| 404 base::AutoLock lock(signal_lock_); | 416 base::AutoLock lock(signal_lock_); |
| 405 | 417 |
| 406 if (port_closed_ || in_transit_) { | 418 if (port_closed_ || in_transit_) { |
| 407 if (signals_state) | 419 if (signals_state) |
| 408 *signals_state = HandleSignalsState(); | 420 *signals_state = HandleSignalsState(); |
| (...skipping 80 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 489 port_transferred_ = true; | 501 port_transferred_ = true; |
| 490 in_transit_.Set(false); | 502 in_transit_.Set(false); |
| 491 CloseNoLock(); | 503 CloseNoLock(); |
| 492 } | 504 } |
| 493 | 505 |
| 494 void MessagePipeDispatcher::CancelTransit() { | 506 void MessagePipeDispatcher::CancelTransit() { |
| 495 base::AutoLock lock(signal_lock_); | 507 base::AutoLock lock(signal_lock_); |
| 496 in_transit_.Set(false); | 508 in_transit_.Set(false); |
| 497 | 509 |
| 498 // Something may have happened while we were waiting for potential transit. | 510 // Something may have happened while we were waiting for potential transit. |
| 499 awakables_.AwakeForStateChange(GetHandleSignalsStateNoLock()); | 511 HandleSignalsState state = GetHandleSignalsStateNoLock(); |
| 512 awakables_.AwakeForStateChange(state); |
| 513 watchers_.NotifyState(state); |
| 500 } | 514 } |
| 501 | 515 |
| 502 // static | 516 // static |
| 503 scoped_refptr<Dispatcher> MessagePipeDispatcher::Deserialize( | 517 scoped_refptr<Dispatcher> MessagePipeDispatcher::Deserialize( |
| 504 const void* data, | 518 const void* data, |
| 505 size_t num_bytes, | 519 size_t num_bytes, |
| 506 const ports::PortName* ports, | 520 const ports::PortName* ports, |
| 507 size_t num_ports, | 521 size_t num_ports, |
| 508 PlatformHandle* handles, | 522 PlatformHandle* handles, |
| 509 size_t num_handles) { | 523 size_t num_handles) { |
| (...skipping 15 matching lines...) Expand all Loading... |
| 525 DCHECK(port_closed_ && !in_transit_); | 539 DCHECK(port_closed_ && !in_transit_); |
| 526 } | 540 } |
| 527 | 541 |
| 528 MojoResult MessagePipeDispatcher::CloseNoLock() { | 542 MojoResult MessagePipeDispatcher::CloseNoLock() { |
| 529 signal_lock_.AssertAcquired(); | 543 signal_lock_.AssertAcquired(); |
| 530 if (port_closed_ || in_transit_) | 544 if (port_closed_ || in_transit_) |
| 531 return MOJO_RESULT_INVALID_ARGUMENT; | 545 return MOJO_RESULT_INVALID_ARGUMENT; |
| 532 | 546 |
| 533 port_closed_.Set(true); | 547 port_closed_.Set(true); |
| 534 awakables_.CancelAll(); | 548 awakables_.CancelAll(); |
| 549 watchers_.NotifyClosed(); |
| 535 | 550 |
| 536 if (!port_transferred_) { | 551 if (!port_transferred_) { |
| 537 base::AutoUnlock unlock(signal_lock_); | 552 base::AutoUnlock unlock(signal_lock_); |
| 538 node_controller_->ClosePort(port_); | 553 node_controller_->ClosePort(port_); |
| 539 } | 554 } |
| 540 | 555 |
| 541 return MOJO_RESULT_OK; | 556 return MOJO_RESULT_OK; |
| 542 } | 557 } |
| 543 | 558 |
| 544 HandleSignalsState MessagePipeDispatcher::GetHandleSignalsStateNoLock() const { | 559 HandleSignalsState MessagePipeDispatcher::GetHandleSignalsStateNoLock() const { |
| (...skipping 44 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 589 << " endpoint " << endpoint_ << " [port=" << port_.name() | 604 << " endpoint " << endpoint_ << " [port=" << port_.name() |
| 590 << "; size=" << filter.message_size() << "]"; | 605 << "; size=" << filter.message_size() << "]"; |
| 591 } | 606 } |
| 592 if (port_status.peer_closed) { | 607 if (port_status.peer_closed) { |
| 593 DVLOG(2) << "Peer closure detected on message pipe " << pipe_id_ | 608 DVLOG(2) << "Peer closure detected on message pipe " << pipe_id_ |
| 594 << " endpoint " << endpoint_ << " [port=" << port_.name() << "]"; | 609 << " endpoint " << endpoint_ << " [port=" << port_.name() << "]"; |
| 595 } | 610 } |
| 596 } | 611 } |
| 597 #endif | 612 #endif |
| 598 | 613 |
| 599 awakables_.AwakeForStateChange(GetHandleSignalsStateNoLock()); | 614 HandleSignalsState state = GetHandleSignalsStateNoLock(); |
| 615 awakables_.AwakeForStateChange(state); |
| 616 watchers_.NotifyState(state); |
| 600 } | 617 } |
| 601 | 618 |
| 602 } // namespace edk | 619 } // namespace edk |
| 603 } // namespace mojo | 620 } // namespace mojo |
| OLD | NEW |