Chromium Code Reviews| Index: components/cronet/android/java/src/org/chromium/net/CronetBidirectionalStream.java |
| diff --git a/components/cronet/android/java/src/org/chromium/net/CronetBidirectionalStream.java b/components/cronet/android/java/src/org/chromium/net/CronetBidirectionalStream.java |
| index 4c0fae741dbf49d5534a516ada1703a9e03fb8b0..a5e684653e93b59369da20eb911633fbbda4e019 100644 |
| --- a/components/cronet/android/java/src/org/chromium/net/CronetBidirectionalStream.java |
| +++ b/components/cronet/android/java/src/org/chromium/net/CronetBidirectionalStream.java |
| @@ -14,6 +14,7 @@ import java.nio.ByteBuffer; |
| import java.util.AbstractMap; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| +import java.util.LinkedList; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.concurrent.Executor; |
| @@ -56,7 +57,10 @@ class CronetBidirectionalStream extends BidirectionalStream { |
| SUCCESS, |
| /* Waiting for {@code write()} to be called. */ |
| WAITING_FOR_WRITE, |
| - /* Writing to the remote, {@code onWriteCompleted()} callback will be called when done. */ |
| + /* |
| + * Writing to the remote, {@code onWriteCompleted()} callback will be called when done. |
| + * This state is only applicable when {@code mDisableAutoFlush} is false. |
| + */ |
| WRITING, |
| /* There is no more data to write and stream is half-closed by the local side. */ |
| WRITING_DONE, |
| @@ -69,12 +73,20 @@ class CronetBidirectionalStream extends BidirectionalStream { |
| private final int mInitialPriority; |
| private final String mInitialMethod; |
| private final String mRequestHeaders[]; |
| + private final boolean mDisableAutoFlush; |
| /* |
| * Synchronizes access to mNativeStream, mReadState and mWriteState. |
| */ |
| private final Object mNativeStreamLock = new Object(); |
| + @GuardedBy("mNativeStreamLock") |
| + private final LinkedList<ByteBuffer> mPendingData; |
| + |
| + @GuardedBy("mNativeStreamLock") |
| + // Whether an end-of-stream flag is passed in through write(). |
| + private boolean mEndOfStreamWritten; |
| + |
| /* Native BidirectionalStream object, owned by CronetBidirectionalStream. */ |
| @GuardedBy("mNativeStreamLock") |
| private long mNativeStream; |
| @@ -91,10 +103,10 @@ class CronetBidirectionalStream extends BidirectionalStream { |
| /** |
| * Write state is tracking writing flow. |
| - * / <--- WRITING <--- \ |
| - * | | |
| - * \ / |
| - * NOT_STARTED -> STARTED --> WAITING_FOR_WRITE -> WRITING_DONE -> SUCCESS |
| + * / <--- WRITING (if !mDisableAutoFlush)<--- \ |
| + * | | |
| + * \ / |
| + * NOT_STARTED -> STARTED --> -------------WAITING_FOR_WRITE --------> WRITING_DONE -> SUCCESS |
| */ |
| @GuardedBy("mNativeStreamLock") |
| private State mWriteState = State.NOT_STARTED; |
| @@ -107,12 +119,6 @@ class CronetBidirectionalStream extends BidirectionalStream { |
| */ |
| private OnReadCompletedRunnable mOnReadCompletedTask; |
| - /* |
| - * OnWriteCompleted callback is repeatedly invoked when each write is completed, so it |
| - * is cached as a member variable. |
| - */ |
| - private OnWriteCompletedRunnable mOnWriteCompletedTask; |
| - |
| private Runnable mOnDestroyedCallbackForTesting; |
| private final class OnReadCompletedRunnable implements Runnable { |
| @@ -165,9 +171,8 @@ class CronetBidirectionalStream extends BidirectionalStream { |
| } |
| if (mEndOfStream) { |
| mWriteState = State.WRITING_DONE; |
| - if (maybeSucceedLocked()) { |
| - return; |
| - } |
| + // Maybe post an onSucceeded callback to be executed after run() completes. |
| + maybeSucceedLocked(); |
| } else { |
| mWriteState = State.WAITING_FOR_WRITE; |
| } |
| @@ -181,7 +186,8 @@ class CronetBidirectionalStream extends BidirectionalStream { |
| CronetBidirectionalStream(CronetUrlRequestContext requestContext, String url, |
| @BidirectionalStream.Builder.StreamPriority int priority, Callback callback, |
| - Executor executor, String httpMethod, List<Map.Entry<String, String>> requestHeaders) { |
| + Executor executor, String httpMethod, List<Map.Entry<String, String>> requestHeaders, |
| + boolean disableAutoFlush) { |
| mRequestContext = requestContext; |
| mInitialUrl = url; |
| mInitialPriority = convertStreamPriority(priority); |
| @@ -189,6 +195,8 @@ class CronetBidirectionalStream extends BidirectionalStream { |
| mExecutor = executor; |
| mInitialMethod = httpMethod; |
| mRequestHeaders = stringsFromHeaderList(requestHeaders); |
| + mDisableAutoFlush = disableAutoFlush; |
| + mPendingData = new LinkedList<ByteBuffer>(); |
| } |
| @Override |
| @@ -199,7 +207,7 @@ class CronetBidirectionalStream extends BidirectionalStream { |
| } |
| try { |
| mNativeStream = nativeCreateBidirectionalStream( |
| - mRequestContext.getUrlRequestContextAdapter()); |
| + mRequestContext.getUrlRequestContextAdapter(), mDisableAutoFlush); |
| mRequestContext.onRequestStarted(); |
| // Non-zero startResult means an argument error. |
| int startResult = nativeStart(mNativeStream, mInitialUrl, mInitialPriority, |
| @@ -253,24 +261,52 @@ class CronetBidirectionalStream extends BidirectionalStream { |
| if (!buffer.hasRemaining() && !endOfStream) { |
| throw new IllegalArgumentException("Empty buffer before end of stream."); |
| } |
| + if (isDoneLocked()) { |
| + return; |
| + } |
| + if (mEndOfStreamWritten) { |
|
kapishnikov
2016/04/12 19:27:39
We should move it ahead of
if (isDoneLocked()) {
xunjieli
2016/04/12 19:59:03
Done.
|
| + throw new IllegalArgumentException("Write after writing end of stream."); |
| + } |
| if (mWriteState != State.WAITING_FOR_WRITE) { |
| throw new IllegalStateException("Unexpected write attempt."); |
| } |
| - if (isDoneLocked()) { |
| - return; |
| + mPendingData.add(buffer); |
| + if (endOfStream) { |
| + mEndOfStreamWritten = true; |
| } |
| - if (mOnWriteCompletedTask == null) { |
| - mOnWriteCompletedTask = new OnWriteCompletedRunnable(); |
| + if (mDisableAutoFlush) { |
| + return; |
| } |
| - mOnWriteCompletedTask.mEndOfStream = endOfStream; |
| mWriteState = State.WRITING; |
| - if (!nativeWriteData( |
| - mNativeStream, buffer, buffer.position(), buffer.limit(), endOfStream)) { |
| - // Still waiting on write. This is just to have consistent |
| - // behavior with the other error cases. |
| - mWriteState = State.WAITING_FOR_WRITE; |
| - throw new IllegalArgumentException("Unable to call native write"); |
| + flushLocked(); |
| + } |
| + } |
| + |
| + @Override |
| + public void flush() { |
| + synchronized (mNativeStreamLock) { |
| + if (!mDisableAutoFlush) { |
| + throw new IllegalStateException("flush is called when auto flush is not disabled."); |
|
kapishnikov
2016/04/12 19:27:39
Should we allow calling flush even when disableAut
xunjieli
2016/04/12 19:59:03
Removing the check makes me a bit uneasy, but I th
|
| } |
| + flushLocked(); |
| + } |
| + } |
| + |
| + @SuppressWarnings("GuardedByChecker") |
| + private void flushLocked() { |
| + int size = mPendingData.size(); |
| + ByteBuffer[] buffers = new ByteBuffer[size]; |
| + int[] positions = new int[size]; |
| + int[] limits = new int[size]; |
| + for (int i = 0; i < size; i++) { |
| + ByteBuffer buffer = mPendingData.poll(); |
| + buffers[i] = buffer; |
| + positions[i] = buffer.position(); |
| + limits[i] = buffer.limit(); |
| + } |
| + assert mPendingData.isEmpty(); |
| + if (!nativeWritevData(mNativeStream, buffers, positions, limits, mEndOfStreamWritten)) { |
| + throw new IllegalArgumentException("Unable to call native write"); |
| } |
| } |
| @@ -311,7 +347,7 @@ class CronetBidirectionalStream extends BidirectionalStream { |
| @SuppressWarnings("unused") |
| @CalledByNative |
| - private void onRequestHeadersSent() { |
| + private void onStreamReady() { |
| postTaskToExecutor(new Runnable() { |
| public void run() { |
| synchronized (mNativeStreamLock) { |
| @@ -326,7 +362,7 @@ class CronetBidirectionalStream extends BidirectionalStream { |
| } |
| try { |
| - mCallback.onRequestHeadersSent(CronetBidirectionalStream.this); |
| + mCallback.onStreamReady(CronetBidirectionalStream.this); |
| } catch (Exception e) { |
| onCallbackException(e); |
| } |
| @@ -391,18 +427,25 @@ class CronetBidirectionalStream extends BidirectionalStream { |
| @SuppressWarnings("unused") |
| @CalledByNative |
| - private void onWriteCompleted( |
| - final ByteBuffer byteBuffer, int initialPosition, int initialLimit) { |
| - if (byteBuffer.position() != initialPosition || byteBuffer.limit() != initialLimit) { |
| - failWithException( |
| - new CronetException("ByteBuffer modified externally during write", null)); |
| - return; |
| + private void onWritevCompleted(final ByteBuffer[] byteBuffers, int[] initialPositions, |
| + int[] initialLimits, boolean endOfStream) { |
| + assert byteBuffers.length == initialPositions.length; |
| + assert byteBuffers.length == initialLimits.length; |
| + assert initialPositions.length == initialLimits.length; |
| + for (int i = 0; i < byteBuffers.length; i++) { |
| + ByteBuffer buffer = byteBuffers[i]; |
| + if (buffer.position() != initialPositions[i] || buffer.limit() != initialLimits[i]) { |
| + failWithException( |
| + new CronetException("ByteBuffer modified externally during write", null)); |
| + return; |
| + } |
| + // Current implementation always writes the complete buffer. |
| + buffer.position(buffer.limit()); |
| + OnWriteCompletedRunnable runnable = new OnWriteCompletedRunnable(); |
| + runnable.mByteBuffer = buffer; |
| + runnable.mEndOfStream = endOfStream; |
| + postTaskToExecutor(runnable); |
| } |
| - // Current implementation always writes the complete buffer. |
| - byteBuffer.position(byteBuffer.limit()); |
| - assert mOnWriteCompletedTask.mByteBuffer == null; |
| - mOnWriteCompletedTask.mByteBuffer = byteBuffer; |
| - postTaskToExecutor(mOnWriteCompletedTask); |
| } |
| @SuppressWarnings("unused") |
| @@ -618,7 +661,8 @@ class CronetBidirectionalStream extends BidirectionalStream { |
| } |
| // Native methods are implemented in cronet_bidirectional_stream_adapter.cc. |
| - private native long nativeCreateBidirectionalStream(long urlRequestContextAdapter); |
| + private native long nativeCreateBidirectionalStream( |
| + long urlRequestContextAdapter, boolean disableAutoFlush); |
| @NativeClassQualifiedName("CronetBidirectionalStreamAdapter") |
| private native int nativeStart(long nativePtr, String url, int priority, String method, |
| @@ -629,8 +673,8 @@ class CronetBidirectionalStream extends BidirectionalStream { |
| long nativePtr, ByteBuffer byteBuffer, int position, int limit); |
| @NativeClassQualifiedName("CronetBidirectionalStreamAdapter") |
| - private native boolean nativeWriteData( |
| - long nativePtr, ByteBuffer byteBuffer, int position, int limit, boolean endOfStream); |
| + private native boolean nativeWritevData(long nativePtr, ByteBuffer[] buffers, int[] positions, |
| + int[] limits, boolean endOfStream); |
| @NativeClassQualifiedName("CronetBidirectionalStreamAdapter") |
| private native void nativeDestroy(long nativePtr, boolean sendOnCanceled); |