Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(681)

Side by Side Diff: components/cronet/android/api/src/org/chromium/net/JavaUrlRequest.java

Issue 2283243002: Allow direct executors in cronet. (Closed)
Patch Set: Created 4 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package org.chromium.net; 5 package org.chromium.net;
6 6
7 import android.annotation.TargetApi; 7 import android.annotation.TargetApi;
8 import android.net.TrafficStats; 8 import android.net.TrafficStats;
9 import android.os.Build; 9 import android.os.Build;
10 import android.util.Log; 10 import android.util.Log;
(...skipping 38 matching lines...) Expand 10 before | Expand all | Expand 10 after
49 /** 49 /**
50 * This is the source of thread safety in this class - no other synchronizat ion is performed. 50 * This is the source of thread safety in this class - no other synchronizat ion is performed.
51 * By compare-and-swapping from one state to another, we guarantee that oper ations aren't 51 * By compare-and-swapping from one state to another, we guarantee that oper ations aren't
52 * running concurrently. Only the winner of a CAS proceeds. 52 * running concurrently. Only the winner of a CAS proceeds.
53 * 53 *
54 * <p>A caller can lose a CAS for three reasons - user error (two calls to r ead() without 54 * <p>A caller can lose a CAS for three reasons - user error (two calls to r ead() without
55 * waiting for the read to succeed), runtime error (network code or user cod e throws an 55 * waiting for the read to succeed), runtime error (network code or user cod e throws an
56 * exception), or cancellation. 56 * exception), or cancellation.
57 */ 57 */
58 private final AtomicReference<State> mState = new AtomicReference<>(State.NO T_STARTED); 58 private final AtomicReference<State> mState = new AtomicReference<>(State.NO T_STARTED);
59 private final AtomicBoolean mUploadProviderClosed = new AtomicBoolean(false) ;
59 60
60 /** 61 /**
61 * Traffic stats tag to associate this requests' data use with. It's capture d when the request 62 * Traffic stats tag to associate this requests' data use with. It's capture d when the request
62 * is created, so that applications doing work on behalf of another app can correctly attribute 63 * is created, so that applications doing work on behalf of another app can correctly attribute
63 * that data use. 64 * that data use.
64 */ 65 */
65 private final int mTrafficStatsTag; 66 private final int mTrafficStatsTag;
67 private final boolean mAllowDirectExecutor;
66 68
67 /* These don't change with redirects */ 69 /* These don't change with redirects */
68 private String mInitialMethod; 70 private String mInitialMethod;
69 private UploadDataProvider mUploadDataProvider; 71 private UploadDataProvider mUploadDataProvider;
70 private Executor mUploadExecutor; 72 private Executor mUploadExecutor;
71 private AtomicBoolean mUploadProviderClosed = new AtomicBoolean(false);
72 73
73 /** 74 /**
74 * Holds a subset of StatusValues - {@link State#STARTED} can represent 75 * Holds a subset of StatusValues - {@link State#STARTED} can represent
75 * {@link Status#SENDING_REQUEST} or {@link Status#WAITING_FOR_RESPONSE}. Wh ile the distinction 76 * {@link Status#SENDING_REQUEST} or {@link Status#WAITING_FOR_RESPONSE}. Wh ile the distinction
76 * isn't needed to implement the logic in this class, it is needed to implem ent 77 * isn't needed to implement the logic in this class, it is needed to implem ent
77 * {@link #getStatus(StatusListener)}. 78 * {@link #getStatus(StatusListener)}.
78 * 79 *
79 * <p>Concurrency notes - this value is not atomically updated with mState, so there is some 80 * <p>Concurrency notes - this value is not atomically updated with mState, so there is some
80 * risk that we'd get an inconsistent snapshot of both - however, it also ha ppens that this 81 * risk that we'd get an inconsistent snapshot of both - however, it also ha ppens that this
81 * value is only used with the STARTED state, so it's inconsequential. 82 * value is only used with the STARTED state, so it's inconsequential.
(...skipping 31 matching lines...) Expand 10 before | Expand all | Expand 10 after
113 ERROR, 114 ERROR,
114 COMPLETE, 115 COMPLETE,
115 CANCELLED, 116 CANCELLED,
116 } 117 }
117 118
118 /** 119 /**
119 * @param executor The executor used for reading and writing from sockets 120 * @param executor The executor used for reading and writing from sockets
120 * @param userExecutor The executor used to dispatch to {@code callback} 121 * @param userExecutor The executor used to dispatch to {@code callback}
121 */ 122 */
122 JavaUrlRequest(Callback callback, final Executor executor, Executor userExec utor, String url, 123 JavaUrlRequest(Callback callback, final Executor executor, Executor userExec utor, String url,
123 String userAgent) { 124 String userAgent, boolean allowDirectExecutor) {
125 this.mAllowDirectExecutor = allowDirectExecutor;
mef 2016/08/30 16:29:44 nit: move it to the end, below userAgent?
Charles 2016/08/30 20:11:06 This value gets read by AsyncUrlRequestCallback. M
124 if (url == null) { 126 if (url == null) {
125 throw new NullPointerException("URL is required"); 127 throw new NullPointerException("URL is required");
126 } 128 }
127 if (callback == null) { 129 if (callback == null) {
128 throw new NullPointerException("Listener is required"); 130 throw new NullPointerException("Listener is required");
129 } 131 }
130 if (executor == null) { 132 if (executor == null) {
131 throw new NullPointerException("Executor is required"); 133 throw new NullPointerException("Executor is required");
132 } 134 }
133 if (userExecutor == null) { 135 if (userExecutor == null) {
(...skipping 96 matching lines...) Expand 10 before | Expand all | Expand 10 after
230 } 232 }
231 if (!mRequestHeaders.containsKey("Content-Type")) { 233 if (!mRequestHeaders.containsKey("Content-Type")) {
232 throw new IllegalArgumentException( 234 throw new IllegalArgumentException(
233 "Requests with upload data must have a Content-Type."); 235 "Requests with upload data must have a Content-Type.");
234 } 236 }
235 checkNotStarted(); 237 checkNotStarted();
236 if (mInitialMethod == null) { 238 if (mInitialMethod == null) {
237 mInitialMethod = "POST"; 239 mInitialMethod = "POST";
238 } 240 }
239 this.mUploadDataProvider = uploadDataProvider; 241 this.mUploadDataProvider = uploadDataProvider;
240 this.mUploadExecutor = executor; 242 if (mAllowDirectExecutor) {
243 this.mUploadExecutor = executor;
244 } else {
245 this.mUploadExecutor = new DirectPreventingExecutor(executor);
246 }
241 } 247 }
242 248
243 private enum SinkState { 249 private enum SinkState {
244 AWAITING_READ_RESULT, 250 AWAITING_READ_RESULT,
245 AWAITING_REWIND_RESULT, 251 AWAITING_REWIND_RESULT,
246 UPLOADING, 252 UPLOADING,
247 NOT_STARTED, 253 NOT_STARTED,
248 } 254 }
249 255
250 private final class OutputStreamDataSink implements UploadDataSink { 256 private final class OutputStreamDataSink implements UploadDataSink {
(...skipping 41 matching lines...) Expand 10 before | Expand all | Expand 10 after
292 "Read upload data length %d exceeds expected len gth %d", 298 "Read upload data length %d exceeds expected len gth %d",
293 mWrittenBytes + mBuffer.remaining(), mTotalBytes ))); 299 mWrittenBytes + mBuffer.remaining(), mTotalBytes )));
294 return; 300 return;
295 } 301 }
296 while (mBuffer.hasRemaining()) { 302 while (mBuffer.hasRemaining()) {
297 mWrittenBytes += mOutputChannel.write(mBuffer); 303 mWrittenBytes += mOutputChannel.write(mBuffer);
298 } 304 }
299 if (mWrittenBytes < mTotalBytes || (mTotalBytes == -1 && !fi nalChunk)) { 305 if (mWrittenBytes < mTotalBytes || (mTotalBytes == -1 && !fi nalChunk)) {
300 mBuffer.clear(); 306 mBuffer.clear();
301 mSinkState.set(SinkState.AWAITING_READ_RESULT); 307 mSinkState.set(SinkState.AWAITING_READ_RESULT);
302 mUserExecutor.execute(uploadErrorSetting(new CheckedRunn able() { 308 executeOnUploadExecutor(new CheckedRunnable() {
303 @Override 309 @Override
304 public void run() throws Exception { 310 public void run() throws Exception {
305 mUploadProvider.read(OutputStreamDataSink.this, mBuffer); 311 mUploadProvider.read(OutputStreamDataSink.this, mBuffer);
306 } 312 }
307 })); 313 });
308 } else if (mTotalBytes == -1) { 314 } else if (mTotalBytes == -1) {
309 finish(); 315 finish();
310 } else if (mTotalBytes == mWrittenBytes) { 316 } else if (mTotalBytes == mWrittenBytes) {
311 finish(); 317 finish();
312 } else { 318 } else {
313 enterUploadErrorState(new IllegalArgumentException(Strin g.format( 319 enterUploadErrorState(new IllegalArgumentException(Strin g.format(
314 "Read upload data length %d exceeds expected len gth %d", 320 "Read upload data length %d exceeds expected len gth %d",
315 mWrittenBytes, mTotalBytes))); 321 mWrittenBytes, mTotalBytes)));
316 } 322 }
317 } 323 }
(...skipping 22 matching lines...) Expand all
340 mExecutor.execute(errorSetting(State.STARTED, new CheckedRunnable() { 346 mExecutor.execute(errorSetting(State.STARTED, new CheckedRunnable() {
341 @Override 347 @Override
342 public void run() throws Exception { 348 public void run() throws Exception {
343 if (mOutputChannel == null) { 349 if (mOutputChannel == null) {
344 mAdditionalStatusDetails = Status.CONNECTING; 350 mAdditionalStatusDetails = Status.CONNECTING;
345 mUrlConnection.connect(); 351 mUrlConnection.connect();
346 mAdditionalStatusDetails = Status.SENDING_REQUEST; 352 mAdditionalStatusDetails = Status.SENDING_REQUEST;
347 mOutputChannel = Channels.newChannel(mUrlConnection.getO utputStream()); 353 mOutputChannel = Channels.newChannel(mUrlConnection.getO utputStream());
348 } 354 }
349 mSinkState.set(SinkState.AWAITING_READ_RESULT); 355 mSinkState.set(SinkState.AWAITING_READ_RESULT);
350 mUserExecutor.execute(uploadErrorSetting(new CheckedRunnable () { 356 executeOnUploadExecutor(new CheckedRunnable() {
351 @Override 357 @Override
352 public void run() throws Exception { 358 public void run() throws Exception {
353 mUploadProvider.read(OutputStreamDataSink.this, mBuf fer); 359 mUploadProvider.read(OutputStreamDataSink.this, mBuf fer);
354 } 360 }
355 })); 361 });
356 } 362 }
357 })); 363 }));
358 } 364 }
359 365
366 private void executeOnUploadExecutor(CheckedRunnable runnable) {
367 try {
368 mUserExecutor.execute(uploadErrorSetting(runnable));
mef 2016/08/30 16:29:44 Should this execute on mUploadExecutor instead of
Charles 2016/08/30 20:11:05 In this lexical scope mUserExecutor is referring t
mef 2016/08/31 17:26:52 I see. I've missed the fact that this is part of O
Charles 2016/08/31 19:08:36 Done.
369 } catch (RejectedExecutionException e) {
370 enterUploadErrorState(e);
371 }
372 }
373
360 void finish() throws IOException { 374 void finish() throws IOException {
361 if (mOutputChannel != null) { 375 if (mOutputChannel != null) {
362 mOutputChannel.close(); 376 mOutputChannel.close();
363 } 377 }
364 fireGetHeaders(); 378 fireGetHeaders();
365 } 379 }
366 380
367 void start(final boolean firstTime) { 381 void start(final boolean firstTime) {
368 mUserExecutor.execute(uploadErrorSetting(new CheckedRunnable() { 382 executeOnUploadExecutor(new CheckedRunnable() {
369 @Override 383 @Override
370 public void run() throws Exception { 384 public void run() throws Exception {
371 mTotalBytes = mUploadProvider.getLength(); 385 mTotalBytes = mUploadProvider.getLength();
372 if (mTotalBytes == 0) { 386 if (mTotalBytes == 0) {
373 finish(); 387 finish();
374 } else { 388 } else {
375 // If we know how much data we have to upload, and it's small, we can save 389 // If we know how much data we have to upload, and it's small, we can save
376 // memory by allocating a reasonably sized buffer to rea d into. 390 // memory by allocating a reasonably sized buffer to rea d into.
377 if (mTotalBytes > 0 && mTotalBytes < DEFAULT_UPLOAD_BUFF ER_SIZE) { 391 if (mTotalBytes > 0 && mTotalBytes < DEFAULT_UPLOAD_BUFF ER_SIZE) {
378 // Allocate one byte more than necessary, to detect callers uploading 392 // Allocate one byte more than necessary, to detect callers uploading
(...skipping 15 matching lines...) Expand all
394 mUrlConnection.setChunkedStreamingMode(DEFAULT_CHUNK _LENGTH); 408 mUrlConnection.setChunkedStreamingMode(DEFAULT_CHUNK _LENGTH);
395 } 409 }
396 if (firstTime) { 410 if (firstTime) {
397 startRead(); 411 startRead();
398 } else { 412 } else {
399 mSinkState.set(SinkState.AWAITING_REWIND_RESULT); 413 mSinkState.set(SinkState.AWAITING_REWIND_RESULT);
400 mUploadProvider.rewind(OutputStreamDataSink.this); 414 mUploadProvider.rewind(OutputStreamDataSink.this);
401 } 415 }
402 } 416 }
403 } 417 }
404 })); 418 });
405 } 419 }
406 } 420 }
407 421
408 @Override 422 @Override
409 public void start() { 423 public void start() {
410 mAdditionalStatusDetails = Status.CONNECTING; 424 mAdditionalStatusDetails = Status.CONNECTING;
411 transitionStates(State.NOT_STARTED, State.STARTED, new Runnable() { 425 transitionStates(State.NOT_STARTED, State.STARTED, new Runnable() {
412 @Override 426 @Override
413 public void run() { 427 public void run() {
414 mUrlChain.add(mCurrentUrl); 428 mUrlChain.add(mCurrentUrl);
(...skipping 322 matching lines...) Expand 10 before | Expand all | Expand 10 after
737 throw new IllegalStateException("Switch is exhaustive: " + state ); 751 throw new IllegalStateException("Switch is exhaustive: " + state );
738 } 752 }
739 753
740 mCallbackAsync.sendStatus(listener, status); 754 mCallbackAsync.sendStatus(listener, status);
741 } 755 }
742 756
743 /** This wrapper ensures that callbacks are always called on the correct exe cutor */ 757 /** This wrapper ensures that callbacks are always called on the correct exe cutor */
744 private final class AsyncUrlRequestCallback { 758 private final class AsyncUrlRequestCallback {
745 final UrlRequest.Callback mCallback; 759 final UrlRequest.Callback mCallback;
746 final Executor mUserExecutor; 760 final Executor mUserExecutor;
761 final Executor mFallbackExecutor;
747 762
748 AsyncUrlRequestCallback(Callback callback, final Executor userExecutor) { 763 AsyncUrlRequestCallback(Callback callback, final Executor userExecutor) {
749 this.mCallback = callback; 764 this.mCallback = callback;
750 this.mUserExecutor = userExecutor; 765 if (mAllowDirectExecutor) {
766 this.mUserExecutor = userExecutor;
767 this.mFallbackExecutor = null;
768 } else {
769 mUserExecutor = new DirectPreventingExecutor(userExecutor);
770 mFallbackExecutor = userExecutor;
771 }
751 } 772 }
752 773
753 void sendStatus(final StatusListener listener, final int status) { 774 void sendStatus(final StatusListener listener, final int status) {
754 mUserExecutor.execute(new Runnable() { 775 mUserExecutor.execute(new Runnable() {
755 @Override 776 @Override
756 public void run() { 777 public void run() {
757 listener.onStatus(status); 778 listener.onStatus(status);
758 } 779 }
759 }); 780 });
760 } 781 }
761 782
762 void execute(State currentState, CheckedRunnable runnable) { 783 void execute(State currentState, CheckedRunnable runnable) {
763 try { 784 try {
764 mUserExecutor.execute(userErrorSetting(currentState, runnable)); 785 mUserExecutor.execute(userErrorSetting(currentState, runnable));
765 } catch (RejectedExecutionException e) { 786 } catch (RejectedExecutionException e) {
766 enterUserErrorState(currentState, e); 787 enterErrorState(currentState,
788 new UrlRequestException("Exception posting task to execu tor", e));
767 } 789 }
768 } 790 }
769 791
770 void onRedirectReceived(final UrlResponseInfo info, final String newLoca tionUrl) { 792 void onRedirectReceived(final UrlResponseInfo info, final String newLoca tionUrl) {
771 execute(State.AWAITING_FOLLOW_REDIRECT, new CheckedRunnable() { 793 execute(State.AWAITING_FOLLOW_REDIRECT, new CheckedRunnable() {
772 @Override 794 @Override
773 public void run() throws Exception { 795 public void run() throws Exception {
774 mCallback.onRedirectReceived(JavaUrlRequest.this, info, newL ocationUrl); 796 mCallback.onRedirectReceived(JavaUrlRequest.this, info, newL ocationUrl);
775 } 797 }
776 }); 798 });
777 } 799 }
778 800
779 void onResponseStarted(UrlResponseInfo info) { 801 void onResponseStarted(UrlResponseInfo info) {
780 execute(State.AWAITING_READ, new CheckedRunnable() { 802 if (mState.compareAndSet(State.STARTED, State.AWAITING_READ)) {
mef 2016/08/30 16:29:44 This is changing thread where mState is accessed f
Charles 2016/08/30 20:11:06 mState is safe from any thread, since it's an atom
mef 2016/08/31 17:26:52 Acknowledged.
781 @Override 803 execute(State.AWAITING_READ, new CheckedRunnable() {
782 public void run() throws Exception { 804 @Override
783 if (mState.compareAndSet(State.STARTED, State.AWAITING_READ) ) { 805 public void run() throws Exception {
784 mCallback.onResponseStarted(JavaUrlRequest.this, mUrlRes ponseInfo); 806 mCallback.onResponseStarted(JavaUrlRequest.this, mUrlRes ponseInfo);
785 } 807 }
786 } 808 });
787 }); 809 }
788 } 810 }
789 811
790 void onReadCompleted(final UrlResponseInfo info, final ByteBuffer byteBu ffer) { 812 void onReadCompleted(final UrlResponseInfo info, final ByteBuffer byteBu ffer) {
791 execute(State.AWAITING_READ, new CheckedRunnable() { 813 if (mState.compareAndSet(State.READING, State.AWAITING_READ)) {
mef 2016/08/30 16:29:44 ditto.
Charles 2016/08/30 20:11:05 This one too - we want a REE thrown when submittin
792 @Override 814 execute(State.AWAITING_READ, new CheckedRunnable() {
793 public void run() throws Exception { 815 @Override
794 if (mState.compareAndSet(State.READING, State.AWAITING_READ) ) { 816 public void run() throws Exception {
795 mCallback.onReadCompleted(JavaUrlRequest.this, info, byt eBuffer); 817 mCallback.onReadCompleted(JavaUrlRequest.this, info, byt eBuffer);
796 } 818 }
797 } 819 });
798 }); 820 }
799 } 821 }
800 822
801 void onCanceled(final UrlResponseInfo info) { 823 void onCanceled(final UrlResponseInfo info) {
802 closeQuietly(mResponseChannel); 824 closeQuietly(mResponseChannel);
803 mUserExecutor.execute(new Runnable() { 825 mUserExecutor.execute(new Runnable() {
804 @Override 826 @Override
805 public void run() { 827 public void run() {
806 try { 828 try {
807 mCallback.onCanceled(JavaUrlRequest.this, info); 829 mCallback.onCanceled(JavaUrlRequest.this, info);
808 } catch (Exception exception) { 830 } catch (Exception exception) {
(...skipping 11 matching lines...) Expand all
820 mCallback.onSucceeded(JavaUrlRequest.this, info); 842 mCallback.onSucceeded(JavaUrlRequest.this, info);
821 } catch (Exception exception) { 843 } catch (Exception exception) {
822 Log.e(TAG, "Exception in onSucceeded method", exception) ; 844 Log.e(TAG, "Exception in onSucceeded method", exception) ;
823 } 845 }
824 } 846 }
825 }); 847 });
826 } 848 }
827 849
828 void onFailed(final UrlResponseInfo urlResponseInfo, final UrlRequestExc eption e) { 850 void onFailed(final UrlResponseInfo urlResponseInfo, final UrlRequestExc eption e) {
829 closeQuietly(mResponseChannel); 851 closeQuietly(mResponseChannel);
830 mUserExecutor.execute(new Runnable() { 852 Runnable runnable = new Runnable() {
831 @Override 853 @Override
832 public void run() { 854 public void run() {
833 try { 855 try {
834 mCallback.onFailed(JavaUrlRequest.this, urlResponseInfo, e); 856 mCallback.onFailed(JavaUrlRequest.this, urlResponseInfo, e);
835 } catch (Exception exception) { 857 } catch (Exception exception) {
836 Log.e(TAG, "Exception in onFailed method", exception); 858 Log.e(TAG, "Exception in onFailed method", exception);
837 } 859 }
838 } 860 }
839 }); 861 };
862 try {
863 mUserExecutor.execute(runnable);
864 } catch (InlineExecutionProhibitedException wasDirect) {
865 if (mFallbackExecutor != null) {
866 mFallbackExecutor.execute(runnable);
867 }
868 }
840 } 869 }
841 } 870 }
842 871
843 private static void closeQuietly(Closeable closeable) { 872 private static void closeQuietly(Closeable closeable) {
844 if (closeable == null) { 873 if (closeable == null) {
845 return; 874 return;
846 } 875 }
847 try { 876 try {
848 closeable.close(); 877 closeable.close();
849 } catch (IOException e) { 878 } catch (IOException e) {
850 e.printStackTrace(); 879 e.printStackTrace();
851 } 880 }
852 } 881 }
853 } 882 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698