OLD | NEW |
1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "mojo/edk/system/message_pipe_dispatcher.h" | 5 #include "mojo/edk/system/message_pipe_dispatcher.h" |
6 | 6 |
7 #include <limits> | 7 #include <limits> |
8 #include <memory> | 8 #include <memory> |
9 | 9 |
10 #include "base/logging.h" | 10 #include "base/logging.h" |
(...skipping 146 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
157 | 157 |
158 #endif // DCHECK_IS_ON() | 158 #endif // DCHECK_IS_ON() |
159 | 159 |
160 MessagePipeDispatcher::MessagePipeDispatcher(NodeController* node_controller, | 160 MessagePipeDispatcher::MessagePipeDispatcher(NodeController* node_controller, |
161 const ports::PortRef& port, | 161 const ports::PortRef& port, |
162 uint64_t pipe_id, | 162 uint64_t pipe_id, |
163 int endpoint) | 163 int endpoint) |
164 : node_controller_(node_controller), | 164 : node_controller_(node_controller), |
165 port_(port), | 165 port_(port), |
166 pipe_id_(pipe_id), | 166 pipe_id_(pipe_id), |
167 endpoint_(endpoint), | 167 endpoint_(endpoint) { |
168 watchers_(this) { | |
169 DVLOG(2) << "Creating new MessagePipeDispatcher for port " << port.name() | 168 DVLOG(2) << "Creating new MessagePipeDispatcher for port " << port.name() |
170 << " [pipe_id=" << pipe_id << "; endpoint=" << endpoint << "]"; | 169 << " [pipe_id=" << pipe_id << "; endpoint=" << endpoint << "]"; |
171 | 170 |
172 node_controller_->SetPortObserver( | 171 node_controller_->SetPortObserver( |
173 port_, | 172 port_, |
174 make_scoped_refptr(new PortObserverThunk(this))); | 173 make_scoped_refptr(new PortObserverThunk(this))); |
175 } | 174 } |
176 | 175 |
177 bool MessagePipeDispatcher::Fuse(MessagePipeDispatcher* other) { | 176 bool MessagePipeDispatcher::Fuse(MessagePipeDispatcher* other) { |
178 node_controller_->SetPortObserver(port_, nullptr); | 177 node_controller_->SetPortObserver(port_, nullptr); |
179 node_controller_->SetPortObserver(other->port_, nullptr); | 178 node_controller_->SetPortObserver(other->port_, nullptr); |
180 | 179 |
181 ports::PortRef port0; | 180 ports::PortRef port0; |
182 { | 181 { |
183 base::AutoLock lock(signal_lock_); | 182 base::AutoLock lock(signal_lock_); |
184 port0 = port_; | 183 port0 = port_; |
185 port_closed_.Set(true); | 184 port_closed_.Set(true); |
186 awakables_.CancelAll(); | 185 awakables_.CancelAll(); |
187 watchers_.NotifyClosed(); | |
188 } | 186 } |
189 | 187 |
190 ports::PortRef port1; | 188 ports::PortRef port1; |
191 { | 189 { |
192 base::AutoLock lock(other->signal_lock_); | 190 base::AutoLock lock(other->signal_lock_); |
193 port1 = other->port_; | 191 port1 = other->port_; |
194 other->port_closed_.Set(true); | 192 other->port_closed_.Set(true); |
195 other->awakables_.CancelAll(); | 193 other->awakables_.CancelAll(); |
196 other->watchers_.NotifyClosed(); | |
197 } | 194 } |
198 | 195 |
199 // Both ports are always closed by this call. | 196 // Both ports are always closed by this call. |
200 int rv = node_controller_->MergeLocalPorts(port0, port1); | 197 int rv = node_controller_->MergeLocalPorts(port0, port1); |
201 return rv == ports::OK; | 198 return rv == ports::OK; |
202 } | 199 } |
203 | 200 |
204 Dispatcher::Type MessagePipeDispatcher::GetType() const { | 201 Dispatcher::Type MessagePipeDispatcher::GetType() const { |
205 return Type::MESSAGE_PIPE; | 202 return Type::MESSAGE_PIPE; |
206 } | 203 } |
207 | 204 |
208 MojoResult MessagePipeDispatcher::Close() { | 205 MojoResult MessagePipeDispatcher::Close() { |
209 base::AutoLock lock(signal_lock_); | 206 base::AutoLock lock(signal_lock_); |
210 DVLOG(2) << "Closing message pipe " << pipe_id_ << " endpoint " << endpoint_ | 207 DVLOG(2) << "Closing message pipe " << pipe_id_ << " endpoint " << endpoint_ |
211 << " [port=" << port_.name() << "]"; | 208 << " [port=" << port_.name() << "]"; |
212 return CloseNoLock(); | 209 return CloseNoLock(); |
213 } | 210 } |
214 | 211 |
| 212 MojoResult MessagePipeDispatcher::Watch(MojoHandleSignals signals, |
| 213 const Watcher::WatchCallback& callback, |
| 214 uintptr_t context) { |
| 215 base::AutoLock lock(signal_lock_); |
| 216 |
| 217 if (port_closed_ || in_transit_) |
| 218 return MOJO_RESULT_INVALID_ARGUMENT; |
| 219 |
| 220 return awakables_.AddWatcher( |
| 221 signals, callback, context, GetHandleSignalsStateNoLock()); |
| 222 } |
| 223 |
| 224 MojoResult MessagePipeDispatcher::CancelWatch(uintptr_t context) { |
| 225 base::AutoLock lock(signal_lock_); |
| 226 |
| 227 if (port_closed_ || in_transit_) |
| 228 return MOJO_RESULT_INVALID_ARGUMENT; |
| 229 |
| 230 return awakables_.RemoveWatcher(context); |
| 231 } |
| 232 |
215 MojoResult MessagePipeDispatcher::WriteMessage( | 233 MojoResult MessagePipeDispatcher::WriteMessage( |
216 std::unique_ptr<MessageForTransit> message, | 234 std::unique_ptr<MessageForTransit> message, |
217 MojoWriteMessageFlags flags) { | 235 MojoWriteMessageFlags flags) { |
218 if (port_closed_ || in_transit_) | 236 if (port_closed_ || in_transit_) |
219 return MOJO_RESULT_INVALID_ARGUMENT; | 237 return MOJO_RESULT_INVALID_ARGUMENT; |
220 | 238 |
221 size_t num_bytes = message->num_bytes(); | 239 size_t num_bytes = message->num_bytes(); |
222 int rv = node_controller_->SendMessage(port_, message->TakePortsMessage()); | 240 int rv = node_controller_->SendMessage(port_, message->TakePortsMessage()); |
223 | 241 |
224 DVLOG(4) << "Sent message on pipe " << pipe_id_ << " endpoint " << endpoint_ | 242 DVLOG(4) << "Sent message on pipe " << pipe_id_ << " endpoint " << endpoint_ |
(...skipping 49 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
274 if (rv != ports::OK && rv != ports::ERROR_PORT_PEER_CLOSED) { | 292 if (rv != ports::OK && rv != ports::ERROR_PORT_PEER_CLOSED) { |
275 if (rv == ports::ERROR_PORT_UNKNOWN || | 293 if (rv == ports::ERROR_PORT_UNKNOWN || |
276 rv == ports::ERROR_PORT_STATE_UNEXPECTED) | 294 rv == ports::ERROR_PORT_STATE_UNEXPECTED) |
277 return MOJO_RESULT_INVALID_ARGUMENT; | 295 return MOJO_RESULT_INVALID_ARGUMENT; |
278 | 296 |
279 NOTREACHED(); | 297 NOTREACHED(); |
280 return MOJO_RESULT_UNKNOWN; // TODO: Add a better error code here? | 298 return MOJO_RESULT_UNKNOWN; // TODO: Add a better error code here? |
281 } | 299 } |
282 | 300 |
283 if (no_space) { | 301 if (no_space) { |
284 if (may_discard) { | |
285 // May have been the last message on the pipe. Need to update signals just | |
286 // in case. | |
287 base::AutoLock lock(signal_lock_); | |
288 watchers_.NotifyState(GetHandleSignalsStateNoLock()); | |
289 } | |
290 // |*num_handles| (and/or |*num_bytes| if |read_any_size| is false) wasn't | 302 // |*num_handles| (and/or |*num_bytes| if |read_any_size| is false) wasn't |
291 // sufficient to hold this message's data. The message will still be in | 303 // sufficient to hold this message's data. The message will still be in |
292 // queue unless MOJO_READ_MESSAGE_FLAG_MAY_DISCARD was set. | 304 // queue unless MOJO_READ_MESSAGE_FLAG_MAY_DISCARD was set. |
293 return MOJO_RESULT_RESOURCE_EXHAUSTED; | 305 return MOJO_RESULT_RESOURCE_EXHAUSTED; |
294 } | 306 } |
295 | 307 |
296 if (!ports_message) { | 308 if (!ports_message) { |
297 // No message was available in queue. | 309 // No message was available in queue. |
298 | 310 |
299 if (rv == ports::OK) | 311 if (rv == ports::OK) |
300 return MOJO_RESULT_SHOULD_WAIT; | 312 return MOJO_RESULT_SHOULD_WAIT; |
301 | 313 |
302 // Peer is closed and there are no more messages to read. | 314 // Peer is closed and there are no more messages to read. |
303 DCHECK_EQ(rv, ports::ERROR_PORT_PEER_CLOSED); | 315 DCHECK_EQ(rv, ports::ERROR_PORT_PEER_CLOSED); |
304 return MOJO_RESULT_FAILED_PRECONDITION; | 316 return MOJO_RESULT_FAILED_PRECONDITION; |
305 } | 317 } |
306 | 318 |
307 // Alright! We have a message and the caller has provided sufficient storage | 319 // Alright! We have a message and the caller has provided sufficient storage |
308 // in which to receive it. | 320 // in which to receive it. |
309 | 321 |
310 { | |
311 // We need to update anyone watching our signals in case that was the last | |
312 // available message. | |
313 base::AutoLock lock(signal_lock_); | |
314 watchers_.NotifyState(GetHandleSignalsStateNoLock()); | |
315 } | |
316 | |
317 std::unique_ptr<PortsMessage> msg( | 322 std::unique_ptr<PortsMessage> msg( |
318 static_cast<PortsMessage*>(ports_message.release())); | 323 static_cast<PortsMessage*>(ports_message.release())); |
319 | 324 |
320 const MessageHeader* header = | 325 const MessageHeader* header = |
321 static_cast<const MessageHeader*>(msg->payload_bytes()); | 326 static_cast<const MessageHeader*>(msg->payload_bytes()); |
322 const DispatcherHeader* dispatcher_headers = | 327 const DispatcherHeader* dispatcher_headers = |
323 reinterpret_cast<const DispatcherHeader*>(header + 1); | 328 reinterpret_cast<const DispatcherHeader*>(header + 1); |
324 | 329 |
325 if (header->num_dispatchers > std::numeric_limits<uint16_t>::max()) | 330 if (header->num_dispatchers > std::numeric_limits<uint16_t>::max()) |
326 return MOJO_RESULT_UNKNOWN; | 331 return MOJO_RESULT_UNKNOWN; |
(...skipping 57 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
384 *message = MessageForTransit::WrapPortsMessage(std::move(msg)); | 389 *message = MessageForTransit::WrapPortsMessage(std::move(msg)); |
385 return MOJO_RESULT_OK; | 390 return MOJO_RESULT_OK; |
386 } | 391 } |
387 | 392 |
388 HandleSignalsState | 393 HandleSignalsState |
389 MessagePipeDispatcher::GetHandleSignalsState() const { | 394 MessagePipeDispatcher::GetHandleSignalsState() const { |
390 base::AutoLock lock(signal_lock_); | 395 base::AutoLock lock(signal_lock_); |
391 return GetHandleSignalsStateNoLock(); | 396 return GetHandleSignalsStateNoLock(); |
392 } | 397 } |
393 | 398 |
394 MojoResult MessagePipeDispatcher::AddWatcherRef( | |
395 const scoped_refptr<WatcherDispatcher>& watcher, | |
396 uintptr_t context) { | |
397 base::AutoLock lock(signal_lock_); | |
398 if (port_closed_ || in_transit_) | |
399 return MOJO_RESULT_INVALID_ARGUMENT; | |
400 return watchers_.Add(watcher, context, GetHandleSignalsStateNoLock()); | |
401 } | |
402 | |
403 MojoResult MessagePipeDispatcher::RemoveWatcherRef(WatcherDispatcher* watcher, | |
404 uintptr_t context) { | |
405 base::AutoLock lock(signal_lock_); | |
406 if (port_closed_ || in_transit_) | |
407 return MOJO_RESULT_INVALID_ARGUMENT; | |
408 return watchers_.Remove(watcher, context); | |
409 } | |
410 | |
411 MojoResult MessagePipeDispatcher::AddAwakable( | 399 MojoResult MessagePipeDispatcher::AddAwakable( |
412 Awakable* awakable, | 400 Awakable* awakable, |
413 MojoHandleSignals signals, | 401 MojoHandleSignals signals, |
414 uintptr_t context, | 402 uintptr_t context, |
415 HandleSignalsState* signals_state) { | 403 HandleSignalsState* signals_state) { |
416 base::AutoLock lock(signal_lock_); | 404 base::AutoLock lock(signal_lock_); |
417 | 405 |
418 if (port_closed_ || in_transit_) { | 406 if (port_closed_ || in_transit_) { |
419 if (signals_state) | 407 if (signals_state) |
420 *signals_state = HandleSignalsState(); | 408 *signals_state = HandleSignalsState(); |
(...skipping 80 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
501 port_transferred_ = true; | 489 port_transferred_ = true; |
502 in_transit_.Set(false); | 490 in_transit_.Set(false); |
503 CloseNoLock(); | 491 CloseNoLock(); |
504 } | 492 } |
505 | 493 |
506 void MessagePipeDispatcher::CancelTransit() { | 494 void MessagePipeDispatcher::CancelTransit() { |
507 base::AutoLock lock(signal_lock_); | 495 base::AutoLock lock(signal_lock_); |
508 in_transit_.Set(false); | 496 in_transit_.Set(false); |
509 | 497 |
510 // Something may have happened while we were waiting for potential transit. | 498 // Something may have happened while we were waiting for potential transit. |
511 HandleSignalsState state = GetHandleSignalsStateNoLock(); | 499 awakables_.AwakeForStateChange(GetHandleSignalsStateNoLock()); |
512 awakables_.AwakeForStateChange(state); | |
513 watchers_.NotifyState(state); | |
514 } | 500 } |
515 | 501 |
516 // static | 502 // static |
517 scoped_refptr<Dispatcher> MessagePipeDispatcher::Deserialize( | 503 scoped_refptr<Dispatcher> MessagePipeDispatcher::Deserialize( |
518 const void* data, | 504 const void* data, |
519 size_t num_bytes, | 505 size_t num_bytes, |
520 const ports::PortName* ports, | 506 const ports::PortName* ports, |
521 size_t num_ports, | 507 size_t num_ports, |
522 PlatformHandle* handles, | 508 PlatformHandle* handles, |
523 size_t num_handles) { | 509 size_t num_handles) { |
(...skipping 15 matching lines...) Expand all Loading... |
539 DCHECK(port_closed_ && !in_transit_); | 525 DCHECK(port_closed_ && !in_transit_); |
540 } | 526 } |
541 | 527 |
542 MojoResult MessagePipeDispatcher::CloseNoLock() { | 528 MojoResult MessagePipeDispatcher::CloseNoLock() { |
543 signal_lock_.AssertAcquired(); | 529 signal_lock_.AssertAcquired(); |
544 if (port_closed_ || in_transit_) | 530 if (port_closed_ || in_transit_) |
545 return MOJO_RESULT_INVALID_ARGUMENT; | 531 return MOJO_RESULT_INVALID_ARGUMENT; |
546 | 532 |
547 port_closed_.Set(true); | 533 port_closed_.Set(true); |
548 awakables_.CancelAll(); | 534 awakables_.CancelAll(); |
549 watchers_.NotifyClosed(); | |
550 | 535 |
551 if (!port_transferred_) { | 536 if (!port_transferred_) { |
552 base::AutoUnlock unlock(signal_lock_); | 537 base::AutoUnlock unlock(signal_lock_); |
553 node_controller_->ClosePort(port_); | 538 node_controller_->ClosePort(port_); |
554 } | 539 } |
555 | 540 |
556 return MOJO_RESULT_OK; | 541 return MOJO_RESULT_OK; |
557 } | 542 } |
558 | 543 |
559 HandleSignalsState MessagePipeDispatcher::GetHandleSignalsStateNoLock() const { | 544 HandleSignalsState MessagePipeDispatcher::GetHandleSignalsStateNoLock() const { |
(...skipping 44 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
604 << " endpoint " << endpoint_ << " [port=" << port_.name() | 589 << " endpoint " << endpoint_ << " [port=" << port_.name() |
605 << "; size=" << filter.message_size() << "]"; | 590 << "; size=" << filter.message_size() << "]"; |
606 } | 591 } |
607 if (port_status.peer_closed) { | 592 if (port_status.peer_closed) { |
608 DVLOG(2) << "Peer closure detected on message pipe " << pipe_id_ | 593 DVLOG(2) << "Peer closure detected on message pipe " << pipe_id_ |
609 << " endpoint " << endpoint_ << " [port=" << port_.name() << "]"; | 594 << " endpoint " << endpoint_ << " [port=" << port_.name() << "]"; |
610 } | 595 } |
611 } | 596 } |
612 #endif | 597 #endif |
613 | 598 |
614 HandleSignalsState state = GetHandleSignalsStateNoLock(); | 599 awakables_.AwakeForStateChange(GetHandleSignalsStateNoLock()); |
615 awakables_.AwakeForStateChange(state); | |
616 watchers_.NotifyState(state); | |
617 } | 600 } |
618 | 601 |
619 } // namespace edk | 602 } // namespace edk |
620 } // namespace mojo | 603 } // namespace mojo |
OLD | NEW |