Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(279)

Unified Diff: components/cronet/android/java/src/org/chromium/net/CronetUploadDataStream.java

Issue 849903002: [Cronet] Upload support for async APIs (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Addressed Misha's comments Created 5 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: components/cronet/android/java/src/org/chromium/net/CronetUploadDataStream.java
diff --git a/components/cronet/android/java/src/org/chromium/net/CronetUploadDataStream.java b/components/cronet/android/java/src/org/chromium/net/CronetUploadDataStream.java
new file mode 100644
index 0000000000000000000000000000000000000000..90e191b7bb6a99c7fb4822faad132431b951e77b
--- /dev/null
+++ b/components/cronet/android/java/src/org/chromium/net/CronetUploadDataStream.java
@@ -0,0 +1,300 @@
+// Copyright 2015 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+package org.chromium.net;
+
+import org.chromium.base.CalledByNative;
+import org.chromium.base.JNINamespace;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.Executor;
+
+/**
+ * Pass an upload body to a UrlRequest using an UploadDataProvider.
+ */
+@JNINamespace("cronet")
+public class CronetUploadDataStream implements UploadDataSink {
pauljensen 2015/02/12 17:15:40 Why is this public? Should it be final like Cronet
xunjieli 2015/02/12 20:56:27 I think it is public is because we used it in test
xunjieli 2015/02/13 21:58:34 Alright, my renaming CL is landed. I have rebased,
+ // These are never changed, once a request starts.
+ private final Executor mExecutor;
+ private final UploadDataProvider mDataProvider;
+ private final long mLength;
+ private CronetUrlRequest mRequest;
+
+ // Reusable read task, to reduce redundant memory allocation.
+ private final Runnable mReadTask = new Runnable() {
+ @Override
+ public void run() {
+ synchronized (mLock) {
+ if (mReading || mRewinding || mByteBuffer == null
+ || mUploadDataStreamDelegate == 0) {
+ throw new IllegalStateException(
+ "Unexpected readData call.");
+ }
+ mReading = true;
+ }
+
+ try {
+ mDataProvider.read(CronetUploadDataStream.this, mByteBuffer);
+ } catch (Exception exception) {
+ onError(exception);
+ }
+ }
+ };
+
+ // ByteBuffer created in the native code and is passed to
+ // UploadDataProvider for reading. It is only valid from the
+ // call to mDataProvider.read until onError or onReadSucceeded.
+ private ByteBuffer mByteBuffer = null;
+
+ // Lock that protects all subsequent variables. The delegate has to be
+ // protected to ensure safe shutdown, mReading and mRewinding are protected
+ // to robustly detect getting read/rewind results more often than expected.
+ private final Object mLock = new Object();
+
+ // Native adapter delegate object, owned by the CronetUploadDataStream.
+ // It's only deleted after the native UploadDataStream object is destroyed.
+ // All access to the delegate is synchronized, for safe usage and cleanup.
+ private long mUploadDataStreamDelegate = 0;
+
+ private boolean mReading = false;
+ private boolean mRewinding = false;
+ private boolean mDestroyDelegatePostponed = false;
+
+ /**
+ * Constructs a CronetUploadDataStream.
+ * @param dataProvider the UploadDataProvider to read data from.
+ * @param executor the Executor to execute UploadDataProvider tasks.
+ */
+ public CronetUploadDataStream(UploadDataProvider dataProvider,
+ Executor executor) {
+ mExecutor = executor;
+ mDataProvider = dataProvider;
+ mLength = mDataProvider.getLength();
+ }
+
+ /**
+ * Called by native code to make the UploadDataProvider read data into
+ * {@code byteBuffer}.
+ */
+ @SuppressWarnings("unused")
+ @CalledByNative
+ void readData(ByteBuffer byteBuffer) {
+ mByteBuffer = byteBuffer;
+ mExecutor.execute(mReadTask);
+ }
+
+ // TODO(mmenke): Consider implementing a cancel method.
+ // currently wait for any pending read to complete.
+
+ /**
+ * Called by native code to make the UploadDataProvider rewind upload data.
+ */
+ @SuppressWarnings("unused")
+ @CalledByNative
+ void rewind() {
+ Runnable task = new Runnable() {
+ @Override
+ public void run() {
+ synchronized (mLock) {
+ if (mReading || mRewinding
+ || mUploadDataStreamDelegate == 0) {
+ throw new IllegalStateException(
+ "Unexpected rewind call.");
+ }
+ mRewinding = true;
+ }
pauljensen 2015/02/12 17:15:40 nit: you have a new line on line 36 but not here.
xunjieli 2015/02/12 20:56:27 Done.
+ try {
+ mDataProvider.rewind(CronetUploadDataStream.this);
+ } catch (Exception exception) {
+ onError(exception);
+ }
+ }
+ };
+ mExecutor.execute(task);
+ }
+
+ /**
+ * Called by native code to destroy the native adapter delegate, when the
+ * adapter is destroyed.
+ */
+ @SuppressWarnings("unused")
+ @CalledByNative
+ void onAdapterDestroyed() {
+ Runnable task = new Runnable() {
+ @Override
+ public void run() {
+ destroyDelegate();
+ }
+ };
+
+ mExecutor.execute(task);
+ }
+
+ /**
+ * Helper method called when an exception occurred. This method resets
+ * states and propagates the error to the request.
+ */
+ private void onError(Exception exception) {
+ synchronized (mLock) {
+ if (!mReading && !mRewinding) {
+ throw new IllegalStateException(
+ "There is no read or rewind in progress.");
+ }
+ mReading = false;
+ mRewinding = false;
+ mByteBuffer = null;
+ destroyDelegateIfPostponed();
+ }
+
+ // Just fail the request - simpler to fail directly, and
+ // UploadDataStream only supports failing during initialization, not
+ // while reading. This should be safe, even if we deleted the adapter,
+ // because in that case, the request has already been cancelled.
+ mRequest.onUploadException(exception);
+ }
+
+ @Override
+ public void onReadSucceeded(boolean lastChunk) {
+ synchronized (mLock) {
+ if (!mReading) {
+ throw new IllegalStateException("Non-existent read succeeded.");
+ }
+ if (lastChunk && mLength >= 0) {
+ throw new IllegalArgumentException(
+ "Non-chunked upload can't have last chunk");
+ }
+ int bytesRead = mByteBuffer.position();
+
+ mByteBuffer = null;
+ mReading = false;
+
+ destroyDelegateIfPostponed();
+ // Request may been canceled already.
+ if (mUploadDataStreamDelegate == 0) {
+ return;
+ }
+ nativeOnReadSucceeded(mUploadDataStreamDelegate, bytesRead,
+ lastChunk);
+ }
+ }
+
+ @Override
+ public void onReadError(Exception exception) {
+ synchronized (mLock) {
+ if (!mReading) {
+ throw new IllegalStateException("Non-existent read failed.");
+ }
+ onError(exception);
+ }
+ }
+
+ @Override
+ public void onRewindSucceeded() {
+ synchronized (mLock) {
+ if (!mRewinding) {
+ throw new IllegalStateException(
+ "Non-existent rewind succeeded.");
+ }
+ mRewinding = false;
+ // Request may been canceled already.
+ if (mUploadDataStreamDelegate == 0) {
+ return;
+ }
+ nativeOnRewindSucceeded(mUploadDataStreamDelegate);
+ }
+ }
+
+ @Override
+ public void onRewindError(Exception exception) {
+ synchronized (mLock) {
+ if (!mRewinding) {
+ throw new IllegalStateException("Non-existent rewind failed.");
+ }
+ onError(exception);
+ }
+ }
+
+ /**
+ * The delegate is owned by the CronetUploadDataStream, so it can be
+ * destroyed safely when there is no pending read; however, destruction is
+ * initiated by the destruction of the native UploadDataStream.
+ */
+ private void destroyDelegate() {
+ synchronized (mLock) {
+ if (mReading) {
+ // Wait for the read to complete before destroy the delegate.
+ mDestroyDelegatePostponed = true;
+ return;
+ }
+ if (mUploadDataStreamDelegate == 0) {
+ return;
+ }
+ nativeDestroyDelegate(mUploadDataStreamDelegate);
+ mUploadDataStreamDelegate = 0;
+ }
+ }
+
+ /**
+ * Destroy the native delegate if the destruction is postponed due to a
+ * pending read, which has since completed. Caller needs to be on executor
+ * thread.
+ */
+ private void destroyDelegateIfPostponed() {
+ synchronized (mLock) {
+ if (mReading) {
+ throw new IllegalStateException(
+ "Method should not be called when read has not completed.");
+ }
+ if (mDestroyDelegatePostponed) {
+ destroyDelegate();
+ }
+ }
+ }
+
+ /**
+ * Creates native objects and attaches them to the underlying request
+ * adapter object.
+ * TODO(mmenke): If more types of native upload streams are needed, create
+ * an interface with just this method, to minimize CronetURLRequest's
+ * dependencies on each upload stream type.
+ */
+ void attachToRequest(CronetUrlRequest request, long requestAdapter) {
+ if (mLength < 0) {
+ // TODO(mmenke): Add tests and remove this line.
+ throw new IllegalArgumentException(
+ "Chunked uploads not supported.");
+ }
+ mRequest = request;
+ mUploadDataStreamDelegate =
+ nativeAttachUploadDataToRequest(requestAdapter, mLength);
+ }
+
+ /**
+ * Creates a native UploadDataStreamDelegate and UploadDataStreamAdapter
+ * for testing.
+ * @return the address of the native CronetUploadDataStreamAdapter object
+ */
+ public long createAdapterForTesting() {
+ mUploadDataStreamDelegate = nativeCreateDelegateForTesting();
+ return nativeCreateAdapterForTesting(mLength, mUploadDataStreamDelegate);
+ }
+
+ // Native methods are implemented in upload_data_stream_adapter.cc.
+
+ private native long nativeAttachUploadDataToRequest(long urlRequestAdapter,
+ long length);
+
+ private native long nativeCreateDelegateForTesting();
+
+ private native long nativeCreateAdapterForTesting(long length,
+ long delegate);
+
+ private native void nativeOnReadSucceeded(long uploadDataStreamDelegate,
+ int bytesRead, boolean finalChunk);
+
+ private native void nativeOnRewindSucceeded(long uploadDataStreamDelegate);
+
+ private static native void nativeDestroyDelegate(
+ long uploadDataStreamDelegate);
+}

Powered by Google App Engine
This is Rietveld 408576698