Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(66)

Side by Side Diff: components/cronet/android/java/src/org/chromium/net/CronetBidirectionalStream.java

Issue 2078353003: [Cronet] Fix BidirectionalStream.flush() (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 4 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « no previous file | components/cronet/android/test/javatests/src/org/chromium/net/BidirectionalStreamTest.java » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package org.chromium.net; 5 package org.chromium.net;
6 6
7 import org.chromium.base.Log; 7 import org.chromium.base.Log;
8 import org.chromium.base.VisibleForTesting; 8 import org.chromium.base.VisibleForTesting;
9 import org.chromium.base.annotations.CalledByNative; 9 import org.chromium.base.annotations.CalledByNative;
10 import org.chromium.base.annotations.JNINamespace; 10 import org.chromium.base.annotations.JNINamespace;
(...skipping 66 matching lines...) Expand 10 before | Expand all | Expand 10 after
77 private final boolean mDisableAutoFlush; 77 private final boolean mDisableAutoFlush;
78 private final boolean mDelayRequestHeadersUntilFirstFlush; 78 private final boolean mDelayRequestHeadersUntilFirstFlush;
79 79
80 /* 80 /*
81 * Synchronizes access to mNativeStream, mReadState and mWriteState. 81 * Synchronizes access to mNativeStream, mReadState and mWriteState.
82 */ 82 */
83 private final Object mNativeStreamLock = new Object(); 83 private final Object mNativeStreamLock = new Object();
84 84
85 @GuardedBy("mNativeStreamLock") 85 @GuardedBy("mNativeStreamLock")
86 // Pending write data. 86 // Pending write data.
87 private LinkedList<ByteBuffer> mPendingData; 87 private LinkedList<ByteBuffer> mPendingQueue;
mef 2016/06/21 21:12:27 Any particular reason for XyzData -> XyzQueue rena
xunjieli 2016/06/22 14:26:47 I felt "Data" is a bit overloaded. I changed it to
mef 2016/06/24 19:36:30 Yeah, unfortunately Queue is also very generic, an
88 88
89 @GuardedBy("mNativeStreamLock") 89 @GuardedBy("mNativeStreamLock")
90 // Flush data queue that should be pushed to the native stack when the previ ous 90 // Flush data queue that should be pushed to the native stack when the previ ous
91 // nativeWritevData completes. 91 // nativeWritevData completes.
92 private LinkedList<ByteBuffer> mFlushData; 92 private LinkedList<ByteBuffer> mFlushQueue;
93 93
94 @GuardedBy("mNativeStreamLock") 94 @GuardedBy("mNativeStreamLock")
95 // Whether an end-of-stream flag is passed in through write(). 95 // Whether an end-of-stream flag is passed in through write().
96 private boolean mEndOfStreamWritten; 96 private boolean mEndOfStreamWritten;
97 97
98 @GuardedBy("mNativeStreamLock") 98 @GuardedBy("mNativeStreamLock")
99 // Whether request headers have been sent. 99 // Whether request headers have been sent.
100 private boolean mRequestHeadersSent; 100 private boolean mRequestHeadersSent;
101 101
102 /* Native BidirectionalStream object, owned by CronetBidirectionalStream. */ 102 /* Native BidirectionalStream object, owned by CronetBidirectionalStream. */
(...skipping 111 matching lines...) Expand 10 before | Expand all | Expand 10 after
214 boolean disableAutoFlush, boolean delayRequestHeadersUntilNextFlush) { 214 boolean disableAutoFlush, boolean delayRequestHeadersUntilNextFlush) {
215 mRequestContext = requestContext; 215 mRequestContext = requestContext;
216 mInitialUrl = url; 216 mInitialUrl = url;
217 mInitialPriority = convertStreamPriority(priority); 217 mInitialPriority = convertStreamPriority(priority);
218 mCallback = callback; 218 mCallback = callback;
219 mExecutor = executor; 219 mExecutor = executor;
220 mInitialMethod = httpMethod; 220 mInitialMethod = httpMethod;
221 mRequestHeaders = stringsFromHeaderList(requestHeaders); 221 mRequestHeaders = stringsFromHeaderList(requestHeaders);
222 mDisableAutoFlush = disableAutoFlush; 222 mDisableAutoFlush = disableAutoFlush;
223 mDelayRequestHeadersUntilFirstFlush = delayRequestHeadersUntilNextFlush; 223 mDelayRequestHeadersUntilFirstFlush = delayRequestHeadersUntilNextFlush;
224 mPendingData = new LinkedList<>(); 224 mPendingQueue = new LinkedList<>();
225 mFlushData = new LinkedList<>(); 225 mFlushQueue = new LinkedList<>();
226 } 226 }
227 227
228 @Override 228 @Override
229 public void start() { 229 public void start() {
230 synchronized (mNativeStreamLock) { 230 synchronized (mNativeStreamLock) {
231 if (mReadState != State.NOT_STARTED) { 231 if (mReadState != State.NOT_STARTED) {
232 throw new IllegalStateException("Stream is already started."); 232 throw new IllegalStateException("Stream is already started.");
233 } 233 }
234 try { 234 try {
235 mNativeStream = nativeCreateBidirectionalStream( 235 mNativeStream = nativeCreateBidirectionalStream(
(...skipping 51 matching lines...) Expand 10 before | Expand all | Expand 10 after
287 Preconditions.checkDirect(buffer); 287 Preconditions.checkDirect(buffer);
288 if (!buffer.hasRemaining() && !endOfStream) { 288 if (!buffer.hasRemaining() && !endOfStream) {
289 throw new IllegalArgumentException("Empty buffer before end of s tream."); 289 throw new IllegalArgumentException("Empty buffer before end of s tream.");
290 } 290 }
291 if (mEndOfStreamWritten) { 291 if (mEndOfStreamWritten) {
292 throw new IllegalArgumentException("Write after writing end of s tream."); 292 throw new IllegalArgumentException("Write after writing end of s tream.");
293 } 293 }
294 if (isDoneLocked()) { 294 if (isDoneLocked()) {
295 return; 295 return;
296 } 296 }
297 mPendingData.add(buffer); 297 mPendingQueue.add(buffer);
298 if (endOfStream) { 298 if (endOfStream) {
299 mEndOfStreamWritten = true; 299 mEndOfStreamWritten = true;
300 } 300 }
301 if (!mDisableAutoFlush) { 301 if (!mDisableAutoFlush) {
302 flushLocked(); 302 flushLocked();
303 } 303 }
304 } 304 }
305 } 305 }
306 306
307 @Override 307 @Override
308 public void flush() { 308 public void flush() {
309 synchronized (mNativeStreamLock) { 309 synchronized (mNativeStreamLock) {
310 flushLocked(); 310 flushLocked();
311 } 311 }
312 } 312 }
313 313
314 @SuppressWarnings("GuardedByChecker") 314 @SuppressWarnings("GuardedByChecker")
315 private void flushLocked() { 315 private void flushLocked() {
316 if (isDoneLocked() 316 if (isDoneLocked()
317 || (mWriteState != State.WAITING_FOR_FLUSH && mWriteState != Sta te.WRITING)) { 317 || (mWriteState != State.WAITING_FOR_FLUSH && mWriteState != Sta te.WRITING)) {
318 return; 318 return;
319 } 319 }
320 if (mPendingData.isEmpty() && mFlushData.isEmpty()) { 320 if (mPendingQueue.isEmpty() && mFlushQueue.isEmpty()) {
321 // If there is no pending write when flush() is called, see if 321 // If there is no pending write when flush() is called, see if
322 // request headers need to be flushed. 322 // request headers need to be flushed.
323 if (!mRequestHeadersSent) { 323 if (!mRequestHeadersSent) {
324 mRequestHeadersSent = true; 324 mRequestHeadersSent = true;
325 nativeSendRequestHeaders(mNativeStream); 325 nativeSendRequestHeaders(mNativeStream);
326 if (!doesMethodAllowWriteData(mInitialMethod)) { 326 if (!doesMethodAllowWriteData(mInitialMethod)) {
327 mWriteState = State.WRITING_DONE; 327 mWriteState = State.WRITING_DONE;
328 } 328 }
329 } 329 }
330 return; 330 return;
331 } 331 }
332 332
333 assert !mPendingData.isEmpty() || !mFlushData.isEmpty(); 333 assert !mPendingQueue.isEmpty() || !mFlushQueue.isEmpty();
334 334
335 // Move buffers from mPendingData to the flushing queue. 335 // Move buffers from mPendingQueue to the flushing queue.
336 if (!mPendingData.isEmpty()) { 336 if (!mPendingQueue.isEmpty()) {
337 mFlushData.addAll(mPendingData); 337 mFlushQueue.addAll(mPendingQueue);
338 mPendingData.clear(); 338 mPendingQueue.clear();
339 } 339 }
340 340
341 if (mWriteState == State.WRITING) { 341 if (mWriteState == State.WRITING) {
342 // If there is a write already pending, wait until onWritevCompleted is 342 // If there is a write already pending, wait until onWritevCompleted is
343 // called before pushing data to the native stack. 343 // called before pushing data to the native stack.
344 return; 344 return;
345 } 345 }
346 sendFlushDataLocked();
347 }
348
349 // Helper method to send buffers in mFlushQueue. Caller needs to acquire
350 // mNativeStreamLock and make sure mWriteState is WAITING_FOR_FLUSH and
351 // mFlushQueue queue isn't empty.
352 @SuppressWarnings("GuardedByChecker")
353 private void sendFlushDataLocked() {
346 assert mWriteState == State.WAITING_FOR_FLUSH; 354 assert mWriteState == State.WAITING_FOR_FLUSH;
347 int size = mFlushData.size(); 355 int size = mFlushQueue.size();
348 ByteBuffer[] buffers = new ByteBuffer[size]; 356 ByteBuffer[] buffers = new ByteBuffer[size];
349 int[] positions = new int[size]; 357 int[] positions = new int[size];
350 int[] limits = new int[size]; 358 int[] limits = new int[size];
351 for (int i = 0; i < size; i++) { 359 for (int i = 0; i < size; i++) {
352 ByteBuffer buffer = mFlushData.poll(); 360 ByteBuffer buffer = mFlushQueue.poll();
353 buffers[i] = buffer; 361 buffers[i] = buffer;
354 positions[i] = buffer.position(); 362 positions[i] = buffer.position();
355 limits[i] = buffer.limit(); 363 limits[i] = buffer.limit();
356 } 364 }
357 assert mFlushData.isEmpty(); 365 assert mFlushQueue.isEmpty();
358 assert buffers.length >= 1; 366 assert buffers.length >= 1;
359 mWriteState = State.WRITING; 367 mWriteState = State.WRITING;
360 if (!nativeWritevData(mNativeStream, buffers, positions, limits, mEndOfS treamWritten)) { 368 if (!nativeWritevData(mNativeStream, buffers, positions, limits,
369 mEndOfStreamWritten && mPendingQueue.isEmpty())) {
361 // Still waiting on flush. This is just to have consistent 370 // Still waiting on flush. This is just to have consistent
362 // behavior with the other error cases. 371 // behavior with the other error cases.
363 mWriteState = State.WAITING_FOR_FLUSH; 372 mWriteState = State.WAITING_FOR_FLUSH;
364 throw new IllegalArgumentException("Unable to call native writev."); 373 throw new IllegalArgumentException("Unable to call native writev.");
365 } 374 }
366 } 375 }
367 376
377 @VisibleForTesting
378 public int getPendingQueueSizeForTesting() {
379 synchronized (mNativeStreamLock) {
380 return mPendingQueue.size();
381 }
382 }
383
384 @VisibleForTesting
385 public int getFlushQueueSizeForTesting() {
386 synchronized (mNativeStreamLock) {
387 return mFlushQueue.size();
388 }
389 }
390
368 @Override 391 @Override
369 public void ping(PingCallback callback, Executor executor) { 392 public void ping(PingCallback callback, Executor executor) {
370 // TODO(mef): May be last thing to be implemented on Android. 393 // TODO(mef): May be last thing to be implemented on Android.
371 throw new UnsupportedOperationException("ping is not supported yet."); 394 throw new UnsupportedOperationException("ping is not supported yet.");
372 } 395 }
373 396
374 @Override 397 @Override
375 public void windowUpdate(int windowSizeIncrement) { 398 public void windowUpdate(int windowSizeIncrement) {
376 // TODO(mef): Understand the needs and semantics of this method. 399 // TODO(mef): Understand the needs and semantics of this method.
377 throw new UnsupportedOperationException("windowUpdate is not supported y et."); 400 throw new UnsupportedOperationException("windowUpdate is not supported y et.");
(...skipping 128 matching lines...) Expand 10 before | Expand all | Expand 10 after
506 } 529 }
507 530
508 @SuppressWarnings("unused") 531 @SuppressWarnings("unused")
509 @CalledByNative 532 @CalledByNative
510 private void onWritevCompleted(final ByteBuffer[] byteBuffers, int[] initial Positions, 533 private void onWritevCompleted(final ByteBuffer[] byteBuffers, int[] initial Positions,
511 int[] initialLimits, boolean endOfStream) { 534 int[] initialLimits, boolean endOfStream) {
512 assert byteBuffers.length == initialPositions.length; 535 assert byteBuffers.length == initialPositions.length;
513 assert byteBuffers.length == initialLimits.length; 536 assert byteBuffers.length == initialLimits.length;
514 synchronized (mNativeStreamLock) { 537 synchronized (mNativeStreamLock) {
515 mWriteState = State.WAITING_FOR_FLUSH; 538 mWriteState = State.WAITING_FOR_FLUSH;
516 // Flush if there is anything in the flush queue mFlushData. 539 // Flush if there is anything in the flush queue mFlushQueue.
517 if (!mFlushData.isEmpty()) { 540 if (!mFlushQueue.isEmpty()) {
518 flushLocked(); 541 sendFlushDataLocked();
519 } 542 }
520 } 543 }
521 for (int i = 0; i < byteBuffers.length; i++) { 544 for (int i = 0; i < byteBuffers.length; i++) {
522 ByteBuffer buffer = byteBuffers[i]; 545 ByteBuffer buffer = byteBuffers[i];
523 if (buffer.position() != initialPositions[i] || buffer.limit() != in itialLimits[i]) { 546 if (buffer.position() != initialPositions[i] || buffer.limit() != in itialLimits[i]) {
524 failWithException( 547 failWithException(
525 new CronetException("ByteBuffer modified externally duri ng write", null)); 548 new CronetException("ByteBuffer modified externally duri ng write", null));
526 return; 549 return;
527 } 550 }
528 // Current implementation always writes the complete buffer. 551 // Current implementation always writes the complete buffer.
529 buffer.position(buffer.limit()); 552 buffer.position(buffer.limit());
530 postTaskToExecutor(new OnWriteCompletedRunnable(buffer, endOfStream) ); 553 postTaskToExecutor(new OnWriteCompletedRunnable(
554 buffer, endOfStream && i == byteBuffers.length - 1));
mef 2016/06/21 21:12:27 this is a bit hard to understand, maybe it deserve
xunjieli 2016/06/22 14:26:47 Done.
531 } 555 }
532 } 556 }
533 557
534 @SuppressWarnings("unused") 558 @SuppressWarnings("unused")
535 @CalledByNative 559 @CalledByNative
536 private void onResponseTrailersReceived(String[] trailers) { 560 private void onResponseTrailersReceived(String[] trailers) {
537 final UrlResponseInfo.HeaderBlock trailersBlock = 561 final UrlResponseInfo.HeaderBlock trailersBlock =
538 new UrlResponseInfo.HeaderBlock(headersListFromStrings(trailers) ); 562 new UrlResponseInfo.HeaderBlock(headersListFromStrings(trailers) );
539 postTaskToExecutor(new Runnable() { 563 postTaskToExecutor(new Runnable() {
540 public void run() { 564 public void run() {
(...skipping 185 matching lines...) Expand 10 before | Expand all | Expand 10 after
726 private native boolean nativeReadData( 750 private native boolean nativeReadData(
727 long nativePtr, ByteBuffer byteBuffer, int position, int limit); 751 long nativePtr, ByteBuffer byteBuffer, int position, int limit);
728 752
729 @NativeClassQualifiedName("CronetBidirectionalStreamAdapter") 753 @NativeClassQualifiedName("CronetBidirectionalStreamAdapter")
730 private native boolean nativeWritevData(long nativePtr, ByteBuffer[] buffers , int[] positions, 754 private native boolean nativeWritevData(long nativePtr, ByteBuffer[] buffers , int[] positions,
731 int[] limits, boolean endOfStream); 755 int[] limits, boolean endOfStream);
732 756
733 @NativeClassQualifiedName("CronetBidirectionalStreamAdapter") 757 @NativeClassQualifiedName("CronetBidirectionalStreamAdapter")
734 private native void nativeDestroy(long nativePtr, boolean sendOnCanceled); 758 private native void nativeDestroy(long nativePtr, boolean sendOnCanceled);
735 } 759 }
OLDNEW
« no previous file with comments | « no previous file | components/cronet/android/test/javatests/src/org/chromium/net/BidirectionalStreamTest.java » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698