Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(164)

Side by Side Diff: mojo/edk/system/data_pipe.cc

Issue 926553006: Make DataPipe own an impl. (Closed) Base URL: https://github.com/domokit/mojo.git@local_data_pipe_impl_1
Patch Set: DataPipe::CreateLocal() Created 5 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « mojo/edk/system/data_pipe.h ('k') | mojo/edk/system/data_pipe_impl.h » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2013 The Chromium Authors. All rights reserved. 1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/edk/system/data_pipe.h" 5 #include "mojo/edk/system/data_pipe.h"
6 6
7 #include <string.h> 7 #include <string.h>
8 8
9 #include <algorithm> 9 #include <algorithm>
10 #include <limits> 10 #include <limits>
11 11
12 #include "base/logging.h" 12 #include "base/logging.h"
13 #include "mojo/edk/system/awakable_list.h" 13 #include "mojo/edk/system/awakable_list.h"
14 #include "mojo/edk/system/configuration.h" 14 #include "mojo/edk/system/configuration.h"
15 #include "mojo/edk/system/data_pipe_impl.h"
16 #include "mojo/edk/system/local_data_pipe_impl.h"
15 #include "mojo/edk/system/memory.h" 17 #include "mojo/edk/system/memory.h"
16 #include "mojo/edk/system/options_validation.h" 18 #include "mojo/edk/system/options_validation.h"
17 19
18 namespace mojo { 20 namespace mojo {
19 namespace system { 21 namespace system {
20 22
21 // static 23 // static
22 MojoCreateDataPipeOptions DataPipe::GetDefaultCreateOptions() { 24 MojoCreateDataPipeOptions DataPipe::GetDefaultCreateOptions() {
23 MojoCreateDataPipeOptions result = { 25 MojoCreateDataPipeOptions result = {
24 static_cast<uint32_t>(sizeof(MojoCreateDataPipeOptions)), 26 static_cast<uint32_t>(sizeof(MojoCreateDataPipeOptions)),
(...skipping 51 matching lines...) Expand 10 before | Expand all | Expand 10 after
76 if (reader.options().capacity_num_bytes % out_options->element_num_bytes != 0) 78 if (reader.options().capacity_num_bytes % out_options->element_num_bytes != 0)
77 return MOJO_RESULT_INVALID_ARGUMENT; 79 return MOJO_RESULT_INVALID_ARGUMENT;
78 if (reader.options().capacity_num_bytes > 80 if (reader.options().capacity_num_bytes >
79 GetConfiguration().max_data_pipe_capacity_bytes) 81 GetConfiguration().max_data_pipe_capacity_bytes)
80 return MOJO_RESULT_RESOURCE_EXHAUSTED; 82 return MOJO_RESULT_RESOURCE_EXHAUSTED;
81 out_options->capacity_num_bytes = reader.options().capacity_num_bytes; 83 out_options->capacity_num_bytes = reader.options().capacity_num_bytes;
82 84
83 return MOJO_RESULT_OK; 85 return MOJO_RESULT_OK;
84 } 86 }
85 87
88 // static
89 DataPipe* DataPipe::CreateLocal(
90 const MojoCreateDataPipeOptions& validated_options) {
91 return new DataPipe(true, true, validated_options,
92 make_scoped_ptr(new LocalDataPipeImpl()));
93 }
94
86 void DataPipe::ProducerCancelAllAwakables() { 95 void DataPipe::ProducerCancelAllAwakables() {
87 base::AutoLock locker(lock_); 96 base::AutoLock locker(lock_);
88 DCHECK(has_local_producer_no_lock()); 97 DCHECK(has_local_producer_no_lock());
89 producer_awakable_list_->CancelAll(); 98 producer_awakable_list_->CancelAll();
90 } 99 }
91 100
92 void DataPipe::ProducerClose() { 101 void DataPipe::ProducerClose() {
93 base::AutoLock locker(lock_); 102 base::AutoLock locker(lock_);
94 ProducerCloseNoLock(); 103 ProducerCloseNoLock();
95 } 104 }
(...skipping 13 matching lines...) Expand all
109 uint32_t max_num_bytes_to_write = num_bytes.Get(); 118 uint32_t max_num_bytes_to_write = num_bytes.Get();
110 if (max_num_bytes_to_write % element_num_bytes_ != 0) 119 if (max_num_bytes_to_write % element_num_bytes_ != 0)
111 return MOJO_RESULT_INVALID_ARGUMENT; 120 return MOJO_RESULT_INVALID_ARGUMENT;
112 121
113 if (max_num_bytes_to_write == 0) 122 if (max_num_bytes_to_write == 0)
114 return MOJO_RESULT_OK; // Nothing to do. 123 return MOJO_RESULT_OK; // Nothing to do.
115 124
116 uint32_t min_num_bytes_to_write = all_or_none ? max_num_bytes_to_write : 0; 125 uint32_t min_num_bytes_to_write = all_or_none ? max_num_bytes_to_write : 0;
117 126
118 HandleSignalsState old_consumer_state = 127 HandleSignalsState old_consumer_state =
119 ConsumerGetHandleSignalsStateImplNoLock(); 128 impl_->ConsumerGetHandleSignalsState();
120 MojoResult rv = ProducerWriteDataImplNoLock( 129 MojoResult rv = impl_->ProducerWriteData(
121 elements, num_bytes, max_num_bytes_to_write, min_num_bytes_to_write); 130 elements, num_bytes, max_num_bytes_to_write, min_num_bytes_to_write);
122 HandleSignalsState new_consumer_state = 131 HandleSignalsState new_consumer_state =
123 ConsumerGetHandleSignalsStateImplNoLock(); 132 impl_->ConsumerGetHandleSignalsState();
124 if (!new_consumer_state.equals(old_consumer_state)) 133 if (!new_consumer_state.equals(old_consumer_state))
125 AwakeConsumerAwakablesForStateChangeNoLock(new_consumer_state); 134 AwakeConsumerAwakablesForStateChangeNoLock(new_consumer_state);
126 return rv; 135 return rv;
127 } 136 }
128 137
129 MojoResult DataPipe::ProducerBeginWriteData( 138 MojoResult DataPipe::ProducerBeginWriteData(
130 UserPointer<void*> buffer, 139 UserPointer<void*> buffer,
131 UserPointer<uint32_t> buffer_num_bytes, 140 UserPointer<uint32_t> buffer_num_bytes,
132 bool all_or_none) { 141 bool all_or_none) {
133 base::AutoLock locker(lock_); 142 base::AutoLock locker(lock_);
134 DCHECK(has_local_producer_no_lock()); 143 DCHECK(has_local_producer_no_lock());
135 144
136 if (producer_in_two_phase_write_no_lock()) 145 if (producer_in_two_phase_write_no_lock())
137 return MOJO_RESULT_BUSY; 146 return MOJO_RESULT_BUSY;
138 if (!consumer_open_no_lock()) 147 if (!consumer_open_no_lock())
139 return MOJO_RESULT_FAILED_PRECONDITION; 148 return MOJO_RESULT_FAILED_PRECONDITION;
140 149
141 uint32_t min_num_bytes_to_write = 0; 150 uint32_t min_num_bytes_to_write = 0;
142 if (all_or_none) { 151 if (all_or_none) {
143 min_num_bytes_to_write = buffer_num_bytes.Get(); 152 min_num_bytes_to_write = buffer_num_bytes.Get();
144 if (min_num_bytes_to_write % element_num_bytes_ != 0) 153 if (min_num_bytes_to_write % element_num_bytes_ != 0)
145 return MOJO_RESULT_INVALID_ARGUMENT; 154 return MOJO_RESULT_INVALID_ARGUMENT;
146 } 155 }
147 156
148 MojoResult rv = ProducerBeginWriteDataImplNoLock(buffer, buffer_num_bytes, 157 MojoResult rv = impl_->ProducerBeginWriteData(buffer, buffer_num_bytes,
149 min_num_bytes_to_write); 158 min_num_bytes_to_write);
150 if (rv != MOJO_RESULT_OK) 159 if (rv != MOJO_RESULT_OK)
151 return rv; 160 return rv;
152 // Note: No need to awake producer awakables, even though we're going from 161 // Note: No need to awake producer awakables, even though we're going from
153 // writable to non-writable (since you can't wait on non-writability). 162 // writable to non-writable (since you can't wait on non-writability).
154 // Similarly, though this may have discarded data (in "may discard" mode), 163 // Similarly, though this may have discarded data (in "may discard" mode),
155 // making it non-readable, there's still no need to awake consumer awakables. 164 // making it non-readable, there's still no need to awake consumer awakables.
156 DCHECK(producer_in_two_phase_write_no_lock()); 165 DCHECK(producer_in_two_phase_write_no_lock());
157 return MOJO_RESULT_OK; 166 return MOJO_RESULT_OK;
158 } 167 }
159 168
160 MojoResult DataPipe::ProducerEndWriteData(uint32_t num_bytes_written) { 169 MojoResult DataPipe::ProducerEndWriteData(uint32_t num_bytes_written) {
161 base::AutoLock locker(lock_); 170 base::AutoLock locker(lock_);
162 DCHECK(has_local_producer_no_lock()); 171 DCHECK(has_local_producer_no_lock());
163 172
164 if (!producer_in_two_phase_write_no_lock()) 173 if (!producer_in_two_phase_write_no_lock())
165 return MOJO_RESULT_FAILED_PRECONDITION; 174 return MOJO_RESULT_FAILED_PRECONDITION;
166 // Note: Allow successful completion of the two-phase write even if the 175 // Note: Allow successful completion of the two-phase write even if the
167 // consumer has been closed. 176 // consumer has been closed.
168 177
169 HandleSignalsState old_consumer_state = 178 HandleSignalsState old_consumer_state =
170 ConsumerGetHandleSignalsStateImplNoLock(); 179 impl_->ConsumerGetHandleSignalsState();
171 MojoResult rv; 180 MojoResult rv;
172 if (num_bytes_written > producer_two_phase_max_num_bytes_written_ || 181 if (num_bytes_written > producer_two_phase_max_num_bytes_written_ ||
173 num_bytes_written % element_num_bytes_ != 0) { 182 num_bytes_written % element_num_bytes_ != 0) {
174 rv = MOJO_RESULT_INVALID_ARGUMENT; 183 rv = MOJO_RESULT_INVALID_ARGUMENT;
175 producer_two_phase_max_num_bytes_written_ = 0; 184 producer_two_phase_max_num_bytes_written_ = 0;
176 } else { 185 } else {
177 rv = ProducerEndWriteDataImplNoLock(num_bytes_written); 186 rv = impl_->ProducerEndWriteData(num_bytes_written);
178 } 187 }
179 // Two-phase write ended even on failure. 188 // Two-phase write ended even on failure.
180 DCHECK(!producer_in_two_phase_write_no_lock()); 189 DCHECK(!producer_in_two_phase_write_no_lock());
181 // If we're now writable, we *became* writable (since we weren't writable 190 // If we're now writable, we *became* writable (since we weren't writable
182 // during the two-phase write), so awake producer awakables. 191 // during the two-phase write), so awake producer awakables.
183 HandleSignalsState new_producer_state = 192 HandleSignalsState new_producer_state =
184 ProducerGetHandleSignalsStateImplNoLock(); 193 impl_->ProducerGetHandleSignalsState();
185 if (new_producer_state.satisfies(MOJO_HANDLE_SIGNAL_WRITABLE)) 194 if (new_producer_state.satisfies(MOJO_HANDLE_SIGNAL_WRITABLE))
186 AwakeProducerAwakablesForStateChangeNoLock(new_producer_state); 195 AwakeProducerAwakablesForStateChangeNoLock(new_producer_state);
187 HandleSignalsState new_consumer_state = 196 HandleSignalsState new_consumer_state =
188 ConsumerGetHandleSignalsStateImplNoLock(); 197 impl_->ConsumerGetHandleSignalsState();
189 if (!new_consumer_state.equals(old_consumer_state)) 198 if (!new_consumer_state.equals(old_consumer_state))
190 AwakeConsumerAwakablesForStateChangeNoLock(new_consumer_state); 199 AwakeConsumerAwakablesForStateChangeNoLock(new_consumer_state);
191 return rv; 200 return rv;
192 } 201 }
193 202
194 HandleSignalsState DataPipe::ProducerGetHandleSignalsState() { 203 HandleSignalsState DataPipe::ProducerGetHandleSignalsState() {
195 base::AutoLock locker(lock_); 204 base::AutoLock locker(lock_);
196 DCHECK(has_local_producer_no_lock()); 205 DCHECK(has_local_producer_no_lock());
197 return ProducerGetHandleSignalsStateImplNoLock(); 206 return impl_->ProducerGetHandleSignalsState();
198 } 207 }
199 208
200 MojoResult DataPipe::ProducerAddAwakable(Awakable* awakable, 209 MojoResult DataPipe::ProducerAddAwakable(Awakable* awakable,
201 MojoHandleSignals signals, 210 MojoHandleSignals signals,
202 uint32_t context, 211 uint32_t context,
203 HandleSignalsState* signals_state) { 212 HandleSignalsState* signals_state) {
204 base::AutoLock locker(lock_); 213 base::AutoLock locker(lock_);
205 DCHECK(has_local_producer_no_lock()); 214 DCHECK(has_local_producer_no_lock());
206 215
207 HandleSignalsState producer_state = ProducerGetHandleSignalsStateImplNoLock(); 216 HandleSignalsState producer_state = impl_->ProducerGetHandleSignalsState();
208 if (producer_state.satisfies(signals)) { 217 if (producer_state.satisfies(signals)) {
209 if (signals_state) 218 if (signals_state)
210 *signals_state = producer_state; 219 *signals_state = producer_state;
211 return MOJO_RESULT_ALREADY_EXISTS; 220 return MOJO_RESULT_ALREADY_EXISTS;
212 } 221 }
213 if (!producer_state.can_satisfy(signals)) { 222 if (!producer_state.can_satisfy(signals)) {
214 if (signals_state) 223 if (signals_state)
215 *signals_state = producer_state; 224 *signals_state = producer_state;
216 return MOJO_RESULT_FAILED_PRECONDITION; 225 return MOJO_RESULT_FAILED_PRECONDITION;
217 } 226 }
218 227
219 producer_awakable_list_->Add(awakable, signals, context); 228 producer_awakable_list_->Add(awakable, signals, context);
220 return MOJO_RESULT_OK; 229 return MOJO_RESULT_OK;
221 } 230 }
222 231
223 void DataPipe::ProducerRemoveAwakable(Awakable* awakable, 232 void DataPipe::ProducerRemoveAwakable(Awakable* awakable,
224 HandleSignalsState* signals_state) { 233 HandleSignalsState* signals_state) {
225 base::AutoLock locker(lock_); 234 base::AutoLock locker(lock_);
226 DCHECK(has_local_producer_no_lock()); 235 DCHECK(has_local_producer_no_lock());
227 producer_awakable_list_->Remove(awakable); 236 producer_awakable_list_->Remove(awakable);
228 if (signals_state) 237 if (signals_state)
229 *signals_state = ProducerGetHandleSignalsStateImplNoLock(); 238 *signals_state = impl_->ProducerGetHandleSignalsState();
230 } 239 }
231 240
232 void DataPipe::ProducerStartSerialize(Channel* channel, 241 void DataPipe::ProducerStartSerialize(Channel* channel,
233 size_t* max_size, 242 size_t* max_size,
234 size_t* max_platform_handles) { 243 size_t* max_platform_handles) {
235 base::AutoLock locker(lock_); 244 base::AutoLock locker(lock_);
236 DCHECK(has_local_producer_no_lock()); 245 DCHECK(has_local_producer_no_lock());
237 ProducerStartSerializeImplNoLock(channel, max_size, max_platform_handles); 246 impl_->ProducerStartSerialize(channel, max_size, max_platform_handles);
238 } 247 }
239 248
240 bool DataPipe::ProducerEndSerialize( 249 bool DataPipe::ProducerEndSerialize(
241 Channel* channel, 250 Channel* channel,
242 void* destination, 251 void* destination,
243 size_t* actual_size, 252 size_t* actual_size,
244 embedder::PlatformHandleVector* platform_handles) { 253 embedder::PlatformHandleVector* platform_handles) {
245 base::AutoLock locker(lock_); 254 base::AutoLock locker(lock_);
246 DCHECK(has_local_producer_no_lock()); 255 DCHECK(has_local_producer_no_lock());
247 return ProducerEndSerializeImplNoLock(channel, destination, actual_size, 256 return impl_->ProducerEndSerialize(channel, destination, actual_size,
248 platform_handles); 257 platform_handles);
249 } 258 }
250 259
251 bool DataPipe::ProducerIsBusy() const { 260 bool DataPipe::ProducerIsBusy() const {
252 base::AutoLock locker(lock_); 261 base::AutoLock locker(lock_);
253 return producer_in_two_phase_write_no_lock(); 262 return producer_in_two_phase_write_no_lock();
254 } 263 }
255 264
256 void DataPipe::ConsumerCancelAllAwakables() { 265 void DataPipe::ConsumerCancelAllAwakables() {
257 base::AutoLock locker(lock_); 266 base::AutoLock locker(lock_);
258 DCHECK(has_local_consumer_no_lock()); 267 DCHECK(has_local_consumer_no_lock());
(...skipping 18 matching lines...) Expand all
277 uint32_t max_num_bytes_to_read = num_bytes.Get(); 286 uint32_t max_num_bytes_to_read = num_bytes.Get();
278 if (max_num_bytes_to_read % element_num_bytes_ != 0) 287 if (max_num_bytes_to_read % element_num_bytes_ != 0)
279 return MOJO_RESULT_INVALID_ARGUMENT; 288 return MOJO_RESULT_INVALID_ARGUMENT;
280 289
281 if (max_num_bytes_to_read == 0) 290 if (max_num_bytes_to_read == 0)
282 return MOJO_RESULT_OK; // Nothing to do. 291 return MOJO_RESULT_OK; // Nothing to do.
283 292
284 uint32_t min_num_bytes_to_read = all_or_none ? max_num_bytes_to_read : 0; 293 uint32_t min_num_bytes_to_read = all_or_none ? max_num_bytes_to_read : 0;
285 294
286 HandleSignalsState old_producer_state = 295 HandleSignalsState old_producer_state =
287 ProducerGetHandleSignalsStateImplNoLock(); 296 impl_->ProducerGetHandleSignalsState();
288 MojoResult rv = ConsumerReadDataImplNoLock( 297 MojoResult rv = impl_->ConsumerReadData(
289 elements, num_bytes, max_num_bytes_to_read, min_num_bytes_to_read, peek); 298 elements, num_bytes, max_num_bytes_to_read, min_num_bytes_to_read, peek);
290 HandleSignalsState new_producer_state = 299 HandleSignalsState new_producer_state =
291 ProducerGetHandleSignalsStateImplNoLock(); 300 impl_->ProducerGetHandleSignalsState();
292 if (!new_producer_state.equals(old_producer_state)) 301 if (!new_producer_state.equals(old_producer_state))
293 AwakeProducerAwakablesForStateChangeNoLock(new_producer_state); 302 AwakeProducerAwakablesForStateChangeNoLock(new_producer_state);
294 return rv; 303 return rv;
295 } 304 }
296 305
297 MojoResult DataPipe::ConsumerDiscardData(UserPointer<uint32_t> num_bytes, 306 MojoResult DataPipe::ConsumerDiscardData(UserPointer<uint32_t> num_bytes,
298 bool all_or_none) { 307 bool all_or_none) {
299 base::AutoLock locker(lock_); 308 base::AutoLock locker(lock_);
300 DCHECK(has_local_consumer_no_lock()); 309 DCHECK(has_local_consumer_no_lock());
301 310
302 if (consumer_in_two_phase_read_no_lock()) 311 if (consumer_in_two_phase_read_no_lock())
303 return MOJO_RESULT_BUSY; 312 return MOJO_RESULT_BUSY;
304 313
305 uint32_t max_num_bytes_to_discard = num_bytes.Get(); 314 uint32_t max_num_bytes_to_discard = num_bytes.Get();
306 if (max_num_bytes_to_discard % element_num_bytes_ != 0) 315 if (max_num_bytes_to_discard % element_num_bytes_ != 0)
307 return MOJO_RESULT_INVALID_ARGUMENT; 316 return MOJO_RESULT_INVALID_ARGUMENT;
308 317
309 if (max_num_bytes_to_discard == 0) 318 if (max_num_bytes_to_discard == 0)
310 return MOJO_RESULT_OK; // Nothing to do. 319 return MOJO_RESULT_OK; // Nothing to do.
311 320
312 uint32_t min_num_bytes_to_discard = 321 uint32_t min_num_bytes_to_discard =
313 all_or_none ? max_num_bytes_to_discard : 0; 322 all_or_none ? max_num_bytes_to_discard : 0;
314 323
315 HandleSignalsState old_producer_state = 324 HandleSignalsState old_producer_state =
316 ProducerGetHandleSignalsStateImplNoLock(); 325 impl_->ProducerGetHandleSignalsState();
317 MojoResult rv = ConsumerDiscardDataImplNoLock( 326 MojoResult rv = impl_->ConsumerDiscardData(
318 num_bytes, max_num_bytes_to_discard, min_num_bytes_to_discard); 327 num_bytes, max_num_bytes_to_discard, min_num_bytes_to_discard);
319 HandleSignalsState new_producer_state = 328 HandleSignalsState new_producer_state =
320 ProducerGetHandleSignalsStateImplNoLock(); 329 impl_->ProducerGetHandleSignalsState();
321 if (!new_producer_state.equals(old_producer_state)) 330 if (!new_producer_state.equals(old_producer_state))
322 AwakeProducerAwakablesForStateChangeNoLock(new_producer_state); 331 AwakeProducerAwakablesForStateChangeNoLock(new_producer_state);
323 return rv; 332 return rv;
324 } 333 }
325 334
326 MojoResult DataPipe::ConsumerQueryData(UserPointer<uint32_t> num_bytes) { 335 MojoResult DataPipe::ConsumerQueryData(UserPointer<uint32_t> num_bytes) {
327 base::AutoLock locker(lock_); 336 base::AutoLock locker(lock_);
328 DCHECK(has_local_consumer_no_lock()); 337 DCHECK(has_local_consumer_no_lock());
329 338
330 if (consumer_in_two_phase_read_no_lock()) 339 if (consumer_in_two_phase_read_no_lock())
331 return MOJO_RESULT_BUSY; 340 return MOJO_RESULT_BUSY;
332 341
333 // Note: Don't need to validate |*num_bytes| for query. 342 // Note: Don't need to validate |*num_bytes| for query.
334 return ConsumerQueryDataImplNoLock(num_bytes); 343 return impl_->ConsumerQueryData(num_bytes);
335 } 344 }
336 345
337 MojoResult DataPipe::ConsumerBeginReadData( 346 MojoResult DataPipe::ConsumerBeginReadData(
338 UserPointer<const void*> buffer, 347 UserPointer<const void*> buffer,
339 UserPointer<uint32_t> buffer_num_bytes, 348 UserPointer<uint32_t> buffer_num_bytes,
340 bool all_or_none) { 349 bool all_or_none) {
341 base::AutoLock locker(lock_); 350 base::AutoLock locker(lock_);
342 DCHECK(has_local_consumer_no_lock()); 351 DCHECK(has_local_consumer_no_lock());
343 352
344 if (consumer_in_two_phase_read_no_lock()) 353 if (consumer_in_two_phase_read_no_lock())
345 return MOJO_RESULT_BUSY; 354 return MOJO_RESULT_BUSY;
346 355
347 uint32_t min_num_bytes_to_read = 0; 356 uint32_t min_num_bytes_to_read = 0;
348 if (all_or_none) { 357 if (all_or_none) {
349 min_num_bytes_to_read = buffer_num_bytes.Get(); 358 min_num_bytes_to_read = buffer_num_bytes.Get();
350 if (min_num_bytes_to_read % element_num_bytes_ != 0) 359 if (min_num_bytes_to_read % element_num_bytes_ != 0)
351 return MOJO_RESULT_INVALID_ARGUMENT; 360 return MOJO_RESULT_INVALID_ARGUMENT;
352 } 361 }
353 362
354 MojoResult rv = ConsumerBeginReadDataImplNoLock(buffer, buffer_num_bytes, 363 MojoResult rv = impl_->ConsumerBeginReadData(buffer, buffer_num_bytes,
355 min_num_bytes_to_read); 364 min_num_bytes_to_read);
356 if (rv != MOJO_RESULT_OK) 365 if (rv != MOJO_RESULT_OK)
357 return rv; 366 return rv;
358 DCHECK(consumer_in_two_phase_read_no_lock()); 367 DCHECK(consumer_in_two_phase_read_no_lock());
359 return MOJO_RESULT_OK; 368 return MOJO_RESULT_OK;
360 } 369 }
361 370
362 MojoResult DataPipe::ConsumerEndReadData(uint32_t num_bytes_read) { 371 MojoResult DataPipe::ConsumerEndReadData(uint32_t num_bytes_read) {
363 base::AutoLock locker(lock_); 372 base::AutoLock locker(lock_);
364 DCHECK(has_local_consumer_no_lock()); 373 DCHECK(has_local_consumer_no_lock());
365 374
366 if (!consumer_in_two_phase_read_no_lock()) 375 if (!consumer_in_two_phase_read_no_lock())
367 return MOJO_RESULT_FAILED_PRECONDITION; 376 return MOJO_RESULT_FAILED_PRECONDITION;
368 377
369 HandleSignalsState old_producer_state = 378 HandleSignalsState old_producer_state =
370 ProducerGetHandleSignalsStateImplNoLock(); 379 impl_->ProducerGetHandleSignalsState();
371 MojoResult rv; 380 MojoResult rv;
372 if (num_bytes_read > consumer_two_phase_max_num_bytes_read_ || 381 if (num_bytes_read > consumer_two_phase_max_num_bytes_read_ ||
373 num_bytes_read % element_num_bytes_ != 0) { 382 num_bytes_read % element_num_bytes_ != 0) {
374 rv = MOJO_RESULT_INVALID_ARGUMENT; 383 rv = MOJO_RESULT_INVALID_ARGUMENT;
375 consumer_two_phase_max_num_bytes_read_ = 0; 384 consumer_two_phase_max_num_bytes_read_ = 0;
376 } else { 385 } else {
377 rv = ConsumerEndReadDataImplNoLock(num_bytes_read); 386 rv = impl_->ConsumerEndReadData(num_bytes_read);
378 } 387 }
379 // Two-phase read ended even on failure. 388 // Two-phase read ended even on failure.
380 DCHECK(!consumer_in_two_phase_read_no_lock()); 389 DCHECK(!consumer_in_two_phase_read_no_lock());
381 // If we're now readable, we *became* readable (since we weren't readable 390 // If we're now readable, we *became* readable (since we weren't readable
382 // during the two-phase read), so awake consumer awakables. 391 // during the two-phase read), so awake consumer awakables.
383 HandleSignalsState new_consumer_state = 392 HandleSignalsState new_consumer_state =
384 ConsumerGetHandleSignalsStateImplNoLock(); 393 impl_->ConsumerGetHandleSignalsState();
385 if (new_consumer_state.satisfies(MOJO_HANDLE_SIGNAL_READABLE)) 394 if (new_consumer_state.satisfies(MOJO_HANDLE_SIGNAL_READABLE))
386 AwakeConsumerAwakablesForStateChangeNoLock(new_consumer_state); 395 AwakeConsumerAwakablesForStateChangeNoLock(new_consumer_state);
387 HandleSignalsState new_producer_state = 396 HandleSignalsState new_producer_state =
388 ProducerGetHandleSignalsStateImplNoLock(); 397 impl_->ProducerGetHandleSignalsState();
389 if (!new_producer_state.equals(old_producer_state)) 398 if (!new_producer_state.equals(old_producer_state))
390 AwakeProducerAwakablesForStateChangeNoLock(new_producer_state); 399 AwakeProducerAwakablesForStateChangeNoLock(new_producer_state);
391 return rv; 400 return rv;
392 } 401 }
393 402
394 HandleSignalsState DataPipe::ConsumerGetHandleSignalsState() { 403 HandleSignalsState DataPipe::ConsumerGetHandleSignalsState() {
395 base::AutoLock locker(lock_); 404 base::AutoLock locker(lock_);
396 DCHECK(has_local_consumer_no_lock()); 405 DCHECK(has_local_consumer_no_lock());
397 return ConsumerGetHandleSignalsStateImplNoLock(); 406 return impl_->ConsumerGetHandleSignalsState();
398 } 407 }
399 408
400 MojoResult DataPipe::ConsumerAddAwakable(Awakable* awakable, 409 MojoResult DataPipe::ConsumerAddAwakable(Awakable* awakable,
401 MojoHandleSignals signals, 410 MojoHandleSignals signals,
402 uint32_t context, 411 uint32_t context,
403 HandleSignalsState* signals_state) { 412 HandleSignalsState* signals_state) {
404 base::AutoLock locker(lock_); 413 base::AutoLock locker(lock_);
405 DCHECK(has_local_consumer_no_lock()); 414 DCHECK(has_local_consumer_no_lock());
406 415
407 HandleSignalsState consumer_state = ConsumerGetHandleSignalsStateImplNoLock(); 416 HandleSignalsState consumer_state = impl_->ConsumerGetHandleSignalsState();
408 if (consumer_state.satisfies(signals)) { 417 if (consumer_state.satisfies(signals)) {
409 if (signals_state) 418 if (signals_state)
410 *signals_state = consumer_state; 419 *signals_state = consumer_state;
411 return MOJO_RESULT_ALREADY_EXISTS; 420 return MOJO_RESULT_ALREADY_EXISTS;
412 } 421 }
413 if (!consumer_state.can_satisfy(signals)) { 422 if (!consumer_state.can_satisfy(signals)) {
414 if (signals_state) 423 if (signals_state)
415 *signals_state = consumer_state; 424 *signals_state = consumer_state;
416 return MOJO_RESULT_FAILED_PRECONDITION; 425 return MOJO_RESULT_FAILED_PRECONDITION;
417 } 426 }
418 427
419 consumer_awakable_list_->Add(awakable, signals, context); 428 consumer_awakable_list_->Add(awakable, signals, context);
420 return MOJO_RESULT_OK; 429 return MOJO_RESULT_OK;
421 } 430 }
422 431
423 void DataPipe::ConsumerRemoveAwakable(Awakable* awakable, 432 void DataPipe::ConsumerRemoveAwakable(Awakable* awakable,
424 HandleSignalsState* signals_state) { 433 HandleSignalsState* signals_state) {
425 base::AutoLock locker(lock_); 434 base::AutoLock locker(lock_);
426 DCHECK(has_local_consumer_no_lock()); 435 DCHECK(has_local_consumer_no_lock());
427 consumer_awakable_list_->Remove(awakable); 436 consumer_awakable_list_->Remove(awakable);
428 if (signals_state) 437 if (signals_state)
429 *signals_state = ConsumerGetHandleSignalsStateImplNoLock(); 438 *signals_state = impl_->ConsumerGetHandleSignalsState();
430 } 439 }
431 440
432 void DataPipe::ConsumerStartSerialize(Channel* channel, 441 void DataPipe::ConsumerStartSerialize(Channel* channel,
433 size_t* max_size, 442 size_t* max_size,
434 size_t* max_platform_handles) { 443 size_t* max_platform_handles) {
435 base::AutoLock locker(lock_); 444 base::AutoLock locker(lock_);
436 DCHECK(has_local_consumer_no_lock()); 445 DCHECK(has_local_consumer_no_lock());
437 ConsumerStartSerializeImplNoLock(channel, max_size, max_platform_handles); 446 impl_->ConsumerStartSerialize(channel, max_size, max_platform_handles);
438 } 447 }
439 448
440 bool DataPipe::ConsumerEndSerialize( 449 bool DataPipe::ConsumerEndSerialize(
441 Channel* channel, 450 Channel* channel,
442 void* destination, 451 void* destination,
443 size_t* actual_size, 452 size_t* actual_size,
444 embedder::PlatformHandleVector* platform_handles) { 453 embedder::PlatformHandleVector* platform_handles) {
445 base::AutoLock locker(lock_); 454 base::AutoLock locker(lock_);
446 DCHECK(has_local_consumer_no_lock()); 455 DCHECK(has_local_consumer_no_lock());
447 return ConsumerEndSerializeImplNoLock(channel, destination, actual_size, 456 return impl_->ConsumerEndSerialize(channel, destination, actual_size,
448 platform_handles); 457 platform_handles);
449 } 458 }
450 459
451 bool DataPipe::ConsumerIsBusy() const { 460 bool DataPipe::ConsumerIsBusy() const {
452 base::AutoLock locker(lock_); 461 base::AutoLock locker(lock_);
453 return consumer_in_two_phase_read_no_lock(); 462 return consumer_in_two_phase_read_no_lock();
454 } 463 }
455 464
456 DataPipe::DataPipe(bool has_local_producer, 465 DataPipe::DataPipe(bool has_local_producer,
457 bool has_local_consumer, 466 bool has_local_consumer,
458 const MojoCreateDataPipeOptions& validated_options) 467 const MojoCreateDataPipeOptions& validated_options,
468 scoped_ptr<DataPipeImpl> impl)
459 : may_discard_((validated_options.flags & 469 : may_discard_((validated_options.flags &
460 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_MAY_DISCARD)), 470 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_MAY_DISCARD)),
461 element_num_bytes_(validated_options.element_num_bytes), 471 element_num_bytes_(validated_options.element_num_bytes),
462 capacity_num_bytes_(validated_options.capacity_num_bytes), 472 capacity_num_bytes_(validated_options.capacity_num_bytes),
463 producer_open_(true), 473 producer_open_(true),
464 consumer_open_(true), 474 consumer_open_(true),
465 producer_awakable_list_(has_local_producer ? new AwakableList() 475 producer_awakable_list_(has_local_producer ? new AwakableList()
466 : nullptr), 476 : nullptr),
467 consumer_awakable_list_(has_local_consumer ? new AwakableList() 477 consumer_awakable_list_(has_local_consumer ? new AwakableList()
468 : nullptr), 478 : nullptr),
469 producer_two_phase_max_num_bytes_written_(0), 479 producer_two_phase_max_num_bytes_written_(0),
470 consumer_two_phase_max_num_bytes_read_(0) { 480 consumer_two_phase_max_num_bytes_read_(0),
481 impl_(impl.Pass()) {
482 impl_->set_owner(this);
483
484 #if !defined(NDEBUG) || defined(DCHECK_ALWAYS_ON)
471 // Check that the passed in options actually are validated. 485 // Check that the passed in options actually are validated.
472 MojoCreateDataPipeOptions unused = {0}; 486 MojoCreateDataPipeOptions unused = {0};
473 DCHECK_EQ(ValidateCreateOptions(MakeUserPointer(&validated_options), &unused), 487 DCHECK_EQ(ValidateCreateOptions(MakeUserPointer(&validated_options), &unused),
474 MOJO_RESULT_OK); 488 MOJO_RESULT_OK);
489 #endif // !defined(NDEBUG) || defined(DCHECK_ALWAYS_ON)
475 } 490 }
476 491
477 DataPipe::~DataPipe() { 492 DataPipe::~DataPipe() {
478 DCHECK(!producer_open_); 493 DCHECK(!producer_open_);
479 DCHECK(!consumer_open_); 494 DCHECK(!consumer_open_);
480 DCHECK(!producer_awakable_list_); 495 DCHECK(!producer_awakable_list_);
481 DCHECK(!consumer_awakable_list_); 496 DCHECK(!consumer_awakable_list_);
482 } 497 }
483 498
484 void DataPipe::ProducerCloseNoLock() { 499 void DataPipe::ProducerCloseNoLock() {
485 lock_.AssertAcquired(); 500 lock_.AssertAcquired();
486 DCHECK(producer_open_); 501 DCHECK(producer_open_);
487 producer_open_ = false; 502 producer_open_ = false;
488 DCHECK(has_local_producer_no_lock()); 503 DCHECK(has_local_producer_no_lock());
489 producer_awakable_list_.reset(); 504 producer_awakable_list_.reset();
490 // Not a bug, except possibly in "user" code. 505 // Not a bug, except possibly in "user" code.
491 DVLOG_IF(2, producer_in_two_phase_write_no_lock()) 506 DVLOG_IF(2, producer_in_two_phase_write_no_lock())
492 << "Producer closed with active two-phase write"; 507 << "Producer closed with active two-phase write";
493 producer_two_phase_max_num_bytes_written_ = 0; 508 producer_two_phase_max_num_bytes_written_ = 0;
494 ProducerCloseImplNoLock(); 509 impl_->ProducerClose();
495 AwakeConsumerAwakablesForStateChangeNoLock( 510 AwakeConsumerAwakablesForStateChangeNoLock(
496 ConsumerGetHandleSignalsStateImplNoLock()); 511 impl_->ConsumerGetHandleSignalsState());
497 } 512 }
498 513
499 void DataPipe::ConsumerCloseNoLock() { 514 void DataPipe::ConsumerCloseNoLock() {
500 lock_.AssertAcquired(); 515 lock_.AssertAcquired();
501 DCHECK(consumer_open_); 516 DCHECK(consumer_open_);
502 consumer_open_ = false; 517 consumer_open_ = false;
503 DCHECK(has_local_consumer_no_lock()); 518 DCHECK(has_local_consumer_no_lock());
504 consumer_awakable_list_.reset(); 519 consumer_awakable_list_.reset();
505 // Not a bug, except possibly in "user" code. 520 // Not a bug, except possibly in "user" code.
506 DVLOG_IF(2, consumer_in_two_phase_read_no_lock()) 521 DVLOG_IF(2, consumer_in_two_phase_read_no_lock())
507 << "Consumer closed with active two-phase read"; 522 << "Consumer closed with active two-phase read";
508 consumer_two_phase_max_num_bytes_read_ = 0; 523 consumer_two_phase_max_num_bytes_read_ = 0;
509 ConsumerCloseImplNoLock(); 524 impl_->ConsumerClose();
510 AwakeProducerAwakablesForStateChangeNoLock( 525 AwakeProducerAwakablesForStateChangeNoLock(
511 ProducerGetHandleSignalsStateImplNoLock()); 526 impl_->ProducerGetHandleSignalsState());
512 } 527 }
513 528
514 void DataPipe::AwakeProducerAwakablesForStateChangeNoLock( 529 void DataPipe::AwakeProducerAwakablesForStateChangeNoLock(
515 const HandleSignalsState& new_producer_state) { 530 const HandleSignalsState& new_producer_state) {
516 lock_.AssertAcquired(); 531 lock_.AssertAcquired();
517 if (!has_local_producer_no_lock()) 532 if (!has_local_producer_no_lock())
518 return; 533 return;
519 producer_awakable_list_->AwakeForStateChange(new_producer_state); 534 producer_awakable_list_->AwakeForStateChange(new_producer_state);
520 } 535 }
521 536
522 void DataPipe::AwakeConsumerAwakablesForStateChangeNoLock( 537 void DataPipe::AwakeConsumerAwakablesForStateChangeNoLock(
523 const HandleSignalsState& new_consumer_state) { 538 const HandleSignalsState& new_consumer_state) {
524 lock_.AssertAcquired(); 539 lock_.AssertAcquired();
525 if (!has_local_consumer_no_lock()) 540 if (!has_local_consumer_no_lock())
526 return; 541 return;
527 consumer_awakable_list_->AwakeForStateChange(new_consumer_state); 542 consumer_awakable_list_->AwakeForStateChange(new_consumer_state);
528 } 543 }
529 544
530 } // namespace system 545 } // namespace system
531 } // namespace mojo 546 } // namespace mojo
OLDNEW
« no previous file with comments | « mojo/edk/system/data_pipe.h ('k') | mojo/edk/system/data_pipe_impl.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698