Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(93)

Side by Side Diff: mojo/edk/system/message_pipe_dispatcher.cc

Issue 2750373002: Revert of Mojo: Armed Watchers (Closed)
Patch Set: Created 3 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/edk/system/message_pipe_dispatcher.h" 5 #include "mojo/edk/system/message_pipe_dispatcher.h"
6 6
7 #include <limits> 7 #include <limits>
8 #include <memory> 8 #include <memory>
9 9
10 #include "base/logging.h" 10 #include "base/logging.h"
(...skipping 146 matching lines...) Expand 10 before | Expand all | Expand 10 after
157 157
158 #endif // DCHECK_IS_ON() 158 #endif // DCHECK_IS_ON()
159 159
160 MessagePipeDispatcher::MessagePipeDispatcher(NodeController* node_controller, 160 MessagePipeDispatcher::MessagePipeDispatcher(NodeController* node_controller,
161 const ports::PortRef& port, 161 const ports::PortRef& port,
162 uint64_t pipe_id, 162 uint64_t pipe_id,
163 int endpoint) 163 int endpoint)
164 : node_controller_(node_controller), 164 : node_controller_(node_controller),
165 port_(port), 165 port_(port),
166 pipe_id_(pipe_id), 166 pipe_id_(pipe_id),
167 endpoint_(endpoint), 167 endpoint_(endpoint) {
168 watchers_(this) {
169 DVLOG(2) << "Creating new MessagePipeDispatcher for port " << port.name() 168 DVLOG(2) << "Creating new MessagePipeDispatcher for port " << port.name()
170 << " [pipe_id=" << pipe_id << "; endpoint=" << endpoint << "]"; 169 << " [pipe_id=" << pipe_id << "; endpoint=" << endpoint << "]";
171 170
172 node_controller_->SetPortObserver( 171 node_controller_->SetPortObserver(
173 port_, 172 port_,
174 make_scoped_refptr(new PortObserverThunk(this))); 173 make_scoped_refptr(new PortObserverThunk(this)));
175 } 174 }
176 175
177 bool MessagePipeDispatcher::Fuse(MessagePipeDispatcher* other) { 176 bool MessagePipeDispatcher::Fuse(MessagePipeDispatcher* other) {
178 node_controller_->SetPortObserver(port_, nullptr); 177 node_controller_->SetPortObserver(port_, nullptr);
179 node_controller_->SetPortObserver(other->port_, nullptr); 178 node_controller_->SetPortObserver(other->port_, nullptr);
180 179
181 ports::PortRef port0; 180 ports::PortRef port0;
182 { 181 {
183 base::AutoLock lock(signal_lock_); 182 base::AutoLock lock(signal_lock_);
184 port0 = port_; 183 port0 = port_;
185 port_closed_.Set(true); 184 port_closed_.Set(true);
186 awakables_.CancelAll(); 185 awakables_.CancelAll();
187 watchers_.NotifyClosed();
188 } 186 }
189 187
190 ports::PortRef port1; 188 ports::PortRef port1;
191 { 189 {
192 base::AutoLock lock(other->signal_lock_); 190 base::AutoLock lock(other->signal_lock_);
193 port1 = other->port_; 191 port1 = other->port_;
194 other->port_closed_.Set(true); 192 other->port_closed_.Set(true);
195 other->awakables_.CancelAll(); 193 other->awakables_.CancelAll();
196 other->watchers_.NotifyClosed();
197 } 194 }
198 195
199 // Both ports are always closed by this call. 196 // Both ports are always closed by this call.
200 int rv = node_controller_->MergeLocalPorts(port0, port1); 197 int rv = node_controller_->MergeLocalPorts(port0, port1);
201 return rv == ports::OK; 198 return rv == ports::OK;
202 } 199 }
203 200
204 Dispatcher::Type MessagePipeDispatcher::GetType() const { 201 Dispatcher::Type MessagePipeDispatcher::GetType() const {
205 return Type::MESSAGE_PIPE; 202 return Type::MESSAGE_PIPE;
206 } 203 }
207 204
208 MojoResult MessagePipeDispatcher::Close() { 205 MojoResult MessagePipeDispatcher::Close() {
209 base::AutoLock lock(signal_lock_); 206 base::AutoLock lock(signal_lock_);
210 DVLOG(2) << "Closing message pipe " << pipe_id_ << " endpoint " << endpoint_ 207 DVLOG(2) << "Closing message pipe " << pipe_id_ << " endpoint " << endpoint_
211 << " [port=" << port_.name() << "]"; 208 << " [port=" << port_.name() << "]";
212 return CloseNoLock(); 209 return CloseNoLock();
213 } 210 }
214 211
212 MojoResult MessagePipeDispatcher::Watch(MojoHandleSignals signals,
213 const Watcher::WatchCallback& callback,
214 uintptr_t context) {
215 base::AutoLock lock(signal_lock_);
216
217 if (port_closed_ || in_transit_)
218 return MOJO_RESULT_INVALID_ARGUMENT;
219
220 return awakables_.AddWatcher(
221 signals, callback, context, GetHandleSignalsStateNoLock());
222 }
223
224 MojoResult MessagePipeDispatcher::CancelWatch(uintptr_t context) {
225 base::AutoLock lock(signal_lock_);
226
227 if (port_closed_ || in_transit_)
228 return MOJO_RESULT_INVALID_ARGUMENT;
229
230 return awakables_.RemoveWatcher(context);
231 }
232
215 MojoResult MessagePipeDispatcher::WriteMessage( 233 MojoResult MessagePipeDispatcher::WriteMessage(
216 std::unique_ptr<MessageForTransit> message, 234 std::unique_ptr<MessageForTransit> message,
217 MojoWriteMessageFlags flags) { 235 MojoWriteMessageFlags flags) {
218 if (port_closed_ || in_transit_) 236 if (port_closed_ || in_transit_)
219 return MOJO_RESULT_INVALID_ARGUMENT; 237 return MOJO_RESULT_INVALID_ARGUMENT;
220 238
221 size_t num_bytes = message->num_bytes(); 239 size_t num_bytes = message->num_bytes();
222 int rv = node_controller_->SendMessage(port_, message->TakePortsMessage()); 240 int rv = node_controller_->SendMessage(port_, message->TakePortsMessage());
223 241
224 DVLOG(4) << "Sent message on pipe " << pipe_id_ << " endpoint " << endpoint_ 242 DVLOG(4) << "Sent message on pipe " << pipe_id_ << " endpoint " << endpoint_
(...skipping 49 matching lines...) Expand 10 before | Expand all | Expand 10 after
274 if (rv != ports::OK && rv != ports::ERROR_PORT_PEER_CLOSED) { 292 if (rv != ports::OK && rv != ports::ERROR_PORT_PEER_CLOSED) {
275 if (rv == ports::ERROR_PORT_UNKNOWN || 293 if (rv == ports::ERROR_PORT_UNKNOWN ||
276 rv == ports::ERROR_PORT_STATE_UNEXPECTED) 294 rv == ports::ERROR_PORT_STATE_UNEXPECTED)
277 return MOJO_RESULT_INVALID_ARGUMENT; 295 return MOJO_RESULT_INVALID_ARGUMENT;
278 296
279 NOTREACHED(); 297 NOTREACHED();
280 return MOJO_RESULT_UNKNOWN; // TODO: Add a better error code here? 298 return MOJO_RESULT_UNKNOWN; // TODO: Add a better error code here?
281 } 299 }
282 300
283 if (no_space) { 301 if (no_space) {
284 if (may_discard) {
285 // May have been the last message on the pipe. Need to update signals just
286 // in case.
287 base::AutoLock lock(signal_lock_);
288 watchers_.NotifyState(GetHandleSignalsStateNoLock());
289 }
290 // |*num_handles| (and/or |*num_bytes| if |read_any_size| is false) wasn't 302 // |*num_handles| (and/or |*num_bytes| if |read_any_size| is false) wasn't
291 // sufficient to hold this message's data. The message will still be in 303 // sufficient to hold this message's data. The message will still be in
292 // queue unless MOJO_READ_MESSAGE_FLAG_MAY_DISCARD was set. 304 // queue unless MOJO_READ_MESSAGE_FLAG_MAY_DISCARD was set.
293 return MOJO_RESULT_RESOURCE_EXHAUSTED; 305 return MOJO_RESULT_RESOURCE_EXHAUSTED;
294 } 306 }
295 307
296 if (!ports_message) { 308 if (!ports_message) {
297 // No message was available in queue. 309 // No message was available in queue.
298 310
299 if (rv == ports::OK) 311 if (rv == ports::OK)
300 return MOJO_RESULT_SHOULD_WAIT; 312 return MOJO_RESULT_SHOULD_WAIT;
301 313
302 // Peer is closed and there are no more messages to read. 314 // Peer is closed and there are no more messages to read.
303 DCHECK_EQ(rv, ports::ERROR_PORT_PEER_CLOSED); 315 DCHECK_EQ(rv, ports::ERROR_PORT_PEER_CLOSED);
304 return MOJO_RESULT_FAILED_PRECONDITION; 316 return MOJO_RESULT_FAILED_PRECONDITION;
305 } 317 }
306 318
307 // Alright! We have a message and the caller has provided sufficient storage 319 // Alright! We have a message and the caller has provided sufficient storage
308 // in which to receive it. 320 // in which to receive it.
309 321
310 {
311 // We need to update anyone watching our signals in case that was the last
312 // available message.
313 base::AutoLock lock(signal_lock_);
314 watchers_.NotifyState(GetHandleSignalsStateNoLock());
315 }
316
317 std::unique_ptr<PortsMessage> msg( 322 std::unique_ptr<PortsMessage> msg(
318 static_cast<PortsMessage*>(ports_message.release())); 323 static_cast<PortsMessage*>(ports_message.release()));
319 324
320 const MessageHeader* header = 325 const MessageHeader* header =
321 static_cast<const MessageHeader*>(msg->payload_bytes()); 326 static_cast<const MessageHeader*>(msg->payload_bytes());
322 const DispatcherHeader* dispatcher_headers = 327 const DispatcherHeader* dispatcher_headers =
323 reinterpret_cast<const DispatcherHeader*>(header + 1); 328 reinterpret_cast<const DispatcherHeader*>(header + 1);
324 329
325 if (header->num_dispatchers > std::numeric_limits<uint16_t>::max()) 330 if (header->num_dispatchers > std::numeric_limits<uint16_t>::max())
326 return MOJO_RESULT_UNKNOWN; 331 return MOJO_RESULT_UNKNOWN;
(...skipping 57 matching lines...) Expand 10 before | Expand all | Expand 10 after
384 *message = MessageForTransit::WrapPortsMessage(std::move(msg)); 389 *message = MessageForTransit::WrapPortsMessage(std::move(msg));
385 return MOJO_RESULT_OK; 390 return MOJO_RESULT_OK;
386 } 391 }
387 392
388 HandleSignalsState 393 HandleSignalsState
389 MessagePipeDispatcher::GetHandleSignalsState() const { 394 MessagePipeDispatcher::GetHandleSignalsState() const {
390 base::AutoLock lock(signal_lock_); 395 base::AutoLock lock(signal_lock_);
391 return GetHandleSignalsStateNoLock(); 396 return GetHandleSignalsStateNoLock();
392 } 397 }
393 398
394 MojoResult MessagePipeDispatcher::AddWatcherRef(
395 const scoped_refptr<WatcherDispatcher>& watcher,
396 uintptr_t context) {
397 base::AutoLock lock(signal_lock_);
398 if (port_closed_ || in_transit_)
399 return MOJO_RESULT_INVALID_ARGUMENT;
400 return watchers_.Add(watcher, context, GetHandleSignalsStateNoLock());
401 }
402
403 MojoResult MessagePipeDispatcher::RemoveWatcherRef(WatcherDispatcher* watcher,
404 uintptr_t context) {
405 base::AutoLock lock(signal_lock_);
406 if (port_closed_ || in_transit_)
407 return MOJO_RESULT_INVALID_ARGUMENT;
408 return watchers_.Remove(watcher, context);
409 }
410
411 MojoResult MessagePipeDispatcher::AddAwakable( 399 MojoResult MessagePipeDispatcher::AddAwakable(
412 Awakable* awakable, 400 Awakable* awakable,
413 MojoHandleSignals signals, 401 MojoHandleSignals signals,
414 uintptr_t context, 402 uintptr_t context,
415 HandleSignalsState* signals_state) { 403 HandleSignalsState* signals_state) {
416 base::AutoLock lock(signal_lock_); 404 base::AutoLock lock(signal_lock_);
417 405
418 if (port_closed_ || in_transit_) { 406 if (port_closed_ || in_transit_) {
419 if (signals_state) 407 if (signals_state)
420 *signals_state = HandleSignalsState(); 408 *signals_state = HandleSignalsState();
(...skipping 80 matching lines...) Expand 10 before | Expand all | Expand 10 after
501 port_transferred_ = true; 489 port_transferred_ = true;
502 in_transit_.Set(false); 490 in_transit_.Set(false);
503 CloseNoLock(); 491 CloseNoLock();
504 } 492 }
505 493
506 void MessagePipeDispatcher::CancelTransit() { 494 void MessagePipeDispatcher::CancelTransit() {
507 base::AutoLock lock(signal_lock_); 495 base::AutoLock lock(signal_lock_);
508 in_transit_.Set(false); 496 in_transit_.Set(false);
509 497
510 // Something may have happened while we were waiting for potential transit. 498 // Something may have happened while we were waiting for potential transit.
511 HandleSignalsState state = GetHandleSignalsStateNoLock(); 499 awakables_.AwakeForStateChange(GetHandleSignalsStateNoLock());
512 awakables_.AwakeForStateChange(state);
513 watchers_.NotifyState(state);
514 } 500 }
515 501
516 // static 502 // static
517 scoped_refptr<Dispatcher> MessagePipeDispatcher::Deserialize( 503 scoped_refptr<Dispatcher> MessagePipeDispatcher::Deserialize(
518 const void* data, 504 const void* data,
519 size_t num_bytes, 505 size_t num_bytes,
520 const ports::PortName* ports, 506 const ports::PortName* ports,
521 size_t num_ports, 507 size_t num_ports,
522 PlatformHandle* handles, 508 PlatformHandle* handles,
523 size_t num_handles) { 509 size_t num_handles) {
(...skipping 15 matching lines...) Expand all
539 DCHECK(port_closed_ && !in_transit_); 525 DCHECK(port_closed_ && !in_transit_);
540 } 526 }
541 527
542 MojoResult MessagePipeDispatcher::CloseNoLock() { 528 MojoResult MessagePipeDispatcher::CloseNoLock() {
543 signal_lock_.AssertAcquired(); 529 signal_lock_.AssertAcquired();
544 if (port_closed_ || in_transit_) 530 if (port_closed_ || in_transit_)
545 return MOJO_RESULT_INVALID_ARGUMENT; 531 return MOJO_RESULT_INVALID_ARGUMENT;
546 532
547 port_closed_.Set(true); 533 port_closed_.Set(true);
548 awakables_.CancelAll(); 534 awakables_.CancelAll();
549 watchers_.NotifyClosed();
550 535
551 if (!port_transferred_) { 536 if (!port_transferred_) {
552 base::AutoUnlock unlock(signal_lock_); 537 base::AutoUnlock unlock(signal_lock_);
553 node_controller_->ClosePort(port_); 538 node_controller_->ClosePort(port_);
554 } 539 }
555 540
556 return MOJO_RESULT_OK; 541 return MOJO_RESULT_OK;
557 } 542 }
558 543
559 HandleSignalsState MessagePipeDispatcher::GetHandleSignalsStateNoLock() const { 544 HandleSignalsState MessagePipeDispatcher::GetHandleSignalsStateNoLock() const {
(...skipping 44 matching lines...) Expand 10 before | Expand all | Expand 10 after
604 << " endpoint " << endpoint_ << " [port=" << port_.name() 589 << " endpoint " << endpoint_ << " [port=" << port_.name()
605 << "; size=" << filter.message_size() << "]"; 590 << "; size=" << filter.message_size() << "]";
606 } 591 }
607 if (port_status.peer_closed) { 592 if (port_status.peer_closed) {
608 DVLOG(2) << "Peer closure detected on message pipe " << pipe_id_ 593 DVLOG(2) << "Peer closure detected on message pipe " << pipe_id_
609 << " endpoint " << endpoint_ << " [port=" << port_.name() << "]"; 594 << " endpoint " << endpoint_ << " [port=" << port_.name() << "]";
610 } 595 }
611 } 596 }
612 #endif 597 #endif
613 598
614 HandleSignalsState state = GetHandleSignalsStateNoLock(); 599 awakables_.AwakeForStateChange(GetHandleSignalsStateNoLock());
615 awakables_.AwakeForStateChange(state);
616 watchers_.NotifyState(state);
617 } 600 }
618 601
619 } // namespace edk 602 } // namespace edk
620 } // namespace mojo 603 } // namespace mojo
OLDNEW
« no previous file with comments | « mojo/edk/system/message_pipe_dispatcher.h ('k') | mojo/edk/system/multiprocess_message_pipe_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698