Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(141)

Side by Side Diff: components/cronet/android/api/src/org/chromium/net/JavaUrlRequest.java

Issue 2339223002: Cronet API Refactoring (Closed)
Patch Set: Rebased onto Charles change + Paul's Comments Created 4 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package org.chromium.net;
6
7 import android.annotation.TargetApi;
8 import android.net.TrafficStats;
9 import android.os.Build;
10 import android.util.Log;
11
12 import java.io.Closeable;
13 import java.io.IOException;
14 import java.io.OutputStream;
15 import java.net.HttpURLConnection;
16 import java.net.URI;
17 import java.net.URL;
18 import java.nio.ByteBuffer;
19 import java.nio.channels.Channels;
20 import java.nio.channels.ReadableByteChannel;
21 import java.nio.channels.WritableByteChannel;
22 import java.util.AbstractMap.SimpleEntry;
23 import java.util.ArrayList;
24 import java.util.Collections;
25 import java.util.List;
26 import java.util.Map;
27 import java.util.TreeMap;
28 import java.util.concurrent.Executor;
29 import java.util.concurrent.RejectedExecutionException;
30 import java.util.concurrent.atomic.AtomicBoolean;
31 import java.util.concurrent.atomic.AtomicReference;
32
33 /**
34 * Pure java UrlRequest, backed by {@link HttpURLConnection}.
35 */
36 @TargetApi(Build.VERSION_CODES.ICE_CREAM_SANDWICH) // TrafficStats only availabl e on ICS
37 final class JavaUrlRequest implements UrlRequest {
pauljensen 2016/09/26 14:51:20 do we need a separate build target (i.e. a jar) co
kapishnikov 2016/09/27 18:38:25 This is a good question that we should think about
pauljensen 2016/09/27 19:08:09 Agreed, we'll add the new build target later if th
38 private static final String X_ANDROID = "X-Android";
39 private static final String X_ANDROID_SELECTED_TRANSPORT = "X-Android-Select ed-Transport";
40 private static final String TAG = "JavaUrlConnection";
41 private static final int DEFAULT_UPLOAD_BUFFER_SIZE = 8192;
42 private static final int DEFAULT_CHUNK_LENGTH = DEFAULT_UPLOAD_BUFFER_SIZE;
43 private static final String USER_AGENT = "User-Agent";
44 private final AsyncUrlRequestCallback mCallbackAsync;
45 private final Executor mExecutor;
46 private final String mUserAgent;
47 private final Map<String, String> mRequestHeaders =
48 new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
49 private final List<String> mUrlChain = new ArrayList<>();
50 /**
51 * This is the source of thread safety in this class - no other synchronizat ion is performed.
52 * By compare-and-swapping from one state to another, we guarantee that oper ations aren't
53 * running concurrently. Only the winner of a CAS proceeds.
54 *
55 * <p>A caller can lose a CAS for three reasons - user error (two calls to r ead() without
56 * waiting for the read to succeed), runtime error (network code or user cod e throws an
57 * exception), or cancellation.
58 */
59 private final AtomicReference<State> mState = new AtomicReference<>(State.NO T_STARTED);
60 private final AtomicBoolean mUploadProviderClosed = new AtomicBoolean(false) ;
61
62 /**
63 * Traffic stats tag to associate this requests' data use with. It's capture d when the request
64 * is created, so that applications doing work on behalf of another app can correctly attribute
65 * that data use.
66 */
67 private final int mTrafficStatsTag;
68 private final boolean mAllowDirectExecutor;
69
70 /* These don't change with redirects */
71 private String mInitialMethod;
72 private UploadDataProvider mUploadDataProvider;
73 private Executor mUploadExecutor;
74
75 /**
76 * Holds a subset of StatusValues - {@link State#STARTED} can represent
77 * {@link Status#SENDING_REQUEST} or {@link Status#WAITING_FOR_RESPONSE}. Wh ile the distinction
78 * isn't needed to implement the logic in this class, it is needed to implem ent
79 * {@link #getStatus(StatusListener)}.
80 *
81 * <p>Concurrency notes - this value is not atomically updated with mState, so there is some
82 * risk that we'd get an inconsistent snapshot of both - however, it also ha ppens that this
83 * value is only used with the STARTED state, so it's inconsequential.
84 */
85 @Status.StatusValues private volatile int mAdditionalStatusDetails = Status. INVALID;
86
87 /* These change with redirects. */
88 private String mCurrentUrl;
89 private ReadableByteChannel mResponseChannel;
90 private UrlResponseInfo mUrlResponseInfo;
91 private String mPendingRedirectUrl;
92 /**
93 * The happens-before edges created by the executor submission and AtomicRef erence setting are
94 * sufficient to guarantee the correct behavior of this field; however, this is an
95 * AtomicReference so that we can cleanly dispose of a new connection if we' re cancelled during
96 * a redirect, which requires get-and-set semantics.
97 * */
98 private final AtomicReference<HttpURLConnection> mCurrentUrlConnection =
99 new AtomicReference<>();
100
101 /**
102 * /- AWAITING_FOLLOW_REDIRECT <- REDIRECT_RECEIVED <-\ /- READING <--\
103 * | | | |
104 * \ / \ /
105 * NOT_STARTED ---> STARTED ----> AW AITING_READ --->
106 * COMPLETE
107 */
108 private enum State {
109 NOT_STARTED,
110 STARTED,
111 REDIRECT_RECEIVED,
112 AWAITING_FOLLOW_REDIRECT,
113 AWAITING_READ,
114 READING,
115 ERROR,
116 COMPLETE,
117 CANCELLED,
118 }
119
120 /**
121 * @param executor The executor used for reading and writing from sockets
122 * @param userExecutor The executor used to dispatch to {@code callback}
123 */
124 JavaUrlRequest(Callback callback, final Executor executor, Executor userExec utor, String url,
125 String userAgent, boolean allowDirectExecutor) {
126 if (url == null) {
127 throw new NullPointerException("URL is required");
128 }
129 if (callback == null) {
130 throw new NullPointerException("Listener is required");
131 }
132 if (executor == null) {
133 throw new NullPointerException("Executor is required");
134 }
135 if (userExecutor == null) {
136 throw new NullPointerException("userExecutor is required");
137 }
138
139 this.mAllowDirectExecutor = allowDirectExecutor;
140 this.mCallbackAsync = new AsyncUrlRequestCallback(callback, userExecutor );
141 this.mTrafficStatsTag = TrafficStats.getThreadStatsTag();
142 this.mExecutor = new Executor() {
143 @Override
144 public void execute(final Runnable command) {
145 executor.execute(new Runnable() {
146 @Override
147 public void run() {
148 int oldTag = TrafficStats.getThreadStatsTag();
149 TrafficStats.setThreadStatsTag(mTrafficStatsTag);
150 try {
151 command.run();
152 } finally {
153 TrafficStats.setThreadStatsTag(oldTag);
154 }
155 }
156 });
157 }
158 };
159 this.mCurrentUrl = url;
160 this.mUserAgent = userAgent;
161 }
162
163 @Override
164 public void setHttpMethod(String method) {
165 checkNotStarted();
166 if (method == null) {
167 throw new NullPointerException("Method is required.");
168 }
169 if ("OPTIONS".equalsIgnoreCase(method) || "GET".equalsIgnoreCase(method)
170 || "HEAD".equalsIgnoreCase(method) || "POST".equalsIgnoreCase(me thod)
171 || "PUT".equalsIgnoreCase(method) || "DELETE".equalsIgnoreCase(m ethod)
172 || "TRACE".equalsIgnoreCase(method) || "PATCH".equalsIgnoreCase( method)) {
173 mInitialMethod = method;
174 } else {
175 throw new IllegalArgumentException("Invalid http method " + method);
176 }
177 }
178
179 private void checkNotStarted() {
180 State state = mState.get();
181 if (state != State.NOT_STARTED) {
182 throw new IllegalStateException("Request is already started. State i s: " + state);
183 }
184 }
185
186 @Override
187 public void addHeader(String header, String value) {
188 checkNotStarted();
189 if (!isValidHeaderName(header) || value.contains("\r\n")) {
190 throw new IllegalArgumentException("Invalid header " + header + "=" + value);
191 }
192 if (mRequestHeaders.containsKey(header)) {
193 mRequestHeaders.remove(header);
194 }
195 mRequestHeaders.put(header, value);
196 }
197
198 private boolean isValidHeaderName(String header) {
199 for (int i = 0; i < header.length(); i++) {
200 char c = header.charAt(i);
201 switch (c) {
202 case '(':
203 case ')':
204 case '<':
205 case '>':
206 case '@':
207 case ',':
208 case ';':
209 case ':':
210 case '\\':
211 case '\'':
212 case '/':
213 case '[':
214 case ']':
215 case '?':
216 case '=':
217 case '{':
218 case '}':
219 return false;
220 default: {
221 if (Character.isISOControl(c) || Character.isWhitespace(c)) {
222 return false;
223 }
224 }
225 }
226 }
227 return true;
228 }
229
230 @Override
231 public void setUploadDataProvider(UploadDataProvider uploadDataProvider, Exe cutor executor) {
232 if (uploadDataProvider == null) {
233 throw new NullPointerException("Invalid UploadDataProvider.");
234 }
235 if (!mRequestHeaders.containsKey("Content-Type")) {
236 throw new IllegalArgumentException(
237 "Requests with upload data must have a Content-Type.");
238 }
239 checkNotStarted();
240 if (mInitialMethod == null) {
241 mInitialMethod = "POST";
242 }
243 this.mUploadDataProvider = uploadDataProvider;
244 if (mAllowDirectExecutor) {
245 this.mUploadExecutor = executor;
246 } else {
247 this.mUploadExecutor = new DirectPreventingExecutor(executor);
248 }
249 }
250
251 private enum SinkState {
252 AWAITING_READ_RESULT,
253 AWAITING_REWIND_RESULT,
254 UPLOADING,
255 NOT_STARTED,
256 }
257
258 private final class OutputStreamDataSink implements UploadDataSink {
259 final AtomicReference<SinkState> mSinkState = new AtomicReference<>(Sink State.NOT_STARTED);
260 final Executor mUserUploadExecutor;
261 final Executor mExecutor;
262 final HttpURLConnection mUrlConnection;
263 WritableByteChannel mOutputChannel;
264 OutputStream mUrlConnectionOutputStream;
265 final UploadDataProvider mUploadProvider;
266 ByteBuffer mBuffer;
267 /** This holds the total bytes to send (the content-length). -1 if unkno wn. */
268 long mTotalBytes;
269 /** This holds the bytes written so far */
270 long mWrittenBytes = 0;
271
272 OutputStreamDataSink(final Executor userExecutor, Executor executor,
273 HttpURLConnection urlConnection, UploadDataProvider provider) {
274 this.mUserUploadExecutor = new Executor() {
275 @Override
276 public void execute(Runnable runnable) {
277 try {
278 userExecutor.execute(runnable);
279 } catch (RejectedExecutionException e) {
280 enterUploadErrorState(e);
281 }
282 }
283 };
284 this.mExecutor = executor;
285 this.mUrlConnection = urlConnection;
286 this.mUploadProvider = provider;
287 }
288
289 @Override
290 public void onReadSucceeded(final boolean finalChunk) {
291 if (!mSinkState.compareAndSet(SinkState.AWAITING_READ_RESULT, SinkSt ate.UPLOADING)) {
292 throw new IllegalStateException(
293 "Not expecting a read result, expecting: " + mSinkState. get());
294 }
295 mExecutor.execute(errorSetting(new CheckedRunnable() {
296 @Override
297 public void run() throws Exception {
298 mBuffer.flip();
299 if (mTotalBytes != -1 && mTotalBytes - mWrittenBytes < mBuff er.remaining()) {
300 enterUploadErrorState(new IllegalArgumentException(Strin g.format(
301 "Read upload data length %d exceeds expected len gth %d",
302 mWrittenBytes + mBuffer.remaining(), mTotalBytes )));
303 return;
304 }
305 while (mBuffer.hasRemaining()) {
306 mWrittenBytes += mOutputChannel.write(mBuffer);
307 }
308 // Forces a chunk to be sent, rather than buffering to the D EFAULT_CHUNK_LENGTH.
309 // This allows clients to trickle-upload bytes as they becom e available without
310 // introducing latency due to buffering.
311 mUrlConnectionOutputStream.flush();
312
313 if (mWrittenBytes < mTotalBytes || (mTotalBytes == -1 && !fi nalChunk)) {
314 mBuffer.clear();
315 mSinkState.set(SinkState.AWAITING_READ_RESULT);
316 executeOnUploadExecutor(new CheckedRunnable() {
317 @Override
318 public void run() throws Exception {
319 mUploadProvider.read(OutputStreamDataSink.this, mBuffer);
320 }
321 });
322 } else if (mTotalBytes == -1) {
323 finish();
324 } else if (mTotalBytes == mWrittenBytes) {
325 finish();
326 } else {
327 enterUploadErrorState(new IllegalArgumentException(Strin g.format(
328 "Read upload data length %d exceeds expected len gth %d",
329 mWrittenBytes, mTotalBytes)));
330 }
331 }
332 }));
333 }
334
335 @Override
336 public void onRewindSucceeded() {
337 if (!mSinkState.compareAndSet(SinkState.AWAITING_REWIND_RESULT, Sink State.UPLOADING)) {
338 throw new IllegalStateException("Not expecting a read result");
339 }
340 startRead();
341 }
342
343 @Override
344 public void onReadError(Exception exception) {
345 enterUploadErrorState(exception);
346 }
347
348 @Override
349 public void onRewindError(Exception exception) {
350 enterUploadErrorState(exception);
351 }
352
353 void startRead() {
354 mExecutor.execute(errorSetting(new CheckedRunnable() {
355 @Override
356 public void run() throws Exception {
357 if (mOutputChannel == null) {
358 mAdditionalStatusDetails = Status.CONNECTING;
359 mUrlConnection.connect();
360 mAdditionalStatusDetails = Status.SENDING_REQUEST;
361 mUrlConnectionOutputStream = mUrlConnection.getOutputStr eam();
362 mOutputChannel = Channels.newChannel(mUrlConnectionOutpu tStream);
363 }
364 mSinkState.set(SinkState.AWAITING_READ_RESULT);
365 executeOnUploadExecutor(new CheckedRunnable() {
366 @Override
367 public void run() throws Exception {
368 mUploadProvider.read(OutputStreamDataSink.this, mBuf fer);
369 }
370 });
371 }
372 }));
373 }
374
375 private void executeOnUploadExecutor(CheckedRunnable runnable) {
376 try {
377 mUserUploadExecutor.execute(uploadErrorSetting(runnable));
378 } catch (RejectedExecutionException e) {
379 enterUploadErrorState(e);
380 }
381 }
382
383 void finish() throws IOException {
384 if (mOutputChannel != null) {
385 mOutputChannel.close();
386 }
387 fireGetHeaders();
388 }
389
390 void start(final boolean firstTime) {
391 executeOnUploadExecutor(new CheckedRunnable() {
392 @Override
393 public void run() throws Exception {
394 mTotalBytes = mUploadProvider.getLength();
395 if (mTotalBytes == 0) {
396 finish();
397 } else {
398 // If we know how much data we have to upload, and it's small, we can save
399 // memory by allocating a reasonably sized buffer to rea d into.
400 if (mTotalBytes > 0 && mTotalBytes < DEFAULT_UPLOAD_BUFF ER_SIZE) {
401 // Allocate one byte more than necessary, to detect callers uploading
402 // more bytes than they specified in length.
403 mBuffer = ByteBuffer.allocateDirect((int) mTotalByte s + 1);
404 } else {
405 mBuffer = ByteBuffer.allocateDirect(DEFAULT_UPLOAD_B UFFER_SIZE);
406 }
407
408 if (mTotalBytes > 0 && mTotalBytes <= Integer.MAX_VALUE) {
409 mUrlConnection.setFixedLengthStreamingMode((int) mTo talBytes);
410 } else if (mTotalBytes > Integer.MAX_VALUE
411 && Build.VERSION.SDK_INT >= Build.VERSION_CODES. KITKAT) {
412 mUrlConnection.setFixedLengthStreamingMode(mTotalByt es);
413 } else {
414 // If we know the length, but we're running pre-kitk at and it's larger
415 // than an int can hold, we have to use chunked - ot herwise we'll end up
416 // buffering the whole response in memory.
417 mUrlConnection.setChunkedStreamingMode(DEFAULT_CHUNK _LENGTH);
418 }
419 if (firstTime) {
420 startRead();
421 } else {
422 mSinkState.set(SinkState.AWAITING_REWIND_RESULT);
423 mUploadProvider.rewind(OutputStreamDataSink.this);
424 }
425 }
426 }
427 });
428 }
429 }
430
431 @Override
432 public void start() {
433 mAdditionalStatusDetails = Status.CONNECTING;
434 transitionStates(State.NOT_STARTED, State.STARTED, new Runnable() {
435 @Override
436 public void run() {
437 mUrlChain.add(mCurrentUrl);
438 fireOpenConnection();
439 }
440 });
441 }
442
443 private void enterErrorState(final UrlRequestException error) {
444 if (setTerminalState(State.ERROR)) {
445 fireDisconnect();
446 fireCloseUploadDataProvider();
447 mCallbackAsync.onFailed(mUrlResponseInfo, error);
448 }
449 }
450
451 private boolean setTerminalState(State error) {
452 while (true) {
453 State oldState = mState.get();
454 switch (oldState) {
455 case NOT_STARTED:
456 throw new IllegalStateException("Can't enter error state bef ore start");
457 case ERROR: // fallthrough
458 case COMPLETE: // fallthrough
459 case CANCELLED:
460 return false; // Already in a terminal state
461 default: {
462 if (mState.compareAndSet(oldState, error)) {
463 return true;
464 }
465 }
466 }
467 }
468 }
469
470 /** Ends the request with an error, caused by an exception thrown from user code. */
471 private void enterUserErrorState(final Throwable error) {
472 enterErrorState(
473 new UrlRequestException("Exception received from UrlRequest.Call back", error));
474 }
475
476 /** Ends the request with an error, caused by an exception thrown from user code. */
477 private void enterUploadErrorState(final Throwable error) {
478 enterErrorState(
479 new UrlRequestException("Exception received from UploadDataProvi der", error));
480 }
481
482 private void enterCronetErrorState(final Throwable error) {
483 // TODO(clm) mapping from Java exception (UnknownHostException, for exam ple) to net error
484 // code goes here.
485 enterErrorState(new UrlRequestException("System error", error));
486 }
487
488 /**
489 * Atomically swaps from the expected state to a new state. If the swap fail s, and it's not
490 * due to an earlier error or cancellation, throws an exception.
491 *
492 * @param afterTransition Callback to run after transition completes success fully.
493 */
494 private void transitionStates(State expected, State newState, Runnable after Transition) {
495 if (!mState.compareAndSet(expected, newState)) {
496 State state = mState.get();
497 if (!(state == State.CANCELLED || state == State.ERROR)) {
498 throw new IllegalStateException(
499 "Invalid state transition - expected " + expected + " bu t was " + state);
500 }
501 } else {
502 afterTransition.run();
503 }
504 }
505
506 @Override
507 public void followRedirect() {
508 transitionStates(State.AWAITING_FOLLOW_REDIRECT, State.STARTED, new Runn able() {
509 @Override
510 public void run() {
511 mCurrentUrl = mPendingRedirectUrl;
512 mPendingRedirectUrl = null;
513 fireOpenConnection();
514 }
515 });
516 }
517
518 private void fireGetHeaders() {
519 mAdditionalStatusDetails = Status.WAITING_FOR_RESPONSE;
520 mExecutor.execute(errorSetting(new CheckedRunnable() {
521 @Override
522 public void run() throws Exception {
523 HttpURLConnection connection = mCurrentUrlConnection.get();
524 if (connection == null) {
525 return; // We've been cancelled
526 }
527 final List<Map.Entry<String, String>> headerList = new ArrayList <>();
528 String selectedTransport = "http/1.1";
529 String headerKey;
530 for (int i = 0; (headerKey = connection.getHeaderFieldKey(i)) != null; i++) {
531 if (X_ANDROID_SELECTED_TRANSPORT.equalsIgnoreCase(headerKey) ) {
532 selectedTransport = connection.getHeaderField(i);
533 }
534 if (!headerKey.startsWith(X_ANDROID)) {
535 headerList.add(new SimpleEntry<>(headerKey, connection.g etHeaderField(i)));
536 }
537 }
538
539 int responseCode = connection.getResponseCode();
540 // Important to copy the list here, because although we never co ncurrently modify
541 // the list ourselves, user code might iterate over it while we' re redirecting, and
542 // that would throw ConcurrentModificationException.
543 mUrlResponseInfo = new UrlResponseInfo(new ArrayList<>(mUrlChain ), responseCode,
544 connection.getResponseMessage(), Collections.unmodifiabl eList(headerList),
545 false, selectedTransport, "");
546 // TODO(clm) actual redirect handling? post -> get and whatnot?
547 if (responseCode >= 300 && responseCode < 400) {
548 fireRedirectReceived(mUrlResponseInfo.getAllHeaders());
549 return;
550 }
551 fireCloseUploadDataProvider();
552 if (responseCode >= 400) {
553 mResponseChannel = InputStreamChannel.wrap(connection.getErr orStream());
554 mCallbackAsync.onResponseStarted(mUrlResponseInfo);
555 } else {
556 mResponseChannel = InputStreamChannel.wrap(connection.getInp utStream());
557 mCallbackAsync.onResponseStarted(mUrlResponseInfo);
558 }
559 }
560 }));
561 }
562
563 private void fireCloseUploadDataProvider() {
564 if (mUploadDataProvider != null && mUploadProviderClosed.compareAndSet(f alse, true)) {
565 try {
566 mUploadExecutor.execute(uploadErrorSetting(new CheckedRunnable() {
567 @Override
568 public void run() throws Exception {
569 mUploadDataProvider.close();
570 }
571 }));
572 } catch (RejectedExecutionException e) {
573 Log.e(TAG, "Exception when closing uploadDataProvider", e);
574 }
575 }
576 }
577
578 private void fireRedirectReceived(final Map<String, List<String>> headerFiel ds) {
579 transitionStates(State.STARTED, State.REDIRECT_RECEIVED, new Runnable() {
580 @Override
581 public void run() {
582 mPendingRedirectUrl = URI.create(mCurrentUrl)
583 .resolve(headerFields.get("locatio n").get(0))
584 .toString();
585 mUrlChain.add(mPendingRedirectUrl);
586 transitionStates(
587 State.REDIRECT_RECEIVED, State.AWAITING_FOLLOW_REDIRECT, new Runnable() {
588 @Override
589 public void run() {
590 mCallbackAsync.onRedirectReceived(
591 mUrlResponseInfo, mPendingRedirectUrl);
592 }
593 });
594 }
595 });
596 }
597
598 private void fireOpenConnection() {
599 mExecutor.execute(errorSetting(new CheckedRunnable() {
600 @Override
601 public void run() throws Exception {
602 // If we're cancelled, then our old connection will be disconnec ted for us and
603 // we shouldn't open a new one.
604 if (mState.get() == State.CANCELLED) {
605 return;
606 }
607
608 final URL url = new URL(mCurrentUrl);
609 HttpURLConnection newConnection = (HttpURLConnection) url.openCo nnection();
610 HttpURLConnection oldConnection = mCurrentUrlConnection.getAndSe t(newConnection);
611 if (oldConnection != null) {
612 oldConnection.disconnect();
613 }
614 newConnection.setInstanceFollowRedirects(false);
615 if (!mRequestHeaders.containsKey(USER_AGENT)) {
616 mRequestHeaders.put(USER_AGENT, mUserAgent);
617 }
618 for (Map.Entry<String, String> entry : mRequestHeaders.entrySet( )) {
619 newConnection.setRequestProperty(entry.getKey(), entry.getVa lue());
620 }
621 if (mInitialMethod == null) {
622 mInitialMethod = "GET";
623 }
624 newConnection.setRequestMethod(mInitialMethod);
625 if (mUploadDataProvider != null) {
626 OutputStreamDataSink dataSink = new OutputStreamDataSink(
627 mUploadExecutor, mExecutor, newConnection, mUploadDa taProvider);
628 dataSink.start(mUrlChain.size() == 1);
629 } else {
630 mAdditionalStatusDetails = Status.CONNECTING;
631 newConnection.connect();
632 fireGetHeaders();
633 }
634 }
635 }));
636 }
637
638 private Runnable errorSetting(final CheckedRunnable delegate) {
639 return new Runnable() {
640 @Override
641 public void run() {
642 try {
643 delegate.run();
644 } catch (Throwable t) {
645 enterCronetErrorState(t);
646 }
647 }
648 };
649 }
650
651 private Runnable userErrorSetting(final CheckedRunnable delegate) {
652 return new Runnable() {
653 @Override
654 public void run() {
655 try {
656 delegate.run();
657 } catch (Throwable t) {
658 enterUserErrorState(t);
659 }
660 }
661 };
662 }
663
664 private Runnable uploadErrorSetting(final CheckedRunnable delegate) {
665 return new Runnable() {
666 @Override
667 public void run() {
668 try {
669 delegate.run();
670 } catch (Throwable t) {
671 enterUploadErrorState(t);
672 }
673 }
674 };
675 }
676
677 private interface CheckedRunnable { void run() throws Exception; }
678
679 @Override
680 public void read(final ByteBuffer buffer) {
681 Preconditions.checkDirect(buffer);
682 Preconditions.checkHasRemaining(buffer);
683 transitionStates(State.AWAITING_READ, State.READING, new Runnable() {
684 @Override
685 public void run() {
686 mExecutor.execute(errorSetting(new CheckedRunnable() {
687 @Override
688 public void run() throws Exception {
689 int read = mResponseChannel.read(buffer);
690 processReadResult(read, buffer);
691 }
692 }));
693 }
694 });
695 }
696
697 private void processReadResult(int read, final ByteBuffer buffer) throws IOE xception {
698 if (read != -1) {
699 mCallbackAsync.onReadCompleted(mUrlResponseInfo, buffer);
700 } else {
701 mResponseChannel.close();
702 if (mState.compareAndSet(State.READING, State.COMPLETE)) {
703 fireDisconnect();
704 mCallbackAsync.onSucceeded(mUrlResponseInfo);
705 }
706 }
707 }
708
709 private void fireDisconnect() {
710 final HttpURLConnection connection = mCurrentUrlConnection.getAndSet(nul l);
711 if (connection != null) {
712 mExecutor.execute(new Runnable() {
713 @Override
714 public void run() {
715 connection.disconnect();
716 }
717 });
718 }
719 }
720
721 @Override
722 public void cancel() {
723 State oldState = mState.getAndSet(State.CANCELLED);
724 switch (oldState) {
725 // We've just scheduled some user code to run. When they perform the ir next operation,
726 // they'll observe it and fail. However, if user code is cancelling in response to one
727 // of these callbacks, we'll never actually cancel!
728 // TODO(clm) figure out if it's possible to avoid concurrency in use r callbacks.
729 case REDIRECT_RECEIVED:
730 case AWAITING_FOLLOW_REDIRECT:
731 case AWAITING_READ:
732
733 // User code is waiting on us - cancel away!
734 case STARTED:
735 case READING:
736 fireDisconnect();
737 fireCloseUploadDataProvider();
738 mCallbackAsync.onCanceled(mUrlResponseInfo);
739 break;
740 // The rest are all termination cases - we're too late to cancel.
741 case ERROR:
742 case COMPLETE:
743 case CANCELLED:
744 break;
745 }
746 }
747
748 @Override
749 public boolean isDone() {
750 State state = mState.get();
751 return state == State.COMPLETE | state == State.ERROR | state == State.C ANCELLED;
752 }
753
754 @Override
755 public void getStatus(StatusListener listener) {
756 State state = mState.get();
757 int extraStatus = this.mAdditionalStatusDetails;
758
759 @Status.StatusValues final int status;
760 switch (state) {
761 case ERROR:
762 case COMPLETE:
763 case CANCELLED:
764 case NOT_STARTED:
765 status = Status.INVALID;
766 break;
767 case STARTED:
768 status = extraStatus;
769 break;
770 case REDIRECT_RECEIVED:
771 case AWAITING_FOLLOW_REDIRECT:
772 case AWAITING_READ:
773 status = Status.IDLE;
774 break;
775 case READING:
776 status = Status.READING_RESPONSE;
777 break;
778 default:
779 throw new IllegalStateException("Switch is exhaustive: " + state );
780 }
781
782 mCallbackAsync.sendStatus(listener, status);
783 }
784
785 /** This wrapper ensures that callbacks are always called on the correct exe cutor */
786 private final class AsyncUrlRequestCallback {
787 final UrlRequest.Callback mCallback;
788 final Executor mUserExecutor;
789 final Executor mFallbackExecutor;
790
791 AsyncUrlRequestCallback(Callback callback, final Executor userExecutor) {
792 this.mCallback = callback;
793 if (mAllowDirectExecutor) {
794 this.mUserExecutor = userExecutor;
795 this.mFallbackExecutor = null;
796 } else {
797 mUserExecutor = new DirectPreventingExecutor(userExecutor);
798 mFallbackExecutor = userExecutor;
799 }
800 }
801
802 void sendStatus(final StatusListener listener, final int status) {
803 mUserExecutor.execute(new Runnable() {
804 @Override
805 public void run() {
806 listener.onStatus(status);
807 }
808 });
809 }
810
811 void execute(CheckedRunnable runnable) {
812 try {
813 mUserExecutor.execute(userErrorSetting(runnable));
814 } catch (RejectedExecutionException e) {
815 enterErrorState(new UrlRequestException("Exception posting task to executor", e));
816 }
817 }
818
819 void onRedirectReceived(final UrlResponseInfo info, final String newLoca tionUrl) {
820 execute(new CheckedRunnable() {
821 @Override
822 public void run() throws Exception {
823 mCallback.onRedirectReceived(JavaUrlRequest.this, info, newL ocationUrl);
824 }
825 });
826 }
827
828 void onResponseStarted(UrlResponseInfo info) {
829 execute(new CheckedRunnable() {
830 @Override
831 public void run() throws Exception {
832 if (mState.compareAndSet(State.STARTED, State.AWAITING_READ) ) {
833 mCallback.onResponseStarted(JavaUrlRequest.this, mUrlRes ponseInfo);
834 }
835 }
836 });
837 }
838
839 void onReadCompleted(final UrlResponseInfo info, final ByteBuffer byteBu ffer) {
840 execute(new CheckedRunnable() {
841 @Override
842 public void run() throws Exception {
843 if (mState.compareAndSet(State.READING, State.AWAITING_READ) ) {
844 mCallback.onReadCompleted(JavaUrlRequest.this, info, byt eBuffer);
845 }
846 }
847 });
848 }
849
850 void onCanceled(final UrlResponseInfo info) {
851 closeResponseChannel();
852 mUserExecutor.execute(new Runnable() {
853 @Override
854 public void run() {
855 try {
856 mCallback.onCanceled(JavaUrlRequest.this, info);
857 } catch (Exception exception) {
858 Log.e(TAG, "Exception in onCanceled method", exception);
859 }
860 }
861 });
862 }
863
864 void onSucceeded(final UrlResponseInfo info) {
865 mUserExecutor.execute(new Runnable() {
866 @Override
867 public void run() {
868 try {
869 mCallback.onSucceeded(JavaUrlRequest.this, info);
870 } catch (Exception exception) {
871 Log.e(TAG, "Exception in onSucceeded method", exception) ;
872 }
873 }
874 });
875 }
876
877 void onFailed(final UrlResponseInfo urlResponseInfo, final UrlRequestExc eption e) {
878 closeResponseChannel();
879 Runnable runnable = new Runnable() {
880 @Override
881 public void run() {
882 try {
883 mCallback.onFailed(JavaUrlRequest.this, urlResponseInfo, e);
884 } catch (Exception exception) {
885 Log.e(TAG, "Exception in onFailed method", exception);
886 }
887 }
888 };
889 try {
890 mUserExecutor.execute(runnable);
891 } catch (InlineExecutionProhibitedException wasDirect) {
892 if (mFallbackExecutor != null) {
893 mFallbackExecutor.execute(runnable);
894 }
895 }
896 }
897 }
898
899 private void closeResponseChannel() {
900 final Closeable closeable = mResponseChannel;
901 if (closeable == null) {
902 return;
903 }
904 mResponseChannel = null;
905 mExecutor.execute(new Runnable() {
906 @Override
907 public void run() {
908 try {
909 closeable.close();
910 } catch (IOException e) {
911 e.printStackTrace();
912 }
913 }
914 });
915 }
916
917 /**
918 * Executor that detects and throws if its mDelegate runs a submitted runnab le inline.
919 */
920 static final class DirectPreventingExecutor implements Executor {
921 private final Executor mDelegate;
922
923 DirectPreventingExecutor(Executor delegate) {
924 this.mDelegate = delegate;
925 }
926
927 @Override
928 public void execute(Runnable command) {
929 Thread currentThread = Thread.currentThread();
930 InlineCheckingRunnable runnable = new InlineCheckingRunnable(command , currentThread);
931 mDelegate.execute(runnable);
932 if (runnable.mExecutedInline != null) {
933 throw runnable.mExecutedInline;
934 } else {
935 // It's possible that this method is being called on an executor , and the runnable
936 // that
937 // was just queued will run on this thread after the current run nable returns. By
938 // nulling out the mCallingThread field, the InlineCheckingRunna ble's current thread
939 // comparison will not fire.
940 runnable.mCallingThread = null;
941 }
942 }
943
944 private static final class InlineCheckingRunnable implements Runnable {
945 private final Runnable mCommand;
946 private Thread mCallingThread;
947 private InlineExecutionProhibitedException mExecutedInline = null;
948
949 private InlineCheckingRunnable(Runnable command, Thread callingThrea d) {
950 this.mCommand = command;
951 this.mCallingThread = callingThread;
952 }
953
954 @Override
955 public void run() {
956 if (Thread.currentThread() == mCallingThread) {
957 // Can't throw directly from here, since the delegate execut or could catch this
958 // exception.
959 mExecutedInline = new InlineExecutionProhibitedException();
960 return;
961 }
962 mCommand.run();
963 }
964 }
965 }
966 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698