Chromium Code Reviews| Index: components/cronet/android/java/src/org/chromium/net/CronetBidirectionalStream.java |
| diff --git a/components/cronet/android/java/src/org/chromium/net/CronetBidirectionalStream.java b/components/cronet/android/java/src/org/chromium/net/CronetBidirectionalStream.java |
| index 4c0fae741dbf49d5534a516ada1703a9e03fb8b0..a779e85bcbeafd7fb62df84a6498c4e2a56d1e23 100644 |
| --- a/components/cronet/android/java/src/org/chromium/net/CronetBidirectionalStream.java |
| +++ b/components/cronet/android/java/src/org/chromium/net/CronetBidirectionalStream.java |
| @@ -14,6 +14,7 @@ import java.nio.ByteBuffer; |
| import java.util.AbstractMap; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| +import java.util.LinkedList; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.concurrent.Executor; |
| @@ -54,9 +55,9 @@ class CronetBidirectionalStream extends BidirectionalStream { |
| ERROR, |
| /* Reading and writing are done, and the stream is closed successfully. */ |
| SUCCESS, |
| - /* Waiting for {@code write()} to be called. */ |
| + /* Waiting for {@code nativeWritevData()} to be called. */ |
| WAITING_FOR_WRITE, |
| - /* Writing to the remote, {@code onWriteCompleted()} callback will be called when done. */ |
| + /* Writing to the remote, {@code onWritevCompleted()} callback will be called when done. */ |
| WRITING, |
| /* There is no more data to write and stream is half-closed by the local side. */ |
| WRITING_DONE, |
| @@ -69,12 +70,26 @@ class CronetBidirectionalStream extends BidirectionalStream { |
| private final int mInitialPriority; |
| private final String mInitialMethod; |
| private final String mRequestHeaders[]; |
| + private final boolean mDisableAutoFlush; |
| /* |
| * Synchronizes access to mNativeStream, mReadState and mWriteState. |
| */ |
| private final Object mNativeStreamLock = new Object(); |
| + @GuardedBy("mNativeStreamLock") |
| + // Pending write data. |
| + private LinkedList<ByteBuffer> mPendingData; |
| + |
| + @GuardedBy("mNativeStreamLock") |
| + // Flush data queue that should be pushed to the native stack when the previous |
| + // nativeWritevData completes. |
| + private LinkedList<ByteBuffer> mFlushData; |
| + |
| + @GuardedBy("mNativeStreamLock") |
| + // Whether an end-of-stream flag is passed in through write(). |
| + private boolean mEndOfStreamWritten; |
| + |
| /* Native BidirectionalStream object, owned by CronetBidirectionalStream. */ |
| @GuardedBy("mNativeStreamLock") |
| private long mNativeStream; |
| @@ -91,28 +106,24 @@ class CronetBidirectionalStream extends BidirectionalStream { |
| /** |
| * Write state is tracking writing flow. |
| - * / <--- WRITING <--- \ |
| - * | | |
| - * \ / |
| + * / <--- WRITING <--- \ |
| + * | | |
| + * \ / |
| * NOT_STARTED -> STARTED --> WAITING_FOR_WRITE -> WRITING_DONE -> SUCCESS |
| */ |
| @GuardedBy("mNativeStreamLock") |
| private State mWriteState = State.NOT_STARTED; |
| + // Only modified on the network thread. |
| private UrlResponseInfo mResponseInfo; |
| /* |
| * OnReadCompleted callback is repeatedly invoked when each read is completed, so it |
| * is cached as a member variable. |
| */ |
| + // Only modified on the network thread. |
| private OnReadCompletedRunnable mOnReadCompletedTask; |
| - /* |
| - * OnWriteCompleted callback is repeatedly invoked when each write is completed, so it |
| - * is cached as a member variable. |
| - */ |
| - private OnWriteCompletedRunnable mOnWriteCompletedTask; |
| - |
| private Runnable mOnDestroyedCallbackForTesting; |
| private final class OnReadCompletedRunnable implements Runnable { |
| @@ -127,20 +138,23 @@ class CronetBidirectionalStream extends BidirectionalStream { |
| // Null out mByteBuffer, to pass buffer ownership to callback or release if done. |
| ByteBuffer buffer = mByteBuffer; |
| mByteBuffer = null; |
| + boolean maybeOnSucceeded = false; |
| synchronized (mNativeStreamLock) { |
| if (isDoneLocked()) { |
| return; |
| } |
| if (mEndOfStream) { |
| mReadState = State.READING_DONE; |
| - if (maybeSucceedLocked()) { |
| - return; |
| - } |
| + maybeOnSucceeded = (mWriteState == State.WRITING_DONE); |
| } else { |
| mReadState = State.WAITING_FOR_READ; |
| } |
| } |
| - mCallback.onReadCompleted(CronetBidirectionalStream.this, mResponseInfo, buffer); |
| + mCallback.onReadCompleted( |
| + CronetBidirectionalStream.this, mResponseInfo, buffer, mEndOfStream); |
| + if (maybeOnSucceeded) { |
| + maybeOnSucceededOnExecutor(); |
| + } |
| } catch (Exception e) { |
| onCallbackException(e); |
| } |
| @@ -149,9 +163,14 @@ class CronetBidirectionalStream extends BidirectionalStream { |
| private final class OnWriteCompletedRunnable implements Runnable { |
| // Buffer passed back from current invocation of onWriteCompleted. |
| - ByteBuffer mByteBuffer; |
| + private ByteBuffer mByteBuffer; |
| // End of stream flag from current call to write. |
| - boolean mEndOfStream; |
| + private final boolean mEndOfStream; |
| + |
| + public OnWriteCompletedRunnable(ByteBuffer buffer, boolean endOfStream) { |
|
kapishnikov
2016/04/20 22:05:42
Don't need to be public.
xunjieli
2016/04/21 14:56:24
Done.
|
| + mByteBuffer = buffer; |
| + mEndOfStream = endOfStream; |
| + } |
| @Override |
| public void run() { |
| @@ -159,20 +178,21 @@ class CronetBidirectionalStream extends BidirectionalStream { |
| // Null out mByteBuffer, to pass buffer ownership to callback or release if done. |
| ByteBuffer buffer = mByteBuffer; |
| mByteBuffer = null; |
| + boolean maybeOnSucceeded = false; |
| synchronized (mNativeStreamLock) { |
| if (isDoneLocked()) { |
| return; |
| } |
| if (mEndOfStream) { |
| mWriteState = State.WRITING_DONE; |
| - if (maybeSucceedLocked()) { |
| - return; |
| - } |
| - } else { |
| - mWriteState = State.WAITING_FOR_WRITE; |
| + maybeOnSucceeded = (mReadState == State.READING_DONE); |
| } |
| } |
| - mCallback.onWriteCompleted(CronetBidirectionalStream.this, mResponseInfo, buffer); |
| + mCallback.onWriteCompleted( |
| + CronetBidirectionalStream.this, mResponseInfo, buffer, mEndOfStream); |
| + if (maybeOnSucceeded) { |
| + maybeOnSucceededOnExecutor(); |
| + } |
| } catch (Exception e) { |
| onCallbackException(e); |
| } |
| @@ -181,7 +201,8 @@ class CronetBidirectionalStream extends BidirectionalStream { |
| CronetBidirectionalStream(CronetUrlRequestContext requestContext, String url, |
| @BidirectionalStream.Builder.StreamPriority int priority, Callback callback, |
| - Executor executor, String httpMethod, List<Map.Entry<String, String>> requestHeaders) { |
| + Executor executor, String httpMethod, List<Map.Entry<String, String>> requestHeaders, |
| + boolean disableAutoFlush) { |
| mRequestContext = requestContext; |
| mInitialUrl = url; |
| mInitialPriority = convertStreamPriority(priority); |
| @@ -189,6 +210,9 @@ class CronetBidirectionalStream extends BidirectionalStream { |
| mExecutor = executor; |
| mInitialMethod = httpMethod; |
| mRequestHeaders = stringsFromHeaderList(requestHeaders); |
| + mDisableAutoFlush = disableAutoFlush; |
| + mPendingData = new LinkedList<ByteBuffer>(); |
|
kapishnikov
2016/04/20 22:05:42
Can be replaced with
mPendingData = new LinkedList
xunjieli
2016/04/21 14:56:24
Done.
|
| + mFlushData = new LinkedList<ByteBuffer>(); |
| } |
| @Override |
| @@ -199,7 +223,7 @@ class CronetBidirectionalStream extends BidirectionalStream { |
| } |
| try { |
| mNativeStream = nativeCreateBidirectionalStream( |
| - mRequestContext.getUrlRequestContextAdapter()); |
| + mRequestContext.getUrlRequestContextAdapter(), mDisableAutoFlush); |
| mRequestContext.onRequestStarted(); |
| // Non-zero startResult means an argument error. |
| int startResult = nativeStart(mNativeStream, mInitialUrl, mInitialPriority, |
| @@ -253,28 +277,68 @@ class CronetBidirectionalStream extends BidirectionalStream { |
| if (!buffer.hasRemaining() && !endOfStream) { |
| throw new IllegalArgumentException("Empty buffer before end of stream."); |
| } |
| - if (mWriteState != State.WAITING_FOR_WRITE) { |
| - throw new IllegalStateException("Unexpected write attempt."); |
| + if (mEndOfStreamWritten) { |
| + throw new IllegalArgumentException("Write after writing end of stream."); |
| } |
| if (isDoneLocked()) { |
| return; |
| } |
| - if (mOnWriteCompletedTask == null) { |
| - mOnWriteCompletedTask = new OnWriteCompletedRunnable(); |
| + mPendingData.add(buffer); |
| + if (endOfStream) { |
| + mEndOfStreamWritten = true; |
| } |
| - mOnWriteCompletedTask.mEndOfStream = endOfStream; |
| - mWriteState = State.WRITING; |
| - if (!nativeWriteData( |
| - mNativeStream, buffer, buffer.position(), buffer.limit(), endOfStream)) { |
| - // Still waiting on write. This is just to have consistent |
| - // behavior with the other error cases. |
| - mWriteState = State.WAITING_FOR_WRITE; |
| - throw new IllegalArgumentException("Unable to call native write"); |
| + if (!mDisableAutoFlush) { |
| + flushLocked(); |
| } |
| } |
| } |
| @Override |
| + public void flush() { |
| + synchronized (mNativeStreamLock) { |
| + flushLocked(); |
| + } |
| + } |
| + |
| + @SuppressWarnings("GuardedByChecker") |
| + private void flushLocked() { |
|
kapishnikov
2016/04/20 22:05:42
Should we call isDoneLocked() here to make sure th
xunjieli
2016/04/21 14:56:24
Done. Good catch! Not checking the adapter is defi
|
| + if (mPendingData.isEmpty() && mFlushData.isEmpty()) { |
| + // No-op if there is nothing to write. |
| + return; |
| + } |
| + |
| + // Move buffers from mPendingData to the flushing queue. |
| + if (!mPendingData.isEmpty()) { |
| + mFlushData.addAll(mPendingData); |
| + mPendingData.clear(); |
| + } |
| + |
| + if (mWriteState == State.WRITING) { |
| + // If there is a write already pending, wait until onWritevCompleted is |
| + // called before pushing data to the native stack. |
| + return; |
| + } |
| + int size = mFlushData.size(); |
| + ByteBuffer[] buffers = new ByteBuffer[size]; |
| + int[] positions = new int[size]; |
| + int[] limits = new int[size]; |
| + for (int i = 0; i < size; i++) { |
| + ByteBuffer buffer = mFlushData.poll(); |
| + buffers[i] = buffer; |
| + positions[i] = buffer.position(); |
| + limits[i] = buffer.limit(); |
| + } |
| + assert mFlushData.isEmpty(); |
| + mWriteState = State.WRITING; |
| + if (!nativeWritevData(mNativeStream, buffers, positions, limits, mEndOfStreamWritten)) { |
| + // Still waiting on write. This is just to have consistent |
| + // behavior with the other error cases. |
| + mWriteState = State.WAITING_FOR_WRITE; |
|
kapishnikov
2016/04/20 22:05:42
Just to double check. Is it correct to change the
xunjieli
2016/04/21 14:56:24
I don't fully understand the comment. I have retai
kapishnikov
2016/04/21 15:24:04
I see. We can address it in another CL. Just two t
xunjieli
2016/04/21 16:10:27
This is a really good point! Although the exceptio
kapishnikov
2016/04/21 17:06:11
Since IllegalArgumentException is unchecked (Runti
xunjieli
2016/04/21 17:16:46
Do we want the client to recover from the error? I
kapishnikov
2016/04/21 19:41:25
Talked to Helen offline. We should revisit the mWr
xunjieli
2016/04/21 19:47:52
Great. Thanks, Andrei. I will file a crbug to addr
|
| + throw new IllegalArgumentException("Unable to call native writev."); |
| + } |
| + } |
| + |
| + @Override |
| public void ping(PingCallback callback, Executor executor) { |
| // TODO(mef): May be last thing to be implemented on Android. |
| throw new UnsupportedOperationException("ping is not supported yet."); |
| @@ -309,9 +373,32 @@ class CronetBidirectionalStream extends BidirectionalStream { |
| return mReadState != State.NOT_STARTED && mNativeStream == 0; |
| } |
| + /* |
| + * Runs an onSucceeded callback if both Read and Write sides are closed. |
| + */ |
| + private void maybeOnSucceededOnExecutor() { |
| + synchronized (mNativeStreamLock) { |
| + if (isDoneLocked()) { |
| + return; |
| + } |
| + if (!(mWriteState == State.WRITING_DONE && mReadState == State.READING_DONE)) { |
| + return; |
| + } |
| + mReadState = mWriteState = State.SUCCESS; |
| + // Destroy native stream first, so UrlRequestContext could be shut |
| + // down from the listener. |
| + destroyNativeStreamLocked(false); |
| + } |
| + try { |
| + mCallback.onSucceeded(CronetBidirectionalStream.this, mResponseInfo); |
| + } catch (Exception e) { |
| + Log.e(CronetUrlRequestContext.LOG_TAG, "Exception in onSucceeded method", e); |
| + } |
| + } |
| + |
| @SuppressWarnings("unused") |
| @CalledByNative |
| - private void onRequestHeadersSent() { |
| + private void onStreamReady() { |
|
kapishnikov
2016/04/20 22:05:42
Should onStreamReady() set mReadState to WAITING_F
xunjieli
2016/04/21 14:56:25
Done. Good point! I added a test as well.
|
| postTaskToExecutor(new Runnable() { |
| public void run() { |
| synchronized (mNativeStreamLock) { |
| @@ -326,7 +413,7 @@ class CronetBidirectionalStream extends BidirectionalStream { |
| } |
| try { |
| - mCallback.onRequestHeadersSent(CronetBidirectionalStream.this); |
| + mCallback.onStreamReady(CronetBidirectionalStream.this); |
| } catch (Exception e) { |
| onCallbackException(e); |
| } |
| @@ -391,18 +478,29 @@ class CronetBidirectionalStream extends BidirectionalStream { |
| @SuppressWarnings("unused") |
| @CalledByNative |
| - private void onWriteCompleted( |
| - final ByteBuffer byteBuffer, int initialPosition, int initialLimit) { |
| - if (byteBuffer.position() != initialPosition || byteBuffer.limit() != initialLimit) { |
| - failWithException( |
| - new CronetException("ByteBuffer modified externally during write", null)); |
| - return; |
| + private void onWritevCompleted(final ByteBuffer[] byteBuffers, int[] initialPositions, |
| + int[] initialLimits, boolean endOfStream) { |
| + assert byteBuffers.length == initialPositions.length; |
| + assert byteBuffers.length == initialLimits.length; |
| + assert initialPositions.length == initialLimits.length; |
|
kapishnikov
2016/04/20 22:05:42
This assert is not required because if both variab
xunjieli
2016/04/21 14:56:24
Done.
|
| + synchronized (mNativeStreamLock) { |
| + mWriteState = State.WAITING_FOR_WRITE; |
| + // Flush if there is anything in the flush queue mFlushData. |
| + if (!mFlushData.isEmpty()) { |
| + flushLocked(); |
| + } |
| + } |
| + for (int i = 0; i < byteBuffers.length; i++) { |
| + ByteBuffer buffer = byteBuffers[i]; |
| + if (buffer.position() != initialPositions[i] || buffer.limit() != initialLimits[i]) { |
| + failWithException( |
| + new CronetException("ByteBuffer modified externally during write", null)); |
| + return; |
| + } |
| + // Current implementation always writes the complete buffer. |
| + buffer.position(buffer.limit()); |
| + postTaskToExecutor(new OnWriteCompletedRunnable(buffer, endOfStream)); |
| } |
| - // Current implementation always writes the complete buffer. |
| - byteBuffer.position(byteBuffer.limit()); |
| - assert mOnWriteCompletedTask.mByteBuffer == null; |
| - mOnWriteCompletedTask.mByteBuffer = byteBuffer; |
| - postTaskToExecutor(mOnWriteCompletedTask); |
| } |
| @SuppressWarnings("unused") |
| @@ -501,38 +599,6 @@ class CronetBidirectionalStream extends BidirectionalStream { |
| } |
| /** |
| - * Checks whether reading and writing are done. |
| - * @return false if either reading or writing is not done. If both reading and writing |
| - * are done, then posts cleanup task and returns true. |
| - */ |
| - @GuardedBy("mNativeStreamLock") |
| - private boolean maybeSucceedLocked() { |
| - if (mReadState != State.READING_DONE || mWriteState != State.WRITING_DONE) { |
| - return false; |
| - } |
| - |
| - mReadState = mWriteState = State.SUCCESS; |
| - postTaskToExecutor(new Runnable() { |
| - public void run() { |
| - synchronized (mNativeStreamLock) { |
| - if (isDoneLocked()) { |
| - return; |
| - } |
| - // Destroy native stream first, so UrlRequestContext could be shut |
| - // down from the listener. |
| - destroyNativeStreamLocked(false); |
| - } |
| - try { |
| - mCallback.onSucceeded(CronetBidirectionalStream.this, mResponseInfo); |
| - } catch (Exception e) { |
| - Log.e(CronetUrlRequestContext.LOG_TAG, "Exception in onSucceeded method", e); |
| - } |
| - } |
| - }); |
| - return true; |
| - } |
| - |
| - /** |
| * Posts task to application Executor. Used for callbacks |
| * and other tasks that should not be executed on network thread. |
| */ |
| @@ -578,7 +644,7 @@ class CronetBidirectionalStream extends BidirectionalStream { |
| * Fails the stream with an exception. Only called on the Executor. |
| */ |
| private void failWithExceptionOnExecutor(CronetException e) { |
| - // Do not call into listener if request is complete. |
| + // Do not call into mCallback if request is complete. |
| synchronized (mNativeStreamLock) { |
| if (isDoneLocked()) { |
| return; |
| @@ -618,7 +684,8 @@ class CronetBidirectionalStream extends BidirectionalStream { |
| } |
| // Native methods are implemented in cronet_bidirectional_stream_adapter.cc. |
| - private native long nativeCreateBidirectionalStream(long urlRequestContextAdapter); |
| + private native long nativeCreateBidirectionalStream( |
| + long urlRequestContextAdapter, boolean disableAutoFlush); |
| @NativeClassQualifiedName("CronetBidirectionalStreamAdapter") |
| private native int nativeStart(long nativePtr, String url, int priority, String method, |
| @@ -629,8 +696,8 @@ class CronetBidirectionalStream extends BidirectionalStream { |
| long nativePtr, ByteBuffer byteBuffer, int position, int limit); |
| @NativeClassQualifiedName("CronetBidirectionalStreamAdapter") |
| - private native boolean nativeWriteData( |
| - long nativePtr, ByteBuffer byteBuffer, int position, int limit, boolean endOfStream); |
| + private native boolean nativeWritevData(long nativePtr, ByteBuffer[] buffers, int[] positions, |
| + int[] limits, boolean endOfStream); |
| @NativeClassQualifiedName("CronetBidirectionalStreamAdapter") |
| private native void nativeDestroy(long nativePtr, boolean sendOnCanceled); |