Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(144)

Side by Side Diff: components/cronet/android/api/src/org/chromium/net/JavaUrlRequest.java

Issue 2339223002: Cronet API Refactoring (Closed)
Patch Set: Javadoc + rebase Created 4 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package org.chromium.net;
6
7 import android.annotation.TargetApi;
8 import android.net.TrafficStats;
9 import android.os.Build;
10 import android.util.Log;
11
12 import java.io.Closeable;
13 import java.io.IOException;
14 import java.net.HttpURLConnection;
15 import java.net.URI;
16 import java.net.URL;
17 import java.nio.ByteBuffer;
18 import java.nio.channels.Channels;
19 import java.nio.channels.ReadableByteChannel;
20 import java.nio.channels.WritableByteChannel;
21 import java.util.AbstractMap.SimpleEntry;
22 import java.util.ArrayList;
23 import java.util.Collections;
24 import java.util.List;
25 import java.util.Map;
26 import java.util.TreeMap;
27 import java.util.concurrent.Executor;
28 import java.util.concurrent.RejectedExecutionException;
29 import java.util.concurrent.atomic.AtomicBoolean;
30 import java.util.concurrent.atomic.AtomicReference;
31
32 /**
33 * Pure java UrlRequest, backed by {@link HttpURLConnection}.
34 */
35 @TargetApi(Build.VERSION_CODES.ICE_CREAM_SANDWICH) // TrafficStats only availabl e on ICS
36 final class JavaUrlRequest implements UrlRequest {
37 private static final String X_ANDROID = "X-Android";
38 private static final String X_ANDROID_SELECTED_TRANSPORT = "X-Android-Select ed-Transport";
39 private static final String TAG = "JavaUrlConnection";
40 private static final int DEFAULT_UPLOAD_BUFFER_SIZE = 8192;
41 private static final int DEFAULT_CHUNK_LENGTH = DEFAULT_UPLOAD_BUFFER_SIZE;
42 private static final String USER_AGENT = "User-Agent";
43 private final AsyncUrlRequestCallback mCallbackAsync;
44 private final Executor mExecutor;
45 private final String mUserAgent;
46 private final Map<String, String> mRequestHeaders =
47 new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
48 private final List<String> mUrlChain = new ArrayList<>();
49 /**
50 * This is the source of thread safety in this class - no other synchronizat ion is performed.
51 * By compare-and-swapping from one state to another, we guarantee that oper ations aren't
52 * running concurrently. Only the winner of a CAS proceeds.
53 *
54 * <p>A caller can lose a CAS for three reasons - user error (two calls to r ead() without
55 * waiting for the read to succeed), runtime error (network code or user cod e throws an
56 * exception), or cancellation.
57 */
58 private final AtomicReference<State> mState = new AtomicReference<>(State.NO T_STARTED);
59 private final AtomicBoolean mUploadProviderClosed = new AtomicBoolean(false) ;
60
61 /**
62 * Traffic stats tag to associate this requests' data use with. It's capture d when the request
63 * is created, so that applications doing work on behalf of another app can correctly attribute
64 * that data use.
65 */
66 private final int mTrafficStatsTag;
67 private final boolean mAllowDirectExecutor;
68
69 /* These don't change with redirects */
70 private String mInitialMethod;
71 private UploadDataProvider mUploadDataProvider;
72 private Executor mUploadExecutor;
73
74 /**
75 * Holds a subset of StatusValues - {@link State#STARTED} can represent
76 * {@link Status#SENDING_REQUEST} or {@link Status#WAITING_FOR_RESPONSE}. Wh ile the distinction
77 * isn't needed to implement the logic in this class, it is needed to implem ent
78 * {@link #getStatus(StatusListener)}.
79 *
80 * <p>Concurrency notes - this value is not atomically updated with mState, so there is some
81 * risk that we'd get an inconsistent snapshot of both - however, it also ha ppens that this
82 * value is only used with the STARTED state, so it's inconsequential.
83 */
84 @Status.StatusValues private volatile int mAdditionalStatusDetails = Status. INVALID;
85
86 /* These change with redirects. */
87 private String mCurrentUrl;
88 private ReadableByteChannel mResponseChannel;
89 private UrlResponseInfo mUrlResponseInfo;
90 private String mPendingRedirectUrl;
91 /**
92 * The happens-before edges created by the executor submission and AtomicRef erence setting are
93 * sufficient to guarantee the correct behavior of this field; however, this is an
94 * AtomicReference so that we can cleanly dispose of a new connection if we' re cancelled during
95 * a redirect, which requires get-and-set semantics.
96 * */
97 private final AtomicReference<HttpURLConnection> mCurrentUrlConnection =
98 new AtomicReference<>();
99
100 /**
101 * /- AWAITING_FOLLOW_REDIRECT <- REDIRECT_RECEIVED <-\ /- READING <--\
102 * | | | |
103 * \ / \ /
104 * NOT_STARTED ---> STARTED ----> AW AITING_READ --->
105 * COMPLETE
106 */
107 private enum State {
108 NOT_STARTED,
109 STARTED,
110 REDIRECT_RECEIVED,
111 AWAITING_FOLLOW_REDIRECT,
112 AWAITING_READ,
113 READING,
114 ERROR,
115 COMPLETE,
116 CANCELLED,
117 }
118
119 /**
120 * @param executor The executor used for reading and writing from sockets
121 * @param userExecutor The executor used to dispatch to {@code callback}
122 */
123 JavaUrlRequest(Callback callback, final Executor executor, Executor userExec utor, String url,
124 String userAgent, boolean allowDirectExecutor) {
125 if (url == null) {
126 throw new NullPointerException("URL is required");
127 }
128 if (callback == null) {
129 throw new NullPointerException("Listener is required");
130 }
131 if (executor == null) {
132 throw new NullPointerException("Executor is required");
133 }
134 if (userExecutor == null) {
135 throw new NullPointerException("userExecutor is required");
136 }
137
138 this.mAllowDirectExecutor = allowDirectExecutor;
139 this.mCallbackAsync = new AsyncUrlRequestCallback(callback, userExecutor );
140 this.mTrafficStatsTag = TrafficStats.getThreadStatsTag();
141 this.mExecutor = new Executor() {
142 @Override
143 public void execute(final Runnable command) {
144 executor.execute(new Runnable() {
145 @Override
146 public void run() {
147 int oldTag = TrafficStats.getThreadStatsTag();
148 TrafficStats.setThreadStatsTag(mTrafficStatsTag);
149 try {
150 command.run();
151 } finally {
152 TrafficStats.setThreadStatsTag(oldTag);
153 }
154 }
155 });
156 }
157 };
158 this.mCurrentUrl = url;
159 this.mUserAgent = userAgent;
160 }
161
162 @Override
163 public void setHttpMethod(String method) {
164 checkNotStarted();
165 if (method == null) {
166 throw new NullPointerException("Method is required.");
167 }
168 if ("OPTIONS".equalsIgnoreCase(method) || "GET".equalsIgnoreCase(method)
169 || "HEAD".equalsIgnoreCase(method) || "POST".equalsIgnoreCase(me thod)
170 || "PUT".equalsIgnoreCase(method) || "DELETE".equalsIgnoreCase(m ethod)
171 || "TRACE".equalsIgnoreCase(method) || "PATCH".equalsIgnoreCase( method)) {
172 mInitialMethod = method;
173 } else {
174 throw new IllegalArgumentException("Invalid http method " + method);
175 }
176 }
177
178 private void checkNotStarted() {
179 State state = mState.get();
180 if (state != State.NOT_STARTED) {
181 throw new IllegalStateException("Request is already started. State i s: " + state);
182 }
183 }
184
185 @Override
186 public void addHeader(String header, String value) {
187 checkNotStarted();
188 if (!isValidHeaderName(header) || value.contains("\r\n")) {
189 throw new IllegalArgumentException("Invalid header " + header + "=" + value);
190 }
191 if (mRequestHeaders.containsKey(header)) {
192 mRequestHeaders.remove(header);
193 }
194 mRequestHeaders.put(header, value);
195 }
196
197 private boolean isValidHeaderName(String header) {
198 for (int i = 0; i < header.length(); i++) {
199 char c = header.charAt(i);
200 switch (c) {
201 case '(':
202 case ')':
203 case '<':
204 case '>':
205 case '@':
206 case ',':
207 case ';':
208 case ':':
209 case '\\':
210 case '\'':
211 case '/':
212 case '[':
213 case ']':
214 case '?':
215 case '=':
216 case '{':
217 case '}':
218 return false;
219 default: {
220 if (Character.isISOControl(c) || Character.isWhitespace(c)) {
221 return false;
222 }
223 }
224 }
225 }
226 return true;
227 }
228
229 @Override
230 public void setUploadDataProvider(UploadDataProvider uploadDataProvider, Exe cutor executor) {
231 if (uploadDataProvider == null) {
232 throw new NullPointerException("Invalid UploadDataProvider.");
233 }
234 if (!mRequestHeaders.containsKey("Content-Type")) {
235 throw new IllegalArgumentException(
236 "Requests with upload data must have a Content-Type.");
237 }
238 checkNotStarted();
239 if (mInitialMethod == null) {
240 mInitialMethod = "POST";
241 }
242 this.mUploadDataProvider = uploadDataProvider;
243 if (mAllowDirectExecutor) {
244 this.mUploadExecutor = executor;
245 } else {
246 this.mUploadExecutor = new DirectPreventingExecutor(executor);
247 }
248 }
249
250 private enum SinkState {
251 AWAITING_READ_RESULT,
252 AWAITING_REWIND_RESULT,
253 UPLOADING,
254 NOT_STARTED,
255 }
256
257 private final class OutputStreamDataSink implements UploadDataSink {
258 final AtomicReference<SinkState> mSinkState = new AtomicReference<>(Sink State.NOT_STARTED);
259 final Executor mUserUploadExecutor;
260 final Executor mExecutor;
261 final HttpURLConnection mUrlConnection;
262 WritableByteChannel mOutputChannel;
263 final UploadDataProvider mUploadProvider;
264 ByteBuffer mBuffer;
265 /** This holds the total bytes to send (the content-length). -1 if unkno wn. */
266 long mTotalBytes;
267 /** This holds the bytes written so far */
268 long mWrittenBytes = 0;
269
270 OutputStreamDataSink(final Executor userExecutor, Executor executor,
271 HttpURLConnection urlConnection, UploadDataProvider provider) {
272 this.mUserUploadExecutor = new Executor() {
273 @Override
274 public void execute(Runnable runnable) {
275 try {
276 userExecutor.execute(runnable);
277 } catch (RejectedExecutionException e) {
278 enterUploadErrorState(e);
279 }
280 }
281 };
282 this.mExecutor = executor;
283 this.mUrlConnection = urlConnection;
284 this.mUploadProvider = provider;
285 }
286
287 @Override
288 public void onReadSucceeded(final boolean finalChunk) {
289 if (!mSinkState.compareAndSet(SinkState.AWAITING_READ_RESULT, SinkSt ate.UPLOADING)) {
290 throw new IllegalStateException(
291 "Not expecting a read result, expecting: " + mSinkState. get());
292 }
293 mExecutor.execute(errorSetting(new CheckedRunnable() {
294 @Override
295 public void run() throws Exception {
296 mBuffer.flip();
297 if (mTotalBytes != -1 && mTotalBytes - mWrittenBytes < mBuff er.remaining()) {
298 enterUploadErrorState(new IllegalArgumentException(Strin g.format(
299 "Read upload data length %d exceeds expected len gth %d",
300 mWrittenBytes + mBuffer.remaining(), mTotalBytes )));
301 return;
302 }
303 while (mBuffer.hasRemaining()) {
304 mWrittenBytes += mOutputChannel.write(mBuffer);
305 }
306 if (mWrittenBytes < mTotalBytes || (mTotalBytes == -1 && !fi nalChunk)) {
307 mBuffer.clear();
308 mSinkState.set(SinkState.AWAITING_READ_RESULT);
309 executeOnUploadExecutor(new CheckedRunnable() {
310 @Override
311 public void run() throws Exception {
312 mUploadProvider.read(OutputStreamDataSink.this, mBuffer);
313 }
314 });
315 } else if (mTotalBytes == -1) {
316 finish();
317 } else if (mTotalBytes == mWrittenBytes) {
318 finish();
319 } else {
320 enterUploadErrorState(new IllegalArgumentException(Strin g.format(
321 "Read upload data length %d exceeds expected len gth %d",
322 mWrittenBytes, mTotalBytes)));
323 }
324 }
325 }));
326 }
327
328 @Override
329 public void onRewindSucceeded() {
330 if (!mSinkState.compareAndSet(SinkState.AWAITING_REWIND_RESULT, Sink State.UPLOADING)) {
331 throw new IllegalStateException("Not expecting a read result");
332 }
333 startRead();
334 }
335
336 @Override
337 public void onReadError(Exception exception) {
338 enterUploadErrorState(exception);
339 }
340
341 @Override
342 public void onRewindError(Exception exception) {
343 enterUploadErrorState(exception);
344 }
345
346 void startRead() {
347 mExecutor.execute(errorSetting(new CheckedRunnable() {
348 @Override
349 public void run() throws Exception {
350 if (mOutputChannel == null) {
351 mAdditionalStatusDetails = Status.CONNECTING;
352 mUrlConnection.connect();
353 mAdditionalStatusDetails = Status.SENDING_REQUEST;
354 mOutputChannel = Channels.newChannel(mUrlConnection.getO utputStream());
355 }
356 mSinkState.set(SinkState.AWAITING_READ_RESULT);
357 executeOnUploadExecutor(new CheckedRunnable() {
358 @Override
359 public void run() throws Exception {
360 mUploadProvider.read(OutputStreamDataSink.this, mBuf fer);
361 }
362 });
363 }
364 }));
365 }
366
367 private void executeOnUploadExecutor(CheckedRunnable runnable) {
368 try {
369 mUserUploadExecutor.execute(uploadErrorSetting(runnable));
370 } catch (RejectedExecutionException e) {
371 enterUploadErrorState(e);
372 }
373 }
374
375 void finish() throws IOException {
376 if (mOutputChannel != null) {
377 mOutputChannel.close();
378 }
379 fireGetHeaders();
380 }
381
382 void start(final boolean firstTime) {
383 executeOnUploadExecutor(new CheckedRunnable() {
384 @Override
385 public void run() throws Exception {
386 mTotalBytes = mUploadProvider.getLength();
387 if (mTotalBytes == 0) {
388 finish();
389 } else {
390 // If we know how much data we have to upload, and it's small, we can save
391 // memory by allocating a reasonably sized buffer to rea d into.
392 if (mTotalBytes > 0 && mTotalBytes < DEFAULT_UPLOAD_BUFF ER_SIZE) {
393 // Allocate one byte more than necessary, to detect callers uploading
394 // more bytes than they specified in length.
395 mBuffer = ByteBuffer.allocateDirect((int) mTotalByte s + 1);
396 } else {
397 mBuffer = ByteBuffer.allocateDirect(DEFAULT_UPLOAD_B UFFER_SIZE);
398 }
399
400 if (mTotalBytes > 0 && mTotalBytes <= Integer.MAX_VALUE) {
401 mUrlConnection.setFixedLengthStreamingMode((int) mTo talBytes);
402 } else if (mTotalBytes > Integer.MAX_VALUE
403 && Build.VERSION.SDK_INT >= Build.VERSION_CODES. KITKAT) {
404 mUrlConnection.setFixedLengthStreamingMode(mTotalByt es);
405 } else {
406 // If we know the length, but we're running pre-kitk at and it's larger
407 // than an int can hold, we have to use chunked - ot herwise we'll end up
408 // buffering the whole response in memory.
409 mUrlConnection.setChunkedStreamingMode(DEFAULT_CHUNK _LENGTH);
410 }
411 if (firstTime) {
412 startRead();
413 } else {
414 mSinkState.set(SinkState.AWAITING_REWIND_RESULT);
415 mUploadProvider.rewind(OutputStreamDataSink.this);
416 }
417 }
418 }
419 });
420 }
421 }
422
423 @Override
424 public void start() {
425 mAdditionalStatusDetails = Status.CONNECTING;
426 transitionStates(State.NOT_STARTED, State.STARTED, new Runnable() {
427 @Override
428 public void run() {
429 mUrlChain.add(mCurrentUrl);
430 fireOpenConnection();
431 }
432 });
433 }
434
435 private void enterErrorState(final UrlRequestException error) {
436 if (setTerminalState(State.ERROR)) {
437 fireDisconnect();
438 fireCloseUploadDataProvider();
439 mCallbackAsync.onFailed(mUrlResponseInfo, error);
440 }
441 }
442
443 private boolean setTerminalState(State error) {
444 while (true) {
445 State oldState = mState.get();
446 switch (oldState) {
447 case NOT_STARTED:
448 throw new IllegalStateException("Can't enter error state bef ore start");
449 case ERROR: // fallthrough
450 case COMPLETE: // fallthrough
451 case CANCELLED:
452 return false; // Already in a terminal state
453 default: {
454 if (mState.compareAndSet(oldState, error)) {
455 return true;
456 }
457 }
458 }
459 }
460 }
461
462 /** Ends the request with an error, caused by an exception thrown from user code. */
463 private void enterUserErrorState(final Throwable error) {
464 enterErrorState(
465 new UrlRequestException("Exception received from UrlRequest.Call back", error));
466 }
467
468 /** Ends the request with an error, caused by an exception thrown from user code. */
469 private void enterUploadErrorState(final Throwable error) {
470 enterErrorState(
471 new UrlRequestException("Exception received from UploadDataProvi der", error));
472 }
473
474 private void enterCronetErrorState(final Throwable error) {
475 // TODO(clm) mapping from Java exception (UnknownHostException, for exam ple) to net error
476 // code goes here.
477 enterErrorState(new UrlRequestException("System error", error));
478 }
479
480 /**
481 * Atomically swaps from the expected state to a new state. If the swap fail s, and it's not
482 * due to an earlier error or cancellation, throws an exception.
483 *
484 * @param afterTransition Callback to run after transition completes success fully.
485 */
486 private void transitionStates(State expected, State newState, Runnable after Transition) {
487 if (!mState.compareAndSet(expected, newState)) {
488 State state = mState.get();
489 if (!(state == State.CANCELLED || state == State.ERROR)) {
490 throw new IllegalStateException(
491 "Invalid state transition - expected " + expected + " bu t was " + state);
492 }
493 } else {
494 afterTransition.run();
495 }
496 }
497
498 @Override
499 public void followRedirect() {
500 transitionStates(State.AWAITING_FOLLOW_REDIRECT, State.STARTED, new Runn able() {
501 @Override
502 public void run() {
503 mCurrentUrl = mPendingRedirectUrl;
504 mPendingRedirectUrl = null;
505 fireOpenConnection();
506 }
507 });
508 }
509
510 private void fireGetHeaders() {
511 mAdditionalStatusDetails = Status.WAITING_FOR_RESPONSE;
512 mExecutor.execute(errorSetting(new CheckedRunnable() {
513 @Override
514 public void run() throws Exception {
515 HttpURLConnection connection = mCurrentUrlConnection.get();
516 if (connection == null) {
517 return; // We've been cancelled
518 }
519 final List<Map.Entry<String, String>> headerList = new ArrayList <>();
520 String selectedTransport = "http/1.1";
521 String headerKey;
522 for (int i = 0; (headerKey = connection.getHeaderFieldKey(i)) != null; i++) {
523 if (X_ANDROID_SELECTED_TRANSPORT.equalsIgnoreCase(headerKey) ) {
524 selectedTransport = connection.getHeaderField(i);
525 }
526 if (!headerKey.startsWith(X_ANDROID)) {
527 headerList.add(new SimpleEntry<>(headerKey, connection.g etHeaderField(i)));
528 }
529 }
530
531 int responseCode = connection.getResponseCode();
532 // Important to copy the list here, because although we never co ncurrently modify
533 // the list ourselves, user code might iterate over it while we' re redirecting, and
534 // that would throw ConcurrentModificationException.
535 mUrlResponseInfo = new UrlResponseInfo(new ArrayList<>(mUrlChain ), responseCode,
536 connection.getResponseMessage(), Collections.unmodifiabl eList(headerList),
537 false, selectedTransport, "");
538 // TODO(clm) actual redirect handling? post -> get and whatnot?
539 if (responseCode >= 300 && responseCode < 400) {
540 fireRedirectReceived(mUrlResponseInfo.getAllHeaders());
541 return;
542 }
543 fireCloseUploadDataProvider();
544 if (responseCode >= 400) {
545 mResponseChannel = InputStreamChannel.wrap(connection.getErr orStream());
546 mCallbackAsync.onResponseStarted(mUrlResponseInfo);
547 } else {
548 mResponseChannel = InputStreamChannel.wrap(connection.getInp utStream());
549 mCallbackAsync.onResponseStarted(mUrlResponseInfo);
550 }
551 }
552 }));
553 }
554
555 private void fireCloseUploadDataProvider() {
556 if (mUploadDataProvider != null && mUploadProviderClosed.compareAndSet(f alse, true)) {
557 try {
558 mUploadExecutor.execute(uploadErrorSetting(new CheckedRunnable() {
559 @Override
560 public void run() throws Exception {
561 mUploadDataProvider.close();
562 }
563 }));
564 } catch (RejectedExecutionException e) {
565 Log.e(TAG, "Exception when closing uploadDataProvider", e);
566 }
567 }
568 }
569
570 private void fireRedirectReceived(final Map<String, List<String>> headerFiel ds) {
571 transitionStates(State.STARTED, State.REDIRECT_RECEIVED, new Runnable() {
572 @Override
573 public void run() {
574 mPendingRedirectUrl = URI.create(mCurrentUrl)
575 .resolve(headerFields.get("locatio n").get(0))
576 .toString();
577 mUrlChain.add(mPendingRedirectUrl);
578 transitionStates(
579 State.REDIRECT_RECEIVED, State.AWAITING_FOLLOW_REDIRECT, new Runnable() {
580 @Override
581 public void run() {
582 mCallbackAsync.onRedirectReceived(
583 mUrlResponseInfo, mPendingRedirectUrl);
584 }
585 });
586 }
587 });
588 }
589
590 private void fireOpenConnection() {
591 mExecutor.execute(errorSetting(new CheckedRunnable() {
592 @Override
593 public void run() throws Exception {
594 // If we're cancelled, then our old connection will be disconnec ted for us and
595 // we shouldn't open a new one.
596 if (mState.get() == State.CANCELLED) {
597 return;
598 }
599
600 final URL url = new URL(mCurrentUrl);
601 HttpURLConnection newConnection = (HttpURLConnection) url.openCo nnection();
602 HttpURLConnection oldConnection = mCurrentUrlConnection.getAndSe t(newConnection);
603 if (oldConnection != null) {
604 oldConnection.disconnect();
605 }
606 newConnection.setInstanceFollowRedirects(false);
607 if (!mRequestHeaders.containsKey(USER_AGENT)) {
608 mRequestHeaders.put(USER_AGENT, mUserAgent);
609 }
610 for (Map.Entry<String, String> entry : mRequestHeaders.entrySet( )) {
611 newConnection.setRequestProperty(entry.getKey(), entry.getVa lue());
612 }
613 if (mInitialMethod == null) {
614 mInitialMethod = "GET";
615 }
616 newConnection.setRequestMethod(mInitialMethod);
617 if (mUploadDataProvider != null) {
618 OutputStreamDataSink dataSink = new OutputStreamDataSink(
619 mUploadExecutor, mExecutor, newConnection, mUploadDa taProvider);
620 dataSink.start(mUrlChain.size() == 1);
621 } else {
622 mAdditionalStatusDetails = Status.CONNECTING;
623 newConnection.connect();
624 fireGetHeaders();
625 }
626 }
627 }));
628 }
629
630 private Runnable errorSetting(final CheckedRunnable delegate) {
631 return new Runnable() {
632 @Override
633 public void run() {
634 try {
635 delegate.run();
636 } catch (Throwable t) {
637 enterCronetErrorState(t);
638 }
639 }
640 };
641 }
642
643 private Runnable userErrorSetting(final CheckedRunnable delegate) {
644 return new Runnable() {
645 @Override
646 public void run() {
647 try {
648 delegate.run();
649 } catch (Throwable t) {
650 enterUserErrorState(t);
651 }
652 }
653 };
654 }
655
656 private Runnable uploadErrorSetting(final CheckedRunnable delegate) {
657 return new Runnable() {
658 @Override
659 public void run() {
660 try {
661 delegate.run();
662 } catch (Throwable t) {
663 enterUploadErrorState(t);
664 }
665 }
666 };
667 }
668
669 private interface CheckedRunnable { void run() throws Exception; }
670
671 @Override
672 public void read(final ByteBuffer buffer) {
673 Preconditions.checkDirect(buffer);
674 Preconditions.checkHasRemaining(buffer);
675 transitionStates(State.AWAITING_READ, State.READING, new Runnable() {
676 @Override
677 public void run() {
678 mExecutor.execute(errorSetting(new CheckedRunnable() {
679 @Override
680 public void run() throws Exception {
681 int read = mResponseChannel.read(buffer);
682 processReadResult(read, buffer);
683 }
684 }));
685 }
686 });
687 }
688
689 private void processReadResult(int read, final ByteBuffer buffer) throws IOE xception {
690 if (read != -1) {
691 mCallbackAsync.onReadCompleted(mUrlResponseInfo, buffer);
692 } else {
693 mResponseChannel.close();
694 if (mState.compareAndSet(State.READING, State.COMPLETE)) {
695 fireDisconnect();
696 mCallbackAsync.onSucceeded(mUrlResponseInfo);
697 }
698 }
699 }
700
701 private void fireDisconnect() {
702 final HttpURLConnection connection = mCurrentUrlConnection.getAndSet(nul l);
703 if (connection != null) {
704 mExecutor.execute(new Runnable() {
705 @Override
706 public void run() {
707 connection.disconnect();
708 }
709 });
710 }
711 }
712
713 @Override
714 public void cancel() {
715 State oldState = mState.getAndSet(State.CANCELLED);
716 switch (oldState) {
717 // We've just scheduled some user code to run. When they perform the ir next operation,
718 // they'll observe it and fail. However, if user code is cancelling in response to one
719 // of these callbacks, we'll never actually cancel!
720 // TODO(clm) figure out if it's possible to avoid concurrency in use r callbacks.
721 case REDIRECT_RECEIVED:
722 case AWAITING_FOLLOW_REDIRECT:
723 case AWAITING_READ:
724
725 // User code is waiting on us - cancel away!
726 case STARTED:
727 case READING:
728 fireDisconnect();
729 fireCloseUploadDataProvider();
730 mCallbackAsync.onCanceled(mUrlResponseInfo);
731 break;
732 // The rest are all termination cases - we're too late to cancel.
733 case ERROR:
734 case COMPLETE:
735 case CANCELLED:
736 break;
737 }
738 }
739
740 @Override
741 public boolean isDone() {
742 State state = mState.get();
743 return state == State.COMPLETE | state == State.ERROR | state == State.C ANCELLED;
744 }
745
746 @Override
747 public void getStatus(StatusListener listener) {
748 State state = mState.get();
749 int extraStatus = this.mAdditionalStatusDetails;
750
751 @Status.StatusValues final int status;
752 switch (state) {
753 case ERROR:
754 case COMPLETE:
755 case CANCELLED:
756 case NOT_STARTED:
757 status = Status.INVALID;
758 break;
759 case STARTED:
760 status = extraStatus;
761 break;
762 case REDIRECT_RECEIVED:
763 case AWAITING_FOLLOW_REDIRECT:
764 case AWAITING_READ:
765 status = Status.IDLE;
766 break;
767 case READING:
768 status = Status.READING_RESPONSE;
769 break;
770 default:
771 throw new IllegalStateException("Switch is exhaustive: " + state );
772 }
773
774 mCallbackAsync.sendStatus(listener, status);
775 }
776
777 /** This wrapper ensures that callbacks are always called on the correct exe cutor */
778 private final class AsyncUrlRequestCallback {
779 final UrlRequest.Callback mCallback;
780 final Executor mUserExecutor;
781 final Executor mFallbackExecutor;
782
783 AsyncUrlRequestCallback(Callback callback, final Executor userExecutor) {
784 this.mCallback = callback;
785 if (mAllowDirectExecutor) {
786 this.mUserExecutor = userExecutor;
787 this.mFallbackExecutor = null;
788 } else {
789 mUserExecutor = new DirectPreventingExecutor(userExecutor);
790 mFallbackExecutor = userExecutor;
791 }
792 }
793
794 void sendStatus(final StatusListener listener, final int status) {
795 mUserExecutor.execute(new Runnable() {
796 @Override
797 public void run() {
798 listener.onStatus(status);
799 }
800 });
801 }
802
803 void execute(CheckedRunnable runnable) {
804 try {
805 mUserExecutor.execute(userErrorSetting(runnable));
806 } catch (RejectedExecutionException e) {
807 enterErrorState(new UrlRequestException("Exception posting task to executor", e));
808 }
809 }
810
811 void onRedirectReceived(final UrlResponseInfo info, final String newLoca tionUrl) {
812 execute(new CheckedRunnable() {
813 @Override
814 public void run() throws Exception {
815 mCallback.onRedirectReceived(JavaUrlRequest.this, info, newL ocationUrl);
816 }
817 });
818 }
819
820 void onResponseStarted(UrlResponseInfo info) {
821 execute(new CheckedRunnable() {
822 @Override
823 public void run() throws Exception {
824 if (mState.compareAndSet(State.STARTED, State.AWAITING_READ) ) {
825 mCallback.onResponseStarted(JavaUrlRequest.this, mUrlRes ponseInfo);
826 }
827 }
828 });
829 }
830
831 void onReadCompleted(final UrlResponseInfo info, final ByteBuffer byteBu ffer) {
832 execute(new CheckedRunnable() {
833 @Override
834 public void run() throws Exception {
835 if (mState.compareAndSet(State.READING, State.AWAITING_READ) ) {
836 mCallback.onReadCompleted(JavaUrlRequest.this, info, byt eBuffer);
837 }
838 }
839 });
840 }
841
842 void onCanceled(final UrlResponseInfo info) {
843 closeResponseChannel();
844 mUserExecutor.execute(new Runnable() {
845 @Override
846 public void run() {
847 try {
848 mCallback.onCanceled(JavaUrlRequest.this, info);
849 } catch (Exception exception) {
850 Log.e(TAG, "Exception in onCanceled method", exception);
851 }
852 }
853 });
854 }
855
856 void onSucceeded(final UrlResponseInfo info) {
857 mUserExecutor.execute(new Runnable() {
858 @Override
859 public void run() {
860 try {
861 mCallback.onSucceeded(JavaUrlRequest.this, info);
862 } catch (Exception exception) {
863 Log.e(TAG, "Exception in onSucceeded method", exception) ;
864 }
865 }
866 });
867 }
868
869 void onFailed(final UrlResponseInfo urlResponseInfo, final UrlRequestExc eption e) {
870 closeResponseChannel();
871 Runnable runnable = new Runnable() {
872 @Override
873 public void run() {
874 try {
875 mCallback.onFailed(JavaUrlRequest.this, urlResponseInfo, e);
876 } catch (Exception exception) {
877 Log.e(TAG, "Exception in onFailed method", exception);
878 }
879 }
880 };
881 try {
882 mUserExecutor.execute(runnable);
883 } catch (InlineExecutionProhibitedException wasDirect) {
884 if (mFallbackExecutor != null) {
885 mFallbackExecutor.execute(runnable);
886 }
887 }
888 }
889 }
890
891 private void closeResponseChannel() {
892 final Closeable closeable = mResponseChannel;
893 if (closeable == null) {
894 return;
895 }
896 mResponseChannel = null;
897 mExecutor.execute(new Runnable() {
898 @Override
899 public void run() {
900 try {
901 closeable.close();
902 } catch (IOException e) {
903 e.printStackTrace();
904 }
905 }
906 });
907 }
908
909 /**
910 * Executor that detects and throws if its mDelegate runs a submitted runnab le inline.
911 */
912 static final class DirectPreventingExecutor implements Executor {
913 private final Executor mDelegate;
914
915 DirectPreventingExecutor(Executor delegate) {
916 this.mDelegate = delegate;
917 }
918
919 @Override
920 public void execute(Runnable command) {
921 Thread currentThread = Thread.currentThread();
922 InlineCheckingRunnable runnable = new InlineCheckingRunnable(command , currentThread);
923 mDelegate.execute(runnable);
924 if (runnable.mExecutedInline != null) {
925 throw runnable.mExecutedInline;
926 } else {
927 // It's possible that this method is being called on an executor , and the runnable
928 // that
929 // was just queued will run on this thread after the current run nable returns. By
930 // nulling out the mCallingThread field, the InlineCheckingRunna ble's current thread
931 // comparison will not fire.
932 runnable.mCallingThread = null;
933 }
934 }
935
936 private static final class InlineCheckingRunnable implements Runnable {
937 private final Runnable mCommand;
938 private Thread mCallingThread;
939 private InlineExecutionProhibitedException mExecutedInline = null;
940
941 private InlineCheckingRunnable(Runnable command, Thread callingThrea d) {
942 this.mCommand = command;
943 this.mCallingThread = callingThread;
944 }
945
946 @Override
947 public void run() {
948 if (Thread.currentThread() == mCallingThread) {
949 // Can't throw directly from here, since the delegate execut or could catch this
950 // exception.
951 mExecutedInline = new InlineExecutionProhibitedException();
952 return;
953 }
954 mCommand.run();
955 }
956 }
957 }
958 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698