Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(80)

Side by Side Diff: components/cronet/android/java/src/org/chromium/net/CronetBidirectionalStream.java

Issue 2078353003: [Cronet] Fix BidirectionalStream.flush() (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: self review Created 4 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « no previous file | components/cronet/android/test/javatests/src/org/chromium/net/BidirectionalStreamTest.java » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package org.chromium.net; 5 package org.chromium.net;
6 6
7 import org.chromium.base.Log; 7 import org.chromium.base.Log;
8 import org.chromium.base.VisibleForTesting; 8 import org.chromium.base.VisibleForTesting;
9 import org.chromium.base.annotations.CalledByNative; 9 import org.chromium.base.annotations.CalledByNative;
10 import org.chromium.base.annotations.JNINamespace; 10 import org.chromium.base.annotations.JNINamespace;
(...skipping 325 matching lines...) Expand 10 before | Expand all | Expand 10 after
336 if (!mPendingData.isEmpty()) { 336 if (!mPendingData.isEmpty()) {
337 mFlushData.addAll(mPendingData); 337 mFlushData.addAll(mPendingData);
338 mPendingData.clear(); 338 mPendingData.clear();
339 } 339 }
340 340
341 if (mWriteState == State.WRITING) { 341 if (mWriteState == State.WRITING) {
342 // If there is a write already pending, wait until onWritevCompleted is 342 // If there is a write already pending, wait until onWritevCompleted is
343 // called before pushing data to the native stack. 343 // called before pushing data to the native stack.
344 return; 344 return;
345 } 345 }
346 sendFlushDataLocked();
347 }
348
349 // Helper method to send buffers in mFlushData. Caller needs to acquire
350 // mNativeStreamLock and make sure mWriteState is WAITING_FOR_FLUSH and
351 // mFlushData queue isn't empty.
352 @SuppressWarnings("GuardedByChecker")
353 private void sendFlushDataLocked() {
346 assert mWriteState == State.WAITING_FOR_FLUSH; 354 assert mWriteState == State.WAITING_FOR_FLUSH;
347 int size = mFlushData.size(); 355 int size = mFlushData.size();
348 ByteBuffer[] buffers = new ByteBuffer[size]; 356 ByteBuffer[] buffers = new ByteBuffer[size];
349 int[] positions = new int[size]; 357 int[] positions = new int[size];
350 int[] limits = new int[size]; 358 int[] limits = new int[size];
351 for (int i = 0; i < size; i++) { 359 for (int i = 0; i < size; i++) {
352 ByteBuffer buffer = mFlushData.poll(); 360 ByteBuffer buffer = mFlushData.poll();
353 buffers[i] = buffer; 361 buffers[i] = buffer;
354 positions[i] = buffer.position(); 362 positions[i] = buffer.position();
355 limits[i] = buffer.limit(); 363 limits[i] = buffer.limit();
356 } 364 }
357 assert mFlushData.isEmpty(); 365 assert mFlushData.isEmpty();
358 assert buffers.length >= 1; 366 assert buffers.length >= 1;
359 mWriteState = State.WRITING; 367 mWriteState = State.WRITING;
360 if (!nativeWritevData(mNativeStream, buffers, positions, limits, mEndOfS treamWritten)) { 368 if (!nativeWritevData(mNativeStream, buffers, positions, limits,
369 mEndOfStreamWritten && mPendingData.isEmpty())) {
361 // Still waiting on flush. This is just to have consistent 370 // Still waiting on flush. This is just to have consistent
362 // behavior with the other error cases. 371 // behavior with the other error cases.
363 mWriteState = State.WAITING_FOR_FLUSH; 372 mWriteState = State.WAITING_FOR_FLUSH;
364 throw new IllegalArgumentException("Unable to call native writev."); 373 throw new IllegalArgumentException("Unable to call native writev.");
365 } 374 }
366 } 375 }
367 376
377 /**
378 * Returns a read-only copy of {@code mPendingData} for testing.
379 */
380 @VisibleForTesting
381 public List<ByteBuffer> getPendingDataForTesting() {
382 synchronized (mNativeStreamLock) {
383 List<ByteBuffer> pendingData = new LinkedList<ByteBuffer>();
384 for (ByteBuffer buffer : mPendingData) {
385 pendingData.add(buffer.asReadOnlyBuffer());
386 }
387 return pendingData;
388 }
389 }
390
391 /**
392 * Returns a read-only copy of {@code mFlushData} for testing.
393 */
394 @VisibleForTesting
395 public List<ByteBuffer> getFlushDataForTesting() {
396 synchronized (mNativeStreamLock) {
397 List<ByteBuffer> flushData = new LinkedList<ByteBuffer>();
398 for (ByteBuffer buffer : mFlushData) {
399 flushData.add(buffer.asReadOnlyBuffer());
400 }
401 return flushData;
402 }
403 }
404
368 @Override 405 @Override
369 public void ping(PingCallback callback, Executor executor) { 406 public void ping(PingCallback callback, Executor executor) {
370 // TODO(mef): May be last thing to be implemented on Android. 407 // TODO(mef): May be last thing to be implemented on Android.
371 throw new UnsupportedOperationException("ping is not supported yet."); 408 throw new UnsupportedOperationException("ping is not supported yet.");
372 } 409 }
373 410
374 @Override 411 @Override
375 public void windowUpdate(int windowSizeIncrement) { 412 public void windowUpdate(int windowSizeIncrement) {
376 // TODO(mef): Understand the needs and semantics of this method. 413 // TODO(mef): Understand the needs and semantics of this method.
377 throw new UnsupportedOperationException("windowUpdate is not supported y et."); 414 throw new UnsupportedOperationException("windowUpdate is not supported y et.");
(...skipping 130 matching lines...) Expand 10 before | Expand all | Expand 10 after
508 @SuppressWarnings("unused") 545 @SuppressWarnings("unused")
509 @CalledByNative 546 @CalledByNative
510 private void onWritevCompleted(final ByteBuffer[] byteBuffers, int[] initial Positions, 547 private void onWritevCompleted(final ByteBuffer[] byteBuffers, int[] initial Positions,
511 int[] initialLimits, boolean endOfStream) { 548 int[] initialLimits, boolean endOfStream) {
512 assert byteBuffers.length == initialPositions.length; 549 assert byteBuffers.length == initialPositions.length;
513 assert byteBuffers.length == initialLimits.length; 550 assert byteBuffers.length == initialLimits.length;
514 synchronized (mNativeStreamLock) { 551 synchronized (mNativeStreamLock) {
515 mWriteState = State.WAITING_FOR_FLUSH; 552 mWriteState = State.WAITING_FOR_FLUSH;
516 // Flush if there is anything in the flush queue mFlushData. 553 // Flush if there is anything in the flush queue mFlushData.
517 if (!mFlushData.isEmpty()) { 554 if (!mFlushData.isEmpty()) {
518 flushLocked(); 555 sendFlushDataLocked();
519 } 556 }
520 } 557 }
521 for (int i = 0; i < byteBuffers.length; i++) { 558 for (int i = 0; i < byteBuffers.length; i++) {
522 ByteBuffer buffer = byteBuffers[i]; 559 ByteBuffer buffer = byteBuffers[i];
523 if (buffer.position() != initialPositions[i] || buffer.limit() != in itialLimits[i]) { 560 if (buffer.position() != initialPositions[i] || buffer.limit() != in itialLimits[i]) {
524 failWithException( 561 failWithException(
525 new CronetException("ByteBuffer modified externally duri ng write", null)); 562 new CronetException("ByteBuffer modified externally duri ng write", null));
526 return; 563 return;
527 } 564 }
528 // Current implementation always writes the complete buffer. 565 // Current implementation always writes the complete buffer.
529 buffer.position(buffer.limit()); 566 buffer.position(buffer.limit());
530 postTaskToExecutor(new OnWriteCompletedRunnable(buffer, endOfStream) ); 567 postTaskToExecutor(new OnWriteCompletedRunnable(buffer,
568 // Only set endOfStream flag if this buffer is the last in b yteBuffers.
569 endOfStream && i == byteBuffers.length - 1));
531 } 570 }
532 } 571 }
533 572
534 @SuppressWarnings("unused") 573 @SuppressWarnings("unused")
535 @CalledByNative 574 @CalledByNative
536 private void onResponseTrailersReceived(String[] trailers) { 575 private void onResponseTrailersReceived(String[] trailers) {
537 final UrlResponseInfo.HeaderBlock trailersBlock = 576 final UrlResponseInfo.HeaderBlock trailersBlock =
538 new UrlResponseInfo.HeaderBlock(headersListFromStrings(trailers) ); 577 new UrlResponseInfo.HeaderBlock(headersListFromStrings(trailers) );
539 postTaskToExecutor(new Runnable() { 578 postTaskToExecutor(new Runnable() {
540 public void run() { 579 public void run() {
(...skipping 185 matching lines...) Expand 10 before | Expand all | Expand 10 after
726 private native boolean nativeReadData( 765 private native boolean nativeReadData(
727 long nativePtr, ByteBuffer byteBuffer, int position, int limit); 766 long nativePtr, ByteBuffer byteBuffer, int position, int limit);
728 767
729 @NativeClassQualifiedName("CronetBidirectionalStreamAdapter") 768 @NativeClassQualifiedName("CronetBidirectionalStreamAdapter")
730 private native boolean nativeWritevData(long nativePtr, ByteBuffer[] buffers , int[] positions, 769 private native boolean nativeWritevData(long nativePtr, ByteBuffer[] buffers , int[] positions,
731 int[] limits, boolean endOfStream); 770 int[] limits, boolean endOfStream);
732 771
733 @NativeClassQualifiedName("CronetBidirectionalStreamAdapter") 772 @NativeClassQualifiedName("CronetBidirectionalStreamAdapter")
734 private native void nativeDestroy(long nativePtr, boolean sendOnCanceled); 773 private native void nativeDestroy(long nativePtr, boolean sendOnCanceled);
735 } 774 }
OLDNEW
« no previous file with comments | « no previous file | components/cronet/android/test/javatests/src/org/chromium/net/BidirectionalStreamTest.java » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698