Chromium Code Reviews| Index: components/cronet/android/java/src/org/chromium/net/CronetUploadDataStream.java |
| diff --git a/components/cronet/android/java/src/org/chromium/net/CronetUploadDataStream.java b/components/cronet/android/java/src/org/chromium/net/CronetUploadDataStream.java |
| new file mode 100644 |
| index 0000000000000000000000000000000000000000..d16e2cec4b9922ad99d926312c4caffc27cfb76a |
| --- /dev/null |
| +++ b/components/cronet/android/java/src/org/chromium/net/CronetUploadDataStream.java |
| @@ -0,0 +1,286 @@ |
| +// Copyright 2015 The Chromium Authors. All rights reserved. |
| +// Use of this source code is governed by a BSD-style license that can be |
| +// found in the LICENSE file. |
| + |
| +package org.chromium.net; |
| + |
| +import org.chromium.base.CalledByNative; |
| +import org.chromium.base.JNINamespace; |
| + |
| +import java.nio.ByteBuffer; |
| +import java.util.concurrent.Executor; |
| + |
| +/** |
| + * Pass an upload body to a UrlRequest using an UploadDataProvider. |
| + */ |
| +@JNINamespace("cronet") |
| +public class CronetUploadDataStream implements UploadDataSink { |
| + // These are never changed, once a request starts. |
| + private final Executor mExecutor; |
| + private final UploadDataProvider mDataProvider; |
| + private final long mLength; |
| + private CronetUrlRequest mRequest; |
| + |
| + // Reusable read task, to reduce redundant memory allocation. |
| + private final Runnable mReadTask = new Runnable() { |
| + @Override |
| + public void run() { |
| + synchronized (mLock) { |
| + if (mReading || mRewinding || mByteBuffer == null |
| + || mUploadDataStreamDelegate == 0) { |
| + throw new IllegalStateException( |
| + "Unexpected readData call."); |
| + } |
| + mReading = true; |
| + } |
| + |
| + try { |
| + mDataProvider.read(CronetUploadDataStream.this, mByteBuffer); |
| + } catch (Exception exception) { |
| + onError(exception); |
| + } |
| + } |
| + }; |
| + |
| + private ByteBuffer mByteBuffer = null; |
| + |
| + // Lock that protects all subsequent variables. The delegate has to be |
| + // protected to ensure safe shutdown, mReading and mRewinding are protected |
| + // to robustly detect getting read/rewind results more often than expected. |
| + private final Object mLock = new Object(); |
| + |
| + // Native adapter object, owned by the CronetUploadDataStream. It's only |
| + // deleted after the native UploadDataStream object is destroyed. All |
| + // access to the adapter is synchronized, for safe usage and cleanup. |
| + private long mUploadDataStreamDelegate; |
| + |
| + private boolean mReading = false; |
| + private boolean mRewinding = false; |
| + private boolean mDestroyAdapterPostponed = false; |
| + |
| + public CronetUploadDataStream(UploadDataProvider dataProvider, |
| + Executor executor) { |
| + mExecutor = executor; |
| + mDataProvider = dataProvider; |
| + mLength = mDataProvider.getLength(); |
|
pauljensen
2015/02/05 15:53:39
While using the upload API I found it rather unexp
xunjieli
2015/02/05 18:11:29
I tried doing #1. but I think getLength() is nice
|
| + } |
| + |
| + @SuppressWarnings("unused") |
| + @CalledByNative |
| + void readData(ByteBuffer byteBuffer) { |
| + mByteBuffer = byteBuffer; |
| + try { |
| + mExecutor.execute(mReadTask); |
| + } catch (Exception exception) { |
| + onError(exception); |
| + } |
| + } |
| + |
| + // TODO(mmenke): Consider implementing a cancel method. |
| + // currently wait for any pending read to complete. |
| + |
| + @SuppressWarnings("unused") |
| + @CalledByNative |
| + void rewind() { |
| + Runnable task = new Runnable() { |
| + @Override |
| + public void run() { |
| + synchronized (mLock) { |
| + if (mReading || mRewinding |
| + || mUploadDataStreamDelegate == 0) { |
| + throw new IllegalStateException( |
| + "Unexpected rewind call."); |
| + } |
| + mRewinding = true; |
| + try { |
| + mDataProvider.rewind(CronetUploadDataStream.this); |
| + } catch (Exception exception) { |
| + onError(exception); |
| + } |
| + } |
| + } |
| + }; |
| + |
| + mExecutor.execute(task); |
| + } |
| + |
| + @SuppressWarnings("unused") |
| + @CalledByNative |
| + void onAdapterDestroyed() { |
| + Runnable task = new Runnable() { |
| + @Override |
| + public void run() { |
| + destroyAdapter(); |
| + } |
| + }; |
| + |
| + mExecutor.execute(task); |
| + } |
| + |
| + private void onError(Exception exception) { |
| + synchronized (mLock) { |
| + if (!mReading && !mRewinding) { |
| + throw new IllegalStateException( |
| + "There is no read or rewind in progress."); |
| + } |
| + mReading = false; |
| + mRewinding = false; |
| + mByteBuffer = null; |
| + destroyAdapterIfPostponed(); |
| + } |
| + |
| + // Just fail the request - simpler to fail directly, and |
| + // UploadDataStream only supports failing during initialization, not |
| + // while reading. This should be safe, even if we deleted the adapter, |
| + // because in that case, the request has already been cancelled. |
| + mRequest.onUploadException(exception); |
| + } |
| + |
| + @Override |
| + public void onReadSucceeded(boolean lastChunk) { |
| + synchronized (mLock) { |
| + if (!mReading) { |
| + throw new IllegalStateException("Non-existent read succeeded."); |
| + } |
| + if (lastChunk && mLength >= 0) { |
| + throw new IllegalArgumentException( |
| + "Non-chunked upload can't have last chunk"); |
| + } |
| + int position = mByteBuffer.position(); |
| + int limit = mByteBuffer.limit(); |
|
mmenke
2015/02/05 15:52:29
Don't need the limit any more, and suggest renamin
xunjieli
2015/02/05 18:11:29
Done.
|
| + |
| + mByteBuffer = null; |
| + mReading = false; |
| + |
| + destroyAdapterIfPostponed(); |
| + // Request may been canceled already. |
| + if (mUploadDataStreamDelegate == 0) { |
| + return; |
| + } |
| + nativeOnReadSucceeded(mUploadDataStreamDelegate, position, limit, |
| + lastChunk); |
| + } |
| + } |
| + |
| + public void onReadError(Exception exception) { |
| + synchronized (mLock) { |
| + if (!mReading) { |
| + throw new IllegalStateException("Non-existent read failed."); |
| + } |
| + // Request may been canceled already. |
| + if (mUploadDataStreamDelegate == 0) { |
| + return; |
| + } |
| + onError(exception); |
| + } |
| + } |
| + |
| + public void onRewindSucceeded() { |
| + synchronized (mLock) { |
| + if (!mRewinding) { |
| + throw new IllegalStateException( |
| + "Non-existent rewind succeeded."); |
| + } |
| + mRewinding = false; |
| + // Request may been canceled already. |
| + if (mUploadDataStreamDelegate == 0) { |
| + return; |
| + } |
| + nativeOnRewindSucceeded(mUploadDataStreamDelegate); |
| + } |
| + } |
| + |
| + public void onRewindError(Exception exception) { |
| + synchronized (mLock) { |
| + if (!mRewinding) { |
| + throw new IllegalStateException("Non-existent rewind failed."); |
| + } |
| + // Request may been canceled already. |
| + if (mUploadDataStreamDelegate == 0) { |
| + return; |
| + } |
| + onError(exception); |
| + } |
| + } |
| + |
| + /** |
| + * The adapter is owned by the CronetUploadDataStream, so it can be |
| + * destroyed safely when there is no pending read; however, destruction is |
| + * initiated by the destruction of the native UploadDataStream. |
| + */ |
| + private void destroyAdapter() { |
| + synchronized (mLock) { |
| + if (mReading) { |
| + // Wait for the read to complete before destroy the adapter. |
| + mDestroyAdapterPostponed = true; |
| + return; |
| + } |
| + if (mUploadDataStreamDelegate == 0) { |
| + return; |
| + } |
| + nativeDestroyDelegate(mUploadDataStreamDelegate); |
| + mUploadDataStreamDelegate = 0; |
| + } |
| + } |
| + |
| + /** |
| + * Destroy the native adapter if the destruction is postponed due to a |
| + * pending read, which has since completed. Caller needs to be on executor |
| + * thread and acquire mLock. |
| + */ |
| + private void destroyAdapterIfPostponed() { |
| + if (mReading) { |
| + throw new IllegalStateException( |
| + "Method should not be called when read has not completed."); |
| + } |
| + if (mDestroyAdapterPostponed) { |
| + destroyAdapter(); |
| + } |
| + } |
| + |
| + /** |
| + * Creates native objects and attaches them to the underlying request |
| + * adapter object. |
| + * TODO(mmenke): If more types of native upload streams are needed, create |
| + * an interface with just this method, to minimize CronetURLRequest's |
| + * dependencies on each upload stream type. |
| + */ |
| + void attachToRequest(CronetUrlRequest request, long requestAdapter) { |
| + if (mLength < 0) { |
| + // TODO(mmenke): Add tests and remove this line. |
| + throw new IllegalArgumentException( |
| + "Chunked uploads not supported."); |
| + } |
| + mRequest = request; |
| + mUploadDataStreamDelegate = |
| + nativeAttachUploadDataToRequest(requestAdapter, mLength); |
| + } |
| + |
| + /** |
| + * Creates a native UploadDataStreamDelegate and UploadDataStreamAdapter |
| + * for testing. |
| + * @return the address of the native CronetUploadDataStreamAdapter object |
| + */ |
| + public long createAdapterForTesting() { |
| + mUploadDataStreamDelegate = nativeCreateDelegateForTesting(); |
| + return nativeCreateAdapterForTesting(mLength, mUploadDataStreamDelegate); |
| + } |
| + |
| + // Native methods are implemented in upload_data_stream_adapter.cc. |
| + |
| + private native long nativeAttachUploadDataToRequest(long urlRequestAdapter, |
| + long length); |
| + |
| + private native long nativeCreateDelegateForTesting(); |
| + |
| + private native long nativeCreateAdapterForTesting(long length, |
| + long delegate); |
| + |
| + private native void nativeOnReadSucceeded(long uploadDataStreamDelegate, |
| + int position, int limit, boolean finalChunk); |
| + |
| + private native void nativeOnRewindSucceeded(long uploadDataStreamDelegate); |
| + |
| + private static native void nativeDestroyDelegate( |
| + long uploadDataStreamDelegate); |
| +} |