Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(121)

Side by Side Diff: mojo/edk/system/data_pipe_consumer_dispatcher.cc

Issue 2741353002: Mojo: Add signal for new data pipe consumer data (Closed)
Patch Set: rebase Created 3 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « mojo/edk/system/data_pipe_consumer_dispatcher.h ('k') | mojo/edk/system/data_pipe_unittest.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2013 The Chromium Authors. All rights reserved. 1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/edk/system/data_pipe_consumer_dispatcher.h" 5 #include "mojo/edk/system/data_pipe_consumer_dispatcher.h"
6 6
7 #include <stddef.h> 7 #include <stddef.h>
8 #include <stdint.h> 8 #include <stdint.h>
9 9
10 #include <algorithm> 10 #include <algorithm>
(...skipping 106 matching lines...) Expand 10 before | Expand all | Expand 10 after
117 if (is_closed_ || in_transit_) 117 if (is_closed_ || in_transit_)
118 return MOJO_RESULT_INVALID_ARGUMENT; 118 return MOJO_RESULT_INVALID_ARGUMENT;
119 119
120 return awakable_list_.RemoveWatcher(context); 120 return awakable_list_.RemoveWatcher(context);
121 } 121 }
122 122
123 MojoResult DataPipeConsumerDispatcher::ReadData(void* elements, 123 MojoResult DataPipeConsumerDispatcher::ReadData(void* elements,
124 uint32_t* num_bytes, 124 uint32_t* num_bytes,
125 MojoReadDataFlags flags) { 125 MojoReadDataFlags flags) {
126 base::AutoLock lock(lock_); 126 base::AutoLock lock(lock_);
127 new_data_available_ = false;
128
127 if (!shared_ring_buffer_ || in_transit_) 129 if (!shared_ring_buffer_ || in_transit_)
128 return MOJO_RESULT_INVALID_ARGUMENT; 130 return MOJO_RESULT_INVALID_ARGUMENT;
129 131
130 if (in_two_phase_read_) 132 if (in_two_phase_read_)
131 return MOJO_RESULT_BUSY; 133 return MOJO_RESULT_BUSY;
132 134
133 if ((flags & MOJO_READ_DATA_FLAG_QUERY)) { 135 if ((flags & MOJO_READ_DATA_FLAG_QUERY)) {
134 if ((flags & MOJO_READ_DATA_FLAG_PEEK) || 136 if ((flags & MOJO_READ_DATA_FLAG_PEEK) ||
135 (flags & MOJO_READ_DATA_FLAG_DISCARD)) 137 (flags & MOJO_READ_DATA_FLAG_DISCARD))
136 return MOJO_RESULT_INVALID_ARGUMENT; 138 return MOJO_RESULT_INVALID_ARGUMENT;
(...skipping 60 matching lines...) Expand 10 before | Expand all | Expand 10 after
197 NotifyRead(bytes_to_read); 199 NotifyRead(bytes_to_read);
198 } 200 }
199 201
200 return MOJO_RESULT_OK; 202 return MOJO_RESULT_OK;
201 } 203 }
202 204
203 MojoResult DataPipeConsumerDispatcher::BeginReadData(const void** buffer, 205 MojoResult DataPipeConsumerDispatcher::BeginReadData(const void** buffer,
204 uint32_t* buffer_num_bytes, 206 uint32_t* buffer_num_bytes,
205 MojoReadDataFlags flags) { 207 MojoReadDataFlags flags) {
206 base::AutoLock lock(lock_); 208 base::AutoLock lock(lock_);
209 new_data_available_ = false;
207 if (!shared_ring_buffer_ || in_transit_) 210 if (!shared_ring_buffer_ || in_transit_)
208 return MOJO_RESULT_INVALID_ARGUMENT; 211 return MOJO_RESULT_INVALID_ARGUMENT;
209 212
210 if (in_two_phase_read_) 213 if (in_two_phase_read_)
211 return MOJO_RESULT_BUSY; 214 return MOJO_RESULT_BUSY;
212 215
213 // These flags may not be used in two-phase mode. 216 // These flags may not be used in two-phase mode.
214 if ((flags & MOJO_READ_DATA_FLAG_DISCARD) || 217 if ((flags & MOJO_READ_DATA_FLAG_DISCARD) ||
215 (flags & MOJO_READ_DATA_FLAG_QUERY) || 218 (flags & MOJO_READ_DATA_FLAG_QUERY) ||
216 (flags & MOJO_READ_DATA_FLAG_PEEK)) 219 (flags & MOJO_READ_DATA_FLAG_PEEK))
(...skipping 195 matching lines...) Expand 10 before | Expand all | Expand 10 after
412 415
413 scoped_refptr<DataPipeConsumerDispatcher> dispatcher = 416 scoped_refptr<DataPipeConsumerDispatcher> dispatcher =
414 new DataPipeConsumerDispatcher(node_controller, port, ring_buffer, 417 new DataPipeConsumerDispatcher(node_controller, port, ring_buffer,
415 state->options, false /* initialized */, 418 state->options, false /* initialized */,
416 state->pipe_id); 419 state->pipe_id);
417 420
418 { 421 {
419 base::AutoLock lock(dispatcher->lock_); 422 base::AutoLock lock(dispatcher->lock_);
420 dispatcher->read_offset_ = state->read_offset; 423 dispatcher->read_offset_ = state->read_offset;
421 dispatcher->bytes_available_ = state->bytes_available; 424 dispatcher->bytes_available_ = state->bytes_available;
425 dispatcher->new_data_available_ = state->bytes_available > 0;
422 dispatcher->peer_closed_ = state->flags & kFlagPeerClosed; 426 dispatcher->peer_closed_ = state->flags & kFlagPeerClosed;
423 dispatcher->InitializeNoLock(); 427 dispatcher->InitializeNoLock();
424 dispatcher->UpdateSignalsStateNoLock(); 428 dispatcher->UpdateSignalsStateNoLock();
425 } 429 }
426 430
427 return dispatcher; 431 return dispatcher;
428 } 432 }
429 433
430 DataPipeConsumerDispatcher::~DataPipeConsumerDispatcher() { 434 DataPipeConsumerDispatcher::~DataPipeConsumerDispatcher() {
431 DCHECK(is_closed_ && !shared_ring_buffer_ && !ring_buffer_mapping_ && 435 DCHECK(is_closed_ && !shared_ring_buffer_ && !ring_buffer_mapping_ &&
(...skipping 35 matching lines...) Expand 10 before | Expand all | Expand 10 after
467 471
468 return MOJO_RESULT_OK; 472 return MOJO_RESULT_OK;
469 } 473 }
470 474
471 HandleSignalsState 475 HandleSignalsState
472 DataPipeConsumerDispatcher::GetHandleSignalsStateNoLock() const { 476 DataPipeConsumerDispatcher::GetHandleSignalsStateNoLock() const {
473 lock_.AssertAcquired(); 477 lock_.AssertAcquired();
474 478
475 HandleSignalsState rv; 479 HandleSignalsState rv;
476 if (shared_ring_buffer_ && bytes_available_) { 480 if (shared_ring_buffer_ && bytes_available_) {
477 if (!in_two_phase_read_) 481 if (!in_two_phase_read_) {
478 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_READABLE; 482 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_READABLE;
483 if (new_data_available_)
484 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE;
485 }
479 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE; 486 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE;
480 } else if (!peer_closed_ && shared_ring_buffer_) { 487 } else if (!peer_closed_ && shared_ring_buffer_) {
481 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE; 488 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE;
482 } 489 }
483 490
491 if (shared_ring_buffer_) {
492 if (new_data_available_ || !peer_closed_)
493 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE;
494 }
495
484 if (peer_closed_) 496 if (peer_closed_)
485 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; 497 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
486 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; 498 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
499
487 return rv; 500 return rv;
488 } 501 }
489 502
490 void DataPipeConsumerDispatcher::NotifyRead(uint32_t num_bytes) { 503 void DataPipeConsumerDispatcher::NotifyRead(uint32_t num_bytes) {
491 DVLOG(1) << "Data pipe consumer " << pipe_id_ << " notifying peer: " 504 DVLOG(1) << "Data pipe consumer " << pipe_id_ << " notifying peer: "
492 << num_bytes << " bytes read. [control_port=" 505 << num_bytes << " bytes read. [control_port="
493 << control_port_.name() << "]"; 506 << control_port_.name() << "]";
494 507
495 SendDataPipeControlMessage(node_controller_, control_port_, 508 SendDataPipeControlMessage(node_controller_, control_port_,
496 DataPipeCommand::DATA_WAS_READ, num_bytes); 509 DataPipeCommand::DATA_WAS_READ, num_bytes);
(...skipping 59 matching lines...) Expand 10 before | Expand all | Expand 10 after
556 569
557 DVLOG(1) << "Data pipe consumer " << pipe_id_ << " is aware that " 570 DVLOG(1) << "Data pipe consumer " << pipe_id_ << " is aware that "
558 << m->num_bytes << " bytes were written. [control_port=" 571 << m->num_bytes << " bytes were written. [control_port="
559 << control_port_.name() << "]"; 572 << control_port_.name() << "]";
560 573
561 bytes_available_ += m->num_bytes; 574 bytes_available_ += m->num_bytes;
562 } 575 }
563 } while (message); 576 } while (message);
564 } 577 }
565 578
566 if (peer_closed_ != was_peer_closed || 579 bool has_new_data = bytes_available_ != previous_bytes_available;
567 bytes_available_ != previous_bytes_available) { 580 if (has_new_data)
581 new_data_available_ = true;
582
583 if (peer_closed_ != was_peer_closed || has_new_data) {
568 awakable_list_.AwakeForStateChange(GetHandleSignalsStateNoLock()); 584 awakable_list_.AwakeForStateChange(GetHandleSignalsStateNoLock());
569 } 585 }
570 } 586 }
571 587
572 } // namespace edk 588 } // namespace edk
573 } // namespace mojo 589 } // namespace mojo
OLDNEW
« no previous file with comments | « mojo/edk/system/data_pipe_consumer_dispatcher.h ('k') | mojo/edk/system/data_pipe_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698