Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(2787)

Unified Diff: components/cronet/android/java/src/org/chromium/net/CronetBidirectionalStream.java

Issue 1856073002: Coalesce small buffers in net::BidirectionalStream (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Fix javadoc Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: components/cronet/android/java/src/org/chromium/net/CronetBidirectionalStream.java
diff --git a/components/cronet/android/java/src/org/chromium/net/CronetBidirectionalStream.java b/components/cronet/android/java/src/org/chromium/net/CronetBidirectionalStream.java
index 4c0fae741dbf49d5534a516ada1703a9e03fb8b0..188e1766ff57082a5a8052d4e229cf31ecd957dc 100644
--- a/components/cronet/android/java/src/org/chromium/net/CronetBidirectionalStream.java
+++ b/components/cronet/android/java/src/org/chromium/net/CronetBidirectionalStream.java
@@ -14,6 +14,7 @@ import java.nio.ByteBuffer;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
@@ -54,9 +55,9 @@ class CronetBidirectionalStream extends BidirectionalStream {
ERROR,
/* Reading and writing are done, and the stream is closed successfully. */
SUCCESS,
- /* Waiting for {@code write()} to be called. */
+ /* Waiting for {@code nativeWritevData()} to be called. */
WAITING_FOR_WRITE,
- /* Writing to the remote, {@code onWriteCompleted()} callback will be called when done. */
+ /* Writing to the remote, {@code onWritevCompleted()} callback will be called when done. */
WRITING,
/* There is no more data to write and stream is half-closed by the local side. */
WRITING_DONE,
@@ -69,12 +70,26 @@ class CronetBidirectionalStream extends BidirectionalStream {
private final int mInitialPriority;
private final String mInitialMethod;
private final String mRequestHeaders[];
+ private final boolean mDisableAutoFlush;
/*
* Synchronizes access to mNativeStream, mReadState and mWriteState.
*/
private final Object mNativeStreamLock = new Object();
+ @GuardedBy("mNativeStreamLock")
+ // Pending write data.
+ private LinkedList<ByteBuffer> mPendingData;
+
+ @GuardedBy("mNativeStreamLock")
+ // Flush data queue that should be pushed to the native stack when the previous
+ // nativeWritevData completes.
+ private LinkedList<ByteBuffer> mFlushData;
+
+ @GuardedBy("mNativeStreamLock")
+ // Whether an end-of-stream flag is passed in through write().
+ private boolean mEndOfStreamWritten;
+
/* Native BidirectionalStream object, owned by CronetBidirectionalStream. */
@GuardedBy("mNativeStreamLock")
private long mNativeStream;
@@ -91,28 +106,24 @@ class CronetBidirectionalStream extends BidirectionalStream {
/**
* Write state is tracking writing flow.
- * / <--- WRITING <--- \
- * | |
- * \ /
+ * / <--- WRITING <--- \
+ * | |
+ * \ /
* NOT_STARTED -> STARTED --> WAITING_FOR_WRITE -> WRITING_DONE -> SUCCESS
*/
@GuardedBy("mNativeStreamLock")
private State mWriteState = State.NOT_STARTED;
+ // Only modified on the network thread.
private UrlResponseInfo mResponseInfo;
/*
* OnReadCompleted callback is repeatedly invoked when each read is completed, so it
* is cached as a member variable.
*/
+ // Only modified on the network thread.
private OnReadCompletedRunnable mOnReadCompletedTask;
- /*
- * OnWriteCompleted callback is repeatedly invoked when each write is completed, so it
- * is cached as a member variable.
- */
- private OnWriteCompletedRunnable mOnWriteCompletedTask;
-
private Runnable mOnDestroyedCallbackForTesting;
private final class OnReadCompletedRunnable implements Runnable {
@@ -127,20 +138,23 @@ class CronetBidirectionalStream extends BidirectionalStream {
// Null out mByteBuffer, to pass buffer ownership to callback or release if done.
ByteBuffer buffer = mByteBuffer;
mByteBuffer = null;
+ boolean maybeOnSucceeded = false;
synchronized (mNativeStreamLock) {
if (isDoneLocked()) {
return;
}
if (mEndOfStream) {
mReadState = State.READING_DONE;
- if (maybeSucceedLocked()) {
- return;
- }
+ maybeOnSucceeded = (mWriteState == State.WRITING_DONE);
} else {
mReadState = State.WAITING_FOR_READ;
}
}
- mCallback.onReadCompleted(CronetBidirectionalStream.this, mResponseInfo, buffer);
+ mCallback.onReadCompleted(
+ CronetBidirectionalStream.this, mResponseInfo, buffer, mEndOfStream);
+ if (maybeOnSucceeded) {
+ maybeOnSucceededOnExecutor();
+ }
} catch (Exception e) {
onCallbackException(e);
}
@@ -149,9 +163,14 @@ class CronetBidirectionalStream extends BidirectionalStream {
private final class OnWriteCompletedRunnable implements Runnable {
// Buffer passed back from current invocation of onWriteCompleted.
- ByteBuffer mByteBuffer;
+ private ByteBuffer mByteBuffer;
// End of stream flag from current call to write.
- boolean mEndOfStream;
+ private final boolean mEndOfStream;
+
+ OnWriteCompletedRunnable(ByteBuffer buffer, boolean endOfStream) {
+ mByteBuffer = buffer;
+ mEndOfStream = endOfStream;
+ }
@Override
public void run() {
@@ -159,20 +178,21 @@ class CronetBidirectionalStream extends BidirectionalStream {
// Null out mByteBuffer, to pass buffer ownership to callback or release if done.
ByteBuffer buffer = mByteBuffer;
mByteBuffer = null;
+ boolean maybeOnSucceeded = false;
synchronized (mNativeStreamLock) {
if (isDoneLocked()) {
return;
}
if (mEndOfStream) {
mWriteState = State.WRITING_DONE;
- if (maybeSucceedLocked()) {
- return;
- }
- } else {
- mWriteState = State.WAITING_FOR_WRITE;
+ maybeOnSucceeded = (mReadState == State.READING_DONE);
}
}
- mCallback.onWriteCompleted(CronetBidirectionalStream.this, mResponseInfo, buffer);
+ mCallback.onWriteCompleted(
+ CronetBidirectionalStream.this, mResponseInfo, buffer, mEndOfStream);
+ if (maybeOnSucceeded) {
+ maybeOnSucceededOnExecutor();
+ }
} catch (Exception e) {
onCallbackException(e);
}
@@ -181,7 +201,8 @@ class CronetBidirectionalStream extends BidirectionalStream {
CronetBidirectionalStream(CronetUrlRequestContext requestContext, String url,
@BidirectionalStream.Builder.StreamPriority int priority, Callback callback,
- Executor executor, String httpMethod, List<Map.Entry<String, String>> requestHeaders) {
+ Executor executor, String httpMethod, List<Map.Entry<String, String>> requestHeaders,
+ boolean disableAutoFlush) {
mRequestContext = requestContext;
mInitialUrl = url;
mInitialPriority = convertStreamPriority(priority);
@@ -189,6 +210,9 @@ class CronetBidirectionalStream extends BidirectionalStream {
mExecutor = executor;
mInitialMethod = httpMethod;
mRequestHeaders = stringsFromHeaderList(requestHeaders);
+ mDisableAutoFlush = disableAutoFlush;
+ mPendingData = new LinkedList<>();
+ mFlushData = new LinkedList<>();
}
@Override
@@ -199,7 +223,7 @@ class CronetBidirectionalStream extends BidirectionalStream {
}
try {
mNativeStream = nativeCreateBidirectionalStream(
- mRequestContext.getUrlRequestContextAdapter());
+ mRequestContext.getUrlRequestContextAdapter(), mDisableAutoFlush);
mRequestContext.onRequestStarted();
// Non-zero startResult means an argument error.
int startResult = nativeStart(mNativeStream, mInitialUrl, mInitialPriority,
@@ -253,28 +277,71 @@ class CronetBidirectionalStream extends BidirectionalStream {
if (!buffer.hasRemaining() && !endOfStream) {
throw new IllegalArgumentException("Empty buffer before end of stream.");
}
- if (mWriteState != State.WAITING_FOR_WRITE) {
- throw new IllegalStateException("Unexpected write attempt.");
+ if (mEndOfStreamWritten) {
+ throw new IllegalArgumentException("Write after writing end of stream.");
}
if (isDoneLocked()) {
return;
}
- if (mOnWriteCompletedTask == null) {
- mOnWriteCompletedTask = new OnWriteCompletedRunnable();
+ mPendingData.add(buffer);
+ if (endOfStream) {
+ mEndOfStreamWritten = true;
}
- mOnWriteCompletedTask.mEndOfStream = endOfStream;
- mWriteState = State.WRITING;
- if (!nativeWriteData(
- mNativeStream, buffer, buffer.position(), buffer.limit(), endOfStream)) {
- // Still waiting on write. This is just to have consistent
- // behavior with the other error cases.
- mWriteState = State.WAITING_FOR_WRITE;
- throw new IllegalArgumentException("Unable to call native write");
+ if (!mDisableAutoFlush) {
+ flushLocked();
}
}
}
@Override
+ public void flush() {
+ synchronized (mNativeStreamLock) {
+ flushLocked();
+ }
+ }
+
+ @SuppressWarnings("GuardedByChecker")
+ private void flushLocked() {
+ if (isDoneLocked()) {
+ return;
+ }
+ if (mPendingData.isEmpty() && mFlushData.isEmpty()) {
+ // No-op if there is nothing to write.
+ return;
+ }
+
+ // Move buffers from mPendingData to the flushing queue.
+ if (!mPendingData.isEmpty()) {
+ mFlushData.addAll(mPendingData);
+ mPendingData.clear();
+ }
+
+ if (mWriteState == State.WRITING) {
+ // If there is a write already pending, wait until onWritevCompleted is
+ // called before pushing data to the native stack.
+ return;
+ }
+ int size = mFlushData.size();
+ ByteBuffer[] buffers = new ByteBuffer[size];
+ int[] positions = new int[size];
+ int[] limits = new int[size];
+ for (int i = 0; i < size; i++) {
+ ByteBuffer buffer = mFlushData.poll();
+ buffers[i] = buffer;
+ positions[i] = buffer.position();
+ limits[i] = buffer.limit();
+ }
+ assert mFlushData.isEmpty();
+ mWriteState = State.WRITING;
+ if (!nativeWritevData(mNativeStream, buffers, positions, limits, mEndOfStreamWritten)) {
+ // Still waiting on write. This is just to have consistent
+ // behavior with the other error cases.
+ mWriteState = State.WAITING_FOR_WRITE;
+ throw new IllegalArgumentException("Unable to call native writev.");
+ }
+ }
+
+ @Override
public void ping(PingCallback callback, Executor executor) {
// TODO(mef): May be last thing to be implemented on Android.
throw new UnsupportedOperationException("ping is not supported yet.");
@@ -309,9 +376,32 @@ class CronetBidirectionalStream extends BidirectionalStream {
return mReadState != State.NOT_STARTED && mNativeStream == 0;
}
+ /*
+ * Runs an onSucceeded callback if both Read and Write sides are closed.
+ */
+ private void maybeOnSucceededOnExecutor() {
+ synchronized (mNativeStreamLock) {
+ if (isDoneLocked()) {
+ return;
+ }
+ if (!(mWriteState == State.WRITING_DONE && mReadState == State.READING_DONE)) {
+ return;
+ }
+ mReadState = mWriteState = State.SUCCESS;
+ // Destroy native stream first, so UrlRequestContext could be shut
+ // down from the listener.
+ destroyNativeStreamLocked(false);
+ }
+ try {
+ mCallback.onSucceeded(CronetBidirectionalStream.this, mResponseInfo);
+ } catch (Exception e) {
+ Log.e(CronetUrlRequestContext.LOG_TAG, "Exception in onSucceeded method", e);
+ }
+ }
+
@SuppressWarnings("unused")
@CalledByNative
- private void onRequestHeadersSent() {
+ private void onStreamReady() {
postTaskToExecutor(new Runnable() {
public void run() {
synchronized (mNativeStreamLock) {
@@ -320,13 +410,14 @@ class CronetBidirectionalStream extends BidirectionalStream {
}
if (doesMethodAllowWriteData(mInitialMethod)) {
mWriteState = State.WAITING_FOR_WRITE;
+ mReadState = State.WAITING_FOR_READ;
} else {
mWriteState = State.WRITING_DONE;
}
}
try {
- mCallback.onRequestHeadersSent(CronetBidirectionalStream.this);
+ mCallback.onStreamReady(CronetBidirectionalStream.this);
} catch (Exception e) {
onCallbackException(e);
}
@@ -391,18 +482,28 @@ class CronetBidirectionalStream extends BidirectionalStream {
@SuppressWarnings("unused")
@CalledByNative
- private void onWriteCompleted(
- final ByteBuffer byteBuffer, int initialPosition, int initialLimit) {
- if (byteBuffer.position() != initialPosition || byteBuffer.limit() != initialLimit) {
- failWithException(
- new CronetException("ByteBuffer modified externally during write", null));
- return;
+ private void onWritevCompleted(final ByteBuffer[] byteBuffers, int[] initialPositions,
+ int[] initialLimits, boolean endOfStream) {
+ assert byteBuffers.length == initialPositions.length;
+ assert byteBuffers.length == initialLimits.length;
+ synchronized (mNativeStreamLock) {
+ mWriteState = State.WAITING_FOR_WRITE;
+ // Flush if there is anything in the flush queue mFlushData.
+ if (!mFlushData.isEmpty()) {
+ flushLocked();
+ }
+ }
+ for (int i = 0; i < byteBuffers.length; i++) {
+ ByteBuffer buffer = byteBuffers[i];
+ if (buffer.position() != initialPositions[i] || buffer.limit() != initialLimits[i]) {
+ failWithException(
+ new CronetException("ByteBuffer modified externally during write", null));
+ return;
+ }
+ // Current implementation always writes the complete buffer.
+ buffer.position(buffer.limit());
+ postTaskToExecutor(new OnWriteCompletedRunnable(buffer, endOfStream));
}
- // Current implementation always writes the complete buffer.
- byteBuffer.position(byteBuffer.limit());
- assert mOnWriteCompletedTask.mByteBuffer == null;
- mOnWriteCompletedTask.mByteBuffer = byteBuffer;
- postTaskToExecutor(mOnWriteCompletedTask);
}
@SuppressWarnings("unused")
@@ -501,38 +602,6 @@ class CronetBidirectionalStream extends BidirectionalStream {
}
/**
- * Checks whether reading and writing are done.
- * @return false if either reading or writing is not done. If both reading and writing
- * are done, then posts cleanup task and returns true.
- */
- @GuardedBy("mNativeStreamLock")
- private boolean maybeSucceedLocked() {
- if (mReadState != State.READING_DONE || mWriteState != State.WRITING_DONE) {
- return false;
- }
-
- mReadState = mWriteState = State.SUCCESS;
- postTaskToExecutor(new Runnable() {
- public void run() {
- synchronized (mNativeStreamLock) {
- if (isDoneLocked()) {
- return;
- }
- // Destroy native stream first, so UrlRequestContext could be shut
- // down from the listener.
- destroyNativeStreamLocked(false);
- }
- try {
- mCallback.onSucceeded(CronetBidirectionalStream.this, mResponseInfo);
- } catch (Exception e) {
- Log.e(CronetUrlRequestContext.LOG_TAG, "Exception in onSucceeded method", e);
- }
- }
- });
- return true;
- }
-
- /**
* Posts task to application Executor. Used for callbacks
* and other tasks that should not be executed on network thread.
*/
@@ -578,7 +647,7 @@ class CronetBidirectionalStream extends BidirectionalStream {
* Fails the stream with an exception. Only called on the Executor.
*/
private void failWithExceptionOnExecutor(CronetException e) {
- // Do not call into listener if request is complete.
+ // Do not call into mCallback if request is complete.
synchronized (mNativeStreamLock) {
if (isDoneLocked()) {
return;
@@ -618,7 +687,8 @@ class CronetBidirectionalStream extends BidirectionalStream {
}
// Native methods are implemented in cronet_bidirectional_stream_adapter.cc.
- private native long nativeCreateBidirectionalStream(long urlRequestContextAdapter);
+ private native long nativeCreateBidirectionalStream(
+ long urlRequestContextAdapter, boolean disableAutoFlush);
@NativeClassQualifiedName("CronetBidirectionalStreamAdapter")
private native int nativeStart(long nativePtr, String url, int priority, String method,
@@ -629,8 +699,8 @@ class CronetBidirectionalStream extends BidirectionalStream {
long nativePtr, ByteBuffer byteBuffer, int position, int limit);
@NativeClassQualifiedName("CronetBidirectionalStreamAdapter")
- private native boolean nativeWriteData(
- long nativePtr, ByteBuffer byteBuffer, int position, int limit, boolean endOfStream);
+ private native boolean nativeWritevData(long nativePtr, ByteBuffer[] buffers, int[] positions,
+ int[] limits, boolean endOfStream);
@NativeClassQualifiedName("CronetBidirectionalStreamAdapter")
private native void nativeDestroy(long nativePtr, boolean sendOnCanceled);

Powered by Google App Engine
This is Rietveld 408576698