Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(186)

Side by Side Diff: mojo/edk/system/message_pipe_dispatcher.cc

Issue 2725133002: Mojo: Armed Watchers (Closed)
Patch Set: rebase Created 3 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/edk/system/message_pipe_dispatcher.h" 5 #include "mojo/edk/system/message_pipe_dispatcher.h"
6 6
7 #include <limits> 7 #include <limits>
8 #include <memory> 8 #include <memory>
9 9
10 #include "base/logging.h" 10 #include "base/logging.h"
(...skipping 146 matching lines...) Expand 10 before | Expand all | Expand 10 after
157 157
158 #endif // DCHECK_IS_ON() 158 #endif // DCHECK_IS_ON()
159 159
160 MessagePipeDispatcher::MessagePipeDispatcher(NodeController* node_controller, 160 MessagePipeDispatcher::MessagePipeDispatcher(NodeController* node_controller,
161 const ports::PortRef& port, 161 const ports::PortRef& port,
162 uint64_t pipe_id, 162 uint64_t pipe_id,
163 int endpoint) 163 int endpoint)
164 : node_controller_(node_controller), 164 : node_controller_(node_controller),
165 port_(port), 165 port_(port),
166 pipe_id_(pipe_id), 166 pipe_id_(pipe_id),
167 endpoint_(endpoint) { 167 endpoint_(endpoint),
168 watchers_(this) {
168 DVLOG(2) << "Creating new MessagePipeDispatcher for port " << port.name() 169 DVLOG(2) << "Creating new MessagePipeDispatcher for port " << port.name()
169 << " [pipe_id=" << pipe_id << "; endpoint=" << endpoint << "]"; 170 << " [pipe_id=" << pipe_id << "; endpoint=" << endpoint << "]";
170 171
171 node_controller_->SetPortObserver( 172 node_controller_->SetPortObserver(
172 port_, 173 port_,
173 make_scoped_refptr(new PortObserverThunk(this))); 174 make_scoped_refptr(new PortObserverThunk(this)));
174 } 175 }
175 176
176 bool MessagePipeDispatcher::Fuse(MessagePipeDispatcher* other) { 177 bool MessagePipeDispatcher::Fuse(MessagePipeDispatcher* other) {
177 node_controller_->SetPortObserver(port_, nullptr); 178 node_controller_->SetPortObserver(port_, nullptr);
178 node_controller_->SetPortObserver(other->port_, nullptr); 179 node_controller_->SetPortObserver(other->port_, nullptr);
179 180
180 ports::PortRef port0; 181 ports::PortRef port0;
181 { 182 {
182 base::AutoLock lock(signal_lock_); 183 base::AutoLock lock(signal_lock_);
183 port0 = port_; 184 port0 = port_;
184 port_closed_.Set(true); 185 port_closed_.Set(true);
185 awakables_.CancelAll(); 186 awakables_.CancelAll();
187 watchers_.NotifyClosed();
186 } 188 }
187 189
188 ports::PortRef port1; 190 ports::PortRef port1;
189 { 191 {
190 base::AutoLock lock(other->signal_lock_); 192 base::AutoLock lock(other->signal_lock_);
191 port1 = other->port_; 193 port1 = other->port_;
192 other->port_closed_.Set(true); 194 other->port_closed_.Set(true);
193 other->awakables_.CancelAll(); 195 other->awakables_.CancelAll();
196 other->watchers_.NotifyClosed();
194 } 197 }
195 198
196 // Both ports are always closed by this call. 199 // Both ports are always closed by this call.
197 int rv = node_controller_->MergeLocalPorts(port0, port1); 200 int rv = node_controller_->MergeLocalPorts(port0, port1);
198 return rv == ports::OK; 201 return rv == ports::OK;
199 } 202 }
200 203
201 Dispatcher::Type MessagePipeDispatcher::GetType() const { 204 Dispatcher::Type MessagePipeDispatcher::GetType() const {
202 return Type::MESSAGE_PIPE; 205 return Type::MESSAGE_PIPE;
203 } 206 }
204 207
205 MojoResult MessagePipeDispatcher::Close() { 208 MojoResult MessagePipeDispatcher::Close() {
206 base::AutoLock lock(signal_lock_); 209 base::AutoLock lock(signal_lock_);
207 DVLOG(2) << "Closing message pipe " << pipe_id_ << " endpoint " << endpoint_ 210 DVLOG(2) << "Closing message pipe " << pipe_id_ << " endpoint " << endpoint_
208 << " [port=" << port_.name() << "]"; 211 << " [port=" << port_.name() << "]";
209 return CloseNoLock(); 212 return CloseNoLock();
210 } 213 }
211 214
212 MojoResult MessagePipeDispatcher::Watch(MojoHandleSignals signals,
213 const Watcher::WatchCallback& callback,
214 uintptr_t context) {
215 base::AutoLock lock(signal_lock_);
216
217 if (port_closed_ || in_transit_)
218 return MOJO_RESULT_INVALID_ARGUMENT;
219
220 return awakables_.AddWatcher(
221 signals, callback, context, GetHandleSignalsStateNoLock());
222 }
223
224 MojoResult MessagePipeDispatcher::CancelWatch(uintptr_t context) {
225 base::AutoLock lock(signal_lock_);
226
227 if (port_closed_ || in_transit_)
228 return MOJO_RESULT_INVALID_ARGUMENT;
229
230 return awakables_.RemoveWatcher(context);
231 }
232
233 MojoResult MessagePipeDispatcher::WriteMessage( 215 MojoResult MessagePipeDispatcher::WriteMessage(
234 std::unique_ptr<MessageForTransit> message, 216 std::unique_ptr<MessageForTransit> message,
235 MojoWriteMessageFlags flags) { 217 MojoWriteMessageFlags flags) {
236 if (port_closed_ || in_transit_) 218 if (port_closed_ || in_transit_)
237 return MOJO_RESULT_INVALID_ARGUMENT; 219 return MOJO_RESULT_INVALID_ARGUMENT;
238 220
239 size_t num_bytes = message->num_bytes(); 221 size_t num_bytes = message->num_bytes();
240 int rv = node_controller_->SendMessage(port_, message->TakePortsMessage()); 222 int rv = node_controller_->SendMessage(port_, message->TakePortsMessage());
241 223
242 DVLOG(4) << "Sent message on pipe " << pipe_id_ << " endpoint " << endpoint_ 224 DVLOG(4) << "Sent message on pipe " << pipe_id_ << " endpoint " << endpoint_
(...skipping 49 matching lines...) Expand 10 before | Expand all | Expand 10 after
292 if (rv != ports::OK && rv != ports::ERROR_PORT_PEER_CLOSED) { 274 if (rv != ports::OK && rv != ports::ERROR_PORT_PEER_CLOSED) {
293 if (rv == ports::ERROR_PORT_UNKNOWN || 275 if (rv == ports::ERROR_PORT_UNKNOWN ||
294 rv == ports::ERROR_PORT_STATE_UNEXPECTED) 276 rv == ports::ERROR_PORT_STATE_UNEXPECTED)
295 return MOJO_RESULT_INVALID_ARGUMENT; 277 return MOJO_RESULT_INVALID_ARGUMENT;
296 278
297 NOTREACHED(); 279 NOTREACHED();
298 return MOJO_RESULT_UNKNOWN; // TODO: Add a better error code here? 280 return MOJO_RESULT_UNKNOWN; // TODO: Add a better error code here?
299 } 281 }
300 282
301 if (no_space) { 283 if (no_space) {
284 if (may_discard) {
285 // May have been the last message on the pipe. Need to update signals just
286 // in case.
287 base::AutoLock lock(signal_lock_);
288 watchers_.NotifyState(GetHandleSignalsStateNoLock());
289 }
302 // |*num_handles| (and/or |*num_bytes| if |read_any_size| is false) wasn't 290 // |*num_handles| (and/or |*num_bytes| if |read_any_size| is false) wasn't
303 // sufficient to hold this message's data. The message will still be in 291 // sufficient to hold this message's data. The message will still be in
304 // queue unless MOJO_READ_MESSAGE_FLAG_MAY_DISCARD was set. 292 // queue unless MOJO_READ_MESSAGE_FLAG_MAY_DISCARD was set.
305 return MOJO_RESULT_RESOURCE_EXHAUSTED; 293 return MOJO_RESULT_RESOURCE_EXHAUSTED;
306 } 294 }
307 295
308 if (!ports_message) { 296 if (!ports_message) {
309 // No message was available in queue. 297 // No message was available in queue.
310 298
311 if (rv == ports::OK) 299 if (rv == ports::OK)
312 return MOJO_RESULT_SHOULD_WAIT; 300 return MOJO_RESULT_SHOULD_WAIT;
313 301
314 // Peer is closed and there are no more messages to read. 302 // Peer is closed and there are no more messages to read.
315 DCHECK_EQ(rv, ports::ERROR_PORT_PEER_CLOSED); 303 DCHECK_EQ(rv, ports::ERROR_PORT_PEER_CLOSED);
316 return MOJO_RESULT_FAILED_PRECONDITION; 304 return MOJO_RESULT_FAILED_PRECONDITION;
317 } 305 }
318 306
319 // Alright! We have a message and the caller has provided sufficient storage 307 // Alright! We have a message and the caller has provided sufficient storage
320 // in which to receive it. 308 // in which to receive it.
321 309
310 {
311 // We need to update anyone watching our signals in case that was the last
312 // available message.
313 base::AutoLock lock(signal_lock_);
314 watchers_.NotifyState(GetHandleSignalsStateNoLock());
315 }
316
322 std::unique_ptr<PortsMessage> msg( 317 std::unique_ptr<PortsMessage> msg(
323 static_cast<PortsMessage*>(ports_message.release())); 318 static_cast<PortsMessage*>(ports_message.release()));
324 319
325 const MessageHeader* header = 320 const MessageHeader* header =
326 static_cast<const MessageHeader*>(msg->payload_bytes()); 321 static_cast<const MessageHeader*>(msg->payload_bytes());
327 const DispatcherHeader* dispatcher_headers = 322 const DispatcherHeader* dispatcher_headers =
328 reinterpret_cast<const DispatcherHeader*>(header + 1); 323 reinterpret_cast<const DispatcherHeader*>(header + 1);
329 324
330 if (header->num_dispatchers > std::numeric_limits<uint16_t>::max()) 325 if (header->num_dispatchers > std::numeric_limits<uint16_t>::max())
331 return MOJO_RESULT_UNKNOWN; 326 return MOJO_RESULT_UNKNOWN;
(...skipping 57 matching lines...) Expand 10 before | Expand all | Expand 10 after
389 *message = MessageForTransit::WrapPortsMessage(std::move(msg)); 384 *message = MessageForTransit::WrapPortsMessage(std::move(msg));
390 return MOJO_RESULT_OK; 385 return MOJO_RESULT_OK;
391 } 386 }
392 387
393 HandleSignalsState 388 HandleSignalsState
394 MessagePipeDispatcher::GetHandleSignalsState() const { 389 MessagePipeDispatcher::GetHandleSignalsState() const {
395 base::AutoLock lock(signal_lock_); 390 base::AutoLock lock(signal_lock_);
396 return GetHandleSignalsStateNoLock(); 391 return GetHandleSignalsStateNoLock();
397 } 392 }
398 393
394 MojoResult MessagePipeDispatcher::AddWatcherRef(
395 const scoped_refptr<WatcherDispatcher>& watcher,
396 uintptr_t context) {
397 base::AutoLock lock(signal_lock_);
398 if (port_closed_ || in_transit_)
399 return MOJO_RESULT_INVALID_ARGUMENT;
400 return watchers_.Add(watcher, context, GetHandleSignalsStateNoLock());
401 }
402
403 MojoResult MessagePipeDispatcher::RemoveWatcherRef(WatcherDispatcher* watcher,
404 uintptr_t context) {
405 base::AutoLock lock(signal_lock_);
406 if (port_closed_ || in_transit_)
407 return MOJO_RESULT_INVALID_ARGUMENT;
408 return watchers_.Remove(watcher, context);
409 }
410
399 MojoResult MessagePipeDispatcher::AddAwakable( 411 MojoResult MessagePipeDispatcher::AddAwakable(
400 Awakable* awakable, 412 Awakable* awakable,
401 MojoHandleSignals signals, 413 MojoHandleSignals signals,
402 uintptr_t context, 414 uintptr_t context,
403 HandleSignalsState* signals_state) { 415 HandleSignalsState* signals_state) {
404 base::AutoLock lock(signal_lock_); 416 base::AutoLock lock(signal_lock_);
405 417
406 if (port_closed_ || in_transit_) { 418 if (port_closed_ || in_transit_) {
407 if (signals_state) 419 if (signals_state)
408 *signals_state = HandleSignalsState(); 420 *signals_state = HandleSignalsState();
(...skipping 80 matching lines...) Expand 10 before | Expand all | Expand 10 after
489 port_transferred_ = true; 501 port_transferred_ = true;
490 in_transit_.Set(false); 502 in_transit_.Set(false);
491 CloseNoLock(); 503 CloseNoLock();
492 } 504 }
493 505
494 void MessagePipeDispatcher::CancelTransit() { 506 void MessagePipeDispatcher::CancelTransit() {
495 base::AutoLock lock(signal_lock_); 507 base::AutoLock lock(signal_lock_);
496 in_transit_.Set(false); 508 in_transit_.Set(false);
497 509
498 // Something may have happened while we were waiting for potential transit. 510 // Something may have happened while we were waiting for potential transit.
499 awakables_.AwakeForStateChange(GetHandleSignalsStateNoLock()); 511 HandleSignalsState state = GetHandleSignalsStateNoLock();
512 awakables_.AwakeForStateChange(state);
513 watchers_.NotifyState(state);
500 } 514 }
501 515
502 // static 516 // static
503 scoped_refptr<Dispatcher> MessagePipeDispatcher::Deserialize( 517 scoped_refptr<Dispatcher> MessagePipeDispatcher::Deserialize(
504 const void* data, 518 const void* data,
505 size_t num_bytes, 519 size_t num_bytes,
506 const ports::PortName* ports, 520 const ports::PortName* ports,
507 size_t num_ports, 521 size_t num_ports,
508 PlatformHandle* handles, 522 PlatformHandle* handles,
509 size_t num_handles) { 523 size_t num_handles) {
(...skipping 15 matching lines...) Expand all
525 DCHECK(port_closed_ && !in_transit_); 539 DCHECK(port_closed_ && !in_transit_);
526 } 540 }
527 541
528 MojoResult MessagePipeDispatcher::CloseNoLock() { 542 MojoResult MessagePipeDispatcher::CloseNoLock() {
529 signal_lock_.AssertAcquired(); 543 signal_lock_.AssertAcquired();
530 if (port_closed_ || in_transit_) 544 if (port_closed_ || in_transit_)
531 return MOJO_RESULT_INVALID_ARGUMENT; 545 return MOJO_RESULT_INVALID_ARGUMENT;
532 546
533 port_closed_.Set(true); 547 port_closed_.Set(true);
534 awakables_.CancelAll(); 548 awakables_.CancelAll();
549 watchers_.NotifyClosed();
535 550
536 if (!port_transferred_) { 551 if (!port_transferred_) {
537 base::AutoUnlock unlock(signal_lock_); 552 base::AutoUnlock unlock(signal_lock_);
538 node_controller_->ClosePort(port_); 553 node_controller_->ClosePort(port_);
539 } 554 }
540 555
541 return MOJO_RESULT_OK; 556 return MOJO_RESULT_OK;
542 } 557 }
543 558
544 HandleSignalsState MessagePipeDispatcher::GetHandleSignalsStateNoLock() const { 559 HandleSignalsState MessagePipeDispatcher::GetHandleSignalsStateNoLock() const {
(...skipping 44 matching lines...) Expand 10 before | Expand all | Expand 10 after
589 << " endpoint " << endpoint_ << " [port=" << port_.name() 604 << " endpoint " << endpoint_ << " [port=" << port_.name()
590 << "; size=" << filter.message_size() << "]"; 605 << "; size=" << filter.message_size() << "]";
591 } 606 }
592 if (port_status.peer_closed) { 607 if (port_status.peer_closed) {
593 DVLOG(2) << "Peer closure detected on message pipe " << pipe_id_ 608 DVLOG(2) << "Peer closure detected on message pipe " << pipe_id_
594 << " endpoint " << endpoint_ << " [port=" << port_.name() << "]"; 609 << " endpoint " << endpoint_ << " [port=" << port_.name() << "]";
595 } 610 }
596 } 611 }
597 #endif 612 #endif
598 613
599 awakables_.AwakeForStateChange(GetHandleSignalsStateNoLock()); 614 HandleSignalsState state = GetHandleSignalsStateNoLock();
615 awakables_.AwakeForStateChange(state);
616 watchers_.NotifyState(state);
600 } 617 }
601 618
602 } // namespace edk 619 } // namespace edk
603 } // namespace mojo 620 } // namespace mojo
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698