Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(102)

Unified Diff: components/cronet/android/java/src/org/chromium/net/CronetBidirectionalStream.java

Issue 1412243012: Initial implementation of CronetBidirectionalStream. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Little cleanup. Created 5 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: components/cronet/android/java/src/org/chromium/net/CronetBidirectionalStream.java
diff --git a/components/cronet/android/java/src/org/chromium/net/CronetBidirectionalStream.java b/components/cronet/android/java/src/org/chromium/net/CronetBidirectionalStream.java
new file mode 100644
index 0000000000000000000000000000000000000000..5795d3debdc58f1206501db424e16cd53ad4353b
--- /dev/null
+++ b/components/cronet/android/java/src/org/chromium/net/CronetBidirectionalStream.java
@@ -0,0 +1,613 @@
+// Copyright 2015 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+package org.chromium.net;
+
+import android.util.Log;
+
+import org.chromium.base.VisibleForTesting;
+import org.chromium.base.annotations.CalledByNative;
+import org.chromium.base.annotations.JNINamespace;
+import org.chromium.base.annotations.NativeClassQualifiedName;
+
+import java.nio.ByteBuffer;
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executor;
+import java.util.concurrent.RejectedExecutionException;
+
+import javax.annotation.concurrent.GuardedBy;
+
+/**
+ * BidirectionalStream implementation using Chromium network stack.
+ * All @CallByNative methods are called on native network thread
+ * and post tasks with callback calls onto Executor. Upon return from callback native
+ * stream is called on executor thread and posts native tasks to native network thread.
+ */
+@JNINamespace("cronet")
+class CronetBidirectionalStream extends BidirectionalStream {
+ /* Native BidirectionalStream object, owned by CronetBidirectionalStream. */
+ @GuardedBy("mNativeStreamLock") private long mNativeStream;
xunjieli 2015/12/08 19:07:20 nit: Maybe reorder these fields. Static fields bef
mef 2015/12/11 21:28:25 Done.
+
+ private static final int STATE_NOT_STARTED = 0;
+ private static final int STATE_STARTED = 1;
+ private static final int STATE_WAITING_ON_READ = 2;
+ private static final int STATE_READING = 3;
+ private static final int STATE_READING_DONE = 4;
+ private static final int STATE_CANCELED = 5;
+ private static final int STATE_ERROR = 6;
+ private static final int STATE_SUCCESS = 7;
+ /**
+ * Stream state covering common and reading state flow.
+ * NOT_STARTED -> STARTED -> WAITING_ON_READ -> READING -> WAITING_ON_READ ->
+ * READING -> READING_DONE -> SUCCESS
+ */
+ @GuardedBy("mNativeStreamLock") private int mStreamState = STATE_NOT_STARTED;
+
+ private static final int STATE_WAITING_ON_WRITE = 10;
+ private static final int STATE_WRITING = 11;
+ private static final int STATE_WRITING_END_OF_STREAM = 12;
+ private static final int STATE_WRITING_DONE = 13;
xunjieli 2015/12/08 19:07:19 Need a comment for each of these states. It's uncl
mef 2015/12/11 21:28:26 Done.
+ /**
+ * Write state covering writing state flow.
+ * NOT_STARTED -> WAITING_ON_WRITE -> WRITING -> WAITING_ON_WRITE ->
+ * WRITING_END_OF_STREAM -> WRITING_DONE -> SUCCESS
xunjieli 2015/12/08 19:07:20 Maybe consider combining the read state flow with
mef 2015/12/11 21:28:26 Yeah, I'm not sure what's a good way to express th
+ */
+ @GuardedBy("mNativeStreamLock") private int mWriteState = STATE_NOT_STARTED;
+
+ /*
+ * Synchronize access to mNativeStream, mStreamState and mWriteState.
+ */
+ private final Object mNativeStreamLock = new Object();
+ private final CronetUrlRequestContext mRequestContext;
+ private final Executor mExecutor;
+
+ private final Callback mCallback;
+ private final String mInitialUrl;
+ private String mInitialMethod;
xunjieli 2015/12/08 17:59:56 nit: final field.
mef 2015/12/11 21:28:26 Done.
+
+ private final ArrayList<Map.Entry<String, String>> mRequestHeaders;
+
+ private UrlResponseInfo mResponseInfo;
+
+ /*
+ * OnReadCompleted callback is repeatedly invoked when each read is completed, so it
+ * is cached as a member variable.
+ */
+ private OnReadCompletedRunnable mOnReadCompletedTask;
+
+ /*
+ * OnWriteCompleted callback is repeatedly invoked when each write is completed, so it
+ * is cached as a member variable.
+ */
+ private OnWriteCompletedRunnable mOnWriteCompletedTask;
+
+ private Runnable mOnDestroyedCallbackForTests;
+
+ private final class OnReadCompletedRunnable implements Runnable {
+ ByteBuffer mByteBuffer;
+ boolean mEndOfStream;
+
+ @Override
+ public void run() {
+ Log.e(CronetUrlRequestContext.LOG_TAG,
+ "OnReadCompletedRunnable:" + mByteBuffer.toString());
xunjieli 2015/12/08 19:07:19 nit: This probably shouldn't be Log.e. Maybe we sh
mef 2015/12/11 21:28:26 Done. Used those for debugging.
+
+ if (isDone()) {
+ return;
+ }
+ try {
+ synchronized (mNativeStreamLock) {
+ if (mNativeStream == 0) {
+ return;
+ }
+ if (mEndOfStream) {
+ mStreamState = STATE_READING_DONE;
+ if (maybeSucceeded()) return;
+ } else {
+ mStreamState = STATE_WAITING_ON_READ;
+ }
+ }
+ // Null out mByteBuffer, out of paranoia. Has to be done before
+ // mCallback call, to avoid any race when there are multiple
+ // executor threads.
+ ByteBuffer buffer = mByteBuffer;
+ mByteBuffer = null;
+ mCallback.onReadCompleted(CronetBidirectionalStream.this, mResponseInfo, buffer);
+ } catch (Exception e) {
+ onCallbackException(e);
+ }
+ }
+ }
+
+ private final class OnWriteCompletedRunnable implements Runnable {
+ ByteBuffer mByteBuffer;
+
+ @Override
+ public void run() {
+ if (isDone()) {
+ return;
+ }
+ try {
+ synchronized (mNativeStreamLock) {
+ if (mNativeStream == 0) {
+ return;
+ }
+ if (mWriteState == STATE_WRITING_END_OF_STREAM) {
+ mWriteState = STATE_WRITING_DONE;
+ if (maybeSucceeded()) return;
+ } else {
+ mWriteState = STATE_WAITING_ON_WRITE;
+ }
+ }
+ // Null out mByteBuffer, out of paranoia. Has to be done before
+ // mCallback call, to avoid any race when there are multiple
+ // executor threads.
+ ByteBuffer buffer = mByteBuffer;
+ mByteBuffer = null;
+ mCallback.onWriteCompleted(CronetBidirectionalStream.this, mResponseInfo, buffer);
+ } catch (Exception e) {
+ onCallbackException(e);
+ }
+ }
+ }
+
+ @GuardedBy("nativeStreamLock")
+ private boolean maybeSucceeded() {
+ if (mStreamState == STATE_READING_DONE && mWriteState == STATE_WRITING_DONE) {
+ mStreamState = STATE_SUCCESS;
+ onSucceeded();
xunjieli 2015/12/08 17:59:56 maybe consider inline onSucceeded? since the metho
mef 2015/12/11 21:28:25 Done.
+ return true;
+ }
+ return false;
+ }
+
+ CronetBidirectionalStream(CronetUrlRequestContext requestContext, long urlRequestContextAdapter,
+ String url, Callback callback, Executor executor, String httpMethod,
+ List<Map.Entry<String, String>> requestHeaders) {
+ mRequestContext = requestContext;
+ mInitialUrl = url;
+ mCallback = callback;
+ mExecutor = executor;
+ mInitialMethod = httpMethod;
+ mRequestHeaders = new ArrayList<Map.Entry<String, String>>(requestHeaders);
+ }
+
+ @Override
+ public void start() {
+ synchronized (mNativeStreamLock) {
+ if (mStreamState != STATE_NOT_STARTED) {
+ throw new IllegalStateException("Stream is already started.");
+ }
+ try {
+ mNativeStream = nativeCreateBidirectionalStream(
+ mRequestContext.getUrlRequestContextAdapter());
+ mRequestContext.onRequestStarted();
+ String headers[] = stringsFromHeaderList(mRequestHeaders);
+ // Non-zero startResult means an argument error.
+ int startResult = nativeStart(mNativeStream, mInitialUrl, mInitialMethod, headers);
+ if (startResult == -1) {
+ throw new IllegalArgumentException("Invalid http method " + mInitialMethod);
+ }
+ if (startResult > 0) {
+ int headerPos = startResult - 1;
+ throw new IllegalArgumentException(
+ "Invalid header " + headers[headerPos] + "=" + headers[headerPos + 1]);
+ }
+ mStreamState = STATE_STARTED;
+ } catch (RuntimeException e) {
+ // If there's an exception, cleanup and then throw the
+ // exception to the caller.
+ destroyNativeStream(false);
+ throw e;
+ }
+ }
+ }
+
+ @Override
+ public void read(ByteBuffer buffer) {
+ synchronized (mNativeStreamLock) {
+ if (!buffer.hasRemaining()) {
+ throw new IllegalArgumentException("ByteBuffer is already full.");
+ }
+ if (mStreamState != STATE_WAITING_ON_READ) {
+ throw new IllegalStateException("Unexpected read attempt.");
+ }
+ if (isDone()) {
+ return;
+ }
+ mStreamState = STATE_READING;
+ if (!nativeReadData(mNativeStream, buffer, buffer.position(), buffer.limit())) {
+ // Still waiting on read. This is just to have consistent
+ // behavior with the other error cases.
+ mStreamState = STATE_WAITING_ON_READ;
+ // Since accessing byteBuffer's memory failed, it's presumably
+ // not a direct ByteBuffer.
+ throw new IllegalArgumentException("byteBuffer must be a direct ByteBuffer.");
+ }
+ }
+ }
+
+ @Override
+ public void write(ByteBuffer buffer, boolean endOfStream) {
+ synchronized (mNativeStreamLock) {
+ if (!buffer.hasRemaining() && !endOfStream) {
+ throw new IllegalArgumentException("Empty buffer before end of stream.");
+ }
+ if (mWriteState != STATE_WAITING_ON_WRITE) {
+ throw new IllegalStateException("Unexpected write attempt.");
+ }
+ if (isDone()) {
+ return;
+ }
+ mWriteState = endOfStream ? STATE_WRITING_END_OF_STREAM : STATE_WRITING;
+ if (!nativeWriteData(
+ mNativeStream, buffer, buffer.position(), buffer.limit(), endOfStream)) {
+ // Still waiting on write. This is just to have consistent
+ // behavior with the other error cases.
+ mWriteState = STATE_WAITING_ON_WRITE;
+ // Since accessing byteBuffer's memory failed, it's presumably
+ // not a direct ByteBuffer.
+ throw new IllegalArgumentException("byteBuffer must be a direct ByteBuffer.");
+ }
+ }
+ }
+
+ @Override
+ public void ping(PingCallback callback, Executor executor) {
+ // TODO(mef): May be last thing to be implemented on Android.
+ }
+
+ @Override
+ public void windowUpdate(int windowSizeIncrement) {
+ // TODO(mef): Understand the needs and semantics of this method.
+ }
+
+ /**
+ * Cancels the stream. Can be called at any time after {@link #start}.
+ * {@link Callback#onCanceled} will be invoked when cancelation
+ * is complete and no further callback methods will be invoked. If the
+ * stream has completed or has not started, calling {@code cancel()} has no
+ * effect and {@code onCanceled} will not be invoked. If the
+ * {@link Executor} passed in during {@code BidirectionalStream} construction runs
+ * tasks on a single thread, and {@code cancel()} is called on that thread,
+ * no listener methods (besides {@code onCanceled()}) will be invoked after
+ * {@code cancel()} is called. Otherwise, at most one callback method may be
+ * invoked after {@code cancel()} has completed.
+ */
+ public void cancel() {
+ synchronized (mNativeStreamLock) {
+ if (isDone() || mStreamState == STATE_NOT_STARTED) {
+ return;
+ }
+ mStreamState = STATE_CANCELED;
+ destroyNativeStream(true);
+ }
+ }
+
+ @Override
+ public boolean isDone() {
+ synchronized (mNativeStreamLock) {
+ return mStreamState != STATE_NOT_STARTED && mNativeStream == 0;
+ }
+ }
+
+ @SuppressWarnings("unused")
+ @CalledByNative
+ private void onRequestHeadersSent() {
+ Runnable task = new Runnable() {
+ public void run() {
+ synchronized (mNativeStreamLock) {
+ if (isDone()) {
+ return;
+ }
+ if (mInitialMethod == "GET") {
+ mWriteState = STATE_WRITING_DONE;
xunjieli 2015/12/08 17:59:56 After applying your latest suggested edit, client
mef 2015/12/11 21:28:26 Per our discussion requiring at least one write is
+ } else {
+ mWriteState = STATE_WAITING_ON_WRITE;
+ }
+ }
+
+ try {
+ mCallback.onRequestHeadersSent(CronetBidirectionalStream.this);
+ } catch (Exception e) {
+ onCallbackException(e);
+ }
+ }
+ };
+ postTaskToExecutor(task);
+ }
+
+ /**
+ * Called when the final set of headers, after all redirects,
+ * is received. Can only be called once for each stream.
+ */
+ @SuppressWarnings("unused")
+ @CalledByNative
+ private void onResponseHeadersReceived(int httpStatusCode, String[] headers) {
+ mResponseInfo = prepareResponseInfoOnNetworkThread(httpStatusCode, headers);
+ Runnable task = new Runnable() {
+ public void run() {
+ synchronized (mNativeStreamLock) {
+ if (isDone()) {
+ return;
+ }
+ mStreamState = STATE_WAITING_ON_READ;
+ }
+
+ try {
+ mCallback.onResponseHeadersReceived(
+ CronetBidirectionalStream.this, mResponseInfo);
+ } catch (Exception e) {
+ onCallbackException(e);
+ }
+ }
+ };
+ postTaskToExecutor(task);
+ }
+
+ @SuppressWarnings("unused")
+ @CalledByNative
+ private void onReadCompleted(final ByteBuffer byteBuffer, int bytesRead, int initialPosition,
+ long receivedBytesCount) {
+ mResponseInfo.setReceivedBytesCount(receivedBytesCount);
+ if (byteBuffer.position() != initialPosition) {
+ failWithException(
+ new CronetException("ByteBuffer modified externally during read", null));
+ return;
+ }
+ if (bytesRead < 0 || initialPosition + bytesRead > byteBuffer.limit()) {
+ failWithException(new CronetException("Invalid number of bytes read", null));
+ return;
+ }
+ if (mOnReadCompletedTask == null) {
+ mOnReadCompletedTask = new OnReadCompletedRunnable();
+ }
+ byteBuffer.position(initialPosition + bytesRead);
+ mOnReadCompletedTask.mByteBuffer = byteBuffer;
+ mOnReadCompletedTask.mEndOfStream = (bytesRead == 0);
+ postTaskToExecutor(mOnReadCompletedTask);
+ }
+
+ @SuppressWarnings("unused")
+ @CalledByNative
+ private void onWriteCompleted(final ByteBuffer byteBuffer, int initialPosition) {
+ if (byteBuffer.position() != initialPosition) {
+ failWithException(
+ new CronetException("ByteBuffer modified externally during write", null));
+ return;
+ }
+ if (mOnWriteCompletedTask == null) {
+ mOnWriteCompletedTask = new OnWriteCompletedRunnable();
+ }
+ // Current implementation always writes the complete buffer.
+ byteBuffer.position(byteBuffer.limit());
+ mOnWriteCompletedTask.mByteBuffer = byteBuffer;
+ postTaskToExecutor(mOnWriteCompletedTask);
+ }
+
+ @SuppressWarnings("unused")
+ @CalledByNative
+ private void onResponseTrailersReceived(String[] trailers) {
+ final UrlResponseInfo.HeaderBlock trailersBlock =
+ new UrlResponseInfo.HeaderBlock(headersListFromStrings(trailers));
+ Runnable task = new Runnable() {
+ public void run() {
+ synchronized (mNativeStreamLock) {
+ if (isDone()) {
+ return;
+ }
+ }
+ try {
+ mCallback.onResponseTrailersReceived(
+ CronetBidirectionalStream.this, mResponseInfo, trailersBlock);
+ } catch (Exception e) {
+ onCallbackException(e);
+ }
+ }
+ };
+ postTaskToExecutor(task);
+ }
+
+ private void onSucceeded() {
+ Runnable task = new Runnable() {
+ public void run() {
+ synchronized (mNativeStreamLock) {
+ if (isDone()) {
+ return;
+ }
+ // Destroy native stream first, so request context could be shut
+ // down from the listener.
+ destroyNativeStream(false);
+ }
+ try {
+ mCallback.onSucceeded(CronetBidirectionalStream.this, mResponseInfo);
+ } catch (Exception e) {
+ Log.e(CronetUrlRequestContext.LOG_TAG, "Exception in onSucceeded method", e);
+ }
+ }
+ };
+ postTaskToExecutor(task);
+ }
+
+ @SuppressWarnings("unused")
+ @CalledByNative
+ private void onError(final int nativeError, final String errorString, long receivedBytesCount) {
+ if (mResponseInfo != null) {
+ mResponseInfo.setReceivedBytesCount(receivedBytesCount);
+ }
+ failWithException(new CronetException(
+ "Exception in BidirectionalStream: " + errorString, nativeError));
+ }
+
+ /**
+ * Called when request is canceled, no callbacks will be called afterwards.
+ */
+ @SuppressWarnings("unused")
+ @CalledByNative
+ private void onCanceled() {
+ Runnable task = new Runnable() {
+ public void run() {
+ try {
+ mCallback.onCanceled(CronetBidirectionalStream.this, mResponseInfo);
+ } catch (Exception e) {
+ Log.e(CronetUrlRequestContext.LOG_TAG, "Exception in onCanceled method", e);
+ }
+ }
+ };
+ postTaskToExecutor(task);
+ }
+
+ @VisibleForTesting
+ public void setOnDestroyedCallbackForTests(Runnable onDestroyedCallbackForTests) {
+ mOnDestroyedCallbackForTests = onDestroyedCallbackForTests;
+ }
+
+ /**
+ * Posts task to application Executor. Used for callbacks
+ * and other tasks that should not be executed on network thread.
+ */
+ private void postTaskToExecutor(Runnable task) {
+ try {
+ mExecutor.execute(task);
+ } catch (RejectedExecutionException failException) {
+ Log.e(CronetUrlRequestContext.LOG_TAG, "Exception posting task to executor",
+ failException);
+ // If posting a task throws an exception, then there is no choice
+ // but to cancel the stream.
+ cancel();
+ }
+ }
+
+ private ArrayList<Map.Entry<String, String>> headersListFromStrings(String[] headers) {
+ ArrayList<Map.Entry<String, String>> headersList =
+ new ArrayList<Map.Entry<String, String>>();
+ for (int i = 0; i < headers.length; i += 2) {
+ headersList.add(new AbstractMap.SimpleImmutableEntry<String, String>(
+ headers[i], headers[i + 1]));
+ }
+ return headersList;
+ }
+
+ private String[] stringsFromHeaderList(ArrayList<Map.Entry<String, String>> headersList) {
+ String headersArray[] = new String[headersList.size() * 2];
+ int i = 0;
+ for (Map.Entry<String, String> requestHeader : headersList) {
+ headersArray[i++] = requestHeader.getKey();
+ headersArray[i++] = requestHeader.getValue();
+ }
+ return headersArray;
+ }
+
+ private UrlResponseInfo prepareResponseInfoOnNetworkThread(
+ int httpStatusCode, String[] headers) {
+ long nativeStream;
+ synchronized (mNativeStreamLock) {
+ if (mNativeStream == 0) {
+ return null;
+ }
+ // This method is running on network thread, so even if
+ // mUrlRequestAdapter is set to 0 from another thread the actual
+ // deletion of the adapter is posted to network thread, so it is
+ // safe to preserve and use urlRequestAdapter outside the lock.
+ nativeStream = mNativeStream;
+ }
+
+ ArrayList<String> urlChain = new ArrayList<String>();
+ urlChain.add(mInitialUrl);
+
+ boolean wasCached = false;
+ String httpStatusText = "";
+ String negotiatedProtocol = nativeGetNegotiatedProtocol(nativeStream);
+ String proxyServer = null;
+
+ UrlResponseInfo responseInfo = new UrlResponseInfo(urlChain, httpStatusCode, httpStatusText,
+ headersListFromStrings(headers), wasCached, negotiatedProtocol, proxyServer);
+ return responseInfo;
+ }
+
+ private void destroyNativeStream(boolean sendOnCanceled) {
+ synchronized (mNativeStreamLock) {
+ Log.i(CronetUrlRequestContext.LOG_TAG, "destroyNativeStream " + this.toString());
+ if (mNativeStream == 0) {
+ return;
+ }
+ nativeDestroy(mNativeStream, sendOnCanceled);
+ mRequestContext.onRequestDestroyed();
+ mNativeStream = 0;
+ if (mOnDestroyedCallbackForTests != null) {
+ mOnDestroyedCallbackForTests.run();
+ }
+ }
+ }
+
+ /**
+ * If callback method throws an exception, stream gets canceled
+ * and exception is reported via onFailed callback.
+ * Only called on the Executor.
+ */
+ private void onCallbackException(Exception e) {
+ CronetException streamError =
+ new CronetException("CalledByNative method has thrown an exception", e);
+ Log.e(CronetUrlRequestContext.LOG_TAG, "Exception in CalledByNative method", e);
+ // Do not call into listener if request is complete.
+ synchronized (mNativeStreamLock) {
+ if (isDone()) {
+ return;
+ }
+ destroyNativeStream(false);
+ }
+ try {
+ mCallback.onFailed(this, mResponseInfo, streamError);
+ } catch (Exception failException) {
+ Log.e(CronetUrlRequestContext.LOG_TAG, "Exception notifying of failed request",
+ failException);
+ }
+ }
+
+ /**
+ * Fails the stream with an exception. Can be called on any thread.
+ */
+ private void failWithException(final CronetException exception) {
+ Runnable task = new Runnable() {
+ public void run() {
+ synchronized (mNativeStreamLock) {
+ if (isDone()) {
+ return;
+ }
+ mStreamState = STATE_ERROR;
+ destroyNativeStream(false);
+ }
+ try {
+ mCallback.onFailed(CronetBidirectionalStream.this, mResponseInfo, exception);
+ } catch (Exception e) {
+ Log.e(CronetUrlRequestContext.LOG_TAG, "Exception in onError method", e);
+ }
+ }
+ };
+ postTaskToExecutor(task);
+ }
+
+ // Native methods are implemented in cronet_bidirectional_stream.cc.
+ private native long nativeCreateBidirectionalStream(long urlRequestContextAdapter);
+
+ @NativeClassQualifiedName("CronetBidirectionalStream")
+ private native int nativeStart(long nativePtr, String url, String method, String[] headers);
+
+ @NativeClassQualifiedName("CronetBidirectionalStream")
+ private native boolean nativeReadData(
+ long nativePtr, ByteBuffer byteBuffer, int position, int capacity);
+
+ @NativeClassQualifiedName("CronetBidirectionalStream")
+ private native boolean nativeWriteData(
+ long nativePtr, ByteBuffer byteBuffer, int position, int capacity, boolean endOfStream);
+
+ @NativeClassQualifiedName("CronetBidirectionalStream")
+ private native void nativeDestroy(long nativePtr, boolean sendOnCanceled);
+
+ @NativeClassQualifiedName("CronetBidirectionalStream")
+ private native String nativeGetNegotiatedProtocol(long nativePtr);
+}

Powered by Google App Engine
This is Rietveld 408576698