Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1390)

Side by Side Diff: components/cronet/android/api/src/org/chromium/net/JavaUrlRequest.java

Issue 2283243002: Allow direct executors in cronet. (Closed)
Patch Set: Improve thread checking Created 4 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package org.chromium.net; 5 package org.chromium.net;
6 6
7 import android.annotation.TargetApi; 7 import android.annotation.TargetApi;
8 import android.net.TrafficStats; 8 import android.net.TrafficStats;
9 import android.os.Build; 9 import android.os.Build;
10 import android.util.Log; 10 import android.util.Log;
(...skipping 38 matching lines...) Expand 10 before | Expand all | Expand 10 after
49 /** 49 /**
50 * This is the source of thread safety in this class - no other synchronizat ion is performed. 50 * This is the source of thread safety in this class - no other synchronizat ion is performed.
51 * By compare-and-swapping from one state to another, we guarantee that oper ations aren't 51 * By compare-and-swapping from one state to another, we guarantee that oper ations aren't
52 * running concurrently. Only the winner of a CAS proceeds. 52 * running concurrently. Only the winner of a CAS proceeds.
53 * 53 *
54 * <p>A caller can lose a CAS for three reasons - user error (two calls to r ead() without 54 * <p>A caller can lose a CAS for three reasons - user error (two calls to r ead() without
55 * waiting for the read to succeed), runtime error (network code or user cod e throws an 55 * waiting for the read to succeed), runtime error (network code or user cod e throws an
56 * exception), or cancellation. 56 * exception), or cancellation.
57 */ 57 */
58 private final AtomicReference<State> mState = new AtomicReference<>(State.NO T_STARTED); 58 private final AtomicReference<State> mState = new AtomicReference<>(State.NO T_STARTED);
59 private final AtomicBoolean mUploadProviderClosed = new AtomicBoolean(false) ;
59 60
60 /** 61 /**
61 * Traffic stats tag to associate this requests' data use with. It's capture d when the request 62 * Traffic stats tag to associate this requests' data use with. It's capture d when the request
62 * is created, so that applications doing work on behalf of another app can correctly attribute 63 * is created, so that applications doing work on behalf of another app can correctly attribute
63 * that data use. 64 * that data use.
64 */ 65 */
65 private final int mTrafficStatsTag; 66 private final int mTrafficStatsTag;
67 private final boolean mAllowDirectExecutor;
66 68
67 /* These don't change with redirects */ 69 /* These don't change with redirects */
68 private String mInitialMethod; 70 private String mInitialMethod;
69 private UploadDataProvider mUploadDataProvider; 71 private UploadDataProvider mUploadDataProvider;
70 private Executor mUploadExecutor; 72 private Executor mUploadExecutor;
71 private AtomicBoolean mUploadProviderClosed = new AtomicBoolean(false);
72 73
73 /** 74 /**
74 * Holds a subset of StatusValues - {@link State#STARTED} can represent 75 * Holds a subset of StatusValues - {@link State#STARTED} can represent
75 * {@link Status#SENDING_REQUEST} or {@link Status#WAITING_FOR_RESPONSE}. Wh ile the distinction 76 * {@link Status#SENDING_REQUEST} or {@link Status#WAITING_FOR_RESPONSE}. Wh ile the distinction
76 * isn't needed to implement the logic in this class, it is needed to implem ent 77 * isn't needed to implement the logic in this class, it is needed to implem ent
77 * {@link #getStatus(StatusListener)}. 78 * {@link #getStatus(StatusListener)}.
78 * 79 *
79 * <p>Concurrency notes - this value is not atomically updated with mState, so there is some 80 * <p>Concurrency notes - this value is not atomically updated with mState, so there is some
80 * risk that we'd get an inconsistent snapshot of both - however, it also ha ppens that this 81 * risk that we'd get an inconsistent snapshot of both - however, it also ha ppens that this
81 * value is only used with the STARTED state, so it's inconsequential. 82 * value is only used with the STARTED state, so it's inconsequential.
(...skipping 31 matching lines...) Expand 10 before | Expand all | Expand 10 after
113 ERROR, 114 ERROR,
114 COMPLETE, 115 COMPLETE,
115 CANCELLED, 116 CANCELLED,
116 } 117 }
117 118
118 /** 119 /**
119 * @param executor The executor used for reading and writing from sockets 120 * @param executor The executor used for reading and writing from sockets
120 * @param userExecutor The executor used to dispatch to {@code callback} 121 * @param userExecutor The executor used to dispatch to {@code callback}
121 */ 122 */
122 JavaUrlRequest(Callback callback, final Executor executor, Executor userExec utor, String url, 123 JavaUrlRequest(Callback callback, final Executor executor, Executor userExec utor, String url,
123 String userAgent) { 124 String userAgent, boolean allowDirectExecutor) {
124 if (url == null) { 125 if (url == null) {
125 throw new NullPointerException("URL is required"); 126 throw new NullPointerException("URL is required");
126 } 127 }
127 if (callback == null) { 128 if (callback == null) {
128 throw new NullPointerException("Listener is required"); 129 throw new NullPointerException("Listener is required");
129 } 130 }
130 if (executor == null) { 131 if (executor == null) {
131 throw new NullPointerException("Executor is required"); 132 throw new NullPointerException("Executor is required");
132 } 133 }
133 if (userExecutor == null) { 134 if (userExecutor == null) {
134 throw new NullPointerException("userExecutor is required"); 135 throw new NullPointerException("userExecutor is required");
135 } 136 }
137
138 this.mAllowDirectExecutor = allowDirectExecutor;
136 this.mCallbackAsync = new AsyncUrlRequestCallback(callback, userExecutor ); 139 this.mCallbackAsync = new AsyncUrlRequestCallback(callback, userExecutor );
137 this.mTrafficStatsTag = TrafficStats.getThreadStatsTag(); 140 this.mTrafficStatsTag = TrafficStats.getThreadStatsTag();
138 this.mExecutor = new Executor() { 141 this.mExecutor = new Executor() {
139 @Override 142 @Override
140 public void execute(final Runnable command) { 143 public void execute(final Runnable command) {
141 executor.execute(new Runnable() { 144 executor.execute(new Runnable() {
142 @Override 145 @Override
143 public void run() { 146 public void run() {
144 int oldTag = TrafficStats.getThreadStatsTag(); 147 int oldTag = TrafficStats.getThreadStatsTag();
145 TrafficStats.setThreadStatsTag(mTrafficStatsTag); 148 TrafficStats.setThreadStatsTag(mTrafficStatsTag);
(...skipping 84 matching lines...) Expand 10 before | Expand all | Expand 10 after
230 } 233 }
231 if (!mRequestHeaders.containsKey("Content-Type")) { 234 if (!mRequestHeaders.containsKey("Content-Type")) {
232 throw new IllegalArgumentException( 235 throw new IllegalArgumentException(
233 "Requests with upload data must have a Content-Type."); 236 "Requests with upload data must have a Content-Type.");
234 } 237 }
235 checkNotStarted(); 238 checkNotStarted();
236 if (mInitialMethod == null) { 239 if (mInitialMethod == null) {
237 mInitialMethod = "POST"; 240 mInitialMethod = "POST";
238 } 241 }
239 this.mUploadDataProvider = uploadDataProvider; 242 this.mUploadDataProvider = uploadDataProvider;
240 this.mUploadExecutor = executor; 243 if (mAllowDirectExecutor) {
244 this.mUploadExecutor = executor;
245 } else {
246 this.mUploadExecutor = new DirectPreventingExecutor(executor);
247 }
241 } 248 }
242 249
243 private enum SinkState { 250 private enum SinkState {
244 AWAITING_READ_RESULT, 251 AWAITING_READ_RESULT,
245 AWAITING_REWIND_RESULT, 252 AWAITING_REWIND_RESULT,
246 UPLOADING, 253 UPLOADING,
247 NOT_STARTED, 254 NOT_STARTED,
248 } 255 }
249 256
250 private final class OutputStreamDataSink implements UploadDataSink { 257 private final class OutputStreamDataSink implements UploadDataSink {
251 final AtomicReference<SinkState> mSinkState = new AtomicReference<>(Sink State.NOT_STARTED); 258 final AtomicReference<SinkState> mSinkState = new AtomicReference<>(Sink State.NOT_STARTED);
252 final Executor mUserExecutor; 259 final Executor mUserUploadExecutor;
253 final Executor mExecutor; 260 final Executor mExecutor;
254 final HttpURLConnection mUrlConnection; 261 final HttpURLConnection mUrlConnection;
255 WritableByteChannel mOutputChannel; 262 WritableByteChannel mOutputChannel;
256 final UploadDataProvider mUploadProvider; 263 final UploadDataProvider mUploadProvider;
257 ByteBuffer mBuffer; 264 ByteBuffer mBuffer;
258 /** This holds the total bytes to send (the content-length). -1 if unkno wn. */ 265 /** This holds the total bytes to send (the content-length). -1 if unkno wn. */
259 long mTotalBytes; 266 long mTotalBytes;
260 /** This holds the bytes written so far */ 267 /** This holds the bytes written so far */
261 long mWrittenBytes = 0; 268 long mWrittenBytes = 0;
262 269
263 OutputStreamDataSink(final Executor userExecutor, Executor executor, 270 OutputStreamDataSink(final Executor userExecutor, Executor executor,
264 HttpURLConnection urlConnection, UploadDataProvider provider) { 271 HttpURLConnection urlConnection, UploadDataProvider provider) {
265 this.mUserExecutor = new Executor() { 272 this.mUserUploadExecutor = new Executor() {
266 @Override 273 @Override
267 public void execute(Runnable runnable) { 274 public void execute(Runnable runnable) {
268 try { 275 try {
269 userExecutor.execute(runnable); 276 userExecutor.execute(runnable);
270 } catch (RejectedExecutionException e) { 277 } catch (RejectedExecutionException e) {
271 enterUploadErrorState(e); 278 enterUploadErrorState(e);
272 } 279 }
273 } 280 }
274 }; 281 };
275 this.mExecutor = executor; 282 this.mExecutor = executor;
(...skipping 16 matching lines...) Expand all
292 "Read upload data length %d exceeds expected len gth %d", 299 "Read upload data length %d exceeds expected len gth %d",
293 mWrittenBytes + mBuffer.remaining(), mTotalBytes ))); 300 mWrittenBytes + mBuffer.remaining(), mTotalBytes )));
294 return; 301 return;
295 } 302 }
296 while (mBuffer.hasRemaining()) { 303 while (mBuffer.hasRemaining()) {
297 mWrittenBytes += mOutputChannel.write(mBuffer); 304 mWrittenBytes += mOutputChannel.write(mBuffer);
298 } 305 }
299 if (mWrittenBytes < mTotalBytes || (mTotalBytes == -1 && !fi nalChunk)) { 306 if (mWrittenBytes < mTotalBytes || (mTotalBytes == -1 && !fi nalChunk)) {
300 mBuffer.clear(); 307 mBuffer.clear();
301 mSinkState.set(SinkState.AWAITING_READ_RESULT); 308 mSinkState.set(SinkState.AWAITING_READ_RESULT);
302 mUserExecutor.execute(uploadErrorSetting(new CheckedRunn able() { 309 executeOnUploadExecutor(new CheckedRunnable() {
303 @Override 310 @Override
304 public void run() throws Exception { 311 public void run() throws Exception {
305 mUploadProvider.read(OutputStreamDataSink.this, mBuffer); 312 mUploadProvider.read(OutputStreamDataSink.this, mBuffer);
306 } 313 }
307 })); 314 });
308 } else if (mTotalBytes == -1) { 315 } else if (mTotalBytes == -1) {
309 finish(); 316 finish();
310 } else if (mTotalBytes == mWrittenBytes) { 317 } else if (mTotalBytes == mWrittenBytes) {
311 finish(); 318 finish();
312 } else { 319 } else {
313 enterUploadErrorState(new IllegalArgumentException(Strin g.format( 320 enterUploadErrorState(new IllegalArgumentException(Strin g.format(
314 "Read upload data length %d exceeds expected len gth %d", 321 "Read upload data length %d exceeds expected len gth %d",
315 mWrittenBytes, mTotalBytes))); 322 mWrittenBytes, mTotalBytes)));
316 } 323 }
317 } 324 }
(...skipping 22 matching lines...) Expand all
340 mExecutor.execute(errorSetting(State.STARTED, new CheckedRunnable() { 347 mExecutor.execute(errorSetting(State.STARTED, new CheckedRunnable() {
341 @Override 348 @Override
342 public void run() throws Exception { 349 public void run() throws Exception {
343 if (mOutputChannel == null) { 350 if (mOutputChannel == null) {
344 mAdditionalStatusDetails = Status.CONNECTING; 351 mAdditionalStatusDetails = Status.CONNECTING;
345 mUrlConnection.connect(); 352 mUrlConnection.connect();
346 mAdditionalStatusDetails = Status.SENDING_REQUEST; 353 mAdditionalStatusDetails = Status.SENDING_REQUEST;
347 mOutputChannel = Channels.newChannel(mUrlConnection.getO utputStream()); 354 mOutputChannel = Channels.newChannel(mUrlConnection.getO utputStream());
348 } 355 }
349 mSinkState.set(SinkState.AWAITING_READ_RESULT); 356 mSinkState.set(SinkState.AWAITING_READ_RESULT);
350 mUserExecutor.execute(uploadErrorSetting(new CheckedRunnable () { 357 executeOnUploadExecutor(new CheckedRunnable() {
351 @Override 358 @Override
352 public void run() throws Exception { 359 public void run() throws Exception {
353 mUploadProvider.read(OutputStreamDataSink.this, mBuf fer); 360 mUploadProvider.read(OutputStreamDataSink.this, mBuf fer);
354 } 361 }
355 })); 362 });
356 } 363 }
357 })); 364 }));
358 } 365 }
359 366
367 private void executeOnUploadExecutor(CheckedRunnable runnable) {
368 try {
369 mUserUploadExecutor.execute(uploadErrorSetting(runnable));
370 } catch (RejectedExecutionException e) {
371 enterUploadErrorState(e);
372 }
373 }
374
360 void finish() throws IOException { 375 void finish() throws IOException {
361 if (mOutputChannel != null) { 376 if (mOutputChannel != null) {
362 mOutputChannel.close(); 377 mOutputChannel.close();
363 } 378 }
364 fireGetHeaders(); 379 fireGetHeaders();
365 } 380 }
366 381
367 void start(final boolean firstTime) { 382 void start(final boolean firstTime) {
368 mUserExecutor.execute(uploadErrorSetting(new CheckedRunnable() { 383 executeOnUploadExecutor(new CheckedRunnable() {
369 @Override 384 @Override
370 public void run() throws Exception { 385 public void run() throws Exception {
371 mTotalBytes = mUploadProvider.getLength(); 386 mTotalBytes = mUploadProvider.getLength();
372 if (mTotalBytes == 0) { 387 if (mTotalBytes == 0) {
373 finish(); 388 finish();
374 } else { 389 } else {
375 // If we know how much data we have to upload, and it's small, we can save 390 // If we know how much data we have to upload, and it's small, we can save
376 // memory by allocating a reasonably sized buffer to rea d into. 391 // memory by allocating a reasonably sized buffer to rea d into.
377 if (mTotalBytes > 0 && mTotalBytes < DEFAULT_UPLOAD_BUFF ER_SIZE) { 392 if (mTotalBytes > 0 && mTotalBytes < DEFAULT_UPLOAD_BUFF ER_SIZE) {
378 // Allocate one byte more than necessary, to detect callers uploading 393 // Allocate one byte more than necessary, to detect callers uploading
(...skipping 15 matching lines...) Expand all
394 mUrlConnection.setChunkedStreamingMode(DEFAULT_CHUNK _LENGTH); 409 mUrlConnection.setChunkedStreamingMode(DEFAULT_CHUNK _LENGTH);
395 } 410 }
396 if (firstTime) { 411 if (firstTime) {
397 startRead(); 412 startRead();
398 } else { 413 } else {
399 mSinkState.set(SinkState.AWAITING_REWIND_RESULT); 414 mSinkState.set(SinkState.AWAITING_REWIND_RESULT);
400 mUploadProvider.rewind(OutputStreamDataSink.this); 415 mUploadProvider.rewind(OutputStreamDataSink.this);
401 } 416 }
402 } 417 }
403 } 418 }
404 })); 419 });
405 } 420 }
406 } 421 }
407 422
408 @Override 423 @Override
409 public void start() { 424 public void start() {
410 mAdditionalStatusDetails = Status.CONNECTING; 425 mAdditionalStatusDetails = Status.CONNECTING;
411 transitionStates(State.NOT_STARTED, State.STARTED, new Runnable() { 426 transitionStates(State.NOT_STARTED, State.STARTED, new Runnable() {
412 @Override 427 @Override
413 public void run() { 428 public void run() {
414 mUrlChain.add(mCurrentUrl); 429 mUrlChain.add(mCurrentUrl);
(...skipping 322 matching lines...) Expand 10 before | Expand all | Expand 10 after
737 throw new IllegalStateException("Switch is exhaustive: " + state ); 752 throw new IllegalStateException("Switch is exhaustive: " + state );
738 } 753 }
739 754
740 mCallbackAsync.sendStatus(listener, status); 755 mCallbackAsync.sendStatus(listener, status);
741 } 756 }
742 757
743 /** This wrapper ensures that callbacks are always called on the correct exe cutor */ 758 /** This wrapper ensures that callbacks are always called on the correct exe cutor */
744 private final class AsyncUrlRequestCallback { 759 private final class AsyncUrlRequestCallback {
745 final UrlRequest.Callback mCallback; 760 final UrlRequest.Callback mCallback;
746 final Executor mUserExecutor; 761 final Executor mUserExecutor;
762 final Executor mFallbackExecutor;
747 763
748 AsyncUrlRequestCallback(Callback callback, final Executor userExecutor) { 764 AsyncUrlRequestCallback(Callback callback, final Executor userExecutor) {
749 this.mCallback = callback; 765 this.mCallback = callback;
750 this.mUserExecutor = userExecutor; 766 if (mAllowDirectExecutor) {
767 this.mUserExecutor = userExecutor;
768 this.mFallbackExecutor = null;
769 } else {
770 mUserExecutor = new DirectPreventingExecutor(userExecutor);
771 mFallbackExecutor = userExecutor;
772 }
751 } 773 }
752 774
753 void sendStatus(final StatusListener listener, final int status) { 775 void sendStatus(final StatusListener listener, final int status) {
754 mUserExecutor.execute(new Runnable() { 776 mUserExecutor.execute(new Runnable() {
755 @Override 777 @Override
756 public void run() { 778 public void run() {
757 listener.onStatus(status); 779 listener.onStatus(status);
758 } 780 }
759 }); 781 });
760 } 782 }
761 783
762 void execute(State currentState, CheckedRunnable runnable) { 784 void execute(State currentState, CheckedRunnable runnable) {
763 try { 785 try {
764 mUserExecutor.execute(userErrorSetting(currentState, runnable)); 786 mUserExecutor.execute(userErrorSetting(currentState, runnable));
765 } catch (RejectedExecutionException e) { 787 } catch (RejectedExecutionException e) {
766 enterUserErrorState(currentState, e); 788 enterErrorState(currentState,
789 new UrlRequestException("Exception posting task to execu tor", e));
767 } 790 }
768 } 791 }
769 792
770 void onRedirectReceived(final UrlResponseInfo info, final String newLoca tionUrl) { 793 void onRedirectReceived(final UrlResponseInfo info, final String newLoca tionUrl) {
771 execute(State.AWAITING_FOLLOW_REDIRECT, new CheckedRunnable() { 794 execute(State.AWAITING_FOLLOW_REDIRECT, new CheckedRunnable() {
772 @Override 795 @Override
773 public void run() throws Exception { 796 public void run() throws Exception {
774 mCallback.onRedirectReceived(JavaUrlRequest.this, info, newL ocationUrl); 797 mCallback.onRedirectReceived(JavaUrlRequest.this, info, newL ocationUrl);
775 } 798 }
776 }); 799 });
777 } 800 }
778 801
779 void onResponseStarted(UrlResponseInfo info) { 802 void onResponseStarted(UrlResponseInfo info) {
780 execute(State.AWAITING_READ, new CheckedRunnable() { 803 if (mState.compareAndSet(State.STARTED, State.AWAITING_READ)) {
781 @Override 804 execute(State.AWAITING_READ, new CheckedRunnable() {
782 public void run() throws Exception { 805 @Override
783 if (mState.compareAndSet(State.STARTED, State.AWAITING_READ) ) { 806 public void run() throws Exception {
784 mCallback.onResponseStarted(JavaUrlRequest.this, mUrlRes ponseInfo); 807 mCallback.onResponseStarted(JavaUrlRequest.this, mUrlRes ponseInfo);
785 } 808 }
786 } 809 });
787 }); 810 }
788 } 811 }
789 812
790 void onReadCompleted(final UrlResponseInfo info, final ByteBuffer byteBu ffer) { 813 void onReadCompleted(final UrlResponseInfo info, final ByteBuffer byteBu ffer) {
791 execute(State.AWAITING_READ, new CheckedRunnable() { 814 if (mState.compareAndSet(State.READING, State.AWAITING_READ)) {
792 @Override 815 execute(State.AWAITING_READ, new CheckedRunnable() {
793 public void run() throws Exception { 816 @Override
794 if (mState.compareAndSet(State.READING, State.AWAITING_READ) ) { 817 public void run() throws Exception {
795 mCallback.onReadCompleted(JavaUrlRequest.this, info, byt eBuffer); 818 mCallback.onReadCompleted(JavaUrlRequest.this, info, byt eBuffer);
796 } 819 }
797 } 820 });
798 }); 821 }
799 } 822 }
800 823
801 void onCanceled(final UrlResponseInfo info) { 824 void onCanceled(final UrlResponseInfo info) {
802 closeResponseChannel(); 825 closeResponseChannel();
803 mUserExecutor.execute(new Runnable() { 826 mUserExecutor.execute(new Runnable() {
804 @Override 827 @Override
805 public void run() { 828 public void run() {
806 try { 829 try {
807 mCallback.onCanceled(JavaUrlRequest.this, info); 830 mCallback.onCanceled(JavaUrlRequest.this, info);
808 } catch (Exception exception) { 831 } catch (Exception exception) {
(...skipping 11 matching lines...) Expand all
820 mCallback.onSucceeded(JavaUrlRequest.this, info); 843 mCallback.onSucceeded(JavaUrlRequest.this, info);
821 } catch (Exception exception) { 844 } catch (Exception exception) {
822 Log.e(TAG, "Exception in onSucceeded method", exception) ; 845 Log.e(TAG, "Exception in onSucceeded method", exception) ;
823 } 846 }
824 } 847 }
825 }); 848 });
826 } 849 }
827 850
828 void onFailed(final UrlResponseInfo urlResponseInfo, final UrlRequestExc eption e) { 851 void onFailed(final UrlResponseInfo urlResponseInfo, final UrlRequestExc eption e) {
829 closeResponseChannel(); 852 closeResponseChannel();
830 mUserExecutor.execute(new Runnable() { 853 Runnable runnable = new Runnable() {
831 @Override 854 @Override
832 public void run() { 855 public void run() {
833 try { 856 try {
834 mCallback.onFailed(JavaUrlRequest.this, urlResponseInfo, e); 857 mCallback.onFailed(JavaUrlRequest.this, urlResponseInfo, e);
835 } catch (Exception exception) { 858 } catch (Exception exception) {
836 Log.e(TAG, "Exception in onFailed method", exception); 859 Log.e(TAG, "Exception in onFailed method", exception);
837 } 860 }
838 } 861 }
839 }); 862 };
863 try {
864 mUserExecutor.execute(runnable);
865 } catch (InlineExecutionProhibitedException wasDirect) {
866 if (mFallbackExecutor != null) {
867 mFallbackExecutor.execute(runnable);
868 }
869 }
840 } 870 }
841 } 871 }
842 872
843 private void closeResponseChannel() { 873 private void closeResponseChannel() {
844 final Closeable closeable = mResponseChannel; 874 final Closeable closeable = mResponseChannel;
845 if (closeable == null) { 875 if (closeable == null) {
846 return; 876 return;
847 } 877 }
848 mResponseChannel = null; 878 mResponseChannel = null;
849 mExecutor.execute(new Runnable() { 879 mExecutor.execute(new Runnable() {
850 @Override 880 @Override
851 public void run() { 881 public void run() {
852 try { 882 try {
853 closeable.close(); 883 closeable.close();
854 } catch (IOException e) { 884 } catch (IOException e) {
855 e.printStackTrace(); 885 e.printStackTrace();
856 } 886 }
857 } 887 }
858 }); 888 });
859 } 889 }
890
891 /**
892 * Executor that detects and throws if its mDelegate runs a submitted runnab le inline.
893 */
894 static final class DirectPreventingExecutor implements Executor {
895 private final Executor mDelegate;
896
897 DirectPreventingExecutor(Executor delegate) {
898 this.mDelegate = delegate;
899 }
900
901 @Override
902 public void execute(Runnable command) {
903 Thread currentThread = Thread.currentThread();
904 InlineCheckingRunnable runnable = new InlineCheckingRunnable(command , currentThread);
905 mDelegate.execute(runnable);
906 if (runnable.mExecutedInline != null) {
907 throw runnable.mExecutedInline;
908 } else {
909 // It's possible that this method is being called on an executor , and the runnable
910 // that
911 // was just queued will run on this thread after the current run nable returns. By
912 // nulling out the mCallingThread field, the InlineCheckingRunna ble's current thread
913 // comparison will not fire.
914 runnable.mCallingThread = null;
915 }
916 }
917
918 private static final class InlineCheckingRunnable implements Runnable {
919 private final Runnable mCommand;
920 private Thread mCallingThread;
921 private InlineExecutionProhibitedException mExecutedInline = null;
922
923 private InlineCheckingRunnable(Runnable command, Thread callingThrea d) {
924 this.mCommand = command;
925 this.mCallingThread = callingThread;
926 }
927
928 @Override
929 public void run() {
930 if (Thread.currentThread() == mCallingThread) {
931 // Can't throw directly from here, since the delegate execut or could catch this
932 // exception.
933 mExecutedInline = new InlineExecutionProhibitedException();
934 return;
935 }
936 mCommand.run();
937 }
938 }
939 }
860 } 940 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698