OLD | NEW |
---|---|
1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 package org.chromium.net; | 5 package org.chromium.net; |
6 | 6 |
7 import android.annotation.TargetApi; | 7 import android.annotation.TargetApi; |
8 import android.net.TrafficStats; | 8 import android.net.TrafficStats; |
9 import android.os.Build; | 9 import android.os.Build; |
10 import android.util.Log; | 10 import android.util.Log; |
(...skipping 38 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
49 /** | 49 /** |
50 * This is the source of thread safety in this class - no other synchronizat ion is performed. | 50 * This is the source of thread safety in this class - no other synchronizat ion is performed. |
51 * By compare-and-swapping from one state to another, we guarantee that oper ations aren't | 51 * By compare-and-swapping from one state to another, we guarantee that oper ations aren't |
52 * running concurrently. Only the winner of a CAS proceeds. | 52 * running concurrently. Only the winner of a CAS proceeds. |
53 * | 53 * |
54 * <p>A caller can lose a CAS for three reasons - user error (two calls to r ead() without | 54 * <p>A caller can lose a CAS for three reasons - user error (two calls to r ead() without |
55 * waiting for the read to succeed), runtime error (network code or user cod e throws an | 55 * waiting for the read to succeed), runtime error (network code or user cod e throws an |
56 * exception), or cancellation. | 56 * exception), or cancellation. |
57 */ | 57 */ |
58 private final AtomicReference<State> mState = new AtomicReference<>(State.NO T_STARTED); | 58 private final AtomicReference<State> mState = new AtomicReference<>(State.NO T_STARTED); |
59 private final AtomicBoolean mUploadProviderClosed = new AtomicBoolean(false) ; | |
59 | 60 |
60 /** | 61 /** |
61 * Traffic stats tag to associate this requests' data use with. It's capture d when the request | 62 * Traffic stats tag to associate this requests' data use with. It's capture d when the request |
62 * is created, so that applications doing work on behalf of another app can correctly attribute | 63 * is created, so that applications doing work on behalf of another app can correctly attribute |
63 * that data use. | 64 * that data use. |
64 */ | 65 */ |
65 private final int mTrafficStatsTag; | 66 private final int mTrafficStatsTag; |
67 private final boolean mAllowDirectExecutor; | |
66 | 68 |
67 /* These don't change with redirects */ | 69 /* These don't change with redirects */ |
68 private String mInitialMethod; | 70 private String mInitialMethod; |
69 private UploadDataProvider mUploadDataProvider; | 71 private UploadDataProvider mUploadDataProvider; |
70 private Executor mUploadExecutor; | 72 private Executor mUploadExecutor; |
71 private AtomicBoolean mUploadProviderClosed = new AtomicBoolean(false); | |
72 | 73 |
73 /** | 74 /** |
74 * Holds a subset of StatusValues - {@link State#STARTED} can represent | 75 * Holds a subset of StatusValues - {@link State#STARTED} can represent |
75 * {@link Status#SENDING_REQUEST} or {@link Status#WAITING_FOR_RESPONSE}. Wh ile the distinction | 76 * {@link Status#SENDING_REQUEST} or {@link Status#WAITING_FOR_RESPONSE}. Wh ile the distinction |
76 * isn't needed to implement the logic in this class, it is needed to implem ent | 77 * isn't needed to implement the logic in this class, it is needed to implem ent |
77 * {@link #getStatus(StatusListener)}. | 78 * {@link #getStatus(StatusListener)}. |
78 * | 79 * |
79 * <p>Concurrency notes - this value is not atomically updated with mState, so there is some | 80 * <p>Concurrency notes - this value is not atomically updated with mState, so there is some |
80 * risk that we'd get an inconsistent snapshot of both - however, it also ha ppens that this | 81 * risk that we'd get an inconsistent snapshot of both - however, it also ha ppens that this |
81 * value is only used with the STARTED state, so it's inconsequential. | 82 * value is only used with the STARTED state, so it's inconsequential. |
(...skipping 31 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
113 ERROR, | 114 ERROR, |
114 COMPLETE, | 115 COMPLETE, |
115 CANCELLED, | 116 CANCELLED, |
116 } | 117 } |
117 | 118 |
118 /** | 119 /** |
119 * @param executor The executor used for reading and writing from sockets | 120 * @param executor The executor used for reading and writing from sockets |
120 * @param userExecutor The executor used to dispatch to {@code callback} | 121 * @param userExecutor The executor used to dispatch to {@code callback} |
121 */ | 122 */ |
122 JavaUrlRequest(Callback callback, final Executor executor, Executor userExec utor, String url, | 123 JavaUrlRequest(Callback callback, final Executor executor, Executor userExec utor, String url, |
123 String userAgent) { | 124 String userAgent, boolean allowDirectExecutor) { |
125 this.mAllowDirectExecutor = allowDirectExecutor; | |
mef
2016/08/30 16:29:44
nit: move it to the end, below userAgent?
Charles
2016/08/30 20:11:06
This value gets read by AsyncUrlRequestCallback. M
| |
124 if (url == null) { | 126 if (url == null) { |
125 throw new NullPointerException("URL is required"); | 127 throw new NullPointerException("URL is required"); |
126 } | 128 } |
127 if (callback == null) { | 129 if (callback == null) { |
128 throw new NullPointerException("Listener is required"); | 130 throw new NullPointerException("Listener is required"); |
129 } | 131 } |
130 if (executor == null) { | 132 if (executor == null) { |
131 throw new NullPointerException("Executor is required"); | 133 throw new NullPointerException("Executor is required"); |
132 } | 134 } |
133 if (userExecutor == null) { | 135 if (userExecutor == null) { |
(...skipping 96 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
230 } | 232 } |
231 if (!mRequestHeaders.containsKey("Content-Type")) { | 233 if (!mRequestHeaders.containsKey("Content-Type")) { |
232 throw new IllegalArgumentException( | 234 throw new IllegalArgumentException( |
233 "Requests with upload data must have a Content-Type."); | 235 "Requests with upload data must have a Content-Type."); |
234 } | 236 } |
235 checkNotStarted(); | 237 checkNotStarted(); |
236 if (mInitialMethod == null) { | 238 if (mInitialMethod == null) { |
237 mInitialMethod = "POST"; | 239 mInitialMethod = "POST"; |
238 } | 240 } |
239 this.mUploadDataProvider = uploadDataProvider; | 241 this.mUploadDataProvider = uploadDataProvider; |
240 this.mUploadExecutor = executor; | 242 if (mAllowDirectExecutor) { |
243 this.mUploadExecutor = executor; | |
244 } else { | |
245 this.mUploadExecutor = new DirectPreventingExecutor(executor); | |
246 } | |
241 } | 247 } |
242 | 248 |
243 private enum SinkState { | 249 private enum SinkState { |
244 AWAITING_READ_RESULT, | 250 AWAITING_READ_RESULT, |
245 AWAITING_REWIND_RESULT, | 251 AWAITING_REWIND_RESULT, |
246 UPLOADING, | 252 UPLOADING, |
247 NOT_STARTED, | 253 NOT_STARTED, |
248 } | 254 } |
249 | 255 |
250 private final class OutputStreamDataSink implements UploadDataSink { | 256 private final class OutputStreamDataSink implements UploadDataSink { |
(...skipping 41 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
292 "Read upload data length %d exceeds expected len gth %d", | 298 "Read upload data length %d exceeds expected len gth %d", |
293 mWrittenBytes + mBuffer.remaining(), mTotalBytes ))); | 299 mWrittenBytes + mBuffer.remaining(), mTotalBytes ))); |
294 return; | 300 return; |
295 } | 301 } |
296 while (mBuffer.hasRemaining()) { | 302 while (mBuffer.hasRemaining()) { |
297 mWrittenBytes += mOutputChannel.write(mBuffer); | 303 mWrittenBytes += mOutputChannel.write(mBuffer); |
298 } | 304 } |
299 if (mWrittenBytes < mTotalBytes || (mTotalBytes == -1 && !fi nalChunk)) { | 305 if (mWrittenBytes < mTotalBytes || (mTotalBytes == -1 && !fi nalChunk)) { |
300 mBuffer.clear(); | 306 mBuffer.clear(); |
301 mSinkState.set(SinkState.AWAITING_READ_RESULT); | 307 mSinkState.set(SinkState.AWAITING_READ_RESULT); |
302 mUserExecutor.execute(uploadErrorSetting(new CheckedRunn able() { | 308 executeOnUploadExecutor(new CheckedRunnable() { |
303 @Override | 309 @Override |
304 public void run() throws Exception { | 310 public void run() throws Exception { |
305 mUploadProvider.read(OutputStreamDataSink.this, mBuffer); | 311 mUploadProvider.read(OutputStreamDataSink.this, mBuffer); |
306 } | 312 } |
307 })); | 313 }); |
308 } else if (mTotalBytes == -1) { | 314 } else if (mTotalBytes == -1) { |
309 finish(); | 315 finish(); |
310 } else if (mTotalBytes == mWrittenBytes) { | 316 } else if (mTotalBytes == mWrittenBytes) { |
311 finish(); | 317 finish(); |
312 } else { | 318 } else { |
313 enterUploadErrorState(new IllegalArgumentException(Strin g.format( | 319 enterUploadErrorState(new IllegalArgumentException(Strin g.format( |
314 "Read upload data length %d exceeds expected len gth %d", | 320 "Read upload data length %d exceeds expected len gth %d", |
315 mWrittenBytes, mTotalBytes))); | 321 mWrittenBytes, mTotalBytes))); |
316 } | 322 } |
317 } | 323 } |
(...skipping 22 matching lines...) Expand all Loading... | |
340 mExecutor.execute(errorSetting(State.STARTED, new CheckedRunnable() { | 346 mExecutor.execute(errorSetting(State.STARTED, new CheckedRunnable() { |
341 @Override | 347 @Override |
342 public void run() throws Exception { | 348 public void run() throws Exception { |
343 if (mOutputChannel == null) { | 349 if (mOutputChannel == null) { |
344 mAdditionalStatusDetails = Status.CONNECTING; | 350 mAdditionalStatusDetails = Status.CONNECTING; |
345 mUrlConnection.connect(); | 351 mUrlConnection.connect(); |
346 mAdditionalStatusDetails = Status.SENDING_REQUEST; | 352 mAdditionalStatusDetails = Status.SENDING_REQUEST; |
347 mOutputChannel = Channels.newChannel(mUrlConnection.getO utputStream()); | 353 mOutputChannel = Channels.newChannel(mUrlConnection.getO utputStream()); |
348 } | 354 } |
349 mSinkState.set(SinkState.AWAITING_READ_RESULT); | 355 mSinkState.set(SinkState.AWAITING_READ_RESULT); |
350 mUserExecutor.execute(uploadErrorSetting(new CheckedRunnable () { | 356 executeOnUploadExecutor(new CheckedRunnable() { |
351 @Override | 357 @Override |
352 public void run() throws Exception { | 358 public void run() throws Exception { |
353 mUploadProvider.read(OutputStreamDataSink.this, mBuf fer); | 359 mUploadProvider.read(OutputStreamDataSink.this, mBuf fer); |
354 } | 360 } |
355 })); | 361 }); |
356 } | 362 } |
357 })); | 363 })); |
358 } | 364 } |
359 | 365 |
366 private void executeOnUploadExecutor(CheckedRunnable runnable) { | |
367 try { | |
368 mUserExecutor.execute(uploadErrorSetting(runnable)); | |
mef
2016/08/30 16:29:44
Should this execute on mUploadExecutor instead of
Charles
2016/08/30 20:11:05
In this lexical scope mUserExecutor is referring t
mef
2016/08/31 17:26:52
I see. I've missed the fact that this is part of O
Charles
2016/08/31 19:08:36
Done.
| |
369 } catch (RejectedExecutionException e) { | |
370 enterUploadErrorState(e); | |
371 } | |
372 } | |
373 | |
360 void finish() throws IOException { | 374 void finish() throws IOException { |
361 if (mOutputChannel != null) { | 375 if (mOutputChannel != null) { |
362 mOutputChannel.close(); | 376 mOutputChannel.close(); |
363 } | 377 } |
364 fireGetHeaders(); | 378 fireGetHeaders(); |
365 } | 379 } |
366 | 380 |
367 void start(final boolean firstTime) { | 381 void start(final boolean firstTime) { |
368 mUserExecutor.execute(uploadErrorSetting(new CheckedRunnable() { | 382 executeOnUploadExecutor(new CheckedRunnable() { |
369 @Override | 383 @Override |
370 public void run() throws Exception { | 384 public void run() throws Exception { |
371 mTotalBytes = mUploadProvider.getLength(); | 385 mTotalBytes = mUploadProvider.getLength(); |
372 if (mTotalBytes == 0) { | 386 if (mTotalBytes == 0) { |
373 finish(); | 387 finish(); |
374 } else { | 388 } else { |
375 // If we know how much data we have to upload, and it's small, we can save | 389 // If we know how much data we have to upload, and it's small, we can save |
376 // memory by allocating a reasonably sized buffer to rea d into. | 390 // memory by allocating a reasonably sized buffer to rea d into. |
377 if (mTotalBytes > 0 && mTotalBytes < DEFAULT_UPLOAD_BUFF ER_SIZE) { | 391 if (mTotalBytes > 0 && mTotalBytes < DEFAULT_UPLOAD_BUFF ER_SIZE) { |
378 // Allocate one byte more than necessary, to detect callers uploading | 392 // Allocate one byte more than necessary, to detect callers uploading |
(...skipping 15 matching lines...) Expand all Loading... | |
394 mUrlConnection.setChunkedStreamingMode(DEFAULT_CHUNK _LENGTH); | 408 mUrlConnection.setChunkedStreamingMode(DEFAULT_CHUNK _LENGTH); |
395 } | 409 } |
396 if (firstTime) { | 410 if (firstTime) { |
397 startRead(); | 411 startRead(); |
398 } else { | 412 } else { |
399 mSinkState.set(SinkState.AWAITING_REWIND_RESULT); | 413 mSinkState.set(SinkState.AWAITING_REWIND_RESULT); |
400 mUploadProvider.rewind(OutputStreamDataSink.this); | 414 mUploadProvider.rewind(OutputStreamDataSink.this); |
401 } | 415 } |
402 } | 416 } |
403 } | 417 } |
404 })); | 418 }); |
405 } | 419 } |
406 } | 420 } |
407 | 421 |
408 @Override | 422 @Override |
409 public void start() { | 423 public void start() { |
410 mAdditionalStatusDetails = Status.CONNECTING; | 424 mAdditionalStatusDetails = Status.CONNECTING; |
411 transitionStates(State.NOT_STARTED, State.STARTED, new Runnable() { | 425 transitionStates(State.NOT_STARTED, State.STARTED, new Runnable() { |
412 @Override | 426 @Override |
413 public void run() { | 427 public void run() { |
414 mUrlChain.add(mCurrentUrl); | 428 mUrlChain.add(mCurrentUrl); |
(...skipping 322 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
737 throw new IllegalStateException("Switch is exhaustive: " + state ); | 751 throw new IllegalStateException("Switch is exhaustive: " + state ); |
738 } | 752 } |
739 | 753 |
740 mCallbackAsync.sendStatus(listener, status); | 754 mCallbackAsync.sendStatus(listener, status); |
741 } | 755 } |
742 | 756 |
743 /** This wrapper ensures that callbacks are always called on the correct exe cutor */ | 757 /** This wrapper ensures that callbacks are always called on the correct exe cutor */ |
744 private final class AsyncUrlRequestCallback { | 758 private final class AsyncUrlRequestCallback { |
745 final UrlRequest.Callback mCallback; | 759 final UrlRequest.Callback mCallback; |
746 final Executor mUserExecutor; | 760 final Executor mUserExecutor; |
761 final Executor mFallbackExecutor; | |
747 | 762 |
748 AsyncUrlRequestCallback(Callback callback, final Executor userExecutor) { | 763 AsyncUrlRequestCallback(Callback callback, final Executor userExecutor) { |
749 this.mCallback = callback; | 764 this.mCallback = callback; |
750 this.mUserExecutor = userExecutor; | 765 if (mAllowDirectExecutor) { |
766 this.mUserExecutor = userExecutor; | |
767 this.mFallbackExecutor = null; | |
768 } else { | |
769 mUserExecutor = new DirectPreventingExecutor(userExecutor); | |
770 mFallbackExecutor = userExecutor; | |
771 } | |
751 } | 772 } |
752 | 773 |
753 void sendStatus(final StatusListener listener, final int status) { | 774 void sendStatus(final StatusListener listener, final int status) { |
754 mUserExecutor.execute(new Runnable() { | 775 mUserExecutor.execute(new Runnable() { |
755 @Override | 776 @Override |
756 public void run() { | 777 public void run() { |
757 listener.onStatus(status); | 778 listener.onStatus(status); |
758 } | 779 } |
759 }); | 780 }); |
760 } | 781 } |
761 | 782 |
762 void execute(State currentState, CheckedRunnable runnable) { | 783 void execute(State currentState, CheckedRunnable runnable) { |
763 try { | 784 try { |
764 mUserExecutor.execute(userErrorSetting(currentState, runnable)); | 785 mUserExecutor.execute(userErrorSetting(currentState, runnable)); |
765 } catch (RejectedExecutionException e) { | 786 } catch (RejectedExecutionException e) { |
766 enterUserErrorState(currentState, e); | 787 enterErrorState(currentState, |
788 new UrlRequestException("Exception posting task to execu tor", e)); | |
767 } | 789 } |
768 } | 790 } |
769 | 791 |
770 void onRedirectReceived(final UrlResponseInfo info, final String newLoca tionUrl) { | 792 void onRedirectReceived(final UrlResponseInfo info, final String newLoca tionUrl) { |
771 execute(State.AWAITING_FOLLOW_REDIRECT, new CheckedRunnable() { | 793 execute(State.AWAITING_FOLLOW_REDIRECT, new CheckedRunnable() { |
772 @Override | 794 @Override |
773 public void run() throws Exception { | 795 public void run() throws Exception { |
774 mCallback.onRedirectReceived(JavaUrlRequest.this, info, newL ocationUrl); | 796 mCallback.onRedirectReceived(JavaUrlRequest.this, info, newL ocationUrl); |
775 } | 797 } |
776 }); | 798 }); |
777 } | 799 } |
778 | 800 |
779 void onResponseStarted(UrlResponseInfo info) { | 801 void onResponseStarted(UrlResponseInfo info) { |
780 execute(State.AWAITING_READ, new CheckedRunnable() { | 802 if (mState.compareAndSet(State.STARTED, State.AWAITING_READ)) { |
mef
2016/08/30 16:29:44
This is changing thread where mState is accessed f
Charles
2016/08/30 20:11:06
mState is safe from any thread, since it's an atom
mef
2016/08/31 17:26:52
Acknowledged.
| |
781 @Override | 803 execute(State.AWAITING_READ, new CheckedRunnable() { |
782 public void run() throws Exception { | 804 @Override |
783 if (mState.compareAndSet(State.STARTED, State.AWAITING_READ) ) { | 805 public void run() throws Exception { |
784 mCallback.onResponseStarted(JavaUrlRequest.this, mUrlRes ponseInfo); | 806 mCallback.onResponseStarted(JavaUrlRequest.this, mUrlRes ponseInfo); |
785 } | 807 } |
786 } | 808 }); |
787 }); | 809 } |
788 } | 810 } |
789 | 811 |
790 void onReadCompleted(final UrlResponseInfo info, final ByteBuffer byteBu ffer) { | 812 void onReadCompleted(final UrlResponseInfo info, final ByteBuffer byteBu ffer) { |
791 execute(State.AWAITING_READ, new CheckedRunnable() { | 813 if (mState.compareAndSet(State.READING, State.AWAITING_READ)) { |
mef
2016/08/30 16:29:44
ditto.
Charles
2016/08/30 20:11:05
This one too - we want a REE thrown when submittin
| |
792 @Override | 814 execute(State.AWAITING_READ, new CheckedRunnable() { |
793 public void run() throws Exception { | 815 @Override |
794 if (mState.compareAndSet(State.READING, State.AWAITING_READ) ) { | 816 public void run() throws Exception { |
795 mCallback.onReadCompleted(JavaUrlRequest.this, info, byt eBuffer); | 817 mCallback.onReadCompleted(JavaUrlRequest.this, info, byt eBuffer); |
796 } | 818 } |
797 } | 819 }); |
798 }); | 820 } |
799 } | 821 } |
800 | 822 |
801 void onCanceled(final UrlResponseInfo info) { | 823 void onCanceled(final UrlResponseInfo info) { |
802 closeQuietly(mResponseChannel); | 824 closeQuietly(mResponseChannel); |
803 mUserExecutor.execute(new Runnable() { | 825 mUserExecutor.execute(new Runnable() { |
804 @Override | 826 @Override |
805 public void run() { | 827 public void run() { |
806 try { | 828 try { |
807 mCallback.onCanceled(JavaUrlRequest.this, info); | 829 mCallback.onCanceled(JavaUrlRequest.this, info); |
808 } catch (Exception exception) { | 830 } catch (Exception exception) { |
(...skipping 11 matching lines...) Expand all Loading... | |
820 mCallback.onSucceeded(JavaUrlRequest.this, info); | 842 mCallback.onSucceeded(JavaUrlRequest.this, info); |
821 } catch (Exception exception) { | 843 } catch (Exception exception) { |
822 Log.e(TAG, "Exception in onSucceeded method", exception) ; | 844 Log.e(TAG, "Exception in onSucceeded method", exception) ; |
823 } | 845 } |
824 } | 846 } |
825 }); | 847 }); |
826 } | 848 } |
827 | 849 |
828 void onFailed(final UrlResponseInfo urlResponseInfo, final UrlRequestExc eption e) { | 850 void onFailed(final UrlResponseInfo urlResponseInfo, final UrlRequestExc eption e) { |
829 closeQuietly(mResponseChannel); | 851 closeQuietly(mResponseChannel); |
830 mUserExecutor.execute(new Runnable() { | 852 Runnable runnable = new Runnable() { |
831 @Override | 853 @Override |
832 public void run() { | 854 public void run() { |
833 try { | 855 try { |
834 mCallback.onFailed(JavaUrlRequest.this, urlResponseInfo, e); | 856 mCallback.onFailed(JavaUrlRequest.this, urlResponseInfo, e); |
835 } catch (Exception exception) { | 857 } catch (Exception exception) { |
836 Log.e(TAG, "Exception in onFailed method", exception); | 858 Log.e(TAG, "Exception in onFailed method", exception); |
837 } | 859 } |
838 } | 860 } |
839 }); | 861 }; |
862 try { | |
863 mUserExecutor.execute(runnable); | |
864 } catch (InlineExecutionProhibitedException wasDirect) { | |
865 if (mFallbackExecutor != null) { | |
866 mFallbackExecutor.execute(runnable); | |
867 } | |
868 } | |
840 } | 869 } |
841 } | 870 } |
842 | 871 |
843 private static void closeQuietly(Closeable closeable) { | 872 private static void closeQuietly(Closeable closeable) { |
844 if (closeable == null) { | 873 if (closeable == null) { |
845 return; | 874 return; |
846 } | 875 } |
847 try { | 876 try { |
848 closeable.close(); | 877 closeable.close(); |
849 } catch (IOException e) { | 878 } catch (IOException e) { |
850 e.printStackTrace(); | 879 e.printStackTrace(); |
851 } | 880 } |
852 } | 881 } |
853 } | 882 } |
OLD | NEW |