Chromium Code Reviews| Index: components/cronet/android/java/src/org/chromium/net/CronetUploadDataStream.java |
| diff --git a/components/cronet/android/java/src/org/chromium/net/CronetUploadDataStream.java b/components/cronet/android/java/src/org/chromium/net/CronetUploadDataStream.java |
| new file mode 100644 |
| index 0000000000000000000000000000000000000000..90e191b7bb6a99c7fb4822faad132431b951e77b |
| --- /dev/null |
| +++ b/components/cronet/android/java/src/org/chromium/net/CronetUploadDataStream.java |
| @@ -0,0 +1,300 @@ |
| +// Copyright 2015 The Chromium Authors. All rights reserved. |
| +// Use of this source code is governed by a BSD-style license that can be |
| +// found in the LICENSE file. |
| + |
| +package org.chromium.net; |
| + |
| +import org.chromium.base.CalledByNative; |
| +import org.chromium.base.JNINamespace; |
| + |
| +import java.nio.ByteBuffer; |
| +import java.util.concurrent.Executor; |
| + |
| +/** |
| + * Pass an upload body to a UrlRequest using an UploadDataProvider. |
| + */ |
| +@JNINamespace("cronet") |
| +public class CronetUploadDataStream implements UploadDataSink { |
|
pauljensen
2015/02/12 17:15:40
Why is this public?
Should it be final like Cronet
xunjieli
2015/02/12 20:56:27
I think it is public is because we used it in test
xunjieli
2015/02/13 21:58:34
Alright, my renaming CL is landed. I have rebased,
|
| + // These are never changed, once a request starts. |
| + private final Executor mExecutor; |
| + private final UploadDataProvider mDataProvider; |
| + private final long mLength; |
| + private CronetUrlRequest mRequest; |
| + |
| + // Reusable read task, to reduce redundant memory allocation. |
| + private final Runnable mReadTask = new Runnable() { |
| + @Override |
| + public void run() { |
| + synchronized (mLock) { |
| + if (mReading || mRewinding || mByteBuffer == null |
| + || mUploadDataStreamDelegate == 0) { |
| + throw new IllegalStateException( |
| + "Unexpected readData call."); |
| + } |
| + mReading = true; |
| + } |
| + |
| + try { |
| + mDataProvider.read(CronetUploadDataStream.this, mByteBuffer); |
| + } catch (Exception exception) { |
| + onError(exception); |
| + } |
| + } |
| + }; |
| + |
| + // ByteBuffer created in the native code and is passed to |
| + // UploadDataProvider for reading. It is only valid from the |
| + // call to mDataProvider.read until onError or onReadSucceeded. |
| + private ByteBuffer mByteBuffer = null; |
| + |
| + // Lock that protects all subsequent variables. The delegate has to be |
| + // protected to ensure safe shutdown, mReading and mRewinding are protected |
| + // to robustly detect getting read/rewind results more often than expected. |
| + private final Object mLock = new Object(); |
| + |
| + // Native adapter delegate object, owned by the CronetUploadDataStream. |
| + // It's only deleted after the native UploadDataStream object is destroyed. |
| + // All access to the delegate is synchronized, for safe usage and cleanup. |
| + private long mUploadDataStreamDelegate = 0; |
| + |
| + private boolean mReading = false; |
| + private boolean mRewinding = false; |
| + private boolean mDestroyDelegatePostponed = false; |
| + |
| + /** |
| + * Constructs a CronetUploadDataStream. |
| + * @param dataProvider the UploadDataProvider to read data from. |
| + * @param executor the Executor to execute UploadDataProvider tasks. |
| + */ |
| + public CronetUploadDataStream(UploadDataProvider dataProvider, |
| + Executor executor) { |
| + mExecutor = executor; |
| + mDataProvider = dataProvider; |
| + mLength = mDataProvider.getLength(); |
| + } |
| + |
| + /** |
| + * Called by native code to make the UploadDataProvider read data into |
| + * {@code byteBuffer}. |
| + */ |
| + @SuppressWarnings("unused") |
| + @CalledByNative |
| + void readData(ByteBuffer byteBuffer) { |
| + mByteBuffer = byteBuffer; |
| + mExecutor.execute(mReadTask); |
| + } |
| + |
| + // TODO(mmenke): Consider implementing a cancel method. |
| + // currently wait for any pending read to complete. |
| + |
| + /** |
| + * Called by native code to make the UploadDataProvider rewind upload data. |
| + */ |
| + @SuppressWarnings("unused") |
| + @CalledByNative |
| + void rewind() { |
| + Runnable task = new Runnable() { |
| + @Override |
| + public void run() { |
| + synchronized (mLock) { |
| + if (mReading || mRewinding |
| + || mUploadDataStreamDelegate == 0) { |
| + throw new IllegalStateException( |
| + "Unexpected rewind call."); |
| + } |
| + mRewinding = true; |
| + } |
|
pauljensen
2015/02/12 17:15:40
nit: you have a new line on line 36 but not here.
xunjieli
2015/02/12 20:56:27
Done.
|
| + try { |
| + mDataProvider.rewind(CronetUploadDataStream.this); |
| + } catch (Exception exception) { |
| + onError(exception); |
| + } |
| + } |
| + }; |
| + mExecutor.execute(task); |
| + } |
| + |
| + /** |
| + * Called by native code to destroy the native adapter delegate, when the |
| + * adapter is destroyed. |
| + */ |
| + @SuppressWarnings("unused") |
| + @CalledByNative |
| + void onAdapterDestroyed() { |
| + Runnable task = new Runnable() { |
| + @Override |
| + public void run() { |
| + destroyDelegate(); |
| + } |
| + }; |
| + |
| + mExecutor.execute(task); |
| + } |
| + |
| + /** |
| + * Helper method called when an exception occurred. This method resets |
| + * states and propagates the error to the request. |
| + */ |
| + private void onError(Exception exception) { |
| + synchronized (mLock) { |
| + if (!mReading && !mRewinding) { |
| + throw new IllegalStateException( |
| + "There is no read or rewind in progress."); |
| + } |
| + mReading = false; |
| + mRewinding = false; |
| + mByteBuffer = null; |
| + destroyDelegateIfPostponed(); |
| + } |
| + |
| + // Just fail the request - simpler to fail directly, and |
| + // UploadDataStream only supports failing during initialization, not |
| + // while reading. This should be safe, even if we deleted the adapter, |
| + // because in that case, the request has already been cancelled. |
| + mRequest.onUploadException(exception); |
| + } |
| + |
| + @Override |
| + public void onReadSucceeded(boolean lastChunk) { |
| + synchronized (mLock) { |
| + if (!mReading) { |
| + throw new IllegalStateException("Non-existent read succeeded."); |
| + } |
| + if (lastChunk && mLength >= 0) { |
| + throw new IllegalArgumentException( |
| + "Non-chunked upload can't have last chunk"); |
| + } |
| + int bytesRead = mByteBuffer.position(); |
| + |
| + mByteBuffer = null; |
| + mReading = false; |
| + |
| + destroyDelegateIfPostponed(); |
| + // Request may been canceled already. |
| + if (mUploadDataStreamDelegate == 0) { |
| + return; |
| + } |
| + nativeOnReadSucceeded(mUploadDataStreamDelegate, bytesRead, |
| + lastChunk); |
| + } |
| + } |
| + |
| + @Override |
| + public void onReadError(Exception exception) { |
| + synchronized (mLock) { |
| + if (!mReading) { |
| + throw new IllegalStateException("Non-existent read failed."); |
| + } |
| + onError(exception); |
| + } |
| + } |
| + |
| + @Override |
| + public void onRewindSucceeded() { |
| + synchronized (mLock) { |
| + if (!mRewinding) { |
| + throw new IllegalStateException( |
| + "Non-existent rewind succeeded."); |
| + } |
| + mRewinding = false; |
| + // Request may been canceled already. |
| + if (mUploadDataStreamDelegate == 0) { |
| + return; |
| + } |
| + nativeOnRewindSucceeded(mUploadDataStreamDelegate); |
| + } |
| + } |
| + |
| + @Override |
| + public void onRewindError(Exception exception) { |
| + synchronized (mLock) { |
| + if (!mRewinding) { |
| + throw new IllegalStateException("Non-existent rewind failed."); |
| + } |
| + onError(exception); |
| + } |
| + } |
| + |
| + /** |
| + * The delegate is owned by the CronetUploadDataStream, so it can be |
| + * destroyed safely when there is no pending read; however, destruction is |
| + * initiated by the destruction of the native UploadDataStream. |
| + */ |
| + private void destroyDelegate() { |
| + synchronized (mLock) { |
| + if (mReading) { |
| + // Wait for the read to complete before destroy the delegate. |
| + mDestroyDelegatePostponed = true; |
| + return; |
| + } |
| + if (mUploadDataStreamDelegate == 0) { |
| + return; |
| + } |
| + nativeDestroyDelegate(mUploadDataStreamDelegate); |
| + mUploadDataStreamDelegate = 0; |
| + } |
| + } |
| + |
| + /** |
| + * Destroy the native delegate if the destruction is postponed due to a |
| + * pending read, which has since completed. Caller needs to be on executor |
| + * thread. |
| + */ |
| + private void destroyDelegateIfPostponed() { |
| + synchronized (mLock) { |
| + if (mReading) { |
| + throw new IllegalStateException( |
| + "Method should not be called when read has not completed."); |
| + } |
| + if (mDestroyDelegatePostponed) { |
| + destroyDelegate(); |
| + } |
| + } |
| + } |
| + |
| + /** |
| + * Creates native objects and attaches them to the underlying request |
| + * adapter object. |
| + * TODO(mmenke): If more types of native upload streams are needed, create |
| + * an interface with just this method, to minimize CronetURLRequest's |
| + * dependencies on each upload stream type. |
| + */ |
| + void attachToRequest(CronetUrlRequest request, long requestAdapter) { |
| + if (mLength < 0) { |
| + // TODO(mmenke): Add tests and remove this line. |
| + throw new IllegalArgumentException( |
| + "Chunked uploads not supported."); |
| + } |
| + mRequest = request; |
| + mUploadDataStreamDelegate = |
| + nativeAttachUploadDataToRequest(requestAdapter, mLength); |
| + } |
| + |
| + /** |
| + * Creates a native UploadDataStreamDelegate and UploadDataStreamAdapter |
| + * for testing. |
| + * @return the address of the native CronetUploadDataStreamAdapter object |
| + */ |
| + public long createAdapterForTesting() { |
| + mUploadDataStreamDelegate = nativeCreateDelegateForTesting(); |
| + return nativeCreateAdapterForTesting(mLength, mUploadDataStreamDelegate); |
| + } |
| + |
| + // Native methods are implemented in upload_data_stream_adapter.cc. |
| + |
| + private native long nativeAttachUploadDataToRequest(long urlRequestAdapter, |
| + long length); |
| + |
| + private native long nativeCreateDelegateForTesting(); |
| + |
| + private native long nativeCreateAdapterForTesting(long length, |
| + long delegate); |
| + |
| + private native void nativeOnReadSucceeded(long uploadDataStreamDelegate, |
| + int bytesRead, boolean finalChunk); |
| + |
| + private native void nativeOnRewindSucceeded(long uploadDataStreamDelegate); |
| + |
| + private static native void nativeDestroyDelegate( |
| + long uploadDataStreamDelegate); |
| +} |