Index: components/cronet/android/java/src/org/chromium/net/CronetBidirectionalStream.java |
diff --git a/components/cronet/android/java/src/org/chromium/net/CronetBidirectionalStream.java b/components/cronet/android/java/src/org/chromium/net/CronetBidirectionalStream.java |
index 4c0fae741dbf49d5534a516ada1703a9e03fb8b0..188e1766ff57082a5a8052d4e229cf31ecd957dc 100644 |
--- a/components/cronet/android/java/src/org/chromium/net/CronetBidirectionalStream.java |
+++ b/components/cronet/android/java/src/org/chromium/net/CronetBidirectionalStream.java |
@@ -14,6 +14,7 @@ import java.nio.ByteBuffer; |
import java.util.AbstractMap; |
import java.util.ArrayList; |
import java.util.Arrays; |
+import java.util.LinkedList; |
import java.util.List; |
import java.util.Map; |
import java.util.concurrent.Executor; |
@@ -54,9 +55,9 @@ class CronetBidirectionalStream extends BidirectionalStream { |
ERROR, |
/* Reading and writing are done, and the stream is closed successfully. */ |
SUCCESS, |
- /* Waiting for {@code write()} to be called. */ |
+ /* Waiting for {@code nativeWritevData()} to be called. */ |
WAITING_FOR_WRITE, |
- /* Writing to the remote, {@code onWriteCompleted()} callback will be called when done. */ |
+ /* Writing to the remote, {@code onWritevCompleted()} callback will be called when done. */ |
WRITING, |
/* There is no more data to write and stream is half-closed by the local side. */ |
WRITING_DONE, |
@@ -69,12 +70,26 @@ class CronetBidirectionalStream extends BidirectionalStream { |
private final int mInitialPriority; |
private final String mInitialMethod; |
private final String mRequestHeaders[]; |
+ private final boolean mDisableAutoFlush; |
/* |
* Synchronizes access to mNativeStream, mReadState and mWriteState. |
*/ |
private final Object mNativeStreamLock = new Object(); |
+ @GuardedBy("mNativeStreamLock") |
+ // Pending write data. |
+ private LinkedList<ByteBuffer> mPendingData; |
+ |
+ @GuardedBy("mNativeStreamLock") |
+ // Flush data queue that should be pushed to the native stack when the previous |
+ // nativeWritevData completes. |
+ private LinkedList<ByteBuffer> mFlushData; |
+ |
+ @GuardedBy("mNativeStreamLock") |
+ // Whether an end-of-stream flag is passed in through write(). |
+ private boolean mEndOfStreamWritten; |
+ |
/* Native BidirectionalStream object, owned by CronetBidirectionalStream. */ |
@GuardedBy("mNativeStreamLock") |
private long mNativeStream; |
@@ -91,28 +106,24 @@ class CronetBidirectionalStream extends BidirectionalStream { |
/** |
* Write state is tracking writing flow. |
- * / <--- WRITING <--- \ |
- * | | |
- * \ / |
+ * / <--- WRITING <--- \ |
+ * | | |
+ * \ / |
* NOT_STARTED -> STARTED --> WAITING_FOR_WRITE -> WRITING_DONE -> SUCCESS |
*/ |
@GuardedBy("mNativeStreamLock") |
private State mWriteState = State.NOT_STARTED; |
+ // Only modified on the network thread. |
private UrlResponseInfo mResponseInfo; |
/* |
* OnReadCompleted callback is repeatedly invoked when each read is completed, so it |
* is cached as a member variable. |
*/ |
+ // Only modified on the network thread. |
private OnReadCompletedRunnable mOnReadCompletedTask; |
- /* |
- * OnWriteCompleted callback is repeatedly invoked when each write is completed, so it |
- * is cached as a member variable. |
- */ |
- private OnWriteCompletedRunnable mOnWriteCompletedTask; |
- |
private Runnable mOnDestroyedCallbackForTesting; |
private final class OnReadCompletedRunnable implements Runnable { |
@@ -127,20 +138,23 @@ class CronetBidirectionalStream extends BidirectionalStream { |
// Null out mByteBuffer, to pass buffer ownership to callback or release if done. |
ByteBuffer buffer = mByteBuffer; |
mByteBuffer = null; |
+ boolean maybeOnSucceeded = false; |
synchronized (mNativeStreamLock) { |
if (isDoneLocked()) { |
return; |
} |
if (mEndOfStream) { |
mReadState = State.READING_DONE; |
- if (maybeSucceedLocked()) { |
- return; |
- } |
+ maybeOnSucceeded = (mWriteState == State.WRITING_DONE); |
} else { |
mReadState = State.WAITING_FOR_READ; |
} |
} |
- mCallback.onReadCompleted(CronetBidirectionalStream.this, mResponseInfo, buffer); |
+ mCallback.onReadCompleted( |
+ CronetBidirectionalStream.this, mResponseInfo, buffer, mEndOfStream); |
+ if (maybeOnSucceeded) { |
+ maybeOnSucceededOnExecutor(); |
+ } |
} catch (Exception e) { |
onCallbackException(e); |
} |
@@ -149,9 +163,14 @@ class CronetBidirectionalStream extends BidirectionalStream { |
private final class OnWriteCompletedRunnable implements Runnable { |
// Buffer passed back from current invocation of onWriteCompleted. |
- ByteBuffer mByteBuffer; |
+ private ByteBuffer mByteBuffer; |
// End of stream flag from current call to write. |
- boolean mEndOfStream; |
+ private final boolean mEndOfStream; |
+ |
+ OnWriteCompletedRunnable(ByteBuffer buffer, boolean endOfStream) { |
+ mByteBuffer = buffer; |
+ mEndOfStream = endOfStream; |
+ } |
@Override |
public void run() { |
@@ -159,20 +178,21 @@ class CronetBidirectionalStream extends BidirectionalStream { |
// Null out mByteBuffer, to pass buffer ownership to callback or release if done. |
ByteBuffer buffer = mByteBuffer; |
mByteBuffer = null; |
+ boolean maybeOnSucceeded = false; |
synchronized (mNativeStreamLock) { |
if (isDoneLocked()) { |
return; |
} |
if (mEndOfStream) { |
mWriteState = State.WRITING_DONE; |
- if (maybeSucceedLocked()) { |
- return; |
- } |
- } else { |
- mWriteState = State.WAITING_FOR_WRITE; |
+ maybeOnSucceeded = (mReadState == State.READING_DONE); |
} |
} |
- mCallback.onWriteCompleted(CronetBidirectionalStream.this, mResponseInfo, buffer); |
+ mCallback.onWriteCompleted( |
+ CronetBidirectionalStream.this, mResponseInfo, buffer, mEndOfStream); |
+ if (maybeOnSucceeded) { |
+ maybeOnSucceededOnExecutor(); |
+ } |
} catch (Exception e) { |
onCallbackException(e); |
} |
@@ -181,7 +201,8 @@ class CronetBidirectionalStream extends BidirectionalStream { |
CronetBidirectionalStream(CronetUrlRequestContext requestContext, String url, |
@BidirectionalStream.Builder.StreamPriority int priority, Callback callback, |
- Executor executor, String httpMethod, List<Map.Entry<String, String>> requestHeaders) { |
+ Executor executor, String httpMethod, List<Map.Entry<String, String>> requestHeaders, |
+ boolean disableAutoFlush) { |
mRequestContext = requestContext; |
mInitialUrl = url; |
mInitialPriority = convertStreamPriority(priority); |
@@ -189,6 +210,9 @@ class CronetBidirectionalStream extends BidirectionalStream { |
mExecutor = executor; |
mInitialMethod = httpMethod; |
mRequestHeaders = stringsFromHeaderList(requestHeaders); |
+ mDisableAutoFlush = disableAutoFlush; |
+ mPendingData = new LinkedList<>(); |
+ mFlushData = new LinkedList<>(); |
} |
@Override |
@@ -199,7 +223,7 @@ class CronetBidirectionalStream extends BidirectionalStream { |
} |
try { |
mNativeStream = nativeCreateBidirectionalStream( |
- mRequestContext.getUrlRequestContextAdapter()); |
+ mRequestContext.getUrlRequestContextAdapter(), mDisableAutoFlush); |
mRequestContext.onRequestStarted(); |
// Non-zero startResult means an argument error. |
int startResult = nativeStart(mNativeStream, mInitialUrl, mInitialPriority, |
@@ -253,28 +277,71 @@ class CronetBidirectionalStream extends BidirectionalStream { |
if (!buffer.hasRemaining() && !endOfStream) { |
throw new IllegalArgumentException("Empty buffer before end of stream."); |
} |
- if (mWriteState != State.WAITING_FOR_WRITE) { |
- throw new IllegalStateException("Unexpected write attempt."); |
+ if (mEndOfStreamWritten) { |
+ throw new IllegalArgumentException("Write after writing end of stream."); |
} |
if (isDoneLocked()) { |
return; |
} |
- if (mOnWriteCompletedTask == null) { |
- mOnWriteCompletedTask = new OnWriteCompletedRunnable(); |
+ mPendingData.add(buffer); |
+ if (endOfStream) { |
+ mEndOfStreamWritten = true; |
} |
- mOnWriteCompletedTask.mEndOfStream = endOfStream; |
- mWriteState = State.WRITING; |
- if (!nativeWriteData( |
- mNativeStream, buffer, buffer.position(), buffer.limit(), endOfStream)) { |
- // Still waiting on write. This is just to have consistent |
- // behavior with the other error cases. |
- mWriteState = State.WAITING_FOR_WRITE; |
- throw new IllegalArgumentException("Unable to call native write"); |
+ if (!mDisableAutoFlush) { |
+ flushLocked(); |
} |
} |
} |
@Override |
+ public void flush() { |
+ synchronized (mNativeStreamLock) { |
+ flushLocked(); |
+ } |
+ } |
+ |
+ @SuppressWarnings("GuardedByChecker") |
+ private void flushLocked() { |
+ if (isDoneLocked()) { |
+ return; |
+ } |
+ if (mPendingData.isEmpty() && mFlushData.isEmpty()) { |
+ // No-op if there is nothing to write. |
+ return; |
+ } |
+ |
+ // Move buffers from mPendingData to the flushing queue. |
+ if (!mPendingData.isEmpty()) { |
+ mFlushData.addAll(mPendingData); |
+ mPendingData.clear(); |
+ } |
+ |
+ if (mWriteState == State.WRITING) { |
+ // If there is a write already pending, wait until onWritevCompleted is |
+ // called before pushing data to the native stack. |
+ return; |
+ } |
+ int size = mFlushData.size(); |
+ ByteBuffer[] buffers = new ByteBuffer[size]; |
+ int[] positions = new int[size]; |
+ int[] limits = new int[size]; |
+ for (int i = 0; i < size; i++) { |
+ ByteBuffer buffer = mFlushData.poll(); |
+ buffers[i] = buffer; |
+ positions[i] = buffer.position(); |
+ limits[i] = buffer.limit(); |
+ } |
+ assert mFlushData.isEmpty(); |
+ mWriteState = State.WRITING; |
+ if (!nativeWritevData(mNativeStream, buffers, positions, limits, mEndOfStreamWritten)) { |
+ // Still waiting on write. This is just to have consistent |
+ // behavior with the other error cases. |
+ mWriteState = State.WAITING_FOR_WRITE; |
+ throw new IllegalArgumentException("Unable to call native writev."); |
+ } |
+ } |
+ |
+ @Override |
public void ping(PingCallback callback, Executor executor) { |
// TODO(mef): May be last thing to be implemented on Android. |
throw new UnsupportedOperationException("ping is not supported yet."); |
@@ -309,9 +376,32 @@ class CronetBidirectionalStream extends BidirectionalStream { |
return mReadState != State.NOT_STARTED && mNativeStream == 0; |
} |
+ /* |
+ * Runs an onSucceeded callback if both Read and Write sides are closed. |
+ */ |
+ private void maybeOnSucceededOnExecutor() { |
+ synchronized (mNativeStreamLock) { |
+ if (isDoneLocked()) { |
+ return; |
+ } |
+ if (!(mWriteState == State.WRITING_DONE && mReadState == State.READING_DONE)) { |
+ return; |
+ } |
+ mReadState = mWriteState = State.SUCCESS; |
+ // Destroy native stream first, so UrlRequestContext could be shut |
+ // down from the listener. |
+ destroyNativeStreamLocked(false); |
+ } |
+ try { |
+ mCallback.onSucceeded(CronetBidirectionalStream.this, mResponseInfo); |
+ } catch (Exception e) { |
+ Log.e(CronetUrlRequestContext.LOG_TAG, "Exception in onSucceeded method", e); |
+ } |
+ } |
+ |
@SuppressWarnings("unused") |
@CalledByNative |
- private void onRequestHeadersSent() { |
+ private void onStreamReady() { |
postTaskToExecutor(new Runnable() { |
public void run() { |
synchronized (mNativeStreamLock) { |
@@ -320,13 +410,14 @@ class CronetBidirectionalStream extends BidirectionalStream { |
} |
if (doesMethodAllowWriteData(mInitialMethod)) { |
mWriteState = State.WAITING_FOR_WRITE; |
+ mReadState = State.WAITING_FOR_READ; |
} else { |
mWriteState = State.WRITING_DONE; |
} |
} |
try { |
- mCallback.onRequestHeadersSent(CronetBidirectionalStream.this); |
+ mCallback.onStreamReady(CronetBidirectionalStream.this); |
} catch (Exception e) { |
onCallbackException(e); |
} |
@@ -391,18 +482,28 @@ class CronetBidirectionalStream extends BidirectionalStream { |
@SuppressWarnings("unused") |
@CalledByNative |
- private void onWriteCompleted( |
- final ByteBuffer byteBuffer, int initialPosition, int initialLimit) { |
- if (byteBuffer.position() != initialPosition || byteBuffer.limit() != initialLimit) { |
- failWithException( |
- new CronetException("ByteBuffer modified externally during write", null)); |
- return; |
+ private void onWritevCompleted(final ByteBuffer[] byteBuffers, int[] initialPositions, |
+ int[] initialLimits, boolean endOfStream) { |
+ assert byteBuffers.length == initialPositions.length; |
+ assert byteBuffers.length == initialLimits.length; |
+ synchronized (mNativeStreamLock) { |
+ mWriteState = State.WAITING_FOR_WRITE; |
+ // Flush if there is anything in the flush queue mFlushData. |
+ if (!mFlushData.isEmpty()) { |
+ flushLocked(); |
+ } |
+ } |
+ for (int i = 0; i < byteBuffers.length; i++) { |
+ ByteBuffer buffer = byteBuffers[i]; |
+ if (buffer.position() != initialPositions[i] || buffer.limit() != initialLimits[i]) { |
+ failWithException( |
+ new CronetException("ByteBuffer modified externally during write", null)); |
+ return; |
+ } |
+ // Current implementation always writes the complete buffer. |
+ buffer.position(buffer.limit()); |
+ postTaskToExecutor(new OnWriteCompletedRunnable(buffer, endOfStream)); |
} |
- // Current implementation always writes the complete buffer. |
- byteBuffer.position(byteBuffer.limit()); |
- assert mOnWriteCompletedTask.mByteBuffer == null; |
- mOnWriteCompletedTask.mByteBuffer = byteBuffer; |
- postTaskToExecutor(mOnWriteCompletedTask); |
} |
@SuppressWarnings("unused") |
@@ -501,38 +602,6 @@ class CronetBidirectionalStream extends BidirectionalStream { |
} |
/** |
- * Checks whether reading and writing are done. |
- * @return false if either reading or writing is not done. If both reading and writing |
- * are done, then posts cleanup task and returns true. |
- */ |
- @GuardedBy("mNativeStreamLock") |
- private boolean maybeSucceedLocked() { |
- if (mReadState != State.READING_DONE || mWriteState != State.WRITING_DONE) { |
- return false; |
- } |
- |
- mReadState = mWriteState = State.SUCCESS; |
- postTaskToExecutor(new Runnable() { |
- public void run() { |
- synchronized (mNativeStreamLock) { |
- if (isDoneLocked()) { |
- return; |
- } |
- // Destroy native stream first, so UrlRequestContext could be shut |
- // down from the listener. |
- destroyNativeStreamLocked(false); |
- } |
- try { |
- mCallback.onSucceeded(CronetBidirectionalStream.this, mResponseInfo); |
- } catch (Exception e) { |
- Log.e(CronetUrlRequestContext.LOG_TAG, "Exception in onSucceeded method", e); |
- } |
- } |
- }); |
- return true; |
- } |
- |
- /** |
* Posts task to application Executor. Used for callbacks |
* and other tasks that should not be executed on network thread. |
*/ |
@@ -578,7 +647,7 @@ class CronetBidirectionalStream extends BidirectionalStream { |
* Fails the stream with an exception. Only called on the Executor. |
*/ |
private void failWithExceptionOnExecutor(CronetException e) { |
- // Do not call into listener if request is complete. |
+ // Do not call into mCallback if request is complete. |
synchronized (mNativeStreamLock) { |
if (isDoneLocked()) { |
return; |
@@ -618,7 +687,8 @@ class CronetBidirectionalStream extends BidirectionalStream { |
} |
// Native methods are implemented in cronet_bidirectional_stream_adapter.cc. |
- private native long nativeCreateBidirectionalStream(long urlRequestContextAdapter); |
+ private native long nativeCreateBidirectionalStream( |
+ long urlRequestContextAdapter, boolean disableAutoFlush); |
@NativeClassQualifiedName("CronetBidirectionalStreamAdapter") |
private native int nativeStart(long nativePtr, String url, int priority, String method, |
@@ -629,8 +699,8 @@ class CronetBidirectionalStream extends BidirectionalStream { |
long nativePtr, ByteBuffer byteBuffer, int position, int limit); |
@NativeClassQualifiedName("CronetBidirectionalStreamAdapter") |
- private native boolean nativeWriteData( |
- long nativePtr, ByteBuffer byteBuffer, int position, int limit, boolean endOfStream); |
+ private native boolean nativeWritevData(long nativePtr, ByteBuffer[] buffers, int[] positions, |
+ int[] limits, boolean endOfStream); |
@NativeClassQualifiedName("CronetBidirectionalStreamAdapter") |
private native void nativeDestroy(long nativePtr, boolean sendOnCanceled); |