Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(271)

Side by Side Diff: mojo/edk/system/data_pipe.cc

Issue 926553006: Make DataPipe own an impl. (Closed) Base URL: https://github.com/domokit/mojo.git@local_data_pipe_impl_1
Patch Set: rebased Created 5 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2013 The Chromium Authors. All rights reserved. 1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/edk/system/data_pipe.h" 5 #include "mojo/edk/system/data_pipe.h"
6 6
7 #include <string.h> 7 #include <string.h>
8 8
9 #include <algorithm> 9 #include <algorithm>
10 #include <limits> 10 #include <limits>
11 11
12 #include "base/logging.h" 12 #include "base/logging.h"
13 #include "mojo/edk/system/awakable_list.h" 13 #include "mojo/edk/system/awakable_list.h"
14 #include "mojo/edk/system/configuration.h" 14 #include "mojo/edk/system/configuration.h"
15 #include "mojo/edk/system/data_pipe_impl.h"
15 #include "mojo/edk/system/memory.h" 16 #include "mojo/edk/system/memory.h"
16 #include "mojo/edk/system/options_validation.h" 17 #include "mojo/edk/system/options_validation.h"
17 18
18 namespace mojo { 19 namespace mojo {
19 namespace system { 20 namespace system {
20 21
22 DataPipe::DataPipe(bool has_local_producer,
23 bool has_local_consumer,
24 const MojoCreateDataPipeOptions& validated_options,
25 scoped_ptr<DataPipeImpl> impl)
26 : may_discard_((validated_options.flags &
27 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_MAY_DISCARD)),
28 element_num_bytes_(validated_options.element_num_bytes),
29 capacity_num_bytes_(validated_options.capacity_num_bytes),
30 producer_open_(true),
31 consumer_open_(true),
32 producer_awakable_list_(has_local_producer ? new AwakableList()
33 : nullptr),
34 consumer_awakable_list_(has_local_consumer ? new AwakableList()
35 : nullptr),
36 producer_two_phase_max_num_bytes_written_(0),
37 consumer_two_phase_max_num_bytes_read_(0),
38 impl_(impl.Pass()) {
39 impl_->set_owner(this);
40
41 #if !defined(NDEBUG) || defined(DCHECK_ALWAYS_ON)
42 // Check that the passed in options actually are validated.
43 MojoCreateDataPipeOptions unused = {0};
44 DCHECK_EQ(ValidateCreateOptions(MakeUserPointer(&validated_options), &unused),
45 MOJO_RESULT_OK);
46 #endif // !defined(NDEBUG) || defined(DCHECK_ALWAYS_ON)
47 }
48
21 // static 49 // static
22 MojoCreateDataPipeOptions DataPipe::GetDefaultCreateOptions() { 50 MojoCreateDataPipeOptions DataPipe::GetDefaultCreateOptions() {
23 MojoCreateDataPipeOptions result = { 51 MojoCreateDataPipeOptions result = {
24 static_cast<uint32_t>(sizeof(MojoCreateDataPipeOptions)), 52 static_cast<uint32_t>(sizeof(MojoCreateDataPipeOptions)),
25 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, 53 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE,
26 1u, 54 1u,
27 static_cast<uint32_t>( 55 static_cast<uint32_t>(
28 GetConfiguration().default_data_pipe_capacity_bytes)}; 56 GetConfiguration().default_data_pipe_capacity_bytes)};
29 return result; 57 return result;
30 } 58 }
(...skipping 78 matching lines...) Expand 10 before | Expand all | Expand 10 after
109 uint32_t max_num_bytes_to_write = num_bytes.Get(); 137 uint32_t max_num_bytes_to_write = num_bytes.Get();
110 if (max_num_bytes_to_write % element_num_bytes_ != 0) 138 if (max_num_bytes_to_write % element_num_bytes_ != 0)
111 return MOJO_RESULT_INVALID_ARGUMENT; 139 return MOJO_RESULT_INVALID_ARGUMENT;
112 140
113 if (max_num_bytes_to_write == 0) 141 if (max_num_bytes_to_write == 0)
114 return MOJO_RESULT_OK; // Nothing to do. 142 return MOJO_RESULT_OK; // Nothing to do.
115 143
116 uint32_t min_num_bytes_to_write = all_or_none ? max_num_bytes_to_write : 0; 144 uint32_t min_num_bytes_to_write = all_or_none ? max_num_bytes_to_write : 0;
117 145
118 HandleSignalsState old_consumer_state = 146 HandleSignalsState old_consumer_state =
119 ConsumerGetHandleSignalsStateImplNoLock(); 147 impl_->ConsumerGetHandleSignalsState();
120 MojoResult rv = ProducerWriteDataImplNoLock( 148 MojoResult rv = impl_->ProducerWriteData(
121 elements, num_bytes, max_num_bytes_to_write, min_num_bytes_to_write); 149 elements, num_bytes, max_num_bytes_to_write, min_num_bytes_to_write);
122 HandleSignalsState new_consumer_state = 150 HandleSignalsState new_consumer_state =
123 ConsumerGetHandleSignalsStateImplNoLock(); 151 impl_->ConsumerGetHandleSignalsState();
124 if (!new_consumer_state.equals(old_consumer_state)) 152 if (!new_consumer_state.equals(old_consumer_state))
125 AwakeConsumerAwakablesForStateChangeNoLock(new_consumer_state); 153 AwakeConsumerAwakablesForStateChangeNoLock(new_consumer_state);
126 return rv; 154 return rv;
127 } 155 }
128 156
129 MojoResult DataPipe::ProducerBeginWriteData( 157 MojoResult DataPipe::ProducerBeginWriteData(
130 UserPointer<void*> buffer, 158 UserPointer<void*> buffer,
131 UserPointer<uint32_t> buffer_num_bytes, 159 UserPointer<uint32_t> buffer_num_bytes,
132 bool all_or_none) { 160 bool all_or_none) {
133 base::AutoLock locker(lock_); 161 base::AutoLock locker(lock_);
134 DCHECK(has_local_producer_no_lock()); 162 DCHECK(has_local_producer_no_lock());
135 163
136 if (producer_in_two_phase_write_no_lock()) 164 if (producer_in_two_phase_write_no_lock())
137 return MOJO_RESULT_BUSY; 165 return MOJO_RESULT_BUSY;
138 if (!consumer_open_no_lock()) 166 if (!consumer_open_no_lock())
139 return MOJO_RESULT_FAILED_PRECONDITION; 167 return MOJO_RESULT_FAILED_PRECONDITION;
140 168
141 uint32_t min_num_bytes_to_write = 0; 169 uint32_t min_num_bytes_to_write = 0;
142 if (all_or_none) { 170 if (all_or_none) {
143 min_num_bytes_to_write = buffer_num_bytes.Get(); 171 min_num_bytes_to_write = buffer_num_bytes.Get();
144 if (min_num_bytes_to_write % element_num_bytes_ != 0) 172 if (min_num_bytes_to_write % element_num_bytes_ != 0)
145 return MOJO_RESULT_INVALID_ARGUMENT; 173 return MOJO_RESULT_INVALID_ARGUMENT;
146 } 174 }
147 175
148 MojoResult rv = ProducerBeginWriteDataImplNoLock(buffer, buffer_num_bytes, 176 MojoResult rv = impl_->ProducerBeginWriteData(buffer, buffer_num_bytes,
149 min_num_bytes_to_write); 177 min_num_bytes_to_write);
150 if (rv != MOJO_RESULT_OK) 178 if (rv != MOJO_RESULT_OK)
151 return rv; 179 return rv;
152 // Note: No need to awake producer awakables, even though we're going from 180 // Note: No need to awake producer awakables, even though we're going from
153 // writable to non-writable (since you can't wait on non-writability). 181 // writable to non-writable (since you can't wait on non-writability).
154 // Similarly, though this may have discarded data (in "may discard" mode), 182 // Similarly, though this may have discarded data (in "may discard" mode),
155 // making it non-readable, there's still no need to awake consumer awakables. 183 // making it non-readable, there's still no need to awake consumer awakables.
156 DCHECK(producer_in_two_phase_write_no_lock()); 184 DCHECK(producer_in_two_phase_write_no_lock());
157 return MOJO_RESULT_OK; 185 return MOJO_RESULT_OK;
158 } 186 }
159 187
160 MojoResult DataPipe::ProducerEndWriteData(uint32_t num_bytes_written) { 188 MojoResult DataPipe::ProducerEndWriteData(uint32_t num_bytes_written) {
161 base::AutoLock locker(lock_); 189 base::AutoLock locker(lock_);
162 DCHECK(has_local_producer_no_lock()); 190 DCHECK(has_local_producer_no_lock());
163 191
164 if (!producer_in_two_phase_write_no_lock()) 192 if (!producer_in_two_phase_write_no_lock())
165 return MOJO_RESULT_FAILED_PRECONDITION; 193 return MOJO_RESULT_FAILED_PRECONDITION;
166 // Note: Allow successful completion of the two-phase write even if the 194 // Note: Allow successful completion of the two-phase write even if the
167 // consumer has been closed. 195 // consumer has been closed.
168 196
169 HandleSignalsState old_consumer_state = 197 HandleSignalsState old_consumer_state =
170 ConsumerGetHandleSignalsStateImplNoLock(); 198 impl_->ConsumerGetHandleSignalsState();
171 MojoResult rv; 199 MojoResult rv;
172 if (num_bytes_written > producer_two_phase_max_num_bytes_written_ || 200 if (num_bytes_written > producer_two_phase_max_num_bytes_written_ ||
173 num_bytes_written % element_num_bytes_ != 0) { 201 num_bytes_written % element_num_bytes_ != 0) {
174 rv = MOJO_RESULT_INVALID_ARGUMENT; 202 rv = MOJO_RESULT_INVALID_ARGUMENT;
175 producer_two_phase_max_num_bytes_written_ = 0; 203 producer_two_phase_max_num_bytes_written_ = 0;
176 } else { 204 } else {
177 rv = ProducerEndWriteDataImplNoLock(num_bytes_written); 205 rv = impl_->ProducerEndWriteData(num_bytes_written);
178 } 206 }
179 // Two-phase write ended even on failure. 207 // Two-phase write ended even on failure.
180 DCHECK(!producer_in_two_phase_write_no_lock()); 208 DCHECK(!producer_in_two_phase_write_no_lock());
181 // If we're now writable, we *became* writable (since we weren't writable 209 // If we're now writable, we *became* writable (since we weren't writable
182 // during the two-phase write), so awake producer awakables. 210 // during the two-phase write), so awake producer awakables.
183 HandleSignalsState new_producer_state = 211 HandleSignalsState new_producer_state =
184 ProducerGetHandleSignalsStateImplNoLock(); 212 impl_->ProducerGetHandleSignalsState();
185 if (new_producer_state.satisfies(MOJO_HANDLE_SIGNAL_WRITABLE)) 213 if (new_producer_state.satisfies(MOJO_HANDLE_SIGNAL_WRITABLE))
186 AwakeProducerAwakablesForStateChangeNoLock(new_producer_state); 214 AwakeProducerAwakablesForStateChangeNoLock(new_producer_state);
187 HandleSignalsState new_consumer_state = 215 HandleSignalsState new_consumer_state =
188 ConsumerGetHandleSignalsStateImplNoLock(); 216 impl_->ConsumerGetHandleSignalsState();
189 if (!new_consumer_state.equals(old_consumer_state)) 217 if (!new_consumer_state.equals(old_consumer_state))
190 AwakeConsumerAwakablesForStateChangeNoLock(new_consumer_state); 218 AwakeConsumerAwakablesForStateChangeNoLock(new_consumer_state);
191 return rv; 219 return rv;
192 } 220 }
193 221
194 HandleSignalsState DataPipe::ProducerGetHandleSignalsState() { 222 HandleSignalsState DataPipe::ProducerGetHandleSignalsState() {
195 base::AutoLock locker(lock_); 223 base::AutoLock locker(lock_);
196 DCHECK(has_local_producer_no_lock()); 224 DCHECK(has_local_producer_no_lock());
197 return ProducerGetHandleSignalsStateImplNoLock(); 225 return impl_->ProducerGetHandleSignalsState();
198 } 226 }
199 227
200 MojoResult DataPipe::ProducerAddAwakable(Awakable* awakable, 228 MojoResult DataPipe::ProducerAddAwakable(Awakable* awakable,
201 MojoHandleSignals signals, 229 MojoHandleSignals signals,
202 uint32_t context, 230 uint32_t context,
203 HandleSignalsState* signals_state) { 231 HandleSignalsState* signals_state) {
204 base::AutoLock locker(lock_); 232 base::AutoLock locker(lock_);
205 DCHECK(has_local_producer_no_lock()); 233 DCHECK(has_local_producer_no_lock());
206 234
207 HandleSignalsState producer_state = ProducerGetHandleSignalsStateImplNoLock(); 235 HandleSignalsState producer_state = impl_->ProducerGetHandleSignalsState();
208 if (producer_state.satisfies(signals)) { 236 if (producer_state.satisfies(signals)) {
209 if (signals_state) 237 if (signals_state)
210 *signals_state = producer_state; 238 *signals_state = producer_state;
211 return MOJO_RESULT_ALREADY_EXISTS; 239 return MOJO_RESULT_ALREADY_EXISTS;
212 } 240 }
213 if (!producer_state.can_satisfy(signals)) { 241 if (!producer_state.can_satisfy(signals)) {
214 if (signals_state) 242 if (signals_state)
215 *signals_state = producer_state; 243 *signals_state = producer_state;
216 return MOJO_RESULT_FAILED_PRECONDITION; 244 return MOJO_RESULT_FAILED_PRECONDITION;
217 } 245 }
218 246
219 producer_awakable_list_->Add(awakable, signals, context); 247 producer_awakable_list_->Add(awakable, signals, context);
220 return MOJO_RESULT_OK; 248 return MOJO_RESULT_OK;
221 } 249 }
222 250
223 void DataPipe::ProducerRemoveAwakable(Awakable* awakable, 251 void DataPipe::ProducerRemoveAwakable(Awakable* awakable,
224 HandleSignalsState* signals_state) { 252 HandleSignalsState* signals_state) {
225 base::AutoLock locker(lock_); 253 base::AutoLock locker(lock_);
226 DCHECK(has_local_producer_no_lock()); 254 DCHECK(has_local_producer_no_lock());
227 producer_awakable_list_->Remove(awakable); 255 producer_awakable_list_->Remove(awakable);
228 if (signals_state) 256 if (signals_state)
229 *signals_state = ProducerGetHandleSignalsStateImplNoLock(); 257 *signals_state = impl_->ProducerGetHandleSignalsState();
230 } 258 }
231 259
232 void DataPipe::ProducerStartSerialize(Channel* channel, 260 void DataPipe::ProducerStartSerialize(Channel* channel,
233 size_t* max_size, 261 size_t* max_size,
234 size_t* max_platform_handles) { 262 size_t* max_platform_handles) {
235 base::AutoLock locker(lock_); 263 base::AutoLock locker(lock_);
236 DCHECK(has_local_producer_no_lock()); 264 DCHECK(has_local_producer_no_lock());
237 ProducerStartSerializeImplNoLock(channel, max_size, max_platform_handles); 265 impl_->ProducerStartSerialize(channel, max_size, max_platform_handles);
238 } 266 }
239 267
240 bool DataPipe::ProducerEndSerialize( 268 bool DataPipe::ProducerEndSerialize(
241 Channel* channel, 269 Channel* channel,
242 void* destination, 270 void* destination,
243 size_t* actual_size, 271 size_t* actual_size,
244 embedder::PlatformHandleVector* platform_handles) { 272 embedder::PlatformHandleVector* platform_handles) {
245 base::AutoLock locker(lock_); 273 base::AutoLock locker(lock_);
246 DCHECK(has_local_producer_no_lock()); 274 DCHECK(has_local_producer_no_lock());
247 return ProducerEndSerializeImplNoLock(channel, destination, actual_size, 275 return impl_->ProducerEndSerialize(channel, destination, actual_size,
248 platform_handles); 276 platform_handles);
249 } 277 }
250 278
251 bool DataPipe::ProducerIsBusy() const { 279 bool DataPipe::ProducerIsBusy() const {
252 base::AutoLock locker(lock_); 280 base::AutoLock locker(lock_);
253 return producer_in_two_phase_write_no_lock(); 281 return producer_in_two_phase_write_no_lock();
254 } 282 }
255 283
256 void DataPipe::ConsumerCancelAllAwakables() { 284 void DataPipe::ConsumerCancelAllAwakables() {
257 base::AutoLock locker(lock_); 285 base::AutoLock locker(lock_);
258 DCHECK(has_local_consumer_no_lock()); 286 DCHECK(has_local_consumer_no_lock());
(...skipping 18 matching lines...) Expand all
277 uint32_t max_num_bytes_to_read = num_bytes.Get(); 305 uint32_t max_num_bytes_to_read = num_bytes.Get();
278 if (max_num_bytes_to_read % element_num_bytes_ != 0) 306 if (max_num_bytes_to_read % element_num_bytes_ != 0)
279 return MOJO_RESULT_INVALID_ARGUMENT; 307 return MOJO_RESULT_INVALID_ARGUMENT;
280 308
281 if (max_num_bytes_to_read == 0) 309 if (max_num_bytes_to_read == 0)
282 return MOJO_RESULT_OK; // Nothing to do. 310 return MOJO_RESULT_OK; // Nothing to do.
283 311
284 uint32_t min_num_bytes_to_read = all_or_none ? max_num_bytes_to_read : 0; 312 uint32_t min_num_bytes_to_read = all_or_none ? max_num_bytes_to_read : 0;
285 313
286 HandleSignalsState old_producer_state = 314 HandleSignalsState old_producer_state =
287 ProducerGetHandleSignalsStateImplNoLock(); 315 impl_->ProducerGetHandleSignalsState();
288 MojoResult rv = ConsumerReadDataImplNoLock( 316 MojoResult rv = impl_->ConsumerReadData(
289 elements, num_bytes, max_num_bytes_to_read, min_num_bytes_to_read, peek); 317 elements, num_bytes, max_num_bytes_to_read, min_num_bytes_to_read, peek);
290 HandleSignalsState new_producer_state = 318 HandleSignalsState new_producer_state =
291 ProducerGetHandleSignalsStateImplNoLock(); 319 impl_->ProducerGetHandleSignalsState();
292 if (!new_producer_state.equals(old_producer_state)) 320 if (!new_producer_state.equals(old_producer_state))
293 AwakeProducerAwakablesForStateChangeNoLock(new_producer_state); 321 AwakeProducerAwakablesForStateChangeNoLock(new_producer_state);
294 return rv; 322 return rv;
295 } 323 }
296 324
297 MojoResult DataPipe::ConsumerDiscardData(UserPointer<uint32_t> num_bytes, 325 MojoResult DataPipe::ConsumerDiscardData(UserPointer<uint32_t> num_bytes,
298 bool all_or_none) { 326 bool all_or_none) {
299 base::AutoLock locker(lock_); 327 base::AutoLock locker(lock_);
300 DCHECK(has_local_consumer_no_lock()); 328 DCHECK(has_local_consumer_no_lock());
301 329
302 if (consumer_in_two_phase_read_no_lock()) 330 if (consumer_in_two_phase_read_no_lock())
303 return MOJO_RESULT_BUSY; 331 return MOJO_RESULT_BUSY;
304 332
305 uint32_t max_num_bytes_to_discard = num_bytes.Get(); 333 uint32_t max_num_bytes_to_discard = num_bytes.Get();
306 if (max_num_bytes_to_discard % element_num_bytes_ != 0) 334 if (max_num_bytes_to_discard % element_num_bytes_ != 0)
307 return MOJO_RESULT_INVALID_ARGUMENT; 335 return MOJO_RESULT_INVALID_ARGUMENT;
308 336
309 if (max_num_bytes_to_discard == 0) 337 if (max_num_bytes_to_discard == 0)
310 return MOJO_RESULT_OK; // Nothing to do. 338 return MOJO_RESULT_OK; // Nothing to do.
311 339
312 uint32_t min_num_bytes_to_discard = 340 uint32_t min_num_bytes_to_discard =
313 all_or_none ? max_num_bytes_to_discard : 0; 341 all_or_none ? max_num_bytes_to_discard : 0;
314 342
315 HandleSignalsState old_producer_state = 343 HandleSignalsState old_producer_state =
316 ProducerGetHandleSignalsStateImplNoLock(); 344 impl_->ProducerGetHandleSignalsState();
317 MojoResult rv = ConsumerDiscardDataImplNoLock( 345 MojoResult rv = impl_->ConsumerDiscardData(
318 num_bytes, max_num_bytes_to_discard, min_num_bytes_to_discard); 346 num_bytes, max_num_bytes_to_discard, min_num_bytes_to_discard);
319 HandleSignalsState new_producer_state = 347 HandleSignalsState new_producer_state =
320 ProducerGetHandleSignalsStateImplNoLock(); 348 impl_->ProducerGetHandleSignalsState();
321 if (!new_producer_state.equals(old_producer_state)) 349 if (!new_producer_state.equals(old_producer_state))
322 AwakeProducerAwakablesForStateChangeNoLock(new_producer_state); 350 AwakeProducerAwakablesForStateChangeNoLock(new_producer_state);
323 return rv; 351 return rv;
324 } 352 }
325 353
326 MojoResult DataPipe::ConsumerQueryData(UserPointer<uint32_t> num_bytes) { 354 MojoResult DataPipe::ConsumerQueryData(UserPointer<uint32_t> num_bytes) {
327 base::AutoLock locker(lock_); 355 base::AutoLock locker(lock_);
328 DCHECK(has_local_consumer_no_lock()); 356 DCHECK(has_local_consumer_no_lock());
329 357
330 if (consumer_in_two_phase_read_no_lock()) 358 if (consumer_in_two_phase_read_no_lock())
331 return MOJO_RESULT_BUSY; 359 return MOJO_RESULT_BUSY;
332 360
333 // Note: Don't need to validate |*num_bytes| for query. 361 // Note: Don't need to validate |*num_bytes| for query.
334 return ConsumerQueryDataImplNoLock(num_bytes); 362 return impl_->ConsumerQueryData(num_bytes);
335 } 363 }
336 364
337 MojoResult DataPipe::ConsumerBeginReadData( 365 MojoResult DataPipe::ConsumerBeginReadData(
338 UserPointer<const void*> buffer, 366 UserPointer<const void*> buffer,
339 UserPointer<uint32_t> buffer_num_bytes, 367 UserPointer<uint32_t> buffer_num_bytes,
340 bool all_or_none) { 368 bool all_or_none) {
341 base::AutoLock locker(lock_); 369 base::AutoLock locker(lock_);
342 DCHECK(has_local_consumer_no_lock()); 370 DCHECK(has_local_consumer_no_lock());
343 371
344 if (consumer_in_two_phase_read_no_lock()) 372 if (consumer_in_two_phase_read_no_lock())
345 return MOJO_RESULT_BUSY; 373 return MOJO_RESULT_BUSY;
346 374
347 uint32_t min_num_bytes_to_read = 0; 375 uint32_t min_num_bytes_to_read = 0;
348 if (all_or_none) { 376 if (all_or_none) {
349 min_num_bytes_to_read = buffer_num_bytes.Get(); 377 min_num_bytes_to_read = buffer_num_bytes.Get();
350 if (min_num_bytes_to_read % element_num_bytes_ != 0) 378 if (min_num_bytes_to_read % element_num_bytes_ != 0)
351 return MOJO_RESULT_INVALID_ARGUMENT; 379 return MOJO_RESULT_INVALID_ARGUMENT;
352 } 380 }
353 381
354 MojoResult rv = ConsumerBeginReadDataImplNoLock(buffer, buffer_num_bytes, 382 MojoResult rv = impl_->ConsumerBeginReadData(buffer, buffer_num_bytes,
355 min_num_bytes_to_read); 383 min_num_bytes_to_read);
356 if (rv != MOJO_RESULT_OK) 384 if (rv != MOJO_RESULT_OK)
357 return rv; 385 return rv;
358 DCHECK(consumer_in_two_phase_read_no_lock()); 386 DCHECK(consumer_in_two_phase_read_no_lock());
359 return MOJO_RESULT_OK; 387 return MOJO_RESULT_OK;
360 } 388 }
361 389
362 MojoResult DataPipe::ConsumerEndReadData(uint32_t num_bytes_read) { 390 MojoResult DataPipe::ConsumerEndReadData(uint32_t num_bytes_read) {
363 base::AutoLock locker(lock_); 391 base::AutoLock locker(lock_);
364 DCHECK(has_local_consumer_no_lock()); 392 DCHECK(has_local_consumer_no_lock());
365 393
366 if (!consumer_in_two_phase_read_no_lock()) 394 if (!consumer_in_two_phase_read_no_lock())
367 return MOJO_RESULT_FAILED_PRECONDITION; 395 return MOJO_RESULT_FAILED_PRECONDITION;
368 396
369 HandleSignalsState old_producer_state = 397 HandleSignalsState old_producer_state =
370 ProducerGetHandleSignalsStateImplNoLock(); 398 impl_->ProducerGetHandleSignalsState();
371 MojoResult rv; 399 MojoResult rv;
372 if (num_bytes_read > consumer_two_phase_max_num_bytes_read_ || 400 if (num_bytes_read > consumer_two_phase_max_num_bytes_read_ ||
373 num_bytes_read % element_num_bytes_ != 0) { 401 num_bytes_read % element_num_bytes_ != 0) {
374 rv = MOJO_RESULT_INVALID_ARGUMENT; 402 rv = MOJO_RESULT_INVALID_ARGUMENT;
375 consumer_two_phase_max_num_bytes_read_ = 0; 403 consumer_two_phase_max_num_bytes_read_ = 0;
376 } else { 404 } else {
377 rv = ConsumerEndReadDataImplNoLock(num_bytes_read); 405 rv = impl_->ConsumerEndReadData(num_bytes_read);
378 } 406 }
379 // Two-phase read ended even on failure. 407 // Two-phase read ended even on failure.
380 DCHECK(!consumer_in_two_phase_read_no_lock()); 408 DCHECK(!consumer_in_two_phase_read_no_lock());
381 // If we're now readable, we *became* readable (since we weren't readable 409 // If we're now readable, we *became* readable (since we weren't readable
382 // during the two-phase read), so awake consumer awakables. 410 // during the two-phase read), so awake consumer awakables.
383 HandleSignalsState new_consumer_state = 411 HandleSignalsState new_consumer_state =
384 ConsumerGetHandleSignalsStateImplNoLock(); 412 impl_->ConsumerGetHandleSignalsState();
385 if (new_consumer_state.satisfies(MOJO_HANDLE_SIGNAL_READABLE)) 413 if (new_consumer_state.satisfies(MOJO_HANDLE_SIGNAL_READABLE))
386 AwakeConsumerAwakablesForStateChangeNoLock(new_consumer_state); 414 AwakeConsumerAwakablesForStateChangeNoLock(new_consumer_state);
387 HandleSignalsState new_producer_state = 415 HandleSignalsState new_producer_state =
388 ProducerGetHandleSignalsStateImplNoLock(); 416 impl_->ProducerGetHandleSignalsState();
389 if (!new_producer_state.equals(old_producer_state)) 417 if (!new_producer_state.equals(old_producer_state))
390 AwakeProducerAwakablesForStateChangeNoLock(new_producer_state); 418 AwakeProducerAwakablesForStateChangeNoLock(new_producer_state);
391 return rv; 419 return rv;
392 } 420 }
393 421
394 HandleSignalsState DataPipe::ConsumerGetHandleSignalsState() { 422 HandleSignalsState DataPipe::ConsumerGetHandleSignalsState() {
395 base::AutoLock locker(lock_); 423 base::AutoLock locker(lock_);
396 DCHECK(has_local_consumer_no_lock()); 424 DCHECK(has_local_consumer_no_lock());
397 return ConsumerGetHandleSignalsStateImplNoLock(); 425 return impl_->ConsumerGetHandleSignalsState();
398 } 426 }
399 427
400 MojoResult DataPipe::ConsumerAddAwakable(Awakable* awakable, 428 MojoResult DataPipe::ConsumerAddAwakable(Awakable* awakable,
401 MojoHandleSignals signals, 429 MojoHandleSignals signals,
402 uint32_t context, 430 uint32_t context,
403 HandleSignalsState* signals_state) { 431 HandleSignalsState* signals_state) {
404 base::AutoLock locker(lock_); 432 base::AutoLock locker(lock_);
405 DCHECK(has_local_consumer_no_lock()); 433 DCHECK(has_local_consumer_no_lock());
406 434
407 HandleSignalsState consumer_state = ConsumerGetHandleSignalsStateImplNoLock(); 435 HandleSignalsState consumer_state = impl_->ConsumerGetHandleSignalsState();
408 if (consumer_state.satisfies(signals)) { 436 if (consumer_state.satisfies(signals)) {
409 if (signals_state) 437 if (signals_state)
410 *signals_state = consumer_state; 438 *signals_state = consumer_state;
411 return MOJO_RESULT_ALREADY_EXISTS; 439 return MOJO_RESULT_ALREADY_EXISTS;
412 } 440 }
413 if (!consumer_state.can_satisfy(signals)) { 441 if (!consumer_state.can_satisfy(signals)) {
414 if (signals_state) 442 if (signals_state)
415 *signals_state = consumer_state; 443 *signals_state = consumer_state;
416 return MOJO_RESULT_FAILED_PRECONDITION; 444 return MOJO_RESULT_FAILED_PRECONDITION;
417 } 445 }
418 446
419 consumer_awakable_list_->Add(awakable, signals, context); 447 consumer_awakable_list_->Add(awakable, signals, context);
420 return MOJO_RESULT_OK; 448 return MOJO_RESULT_OK;
421 } 449 }
422 450
423 void DataPipe::ConsumerRemoveAwakable(Awakable* awakable, 451 void DataPipe::ConsumerRemoveAwakable(Awakable* awakable,
424 HandleSignalsState* signals_state) { 452 HandleSignalsState* signals_state) {
425 base::AutoLock locker(lock_); 453 base::AutoLock locker(lock_);
426 DCHECK(has_local_consumer_no_lock()); 454 DCHECK(has_local_consumer_no_lock());
427 consumer_awakable_list_->Remove(awakable); 455 consumer_awakable_list_->Remove(awakable);
428 if (signals_state) 456 if (signals_state)
429 *signals_state = ConsumerGetHandleSignalsStateImplNoLock(); 457 *signals_state = impl_->ConsumerGetHandleSignalsState();
430 } 458 }
431 459
432 void DataPipe::ConsumerStartSerialize(Channel* channel, 460 void DataPipe::ConsumerStartSerialize(Channel* channel,
433 size_t* max_size, 461 size_t* max_size,
434 size_t* max_platform_handles) { 462 size_t* max_platform_handles) {
435 base::AutoLock locker(lock_); 463 base::AutoLock locker(lock_);
436 DCHECK(has_local_consumer_no_lock()); 464 DCHECK(has_local_consumer_no_lock());
437 ConsumerStartSerializeImplNoLock(channel, max_size, max_platform_handles); 465 impl_->ConsumerStartSerialize(channel, max_size, max_platform_handles);
438 } 466 }
439 467
440 bool DataPipe::ConsumerEndSerialize( 468 bool DataPipe::ConsumerEndSerialize(
441 Channel* channel, 469 Channel* channel,
442 void* destination, 470 void* destination,
443 size_t* actual_size, 471 size_t* actual_size,
444 embedder::PlatformHandleVector* platform_handles) { 472 embedder::PlatformHandleVector* platform_handles) {
445 base::AutoLock locker(lock_); 473 base::AutoLock locker(lock_);
446 DCHECK(has_local_consumer_no_lock()); 474 DCHECK(has_local_consumer_no_lock());
447 return ConsumerEndSerializeImplNoLock(channel, destination, actual_size, 475 return impl_->ConsumerEndSerialize(channel, destination, actual_size,
448 platform_handles); 476 platform_handles);
449 } 477 }
450 478
451 bool DataPipe::ConsumerIsBusy() const { 479 bool DataPipe::ConsumerIsBusy() const {
452 base::AutoLock locker(lock_); 480 base::AutoLock locker(lock_);
453 return consumer_in_two_phase_read_no_lock(); 481 return consumer_in_two_phase_read_no_lock();
454 } 482 }
455 483
456 DataPipe::DataPipe(bool has_local_producer,
457 bool has_local_consumer,
458 const MojoCreateDataPipeOptions& validated_options)
459 : may_discard_((validated_options.flags &
460 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_MAY_DISCARD)),
461 element_num_bytes_(validated_options.element_num_bytes),
462 capacity_num_bytes_(validated_options.capacity_num_bytes),
463 producer_open_(true),
464 consumer_open_(true),
465 producer_awakable_list_(has_local_producer ? new AwakableList()
466 : nullptr),
467 consumer_awakable_list_(has_local_consumer ? new AwakableList()
468 : nullptr),
469 producer_two_phase_max_num_bytes_written_(0),
470 consumer_two_phase_max_num_bytes_read_(0) {
471 // Check that the passed in options actually are validated.
472 MojoCreateDataPipeOptions unused = {0};
473 DCHECK_EQ(ValidateCreateOptions(MakeUserPointer(&validated_options), &unused),
474 MOJO_RESULT_OK);
475 }
476
477 DataPipe::~DataPipe() { 484 DataPipe::~DataPipe() {
478 DCHECK(!producer_open_); 485 DCHECK(!producer_open_);
479 DCHECK(!consumer_open_); 486 DCHECK(!consumer_open_);
480 DCHECK(!producer_awakable_list_); 487 DCHECK(!producer_awakable_list_);
481 DCHECK(!consumer_awakable_list_); 488 DCHECK(!consumer_awakable_list_);
482 } 489 }
483 490
484 void DataPipe::ProducerCloseNoLock() { 491 void DataPipe::ProducerCloseNoLock() {
485 lock_.AssertAcquired(); 492 lock_.AssertAcquired();
486 DCHECK(producer_open_); 493 DCHECK(producer_open_);
487 producer_open_ = false; 494 producer_open_ = false;
488 DCHECK(has_local_producer_no_lock()); 495 DCHECK(has_local_producer_no_lock());
489 producer_awakable_list_.reset(); 496 producer_awakable_list_.reset();
490 // Not a bug, except possibly in "user" code. 497 // Not a bug, except possibly in "user" code.
491 DVLOG_IF(2, producer_in_two_phase_write_no_lock()) 498 DVLOG_IF(2, producer_in_two_phase_write_no_lock())
492 << "Producer closed with active two-phase write"; 499 << "Producer closed with active two-phase write";
493 producer_two_phase_max_num_bytes_written_ = 0; 500 producer_two_phase_max_num_bytes_written_ = 0;
494 ProducerCloseImplNoLock(); 501 impl_->ProducerClose();
495 AwakeConsumerAwakablesForStateChangeNoLock( 502 AwakeConsumerAwakablesForStateChangeNoLock(
496 ConsumerGetHandleSignalsStateImplNoLock()); 503 impl_->ConsumerGetHandleSignalsState());
497 } 504 }
498 505
499 void DataPipe::ConsumerCloseNoLock() { 506 void DataPipe::ConsumerCloseNoLock() {
500 lock_.AssertAcquired(); 507 lock_.AssertAcquired();
501 DCHECK(consumer_open_); 508 DCHECK(consumer_open_);
502 consumer_open_ = false; 509 consumer_open_ = false;
503 DCHECK(has_local_consumer_no_lock()); 510 DCHECK(has_local_consumer_no_lock());
504 consumer_awakable_list_.reset(); 511 consumer_awakable_list_.reset();
505 // Not a bug, except possibly in "user" code. 512 // Not a bug, except possibly in "user" code.
506 DVLOG_IF(2, consumer_in_two_phase_read_no_lock()) 513 DVLOG_IF(2, consumer_in_two_phase_read_no_lock())
507 << "Consumer closed with active two-phase read"; 514 << "Consumer closed with active two-phase read";
508 consumer_two_phase_max_num_bytes_read_ = 0; 515 consumer_two_phase_max_num_bytes_read_ = 0;
509 ConsumerCloseImplNoLock(); 516 impl_->ConsumerClose();
510 AwakeProducerAwakablesForStateChangeNoLock( 517 AwakeProducerAwakablesForStateChangeNoLock(
511 ProducerGetHandleSignalsStateImplNoLock()); 518 impl_->ProducerGetHandleSignalsState());
512 } 519 }
513 520
514 void DataPipe::AwakeProducerAwakablesForStateChangeNoLock( 521 void DataPipe::AwakeProducerAwakablesForStateChangeNoLock(
515 const HandleSignalsState& new_producer_state) { 522 const HandleSignalsState& new_producer_state) {
516 lock_.AssertAcquired(); 523 lock_.AssertAcquired();
517 if (!has_local_producer_no_lock()) 524 if (!has_local_producer_no_lock())
518 return; 525 return;
519 producer_awakable_list_->AwakeForStateChange(new_producer_state); 526 producer_awakable_list_->AwakeForStateChange(new_producer_state);
520 } 527 }
521 528
522 void DataPipe::AwakeConsumerAwakablesForStateChangeNoLock( 529 void DataPipe::AwakeConsumerAwakablesForStateChangeNoLock(
523 const HandleSignalsState& new_consumer_state) { 530 const HandleSignalsState& new_consumer_state) {
524 lock_.AssertAcquired(); 531 lock_.AssertAcquired();
525 if (!has_local_consumer_no_lock()) 532 if (!has_local_consumer_no_lock())
526 return; 533 return;
527 consumer_awakable_list_->AwakeForStateChange(new_consumer_state); 534 consumer_awakable_list_->AwakeForStateChange(new_consumer_state);
528 } 535 }
529 536
530 } // namespace system 537 } // namespace system
531 } // namespace mojo 538 } // namespace mojo
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698