Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(316)

Unified Diff: components/cronet/android/java/src/org/chromium/net/CronetBidirectionalStream.java

Issue 1856073002: Coalesce small buffers in net::BidirectionalStream (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Fix Javadoc Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: components/cronet/android/java/src/org/chromium/net/CronetBidirectionalStream.java
diff --git a/components/cronet/android/java/src/org/chromium/net/CronetBidirectionalStream.java b/components/cronet/android/java/src/org/chromium/net/CronetBidirectionalStream.java
index 4c0fae741dbf49d5534a516ada1703a9e03fb8b0..f5a9ee46b1440971dd56ee1543e15a8fe7498dd6 100644
--- a/components/cronet/android/java/src/org/chromium/net/CronetBidirectionalStream.java
+++ b/components/cronet/android/java/src/org/chromium/net/CronetBidirectionalStream.java
@@ -14,6 +14,7 @@ import java.nio.ByteBuffer;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
@@ -69,12 +70,21 @@ class CronetBidirectionalStream extends BidirectionalStream {
private final int mInitialPriority;
private final String mInitialMethod;
private final String mRequestHeaders[];
+ private final boolean mDisableAutoFlush;
/*
* Synchronizes access to mNativeStream, mReadState and mWriteState.
*/
private final Object mNativeStreamLock = new Object();
+ @GuardedBy("mNativeStreamLock")
+ // Only relevant if mDisableAutoFlush is true.
+ private final LinkedList<ByteBuffer> mPendingData;
+
+ @GuardedBy("mNativeStreamLock")
+ // Only relevant if mDisableAutoFlush is true.
kapishnikov 2016/04/11 23:09:41 Would it be worth to unify the implementation of a
xunjieli 2016/04/12 18:01:48 Done. Although partially. I think we probably shou
+ private boolean mEndOfStreamWritten;
+
/* Native BidirectionalStream object, owned by CronetBidirectionalStream. */
@GuardedBy("mNativeStreamLock")
private long mNativeStream;
@@ -181,7 +191,8 @@ class CronetBidirectionalStream extends BidirectionalStream {
CronetBidirectionalStream(CronetUrlRequestContext requestContext, String url,
@BidirectionalStream.Builder.StreamPriority int priority, Callback callback,
- Executor executor, String httpMethod, List<Map.Entry<String, String>> requestHeaders) {
+ Executor executor, String httpMethod, List<Map.Entry<String, String>> requestHeaders,
+ boolean disableAutoFlush) {
mRequestContext = requestContext;
mInitialUrl = url;
mInitialPriority = convertStreamPriority(priority);
@@ -189,6 +200,12 @@ class CronetBidirectionalStream extends BidirectionalStream {
mExecutor = executor;
mInitialMethod = httpMethod;
mRequestHeaders = stringsFromHeaderList(requestHeaders);
+ mDisableAutoFlush = disableAutoFlush;
+ if (mDisableAutoFlush) {
+ mPendingData = new LinkedList<ByteBuffer>();
+ } else {
+ mPendingData = null;
+ }
}
@Override
@@ -199,7 +216,7 @@ class CronetBidirectionalStream extends BidirectionalStream {
}
try {
mNativeStream = nativeCreateBidirectionalStream(
- mRequestContext.getUrlRequestContextAdapter());
+ mRequestContext.getUrlRequestContextAdapter(), mDisableAutoFlush);
mRequestContext.onRequestStarted();
// Non-zero startResult means an argument error.
int startResult = nativeStart(mNativeStream, mInitialUrl, mInitialPriority,
@@ -253,16 +270,23 @@ class CronetBidirectionalStream extends BidirectionalStream {
if (!buffer.hasRemaining() && !endOfStream) {
throw new IllegalArgumentException("Empty buffer before end of stream.");
}
- if (mWriteState != State.WAITING_FOR_WRITE) {
- throw new IllegalStateException("Unexpected write attempt.");
- }
if (isDoneLocked()) {
kapishnikov 2016/04/11 23:09:42 Should we throw an IllegalStateException if there
xunjieli 2016/04/12 18:01:48 Done.
return;
}
+
+ if (mDisableAutoFlush) {
+ mPendingData.add(buffer);
+ if (endOfStream) {
+ mEndOfStreamWritten = true;
+ }
+ return;
+ }
+ if (mWriteState != State.WAITING_FOR_WRITE) {
+ throw new IllegalStateException("Unexpected write attempt.");
+ }
if (mOnWriteCompletedTask == null) {
mOnWriteCompletedTask = new OnWriteCompletedRunnable();
}
- mOnWriteCompletedTask.mEndOfStream = endOfStream;
mWriteState = State.WRITING;
if (!nativeWriteData(
mNativeStream, buffer, buffer.position(), buffer.limit(), endOfStream)) {
@@ -275,6 +299,30 @@ class CronetBidirectionalStream extends BidirectionalStream {
}
@Override
+ public void flush() {
+ synchronized (mNativeStreamLock) {
+ if (!mDisableAutoFlush) {
+ throw new IllegalStateException("flush is called when auto flush is not disabled.");
+ }
+
+ int size = mPendingData.size();
+ ByteBuffer[] buffers = new ByteBuffer[size];
+ int[] positions = new int[size];
+ int[] limits = new int[size];
+ for (int i = 0; i < size; i++) {
+ ByteBuffer buffer = mPendingData.poll();
+ buffers[i] = buffer;
+ positions[i] = buffer.position();
+ limits[i] = buffer.limit();
+ }
+ assert mPendingData.isEmpty();
+ if (!nativeWritevData(mNativeStream, buffers, positions, limits, mEndOfStreamWritten)) {
+ throw new IllegalArgumentException("Unable to call native write");
+ }
+ }
+ }
+
+ @Override
public void ping(PingCallback callback, Executor executor) {
// TODO(mef): May be last thing to be implemented on Android.
throw new UnsupportedOperationException("ping is not supported yet.");
@@ -311,7 +359,7 @@ class CronetBidirectionalStream extends BidirectionalStream {
@SuppressWarnings("unused")
@CalledByNative
- private void onRequestHeadersSent() {
+ private void onStreamReady() {
postTaskToExecutor(new Runnable() {
public void run() {
synchronized (mNativeStreamLock) {
@@ -326,7 +374,7 @@ class CronetBidirectionalStream extends BidirectionalStream {
}
try {
- mCallback.onRequestHeadersSent(CronetBidirectionalStream.this);
+ mCallback.onStreamReady(CronetBidirectionalStream.this);
} catch (Exception e) {
onCallbackException(e);
}
@@ -391,8 +439,8 @@ class CronetBidirectionalStream extends BidirectionalStream {
@SuppressWarnings("unused")
@CalledByNative
- private void onWriteCompleted(
- final ByteBuffer byteBuffer, int initialPosition, int initialLimit) {
+ private void onWriteCompleted(final ByteBuffer byteBuffer, int initialPosition,
+ int initialLimit, boolean endOfStream) {
if (byteBuffer.position() != initialPosition || byteBuffer.limit() != initialLimit) {
failWithException(
new CronetException("ByteBuffer modified externally during write", null));
@@ -402,11 +450,35 @@ class CronetBidirectionalStream extends BidirectionalStream {
byteBuffer.position(byteBuffer.limit());
assert mOnWriteCompletedTask.mByteBuffer == null;
mOnWriteCompletedTask.mByteBuffer = byteBuffer;
+ mOnWriteCompletedTask.mEndOfStream = endOfStream;
postTaskToExecutor(mOnWriteCompletedTask);
}
@SuppressWarnings("unused")
@CalledByNative
+ private void onWritevCompleted(final ByteBuffer[] byteBuffers, int[] initialPositions,
kapishnikov 2016/04/11 23:09:42 It looks that onWritevCompleted() is called after
xunjieli 2016/04/12 18:01:48 As discussed, we don't have the signal about each
+ int[] initialLimits, boolean endOfStream) {
+ assert byteBuffers.length == initialPositions.length;
+ assert byteBuffers.length == initialLimits.length;
+ assert initialPositions.length == initialLimits.length;
+ for (int i = 0; i < byteBuffers.length; i++) {
+ ByteBuffer buffer = byteBuffers[i];
+ if (buffer.position() != initialPositions[i] || buffer.limit() != initialLimits[i]) {
+ failWithException(
+ new CronetException("ByteBuffer modified externally during write", null));
+ return;
+ }
+ // Current implementation always writes the complete buffer.
+ buffer.position(buffer.limit());
+ OnWriteCompletedRunnable runnable = new OnWriteCompletedRunnable();
+ runnable.mByteBuffer = buffer;
+ runnable.mEndOfStream = endOfStream;
+ postTaskToExecutor(runnable);
+ }
+ }
+
+ @SuppressWarnings("unused")
+ @CalledByNative
private void onResponseTrailersReceived(String[] trailers) {
final UrlResponseInfo.HeaderBlock trailersBlock =
new UrlResponseInfo.HeaderBlock(headersListFromStrings(trailers));
@@ -618,7 +690,8 @@ class CronetBidirectionalStream extends BidirectionalStream {
}
// Native methods are implemented in cronet_bidirectional_stream_adapter.cc.
- private native long nativeCreateBidirectionalStream(long urlRequestContextAdapter);
+ private native long nativeCreateBidirectionalStream(
+ long urlRequestContextAdapter, boolean disableAutoFlush);
@NativeClassQualifiedName("CronetBidirectionalStreamAdapter")
private native int nativeStart(long nativePtr, String url, int priority, String method,
@@ -633,5 +706,9 @@ class CronetBidirectionalStream extends BidirectionalStream {
long nativePtr, ByteBuffer byteBuffer, int position, int limit, boolean endOfStream);
@NativeClassQualifiedName("CronetBidirectionalStreamAdapter")
+ private native boolean nativeWritevData(long nativePtr, ByteBuffer[] buffers, int[] positions,
+ int[] limits, boolean endOfStream);
+
+ @NativeClassQualifiedName("CronetBidirectionalStreamAdapter")
private native void nativeDestroy(long nativePtr, boolean sendOnCanceled);
}

Powered by Google App Engine
This is Rietveld 408576698