Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(201)

Side by Side Diff: mojo/system/data_pipe.cc

Issue 129163003: Mojo: DataPipe: Implement "may discard" for two-phase writes. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: oops Created 6 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « mojo/system/data_pipe.h ('k') | mojo/system/local_data_pipe.h » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2013 The Chromium Authors. All rights reserved. 1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/system/data_pipe.h" 5 #include "mojo/system/data_pipe.h"
6 6
7 #include <string.h> 7 #include <string.h>
8 8
9 #include <algorithm> 9 #include <algorithm>
10 #include <limits> 10 #include <limits>
(...skipping 61 matching lines...) Expand 10 before | Expand all | Expand 10 after
72 base::AutoLock locker(lock_); 72 base::AutoLock locker(lock_);
73 DCHECK(producer_open_); 73 DCHECK(producer_open_);
74 producer_open_ = false; 74 producer_open_ = false;
75 DCHECK(has_local_producer_no_lock()); 75 DCHECK(has_local_producer_no_lock());
76 producer_waiter_list_.reset(); 76 producer_waiter_list_.reset();
77 // Not a bug, except possibly in "user" code. 77 // Not a bug, except possibly in "user" code.
78 DVLOG_IF(2, producer_in_two_phase_write_no_lock()) 78 DVLOG_IF(2, producer_in_two_phase_write_no_lock())
79 << "Producer closed with active two-phase write"; 79 << "Producer closed with active two-phase write";
80 producer_two_phase_max_num_bytes_written_ = 0; 80 producer_two_phase_max_num_bytes_written_ = 0;
81 ProducerCloseImplNoLock(); 81 ProducerCloseImplNoLock();
82 AwakeConsumerWaitersForStateChangeNoLock();
82 } 83 }
83 84
84 MojoResult DataPipe::ProducerWriteData(const void* elements, 85 MojoResult DataPipe::ProducerWriteData(const void* elements,
85 uint32_t* num_bytes, 86 uint32_t* num_bytes,
86 bool all_or_none) { 87 bool all_or_none) {
87 base::AutoLock locker(lock_); 88 base::AutoLock locker(lock_);
88 DCHECK(has_local_producer_no_lock()); 89 DCHECK(has_local_producer_no_lock());
89 90
90 if (producer_in_two_phase_write_no_lock()) 91 if (producer_in_two_phase_write_no_lock())
91 return MOJO_RESULT_BUSY; 92 return MOJO_RESULT_BUSY;
92 if (!consumer_open_no_lock()) 93 if (!consumer_open_no_lock())
93 return MOJO_RESULT_FAILED_PRECONDITION; 94 return MOJO_RESULT_FAILED_PRECONDITION;
94 95
95 // Returning "busy" takes priority over "invalid argument". 96 // Returning "busy" takes priority over "invalid argument".
96 if (*num_bytes % element_num_bytes_ != 0) 97 if (*num_bytes % element_num_bytes_ != 0)
97 return MOJO_RESULT_INVALID_ARGUMENT; 98 return MOJO_RESULT_INVALID_ARGUMENT;
98 99
99 if (*num_bytes == 0) 100 if (*num_bytes == 0)
100 return MOJO_RESULT_OK; // Nothing to do. 101 return MOJO_RESULT_OK; // Nothing to do.
101 102
102 return ProducerWriteDataImplNoLock(elements, num_bytes, all_or_none); 103 MojoWaitFlags old_consumer_satisfied_flags = ConsumerSatisfiedFlagsNoLock();
104 MojoResult rv = ProducerWriteDataImplNoLock(elements, num_bytes, all_or_none);
105 if (ConsumerSatisfiedFlagsNoLock() != old_consumer_satisfied_flags)
106 AwakeConsumerWaitersForStateChangeNoLock();
107 return rv;
103 } 108 }
104 109
105 MojoResult DataPipe::ProducerBeginWriteData(void** buffer, 110 MojoResult DataPipe::ProducerBeginWriteData(void** buffer,
106 uint32_t* buffer_num_bytes, 111 uint32_t* buffer_num_bytes,
107 bool all_or_none) { 112 bool all_or_none) {
108 base::AutoLock locker(lock_); 113 base::AutoLock locker(lock_);
109 DCHECK(has_local_producer_no_lock()); 114 DCHECK(has_local_producer_no_lock());
110 115
111 if (producer_in_two_phase_write_no_lock()) 116 if (producer_in_two_phase_write_no_lock())
112 return MOJO_RESULT_BUSY; 117 return MOJO_RESULT_BUSY;
113 if (!consumer_open_no_lock()) 118 if (!consumer_open_no_lock())
114 return MOJO_RESULT_FAILED_PRECONDITION; 119 return MOJO_RESULT_FAILED_PRECONDITION;
115 120
116 MojoResult rv = ProducerBeginWriteDataImplNoLock(buffer, buffer_num_bytes, 121 MojoResult rv = ProducerBeginWriteDataImplNoLock(buffer, buffer_num_bytes,
117 all_or_none); 122 all_or_none);
118 if (rv != MOJO_RESULT_OK) 123 if (rv != MOJO_RESULT_OK)
119 return rv; 124 return rv;
120 125 // Note: No need to awake producer waiters, even though we're going from
126 // writable to non-writable (since you can't wait on non-writability).
127 // Similarly, though this may have discarded data (in "may discard" mode),
128 // making it non-readable, there's still no need to awake consumer waiters.
121 DCHECK(producer_in_two_phase_write_no_lock()); 129 DCHECK(producer_in_two_phase_write_no_lock());
122 return MOJO_RESULT_OK; 130 return MOJO_RESULT_OK;
123 } 131 }
124 132
125 MojoResult DataPipe::ProducerEndWriteData(uint32_t num_bytes_written) { 133 MojoResult DataPipe::ProducerEndWriteData(uint32_t num_bytes_written) {
126 base::AutoLock locker(lock_); 134 base::AutoLock locker(lock_);
127 DCHECK(has_local_producer_no_lock()); 135 DCHECK(has_local_producer_no_lock());
128 136
129 if (!producer_in_two_phase_write_no_lock()) 137 if (!producer_in_two_phase_write_no_lock())
130 return MOJO_RESULT_FAILED_PRECONDITION; 138 return MOJO_RESULT_FAILED_PRECONDITION;
131 // Note: Allow successful completion of the two-phase write even if the 139 // Note: Allow successful completion of the two-phase write even if the
132 // consumer has been closed. 140 // consumer has been closed.
133 141
142 MojoWaitFlags old_consumer_satisfied_flags = ConsumerSatisfiedFlagsNoLock();
134 MojoResult rv = ProducerEndWriteDataImplNoLock(num_bytes_written); 143 MojoResult rv = ProducerEndWriteDataImplNoLock(num_bytes_written);
135 // Two-phase write ended even on failure. 144 // Two-phase write ended even on failure.
136 DCHECK(!producer_in_two_phase_write_no_lock()); 145 DCHECK(!producer_in_two_phase_write_no_lock());
146 // If we're now writable, we *became* writable (since we weren't writable
147 // during the two-phase write), so awake producer waiters.
148 if ((ProducerSatisfiedFlagsNoLock() & MOJO_WAIT_FLAG_WRITABLE))
149 AwakeProducerWaitersForStateChangeNoLock();
150 if (ConsumerSatisfiedFlagsNoLock() != old_consumer_satisfied_flags)
151 AwakeConsumerWaitersForStateChangeNoLock();
137 return rv; 152 return rv;
138 } 153 }
139 154
140 MojoResult DataPipe::ProducerAddWaiter(Waiter* waiter, 155 MojoResult DataPipe::ProducerAddWaiter(Waiter* waiter,
141 MojoWaitFlags flags, 156 MojoWaitFlags flags,
142 MojoResult wake_result) { 157 MojoResult wake_result) {
143 base::AutoLock locker(lock_); 158 base::AutoLock locker(lock_);
144 DCHECK(has_local_producer_no_lock()); 159 DCHECK(has_local_producer_no_lock());
145 160
146 if ((flags & ProducerSatisfiedFlagsNoLock())) 161 if ((flags & ProducerSatisfiedFlagsNoLock()))
(...skipping 21 matching lines...) Expand all
168 base::AutoLock locker(lock_); 183 base::AutoLock locker(lock_);
169 DCHECK(consumer_open_); 184 DCHECK(consumer_open_);
170 consumer_open_ = false; 185 consumer_open_ = false;
171 DCHECK(has_local_consumer_no_lock()); 186 DCHECK(has_local_consumer_no_lock());
172 consumer_waiter_list_.reset(); 187 consumer_waiter_list_.reset();
173 // Not a bug, except possibly in "user" code. 188 // Not a bug, except possibly in "user" code.
174 DVLOG_IF(2, consumer_in_two_phase_read_no_lock()) 189 DVLOG_IF(2, consumer_in_two_phase_read_no_lock())
175 << "Consumer closed with active two-phase read"; 190 << "Consumer closed with active two-phase read";
176 consumer_two_phase_max_num_bytes_read_ = 0; 191 consumer_two_phase_max_num_bytes_read_ = 0;
177 ConsumerCloseImplNoLock(); 192 ConsumerCloseImplNoLock();
193 AwakeProducerWaitersForStateChangeNoLock();
178 } 194 }
179 195
180 MojoResult DataPipe::ConsumerReadData(void* elements, 196 MojoResult DataPipe::ConsumerReadData(void* elements,
181 uint32_t* num_bytes, 197 uint32_t* num_bytes,
182 bool all_or_none) { 198 bool all_or_none) {
183 base::AutoLock locker(lock_); 199 base::AutoLock locker(lock_);
184 DCHECK(has_local_consumer_no_lock()); 200 DCHECK(has_local_consumer_no_lock());
185 201
186 if (consumer_in_two_phase_read_no_lock()) 202 if (consumer_in_two_phase_read_no_lock())
187 return MOJO_RESULT_BUSY; 203 return MOJO_RESULT_BUSY;
188 204
189 if (*num_bytes % element_num_bytes_ != 0) 205 if (*num_bytes % element_num_bytes_ != 0)
190 return MOJO_RESULT_INVALID_ARGUMENT; 206 return MOJO_RESULT_INVALID_ARGUMENT;
191 207
192 if (*num_bytes == 0) 208 if (*num_bytes == 0)
193 return MOJO_RESULT_OK; // Nothing to do. 209 return MOJO_RESULT_OK; // Nothing to do.
194 210
195 return ConsumerReadDataImplNoLock(elements, num_bytes, all_or_none); 211 MojoWaitFlags old_producer_satisfied_flags = ProducerSatisfiedFlagsNoLock();
212 MojoResult rv = ConsumerReadDataImplNoLock(elements, num_bytes, all_or_none);
213 if (ProducerSatisfiedFlagsNoLock() != old_producer_satisfied_flags)
214 AwakeProducerWaitersForStateChangeNoLock();
215 return rv;
196 } 216 }
197 217
198 MojoResult DataPipe::ConsumerDiscardData(uint32_t* num_bytes, 218 MojoResult DataPipe::ConsumerDiscardData(uint32_t* num_bytes,
199 bool all_or_none) { 219 bool all_or_none) {
200 base::AutoLock locker(lock_); 220 base::AutoLock locker(lock_);
201 DCHECK(has_local_consumer_no_lock()); 221 DCHECK(has_local_consumer_no_lock());
202 222
203 if (consumer_in_two_phase_read_no_lock()) 223 if (consumer_in_two_phase_read_no_lock())
204 return MOJO_RESULT_BUSY; 224 return MOJO_RESULT_BUSY;
205 225
206 if (*num_bytes % element_num_bytes_ != 0) 226 if (*num_bytes % element_num_bytes_ != 0)
207 return MOJO_RESULT_INVALID_ARGUMENT; 227 return MOJO_RESULT_INVALID_ARGUMENT;
208 228
209 if (*num_bytes == 0) 229 if (*num_bytes == 0)
210 return MOJO_RESULT_OK; // Nothing to do. 230 return MOJO_RESULT_OK; // Nothing to do.
211 231
212 return ConsumerDiscardDataImplNoLock(num_bytes, all_or_none); 232 MojoWaitFlags old_producer_satisfied_flags = ProducerSatisfiedFlagsNoLock();
233 MojoResult rv = ConsumerDiscardDataImplNoLock(num_bytes, all_or_none);
234 if (ProducerSatisfiedFlagsNoLock() != old_producer_satisfied_flags)
235 AwakeProducerWaitersForStateChangeNoLock();
236 return rv;
213 } 237 }
214 238
215 MojoResult DataPipe::ConsumerQueryData(uint32_t* num_bytes) { 239 MojoResult DataPipe::ConsumerQueryData(uint32_t* num_bytes) {
216 base::AutoLock locker(lock_); 240 base::AutoLock locker(lock_);
217 DCHECK(has_local_consumer_no_lock()); 241 DCHECK(has_local_consumer_no_lock());
218 242
219 if (consumer_in_two_phase_read_no_lock()) 243 if (consumer_in_two_phase_read_no_lock())
220 return MOJO_RESULT_BUSY; 244 return MOJO_RESULT_BUSY;
221 245
222 // Note: Don't need to validate |*num_bytes| for query. 246 // Note: Don't need to validate |*num_bytes| for query.
223 return ConsumerQueryDataImplNoLock(num_bytes); 247 return ConsumerQueryDataImplNoLock(num_bytes);
224 } 248 }
225 249
226 MojoResult DataPipe::ConsumerBeginReadData(const void** buffer, 250 MojoResult DataPipe::ConsumerBeginReadData(const void** buffer,
227 uint32_t* buffer_num_bytes, 251 uint32_t* buffer_num_bytes,
228 bool all_or_none) { 252 bool all_or_none) {
229 base::AutoLock locker(lock_); 253 base::AutoLock locker(lock_);
230 DCHECK(has_local_consumer_no_lock()); 254 DCHECK(has_local_consumer_no_lock());
231 255
232 if (consumer_in_two_phase_read_no_lock()) 256 if (consumer_in_two_phase_read_no_lock())
233 return MOJO_RESULT_BUSY; 257 return MOJO_RESULT_BUSY;
234 258
235 MojoResult rv = ConsumerBeginReadDataImplNoLock(buffer, buffer_num_bytes, 259 MojoResult rv = ConsumerBeginReadDataImplNoLock(buffer, buffer_num_bytes,
236 all_or_none); 260 all_or_none);
237 if (rv != MOJO_RESULT_OK) 261 if (rv != MOJO_RESULT_OK)
238 return rv; 262 return rv;
239
240 DCHECK(consumer_in_two_phase_read_no_lock()); 263 DCHECK(consumer_in_two_phase_read_no_lock());
241 return MOJO_RESULT_OK; 264 return MOJO_RESULT_OK;
242 } 265 }
243 266
244 MojoResult DataPipe::ConsumerEndReadData(uint32_t num_bytes_read) { 267 MojoResult DataPipe::ConsumerEndReadData(uint32_t num_bytes_read) {
245 base::AutoLock locker(lock_); 268 base::AutoLock locker(lock_);
246 DCHECK(has_local_consumer_no_lock()); 269 DCHECK(has_local_consumer_no_lock());
247 270
248 if (!consumer_in_two_phase_read_no_lock()) 271 if (!consumer_in_two_phase_read_no_lock())
249 return MOJO_RESULT_FAILED_PRECONDITION; 272 return MOJO_RESULT_FAILED_PRECONDITION;
250 273
274 MojoWaitFlags old_producer_satisfied_flags = ProducerSatisfiedFlagsNoLock();
251 MojoResult rv = ConsumerEndReadDataImplNoLock(num_bytes_read); 275 MojoResult rv = ConsumerEndReadDataImplNoLock(num_bytes_read);
252 // Two-phase read ended even on failure. 276 // Two-phase read ended even on failure.
253 DCHECK(!consumer_in_two_phase_read_no_lock()); 277 DCHECK(!consumer_in_two_phase_read_no_lock());
278 // If we're now readable, we *became* readable (since we weren't readable
279 // during the two-phase read), so awake consumer waiters.
280 if ((ConsumerSatisfiedFlagsNoLock() & MOJO_WAIT_FLAG_READABLE))
281 AwakeConsumerWaitersForStateChangeNoLock();
282 if (ProducerSatisfiedFlagsNoLock() != old_producer_satisfied_flags)
283 AwakeProducerWaitersForStateChangeNoLock();
254 return rv; 284 return rv;
255 } 285 }
256 286
257 MojoResult DataPipe::ConsumerAddWaiter(Waiter* waiter, 287 MojoResult DataPipe::ConsumerAddWaiter(Waiter* waiter,
258 MojoWaitFlags flags, 288 MojoWaitFlags flags,
259 MojoResult wake_result) { 289 MojoResult wake_result) {
260 base::AutoLock locker(lock_); 290 base::AutoLock locker(lock_);
261 DCHECK(has_local_consumer_no_lock()); 291 DCHECK(has_local_consumer_no_lock());
262 292
263 if ((flags & ConsumerSatisfiedFlagsNoLock())) 293 if ((flags & ConsumerSatisfiedFlagsNoLock()))
(...skipping 47 matching lines...) Expand 10 before | Expand all | Expand 10 after
311 void DataPipe::AwakeConsumerWaitersForStateChangeNoLock() { 341 void DataPipe::AwakeConsumerWaitersForStateChangeNoLock() {
312 lock_.AssertAcquired(); 342 lock_.AssertAcquired();
313 if (!has_local_consumer_no_lock()) 343 if (!has_local_consumer_no_lock())
314 return; 344 return;
315 consumer_waiter_list_->AwakeWaitersForStateChange( 345 consumer_waiter_list_->AwakeWaitersForStateChange(
316 ConsumerSatisfiedFlagsNoLock(), ConsumerSatisfiableFlagsNoLock()); 346 ConsumerSatisfiedFlagsNoLock(), ConsumerSatisfiableFlagsNoLock());
317 } 347 }
318 348
319 } // namespace system 349 } // namespace system
320 } // namespace mojo 350 } // namespace mojo
OLDNEW
« no previous file with comments | « mojo/system/data_pipe.h ('k') | mojo/system/local_data_pipe.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698