Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1177)

Side by Side Diff: mojo/edk/system/data_pipe_producer_dispatcher.cc

Issue 2750373002: Revert of Mojo: Armed Watchers (Closed)
Patch Set: Created 3 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « mojo/edk/system/data_pipe_producer_dispatcher.h ('k') | mojo/edk/system/data_pipe_unittest.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2013 The Chromium Authors. All rights reserved. 1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/edk/system/data_pipe_producer_dispatcher.h" 5 #include "mojo/edk/system/data_pipe_producer_dispatcher.h"
6 6
7 #include <stddef.h> 7 #include <stddef.h>
8 #include <stdint.h> 8 #include <stdint.h>
9 9
10 #include <utility> 10 #include <utility>
(...skipping 61 matching lines...) Expand 10 before | Expand all | Expand 10 after
72 NodeController* node_controller, 72 NodeController* node_controller,
73 const ports::PortRef& control_port, 73 const ports::PortRef& control_port,
74 scoped_refptr<PlatformSharedBuffer> shared_ring_buffer, 74 scoped_refptr<PlatformSharedBuffer> shared_ring_buffer,
75 const MojoCreateDataPipeOptions& options, 75 const MojoCreateDataPipeOptions& options,
76 bool initialized, 76 bool initialized,
77 uint64_t pipe_id) 77 uint64_t pipe_id)
78 : options_(options), 78 : options_(options),
79 node_controller_(node_controller), 79 node_controller_(node_controller),
80 control_port_(control_port), 80 control_port_(control_port),
81 pipe_id_(pipe_id), 81 pipe_id_(pipe_id),
82 watchers_(this),
83 shared_ring_buffer_(shared_ring_buffer), 82 shared_ring_buffer_(shared_ring_buffer),
84 available_capacity_(options_.capacity_num_bytes) { 83 available_capacity_(options_.capacity_num_bytes) {
85 if (initialized) { 84 if (initialized) {
86 base::AutoLock lock(lock_); 85 base::AutoLock lock(lock_);
87 InitializeNoLock(); 86 InitializeNoLock();
88 } 87 }
89 } 88 }
90 89
91 Dispatcher::Type DataPipeProducerDispatcher::GetType() const { 90 Dispatcher::Type DataPipeProducerDispatcher::GetType() const {
92 return Type::DATA_PIPE_PRODUCER; 91 return Type::DATA_PIPE_PRODUCER;
93 } 92 }
94 93
95 MojoResult DataPipeProducerDispatcher::Close() { 94 MojoResult DataPipeProducerDispatcher::Close() {
96 base::AutoLock lock(lock_); 95 base::AutoLock lock(lock_);
97 DVLOG(1) << "Closing data pipe producer " << pipe_id_; 96 DVLOG(1) << "Closing data pipe producer " << pipe_id_;
98 return CloseNoLock(); 97 return CloseNoLock();
99 } 98 }
100 99
100 MojoResult DataPipeProducerDispatcher::Watch(
101 MojoHandleSignals signals,
102 const Watcher::WatchCallback& callback,
103 uintptr_t context) {
104 base::AutoLock lock(lock_);
105
106 if (is_closed_ || in_transit_)
107 return MOJO_RESULT_INVALID_ARGUMENT;
108
109 return awakable_list_.AddWatcher(
110 signals, callback, context, GetHandleSignalsStateNoLock());
111 }
112
113 MojoResult DataPipeProducerDispatcher::CancelWatch(uintptr_t context) {
114 base::AutoLock lock(lock_);
115
116 if (is_closed_ || in_transit_)
117 return MOJO_RESULT_INVALID_ARGUMENT;
118
119 return awakable_list_.RemoveWatcher(context);
120 }
121
101 MojoResult DataPipeProducerDispatcher::WriteData(const void* elements, 122 MojoResult DataPipeProducerDispatcher::WriteData(const void* elements,
102 uint32_t* num_bytes, 123 uint32_t* num_bytes,
103 MojoWriteDataFlags flags) { 124 MojoWriteDataFlags flags) {
104 base::AutoLock lock(lock_); 125 base::AutoLock lock(lock_);
105 if (!shared_ring_buffer_ || in_transit_) 126 if (!shared_ring_buffer_ || in_transit_)
106 return MOJO_RESULT_INVALID_ARGUMENT; 127 return MOJO_RESULT_INVALID_ARGUMENT;
107 128
108 if (in_two_phase_write_) 129 if (in_two_phase_write_)
109 return MOJO_RESULT_BUSY; 130 return MOJO_RESULT_BUSY;
110 131
(...skipping 40 matching lines...) Expand 10 before | Expand all | Expand 10 after
151 memcpy(data, source + tail_bytes_to_write, head_bytes_to_write); 172 memcpy(data, source + tail_bytes_to_write, head_bytes_to_write);
152 173
153 DCHECK_LE(num_bytes_to_write, available_capacity_); 174 DCHECK_LE(num_bytes_to_write, available_capacity_);
154 available_capacity_ -= num_bytes_to_write; 175 available_capacity_ -= num_bytes_to_write;
155 write_offset_ = (write_offset_ + num_bytes_to_write) % 176 write_offset_ = (write_offset_ + num_bytes_to_write) %
156 options_.capacity_num_bytes; 177 options_.capacity_num_bytes;
157 178
158 HandleSignalsState new_state = GetHandleSignalsStateNoLock(); 179 HandleSignalsState new_state = GetHandleSignalsStateNoLock();
159 if (!new_state.equals(old_state)) 180 if (!new_state.equals(old_state))
160 awakable_list_.AwakeForStateChange(new_state); 181 awakable_list_.AwakeForStateChange(new_state);
161 watchers_.NotifyState(new_state);
162 182
163 base::AutoUnlock unlock(lock_); 183 base::AutoUnlock unlock(lock_);
164 NotifyWrite(num_bytes_to_write); 184 NotifyWrite(num_bytes_to_write);
165 185
166 return MOJO_RESULT_OK; 186 return MOJO_RESULT_OK;
167 } 187 }
168 188
169 MojoResult DataPipeProducerDispatcher::BeginWriteData( 189 MojoResult DataPipeProducerDispatcher::BeginWriteData(
170 void** buffer, 190 void** buffer,
171 uint32_t* buffer_num_bytes, 191 uint32_t* buffer_num_bytes,
172 MojoWriteDataFlags flags) { 192 MojoWriteDataFlags flags) {
173 base::AutoLock lock(lock_); 193 base::AutoLock lock(lock_);
174 if (!shared_ring_buffer_ || in_transit_) 194 if (!shared_ring_buffer_ || in_transit_)
175 return MOJO_RESULT_INVALID_ARGUMENT; 195 return MOJO_RESULT_INVALID_ARGUMENT;
176
177 // These flags may not be used in two-phase mode.
178 if (flags & MOJO_WRITE_DATA_FLAG_ALL_OR_NONE)
179 return MOJO_RESULT_INVALID_ARGUMENT;
180
181 if (in_two_phase_write_) 196 if (in_two_phase_write_)
182 return MOJO_RESULT_BUSY; 197 return MOJO_RESULT_BUSY;
183 if (peer_closed_) 198 if (peer_closed_)
184 return MOJO_RESULT_FAILED_PRECONDITION; 199 return MOJO_RESULT_FAILED_PRECONDITION;
185 200
186 if (available_capacity_ == 0) { 201 if (available_capacity_ == 0) {
187 return peer_closed_ ? MOJO_RESULT_FAILED_PRECONDITION 202 return peer_closed_ ? MOJO_RESULT_FAILED_PRECONDITION
188 : MOJO_RESULT_SHOULD_WAIT; 203 : MOJO_RESULT_SHOULD_WAIT;
189 } 204 }
190 205
(...skipping 38 matching lines...) Expand 10 before | Expand all | Expand 10 after
229 NotifyWrite(num_bytes_written); 244 NotifyWrite(num_bytes_written);
230 } 245 }
231 246
232 in_two_phase_write_ = false; 247 in_two_phase_write_ = false;
233 248
234 // If we're now writable, we *became* writable (since we weren't writable 249 // If we're now writable, we *became* writable (since we weren't writable
235 // during the two-phase write), so awake producer awakables. 250 // during the two-phase write), so awake producer awakables.
236 HandleSignalsState new_state = GetHandleSignalsStateNoLock(); 251 HandleSignalsState new_state = GetHandleSignalsStateNoLock();
237 if (new_state.satisfies(MOJO_HANDLE_SIGNAL_WRITABLE)) 252 if (new_state.satisfies(MOJO_HANDLE_SIGNAL_WRITABLE))
238 awakable_list_.AwakeForStateChange(new_state); 253 awakable_list_.AwakeForStateChange(new_state);
239 watchers_.NotifyState(new_state);
240 254
241 return rv; 255 return rv;
242 } 256 }
243 257
244 HandleSignalsState DataPipeProducerDispatcher::GetHandleSignalsState() const { 258 HandleSignalsState DataPipeProducerDispatcher::GetHandleSignalsState() const {
245 base::AutoLock lock(lock_); 259 base::AutoLock lock(lock_);
246 return GetHandleSignalsStateNoLock(); 260 return GetHandleSignalsStateNoLock();
247 } 261 }
248 262
249 MojoResult DataPipeProducerDispatcher::AddWatcherRef(
250 const scoped_refptr<WatcherDispatcher>& watcher,
251 uintptr_t context) {
252 base::AutoLock lock(lock_);
253 if (is_closed_ || in_transit_)
254 return MOJO_RESULT_INVALID_ARGUMENT;
255 return watchers_.Add(watcher, context, GetHandleSignalsStateNoLock());
256 }
257
258 MojoResult DataPipeProducerDispatcher::RemoveWatcherRef(
259 WatcherDispatcher* watcher,
260 uintptr_t context) {
261 base::AutoLock lock(lock_);
262 if (is_closed_ || in_transit_)
263 return MOJO_RESULT_INVALID_ARGUMENT;
264 return watchers_.Remove(watcher, context);
265 }
266
267 MojoResult DataPipeProducerDispatcher::AddAwakable( 263 MojoResult DataPipeProducerDispatcher::AddAwakable(
268 Awakable* awakable, 264 Awakable* awakable,
269 MojoHandleSignals signals, 265 MojoHandleSignals signals,
270 uintptr_t context, 266 uintptr_t context,
271 HandleSignalsState* signals_state) { 267 HandleSignalsState* signals_state) {
272 base::AutoLock lock(lock_); 268 base::AutoLock lock(lock_);
273 if (!shared_ring_buffer_ || in_transit_) { 269 if (!shared_ring_buffer_ || in_transit_) {
274 if (signals_state) 270 if (signals_state)
275 *signals_state = HandleSignalsState(); 271 *signals_state = HandleSignalsState();
276 return MOJO_RESULT_INVALID_ARGUMENT; 272 return MOJO_RESULT_INVALID_ARGUMENT;
(...skipping 76 matching lines...) Expand 10 before | Expand all | Expand 10 after
353 in_transit_ = false; 349 in_transit_ = false;
354 ignore_result(buffer_handle_for_transit_.release()); 350 ignore_result(buffer_handle_for_transit_.release());
355 CloseNoLock(); 351 CloseNoLock();
356 } 352 }
357 353
358 void DataPipeProducerDispatcher::CancelTransit() { 354 void DataPipeProducerDispatcher::CancelTransit() {
359 base::AutoLock lock(lock_); 355 base::AutoLock lock(lock_);
360 DCHECK(in_transit_); 356 DCHECK(in_transit_);
361 in_transit_ = false; 357 in_transit_ = false;
362 buffer_handle_for_transit_.reset(); 358 buffer_handle_for_transit_.reset();
363 359 awakable_list_.AwakeForStateChange(GetHandleSignalsStateNoLock());
364 HandleSignalsState state = GetHandleSignalsStateNoLock();
365 awakable_list_.AwakeForStateChange(state);
366 watchers_.NotifyState(state);
367 } 360 }
368 361
369 // static 362 // static
370 scoped_refptr<DataPipeProducerDispatcher> 363 scoped_refptr<DataPipeProducerDispatcher>
371 DataPipeProducerDispatcher::Deserialize(const void* data, 364 DataPipeProducerDispatcher::Deserialize(const void* data,
372 size_t num_bytes, 365 size_t num_bytes,
373 const ports::PortName* ports, 366 const ports::PortName* ports,
374 size_t num_ports, 367 size_t num_ports,
375 PlatformHandle* handles, 368 PlatformHandle* handles,
376 size_t num_handles) { 369 size_t num_handles) {
(...skipping 63 matching lines...) Expand 10 before | Expand all | Expand 10 after
440 433
441 MojoResult DataPipeProducerDispatcher::CloseNoLock() { 434 MojoResult DataPipeProducerDispatcher::CloseNoLock() {
442 lock_.AssertAcquired(); 435 lock_.AssertAcquired();
443 if (is_closed_ || in_transit_) 436 if (is_closed_ || in_transit_)
444 return MOJO_RESULT_INVALID_ARGUMENT; 437 return MOJO_RESULT_INVALID_ARGUMENT;
445 is_closed_ = true; 438 is_closed_ = true;
446 ring_buffer_mapping_.reset(); 439 ring_buffer_mapping_.reset();
447 shared_ring_buffer_ = nullptr; 440 shared_ring_buffer_ = nullptr;
448 441
449 awakable_list_.CancelAll(); 442 awakable_list_.CancelAll();
450 watchers_.NotifyClosed();
451 if (!transferred_) { 443 if (!transferred_) {
452 base::AutoUnlock unlock(lock_); 444 base::AutoUnlock unlock(lock_);
453 node_controller_->ClosePort(control_port_); 445 node_controller_->ClosePort(control_port_);
454 } 446 }
455 447
456 return MOJO_RESULT_OK; 448 return MOJO_RESULT_OK;
457 } 449 }
458 450
459 HandleSignalsState DataPipeProducerDispatcher::GetHandleSignalsStateNoLock() 451 HandleSignalsState DataPipeProducerDispatcher::GetHandleSignalsStateNoLock()
460 const { 452 const {
461 lock_.AssertAcquired(); 453 lock_.AssertAcquired();
462 HandleSignalsState rv; 454 HandleSignalsState rv;
463 if (!peer_closed_) { 455 if (!peer_closed_) {
464 if (!in_two_phase_write_ && shared_ring_buffer_ && available_capacity_ > 0) 456 if (!in_two_phase_write_ && shared_ring_buffer_ &&
457 available_capacity_ > 0)
465 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_WRITABLE; 458 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_WRITABLE;
466 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_WRITABLE; 459 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_WRITABLE;
467 } else { 460 } else {
468 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; 461 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
469 } 462 }
470 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; 463 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
471 return rv; 464 return rv;
472 } 465 }
473 466
474 void DataPipeProducerDispatcher::NotifyWrite(uint32_t num_bytes) { 467 void DataPipeProducerDispatcher::NotifyWrite(uint32_t num_bytes) {
(...skipping 66 matching lines...) Expand 10 before | Expand all | Expand 10 after
541 << m->num_bytes << " bytes were read. [control_port=" 534 << m->num_bytes << " bytes were read. [control_port="
542 << control_port_.name() << "]"; 535 << control_port_.name() << "]";
543 536
544 available_capacity_ += m->num_bytes; 537 available_capacity_ += m->num_bytes;
545 } 538 }
546 } while (message); 539 } while (message);
547 } 540 }
548 541
549 if (peer_closed_ != was_peer_closed || 542 if (peer_closed_ != was_peer_closed ||
550 available_capacity_ != previous_capacity) { 543 available_capacity_ != previous_capacity) {
551 HandleSignalsState state = GetHandleSignalsStateNoLock(); 544 awakable_list_.AwakeForStateChange(GetHandleSignalsStateNoLock());
552 awakable_list_.AwakeForStateChange(state);
553 watchers_.NotifyState(state);
554 } 545 }
555 } 546 }
556 547
557 } // namespace edk 548 } // namespace edk
558 } // namespace mojo 549 } // namespace mojo
OLDNEW
« no previous file with comments | « mojo/edk/system/data_pipe_producer_dispatcher.h ('k') | mojo/edk/system/data_pipe_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698