| Index: components/cronet/android/java/src/org/chromium/net/CronetBidirectionalStream.java
|
| diff --git a/components/cronet/android/java/src/org/chromium/net/CronetBidirectionalStream.java b/components/cronet/android/java/src/org/chromium/net/CronetBidirectionalStream.java
|
| index 4c0fae741dbf49d5534a516ada1703a9e03fb8b0..f5a9ee46b1440971dd56ee1543e15a8fe7498dd6 100644
|
| --- a/components/cronet/android/java/src/org/chromium/net/CronetBidirectionalStream.java
|
| +++ b/components/cronet/android/java/src/org/chromium/net/CronetBidirectionalStream.java
|
| @@ -14,6 +14,7 @@ import java.nio.ByteBuffer;
|
| import java.util.AbstractMap;
|
| import java.util.ArrayList;
|
| import java.util.Arrays;
|
| +import java.util.LinkedList;
|
| import java.util.List;
|
| import java.util.Map;
|
| import java.util.concurrent.Executor;
|
| @@ -69,12 +70,21 @@ class CronetBidirectionalStream extends BidirectionalStream {
|
| private final int mInitialPriority;
|
| private final String mInitialMethod;
|
| private final String mRequestHeaders[];
|
| + private final boolean mDisableAutoFlush;
|
|
|
| /*
|
| * Synchronizes access to mNativeStream, mReadState and mWriteState.
|
| */
|
| private final Object mNativeStreamLock = new Object();
|
|
|
| + @GuardedBy("mNativeStreamLock")
|
| + // Only relevant if mDisableAutoFlush is true.
|
| + private final LinkedList<ByteBuffer> mPendingData;
|
| +
|
| + @GuardedBy("mNativeStreamLock")
|
| + // Only relevant if mDisableAutoFlush is true.
|
| + private boolean mEndOfStreamWritten;
|
| +
|
| /* Native BidirectionalStream object, owned by CronetBidirectionalStream. */
|
| @GuardedBy("mNativeStreamLock")
|
| private long mNativeStream;
|
| @@ -181,7 +191,8 @@ class CronetBidirectionalStream extends BidirectionalStream {
|
|
|
| CronetBidirectionalStream(CronetUrlRequestContext requestContext, String url,
|
| @BidirectionalStream.Builder.StreamPriority int priority, Callback callback,
|
| - Executor executor, String httpMethod, List<Map.Entry<String, String>> requestHeaders) {
|
| + Executor executor, String httpMethod, List<Map.Entry<String, String>> requestHeaders,
|
| + boolean disableAutoFlush) {
|
| mRequestContext = requestContext;
|
| mInitialUrl = url;
|
| mInitialPriority = convertStreamPriority(priority);
|
| @@ -189,6 +200,12 @@ class CronetBidirectionalStream extends BidirectionalStream {
|
| mExecutor = executor;
|
| mInitialMethod = httpMethod;
|
| mRequestHeaders = stringsFromHeaderList(requestHeaders);
|
| + mDisableAutoFlush = disableAutoFlush;
|
| + if (mDisableAutoFlush) {
|
| + mPendingData = new LinkedList<ByteBuffer>();
|
| + } else {
|
| + mPendingData = null;
|
| + }
|
| }
|
|
|
| @Override
|
| @@ -199,7 +216,7 @@ class CronetBidirectionalStream extends BidirectionalStream {
|
| }
|
| try {
|
| mNativeStream = nativeCreateBidirectionalStream(
|
| - mRequestContext.getUrlRequestContextAdapter());
|
| + mRequestContext.getUrlRequestContextAdapter(), mDisableAutoFlush);
|
| mRequestContext.onRequestStarted();
|
| // Non-zero startResult means an argument error.
|
| int startResult = nativeStart(mNativeStream, mInitialUrl, mInitialPriority,
|
| @@ -253,16 +270,23 @@ class CronetBidirectionalStream extends BidirectionalStream {
|
| if (!buffer.hasRemaining() && !endOfStream) {
|
| throw new IllegalArgumentException("Empty buffer before end of stream.");
|
| }
|
| - if (mWriteState != State.WAITING_FOR_WRITE) {
|
| - throw new IllegalStateException("Unexpected write attempt.");
|
| - }
|
| if (isDoneLocked()) {
|
| return;
|
| }
|
| +
|
| + if (mDisableAutoFlush) {
|
| + mPendingData.add(buffer);
|
| + if (endOfStream) {
|
| + mEndOfStreamWritten = true;
|
| + }
|
| + return;
|
| + }
|
| + if (mWriteState != State.WAITING_FOR_WRITE) {
|
| + throw new IllegalStateException("Unexpected write attempt.");
|
| + }
|
| if (mOnWriteCompletedTask == null) {
|
| mOnWriteCompletedTask = new OnWriteCompletedRunnable();
|
| }
|
| - mOnWriteCompletedTask.mEndOfStream = endOfStream;
|
| mWriteState = State.WRITING;
|
| if (!nativeWriteData(
|
| mNativeStream, buffer, buffer.position(), buffer.limit(), endOfStream)) {
|
| @@ -275,6 +299,30 @@ class CronetBidirectionalStream extends BidirectionalStream {
|
| }
|
|
|
| @Override
|
| + public void flush() {
|
| + synchronized (mNativeStreamLock) {
|
| + if (!mDisableAutoFlush) {
|
| + throw new IllegalStateException("flush is called when auto flush is not disabled.");
|
| + }
|
| +
|
| + int size = mPendingData.size();
|
| + ByteBuffer[] buffers = new ByteBuffer[size];
|
| + int[] positions = new int[size];
|
| + int[] limits = new int[size];
|
| + for (int i = 0; i < size; i++) {
|
| + ByteBuffer buffer = mPendingData.poll();
|
| + buffers[i] = buffer;
|
| + positions[i] = buffer.position();
|
| + limits[i] = buffer.limit();
|
| + }
|
| + assert mPendingData.isEmpty();
|
| + if (!nativeWritevData(mNativeStream, buffers, positions, limits, mEndOfStreamWritten)) {
|
| + throw new IllegalArgumentException("Unable to call native write");
|
| + }
|
| + }
|
| + }
|
| +
|
| + @Override
|
| public void ping(PingCallback callback, Executor executor) {
|
| // TODO(mef): May be last thing to be implemented on Android.
|
| throw new UnsupportedOperationException("ping is not supported yet.");
|
| @@ -311,7 +359,7 @@ class CronetBidirectionalStream extends BidirectionalStream {
|
|
|
| @SuppressWarnings("unused")
|
| @CalledByNative
|
| - private void onRequestHeadersSent() {
|
| + private void onStreamReady() {
|
| postTaskToExecutor(new Runnable() {
|
| public void run() {
|
| synchronized (mNativeStreamLock) {
|
| @@ -326,7 +374,7 @@ class CronetBidirectionalStream extends BidirectionalStream {
|
| }
|
|
|
| try {
|
| - mCallback.onRequestHeadersSent(CronetBidirectionalStream.this);
|
| + mCallback.onStreamReady(CronetBidirectionalStream.this);
|
| } catch (Exception e) {
|
| onCallbackException(e);
|
| }
|
| @@ -391,8 +439,8 @@ class CronetBidirectionalStream extends BidirectionalStream {
|
|
|
| @SuppressWarnings("unused")
|
| @CalledByNative
|
| - private void onWriteCompleted(
|
| - final ByteBuffer byteBuffer, int initialPosition, int initialLimit) {
|
| + private void onWriteCompleted(final ByteBuffer byteBuffer, int initialPosition,
|
| + int initialLimit, boolean endOfStream) {
|
| if (byteBuffer.position() != initialPosition || byteBuffer.limit() != initialLimit) {
|
| failWithException(
|
| new CronetException("ByteBuffer modified externally during write", null));
|
| @@ -402,11 +450,35 @@ class CronetBidirectionalStream extends BidirectionalStream {
|
| byteBuffer.position(byteBuffer.limit());
|
| assert mOnWriteCompletedTask.mByteBuffer == null;
|
| mOnWriteCompletedTask.mByteBuffer = byteBuffer;
|
| + mOnWriteCompletedTask.mEndOfStream = endOfStream;
|
| postTaskToExecutor(mOnWriteCompletedTask);
|
| }
|
|
|
| @SuppressWarnings("unused")
|
| @CalledByNative
|
| + private void onWritevCompleted(final ByteBuffer[] byteBuffers, int[] initialPositions,
|
| + int[] initialLimits, boolean endOfStream) {
|
| + assert byteBuffers.length == initialPositions.length;
|
| + assert byteBuffers.length == initialLimits.length;
|
| + assert initialPositions.length == initialLimits.length;
|
| + for (int i = 0; i < byteBuffers.length; i++) {
|
| + ByteBuffer buffer = byteBuffers[i];
|
| + if (buffer.position() != initialPositions[i] || buffer.limit() != initialLimits[i]) {
|
| + failWithException(
|
| + new CronetException("ByteBuffer modified externally during write", null));
|
| + return;
|
| + }
|
| + // Current implementation always writes the complete buffer.
|
| + buffer.position(buffer.limit());
|
| + OnWriteCompletedRunnable runnable = new OnWriteCompletedRunnable();
|
| + runnable.mByteBuffer = buffer;
|
| + runnable.mEndOfStream = endOfStream;
|
| + postTaskToExecutor(runnable);
|
| + }
|
| + }
|
| +
|
| + @SuppressWarnings("unused")
|
| + @CalledByNative
|
| private void onResponseTrailersReceived(String[] trailers) {
|
| final UrlResponseInfo.HeaderBlock trailersBlock =
|
| new UrlResponseInfo.HeaderBlock(headersListFromStrings(trailers));
|
| @@ -618,7 +690,8 @@ class CronetBidirectionalStream extends BidirectionalStream {
|
| }
|
|
|
| // Native methods are implemented in cronet_bidirectional_stream_adapter.cc.
|
| - private native long nativeCreateBidirectionalStream(long urlRequestContextAdapter);
|
| + private native long nativeCreateBidirectionalStream(
|
| + long urlRequestContextAdapter, boolean disableAutoFlush);
|
|
|
| @NativeClassQualifiedName("CronetBidirectionalStreamAdapter")
|
| private native int nativeStart(long nativePtr, String url, int priority, String method,
|
| @@ -633,5 +706,9 @@ class CronetBidirectionalStream extends BidirectionalStream {
|
| long nativePtr, ByteBuffer byteBuffer, int position, int limit, boolean endOfStream);
|
|
|
| @NativeClassQualifiedName("CronetBidirectionalStreamAdapter")
|
| + private native boolean nativeWritevData(long nativePtr, ByteBuffer[] buffers, int[] positions,
|
| + int[] limits, boolean endOfStream);
|
| +
|
| + @NativeClassQualifiedName("CronetBidirectionalStreamAdapter")
|
| private native void nativeDestroy(long nativePtr, boolean sendOnCanceled);
|
| }
|
|
|