Chromium Code Reviews| Index: components/cronet/android/java/src/org/chromium/net/CronetBidirectionalStream.java |
| diff --git a/components/cronet/android/java/src/org/chromium/net/CronetBidirectionalStream.java b/components/cronet/android/java/src/org/chromium/net/CronetBidirectionalStream.java |
| index 4c0fae741dbf49d5534a516ada1703a9e03fb8b0..f5a9ee46b1440971dd56ee1543e15a8fe7498dd6 100644 |
| --- a/components/cronet/android/java/src/org/chromium/net/CronetBidirectionalStream.java |
| +++ b/components/cronet/android/java/src/org/chromium/net/CronetBidirectionalStream.java |
| @@ -14,6 +14,7 @@ import java.nio.ByteBuffer; |
| import java.util.AbstractMap; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| +import java.util.LinkedList; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.concurrent.Executor; |
| @@ -69,12 +70,21 @@ class CronetBidirectionalStream extends BidirectionalStream { |
| private final int mInitialPriority; |
| private final String mInitialMethod; |
| private final String mRequestHeaders[]; |
| + private final boolean mDisableAutoFlush; |
| /* |
| * Synchronizes access to mNativeStream, mReadState and mWriteState. |
| */ |
| private final Object mNativeStreamLock = new Object(); |
| + @GuardedBy("mNativeStreamLock") |
| + // Only relevant if mDisableAutoFlush is true. |
| + private final LinkedList<ByteBuffer> mPendingData; |
| + |
| + @GuardedBy("mNativeStreamLock") |
| + // Only relevant if mDisableAutoFlush is true. |
|
kapishnikov
2016/04/11 23:09:41
Would it be worth to unify the implementation of a
xunjieli
2016/04/12 18:01:48
Done. Although partially. I think we probably shou
|
| + private boolean mEndOfStreamWritten; |
| + |
| /* Native BidirectionalStream object, owned by CronetBidirectionalStream. */ |
| @GuardedBy("mNativeStreamLock") |
| private long mNativeStream; |
| @@ -181,7 +191,8 @@ class CronetBidirectionalStream extends BidirectionalStream { |
| CronetBidirectionalStream(CronetUrlRequestContext requestContext, String url, |
| @BidirectionalStream.Builder.StreamPriority int priority, Callback callback, |
| - Executor executor, String httpMethod, List<Map.Entry<String, String>> requestHeaders) { |
| + Executor executor, String httpMethod, List<Map.Entry<String, String>> requestHeaders, |
| + boolean disableAutoFlush) { |
| mRequestContext = requestContext; |
| mInitialUrl = url; |
| mInitialPriority = convertStreamPriority(priority); |
| @@ -189,6 +200,12 @@ class CronetBidirectionalStream extends BidirectionalStream { |
| mExecutor = executor; |
| mInitialMethod = httpMethod; |
| mRequestHeaders = stringsFromHeaderList(requestHeaders); |
| + mDisableAutoFlush = disableAutoFlush; |
| + if (mDisableAutoFlush) { |
| + mPendingData = new LinkedList<ByteBuffer>(); |
| + } else { |
| + mPendingData = null; |
| + } |
| } |
| @Override |
| @@ -199,7 +216,7 @@ class CronetBidirectionalStream extends BidirectionalStream { |
| } |
| try { |
| mNativeStream = nativeCreateBidirectionalStream( |
| - mRequestContext.getUrlRequestContextAdapter()); |
| + mRequestContext.getUrlRequestContextAdapter(), mDisableAutoFlush); |
| mRequestContext.onRequestStarted(); |
| // Non-zero startResult means an argument error. |
| int startResult = nativeStart(mNativeStream, mInitialUrl, mInitialPriority, |
| @@ -253,16 +270,23 @@ class CronetBidirectionalStream extends BidirectionalStream { |
| if (!buffer.hasRemaining() && !endOfStream) { |
| throw new IllegalArgumentException("Empty buffer before end of stream."); |
| } |
| - if (mWriteState != State.WAITING_FOR_WRITE) { |
| - throw new IllegalStateException("Unexpected write attempt."); |
| - } |
| if (isDoneLocked()) { |
|
kapishnikov
2016/04/11 23:09:42
Should we throw an IllegalStateException if there
xunjieli
2016/04/12 18:01:48
Done.
|
| return; |
| } |
| + |
| + if (mDisableAutoFlush) { |
| + mPendingData.add(buffer); |
| + if (endOfStream) { |
| + mEndOfStreamWritten = true; |
| + } |
| + return; |
| + } |
| + if (mWriteState != State.WAITING_FOR_WRITE) { |
| + throw new IllegalStateException("Unexpected write attempt."); |
| + } |
| if (mOnWriteCompletedTask == null) { |
| mOnWriteCompletedTask = new OnWriteCompletedRunnable(); |
| } |
| - mOnWriteCompletedTask.mEndOfStream = endOfStream; |
| mWriteState = State.WRITING; |
| if (!nativeWriteData( |
| mNativeStream, buffer, buffer.position(), buffer.limit(), endOfStream)) { |
| @@ -275,6 +299,30 @@ class CronetBidirectionalStream extends BidirectionalStream { |
| } |
| @Override |
| + public void flush() { |
| + synchronized (mNativeStreamLock) { |
| + if (!mDisableAutoFlush) { |
| + throw new IllegalStateException("flush is called when auto flush is not disabled."); |
| + } |
| + |
| + int size = mPendingData.size(); |
| + ByteBuffer[] buffers = new ByteBuffer[size]; |
| + int[] positions = new int[size]; |
| + int[] limits = new int[size]; |
| + for (int i = 0; i < size; i++) { |
| + ByteBuffer buffer = mPendingData.poll(); |
| + buffers[i] = buffer; |
| + positions[i] = buffer.position(); |
| + limits[i] = buffer.limit(); |
| + } |
| + assert mPendingData.isEmpty(); |
| + if (!nativeWritevData(mNativeStream, buffers, positions, limits, mEndOfStreamWritten)) { |
| + throw new IllegalArgumentException("Unable to call native write"); |
| + } |
| + } |
| + } |
| + |
| + @Override |
| public void ping(PingCallback callback, Executor executor) { |
| // TODO(mef): May be last thing to be implemented on Android. |
| throw new UnsupportedOperationException("ping is not supported yet."); |
| @@ -311,7 +359,7 @@ class CronetBidirectionalStream extends BidirectionalStream { |
| @SuppressWarnings("unused") |
| @CalledByNative |
| - private void onRequestHeadersSent() { |
| + private void onStreamReady() { |
| postTaskToExecutor(new Runnable() { |
| public void run() { |
| synchronized (mNativeStreamLock) { |
| @@ -326,7 +374,7 @@ class CronetBidirectionalStream extends BidirectionalStream { |
| } |
| try { |
| - mCallback.onRequestHeadersSent(CronetBidirectionalStream.this); |
| + mCallback.onStreamReady(CronetBidirectionalStream.this); |
| } catch (Exception e) { |
| onCallbackException(e); |
| } |
| @@ -391,8 +439,8 @@ class CronetBidirectionalStream extends BidirectionalStream { |
| @SuppressWarnings("unused") |
| @CalledByNative |
| - private void onWriteCompleted( |
| - final ByteBuffer byteBuffer, int initialPosition, int initialLimit) { |
| + private void onWriteCompleted(final ByteBuffer byteBuffer, int initialPosition, |
| + int initialLimit, boolean endOfStream) { |
| if (byteBuffer.position() != initialPosition || byteBuffer.limit() != initialLimit) { |
| failWithException( |
| new CronetException("ByteBuffer modified externally during write", null)); |
| @@ -402,11 +450,35 @@ class CronetBidirectionalStream extends BidirectionalStream { |
| byteBuffer.position(byteBuffer.limit()); |
| assert mOnWriteCompletedTask.mByteBuffer == null; |
| mOnWriteCompletedTask.mByteBuffer = byteBuffer; |
| + mOnWriteCompletedTask.mEndOfStream = endOfStream; |
| postTaskToExecutor(mOnWriteCompletedTask); |
| } |
| @SuppressWarnings("unused") |
| @CalledByNative |
| + private void onWritevCompleted(final ByteBuffer[] byteBuffers, int[] initialPositions, |
|
kapishnikov
2016/04/11 23:09:42
It looks that onWritevCompleted() is called after
xunjieli
2016/04/12 18:01:48
As discussed, we don't have the signal about each
|
| + int[] initialLimits, boolean endOfStream) { |
| + assert byteBuffers.length == initialPositions.length; |
| + assert byteBuffers.length == initialLimits.length; |
| + assert initialPositions.length == initialLimits.length; |
| + for (int i = 0; i < byteBuffers.length; i++) { |
| + ByteBuffer buffer = byteBuffers[i]; |
| + if (buffer.position() != initialPositions[i] || buffer.limit() != initialLimits[i]) { |
| + failWithException( |
| + new CronetException("ByteBuffer modified externally during write", null)); |
| + return; |
| + } |
| + // Current implementation always writes the complete buffer. |
| + buffer.position(buffer.limit()); |
| + OnWriteCompletedRunnable runnable = new OnWriteCompletedRunnable(); |
| + runnable.mByteBuffer = buffer; |
| + runnable.mEndOfStream = endOfStream; |
| + postTaskToExecutor(runnable); |
| + } |
| + } |
| + |
| + @SuppressWarnings("unused") |
| + @CalledByNative |
| private void onResponseTrailersReceived(String[] trailers) { |
| final UrlResponseInfo.HeaderBlock trailersBlock = |
| new UrlResponseInfo.HeaderBlock(headersListFromStrings(trailers)); |
| @@ -618,7 +690,8 @@ class CronetBidirectionalStream extends BidirectionalStream { |
| } |
| // Native methods are implemented in cronet_bidirectional_stream_adapter.cc. |
| - private native long nativeCreateBidirectionalStream(long urlRequestContextAdapter); |
| + private native long nativeCreateBidirectionalStream( |
| + long urlRequestContextAdapter, boolean disableAutoFlush); |
| @NativeClassQualifiedName("CronetBidirectionalStreamAdapter") |
| private native int nativeStart(long nativePtr, String url, int priority, String method, |
| @@ -633,5 +706,9 @@ class CronetBidirectionalStream extends BidirectionalStream { |
| long nativePtr, ByteBuffer byteBuffer, int position, int limit, boolean endOfStream); |
| @NativeClassQualifiedName("CronetBidirectionalStreamAdapter") |
| + private native boolean nativeWritevData(long nativePtr, ByteBuffer[] buffers, int[] positions, |
| + int[] limits, boolean endOfStream); |
| + |
| + @NativeClassQualifiedName("CronetBidirectionalStreamAdapter") |
| private native void nativeDestroy(long nativePtr, boolean sendOnCanceled); |
| } |