Chromium Code Reviews| Index: components/cronet/android/java/src/org/chromium/net/CronetBidirectionalStream.java |
| diff --git a/components/cronet/android/java/src/org/chromium/net/CronetBidirectionalStream.java b/components/cronet/android/java/src/org/chromium/net/CronetBidirectionalStream.java |
| index 5e943043e1d490592c137c0a7cccb463a0ff1b4c..64c3942b9836da37cdf76325e8680ebe1239fe9d 100644 |
| --- a/components/cronet/android/java/src/org/chromium/net/CronetBidirectionalStream.java |
| +++ b/components/cronet/android/java/src/org/chromium/net/CronetBidirectionalStream.java |
| @@ -84,12 +84,12 @@ class CronetBidirectionalStream extends BidirectionalStream { |
| @GuardedBy("mNativeStreamLock") |
| // Pending write data. |
| - private LinkedList<ByteBuffer> mPendingData; |
| + private LinkedList<ByteBuffer> mPendingQueue; |
| @GuardedBy("mNativeStreamLock") |
| // Flush data queue that should be pushed to the native stack when the previous |
| // nativeWritevData completes. |
| - private LinkedList<ByteBuffer> mFlushData; |
| + private LinkedList<ByteBuffer> mFlushQueue; |
| @GuardedBy("mNativeStreamLock") |
| // Whether an end-of-stream flag is passed in through write(). |
| @@ -221,8 +221,8 @@ class CronetBidirectionalStream extends BidirectionalStream { |
| mRequestHeaders = stringsFromHeaderList(requestHeaders); |
| mDisableAutoFlush = disableAutoFlush; |
| mDelayRequestHeadersUntilFirstFlush = delayRequestHeadersUntilNextFlush; |
| - mPendingData = new LinkedList<>(); |
| - mFlushData = new LinkedList<>(); |
| + mPendingQueue = new LinkedList<>(); |
| + mFlushQueue = new LinkedList<>(); |
| } |
| @Override |
| @@ -294,7 +294,7 @@ class CronetBidirectionalStream extends BidirectionalStream { |
| if (isDoneLocked()) { |
| return; |
| } |
| - mPendingData.add(buffer); |
| + mPendingQueue.add(buffer); |
| if (endOfStream) { |
| mEndOfStreamWritten = true; |
| } |
| @@ -317,7 +317,7 @@ class CronetBidirectionalStream extends BidirectionalStream { |
| || (mWriteState != State.WAITING_FOR_FLUSH && mWriteState != State.WRITING)) { |
| return; |
| } |
| - if (mPendingData.isEmpty() && mFlushData.isEmpty()) { |
| + if (mPendingQueue.isEmpty() && mFlushQueue.isEmpty()) { |
| // If there is no pending write when flush() is called, see if |
| // request headers need to be flushed. |
| if (!mRequestHeadersSent) { |
| @@ -330,12 +330,12 @@ class CronetBidirectionalStream extends BidirectionalStream { |
| return; |
| } |
| - assert !mPendingData.isEmpty() || !mFlushData.isEmpty(); |
| + assert !mPendingQueue.isEmpty() || !mFlushQueue.isEmpty(); |
| - // Move buffers from mPendingData to the flushing queue. |
| - if (!mPendingData.isEmpty()) { |
| - mFlushData.addAll(mPendingData); |
| - mPendingData.clear(); |
| + // Move buffers from mPendingQueue to the flushing queue. |
| + if (!mPendingQueue.isEmpty()) { |
| + mFlushQueue.addAll(mPendingQueue); |
| + mPendingQueue.clear(); |
| } |
| if (mWriteState == State.WRITING) { |
| @@ -343,21 +343,30 @@ class CronetBidirectionalStream extends BidirectionalStream { |
| // called before pushing data to the native stack. |
| return; |
| } |
| + sendFlushQueueLocked(); |
| + } |
| + |
| + // Helper method to send buffers in mFlushQueue. Caller needs to acquire |
| + // mNativeStreamLock and make sure mWriteState is WAITING_FOR_FLUSH and |
| + // mFlushQueue queue isn't empty. |
| + @SuppressWarnings("GuardedByChecker") |
| + private void sendFlushQueueLocked() { |
| assert mWriteState == State.WAITING_FOR_FLUSH; |
| - int size = mFlushData.size(); |
| + int size = mFlushQueue.size(); |
| ByteBuffer[] buffers = new ByteBuffer[size]; |
| int[] positions = new int[size]; |
| int[] limits = new int[size]; |
| for (int i = 0; i < size; i++) { |
| - ByteBuffer buffer = mFlushData.poll(); |
| + ByteBuffer buffer = mFlushQueue.poll(); |
| buffers[i] = buffer; |
| positions[i] = buffer.position(); |
| limits[i] = buffer.limit(); |
| } |
| - assert mFlushData.isEmpty(); |
| + assert mFlushQueue.isEmpty(); |
| assert buffers.length >= 1; |
| mWriteState = State.WRITING; |
| - if (!nativeWritevData(mNativeStream, buffers, positions, limits, mEndOfStreamWritten)) { |
| + if (!nativeWritevData(mNativeStream, buffers, positions, limits, |
| + mEndOfStreamWritten && mPendingQueue.isEmpty())) { |
| // Still waiting on flush. This is just to have consistent |
| // behavior with the other error cases. |
| mWriteState = State.WAITING_FOR_FLUSH; |
| @@ -365,6 +374,20 @@ class CronetBidirectionalStream extends BidirectionalStream { |
| } |
| } |
| + @VisibleForTesting |
| + public int getPendingQueueSizeForTesting() { |
| + synchronized (mNativeStreamLock) { |
| + return mPendingQueue.size(); |
| + } |
| + } |
| + |
| + @VisibleForTesting |
| + public int getFlushQueueSizeForTesting() { |
| + synchronized (mNativeStreamLock) { |
| + return mFlushQueue.size(); |
| + } |
| + } |
| + |
| @Override |
| public void ping(PingCallback callback, Executor executor) { |
| // TODO(mef): May be last thing to be implemented on Android. |
| @@ -513,9 +536,9 @@ class CronetBidirectionalStream extends BidirectionalStream { |
| assert byteBuffers.length == initialLimits.length; |
| synchronized (mNativeStreamLock) { |
| mWriteState = State.WAITING_FOR_FLUSH; |
| - // Flush if there is anything in the flush queue mFlushData. |
| - if (!mFlushData.isEmpty()) { |
| - flushLocked(); |
| + // Flush if there is anything in the flush queue mFlushQueue. |
| + if (!mFlushQueue.isEmpty()) { |
| + sendFlushQueueLocked(); |
| } |
| } |
| for (int i = 0; i < byteBuffers.length; i++) { |
| @@ -527,7 +550,9 @@ class CronetBidirectionalStream extends BidirectionalStream { |
| } |
| // Current implementation always writes the complete buffer. |
| buffer.position(buffer.limit()); |
| - postTaskToExecutor(new OnWriteCompletedRunnable(buffer, endOfStream)); |
| + postTaskToExecutor(new OnWriteCompletedRunnable(buffer, |
| + // Only set endOfStream flag if this buffer is the last in byteBuffers |
|
mef
2016/06/24 19:36:30
nit: End comment with period.
xunjieli
2016/06/24 20:48:22
Done.
|
| + endOfStream && i == byteBuffers.length - 1)); |
| } |
| } |