| OLD | NEW |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "mojo/edk/system/message_pipe_dispatcher.h" | 5 #include "mojo/edk/system/message_pipe_dispatcher.h" |
| 6 | 6 |
| 7 #include <limits> | 7 #include <limits> |
| 8 #include <memory> | 8 #include <memory> |
| 9 | 9 |
| 10 #include "base/logging.h" | 10 #include "base/logging.h" |
| (...skipping 146 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 157 | 157 |
| 158 #endif // DCHECK_IS_ON() | 158 #endif // DCHECK_IS_ON() |
| 159 | 159 |
| 160 MessagePipeDispatcher::MessagePipeDispatcher(NodeController* node_controller, | 160 MessagePipeDispatcher::MessagePipeDispatcher(NodeController* node_controller, |
| 161 const ports::PortRef& port, | 161 const ports::PortRef& port, |
| 162 uint64_t pipe_id, | 162 uint64_t pipe_id, |
| 163 int endpoint) | 163 int endpoint) |
| 164 : node_controller_(node_controller), | 164 : node_controller_(node_controller), |
| 165 port_(port), | 165 port_(port), |
| 166 pipe_id_(pipe_id), | 166 pipe_id_(pipe_id), |
| 167 endpoint_(endpoint), | 167 endpoint_(endpoint) { |
| 168 watchers_(this) { | |
| 169 DVLOG(2) << "Creating new MessagePipeDispatcher for port " << port.name() | 168 DVLOG(2) << "Creating new MessagePipeDispatcher for port " << port.name() |
| 170 << " [pipe_id=" << pipe_id << "; endpoint=" << endpoint << "]"; | 169 << " [pipe_id=" << pipe_id << "; endpoint=" << endpoint << "]"; |
| 171 | 170 |
| 172 node_controller_->SetPortObserver( | 171 node_controller_->SetPortObserver( |
| 173 port_, | 172 port_, |
| 174 make_scoped_refptr(new PortObserverThunk(this))); | 173 make_scoped_refptr(new PortObserverThunk(this))); |
| 175 } | 174 } |
| 176 | 175 |
| 177 bool MessagePipeDispatcher::Fuse(MessagePipeDispatcher* other) { | 176 bool MessagePipeDispatcher::Fuse(MessagePipeDispatcher* other) { |
| 178 node_controller_->SetPortObserver(port_, nullptr); | 177 node_controller_->SetPortObserver(port_, nullptr); |
| 179 node_controller_->SetPortObserver(other->port_, nullptr); | 178 node_controller_->SetPortObserver(other->port_, nullptr); |
| 180 | 179 |
| 181 ports::PortRef port0; | 180 ports::PortRef port0; |
| 182 { | 181 { |
| 183 base::AutoLock lock(signal_lock_); | 182 base::AutoLock lock(signal_lock_); |
| 184 port0 = port_; | 183 port0 = port_; |
| 185 port_closed_.Set(true); | 184 port_closed_.Set(true); |
| 186 awakables_.CancelAll(); | 185 awakables_.CancelAll(); |
| 187 watchers_.NotifyClosed(); | |
| 188 } | 186 } |
| 189 | 187 |
| 190 ports::PortRef port1; | 188 ports::PortRef port1; |
| 191 { | 189 { |
| 192 base::AutoLock lock(other->signal_lock_); | 190 base::AutoLock lock(other->signal_lock_); |
| 193 port1 = other->port_; | 191 port1 = other->port_; |
| 194 other->port_closed_.Set(true); | 192 other->port_closed_.Set(true); |
| 195 other->awakables_.CancelAll(); | 193 other->awakables_.CancelAll(); |
| 196 other->watchers_.NotifyClosed(); | |
| 197 } | 194 } |
| 198 | 195 |
| 199 // Both ports are always closed by this call. | 196 // Both ports are always closed by this call. |
| 200 int rv = node_controller_->MergeLocalPorts(port0, port1); | 197 int rv = node_controller_->MergeLocalPorts(port0, port1); |
| 201 return rv == ports::OK; | 198 return rv == ports::OK; |
| 202 } | 199 } |
| 203 | 200 |
| 204 Dispatcher::Type MessagePipeDispatcher::GetType() const { | 201 Dispatcher::Type MessagePipeDispatcher::GetType() const { |
| 205 return Type::MESSAGE_PIPE; | 202 return Type::MESSAGE_PIPE; |
| 206 } | 203 } |
| 207 | 204 |
| 208 MojoResult MessagePipeDispatcher::Close() { | 205 MojoResult MessagePipeDispatcher::Close() { |
| 209 base::AutoLock lock(signal_lock_); | 206 base::AutoLock lock(signal_lock_); |
| 210 DVLOG(2) << "Closing message pipe " << pipe_id_ << " endpoint " << endpoint_ | 207 DVLOG(2) << "Closing message pipe " << pipe_id_ << " endpoint " << endpoint_ |
| 211 << " [port=" << port_.name() << "]"; | 208 << " [port=" << port_.name() << "]"; |
| 212 return CloseNoLock(); | 209 return CloseNoLock(); |
| 213 } | 210 } |
| 214 | 211 |
| 212 MojoResult MessagePipeDispatcher::Watch(MojoHandleSignals signals, |
| 213 const Watcher::WatchCallback& callback, |
| 214 uintptr_t context) { |
| 215 base::AutoLock lock(signal_lock_); |
| 216 |
| 217 if (port_closed_ || in_transit_) |
| 218 return MOJO_RESULT_INVALID_ARGUMENT; |
| 219 |
| 220 return awakables_.AddWatcher( |
| 221 signals, callback, context, GetHandleSignalsStateNoLock()); |
| 222 } |
| 223 |
| 224 MojoResult MessagePipeDispatcher::CancelWatch(uintptr_t context) { |
| 225 base::AutoLock lock(signal_lock_); |
| 226 |
| 227 if (port_closed_ || in_transit_) |
| 228 return MOJO_RESULT_INVALID_ARGUMENT; |
| 229 |
| 230 return awakables_.RemoveWatcher(context); |
| 231 } |
| 232 |
| 215 MojoResult MessagePipeDispatcher::WriteMessage( | 233 MojoResult MessagePipeDispatcher::WriteMessage( |
| 216 std::unique_ptr<MessageForTransit> message, | 234 std::unique_ptr<MessageForTransit> message, |
| 217 MojoWriteMessageFlags flags) { | 235 MojoWriteMessageFlags flags) { |
| 218 if (port_closed_ || in_transit_) | 236 if (port_closed_ || in_transit_) |
| 219 return MOJO_RESULT_INVALID_ARGUMENT; | 237 return MOJO_RESULT_INVALID_ARGUMENT; |
| 220 | 238 |
| 221 size_t num_bytes = message->num_bytes(); | 239 size_t num_bytes = message->num_bytes(); |
| 222 int rv = node_controller_->SendMessage(port_, message->TakePortsMessage()); | 240 int rv = node_controller_->SendMessage(port_, message->TakePortsMessage()); |
| 223 | 241 |
| 224 DVLOG(4) << "Sent message on pipe " << pipe_id_ << " endpoint " << endpoint_ | 242 DVLOG(4) << "Sent message on pipe " << pipe_id_ << " endpoint " << endpoint_ |
| (...skipping 49 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 274 if (rv != ports::OK && rv != ports::ERROR_PORT_PEER_CLOSED) { | 292 if (rv != ports::OK && rv != ports::ERROR_PORT_PEER_CLOSED) { |
| 275 if (rv == ports::ERROR_PORT_UNKNOWN || | 293 if (rv == ports::ERROR_PORT_UNKNOWN || |
| 276 rv == ports::ERROR_PORT_STATE_UNEXPECTED) | 294 rv == ports::ERROR_PORT_STATE_UNEXPECTED) |
| 277 return MOJO_RESULT_INVALID_ARGUMENT; | 295 return MOJO_RESULT_INVALID_ARGUMENT; |
| 278 | 296 |
| 279 NOTREACHED(); | 297 NOTREACHED(); |
| 280 return MOJO_RESULT_UNKNOWN; // TODO: Add a better error code here? | 298 return MOJO_RESULT_UNKNOWN; // TODO: Add a better error code here? |
| 281 } | 299 } |
| 282 | 300 |
| 283 if (no_space) { | 301 if (no_space) { |
| 284 if (may_discard) { | |
| 285 // May have been the last message on the pipe. Need to update signals just | |
| 286 // in case. | |
| 287 base::AutoLock lock(signal_lock_); | |
| 288 watchers_.NotifyState(GetHandleSignalsStateNoLock()); | |
| 289 } | |
| 290 // |*num_handles| (and/or |*num_bytes| if |read_any_size| is false) wasn't | 302 // |*num_handles| (and/or |*num_bytes| if |read_any_size| is false) wasn't |
| 291 // sufficient to hold this message's data. The message will still be in | 303 // sufficient to hold this message's data. The message will still be in |
| 292 // queue unless MOJO_READ_MESSAGE_FLAG_MAY_DISCARD was set. | 304 // queue unless MOJO_READ_MESSAGE_FLAG_MAY_DISCARD was set. |
| 293 return MOJO_RESULT_RESOURCE_EXHAUSTED; | 305 return MOJO_RESULT_RESOURCE_EXHAUSTED; |
| 294 } | 306 } |
| 295 | 307 |
| 296 if (!ports_message) { | 308 if (!ports_message) { |
| 297 // No message was available in queue. | 309 // No message was available in queue. |
| 298 | 310 |
| 299 if (rv == ports::OK) | 311 if (rv == ports::OK) |
| 300 return MOJO_RESULT_SHOULD_WAIT; | 312 return MOJO_RESULT_SHOULD_WAIT; |
| 301 | 313 |
| 302 // Peer is closed and there are no more messages to read. | 314 // Peer is closed and there are no more messages to read. |
| 303 DCHECK_EQ(rv, ports::ERROR_PORT_PEER_CLOSED); | 315 DCHECK_EQ(rv, ports::ERROR_PORT_PEER_CLOSED); |
| 304 return MOJO_RESULT_FAILED_PRECONDITION; | 316 return MOJO_RESULT_FAILED_PRECONDITION; |
| 305 } | 317 } |
| 306 | 318 |
| 307 // Alright! We have a message and the caller has provided sufficient storage | 319 // Alright! We have a message and the caller has provided sufficient storage |
| 308 // in which to receive it. | 320 // in which to receive it. |
| 309 | 321 |
| 310 { | |
| 311 // We need to update anyone watching our signals in case that was the last | |
| 312 // available message. | |
| 313 base::AutoLock lock(signal_lock_); | |
| 314 watchers_.NotifyState(GetHandleSignalsStateNoLock()); | |
| 315 } | |
| 316 | |
| 317 std::unique_ptr<PortsMessage> msg( | 322 std::unique_ptr<PortsMessage> msg( |
| 318 static_cast<PortsMessage*>(ports_message.release())); | 323 static_cast<PortsMessage*>(ports_message.release())); |
| 319 | 324 |
| 320 const MessageHeader* header = | 325 const MessageHeader* header = |
| 321 static_cast<const MessageHeader*>(msg->payload_bytes()); | 326 static_cast<const MessageHeader*>(msg->payload_bytes()); |
| 322 const DispatcherHeader* dispatcher_headers = | 327 const DispatcherHeader* dispatcher_headers = |
| 323 reinterpret_cast<const DispatcherHeader*>(header + 1); | 328 reinterpret_cast<const DispatcherHeader*>(header + 1); |
| 324 | 329 |
| 325 if (header->num_dispatchers > std::numeric_limits<uint16_t>::max()) | 330 if (header->num_dispatchers > std::numeric_limits<uint16_t>::max()) |
| 326 return MOJO_RESULT_UNKNOWN; | 331 return MOJO_RESULT_UNKNOWN; |
| (...skipping 57 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 384 *message = MessageForTransit::WrapPortsMessage(std::move(msg)); | 389 *message = MessageForTransit::WrapPortsMessage(std::move(msg)); |
| 385 return MOJO_RESULT_OK; | 390 return MOJO_RESULT_OK; |
| 386 } | 391 } |
| 387 | 392 |
| 388 HandleSignalsState | 393 HandleSignalsState |
| 389 MessagePipeDispatcher::GetHandleSignalsState() const { | 394 MessagePipeDispatcher::GetHandleSignalsState() const { |
| 390 base::AutoLock lock(signal_lock_); | 395 base::AutoLock lock(signal_lock_); |
| 391 return GetHandleSignalsStateNoLock(); | 396 return GetHandleSignalsStateNoLock(); |
| 392 } | 397 } |
| 393 | 398 |
| 394 MojoResult MessagePipeDispatcher::AddWatcherRef( | |
| 395 const scoped_refptr<WatcherDispatcher>& watcher, | |
| 396 uintptr_t context) { | |
| 397 base::AutoLock lock(signal_lock_); | |
| 398 if (port_closed_ || in_transit_) | |
| 399 return MOJO_RESULT_INVALID_ARGUMENT; | |
| 400 return watchers_.Add(watcher, context, GetHandleSignalsStateNoLock()); | |
| 401 } | |
| 402 | |
| 403 MojoResult MessagePipeDispatcher::RemoveWatcherRef(WatcherDispatcher* watcher, | |
| 404 uintptr_t context) { | |
| 405 base::AutoLock lock(signal_lock_); | |
| 406 if (port_closed_ || in_transit_) | |
| 407 return MOJO_RESULT_INVALID_ARGUMENT; | |
| 408 return watchers_.Remove(watcher, context); | |
| 409 } | |
| 410 | |
| 411 MojoResult MessagePipeDispatcher::AddAwakable( | 399 MojoResult MessagePipeDispatcher::AddAwakable( |
| 412 Awakable* awakable, | 400 Awakable* awakable, |
| 413 MojoHandleSignals signals, | 401 MojoHandleSignals signals, |
| 414 uintptr_t context, | 402 uintptr_t context, |
| 415 HandleSignalsState* signals_state) { | 403 HandleSignalsState* signals_state) { |
| 416 base::AutoLock lock(signal_lock_); | 404 base::AutoLock lock(signal_lock_); |
| 417 | 405 |
| 418 if (port_closed_ || in_transit_) { | 406 if (port_closed_ || in_transit_) { |
| 419 if (signals_state) | 407 if (signals_state) |
| 420 *signals_state = HandleSignalsState(); | 408 *signals_state = HandleSignalsState(); |
| (...skipping 80 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 501 port_transferred_ = true; | 489 port_transferred_ = true; |
| 502 in_transit_.Set(false); | 490 in_transit_.Set(false); |
| 503 CloseNoLock(); | 491 CloseNoLock(); |
| 504 } | 492 } |
| 505 | 493 |
| 506 void MessagePipeDispatcher::CancelTransit() { | 494 void MessagePipeDispatcher::CancelTransit() { |
| 507 base::AutoLock lock(signal_lock_); | 495 base::AutoLock lock(signal_lock_); |
| 508 in_transit_.Set(false); | 496 in_transit_.Set(false); |
| 509 | 497 |
| 510 // Something may have happened while we were waiting for potential transit. | 498 // Something may have happened while we were waiting for potential transit. |
| 511 HandleSignalsState state = GetHandleSignalsStateNoLock(); | 499 awakables_.AwakeForStateChange(GetHandleSignalsStateNoLock()); |
| 512 awakables_.AwakeForStateChange(state); | |
| 513 watchers_.NotifyState(state); | |
| 514 } | 500 } |
| 515 | 501 |
| 516 // static | 502 // static |
| 517 scoped_refptr<Dispatcher> MessagePipeDispatcher::Deserialize( | 503 scoped_refptr<Dispatcher> MessagePipeDispatcher::Deserialize( |
| 518 const void* data, | 504 const void* data, |
| 519 size_t num_bytes, | 505 size_t num_bytes, |
| 520 const ports::PortName* ports, | 506 const ports::PortName* ports, |
| 521 size_t num_ports, | 507 size_t num_ports, |
| 522 PlatformHandle* handles, | 508 PlatformHandle* handles, |
| 523 size_t num_handles) { | 509 size_t num_handles) { |
| (...skipping 15 matching lines...) Expand all Loading... |
| 539 DCHECK(port_closed_ && !in_transit_); | 525 DCHECK(port_closed_ && !in_transit_); |
| 540 } | 526 } |
| 541 | 527 |
| 542 MojoResult MessagePipeDispatcher::CloseNoLock() { | 528 MojoResult MessagePipeDispatcher::CloseNoLock() { |
| 543 signal_lock_.AssertAcquired(); | 529 signal_lock_.AssertAcquired(); |
| 544 if (port_closed_ || in_transit_) | 530 if (port_closed_ || in_transit_) |
| 545 return MOJO_RESULT_INVALID_ARGUMENT; | 531 return MOJO_RESULT_INVALID_ARGUMENT; |
| 546 | 532 |
| 547 port_closed_.Set(true); | 533 port_closed_.Set(true); |
| 548 awakables_.CancelAll(); | 534 awakables_.CancelAll(); |
| 549 watchers_.NotifyClosed(); | |
| 550 | 535 |
| 551 if (!port_transferred_) { | 536 if (!port_transferred_) { |
| 552 base::AutoUnlock unlock(signal_lock_); | 537 base::AutoUnlock unlock(signal_lock_); |
| 553 node_controller_->ClosePort(port_); | 538 node_controller_->ClosePort(port_); |
| 554 } | 539 } |
| 555 | 540 |
| 556 return MOJO_RESULT_OK; | 541 return MOJO_RESULT_OK; |
| 557 } | 542 } |
| 558 | 543 |
| 559 HandleSignalsState MessagePipeDispatcher::GetHandleSignalsStateNoLock() const { | 544 HandleSignalsState MessagePipeDispatcher::GetHandleSignalsStateNoLock() const { |
| (...skipping 44 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 604 << " endpoint " << endpoint_ << " [port=" << port_.name() | 589 << " endpoint " << endpoint_ << " [port=" << port_.name() |
| 605 << "; size=" << filter.message_size() << "]"; | 590 << "; size=" << filter.message_size() << "]"; |
| 606 } | 591 } |
| 607 if (port_status.peer_closed) { | 592 if (port_status.peer_closed) { |
| 608 DVLOG(2) << "Peer closure detected on message pipe " << pipe_id_ | 593 DVLOG(2) << "Peer closure detected on message pipe " << pipe_id_ |
| 609 << " endpoint " << endpoint_ << " [port=" << port_.name() << "]"; | 594 << " endpoint " << endpoint_ << " [port=" << port_.name() << "]"; |
| 610 } | 595 } |
| 611 } | 596 } |
| 612 #endif | 597 #endif |
| 613 | 598 |
| 614 HandleSignalsState state = GetHandleSignalsStateNoLock(); | 599 awakables_.AwakeForStateChange(GetHandleSignalsStateNoLock()); |
| 615 awakables_.AwakeForStateChange(state); | |
| 616 watchers_.NotifyState(state); | |
| 617 } | 600 } |
| 618 | 601 |
| 619 } // namespace edk | 602 } // namespace edk |
| 620 } // namespace mojo | 603 } // namespace mojo |
| OLD | NEW |