Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(95)

Side by Side Diff: mojo/edk/system/data_pipe_producer_dispatcher.cc

Issue 2725133002: Mojo: Armed Watchers (Closed)
Patch Set: . Created 3 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « mojo/edk/system/data_pipe_producer_dispatcher.h ('k') | mojo/edk/system/data_pipe_unittest.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2013 The Chromium Authors. All rights reserved. 1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/edk/system/data_pipe_producer_dispatcher.h" 5 #include "mojo/edk/system/data_pipe_producer_dispatcher.h"
6 6
7 #include <stddef.h> 7 #include <stddef.h>
8 #include <stdint.h> 8 #include <stdint.h>
9 9
10 #include <utility> 10 #include <utility>
(...skipping 61 matching lines...) Expand 10 before | Expand all | Expand 10 after
72 NodeController* node_controller, 72 NodeController* node_controller,
73 const ports::PortRef& control_port, 73 const ports::PortRef& control_port,
74 scoped_refptr<PlatformSharedBuffer> shared_ring_buffer, 74 scoped_refptr<PlatformSharedBuffer> shared_ring_buffer,
75 const MojoCreateDataPipeOptions& options, 75 const MojoCreateDataPipeOptions& options,
76 bool initialized, 76 bool initialized,
77 uint64_t pipe_id) 77 uint64_t pipe_id)
78 : options_(options), 78 : options_(options),
79 node_controller_(node_controller), 79 node_controller_(node_controller),
80 control_port_(control_port), 80 control_port_(control_port),
81 pipe_id_(pipe_id), 81 pipe_id_(pipe_id),
82 watchers_(this),
82 shared_ring_buffer_(shared_ring_buffer), 83 shared_ring_buffer_(shared_ring_buffer),
83 available_capacity_(options_.capacity_num_bytes) { 84 available_capacity_(options_.capacity_num_bytes) {
84 if (initialized) { 85 if (initialized) {
85 base::AutoLock lock(lock_); 86 base::AutoLock lock(lock_);
86 InitializeNoLock(); 87 InitializeNoLock();
87 } 88 }
88 } 89 }
89 90
90 Dispatcher::Type DataPipeProducerDispatcher::GetType() const { 91 Dispatcher::Type DataPipeProducerDispatcher::GetType() const {
91 return Type::DATA_PIPE_PRODUCER; 92 return Type::DATA_PIPE_PRODUCER;
92 } 93 }
93 94
94 MojoResult DataPipeProducerDispatcher::Close() { 95 MojoResult DataPipeProducerDispatcher::Close() {
95 base::AutoLock lock(lock_); 96 base::AutoLock lock(lock_);
96 DVLOG(1) << "Closing data pipe producer " << pipe_id_; 97 DVLOG(1) << "Closing data pipe producer " << pipe_id_;
97 return CloseNoLock(); 98 return CloseNoLock();
98 } 99 }
99 100
100 MojoResult DataPipeProducerDispatcher::Watch(
101 MojoHandleSignals signals,
102 const Watcher::WatchCallback& callback,
103 uintptr_t context) {
104 base::AutoLock lock(lock_);
105
106 if (is_closed_ || in_transit_)
107 return MOJO_RESULT_INVALID_ARGUMENT;
108
109 return awakable_list_.AddWatcher(
110 signals, callback, context, GetHandleSignalsStateNoLock());
111 }
112
113 MojoResult DataPipeProducerDispatcher::CancelWatch(uintptr_t context) {
114 base::AutoLock lock(lock_);
115
116 if (is_closed_ || in_transit_)
117 return MOJO_RESULT_INVALID_ARGUMENT;
118
119 return awakable_list_.RemoveWatcher(context);
120 }
121
122 MojoResult DataPipeProducerDispatcher::WriteData(const void* elements, 101 MojoResult DataPipeProducerDispatcher::WriteData(const void* elements,
123 uint32_t* num_bytes, 102 uint32_t* num_bytes,
124 MojoWriteDataFlags flags) { 103 MojoWriteDataFlags flags) {
125 base::AutoLock lock(lock_); 104 base::AutoLock lock(lock_);
126 if (!shared_ring_buffer_ || in_transit_) 105 if (!shared_ring_buffer_ || in_transit_)
127 return MOJO_RESULT_INVALID_ARGUMENT; 106 return MOJO_RESULT_INVALID_ARGUMENT;
128 107
129 if (in_two_phase_write_) 108 if (in_two_phase_write_)
130 return MOJO_RESULT_BUSY; 109 return MOJO_RESULT_BUSY;
131 110
(...skipping 40 matching lines...) Expand 10 before | Expand all | Expand 10 after
172 memcpy(data, source + tail_bytes_to_write, head_bytes_to_write); 151 memcpy(data, source + tail_bytes_to_write, head_bytes_to_write);
173 152
174 DCHECK_LE(num_bytes_to_write, available_capacity_); 153 DCHECK_LE(num_bytes_to_write, available_capacity_);
175 available_capacity_ -= num_bytes_to_write; 154 available_capacity_ -= num_bytes_to_write;
176 write_offset_ = (write_offset_ + num_bytes_to_write) % 155 write_offset_ = (write_offset_ + num_bytes_to_write) %
177 options_.capacity_num_bytes; 156 options_.capacity_num_bytes;
178 157
179 HandleSignalsState new_state = GetHandleSignalsStateNoLock(); 158 HandleSignalsState new_state = GetHandleSignalsStateNoLock();
180 if (!new_state.equals(old_state)) 159 if (!new_state.equals(old_state))
181 awakable_list_.AwakeForStateChange(new_state); 160 awakable_list_.AwakeForStateChange(new_state);
161 watchers_.NotifyState(new_state);
182 162
183 base::AutoUnlock unlock(lock_); 163 base::AutoUnlock unlock(lock_);
184 NotifyWrite(num_bytes_to_write); 164 NotifyWrite(num_bytes_to_write);
185 165
186 return MOJO_RESULT_OK; 166 return MOJO_RESULT_OK;
187 } 167 }
188 168
189 MojoResult DataPipeProducerDispatcher::BeginWriteData( 169 MojoResult DataPipeProducerDispatcher::BeginWriteData(
190 void** buffer, 170 void** buffer,
191 uint32_t* buffer_num_bytes, 171 uint32_t* buffer_num_bytes,
192 MojoWriteDataFlags flags) { 172 MojoWriteDataFlags flags) {
193 base::AutoLock lock(lock_); 173 base::AutoLock lock(lock_);
194 if (!shared_ring_buffer_ || in_transit_) 174 if (!shared_ring_buffer_ || in_transit_)
195 return MOJO_RESULT_INVALID_ARGUMENT; 175 return MOJO_RESULT_INVALID_ARGUMENT;
176
177 // These flags may not be used in two-phase mode.
178 if (flags & MOJO_WRITE_DATA_FLAG_ALL_OR_NONE)
179 return MOJO_RESULT_INVALID_ARGUMENT;
180
196 if (in_two_phase_write_) 181 if (in_two_phase_write_)
197 return MOJO_RESULT_BUSY; 182 return MOJO_RESULT_BUSY;
198 if (peer_closed_) 183 if (peer_closed_)
199 return MOJO_RESULT_FAILED_PRECONDITION; 184 return MOJO_RESULT_FAILED_PRECONDITION;
200 185
201 if (available_capacity_ == 0) { 186 if (available_capacity_ == 0) {
202 return peer_closed_ ? MOJO_RESULT_FAILED_PRECONDITION 187 return peer_closed_ ? MOJO_RESULT_FAILED_PRECONDITION
203 : MOJO_RESULT_SHOULD_WAIT; 188 : MOJO_RESULT_SHOULD_WAIT;
204 } 189 }
205 190
(...skipping 38 matching lines...) Expand 10 before | Expand all | Expand 10 after
244 NotifyWrite(num_bytes_written); 229 NotifyWrite(num_bytes_written);
245 } 230 }
246 231
247 in_two_phase_write_ = false; 232 in_two_phase_write_ = false;
248 233
249 // If we're now writable, we *became* writable (since we weren't writable 234 // If we're now writable, we *became* writable (since we weren't writable
250 // during the two-phase write), so awake producer awakables. 235 // during the two-phase write), so awake producer awakables.
251 HandleSignalsState new_state = GetHandleSignalsStateNoLock(); 236 HandleSignalsState new_state = GetHandleSignalsStateNoLock();
252 if (new_state.satisfies(MOJO_HANDLE_SIGNAL_WRITABLE)) 237 if (new_state.satisfies(MOJO_HANDLE_SIGNAL_WRITABLE))
253 awakable_list_.AwakeForStateChange(new_state); 238 awakable_list_.AwakeForStateChange(new_state);
239 watchers_.NotifyState(new_state);
254 240
255 return rv; 241 return rv;
256 } 242 }
257 243
258 HandleSignalsState DataPipeProducerDispatcher::GetHandleSignalsState() const { 244 HandleSignalsState DataPipeProducerDispatcher::GetHandleSignalsState() const {
259 base::AutoLock lock(lock_); 245 base::AutoLock lock(lock_);
260 return GetHandleSignalsStateNoLock(); 246 return GetHandleSignalsStateNoLock();
261 } 247 }
262 248
249 MojoResult DataPipeProducerDispatcher::AddWatcherRef(
250 const scoped_refptr<WatcherDispatcher>& watcher,
251 uintptr_t context) {
252 base::AutoLock lock(lock_);
253 if (is_closed_ || in_transit_)
254 return MOJO_RESULT_INVALID_ARGUMENT;
255 return watchers_.Add(watcher, context, GetHandleSignalsStateNoLock());
256 }
257
258 MojoResult DataPipeProducerDispatcher::RemoveWatcherRef(
259 WatcherDispatcher* watcher,
260 uintptr_t context) {
261 base::AutoLock lock(lock_);
262 if (is_closed_ || in_transit_)
263 return MOJO_RESULT_INVALID_ARGUMENT;
264 return watchers_.Remove(watcher, context);
265 }
266
263 MojoResult DataPipeProducerDispatcher::AddAwakable( 267 MojoResult DataPipeProducerDispatcher::AddAwakable(
264 Awakable* awakable, 268 Awakable* awakable,
265 MojoHandleSignals signals, 269 MojoHandleSignals signals,
266 uintptr_t context, 270 uintptr_t context,
267 HandleSignalsState* signals_state) { 271 HandleSignalsState* signals_state) {
268 base::AutoLock lock(lock_); 272 base::AutoLock lock(lock_);
269 if (!shared_ring_buffer_ || in_transit_) { 273 if (!shared_ring_buffer_ || in_transit_) {
270 if (signals_state) 274 if (signals_state)
271 *signals_state = HandleSignalsState(); 275 *signals_state = HandleSignalsState();
272 return MOJO_RESULT_INVALID_ARGUMENT; 276 return MOJO_RESULT_INVALID_ARGUMENT;
(...skipping 76 matching lines...) Expand 10 before | Expand all | Expand 10 after
349 in_transit_ = false; 353 in_transit_ = false;
350 ignore_result(buffer_handle_for_transit_.release()); 354 ignore_result(buffer_handle_for_transit_.release());
351 CloseNoLock(); 355 CloseNoLock();
352 } 356 }
353 357
354 void DataPipeProducerDispatcher::CancelTransit() { 358 void DataPipeProducerDispatcher::CancelTransit() {
355 base::AutoLock lock(lock_); 359 base::AutoLock lock(lock_);
356 DCHECK(in_transit_); 360 DCHECK(in_transit_);
357 in_transit_ = false; 361 in_transit_ = false;
358 buffer_handle_for_transit_.reset(); 362 buffer_handle_for_transit_.reset();
359 awakable_list_.AwakeForStateChange(GetHandleSignalsStateNoLock()); 363
364 HandleSignalsState state = GetHandleSignalsStateNoLock();
365 awakable_list_.AwakeForStateChange(state);
366 watchers_.NotifyState(state);
360 } 367 }
361 368
362 // static 369 // static
363 scoped_refptr<DataPipeProducerDispatcher> 370 scoped_refptr<DataPipeProducerDispatcher>
364 DataPipeProducerDispatcher::Deserialize(const void* data, 371 DataPipeProducerDispatcher::Deserialize(const void* data,
365 size_t num_bytes, 372 size_t num_bytes,
366 const ports::PortName* ports, 373 const ports::PortName* ports,
367 size_t num_ports, 374 size_t num_ports,
368 PlatformHandle* handles, 375 PlatformHandle* handles,
369 size_t num_handles) { 376 size_t num_handles) {
(...skipping 63 matching lines...) Expand 10 before | Expand all | Expand 10 after
433 440
434 MojoResult DataPipeProducerDispatcher::CloseNoLock() { 441 MojoResult DataPipeProducerDispatcher::CloseNoLock() {
435 lock_.AssertAcquired(); 442 lock_.AssertAcquired();
436 if (is_closed_ || in_transit_) 443 if (is_closed_ || in_transit_)
437 return MOJO_RESULT_INVALID_ARGUMENT; 444 return MOJO_RESULT_INVALID_ARGUMENT;
438 is_closed_ = true; 445 is_closed_ = true;
439 ring_buffer_mapping_.reset(); 446 ring_buffer_mapping_.reset();
440 shared_ring_buffer_ = nullptr; 447 shared_ring_buffer_ = nullptr;
441 448
442 awakable_list_.CancelAll(); 449 awakable_list_.CancelAll();
450 watchers_.NotifyClosed();
443 if (!transferred_) { 451 if (!transferred_) {
444 base::AutoUnlock unlock(lock_); 452 base::AutoUnlock unlock(lock_);
445 node_controller_->ClosePort(control_port_); 453 node_controller_->ClosePort(control_port_);
446 } 454 }
447 455
448 return MOJO_RESULT_OK; 456 return MOJO_RESULT_OK;
449 } 457 }
450 458
451 HandleSignalsState DataPipeProducerDispatcher::GetHandleSignalsStateNoLock() 459 HandleSignalsState DataPipeProducerDispatcher::GetHandleSignalsStateNoLock()
452 const { 460 const {
453 lock_.AssertAcquired(); 461 lock_.AssertAcquired();
454 HandleSignalsState rv; 462 HandleSignalsState rv;
455 if (!peer_closed_) { 463 if (!peer_closed_) {
456 if (!in_two_phase_write_ && shared_ring_buffer_ && 464 if (!in_two_phase_write_ && shared_ring_buffer_ && available_capacity_ > 0)
457 available_capacity_ > 0)
458 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_WRITABLE; 465 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_WRITABLE;
459 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_WRITABLE; 466 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_WRITABLE;
460 } else { 467 } else {
461 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; 468 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
462 } 469 }
463 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; 470 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
464 return rv; 471 return rv;
465 } 472 }
466 473
467 void DataPipeProducerDispatcher::NotifyWrite(uint32_t num_bytes) { 474 void DataPipeProducerDispatcher::NotifyWrite(uint32_t num_bytes) {
(...skipping 66 matching lines...) Expand 10 before | Expand all | Expand 10 after
534 << m->num_bytes << " bytes were read. [control_port=" 541 << m->num_bytes << " bytes were read. [control_port="
535 << control_port_.name() << "]"; 542 << control_port_.name() << "]";
536 543
537 available_capacity_ += m->num_bytes; 544 available_capacity_ += m->num_bytes;
538 } 545 }
539 } while (message); 546 } while (message);
540 } 547 }
541 548
542 if (peer_closed_ != was_peer_closed || 549 if (peer_closed_ != was_peer_closed ||
543 available_capacity_ != previous_capacity) { 550 available_capacity_ != previous_capacity) {
544 awakable_list_.AwakeForStateChange(GetHandleSignalsStateNoLock()); 551 HandleSignalsState state = GetHandleSignalsStateNoLock();
552 awakable_list_.AwakeForStateChange(state);
553 watchers_.NotifyState(state);
545 } 554 }
546 } 555 }
547 556
548 } // namespace edk 557 } // namespace edk
549 } // namespace mojo 558 } // namespace mojo
OLDNEW
« no previous file with comments | « mojo/edk/system/data_pipe_producer_dispatcher.h ('k') | mojo/edk/system/data_pipe_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698