Index: components/cronet/android/java/src/org/chromium/net/CronetBidirectionalStream.java |
diff --git a/components/cronet/android/java/src/org/chromium/net/CronetBidirectionalStream.java b/components/cronet/android/java/src/org/chromium/net/CronetBidirectionalStream.java |
index 4c0fae741dbf49d5534a516ada1703a9e03fb8b0..f5a9ee46b1440971dd56ee1543e15a8fe7498dd6 100644 |
--- a/components/cronet/android/java/src/org/chromium/net/CronetBidirectionalStream.java |
+++ b/components/cronet/android/java/src/org/chromium/net/CronetBidirectionalStream.java |
@@ -14,6 +14,7 @@ import java.nio.ByteBuffer; |
import java.util.AbstractMap; |
import java.util.ArrayList; |
import java.util.Arrays; |
+import java.util.LinkedList; |
import java.util.List; |
import java.util.Map; |
import java.util.concurrent.Executor; |
@@ -69,12 +70,21 @@ class CronetBidirectionalStream extends BidirectionalStream { |
private final int mInitialPriority; |
private final String mInitialMethod; |
private final String mRequestHeaders[]; |
+ private final boolean mDisableAutoFlush; |
/* |
* Synchronizes access to mNativeStream, mReadState and mWriteState. |
*/ |
private final Object mNativeStreamLock = new Object(); |
+ @GuardedBy("mNativeStreamLock") |
+ // Only relevant if mDisableAutoFlush is true. |
+ private final LinkedList<ByteBuffer> mPendingData; |
+ |
+ @GuardedBy("mNativeStreamLock") |
+ // Only relevant if mDisableAutoFlush is true. |
kapishnikov
2016/04/11 23:09:41
Would it be worth to unify the implementation of a
xunjieli
2016/04/12 18:01:48
Done. Although partially. I think we probably shou
|
+ private boolean mEndOfStreamWritten; |
+ |
/* Native BidirectionalStream object, owned by CronetBidirectionalStream. */ |
@GuardedBy("mNativeStreamLock") |
private long mNativeStream; |
@@ -181,7 +191,8 @@ class CronetBidirectionalStream extends BidirectionalStream { |
CronetBidirectionalStream(CronetUrlRequestContext requestContext, String url, |
@BidirectionalStream.Builder.StreamPriority int priority, Callback callback, |
- Executor executor, String httpMethod, List<Map.Entry<String, String>> requestHeaders) { |
+ Executor executor, String httpMethod, List<Map.Entry<String, String>> requestHeaders, |
+ boolean disableAutoFlush) { |
mRequestContext = requestContext; |
mInitialUrl = url; |
mInitialPriority = convertStreamPriority(priority); |
@@ -189,6 +200,12 @@ class CronetBidirectionalStream extends BidirectionalStream { |
mExecutor = executor; |
mInitialMethod = httpMethod; |
mRequestHeaders = stringsFromHeaderList(requestHeaders); |
+ mDisableAutoFlush = disableAutoFlush; |
+ if (mDisableAutoFlush) { |
+ mPendingData = new LinkedList<ByteBuffer>(); |
+ } else { |
+ mPendingData = null; |
+ } |
} |
@Override |
@@ -199,7 +216,7 @@ class CronetBidirectionalStream extends BidirectionalStream { |
} |
try { |
mNativeStream = nativeCreateBidirectionalStream( |
- mRequestContext.getUrlRequestContextAdapter()); |
+ mRequestContext.getUrlRequestContextAdapter(), mDisableAutoFlush); |
mRequestContext.onRequestStarted(); |
// Non-zero startResult means an argument error. |
int startResult = nativeStart(mNativeStream, mInitialUrl, mInitialPriority, |
@@ -253,16 +270,23 @@ class CronetBidirectionalStream extends BidirectionalStream { |
if (!buffer.hasRemaining() && !endOfStream) { |
throw new IllegalArgumentException("Empty buffer before end of stream."); |
} |
- if (mWriteState != State.WAITING_FOR_WRITE) { |
- throw new IllegalStateException("Unexpected write attempt."); |
- } |
if (isDoneLocked()) { |
kapishnikov
2016/04/11 23:09:42
Should we throw an IllegalStateException if there
xunjieli
2016/04/12 18:01:48
Done.
|
return; |
} |
+ |
+ if (mDisableAutoFlush) { |
+ mPendingData.add(buffer); |
+ if (endOfStream) { |
+ mEndOfStreamWritten = true; |
+ } |
+ return; |
+ } |
+ if (mWriteState != State.WAITING_FOR_WRITE) { |
+ throw new IllegalStateException("Unexpected write attempt."); |
+ } |
if (mOnWriteCompletedTask == null) { |
mOnWriteCompletedTask = new OnWriteCompletedRunnable(); |
} |
- mOnWriteCompletedTask.mEndOfStream = endOfStream; |
mWriteState = State.WRITING; |
if (!nativeWriteData( |
mNativeStream, buffer, buffer.position(), buffer.limit(), endOfStream)) { |
@@ -275,6 +299,30 @@ class CronetBidirectionalStream extends BidirectionalStream { |
} |
@Override |
+ public void flush() { |
+ synchronized (mNativeStreamLock) { |
+ if (!mDisableAutoFlush) { |
+ throw new IllegalStateException("flush is called when auto flush is not disabled."); |
+ } |
+ |
+ int size = mPendingData.size(); |
+ ByteBuffer[] buffers = new ByteBuffer[size]; |
+ int[] positions = new int[size]; |
+ int[] limits = new int[size]; |
+ for (int i = 0; i < size; i++) { |
+ ByteBuffer buffer = mPendingData.poll(); |
+ buffers[i] = buffer; |
+ positions[i] = buffer.position(); |
+ limits[i] = buffer.limit(); |
+ } |
+ assert mPendingData.isEmpty(); |
+ if (!nativeWritevData(mNativeStream, buffers, positions, limits, mEndOfStreamWritten)) { |
+ throw new IllegalArgumentException("Unable to call native write"); |
+ } |
+ } |
+ } |
+ |
+ @Override |
public void ping(PingCallback callback, Executor executor) { |
// TODO(mef): May be last thing to be implemented on Android. |
throw new UnsupportedOperationException("ping is not supported yet."); |
@@ -311,7 +359,7 @@ class CronetBidirectionalStream extends BidirectionalStream { |
@SuppressWarnings("unused") |
@CalledByNative |
- private void onRequestHeadersSent() { |
+ private void onStreamReady() { |
postTaskToExecutor(new Runnable() { |
public void run() { |
synchronized (mNativeStreamLock) { |
@@ -326,7 +374,7 @@ class CronetBidirectionalStream extends BidirectionalStream { |
} |
try { |
- mCallback.onRequestHeadersSent(CronetBidirectionalStream.this); |
+ mCallback.onStreamReady(CronetBidirectionalStream.this); |
} catch (Exception e) { |
onCallbackException(e); |
} |
@@ -391,8 +439,8 @@ class CronetBidirectionalStream extends BidirectionalStream { |
@SuppressWarnings("unused") |
@CalledByNative |
- private void onWriteCompleted( |
- final ByteBuffer byteBuffer, int initialPosition, int initialLimit) { |
+ private void onWriteCompleted(final ByteBuffer byteBuffer, int initialPosition, |
+ int initialLimit, boolean endOfStream) { |
if (byteBuffer.position() != initialPosition || byteBuffer.limit() != initialLimit) { |
failWithException( |
new CronetException("ByteBuffer modified externally during write", null)); |
@@ -402,11 +450,35 @@ class CronetBidirectionalStream extends BidirectionalStream { |
byteBuffer.position(byteBuffer.limit()); |
assert mOnWriteCompletedTask.mByteBuffer == null; |
mOnWriteCompletedTask.mByteBuffer = byteBuffer; |
+ mOnWriteCompletedTask.mEndOfStream = endOfStream; |
postTaskToExecutor(mOnWriteCompletedTask); |
} |
@SuppressWarnings("unused") |
@CalledByNative |
+ private void onWritevCompleted(final ByteBuffer[] byteBuffers, int[] initialPositions, |
kapishnikov
2016/04/11 23:09:42
It looks that onWritevCompleted() is called after
xunjieli
2016/04/12 18:01:48
As discussed, we don't have the signal about each
|
+ int[] initialLimits, boolean endOfStream) { |
+ assert byteBuffers.length == initialPositions.length; |
+ assert byteBuffers.length == initialLimits.length; |
+ assert initialPositions.length == initialLimits.length; |
+ for (int i = 0; i < byteBuffers.length; i++) { |
+ ByteBuffer buffer = byteBuffers[i]; |
+ if (buffer.position() != initialPositions[i] || buffer.limit() != initialLimits[i]) { |
+ failWithException( |
+ new CronetException("ByteBuffer modified externally during write", null)); |
+ return; |
+ } |
+ // Current implementation always writes the complete buffer. |
+ buffer.position(buffer.limit()); |
+ OnWriteCompletedRunnable runnable = new OnWriteCompletedRunnable(); |
+ runnable.mByteBuffer = buffer; |
+ runnable.mEndOfStream = endOfStream; |
+ postTaskToExecutor(runnable); |
+ } |
+ } |
+ |
+ @SuppressWarnings("unused") |
+ @CalledByNative |
private void onResponseTrailersReceived(String[] trailers) { |
final UrlResponseInfo.HeaderBlock trailersBlock = |
new UrlResponseInfo.HeaderBlock(headersListFromStrings(trailers)); |
@@ -618,7 +690,8 @@ class CronetBidirectionalStream extends BidirectionalStream { |
} |
// Native methods are implemented in cronet_bidirectional_stream_adapter.cc. |
- private native long nativeCreateBidirectionalStream(long urlRequestContextAdapter); |
+ private native long nativeCreateBidirectionalStream( |
+ long urlRequestContextAdapter, boolean disableAutoFlush); |
@NativeClassQualifiedName("CronetBidirectionalStreamAdapter") |
private native int nativeStart(long nativePtr, String url, int priority, String method, |
@@ -633,5 +706,9 @@ class CronetBidirectionalStream extends BidirectionalStream { |
long nativePtr, ByteBuffer byteBuffer, int position, int limit, boolean endOfStream); |
@NativeClassQualifiedName("CronetBidirectionalStreamAdapter") |
+ private native boolean nativeWritevData(long nativePtr, ByteBuffer[] buffers, int[] positions, |
+ int[] limits, boolean endOfStream); |
+ |
+ @NativeClassQualifiedName("CronetBidirectionalStreamAdapter") |
private native void nativeDestroy(long nativePtr, boolean sendOnCanceled); |
} |