Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(101)

Side by Side Diff: components/cronet/android/java/src/org/chromium/net/CronetBidirectionalStream.java

Issue 1849753002: [Cronet] Separate Cronet implementation and API by package name. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: sync Created 4 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package org.chromium.net;
6
7 import org.chromium.base.Log;
8 import org.chromium.base.VisibleForTesting;
9 import org.chromium.base.annotations.CalledByNative;
10 import org.chromium.base.annotations.JNINamespace;
11 import org.chromium.base.annotations.NativeClassQualifiedName;
12
13 import java.nio.ByteBuffer;
14 import java.util.AbstractMap;
15 import java.util.ArrayList;
16 import java.util.Arrays;
17 import java.util.LinkedList;
18 import java.util.List;
19 import java.util.Map;
20 import java.util.concurrent.Executor;
21 import java.util.concurrent.RejectedExecutionException;
22
23 import javax.annotation.concurrent.GuardedBy;
24
25 /**
26 * {@link BidirectionalStream} implementation using Chromium network stack.
27 * All @CalledByNative methods are called on the native network thread
28 * and post tasks with callback calls onto Executor. Upon returning from callbac k, the native
29 * stream is called on Executor thread and posts native tasks to the native netw ork thread.
30 */
31 @JNINamespace("cronet")
32 class CronetBidirectionalStream extends BidirectionalStream {
33 /**
34 * States of BidirectionalStream are tracked in mReadState and mWriteState.
35 * The write state is separated out as it changes independently of the read state.
36 * There is one initial state: State.NOT_STARTED. There is one normal final state:
37 * State.SUCCESS, reached after State.READING_DONE and State.WRITING_DONE. T here are two
38 * exceptional final states: State.CANCELED and State.ERROR, which can be re ached from
39 * any other non-final state.
40 */
41 private enum State {
42 /* Initial state, stream not started. */
43 NOT_STARTED,
44 /*
45 * Stream started, request headers are being sent if mDelayRequestHeader sUntilNextFlush
46 * is not set to true.
47 */
48 STARTED,
49 /* Waiting for {@code read()} to be called. */
50 WAITING_FOR_READ,
51 /* Reading from the remote, {@code onReadCompleted()} callback will be c alled when done. */
52 READING,
53 /* There is no more data to read and stream is half-closed by the remote side. */
54 READING_DONE,
55 /* Stream is canceled. */
56 CANCELED,
57 /* Error has occured, stream is closed. */
58 ERROR,
59 /* Reading and writing are done, and the stream is closed successfully. */
60 SUCCESS,
61 /* Waiting for {@code nativeSendRequestHeaders()} or {@code nativeWritev Data()} to be
62 called. */
63 WAITING_FOR_FLUSH,
64 /* Writing to the remote, {@code onWritevCompleted()} callback will be c alled when done. */
65 WRITING,
66 /* There is no more data to write and stream is half-closed by the local side. */
67 WRITING_DONE,
68 }
69
70 private final CronetUrlRequestContext mRequestContext;
71 private final Executor mExecutor;
72 private final Callback mCallback;
73 private final String mInitialUrl;
74 private final int mInitialPriority;
75 private final String mInitialMethod;
76 private final String mRequestHeaders[];
77 private final boolean mDisableAutoFlush;
78 private final boolean mDelayRequestHeadersUntilFirstFlush;
79
80 /*
81 * Synchronizes access to mNativeStream, mReadState and mWriteState.
82 */
83 private final Object mNativeStreamLock = new Object();
84
85 @GuardedBy("mNativeStreamLock")
86 // Pending write data.
87 private LinkedList<ByteBuffer> mPendingData;
88
89 @GuardedBy("mNativeStreamLock")
90 // Flush data queue that should be pushed to the native stack when the previ ous
91 // nativeWritevData completes.
92 private LinkedList<ByteBuffer> mFlushData;
93
94 @GuardedBy("mNativeStreamLock")
95 // Whether an end-of-stream flag is passed in through write().
96 private boolean mEndOfStreamWritten;
97
98 @GuardedBy("mNativeStreamLock")
99 // Whether request headers have been sent.
100 private boolean mRequestHeadersSent;
101
102 /* Native BidirectionalStream object, owned by CronetBidirectionalStream. */
103 @GuardedBy("mNativeStreamLock")
104 private long mNativeStream;
105
106 /**
107 * Read state is tracking reading flow.
108 * / <--- READING <--- \
109 * | |
110 * \ /
111 * NOT_STARTED -> STARTED --> WAITING_FOR_READ -> READING_DONE -> SUCCESS
112 */
113 @GuardedBy("mNativeStreamLock")
114 private State mReadState = State.NOT_STARTED;
115
116 /**
117 * Write state is tracking writing flow.
118 * / <--- WRITING <--- \
119 * | |
120 * \ /
121 * NOT_STARTED -> STARTED --> WAITING_FOR_FLUSH -> WRITING_DONE -> SUCCESS
122 */
123 @GuardedBy("mNativeStreamLock")
124 private State mWriteState = State.NOT_STARTED;
125
126 // Only modified on the network thread.
127 private UrlResponseInfo mResponseInfo;
128
129 /*
130 * OnReadCompleted callback is repeatedly invoked when each read is complete d, so it
131 * is cached as a member variable.
132 */
133 // Only modified on the network thread.
134 private OnReadCompletedRunnable mOnReadCompletedTask;
135
136 private Runnable mOnDestroyedCallbackForTesting;
137
138 private final class OnReadCompletedRunnable implements Runnable {
139 // Buffer passed back from current invocation of onReadCompleted.
140 ByteBuffer mByteBuffer;
141 // End of stream flag from current invocation of onReadCompleted.
142 boolean mEndOfStream;
143
144 @Override
145 public void run() {
146 try {
147 // Null out mByteBuffer, to pass buffer ownership to callback or release if done.
148 ByteBuffer buffer = mByteBuffer;
149 mByteBuffer = null;
150 boolean maybeOnSucceeded = false;
151 synchronized (mNativeStreamLock) {
152 if (isDoneLocked()) {
153 return;
154 }
155 if (mEndOfStream) {
156 mReadState = State.READING_DONE;
157 maybeOnSucceeded = (mWriteState == State.WRITING_DONE);
158 } else {
159 mReadState = State.WAITING_FOR_READ;
160 }
161 }
162 mCallback.onReadCompleted(
163 CronetBidirectionalStream.this, mResponseInfo, buffer, m EndOfStream);
164 if (maybeOnSucceeded) {
165 maybeOnSucceededOnExecutor();
166 }
167 } catch (Exception e) {
168 onCallbackException(e);
169 }
170 }
171 }
172
173 private final class OnWriteCompletedRunnable implements Runnable {
174 // Buffer passed back from current invocation of onWriteCompleted.
175 private ByteBuffer mByteBuffer;
176 // End of stream flag from current call to write.
177 private final boolean mEndOfStream;
178
179 OnWriteCompletedRunnable(ByteBuffer buffer, boolean endOfStream) {
180 mByteBuffer = buffer;
181 mEndOfStream = endOfStream;
182 }
183
184 @Override
185 public void run() {
186 try {
187 // Null out mByteBuffer, to pass buffer ownership to callback or release if done.
188 ByteBuffer buffer = mByteBuffer;
189 mByteBuffer = null;
190 boolean maybeOnSucceeded = false;
191 synchronized (mNativeStreamLock) {
192 if (isDoneLocked()) {
193 return;
194 }
195 if (mEndOfStream) {
196 mWriteState = State.WRITING_DONE;
197 maybeOnSucceeded = (mReadState == State.READING_DONE);
198 }
199 }
200 mCallback.onWriteCompleted(
201 CronetBidirectionalStream.this, mResponseInfo, buffer, m EndOfStream);
202 if (maybeOnSucceeded) {
203 maybeOnSucceededOnExecutor();
204 }
205 } catch (Exception e) {
206 onCallbackException(e);
207 }
208 }
209 }
210
211 CronetBidirectionalStream(CronetUrlRequestContext requestContext, String url ,
212 @BidirectionalStream.Builder.StreamPriority int priority, Callback c allback,
213 Executor executor, String httpMethod, List<Map.Entry<String, String> > requestHeaders,
214 boolean disableAutoFlush, boolean delayRequestHeadersUntilNextFlush) {
215 mRequestContext = requestContext;
216 mInitialUrl = url;
217 mInitialPriority = convertStreamPriority(priority);
218 mCallback = callback;
219 mExecutor = executor;
220 mInitialMethod = httpMethod;
221 mRequestHeaders = stringsFromHeaderList(requestHeaders);
222 mDisableAutoFlush = disableAutoFlush;
223 mDelayRequestHeadersUntilFirstFlush = delayRequestHeadersUntilNextFlush;
224 mPendingData = new LinkedList<>();
225 mFlushData = new LinkedList<>();
226 }
227
228 @Override
229 public void start() {
230 synchronized (mNativeStreamLock) {
231 if (mReadState != State.NOT_STARTED) {
232 throw new IllegalStateException("Stream is already started.");
233 }
234 try {
235 mNativeStream = nativeCreateBidirectionalStream(
236 mRequestContext.getUrlRequestContextAdapter(),
237 !mDelayRequestHeadersUntilFirstFlush);
238 mRequestContext.onRequestStarted();
239 // Non-zero startResult means an argument error.
240 int startResult = nativeStart(mNativeStream, mInitialUrl, mIniti alPriority,
241 mInitialMethod, mRequestHeaders, !doesMethodAllowWriteDa ta(mInitialMethod));
242 if (startResult == -1) {
243 throw new IllegalArgumentException("Invalid http method " + mInitialMethod);
244 }
245 if (startResult > 0) {
246 int headerPos = startResult - 1;
247 throw new IllegalArgumentException("Invalid header "
248 + mRequestHeaders[headerPos] + "=" + mRequestHeaders [headerPos + 1]);
249 }
250 mReadState = mWriteState = State.STARTED;
251 } catch (RuntimeException e) {
252 // If there's an exception, clean up and then throw the
253 // exception to the caller.
254 destroyNativeStreamLocked(false);
255 throw e;
256 }
257 }
258 }
259
260 @Override
261 public void read(ByteBuffer buffer) {
262 synchronized (mNativeStreamLock) {
263 Preconditions.checkHasRemaining(buffer);
264 Preconditions.checkDirect(buffer);
265 if (mReadState != State.WAITING_FOR_READ) {
266 throw new IllegalStateException("Unexpected read attempt.");
267 }
268 if (isDoneLocked()) {
269 return;
270 }
271 if (mOnReadCompletedTask == null) {
272 mOnReadCompletedTask = new OnReadCompletedRunnable();
273 }
274 mReadState = State.READING;
275 if (!nativeReadData(mNativeStream, buffer, buffer.position(), buffer .limit())) {
276 // Still waiting on read. This is just to have consistent
277 // behavior with the other error cases.
278 mReadState = State.WAITING_FOR_READ;
279 throw new IllegalArgumentException("Unable to call native read") ;
280 }
281 }
282 }
283
284 @Override
285 public void write(ByteBuffer buffer, boolean endOfStream) {
286 synchronized (mNativeStreamLock) {
287 Preconditions.checkDirect(buffer);
288 if (!buffer.hasRemaining() && !endOfStream) {
289 throw new IllegalArgumentException("Empty buffer before end of s tream.");
290 }
291 if (mEndOfStreamWritten) {
292 throw new IllegalArgumentException("Write after writing end of s tream.");
293 }
294 if (isDoneLocked()) {
295 return;
296 }
297 mPendingData.add(buffer);
298 if (endOfStream) {
299 mEndOfStreamWritten = true;
300 }
301 if (!mDisableAutoFlush) {
302 flushLocked();
303 }
304 }
305 }
306
307 @Override
308 public void flush() {
309 synchronized (mNativeStreamLock) {
310 flushLocked();
311 }
312 }
313
314 @SuppressWarnings("GuardedByChecker")
315 private void flushLocked() {
316 if (isDoneLocked()
317 || (mWriteState != State.WAITING_FOR_FLUSH && mWriteState != Sta te.WRITING)) {
318 return;
319 }
320 if (mPendingData.isEmpty() && mFlushData.isEmpty()) {
321 // If there is no pending write when flush() is called, see if
322 // request headers need to be flushed.
323 if (!mRequestHeadersSent) {
324 mRequestHeadersSent = true;
325 nativeSendRequestHeaders(mNativeStream);
326 if (!doesMethodAllowWriteData(mInitialMethod)) {
327 mWriteState = State.WRITING_DONE;
328 }
329 }
330 return;
331 }
332
333 assert !mPendingData.isEmpty() || !mFlushData.isEmpty();
334
335 // Move buffers from mPendingData to the flushing queue.
336 if (!mPendingData.isEmpty()) {
337 mFlushData.addAll(mPendingData);
338 mPendingData.clear();
339 }
340
341 if (mWriteState == State.WRITING) {
342 // If there is a write already pending, wait until onWritevCompleted is
343 // called before pushing data to the native stack.
344 return;
345 }
346 sendFlushDataLocked();
347 }
348
349 // Helper method to send buffers in mFlushData. Caller needs to acquire
350 // mNativeStreamLock and make sure mWriteState is WAITING_FOR_FLUSH and
351 // mFlushData queue isn't empty.
352 @SuppressWarnings("GuardedByChecker")
353 private void sendFlushDataLocked() {
354 assert mWriteState == State.WAITING_FOR_FLUSH;
355 int size = mFlushData.size();
356 ByteBuffer[] buffers = new ByteBuffer[size];
357 int[] positions = new int[size];
358 int[] limits = new int[size];
359 for (int i = 0; i < size; i++) {
360 ByteBuffer buffer = mFlushData.poll();
361 buffers[i] = buffer;
362 positions[i] = buffer.position();
363 limits[i] = buffer.limit();
364 }
365 assert mFlushData.isEmpty();
366 assert buffers.length >= 1;
367 mWriteState = State.WRITING;
368 if (!nativeWritevData(mNativeStream, buffers, positions, limits,
369 mEndOfStreamWritten && mPendingData.isEmpty())) {
370 // Still waiting on flush. This is just to have consistent
371 // behavior with the other error cases.
372 mWriteState = State.WAITING_FOR_FLUSH;
373 throw new IllegalArgumentException("Unable to call native writev.");
374 }
375 }
376
377 /**
378 * Returns a read-only copy of {@code mPendingData} for testing.
379 */
380 @VisibleForTesting
381 public List<ByteBuffer> getPendingDataForTesting() {
382 synchronized (mNativeStreamLock) {
383 List<ByteBuffer> pendingData = new LinkedList<ByteBuffer>();
384 for (ByteBuffer buffer : mPendingData) {
385 pendingData.add(buffer.asReadOnlyBuffer());
386 }
387 return pendingData;
388 }
389 }
390
391 /**
392 * Returns a read-only copy of {@code mFlushData} for testing.
393 */
394 @VisibleForTesting
395 public List<ByteBuffer> getFlushDataForTesting() {
396 synchronized (mNativeStreamLock) {
397 List<ByteBuffer> flushData = new LinkedList<ByteBuffer>();
398 for (ByteBuffer buffer : mFlushData) {
399 flushData.add(buffer.asReadOnlyBuffer());
400 }
401 return flushData;
402 }
403 }
404
405 @Override
406 public void cancel() {
407 synchronized (mNativeStreamLock) {
408 if (isDoneLocked() || mReadState == State.NOT_STARTED) {
409 return;
410 }
411 mReadState = mWriteState = State.CANCELED;
412 destroyNativeStreamLocked(true);
413 }
414 }
415
416 @Override
417 public boolean isDone() {
418 synchronized (mNativeStreamLock) {
419 return isDoneLocked();
420 }
421 }
422
423 @GuardedBy("mNativeStreamLock")
424 private boolean isDoneLocked() {
425 return mReadState != State.NOT_STARTED && mNativeStream == 0;
426 }
427
428 /*
429 * Runs an onSucceeded callback if both Read and Write sides are closed.
430 */
431 private void maybeOnSucceededOnExecutor() {
432 synchronized (mNativeStreamLock) {
433 if (isDoneLocked()) {
434 return;
435 }
436 if (!(mWriteState == State.WRITING_DONE && mReadState == State.READI NG_DONE)) {
437 return;
438 }
439 mReadState = mWriteState = State.SUCCESS;
440 // Destroy native stream first, so UrlRequestContext could be shut
441 // down from the listener.
442 destroyNativeStreamLocked(false);
443 }
444 try {
445 mCallback.onSucceeded(CronetBidirectionalStream.this, mResponseInfo) ;
446 } catch (Exception e) {
447 Log.e(CronetUrlRequestContext.LOG_TAG, "Exception in onSucceeded met hod", e);
448 }
449 }
450
451 @SuppressWarnings("unused")
452 @CalledByNative
453 private void onStreamReady(final boolean requestHeadersSent) {
454 postTaskToExecutor(new Runnable() {
455 public void run() {
456 synchronized (mNativeStreamLock) {
457 if (isDoneLocked()) {
458 return;
459 }
460 mRequestHeadersSent = requestHeadersSent;
461 mReadState = State.WAITING_FOR_READ;
462 if (!doesMethodAllowWriteData(mInitialMethod) && mRequestHea dersSent) {
463 mWriteState = State.WRITING_DONE;
464 } else {
465 mWriteState = State.WAITING_FOR_FLUSH;
466 }
467 }
468
469 try {
470 mCallback.onStreamReady(CronetBidirectionalStream.this);
471 } catch (Exception e) {
472 onCallbackException(e);
473 }
474 }
475 });
476 }
477
478 /**
479 * Called when the final set of headers, after all redirects,
480 * is received. Can only be called once for each stream.
481 */
482 @SuppressWarnings("unused")
483 @CalledByNative
484 private void onResponseHeadersReceived(int httpStatusCode, String negotiated Protocol,
485 String[] headers, long receivedBytesCount) {
486 try {
487 mResponseInfo = prepareResponseInfoOnNetworkThread(
488 httpStatusCode, negotiatedProtocol, headers, receivedBytesCo unt);
489 } catch (Exception e) {
490 failWithException(new CronetException("Cannot prepare ResponseInfo", null));
491 return;
492 }
493 postTaskToExecutor(new Runnable() {
494 public void run() {
495 synchronized (mNativeStreamLock) {
496 if (isDoneLocked()) {
497 return;
498 }
499 mReadState = State.WAITING_FOR_READ;
500 }
501
502 try {
503 mCallback.onResponseHeadersReceived(
504 CronetBidirectionalStream.this, mResponseInfo);
505 } catch (Exception e) {
506 onCallbackException(e);
507 }
508 }
509 });
510 }
511
512 @SuppressWarnings("unused")
513 @CalledByNative
514 private void onReadCompleted(final ByteBuffer byteBuffer, int bytesRead, int initialPosition,
515 int initialLimit, long receivedBytesCount) {
516 mResponseInfo.setReceivedBytesCount(receivedBytesCount);
517 if (byteBuffer.position() != initialPosition || byteBuffer.limit() != in itialLimit) {
518 failWithException(
519 new CronetException("ByteBuffer modified externally during r ead", null));
520 return;
521 }
522 if (bytesRead < 0 || initialPosition + bytesRead > initialLimit) {
523 failWithException(new CronetException("Invalid number of bytes read" , null));
524 return;
525 }
526 byteBuffer.position(initialPosition + bytesRead);
527 assert mOnReadCompletedTask.mByteBuffer == null;
528 mOnReadCompletedTask.mByteBuffer = byteBuffer;
529 mOnReadCompletedTask.mEndOfStream = (bytesRead == 0);
530 postTaskToExecutor(mOnReadCompletedTask);
531 }
532
533 @SuppressWarnings("unused")
534 @CalledByNative
535 private void onWritevCompleted(final ByteBuffer[] byteBuffers, int[] initial Positions,
536 int[] initialLimits, boolean endOfStream) {
537 assert byteBuffers.length == initialPositions.length;
538 assert byteBuffers.length == initialLimits.length;
539 synchronized (mNativeStreamLock) {
540 mWriteState = State.WAITING_FOR_FLUSH;
541 // Flush if there is anything in the flush queue mFlushData.
542 if (!mFlushData.isEmpty()) {
543 sendFlushDataLocked();
544 }
545 }
546 for (int i = 0; i < byteBuffers.length; i++) {
547 ByteBuffer buffer = byteBuffers[i];
548 if (buffer.position() != initialPositions[i] || buffer.limit() != in itialLimits[i]) {
549 failWithException(
550 new CronetException("ByteBuffer modified externally duri ng write", null));
551 return;
552 }
553 // Current implementation always writes the complete buffer.
554 buffer.position(buffer.limit());
555 postTaskToExecutor(new OnWriteCompletedRunnable(buffer,
556 // Only set endOfStream flag if this buffer is the last in b yteBuffers.
557 endOfStream && i == byteBuffers.length - 1));
558 }
559 }
560
561 @SuppressWarnings("unused")
562 @CalledByNative
563 private void onResponseTrailersReceived(String[] trailers) {
564 final UrlResponseInfo.HeaderBlock trailersBlock =
565 new UrlResponseInfo.HeaderBlock(headersListFromStrings(trailers) );
566 postTaskToExecutor(new Runnable() {
567 public void run() {
568 synchronized (mNativeStreamLock) {
569 if (isDoneLocked()) {
570 return;
571 }
572 }
573 try {
574 mCallback.onResponseTrailersReceived(
575 CronetBidirectionalStream.this, mResponseInfo, trail ersBlock);
576 } catch (Exception e) {
577 onCallbackException(e);
578 }
579 }
580 });
581 }
582
583 @SuppressWarnings("unused")
584 @CalledByNative
585 private void onError(int errorCode, int nativeError, int nativeQuicError, St ring errorString,
586 long receivedBytesCount) {
587 if (mResponseInfo != null) {
588 mResponseInfo.setReceivedBytesCount(receivedBytesCount);
589 }
590 if (errorCode == UrlRequestException.ERROR_QUIC_PROTOCOL_FAILED) {
591 failWithException(new QuicException("Exception in BidirectionalStrea m: " + errorString,
592 nativeError, nativeQuicError));
593 } else {
594 failWithException(new CronetException(
595 "Exception in BidirectionalStream: " + errorString, errorCod e, nativeError));
596 }
597 }
598
599 /**
600 * Called when request is canceled, no callbacks will be called afterwards.
601 */
602 @SuppressWarnings("unused")
603 @CalledByNative
604 private void onCanceled() {
605 postTaskToExecutor(new Runnable() {
606 public void run() {
607 try {
608 mCallback.onCanceled(CronetBidirectionalStream.this, mRespon seInfo);
609 } catch (Exception e) {
610 Log.e(CronetUrlRequestContext.LOG_TAG, "Exception in onCance led method", e);
611 }
612 }
613 });
614 }
615
616 @VisibleForTesting
617 public void setOnDestroyedCallbackForTesting(Runnable onDestroyedCallbackFor Testing) {
618 mOnDestroyedCallbackForTesting = onDestroyedCallbackForTesting;
619 }
620
621 private static boolean doesMethodAllowWriteData(String methodName) {
622 return !methodName.equals("GET") && !methodName.equals("HEAD");
623 }
624
625 private static ArrayList<Map.Entry<String, String>> headersListFromStrings(S tring[] headers) {
626 ArrayList<Map.Entry<String, String>> headersList = new ArrayList<>(heade rs.length / 2);
627 for (int i = 0; i < headers.length; i += 2) {
628 headersList.add(new AbstractMap.SimpleImmutableEntry<>(headers[i], h eaders[i + 1]));
629 }
630 return headersList;
631 }
632
633 private static String[] stringsFromHeaderList(List<Map.Entry<String, String> > headersList) {
634 String headersArray[] = new String[headersList.size() * 2];
635 int i = 0;
636 for (Map.Entry<String, String> requestHeader : headersList) {
637 headersArray[i++] = requestHeader.getKey();
638 headersArray[i++] = requestHeader.getValue();
639 }
640 return headersArray;
641 }
642
643 private static int convertStreamPriority(
644 @BidirectionalStream.Builder.StreamPriority int priority) {
645 switch (priority) {
646 case Builder.STREAM_PRIORITY_IDLE:
647 return RequestPriority.IDLE;
648 case Builder.STREAM_PRIORITY_LOWEST:
649 return RequestPriority.LOWEST;
650 case Builder.STREAM_PRIORITY_LOW:
651 return RequestPriority.LOW;
652 case Builder.STREAM_PRIORITY_MEDIUM:
653 return RequestPriority.MEDIUM;
654 case Builder.STREAM_PRIORITY_HIGHEST:
655 return RequestPriority.HIGHEST;
656 default:
657 throw new IllegalArgumentException("Invalid stream priority.");
658 }
659 }
660
661 /**
662 * Posts task to application Executor. Used for callbacks
663 * and other tasks that should not be executed on network thread.
664 */
665 private void postTaskToExecutor(Runnable task) {
666 try {
667 mExecutor.execute(task);
668 } catch (RejectedExecutionException failException) {
669 Log.e(CronetUrlRequestContext.LOG_TAG, "Exception posting task to ex ecutor",
670 failException);
671 // If posting a task throws an exception, then there is no choice
672 // but to destroy the stream without invoking the callback.
673 synchronized (mNativeStreamLock) {
674 mReadState = mWriteState = State.ERROR;
675 destroyNativeStreamLocked(false);
676 }
677 }
678 }
679
680 private UrlResponseInfo prepareResponseInfoOnNetworkThread(int httpStatusCod e,
681 String negotiatedProtocol, String[] headers, long receivedBytesCount ) {
682 UrlResponseInfo responseInfo =
683 new UrlResponseInfo(Arrays.asList(mInitialUrl), httpStatusCode, "",
684 headersListFromStrings(headers), false, negotiatedProtoc ol, null);
685 responseInfo.setReceivedBytesCount(receivedBytesCount);
686 return responseInfo;
687 }
688
689 @GuardedBy("mNativeStreamLock")
690 private void destroyNativeStreamLocked(boolean sendOnCanceled) {
691 Log.i(CronetUrlRequestContext.LOG_TAG, "destroyNativeStreamLocked " + th is.toString());
692 if (mNativeStream == 0) {
693 return;
694 }
695 nativeDestroy(mNativeStream, sendOnCanceled);
696 mNativeStream = 0;
697 mRequestContext.onRequestDestroyed();
698 if (mOnDestroyedCallbackForTesting != null) {
699 mOnDestroyedCallbackForTesting.run();
700 }
701 }
702
703 /**
704 * Fails the stream with an exception. Only called on the Executor.
705 */
706 private void failWithExceptionOnExecutor(CronetException e) {
707 // Do not call into mCallback if request is complete.
708 synchronized (mNativeStreamLock) {
709 if (isDoneLocked()) {
710 return;
711 }
712 mReadState = mWriteState = State.ERROR;
713 destroyNativeStreamLocked(false);
714 }
715 try {
716 mCallback.onFailed(this, mResponseInfo, e);
717 } catch (Exception failException) {
718 Log.e(CronetUrlRequestContext.LOG_TAG, "Exception notifying of faile d request",
719 failException);
720 }
721 }
722
723 /**
724 * If callback method throws an exception, stream gets canceled
725 * and exception is reported via onFailed callback.
726 * Only called on the Executor.
727 */
728 private void onCallbackException(Exception e) {
729 CronetException streamError =
730 new CronetException("CalledByNative method has thrown an excepti on", e);
731 Log.e(CronetUrlRequestContext.LOG_TAG, "Exception in CalledByNative meth od", e);
732 failWithExceptionOnExecutor(streamError);
733 }
734
735 /**
736 * Fails the stream with an exception. Can be called on any thread.
737 */
738 private void failWithException(final CronetException exception) {
739 postTaskToExecutor(new Runnable() {
740 public void run() {
741 failWithExceptionOnExecutor(exception);
742 }
743 });
744 }
745
746 // Native methods are implemented in cronet_bidirectional_stream_adapter.cc.
747 private native long nativeCreateBidirectionalStream(
748 long urlRequestContextAdapter, boolean sendRequestHeadersAutomatical ly);
749
750 @NativeClassQualifiedName("CronetBidirectionalStreamAdapter")
751 private native int nativeStart(long nativePtr, String url, int priority, Str ing method,
752 String[] headers, boolean endOfStream);
753
754 @NativeClassQualifiedName("CronetBidirectionalStreamAdapter")
755 private native void nativeSendRequestHeaders(long nativePtr);
756
757 @NativeClassQualifiedName("CronetBidirectionalStreamAdapter")
758 private native boolean nativeReadData(
759 long nativePtr, ByteBuffer byteBuffer, int position, int limit);
760
761 @NativeClassQualifiedName("CronetBidirectionalStreamAdapter")
762 private native boolean nativeWritevData(long nativePtr, ByteBuffer[] buffers , int[] positions,
763 int[] limits, boolean endOfStream);
764
765 @NativeClassQualifiedName("CronetBidirectionalStreamAdapter")
766 private native void nativeDestroy(long nativePtr, boolean sendOnCanceled);
767 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698