| Index: components/cronet/android/java/src/org/chromium/net/CronetBidirectionalStream.java
|
| diff --git a/components/cronet/android/java/src/org/chromium/net/CronetBidirectionalStream.java b/components/cronet/android/java/src/org/chromium/net/CronetBidirectionalStream.java
|
| index 4c0fae741dbf49d5534a516ada1703a9e03fb8b0..188e1766ff57082a5a8052d4e229cf31ecd957dc 100644
|
| --- a/components/cronet/android/java/src/org/chromium/net/CronetBidirectionalStream.java
|
| +++ b/components/cronet/android/java/src/org/chromium/net/CronetBidirectionalStream.java
|
| @@ -14,6 +14,7 @@ import java.nio.ByteBuffer;
|
| import java.util.AbstractMap;
|
| import java.util.ArrayList;
|
| import java.util.Arrays;
|
| +import java.util.LinkedList;
|
| import java.util.List;
|
| import java.util.Map;
|
| import java.util.concurrent.Executor;
|
| @@ -54,9 +55,9 @@ class CronetBidirectionalStream extends BidirectionalStream {
|
| ERROR,
|
| /* Reading and writing are done, and the stream is closed successfully. */
|
| SUCCESS,
|
| - /* Waiting for {@code write()} to be called. */
|
| + /* Waiting for {@code nativeWritevData()} to be called. */
|
| WAITING_FOR_WRITE,
|
| - /* Writing to the remote, {@code onWriteCompleted()} callback will be called when done. */
|
| + /* Writing to the remote, {@code onWritevCompleted()} callback will be called when done. */
|
| WRITING,
|
| /* There is no more data to write and stream is half-closed by the local side. */
|
| WRITING_DONE,
|
| @@ -69,12 +70,26 @@ class CronetBidirectionalStream extends BidirectionalStream {
|
| private final int mInitialPriority;
|
| private final String mInitialMethod;
|
| private final String mRequestHeaders[];
|
| + private final boolean mDisableAutoFlush;
|
|
|
| /*
|
| * Synchronizes access to mNativeStream, mReadState and mWriteState.
|
| */
|
| private final Object mNativeStreamLock = new Object();
|
|
|
| + @GuardedBy("mNativeStreamLock")
|
| + // Pending write data.
|
| + private LinkedList<ByteBuffer> mPendingData;
|
| +
|
| + @GuardedBy("mNativeStreamLock")
|
| + // Flush data queue that should be pushed to the native stack when the previous
|
| + // nativeWritevData completes.
|
| + private LinkedList<ByteBuffer> mFlushData;
|
| +
|
| + @GuardedBy("mNativeStreamLock")
|
| + // Whether an end-of-stream flag is passed in through write().
|
| + private boolean mEndOfStreamWritten;
|
| +
|
| /* Native BidirectionalStream object, owned by CronetBidirectionalStream. */
|
| @GuardedBy("mNativeStreamLock")
|
| private long mNativeStream;
|
| @@ -91,28 +106,24 @@ class CronetBidirectionalStream extends BidirectionalStream {
|
|
|
| /**
|
| * Write state is tracking writing flow.
|
| - * / <--- WRITING <--- \
|
| - * | |
|
| - * \ /
|
| + * / <--- WRITING <--- \
|
| + * | |
|
| + * \ /
|
| * NOT_STARTED -> STARTED --> WAITING_FOR_WRITE -> WRITING_DONE -> SUCCESS
|
| */
|
| @GuardedBy("mNativeStreamLock")
|
| private State mWriteState = State.NOT_STARTED;
|
|
|
| + // Only modified on the network thread.
|
| private UrlResponseInfo mResponseInfo;
|
|
|
| /*
|
| * OnReadCompleted callback is repeatedly invoked when each read is completed, so it
|
| * is cached as a member variable.
|
| */
|
| + // Only modified on the network thread.
|
| private OnReadCompletedRunnable mOnReadCompletedTask;
|
|
|
| - /*
|
| - * OnWriteCompleted callback is repeatedly invoked when each write is completed, so it
|
| - * is cached as a member variable.
|
| - */
|
| - private OnWriteCompletedRunnable mOnWriteCompletedTask;
|
| -
|
| private Runnable mOnDestroyedCallbackForTesting;
|
|
|
| private final class OnReadCompletedRunnable implements Runnable {
|
| @@ -127,20 +138,23 @@ class CronetBidirectionalStream extends BidirectionalStream {
|
| // Null out mByteBuffer, to pass buffer ownership to callback or release if done.
|
| ByteBuffer buffer = mByteBuffer;
|
| mByteBuffer = null;
|
| + boolean maybeOnSucceeded = false;
|
| synchronized (mNativeStreamLock) {
|
| if (isDoneLocked()) {
|
| return;
|
| }
|
| if (mEndOfStream) {
|
| mReadState = State.READING_DONE;
|
| - if (maybeSucceedLocked()) {
|
| - return;
|
| - }
|
| + maybeOnSucceeded = (mWriteState == State.WRITING_DONE);
|
| } else {
|
| mReadState = State.WAITING_FOR_READ;
|
| }
|
| }
|
| - mCallback.onReadCompleted(CronetBidirectionalStream.this, mResponseInfo, buffer);
|
| + mCallback.onReadCompleted(
|
| + CronetBidirectionalStream.this, mResponseInfo, buffer, mEndOfStream);
|
| + if (maybeOnSucceeded) {
|
| + maybeOnSucceededOnExecutor();
|
| + }
|
| } catch (Exception e) {
|
| onCallbackException(e);
|
| }
|
| @@ -149,9 +163,14 @@ class CronetBidirectionalStream extends BidirectionalStream {
|
|
|
| private final class OnWriteCompletedRunnable implements Runnable {
|
| // Buffer passed back from current invocation of onWriteCompleted.
|
| - ByteBuffer mByteBuffer;
|
| + private ByteBuffer mByteBuffer;
|
| // End of stream flag from current call to write.
|
| - boolean mEndOfStream;
|
| + private final boolean mEndOfStream;
|
| +
|
| + OnWriteCompletedRunnable(ByteBuffer buffer, boolean endOfStream) {
|
| + mByteBuffer = buffer;
|
| + mEndOfStream = endOfStream;
|
| + }
|
|
|
| @Override
|
| public void run() {
|
| @@ -159,20 +178,21 @@ class CronetBidirectionalStream extends BidirectionalStream {
|
| // Null out mByteBuffer, to pass buffer ownership to callback or release if done.
|
| ByteBuffer buffer = mByteBuffer;
|
| mByteBuffer = null;
|
| + boolean maybeOnSucceeded = false;
|
| synchronized (mNativeStreamLock) {
|
| if (isDoneLocked()) {
|
| return;
|
| }
|
| if (mEndOfStream) {
|
| mWriteState = State.WRITING_DONE;
|
| - if (maybeSucceedLocked()) {
|
| - return;
|
| - }
|
| - } else {
|
| - mWriteState = State.WAITING_FOR_WRITE;
|
| + maybeOnSucceeded = (mReadState == State.READING_DONE);
|
| }
|
| }
|
| - mCallback.onWriteCompleted(CronetBidirectionalStream.this, mResponseInfo, buffer);
|
| + mCallback.onWriteCompleted(
|
| + CronetBidirectionalStream.this, mResponseInfo, buffer, mEndOfStream);
|
| + if (maybeOnSucceeded) {
|
| + maybeOnSucceededOnExecutor();
|
| + }
|
| } catch (Exception e) {
|
| onCallbackException(e);
|
| }
|
| @@ -181,7 +201,8 @@ class CronetBidirectionalStream extends BidirectionalStream {
|
|
|
| CronetBidirectionalStream(CronetUrlRequestContext requestContext, String url,
|
| @BidirectionalStream.Builder.StreamPriority int priority, Callback callback,
|
| - Executor executor, String httpMethod, List<Map.Entry<String, String>> requestHeaders) {
|
| + Executor executor, String httpMethod, List<Map.Entry<String, String>> requestHeaders,
|
| + boolean disableAutoFlush) {
|
| mRequestContext = requestContext;
|
| mInitialUrl = url;
|
| mInitialPriority = convertStreamPriority(priority);
|
| @@ -189,6 +210,9 @@ class CronetBidirectionalStream extends BidirectionalStream {
|
| mExecutor = executor;
|
| mInitialMethod = httpMethod;
|
| mRequestHeaders = stringsFromHeaderList(requestHeaders);
|
| + mDisableAutoFlush = disableAutoFlush;
|
| + mPendingData = new LinkedList<>();
|
| + mFlushData = new LinkedList<>();
|
| }
|
|
|
| @Override
|
| @@ -199,7 +223,7 @@ class CronetBidirectionalStream extends BidirectionalStream {
|
| }
|
| try {
|
| mNativeStream = nativeCreateBidirectionalStream(
|
| - mRequestContext.getUrlRequestContextAdapter());
|
| + mRequestContext.getUrlRequestContextAdapter(), mDisableAutoFlush);
|
| mRequestContext.onRequestStarted();
|
| // Non-zero startResult means an argument error.
|
| int startResult = nativeStart(mNativeStream, mInitialUrl, mInitialPriority,
|
| @@ -253,28 +277,71 @@ class CronetBidirectionalStream extends BidirectionalStream {
|
| if (!buffer.hasRemaining() && !endOfStream) {
|
| throw new IllegalArgumentException("Empty buffer before end of stream.");
|
| }
|
| - if (mWriteState != State.WAITING_FOR_WRITE) {
|
| - throw new IllegalStateException("Unexpected write attempt.");
|
| + if (mEndOfStreamWritten) {
|
| + throw new IllegalArgumentException("Write after writing end of stream.");
|
| }
|
| if (isDoneLocked()) {
|
| return;
|
| }
|
| - if (mOnWriteCompletedTask == null) {
|
| - mOnWriteCompletedTask = new OnWriteCompletedRunnable();
|
| + mPendingData.add(buffer);
|
| + if (endOfStream) {
|
| + mEndOfStreamWritten = true;
|
| }
|
| - mOnWriteCompletedTask.mEndOfStream = endOfStream;
|
| - mWriteState = State.WRITING;
|
| - if (!nativeWriteData(
|
| - mNativeStream, buffer, buffer.position(), buffer.limit(), endOfStream)) {
|
| - // Still waiting on write. This is just to have consistent
|
| - // behavior with the other error cases.
|
| - mWriteState = State.WAITING_FOR_WRITE;
|
| - throw new IllegalArgumentException("Unable to call native write");
|
| + if (!mDisableAutoFlush) {
|
| + flushLocked();
|
| }
|
| }
|
| }
|
|
|
| @Override
|
| + public void flush() {
|
| + synchronized (mNativeStreamLock) {
|
| + flushLocked();
|
| + }
|
| + }
|
| +
|
| + @SuppressWarnings("GuardedByChecker")
|
| + private void flushLocked() {
|
| + if (isDoneLocked()) {
|
| + return;
|
| + }
|
| + if (mPendingData.isEmpty() && mFlushData.isEmpty()) {
|
| + // No-op if there is nothing to write.
|
| + return;
|
| + }
|
| +
|
| + // Move buffers from mPendingData to the flushing queue.
|
| + if (!mPendingData.isEmpty()) {
|
| + mFlushData.addAll(mPendingData);
|
| + mPendingData.clear();
|
| + }
|
| +
|
| + if (mWriteState == State.WRITING) {
|
| + // If there is a write already pending, wait until onWritevCompleted is
|
| + // called before pushing data to the native stack.
|
| + return;
|
| + }
|
| + int size = mFlushData.size();
|
| + ByteBuffer[] buffers = new ByteBuffer[size];
|
| + int[] positions = new int[size];
|
| + int[] limits = new int[size];
|
| + for (int i = 0; i < size; i++) {
|
| + ByteBuffer buffer = mFlushData.poll();
|
| + buffers[i] = buffer;
|
| + positions[i] = buffer.position();
|
| + limits[i] = buffer.limit();
|
| + }
|
| + assert mFlushData.isEmpty();
|
| + mWriteState = State.WRITING;
|
| + if (!nativeWritevData(mNativeStream, buffers, positions, limits, mEndOfStreamWritten)) {
|
| + // Still waiting on write. This is just to have consistent
|
| + // behavior with the other error cases.
|
| + mWriteState = State.WAITING_FOR_WRITE;
|
| + throw new IllegalArgumentException("Unable to call native writev.");
|
| + }
|
| + }
|
| +
|
| + @Override
|
| public void ping(PingCallback callback, Executor executor) {
|
| // TODO(mef): May be last thing to be implemented on Android.
|
| throw new UnsupportedOperationException("ping is not supported yet.");
|
| @@ -309,9 +376,32 @@ class CronetBidirectionalStream extends BidirectionalStream {
|
| return mReadState != State.NOT_STARTED && mNativeStream == 0;
|
| }
|
|
|
| + /*
|
| + * Runs an onSucceeded callback if both Read and Write sides are closed.
|
| + */
|
| + private void maybeOnSucceededOnExecutor() {
|
| + synchronized (mNativeStreamLock) {
|
| + if (isDoneLocked()) {
|
| + return;
|
| + }
|
| + if (!(mWriteState == State.WRITING_DONE && mReadState == State.READING_DONE)) {
|
| + return;
|
| + }
|
| + mReadState = mWriteState = State.SUCCESS;
|
| + // Destroy native stream first, so UrlRequestContext could be shut
|
| + // down from the listener.
|
| + destroyNativeStreamLocked(false);
|
| + }
|
| + try {
|
| + mCallback.onSucceeded(CronetBidirectionalStream.this, mResponseInfo);
|
| + } catch (Exception e) {
|
| + Log.e(CronetUrlRequestContext.LOG_TAG, "Exception in onSucceeded method", e);
|
| + }
|
| + }
|
| +
|
| @SuppressWarnings("unused")
|
| @CalledByNative
|
| - private void onRequestHeadersSent() {
|
| + private void onStreamReady() {
|
| postTaskToExecutor(new Runnable() {
|
| public void run() {
|
| synchronized (mNativeStreamLock) {
|
| @@ -320,13 +410,14 @@ class CronetBidirectionalStream extends BidirectionalStream {
|
| }
|
| if (doesMethodAllowWriteData(mInitialMethod)) {
|
| mWriteState = State.WAITING_FOR_WRITE;
|
| + mReadState = State.WAITING_FOR_READ;
|
| } else {
|
| mWriteState = State.WRITING_DONE;
|
| }
|
| }
|
|
|
| try {
|
| - mCallback.onRequestHeadersSent(CronetBidirectionalStream.this);
|
| + mCallback.onStreamReady(CronetBidirectionalStream.this);
|
| } catch (Exception e) {
|
| onCallbackException(e);
|
| }
|
| @@ -391,18 +482,28 @@ class CronetBidirectionalStream extends BidirectionalStream {
|
|
|
| @SuppressWarnings("unused")
|
| @CalledByNative
|
| - private void onWriteCompleted(
|
| - final ByteBuffer byteBuffer, int initialPosition, int initialLimit) {
|
| - if (byteBuffer.position() != initialPosition || byteBuffer.limit() != initialLimit) {
|
| - failWithException(
|
| - new CronetException("ByteBuffer modified externally during write", null));
|
| - return;
|
| + private void onWritevCompleted(final ByteBuffer[] byteBuffers, int[] initialPositions,
|
| + int[] initialLimits, boolean endOfStream) {
|
| + assert byteBuffers.length == initialPositions.length;
|
| + assert byteBuffers.length == initialLimits.length;
|
| + synchronized (mNativeStreamLock) {
|
| + mWriteState = State.WAITING_FOR_WRITE;
|
| + // Flush if there is anything in the flush queue mFlushData.
|
| + if (!mFlushData.isEmpty()) {
|
| + flushLocked();
|
| + }
|
| + }
|
| + for (int i = 0; i < byteBuffers.length; i++) {
|
| + ByteBuffer buffer = byteBuffers[i];
|
| + if (buffer.position() != initialPositions[i] || buffer.limit() != initialLimits[i]) {
|
| + failWithException(
|
| + new CronetException("ByteBuffer modified externally during write", null));
|
| + return;
|
| + }
|
| + // Current implementation always writes the complete buffer.
|
| + buffer.position(buffer.limit());
|
| + postTaskToExecutor(new OnWriteCompletedRunnable(buffer, endOfStream));
|
| }
|
| - // Current implementation always writes the complete buffer.
|
| - byteBuffer.position(byteBuffer.limit());
|
| - assert mOnWriteCompletedTask.mByteBuffer == null;
|
| - mOnWriteCompletedTask.mByteBuffer = byteBuffer;
|
| - postTaskToExecutor(mOnWriteCompletedTask);
|
| }
|
|
|
| @SuppressWarnings("unused")
|
| @@ -501,38 +602,6 @@ class CronetBidirectionalStream extends BidirectionalStream {
|
| }
|
|
|
| /**
|
| - * Checks whether reading and writing are done.
|
| - * @return false if either reading or writing is not done. If both reading and writing
|
| - * are done, then posts cleanup task and returns true.
|
| - */
|
| - @GuardedBy("mNativeStreamLock")
|
| - private boolean maybeSucceedLocked() {
|
| - if (mReadState != State.READING_DONE || mWriteState != State.WRITING_DONE) {
|
| - return false;
|
| - }
|
| -
|
| - mReadState = mWriteState = State.SUCCESS;
|
| - postTaskToExecutor(new Runnable() {
|
| - public void run() {
|
| - synchronized (mNativeStreamLock) {
|
| - if (isDoneLocked()) {
|
| - return;
|
| - }
|
| - // Destroy native stream first, so UrlRequestContext could be shut
|
| - // down from the listener.
|
| - destroyNativeStreamLocked(false);
|
| - }
|
| - try {
|
| - mCallback.onSucceeded(CronetBidirectionalStream.this, mResponseInfo);
|
| - } catch (Exception e) {
|
| - Log.e(CronetUrlRequestContext.LOG_TAG, "Exception in onSucceeded method", e);
|
| - }
|
| - }
|
| - });
|
| - return true;
|
| - }
|
| -
|
| - /**
|
| * Posts task to application Executor. Used for callbacks
|
| * and other tasks that should not be executed on network thread.
|
| */
|
| @@ -578,7 +647,7 @@ class CronetBidirectionalStream extends BidirectionalStream {
|
| * Fails the stream with an exception. Only called on the Executor.
|
| */
|
| private void failWithExceptionOnExecutor(CronetException e) {
|
| - // Do not call into listener if request is complete.
|
| + // Do not call into mCallback if request is complete.
|
| synchronized (mNativeStreamLock) {
|
| if (isDoneLocked()) {
|
| return;
|
| @@ -618,7 +687,8 @@ class CronetBidirectionalStream extends BidirectionalStream {
|
| }
|
|
|
| // Native methods are implemented in cronet_bidirectional_stream_adapter.cc.
|
| - private native long nativeCreateBidirectionalStream(long urlRequestContextAdapter);
|
| + private native long nativeCreateBidirectionalStream(
|
| + long urlRequestContextAdapter, boolean disableAutoFlush);
|
|
|
| @NativeClassQualifiedName("CronetBidirectionalStreamAdapter")
|
| private native int nativeStart(long nativePtr, String url, int priority, String method,
|
| @@ -629,8 +699,8 @@ class CronetBidirectionalStream extends BidirectionalStream {
|
| long nativePtr, ByteBuffer byteBuffer, int position, int limit);
|
|
|
| @NativeClassQualifiedName("CronetBidirectionalStreamAdapter")
|
| - private native boolean nativeWriteData(
|
| - long nativePtr, ByteBuffer byteBuffer, int position, int limit, boolean endOfStream);
|
| + private native boolean nativeWritevData(long nativePtr, ByteBuffer[] buffers, int[] positions,
|
| + int[] limits, boolean endOfStream);
|
|
|
| @NativeClassQualifiedName("CronetBidirectionalStreamAdapter")
|
| private native void nativeDestroy(long nativePtr, boolean sendOnCanceled);
|
|
|