Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(35)

Side by Side Diff: mojo/edk/system/data_pipe_producer_dispatcher.cc

Issue 2725133002: Mojo: Armed Watchers (Closed)
Patch Set: . Created 3 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2013 The Chromium Authors. All rights reserved. 1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/edk/system/data_pipe_producer_dispatcher.h" 5 #include "mojo/edk/system/data_pipe_producer_dispatcher.h"
6 6
7 #include <stddef.h> 7 #include <stddef.h>
8 #include <stdint.h> 8 #include <stdint.h>
9 9
10 #include <utility> 10 #include <utility>
(...skipping 61 matching lines...) Expand 10 before | Expand all | Expand 10 after
72 NodeController* node_controller, 72 NodeController* node_controller,
73 const ports::PortRef& control_port, 73 const ports::PortRef& control_port,
74 scoped_refptr<PlatformSharedBuffer> shared_ring_buffer, 74 scoped_refptr<PlatformSharedBuffer> shared_ring_buffer,
75 const MojoCreateDataPipeOptions& options, 75 const MojoCreateDataPipeOptions& options,
76 bool initialized, 76 bool initialized,
77 uint64_t pipe_id) 77 uint64_t pipe_id)
78 : options_(options), 78 : options_(options),
79 node_controller_(node_controller), 79 node_controller_(node_controller),
80 control_port_(control_port), 80 control_port_(control_port),
81 pipe_id_(pipe_id), 81 pipe_id_(pipe_id),
82 watchers_(this),
82 shared_ring_buffer_(shared_ring_buffer), 83 shared_ring_buffer_(shared_ring_buffer),
83 available_capacity_(options_.capacity_num_bytes) { 84 available_capacity_(options_.capacity_num_bytes) {
84 if (initialized) { 85 if (initialized) {
85 base::AutoLock lock(lock_); 86 base::AutoLock lock(lock_);
86 InitializeNoLock(); 87 InitializeNoLock();
87 } 88 }
88 } 89 }
89 90
90 Dispatcher::Type DataPipeProducerDispatcher::GetType() const { 91 Dispatcher::Type DataPipeProducerDispatcher::GetType() const {
91 return Type::DATA_PIPE_PRODUCER; 92 return Type::DATA_PIPE_PRODUCER;
92 } 93 }
93 94
94 MojoResult DataPipeProducerDispatcher::Close() { 95 MojoResult DataPipeProducerDispatcher::Close() {
95 base::AutoLock lock(lock_); 96 base::AutoLock lock(lock_);
96 DVLOG(1) << "Closing data pipe producer " << pipe_id_; 97 DVLOG(1) << "Closing data pipe producer " << pipe_id_;
97 return CloseNoLock(); 98 return CloseNoLock();
98 } 99 }
99 100
100 MojoResult DataPipeProducerDispatcher::Watch(
101 MojoHandleSignals signals,
102 const Watcher::WatchCallback& callback,
103 uintptr_t context) {
104 base::AutoLock lock(lock_);
105
106 if (is_closed_ || in_transit_)
107 return MOJO_RESULT_INVALID_ARGUMENT;
108
109 return awakable_list_.AddWatcher(
110 signals, callback, context, GetHandleSignalsStateNoLock());
111 }
112
113 MojoResult DataPipeProducerDispatcher::CancelWatch(uintptr_t context) {
114 base::AutoLock lock(lock_);
115
116 if (is_closed_ || in_transit_)
117 return MOJO_RESULT_INVALID_ARGUMENT;
118
119 return awakable_list_.RemoveWatcher(context);
120 }
121
122 MojoResult DataPipeProducerDispatcher::WriteData(const void* elements, 101 MojoResult DataPipeProducerDispatcher::WriteData(const void* elements,
123 uint32_t* num_bytes, 102 uint32_t* num_bytes,
124 MojoWriteDataFlags flags) { 103 MojoWriteDataFlags flags) {
125 base::AutoLock lock(lock_); 104 base::AutoLock lock(lock_);
126 if (!shared_ring_buffer_ || in_transit_) 105 if (!shared_ring_buffer_ || in_transit_)
127 return MOJO_RESULT_INVALID_ARGUMENT; 106 return MOJO_RESULT_INVALID_ARGUMENT;
128 107
129 if (in_two_phase_write_) 108 if (in_two_phase_write_)
130 return MOJO_RESULT_BUSY; 109 return MOJO_RESULT_BUSY;
131 110
132 if (peer_closed_) 111 if (peer_closed_)
133 return MOJO_RESULT_FAILED_PRECONDITION; 112 return MOJO_RESULT_FAILED_PRECONDITION;
134 113
135 if (*num_bytes % options_.element_num_bytes != 0) 114 if (*num_bytes % options_.element_num_bytes != 0)
136 return MOJO_RESULT_INVALID_ARGUMENT; 115 return MOJO_RESULT_INVALID_ARGUMENT;
137 if (*num_bytes == 0) 116 if (*num_bytes == 0)
138 return MOJO_RESULT_OK; // Nothing to do. 117 return MOJO_RESULT_OK; // Nothing to do.
139 118
140 if ((flags & MOJO_WRITE_DATA_FLAG_ALL_OR_NONE) && 119 if ((flags & MOJO_WRITE_DATA_FLAG_ALL_OR_NONE) &&
141 (*num_bytes > available_capacity_)) { 120 (*num_bytes > available_capacity_)) {
121 if (flags & MOJO_WRITE_DATA_FLAG_CLEAR_SIGNAL) {
122 suppress_writable_signal_ = true;
123 watchers_.NotifyState(GetHandleSignalsStateNoLock());
124 }
142 // Don't return "should wait" since you can't wait for a specified amount of 125 // Don't return "should wait" since you can't wait for a specified amount of
143 // data. 126 // data.
127 //
128 // TODO(rockot): Reconsider this case now that we have support for minimum
129 // signaling capacity.
144 return MOJO_RESULT_OUT_OF_RANGE; 130 return MOJO_RESULT_OUT_OF_RANGE;
145 } 131 }
146 132
147 DCHECK_LE(available_capacity_, options_.capacity_num_bytes); 133 DCHECK_LE(available_capacity_, options_.capacity_num_bytes);
148 uint32_t num_bytes_to_write = std::min(*num_bytes, available_capacity_); 134 uint32_t num_bytes_to_write = std::min(*num_bytes, available_capacity_);
149 if (num_bytes_to_write == 0) 135 if (num_bytes_to_write == 0)
150 return MOJO_RESULT_SHOULD_WAIT; 136 return MOJO_RESULT_SHOULD_WAIT;
151 137
152 HandleSignalsState old_state = GetHandleSignalsStateNoLock(); 138 HandleSignalsState old_state = GetHandleSignalsStateNoLock();
153 139
(...skipping 18 matching lines...) Expand all
172 memcpy(data, source + tail_bytes_to_write, head_bytes_to_write); 158 memcpy(data, source + tail_bytes_to_write, head_bytes_to_write);
173 159
174 DCHECK_LE(num_bytes_to_write, available_capacity_); 160 DCHECK_LE(num_bytes_to_write, available_capacity_);
175 available_capacity_ -= num_bytes_to_write; 161 available_capacity_ -= num_bytes_to_write;
176 write_offset_ = (write_offset_ + num_bytes_to_write) % 162 write_offset_ = (write_offset_ + num_bytes_to_write) %
177 options_.capacity_num_bytes; 163 options_.capacity_num_bytes;
178 164
179 HandleSignalsState new_state = GetHandleSignalsStateNoLock(); 165 HandleSignalsState new_state = GetHandleSignalsStateNoLock();
180 if (!new_state.equals(old_state)) 166 if (!new_state.equals(old_state))
181 awakable_list_.AwakeForStateChange(new_state); 167 awakable_list_.AwakeForStateChange(new_state);
168 watchers_.NotifyState(new_state);
182 169
183 base::AutoUnlock unlock(lock_); 170 base::AutoUnlock unlock(lock_);
184 NotifyWrite(num_bytes_to_write); 171 NotifyWrite(num_bytes_to_write);
185 172
186 return MOJO_RESULT_OK; 173 return MOJO_RESULT_OK;
187 } 174 }
188 175
189 MojoResult DataPipeProducerDispatcher::BeginWriteData( 176 MojoResult DataPipeProducerDispatcher::BeginWriteData(
190 void** buffer, 177 void** buffer,
191 uint32_t* buffer_num_bytes, 178 uint32_t* buffer_num_bytes,
192 MojoWriteDataFlags flags) { 179 MojoWriteDataFlags flags) {
193 base::AutoLock lock(lock_); 180 base::AutoLock lock(lock_);
194 if (!shared_ring_buffer_ || in_transit_) 181 if (!shared_ring_buffer_ || in_transit_)
195 return MOJO_RESULT_INVALID_ARGUMENT; 182 return MOJO_RESULT_INVALID_ARGUMENT;
183
184 // These flags may not be used in two-phase mode.
185 if ((flags & MOJO_WRITE_DATA_FLAG_ALL_OR_NONE) ||
186 (flags & MOJO_WRITE_DATA_FLAG_CLEAR_SIGNAL))
187 return MOJO_RESULT_INVALID_ARGUMENT;
188
196 if (in_two_phase_write_) 189 if (in_two_phase_write_)
197 return MOJO_RESULT_BUSY; 190 return MOJO_RESULT_BUSY;
198 if (peer_closed_) 191 if (peer_closed_)
199 return MOJO_RESULT_FAILED_PRECONDITION; 192 return MOJO_RESULT_FAILED_PRECONDITION;
200 193
201 if (available_capacity_ == 0) { 194 if (available_capacity_ == 0) {
202 return peer_closed_ ? MOJO_RESULT_FAILED_PRECONDITION 195 return peer_closed_ ? MOJO_RESULT_FAILED_PRECONDITION
203 : MOJO_RESULT_SHOULD_WAIT; 196 : MOJO_RESULT_SHOULD_WAIT;
204 } 197 }
205 198
(...skipping 38 matching lines...) Expand 10 before | Expand all | Expand 10 after
244 NotifyWrite(num_bytes_written); 237 NotifyWrite(num_bytes_written);
245 } 238 }
246 239
247 in_two_phase_write_ = false; 240 in_two_phase_write_ = false;
248 241
249 // If we're now writable, we *became* writable (since we weren't writable 242 // If we're now writable, we *became* writable (since we weren't writable
250 // during the two-phase write), so awake producer awakables. 243 // during the two-phase write), so awake producer awakables.
251 HandleSignalsState new_state = GetHandleSignalsStateNoLock(); 244 HandleSignalsState new_state = GetHandleSignalsStateNoLock();
252 if (new_state.satisfies(MOJO_HANDLE_SIGNAL_WRITABLE)) 245 if (new_state.satisfies(MOJO_HANDLE_SIGNAL_WRITABLE))
253 awakable_list_.AwakeForStateChange(new_state); 246 awakable_list_.AwakeForStateChange(new_state);
247 watchers_.NotifyState(new_state);
254 248
255 return rv; 249 return rv;
256 } 250 }
257 251
258 HandleSignalsState DataPipeProducerDispatcher::GetHandleSignalsState() const { 252 HandleSignalsState DataPipeProducerDispatcher::GetHandleSignalsState() const {
259 base::AutoLock lock(lock_); 253 base::AutoLock lock(lock_);
260 return GetHandleSignalsStateNoLock(); 254 return GetHandleSignalsStateNoLock();
261 } 255 }
262 256
257 MojoResult DataPipeProducerDispatcher::AddWatcherRef(
258 const scoped_refptr<WatcherDispatcher>& watcher,
259 uintptr_t context) {
260 base::AutoLock lock(lock_);
261 if (is_closed_ || in_transit_)
262 return MOJO_RESULT_INVALID_ARGUMENT;
263 return watchers_.Add(watcher, context, GetHandleSignalsStateNoLock());
264 }
265
266 MojoResult DataPipeProducerDispatcher::RemoveWatcherRef(
267 WatcherDispatcher* watcher,
268 uintptr_t context) {
269 base::AutoLock lock(lock_);
270 if (is_closed_ || in_transit_)
271 return MOJO_RESULT_INVALID_ARGUMENT;
272 return watchers_.Remove(watcher, context);
273 }
274
263 MojoResult DataPipeProducerDispatcher::AddAwakable( 275 MojoResult DataPipeProducerDispatcher::AddAwakable(
264 Awakable* awakable, 276 Awakable* awakable,
265 MojoHandleSignals signals, 277 MojoHandleSignals signals,
266 uintptr_t context, 278 uintptr_t context,
267 HandleSignalsState* signals_state) { 279 HandleSignalsState* signals_state) {
268 base::AutoLock lock(lock_); 280 base::AutoLock lock(lock_);
269 if (!shared_ring_buffer_ || in_transit_) { 281 if (!shared_ring_buffer_ || in_transit_) {
270 if (signals_state) 282 if (signals_state)
271 *signals_state = HandleSignalsState(); 283 *signals_state = HandleSignalsState();
272 return MOJO_RESULT_INVALID_ARGUMENT; 284 return MOJO_RESULT_INVALID_ARGUMENT;
(...skipping 76 matching lines...) Expand 10 before | Expand all | Expand 10 after
349 in_transit_ = false; 361 in_transit_ = false;
350 ignore_result(buffer_handle_for_transit_.release()); 362 ignore_result(buffer_handle_for_transit_.release());
351 CloseNoLock(); 363 CloseNoLock();
352 } 364 }
353 365
354 void DataPipeProducerDispatcher::CancelTransit() { 366 void DataPipeProducerDispatcher::CancelTransit() {
355 base::AutoLock lock(lock_); 367 base::AutoLock lock(lock_);
356 DCHECK(in_transit_); 368 DCHECK(in_transit_);
357 in_transit_ = false; 369 in_transit_ = false;
358 buffer_handle_for_transit_.reset(); 370 buffer_handle_for_transit_.reset();
359 awakable_list_.AwakeForStateChange(GetHandleSignalsStateNoLock()); 371
372 HandleSignalsState state = GetHandleSignalsStateNoLock();
373 awakable_list_.AwakeForStateChange(state);
374 watchers_.NotifyState(state);
360 } 375 }
361 376
362 // static 377 // static
363 scoped_refptr<DataPipeProducerDispatcher> 378 scoped_refptr<DataPipeProducerDispatcher>
364 DataPipeProducerDispatcher::Deserialize(const void* data, 379 DataPipeProducerDispatcher::Deserialize(const void* data,
365 size_t num_bytes, 380 size_t num_bytes,
366 const ports::PortName* ports, 381 const ports::PortName* ports,
367 size_t num_ports, 382 size_t num_ports,
368 PlatformHandle* handles, 383 PlatformHandle* handles,
369 size_t num_handles) { 384 size_t num_handles) {
(...skipping 63 matching lines...) Expand 10 before | Expand all | Expand 10 after
433 448
434 MojoResult DataPipeProducerDispatcher::CloseNoLock() { 449 MojoResult DataPipeProducerDispatcher::CloseNoLock() {
435 lock_.AssertAcquired(); 450 lock_.AssertAcquired();
436 if (is_closed_ || in_transit_) 451 if (is_closed_ || in_transit_)
437 return MOJO_RESULT_INVALID_ARGUMENT; 452 return MOJO_RESULT_INVALID_ARGUMENT;
438 is_closed_ = true; 453 is_closed_ = true;
439 ring_buffer_mapping_.reset(); 454 ring_buffer_mapping_.reset();
440 shared_ring_buffer_ = nullptr; 455 shared_ring_buffer_ = nullptr;
441 456
442 awakable_list_.CancelAll(); 457 awakable_list_.CancelAll();
458 watchers_.NotifyClosed();
443 if (!transferred_) { 459 if (!transferred_) {
444 base::AutoUnlock unlock(lock_); 460 base::AutoUnlock unlock(lock_);
445 node_controller_->ClosePort(control_port_); 461 node_controller_->ClosePort(control_port_);
446 } 462 }
447 463
448 return MOJO_RESULT_OK; 464 return MOJO_RESULT_OK;
449 } 465 }
450 466
451 HandleSignalsState DataPipeProducerDispatcher::GetHandleSignalsStateNoLock() 467 HandleSignalsState DataPipeProducerDispatcher::GetHandleSignalsStateNoLock()
452 const { 468 const {
453 lock_.AssertAcquired(); 469 lock_.AssertAcquired();
454 HandleSignalsState rv; 470 HandleSignalsState rv;
455 if (!peer_closed_) { 471 if (!peer_closed_) {
456 if (!in_two_phase_write_ && shared_ring_buffer_ && 472 if (!in_two_phase_write_ && shared_ring_buffer_ &&
457 available_capacity_ > 0) 473 available_capacity_ > 0 && !suppress_writable_signal_)
458 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_WRITABLE; 474 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_WRITABLE;
459 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_WRITABLE; 475 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_WRITABLE;
460 } else { 476 } else {
461 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; 477 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
462 } 478 }
463 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; 479 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
464 return rv; 480 return rv;
465 } 481 }
466 482
467 void DataPipeProducerDispatcher::NotifyWrite(uint32_t num_bytes) { 483 void DataPipeProducerDispatcher::NotifyWrite(uint32_t num_bytes) {
(...skipping 66 matching lines...) Expand 10 before | Expand all | Expand 10 after
534 << m->num_bytes << " bytes were read. [control_port=" 550 << m->num_bytes << " bytes were read. [control_port="
535 << control_port_.name() << "]"; 551 << control_port_.name() << "]";
536 552
537 available_capacity_ += m->num_bytes; 553 available_capacity_ += m->num_bytes;
538 } 554 }
539 } while (message); 555 } while (message);
540 } 556 }
541 557
542 if (peer_closed_ != was_peer_closed || 558 if (peer_closed_ != was_peer_closed ||
543 available_capacity_ != previous_capacity) { 559 available_capacity_ != previous_capacity) {
544 awakable_list_.AwakeForStateChange(GetHandleSignalsStateNoLock()); 560 suppress_writable_signal_ = false;
561 HandleSignalsState state = GetHandleSignalsStateNoLock();
562 awakable_list_.AwakeForStateChange(state);
563 watchers_.NotifyState(state);
545 } 564 }
546 } 565 }
547 566
548 } // namespace edk 567 } // namespace edk
549 } // namespace mojo 568 } // namespace mojo
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698