Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(265)

Side by Side Diff: components/cronet/android/api/src/org/chromium/net/JavaUrlRequest.java

Issue 2339223002: Cronet API Refactoring (Closed)
Patch Set: Rebase & Conflict Resolution Created 4 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package org.chromium.net;
6
7 import android.annotation.SuppressLint;
8 import android.annotation.TargetApi;
9 import android.net.TrafficStats;
10 import android.os.Build;
11 import android.util.Log;
12
13 import java.io.Closeable;
14 import java.io.IOException;
15 import java.io.OutputStream;
16 import java.net.HttpURLConnection;
17 import java.net.URI;
18 import java.net.URL;
19 import java.nio.ByteBuffer;
20 import java.nio.channels.Channels;
21 import java.nio.channels.ReadableByteChannel;
22 import java.nio.channels.WritableByteChannel;
23 import java.util.AbstractMap.SimpleEntry;
24 import java.util.ArrayList;
25 import java.util.Collections;
26 import java.util.List;
27 import java.util.Map;
28 import java.util.TreeMap;
29 import java.util.concurrent.Executor;
30 import java.util.concurrent.RejectedExecutionException;
31 import java.util.concurrent.atomic.AtomicBoolean;
32 import java.util.concurrent.atomic.AtomicReference;
33
34 /**
35 * Pure java UrlRequest, backed by {@link HttpURLConnection}.
36 */
37 @TargetApi(Build.VERSION_CODES.ICE_CREAM_SANDWICH) // TrafficStats only availabl e on ICS
38 final class JavaUrlRequest implements UrlRequest {
39 private static final String X_ANDROID = "X-Android";
40 private static final String X_ANDROID_SELECTED_TRANSPORT = "X-Android-Select ed-Transport";
41 private static final String TAG = "JavaUrlConnection";
42 private static final int DEFAULT_UPLOAD_BUFFER_SIZE = 8192;
43 private static final int DEFAULT_CHUNK_LENGTH = DEFAULT_UPLOAD_BUFFER_SIZE;
44 private static final String USER_AGENT = "User-Agent";
45 private final AsyncUrlRequestCallback mCallbackAsync;
46 private final Executor mExecutor;
47 private final String mUserAgent;
48 private final Map<String, String> mRequestHeaders =
49 new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
50 private final List<String> mUrlChain = new ArrayList<>();
51 /**
52 * This is the source of thread safety in this class - no other synchronizat ion is performed.
53 * By compare-and-swapping from one state to another, we guarantee that oper ations aren't
54 * running concurrently. Only the winner of a CAS proceeds.
55 *
56 * <p>A caller can lose a CAS for three reasons - user error (two calls to r ead() without
57 * waiting for the read to succeed), runtime error (network code or user cod e throws an
58 * exception), or cancellation.
59 */
60 private final AtomicReference<State> mState = new AtomicReference<>(State.NO T_STARTED);
61 private final AtomicBoolean mUploadProviderClosed = new AtomicBoolean(false) ;
62
63 /**
64 * Traffic stats tag to associate this requests' data use with. It's capture d when the request
65 * is created, so that applications doing work on behalf of another app can correctly attribute
66 * that data use.
67 */
68 private final int mTrafficStatsTag;
69 private final boolean mAllowDirectExecutor;
70
71 /* These don't change with redirects */
72 private String mInitialMethod;
73 private UploadDataProvider mUploadDataProvider;
74 private Executor mUploadExecutor;
75
76 /**
77 * Holds a subset of StatusValues - {@link State#STARTED} can represent
78 * {@link Status#SENDING_REQUEST} or {@link Status#WAITING_FOR_RESPONSE}. Wh ile the distinction
79 * isn't needed to implement the logic in this class, it is needed to implem ent
80 * {@link #getStatus(StatusListener)}.
81 *
82 * <p>Concurrency notes - this value is not atomically updated with mState, so there is some
83 * risk that we'd get an inconsistent snapshot of both - however, it also ha ppens that this
84 * value is only used with the STARTED state, so it's inconsequential.
85 */
86 @Status.StatusValues private volatile int mAdditionalStatusDetails = Status. INVALID;
87
88 /* These change with redirects. */
89 private String mCurrentUrl;
90 private ReadableByteChannel mResponseChannel;
91 private UrlResponseInfo mUrlResponseInfo;
92 private String mPendingRedirectUrl;
93 /**
94 * The happens-before edges created by the executor submission and AtomicRef erence setting are
95 * sufficient to guarantee the correct behavior of this field; however, this is an
96 * AtomicReference so that we can cleanly dispose of a new connection if we' re cancelled during
97 * a redirect, which requires get-and-set semantics.
98 * */
99 private final AtomicReference<HttpURLConnection> mCurrentUrlConnection =
100 new AtomicReference<>();
101
102 /**
103 * /- AWAITING_FOLLOW_REDIRECT <- REDIRECT_RECEIVED <-\ /- READING <--\
104 * | | | |
105 * \ / \ /
106 * NOT_STARTED ---> STARTED ----> AW AITING_READ --->
107 * COMPLETE
108 */
109 private enum State {
110 NOT_STARTED,
111 STARTED,
112 REDIRECT_RECEIVED,
113 AWAITING_FOLLOW_REDIRECT,
114 AWAITING_READ,
115 READING,
116 ERROR,
117 COMPLETE,
118 CANCELLED,
119 }
120
121 /**
122 * @param executor The executor used for reading and writing from sockets
123 * @param userExecutor The executor used to dispatch to {@code callback}
124 */
125 JavaUrlRequest(Callback callback, final Executor executor, Executor userExec utor, String url,
126 String userAgent, boolean allowDirectExecutor) {
127 if (url == null) {
128 throw new NullPointerException("URL is required");
129 }
130 if (callback == null) {
131 throw new NullPointerException("Listener is required");
132 }
133 if (executor == null) {
134 throw new NullPointerException("Executor is required");
135 }
136 if (userExecutor == null) {
137 throw new NullPointerException("userExecutor is required");
138 }
139
140 this.mAllowDirectExecutor = allowDirectExecutor;
141 this.mCallbackAsync = new AsyncUrlRequestCallback(callback, userExecutor );
142 this.mTrafficStatsTag = TrafficStats.getThreadStatsTag();
143 this.mExecutor = new Executor() {
144 @Override
145 public void execute(final Runnable command) {
146 executor.execute(new Runnable() {
147 @Override
148 public void run() {
149 int oldTag = TrafficStats.getThreadStatsTag();
150 TrafficStats.setThreadStatsTag(mTrafficStatsTag);
151 try {
152 command.run();
153 } finally {
154 TrafficStats.setThreadStatsTag(oldTag);
155 }
156 }
157 });
158 }
159 };
160 this.mCurrentUrl = url;
161 this.mUserAgent = userAgent;
162 }
163
164 @Override
165 public void setHttpMethod(String method) {
166 checkNotStarted();
167 if (method == null) {
168 throw new NullPointerException("Method is required.");
169 }
170 if ("OPTIONS".equalsIgnoreCase(method) || "GET".equalsIgnoreCase(method)
171 || "HEAD".equalsIgnoreCase(method) || "POST".equalsIgnoreCase(me thod)
172 || "PUT".equalsIgnoreCase(method) || "DELETE".equalsIgnoreCase(m ethod)
173 || "TRACE".equalsIgnoreCase(method) || "PATCH".equalsIgnoreCase( method)) {
174 mInitialMethod = method;
175 } else {
176 throw new IllegalArgumentException("Invalid http method " + method);
177 }
178 }
179
180 private void checkNotStarted() {
181 State state = mState.get();
182 if (state != State.NOT_STARTED) {
183 throw new IllegalStateException("Request is already started. State i s: " + state);
184 }
185 }
186
187 @Override
188 public void addHeader(String header, String value) {
189 checkNotStarted();
190 if (!isValidHeaderName(header) || value.contains("\r\n")) {
191 throw new IllegalArgumentException("Invalid header " + header + "=" + value);
192 }
193 if (mRequestHeaders.containsKey(header)) {
194 mRequestHeaders.remove(header);
195 }
196 mRequestHeaders.put(header, value);
197 }
198
199 private boolean isValidHeaderName(String header) {
200 for (int i = 0; i < header.length(); i++) {
201 char c = header.charAt(i);
202 switch (c) {
203 case '(':
204 case ')':
205 case '<':
206 case '>':
207 case '@':
208 case ',':
209 case ';':
210 case ':':
211 case '\\':
212 case '\'':
213 case '/':
214 case '[':
215 case ']':
216 case '?':
217 case '=':
218 case '{':
219 case '}':
220 return false;
221 default: {
222 if (Character.isISOControl(c) || Character.isWhitespace(c)) {
223 return false;
224 }
225 }
226 }
227 }
228 return true;
229 }
230
231 @Override
232 public void setUploadDataProvider(UploadDataProvider uploadDataProvider, Exe cutor executor) {
233 if (uploadDataProvider == null) {
234 throw new NullPointerException("Invalid UploadDataProvider.");
235 }
236 if (!mRequestHeaders.containsKey("Content-Type")) {
237 throw new IllegalArgumentException(
238 "Requests with upload data must have a Content-Type.");
239 }
240 checkNotStarted();
241 if (mInitialMethod == null) {
242 mInitialMethod = "POST";
243 }
244 this.mUploadDataProvider = uploadDataProvider;
245 if (mAllowDirectExecutor) {
246 this.mUploadExecutor = executor;
247 } else {
248 this.mUploadExecutor = new DirectPreventingExecutor(executor);
249 }
250 }
251
252 private enum SinkState {
253 AWAITING_READ_RESULT,
254 AWAITING_REWIND_RESULT,
255 UPLOADING,
256 NOT_STARTED,
257 }
258
259 private final class OutputStreamDataSink implements UploadDataSink {
260 final AtomicReference<SinkState> mSinkState = new AtomicReference<>(Sink State.NOT_STARTED);
261 final Executor mUserUploadExecutor;
262 final Executor mExecutor;
263 final HttpURLConnection mUrlConnection;
264 WritableByteChannel mOutputChannel;
265 OutputStream mUrlConnectionOutputStream;
266 final UploadDataProvider mUploadProvider;
267 ByteBuffer mBuffer;
268 /** This holds the total bytes to send (the content-length). -1 if unkno wn. */
269 long mTotalBytes;
270 /** This holds the bytes written so far */
271 long mWrittenBytes = 0;
272
273 OutputStreamDataSink(final Executor userExecutor, Executor executor,
274 HttpURLConnection urlConnection, UploadDataProvider provider) {
275 this.mUserUploadExecutor = new Executor() {
276 @Override
277 public void execute(Runnable runnable) {
278 try {
279 userExecutor.execute(runnable);
280 } catch (RejectedExecutionException e) {
281 enterUploadErrorState(e);
282 }
283 }
284 };
285 this.mExecutor = executor;
286 this.mUrlConnection = urlConnection;
287 this.mUploadProvider = provider;
288 }
289
290 @Override
291 @SuppressLint("DefaultLocale")
292 public void onReadSucceeded(final boolean finalChunk) {
293 if (!mSinkState.compareAndSet(SinkState.AWAITING_READ_RESULT, SinkSt ate.UPLOADING)) {
294 throw new IllegalStateException(
295 "Not expecting a read result, expecting: " + mSinkState. get());
296 }
297 mExecutor.execute(errorSetting(new CheckedRunnable() {
298 @Override
299 public void run() throws Exception {
300 mBuffer.flip();
301 if (mTotalBytes != -1 && mTotalBytes - mWrittenBytes < mBuff er.remaining()) {
302 enterUploadErrorState(new IllegalArgumentException(Strin g.format(
303 "Read upload data length %d exceeds expected len gth %d",
304 mWrittenBytes + mBuffer.remaining(), mTotalBytes )));
305 return;
306 }
307 while (mBuffer.hasRemaining()) {
308 mWrittenBytes += mOutputChannel.write(mBuffer);
309 }
310 // Forces a chunk to be sent, rather than buffering to the D EFAULT_CHUNK_LENGTH.
311 // This allows clients to trickle-upload bytes as they becom e available without
312 // introducing latency due to buffering.
313 mUrlConnectionOutputStream.flush();
314
315 if (mWrittenBytes < mTotalBytes || (mTotalBytes == -1 && !fi nalChunk)) {
316 mBuffer.clear();
317 mSinkState.set(SinkState.AWAITING_READ_RESULT);
318 executeOnUploadExecutor(new CheckedRunnable() {
319 @Override
320 public void run() throws Exception {
321 mUploadProvider.read(OutputStreamDataSink.this, mBuffer);
322 }
323 });
324 } else if (mTotalBytes == -1) {
325 finish();
326 } else if (mTotalBytes == mWrittenBytes) {
327 finish();
328 } else {
329 enterUploadErrorState(new IllegalArgumentException(Strin g.format(
330 "Read upload data length %d exceeds expected len gth %d",
331 mWrittenBytes, mTotalBytes)));
332 }
333 }
334 }));
335 }
336
337 @Override
338 public void onRewindSucceeded() {
339 if (!mSinkState.compareAndSet(SinkState.AWAITING_REWIND_RESULT, Sink State.UPLOADING)) {
340 throw new IllegalStateException("Not expecting a read result");
341 }
342 startRead();
343 }
344
345 @Override
346 public void onReadError(Exception exception) {
347 enterUploadErrorState(exception);
348 }
349
350 @Override
351 public void onRewindError(Exception exception) {
352 enterUploadErrorState(exception);
353 }
354
355 void startRead() {
356 mExecutor.execute(errorSetting(new CheckedRunnable() {
357 @Override
358 public void run() throws Exception {
359 if (mOutputChannel == null) {
360 mAdditionalStatusDetails = Status.CONNECTING;
361 mUrlConnection.connect();
362 mAdditionalStatusDetails = Status.SENDING_REQUEST;
363 mUrlConnectionOutputStream = mUrlConnection.getOutputStr eam();
364 mOutputChannel = Channels.newChannel(mUrlConnectionOutpu tStream);
365 }
366 mSinkState.set(SinkState.AWAITING_READ_RESULT);
367 executeOnUploadExecutor(new CheckedRunnable() {
368 @Override
369 public void run() throws Exception {
370 mUploadProvider.read(OutputStreamDataSink.this, mBuf fer);
371 }
372 });
373 }
374 }));
375 }
376
377 private void executeOnUploadExecutor(CheckedRunnable runnable) {
378 try {
379 mUserUploadExecutor.execute(uploadErrorSetting(runnable));
380 } catch (RejectedExecutionException e) {
381 enterUploadErrorState(e);
382 }
383 }
384
385 void finish() throws IOException {
386 if (mOutputChannel != null) {
387 mOutputChannel.close();
388 }
389 fireGetHeaders();
390 }
391
392 void start(final boolean firstTime) {
393 executeOnUploadExecutor(new CheckedRunnable() {
394 @Override
395 public void run() throws Exception {
396 mTotalBytes = mUploadProvider.getLength();
397 if (mTotalBytes == 0) {
398 finish();
399 } else {
400 // If we know how much data we have to upload, and it's small, we can save
401 // memory by allocating a reasonably sized buffer to rea d into.
402 if (mTotalBytes > 0 && mTotalBytes < DEFAULT_UPLOAD_BUFF ER_SIZE) {
403 // Allocate one byte more than necessary, to detect callers uploading
404 // more bytes than they specified in length.
405 mBuffer = ByteBuffer.allocateDirect((int) mTotalByte s + 1);
406 } else {
407 mBuffer = ByteBuffer.allocateDirect(DEFAULT_UPLOAD_B UFFER_SIZE);
408 }
409
410 if (mTotalBytes > 0 && mTotalBytes <= Integer.MAX_VALUE) {
411 mUrlConnection.setFixedLengthStreamingMode((int) mTo talBytes);
412 } else if (mTotalBytes > Integer.MAX_VALUE
413 && Build.VERSION.SDK_INT >= Build.VERSION_CODES. KITKAT) {
414 mUrlConnection.setFixedLengthStreamingMode(mTotalByt es);
415 } else {
416 // If we know the length, but we're running pre-kitk at and it's larger
417 // than an int can hold, we have to use chunked - ot herwise we'll end up
418 // buffering the whole response in memory.
419 mUrlConnection.setChunkedStreamingMode(DEFAULT_CHUNK _LENGTH);
420 }
421 if (firstTime) {
422 startRead();
423 } else {
424 mSinkState.set(SinkState.AWAITING_REWIND_RESULT);
425 mUploadProvider.rewind(OutputStreamDataSink.this);
426 }
427 }
428 }
429 });
430 }
431 }
432
433 @Override
434 public void start() {
435 mAdditionalStatusDetails = Status.CONNECTING;
436 transitionStates(State.NOT_STARTED, State.STARTED, new Runnable() {
437 @Override
438 public void run() {
439 mUrlChain.add(mCurrentUrl);
440 fireOpenConnection();
441 }
442 });
443 }
444
445 private void enterErrorState(final UrlRequestException error) {
446 if (setTerminalState(State.ERROR)) {
447 fireDisconnect();
448 fireCloseUploadDataProvider();
449 mCallbackAsync.onFailed(mUrlResponseInfo, error);
450 }
451 }
452
453 private boolean setTerminalState(State error) {
454 while (true) {
455 State oldState = mState.get();
456 switch (oldState) {
457 case NOT_STARTED:
458 throw new IllegalStateException("Can't enter error state bef ore start");
459 case ERROR: // fallthrough
460 case COMPLETE: // fallthrough
461 case CANCELLED:
462 return false; // Already in a terminal state
463 default: {
464 if (mState.compareAndSet(oldState, error)) {
465 return true;
466 }
467 }
468 }
469 }
470 }
471
472 /** Ends the request with an error, caused by an exception thrown from user code. */
473 private void enterUserErrorState(final Throwable error) {
474 enterErrorState(
475 new UrlRequestException("Exception received from UrlRequest.Call back", error));
476 }
477
478 /** Ends the request with an error, caused by an exception thrown from user code. */
479 private void enterUploadErrorState(final Throwable error) {
480 enterErrorState(
481 new UrlRequestException("Exception received from UploadDataProvi der", error));
482 }
483
484 private void enterCronetErrorState(final Throwable error) {
485 // TODO(clm) mapping from Java exception (UnknownHostException, for exam ple) to net error
486 // code goes here.
487 enterErrorState(new UrlRequestException("System error", error));
488 }
489
490 /**
491 * Atomically swaps from the expected state to a new state. If the swap fail s, and it's not
492 * due to an earlier error or cancellation, throws an exception.
493 *
494 * @param afterTransition Callback to run after transition completes success fully.
495 */
496 private void transitionStates(State expected, State newState, Runnable after Transition) {
497 if (!mState.compareAndSet(expected, newState)) {
498 State state = mState.get();
499 if (!(state == State.CANCELLED || state == State.ERROR)) {
500 throw new IllegalStateException(
501 "Invalid state transition - expected " + expected + " bu t was " + state);
502 }
503 } else {
504 afterTransition.run();
505 }
506 }
507
508 @Override
509 public void followRedirect() {
510 transitionStates(State.AWAITING_FOLLOW_REDIRECT, State.STARTED, new Runn able() {
511 @Override
512 public void run() {
513 mCurrentUrl = mPendingRedirectUrl;
514 mPendingRedirectUrl = null;
515 fireOpenConnection();
516 }
517 });
518 }
519
520 private void fireGetHeaders() {
521 mAdditionalStatusDetails = Status.WAITING_FOR_RESPONSE;
522 mExecutor.execute(errorSetting(new CheckedRunnable() {
523 @Override
524 public void run() throws Exception {
525 HttpURLConnection connection = mCurrentUrlConnection.get();
526 if (connection == null) {
527 return; // We've been cancelled
528 }
529 final List<Map.Entry<String, String>> headerList = new ArrayList <>();
530 String selectedTransport = "http/1.1";
531 String headerKey;
532 for (int i = 0; (headerKey = connection.getHeaderFieldKey(i)) != null; i++) {
533 if (X_ANDROID_SELECTED_TRANSPORT.equalsIgnoreCase(headerKey) ) {
534 selectedTransport = connection.getHeaderField(i);
535 }
536 if (!headerKey.startsWith(X_ANDROID)) {
537 headerList.add(new SimpleEntry<>(headerKey, connection.g etHeaderField(i)));
538 }
539 }
540
541 int responseCode = connection.getResponseCode();
542 // Important to copy the list here, because although we never co ncurrently modify
543 // the list ourselves, user code might iterate over it while we' re redirecting, and
544 // that would throw ConcurrentModificationException.
545 mUrlResponseInfo = new UrlResponseInfo(new ArrayList<>(mUrlChain ), responseCode,
546 connection.getResponseMessage(), Collections.unmodifiabl eList(headerList),
547 false, selectedTransport, "");
548 // TODO(clm) actual redirect handling? post -> get and whatnot?
549 if (responseCode >= 300 && responseCode < 400) {
550 fireRedirectReceived(mUrlResponseInfo.getAllHeaders());
551 return;
552 }
553 fireCloseUploadDataProvider();
554 if (responseCode >= 400) {
555 mResponseChannel = InputStreamChannel.wrap(connection.getErr orStream());
556 mCallbackAsync.onResponseStarted(mUrlResponseInfo);
557 } else {
558 mResponseChannel = InputStreamChannel.wrap(connection.getInp utStream());
559 mCallbackAsync.onResponseStarted(mUrlResponseInfo);
560 }
561 }
562 }));
563 }
564
565 private void fireCloseUploadDataProvider() {
566 if (mUploadDataProvider != null && mUploadProviderClosed.compareAndSet(f alse, true)) {
567 try {
568 mUploadExecutor.execute(uploadErrorSetting(new CheckedRunnable() {
569 @Override
570 public void run() throws Exception {
571 mUploadDataProvider.close();
572 }
573 }));
574 } catch (RejectedExecutionException e) {
575 Log.e(TAG, "Exception when closing uploadDataProvider", e);
576 }
577 }
578 }
579
580 private void fireRedirectReceived(final Map<String, List<String>> headerFiel ds) {
581 transitionStates(State.STARTED, State.REDIRECT_RECEIVED, new Runnable() {
582 @Override
583 public void run() {
584 mPendingRedirectUrl = URI.create(mCurrentUrl)
585 .resolve(headerFields.get("locatio n").get(0))
586 .toString();
587 mUrlChain.add(mPendingRedirectUrl);
588 transitionStates(
589 State.REDIRECT_RECEIVED, State.AWAITING_FOLLOW_REDIRECT, new Runnable() {
590 @Override
591 public void run() {
592 mCallbackAsync.onRedirectReceived(
593 mUrlResponseInfo, mPendingRedirectUrl);
594 }
595 });
596 }
597 });
598 }
599
600 private void fireOpenConnection() {
601 mExecutor.execute(errorSetting(new CheckedRunnable() {
602 @Override
603 public void run() throws Exception {
604 // If we're cancelled, then our old connection will be disconnec ted for us and
605 // we shouldn't open a new one.
606 if (mState.get() == State.CANCELLED) {
607 return;
608 }
609
610 final URL url = new URL(mCurrentUrl);
611 HttpURLConnection newConnection = (HttpURLConnection) url.openCo nnection();
612 HttpURLConnection oldConnection = mCurrentUrlConnection.getAndSe t(newConnection);
613 if (oldConnection != null) {
614 oldConnection.disconnect();
615 }
616 newConnection.setInstanceFollowRedirects(false);
617 if (!mRequestHeaders.containsKey(USER_AGENT)) {
618 mRequestHeaders.put(USER_AGENT, mUserAgent);
619 }
620 for (Map.Entry<String, String> entry : mRequestHeaders.entrySet( )) {
621 newConnection.setRequestProperty(entry.getKey(), entry.getVa lue());
622 }
623 if (mInitialMethod == null) {
624 mInitialMethod = "GET";
625 }
626 newConnection.setRequestMethod(mInitialMethod);
627 if (mUploadDataProvider != null) {
628 OutputStreamDataSink dataSink = new OutputStreamDataSink(
629 mUploadExecutor, mExecutor, newConnection, mUploadDa taProvider);
630 dataSink.start(mUrlChain.size() == 1);
631 } else {
632 mAdditionalStatusDetails = Status.CONNECTING;
633 newConnection.connect();
634 fireGetHeaders();
635 }
636 }
637 }));
638 }
639
640 private Runnable errorSetting(final CheckedRunnable delegate) {
641 return new Runnable() {
642 @Override
643 public void run() {
644 try {
645 delegate.run();
646 } catch (Throwable t) {
647 enterCronetErrorState(t);
648 }
649 }
650 };
651 }
652
653 private Runnable userErrorSetting(final CheckedRunnable delegate) {
654 return new Runnable() {
655 @Override
656 public void run() {
657 try {
658 delegate.run();
659 } catch (Throwable t) {
660 enterUserErrorState(t);
661 }
662 }
663 };
664 }
665
666 private Runnable uploadErrorSetting(final CheckedRunnable delegate) {
667 return new Runnable() {
668 @Override
669 public void run() {
670 try {
671 delegate.run();
672 } catch (Throwable t) {
673 enterUploadErrorState(t);
674 }
675 }
676 };
677 }
678
679 private interface CheckedRunnable { void run() throws Exception; }
680
681 @Override
682 public void read(final ByteBuffer buffer) {
683 Preconditions.checkDirect(buffer);
684 Preconditions.checkHasRemaining(buffer);
685 transitionStates(State.AWAITING_READ, State.READING, new Runnable() {
686 @Override
687 public void run() {
688 mExecutor.execute(errorSetting(new CheckedRunnable() {
689 @Override
690 public void run() throws Exception {
691 int read = mResponseChannel.read(buffer);
692 processReadResult(read, buffer);
693 }
694 }));
695 }
696 });
697 }
698
699 private void processReadResult(int read, final ByteBuffer buffer) throws IOE xception {
700 if (read != -1) {
701 mCallbackAsync.onReadCompleted(mUrlResponseInfo, buffer);
702 } else {
703 mResponseChannel.close();
704 if (mState.compareAndSet(State.READING, State.COMPLETE)) {
705 fireDisconnect();
706 mCallbackAsync.onSucceeded(mUrlResponseInfo);
707 }
708 }
709 }
710
711 private void fireDisconnect() {
712 final HttpURLConnection connection = mCurrentUrlConnection.getAndSet(nul l);
713 if (connection != null) {
714 mExecutor.execute(new Runnable() {
715 @Override
716 public void run() {
717 connection.disconnect();
718 }
719 });
720 }
721 }
722
723 @Override
724 public void cancel() {
725 State oldState = mState.getAndSet(State.CANCELLED);
726 switch (oldState) {
727 // We've just scheduled some user code to run. When they perform the ir next operation,
728 // they'll observe it and fail. However, if user code is cancelling in response to one
729 // of these callbacks, we'll never actually cancel!
730 // TODO(clm) figure out if it's possible to avoid concurrency in use r callbacks.
731 case REDIRECT_RECEIVED:
732 case AWAITING_FOLLOW_REDIRECT:
733 case AWAITING_READ:
734
735 // User code is waiting on us - cancel away!
736 case STARTED:
737 case READING:
738 fireDisconnect();
739 fireCloseUploadDataProvider();
740 mCallbackAsync.onCanceled(mUrlResponseInfo);
741 break;
742 // The rest are all termination cases - we're too late to cancel.
743 case ERROR:
744 case COMPLETE:
745 case CANCELLED:
746 break;
747 }
748 }
749
750 @Override
751 public boolean isDone() {
752 State state = mState.get();
753 return state == State.COMPLETE | state == State.ERROR | state == State.C ANCELLED;
754 }
755
756 @Override
757 public void getStatus(StatusListener listener) {
758 State state = mState.get();
759 int extraStatus = this.mAdditionalStatusDetails;
760
761 @Status.StatusValues final int status;
762 switch (state) {
763 case ERROR:
764 case COMPLETE:
765 case CANCELLED:
766 case NOT_STARTED:
767 status = Status.INVALID;
768 break;
769 case STARTED:
770 status = extraStatus;
771 break;
772 case REDIRECT_RECEIVED:
773 case AWAITING_FOLLOW_REDIRECT:
774 case AWAITING_READ:
775 status = Status.IDLE;
776 break;
777 case READING:
778 status = Status.READING_RESPONSE;
779 break;
780 default:
781 throw new IllegalStateException("Switch is exhaustive: " + state );
782 }
783
784 mCallbackAsync.sendStatus(listener, status);
785 }
786
787 /** This wrapper ensures that callbacks are always called on the correct exe cutor */
788 private final class AsyncUrlRequestCallback {
789 final UrlRequest.Callback mCallback;
790 final Executor mUserExecutor;
791 final Executor mFallbackExecutor;
792
793 AsyncUrlRequestCallback(Callback callback, final Executor userExecutor) {
794 this.mCallback = callback;
795 if (mAllowDirectExecutor) {
796 this.mUserExecutor = userExecutor;
797 this.mFallbackExecutor = null;
798 } else {
799 mUserExecutor = new DirectPreventingExecutor(userExecutor);
800 mFallbackExecutor = userExecutor;
801 }
802 }
803
804 void sendStatus(final StatusListener listener, final int status) {
805 mUserExecutor.execute(new Runnable() {
806 @Override
807 public void run() {
808 listener.onStatus(status);
809 }
810 });
811 }
812
813 void execute(CheckedRunnable runnable) {
814 try {
815 mUserExecutor.execute(userErrorSetting(runnable));
816 } catch (RejectedExecutionException e) {
817 enterErrorState(new UrlRequestException("Exception posting task to executor", e));
818 }
819 }
820
821 void onRedirectReceived(final UrlResponseInfo info, final String newLoca tionUrl) {
822 execute(new CheckedRunnable() {
823 @Override
824 public void run() throws Exception {
825 mCallback.onRedirectReceived(JavaUrlRequest.this, info, newL ocationUrl);
826 }
827 });
828 }
829
830 void onResponseStarted(UrlResponseInfo info) {
831 execute(new CheckedRunnable() {
832 @Override
833 public void run() throws Exception {
834 if (mState.compareAndSet(State.STARTED, State.AWAITING_READ) ) {
835 mCallback.onResponseStarted(JavaUrlRequest.this, mUrlRes ponseInfo);
836 }
837 }
838 });
839 }
840
841 void onReadCompleted(final UrlResponseInfo info, final ByteBuffer byteBu ffer) {
842 execute(new CheckedRunnable() {
843 @Override
844 public void run() throws Exception {
845 if (mState.compareAndSet(State.READING, State.AWAITING_READ) ) {
846 mCallback.onReadCompleted(JavaUrlRequest.this, info, byt eBuffer);
847 }
848 }
849 });
850 }
851
852 void onCanceled(final UrlResponseInfo info) {
853 closeResponseChannel();
854 mUserExecutor.execute(new Runnable() {
855 @Override
856 public void run() {
857 try {
858 mCallback.onCanceled(JavaUrlRequest.this, info);
859 } catch (Exception exception) {
860 Log.e(TAG, "Exception in onCanceled method", exception);
861 }
862 }
863 });
864 }
865
866 void onSucceeded(final UrlResponseInfo info) {
867 mUserExecutor.execute(new Runnable() {
868 @Override
869 public void run() {
870 try {
871 mCallback.onSucceeded(JavaUrlRequest.this, info);
872 } catch (Exception exception) {
873 Log.e(TAG, "Exception in onSucceeded method", exception) ;
874 }
875 }
876 });
877 }
878
879 void onFailed(final UrlResponseInfo urlResponseInfo, final UrlRequestExc eption e) {
880 closeResponseChannel();
881 Runnable runnable = new Runnable() {
882 @Override
883 public void run() {
884 try {
885 mCallback.onFailed(JavaUrlRequest.this, urlResponseInfo, e);
886 } catch (Exception exception) {
887 Log.e(TAG, "Exception in onFailed method", exception);
888 }
889 }
890 };
891 try {
892 mUserExecutor.execute(runnable);
893 } catch (InlineExecutionProhibitedException wasDirect) {
894 if (mFallbackExecutor != null) {
895 mFallbackExecutor.execute(runnable);
896 }
897 }
898 }
899 }
900
901 private void closeResponseChannel() {
902 final Closeable closeable = mResponseChannel;
903 if (closeable == null) {
904 return;
905 }
906 mResponseChannel = null;
907 mExecutor.execute(new Runnable() {
908 @Override
909 public void run() {
910 try {
911 closeable.close();
912 } catch (IOException e) {
913 e.printStackTrace();
914 }
915 }
916 });
917 }
918
919 /**
920 * Executor that detects and throws if its mDelegate runs a submitted runnab le inline.
921 */
922 static final class DirectPreventingExecutor implements Executor {
923 private final Executor mDelegate;
924
925 DirectPreventingExecutor(Executor delegate) {
926 this.mDelegate = delegate;
927 }
928
929 @Override
930 public void execute(Runnable command) {
931 Thread currentThread = Thread.currentThread();
932 InlineCheckingRunnable runnable = new InlineCheckingRunnable(command , currentThread);
933 mDelegate.execute(runnable);
934 if (runnable.mExecutedInline != null) {
935 throw runnable.mExecutedInline;
936 } else {
937 // It's possible that this method is being called on an executor , and the runnable
938 // that
939 // was just queued will run on this thread after the current run nable returns. By
940 // nulling out the mCallingThread field, the InlineCheckingRunna ble's current thread
941 // comparison will not fire.
942 runnable.mCallingThread = null;
943 }
944 }
945
946 private static final class InlineCheckingRunnable implements Runnable {
947 private final Runnable mCommand;
948 private Thread mCallingThread;
949 private InlineExecutionProhibitedException mExecutedInline = null;
950
951 private InlineCheckingRunnable(Runnable command, Thread callingThrea d) {
952 this.mCommand = command;
953 this.mCallingThread = callingThread;
954 }
955
956 @Override
957 public void run() {
958 if (Thread.currentThread() == mCallingThread) {
959 // Can't throw directly from here, since the delegate execut or could catch this
960 // exception.
961 mExecutedInline = new InlineExecutionProhibitedException();
962 return;
963 }
964 mCommand.run();
965 }
966 }
967 }
968 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698