Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(175)

Side by Side Diff: components/cronet/android/java/src/org/chromium/net/CronetBidirectionalStream.java

Issue 1412243012: Initial implementation of CronetBidirectionalStream. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Removed State.WRITING_END_OF_STREAM Created 4 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package org.chromium.net;
6
7 import org.chromium.base.Log;
8 import org.chromium.base.VisibleForTesting;
9 import org.chromium.base.annotations.CalledByNative;
10 import org.chromium.base.annotations.JNINamespace;
11 import org.chromium.base.annotations.NativeClassQualifiedName;
12
13 import java.nio.ByteBuffer;
14 import java.util.AbstractMap;
15 import java.util.ArrayList;
16 import java.util.List;
17 import java.util.Map;
18 import java.util.concurrent.Executor;
19 import java.util.concurrent.RejectedExecutionException;
20
21 import javax.annotation.concurrent.GuardedBy;
22
23 /**
24 * {@link BidirectionalStream} implementation using Chromium network stack.
25 * All @CalledByNative methods are called on the native network thread
26 * and post tasks with callback calls onto Executor. Upon returning from callbac k, the native
27 * stream is called on Executor thread and posts native tasks to the native netw ork thread.
28 */
29 @JNINamespace("cronet")
30 class CronetBidirectionalStream extends BidirectionalStream {
31 /**
32 * States of BidirectionalStream are tracked in mReadState and mWriteState.
33 * The write state is separated out as it changes independently of the read state.
34 * There is one initial state: State.NOT_STARTED. There is one normal final state:
35 * State.SUCCESS, reached after State.READING_DONE and State.WRITING_DONE. T here are two
36 * exceptional final states: State.CANCELED and State.ERROR, which can be re ached from
37 * any other non-final state.
38 */
39 private enum State {
40 /* Initial state, stream not started. */
41 NOT_STARTED,
42 /* Stream started, request headers are being sent. */
43 STARTED,
44 /* Waiting for {@code read()} to be called. */
45 WAITING_FOR_READ,
46 /* Reading from the remote, {@code onReadCompleted()} callback will be c alled when done. */
47 READING,
48 /* There is no more data to read and stream is half-closed by the remote side. */
49 READING_DONE,
50 /* Stream is canceled. */
51 CANCELED,
52 /* Error has occured, stream is closed. */
53 ERROR,
54 /* Reading and writing are done, and the stream is closed successfully. */
55 SUCCESS,
56 /* Waiting for {@code write()} to be called. */
57 WAITING_FOR_WRITE,
58 /* Writing to the remote, {@code onWriteCompleted()} callback will be ca lled when done. */
59 WRITING,
60 /* There is no more data to write and stream is half-closed by the local side. */
61 WRITING_DONE,
62 }
63
64 private final CronetUrlRequestContext mRequestContext;
65 private final Executor mExecutor;
66 private final Callback mCallback;
67 private final String mInitialUrl;
68 private final int mInitialPriority;
69 private final String mInitialMethod;
70 private final String mRequestHeaders[];
71
72 /*
73 * Synchronizes access to mNativeStream, mReadState and mWriteState.
74 */
75 private final Object mNativeStreamLock = new Object();
76
77 /* Native BidirectionalStream object, owned by CronetBidirectionalStream. */
78 @GuardedBy("mNativeStreamLock") private long mNativeStream;
79
80 /**
81 * Read state is tracking reading flow.
82 * / <--- READING <--- \
83 * | |
84 * \ /
85 * NOT_STARTED -> STARTED --> WAITING_FOR_READ -> READING_DONE -> SUCCESS
86 */
87 @GuardedBy("mNativeStreamLock") private State mReadState = State.NOT_STARTED ;
88
89 /**
90 * Write state is tracking writing flow.
91 * / <--- WRITING <--- \
92 * | |
93 * \ /
94 * NOT_STARTED -> STARTED --> WAITING_FOR_WRITE -> WRITING_DONE -> SUCCESS
95 */
96 @GuardedBy("mNativeStreamLock") private State mWriteState = State.NOT_STARTE D;
97
98 private UrlResponseInfo mResponseInfo;
99
100 /*
101 * OnReadCompleted callback is repeatedly invoked when each read is complete d, so it
102 * is cached as a member variable.
103 */
104 private OnReadCompletedRunnable mOnReadCompletedTask;
105
106 /*
107 * OnWriteCompleted callback is repeatedly invoked when each write is comple ted, so it
108 * is cached as a member variable.
109 */
110 private OnWriteCompletedRunnable mOnWriteCompletedTask;
111
112 private Runnable mOnDestroyedCallbackForTesting;
113
114 private final class OnReadCompletedRunnable implements Runnable {
115 // Buffer passed back from current invocation of onReadCompleted.
116 ByteBuffer mByteBuffer;
117 // End of stream flag from current invocation of onReadCompleted.
118 boolean mEndOfStream;
119
120 @Override
121 public void run() {
122 try {
123 // Null out mByteBuffer, to pass buffer ownership to callback or release if done.
124 ByteBuffer buffer = mByteBuffer;
125 mByteBuffer = null;
126 synchronized (mNativeStreamLock) {
127 if (isDoneLocked()) {
128 return;
129 }
130 if (mEndOfStream) {
131 mReadState = State.READING_DONE;
132 if (maybeSucceedLocked()) {
133 return;
134 }
135 } else {
136 mReadState = State.WAITING_FOR_READ;
137 }
138 }
139 mCallback.onReadCompleted(CronetBidirectionalStream.this, mRespo nseInfo, buffer);
140 } catch (Exception e) {
141 onCallbackException(e);
142 }
143 }
144 }
145
146 private final class OnWriteCompletedRunnable implements Runnable {
147 // Buffer passed back from current invocation of onWriteCompleted.
148 ByteBuffer mByteBuffer;
149 // End of stream flag from current call to write.
150 boolean mEndOfStream;
151
152 @Override
153 public void run() {
154 try {
155 // Null out mByteBuffer, to pass buffer ownership to callback or release if done.
156 ByteBuffer buffer = mByteBuffer;
157 mByteBuffer = null;
158 synchronized (mNativeStreamLock) {
159 if (isDoneLocked()) {
160 return;
161 }
162 if (mEndOfStream) {
163 mWriteState = State.WRITING_DONE;
164 if (maybeSucceedLocked()) {
165 return;
166 }
167 } else {
168 mWriteState = State.WAITING_FOR_WRITE;
169 }
170 }
171 mCallback.onWriteCompleted(CronetBidirectionalStream.this, mResp onseInfo, buffer);
172 } catch (Exception e) {
173 onCallbackException(e);
174 }
175 }
176 }
177
178 CronetBidirectionalStream(CronetUrlRequestContext requestContext, String url ,
179 @BidirectionalStream.Builder.StreamPriority int priority, Callback c allback,
180 Executor executor, String httpMethod, List<Map.Entry<String, String> > requestHeaders) {
181 mRequestContext = requestContext;
182 mInitialUrl = url;
183 mInitialPriority = convertStreamPriority(priority);
184 mCallback = callback;
185 mExecutor = executor;
186 mInitialMethod = httpMethod;
187 mRequestHeaders = stringsFromHeaderList(requestHeaders);
188 }
189
190 @Override
191 public void start() {
192 synchronized (mNativeStreamLock) {
193 if (mReadState != State.NOT_STARTED) {
194 throw new IllegalStateException("Stream is already started.");
195 }
196 try {
197 mNativeStream = nativeCreateBidirectionalStream(
198 mRequestContext.getUrlRequestContextAdapter());
199 mRequestContext.onRequestStarted();
200 // Non-zero startResult means an argument error.
201 int startResult = nativeStart(mNativeStream, mInitialUrl, mIniti alPriority,
202 mInitialMethod, mRequestHeaders, !doesMethodAllowWriteDa ta(mInitialMethod));
203 if (startResult == -1) {
204 throw new IllegalArgumentException("Invalid http method " + mInitialMethod);
205 }
206 if (startResult > 0) {
207 int headerPos = startResult - 1;
208 throw new IllegalArgumentException("Invalid header "
209 + mRequestHeaders[headerPos] + "=" + mRequestHeaders [headerPos + 1]);
210 }
211 mReadState = mWriteState = State.STARTED;
212 } catch (RuntimeException e) {
213 // If there's an exception, clean up and then throw the
214 // exception to the caller.
215 destroyNativeStreamLocked(false);
216 throw e;
217 }
218 }
219 }
220
221 @Override
222 public void read(ByteBuffer buffer) {
223 synchronized (mNativeStreamLock) {
224 Preconditions.checkHasRemaining(buffer);
225 Preconditions.checkDirect(buffer);
226 if (mReadState != State.WAITING_FOR_READ) {
227 throw new IllegalStateException("Unexpected read attempt.");
228 }
229 if (isDoneLocked()) {
230 return;
231 }
232 if (mOnReadCompletedTask == null) {
233 mOnReadCompletedTask = new OnReadCompletedRunnable();
234 }
235 mReadState = State.READING;
236 if (!nativeReadData(mNativeStream, buffer, buffer.position(), buffer .limit())) {
237 // Still waiting on read. This is just to have consistent
238 // behavior with the other error cases.
239 mReadState = State.WAITING_FOR_READ;
240 throw new IllegalArgumentException("Unable to call native read") ;
241 }
242 }
243 }
244
245 @Override
246 public void write(ByteBuffer buffer, boolean endOfStream) {
247 synchronized (mNativeStreamLock) {
248 Preconditions.checkDirect(buffer);
249 if (!buffer.hasRemaining() && !endOfStream) {
250 throw new IllegalArgumentException("Empty buffer before end of s tream.");
251 }
252 if (mWriteState != State.WAITING_FOR_WRITE) {
253 throw new IllegalStateException("Unexpected write attempt.");
254 }
255 if (isDoneLocked()) {
256 return;
257 }
258 if (mOnWriteCompletedTask == null) {
259 mOnWriteCompletedTask = new OnWriteCompletedRunnable();
260 }
261 mOnWriteCompletedTask.mEndOfStream = endOfStream;
262 mWriteState = State.WRITING;
263 if (!nativeWriteData(
264 mNativeStream, buffer, buffer.position(), buffer.limit() , endOfStream)) {
265 // Still waiting on write. This is just to have consistent
266 // behavior with the other error cases.
267 mWriteState = State.WAITING_FOR_WRITE;
268 throw new IllegalArgumentException("Unable to call native write" );
269 }
270 }
271 }
272
273 @Override
274 public void ping(PingCallback callback, Executor executor) {
275 // TODO(mef): May be last thing to be implemented on Android.
276 throw new UnsupportedOperationException("ping is not supported yet.");
277 }
278
279 @Override
280 public void windowUpdate(int windowSizeIncrement) {
281 // TODO(mef): Understand the needs and semantics of this method.
282 throw new UnsupportedOperationException("windowUpdate is not supported y et.");
283 }
284
285 @Override
286 public void cancel() {
287 synchronized (mNativeStreamLock) {
288 if (isDoneLocked() || mReadState == State.NOT_STARTED) {
289 return;
290 }
291 mReadState = mWriteState = State.CANCELED;
292 destroyNativeStreamLocked(true);
293 }
294 }
295
296 @Override
297 public boolean isDone() {
298 synchronized (mNativeStreamLock) {
299 return isDoneLocked();
300 }
301 }
302
303 @GuardedBy("mNativeStreamLock")
304 private boolean isDoneLocked() {
305 return mReadState != State.NOT_STARTED && mNativeStream == 0;
306 }
307
308 @SuppressWarnings("unused")
309 @CalledByNative
310 private void onRequestHeadersSent() {
311 postTaskToExecutor(new Runnable() {
312 public void run() {
313 synchronized (mNativeStreamLock) {
314 if (isDoneLocked()) {
315 return;
316 }
317 if (doesMethodAllowWriteData(mInitialMethod)) {
318 mWriteState = State.WAITING_FOR_WRITE;
319 } else {
320 mWriteState = State.WRITING_DONE;
321 }
322 }
323
324 try {
325 mCallback.onRequestHeadersSent(CronetBidirectionalStream.thi s);
326 } catch (Exception e) {
327 onCallbackException(e);
328 }
329 }
330 });
331 }
332
333 /**
334 * Called when the final set of headers, after all redirects,
335 * is received. Can only be called once for each stream.
336 */
337 @SuppressWarnings("unused")
338 @CalledByNative
339 private void onResponseHeadersReceived(int httpStatusCode, String negotiated Protocol,
340 String[] headers, long receivedBytesCount) {
341 try {
342 mResponseInfo = prepareResponseInfoOnNetworkThread(
343 httpStatusCode, negotiatedProtocol, headers, receivedBytesCo unt);
344 } catch (Exception e) {
xunjieli 2016/01/22 22:22:38 I think Andrei might meant that a null check is ne
mef 2016/01/25 18:11:25 prepareResponseInfoOnNetworkThread could return nu
345 failWithException(new CronetException("Cannot prepare ResponseInfo", null));
346 return;
347 }
348 postTaskToExecutor(new Runnable() {
349 public void run() {
350 synchronized (mNativeStreamLock) {
351 if (isDoneLocked()) {
352 return;
353 }
354 mReadState = State.WAITING_FOR_READ;
355 }
356
357 try {
358 mCallback.onResponseHeadersReceived(
359 CronetBidirectionalStream.this, mResponseInfo);
360 } catch (Exception e) {
361 onCallbackException(e);
362 }
363 }
364 });
365 }
366
367 @SuppressWarnings("unused")
368 @CalledByNative
369 private void onReadCompleted(final ByteBuffer byteBuffer, int bytesRead, int initialPosition,
370 int initialLimit, long receivedBytesCount) {
371 mResponseInfo.setReceivedBytesCount(receivedBytesCount);
372 if (byteBuffer.position() != initialPosition || byteBuffer.limit() != in itialLimit) {
373 failWithException(
374 new CronetException("ByteBuffer modified externally during r ead", null));
375 return;
376 }
377 if (bytesRead < 0 || initialPosition + bytesRead > initialLimit) {
378 failWithException(new CronetException("Invalid number of bytes read" , null));
379 return;
380 }
381 byteBuffer.position(initialPosition + bytesRead);
382 mOnReadCompletedTask.mByteBuffer = byteBuffer;
383 mOnReadCompletedTask.mEndOfStream = (bytesRead == 0);
384 postTaskToExecutor(mOnReadCompletedTask);
385 }
386
387 @SuppressWarnings("unused")
388 @CalledByNative
389 private void onWriteCompleted(
390 final ByteBuffer byteBuffer, int initialPosition, int initialLimit) {
391 if (byteBuffer.position() != initialPosition || byteBuffer.limit() != in itialLimit) {
392 failWithException(
393 new CronetException("ByteBuffer modified externally during w rite", null));
394 return;
395 }
396 // Current implementation always writes the complete buffer.
397 byteBuffer.position(byteBuffer.limit());
398 mOnWriteCompletedTask.mByteBuffer = byteBuffer;
399 postTaskToExecutor(mOnWriteCompletedTask);
400 }
401
402 @SuppressWarnings("unused")
403 @CalledByNative
404 private void onResponseTrailersReceived(String[] trailers) {
405 final UrlResponseInfo.HeaderBlock trailersBlock =
406 new UrlResponseInfo.HeaderBlock(headersListFromStrings(trailers) );
407 postTaskToExecutor(new Runnable() {
408 public void run() {
409 synchronized (mNativeStreamLock) {
410 if (isDoneLocked()) {
411 return;
412 }
413 }
414 try {
415 mCallback.onResponseTrailersReceived(
416 CronetBidirectionalStream.this, mResponseInfo, trail ersBlock);
417 } catch (Exception e) {
418 onCallbackException(e);
419 }
420 }
421 });
422 }
423
424 @SuppressWarnings("unused")
425 @CalledByNative
426 private void onError(final int nativeError, final String errorString, long r eceivedBytesCount) {
427 if (mResponseInfo != null) {
428 mResponseInfo.setReceivedBytesCount(receivedBytesCount);
429 }
430 failWithException(new CronetException(
431 "Exception in BidirectionalStream: " + errorString, nativeError) );
432 }
433
434 /**
435 * Called when request is canceled, no callbacks will be called afterwards.
436 */
437 @SuppressWarnings("unused")
438 @CalledByNative
439 private void onCanceled() {
440 postTaskToExecutor(new Runnable() {
441 public void run() {
442 try {
443 mCallback.onCanceled(CronetBidirectionalStream.this, mRespon seInfo);
444 } catch (Exception e) {
445 Log.e(CronetUrlRequestContext.LOG_TAG, "Exception in onCance led method", e);
446 }
447 }
448 });
449 }
450
451 @VisibleForTesting
452 public void setOnDestroyedCallbackForTesting(Runnable onDestroyedCallbackFor Testing) {
453 mOnDestroyedCallbackForTesting = onDestroyedCallbackForTesting;
454 }
455
456 private static boolean doesMethodAllowWriteData(String methodName) {
457 return !methodName.equals("GET") && !methodName.equals("HEAD");
458 }
459
460 private static ArrayList<Map.Entry<String, String>> headersListFromStrings(S tring[] headers) {
461 ArrayList<Map.Entry<String, String>> headersList = new ArrayList<>(heade rs.length / 2);
462 for (int i = 0; i < headers.length; i += 2) {
463 headersList.add(new AbstractMap.SimpleImmutableEntry<>(headers[i], h eaders[i + 1]));
464 }
465 return headersList;
466 }
467
468 private static String[] stringsFromHeaderList(List<Map.Entry<String, String> > headersList) {
469 String headersArray[] = new String[headersList.size() * 2];
470 int i = 0;
471 for (Map.Entry<String, String> requestHeader : headersList) {
472 headersArray[i++] = requestHeader.getKey();
473 headersArray[i++] = requestHeader.getValue();
kapishnikov 2016/01/22 23:19:33 How will 'null' header values be interpreted? From
mef 2016/01/25 18:11:25 BidirectionalStream.Builder.addHeader doesn't allo
474 }
475 return headersArray;
476 }
477
478 private static int convertStreamPriority(
479 @BidirectionalStream.Builder.StreamPriority int priority) {
480 switch (priority) {
481 case Builder.STREAM_PRIORITY_IDLE:
482 return RequestPriority.IDLE;
483 case Builder.STREAM_PRIORITY_LOWEST:
484 return RequestPriority.LOWEST;
485 case Builder.STREAM_PRIORITY_LOW:
486 return RequestPriority.LOW;
487 case Builder.STREAM_PRIORITY_MEDIUM:
488 return RequestPriority.MEDIUM;
489 case Builder.STREAM_PRIORITY_HIGHEST:
490 return RequestPriority.HIGHEST;
491 default:
492 throw new IllegalArgumentException("Invalid stream priority.");
493 }
494 }
495
496 /**
497 * Checks whether reading and writing are done.
498 * @return false if either reading or writing is not done. If both reading a nd writing
499 * are done, then posts cleanup task and returns true.
500 */
501 @GuardedBy("mNativeStreamLock")
502 private boolean maybeSucceedLocked() {
503 if (mReadState != State.READING_DONE || mWriteState != State.WRITING_DON E) {
504 return false;
505 }
506
507 mReadState = mWriteState = State.SUCCESS;
508 postTaskToExecutor(new Runnable() {
509 public void run() {
510 synchronized (mNativeStreamLock) {
511 if (isDoneLocked()) {
512 return;
513 }
514 // Destroy native stream first, so UrlRequestContext could b e shut
515 // down from the listener.
516 destroyNativeStreamLocked(false);
517 }
518 try {
519 mCallback.onSucceeded(CronetBidirectionalStream.this, mRespo nseInfo);
520 } catch (Exception e) {
521 Log.e(CronetUrlRequestContext.LOG_TAG, "Exception in onSucce eded method", e);
522 }
523 }
524 });
525 return true;
526 }
527
528 /**
529 * Posts task to application Executor. Used for callbacks
530 * and other tasks that should not be executed on network thread.
531 */
532 private void postTaskToExecutor(Runnable task) {
533 try {
534 mExecutor.execute(task);
535 } catch (RejectedExecutionException failException) {
536 Log.e(CronetUrlRequestContext.LOG_TAG, "Exception posting task to ex ecutor",
537 failException);
538 // If posting a task throws an exception, then there is no choice
539 // but to destroy the stream without invoking the callback.
540 synchronized (mNativeStreamLock) {
541 mReadState = mWriteState = State.ERROR;
542 destroyNativeStreamLocked(false);
543 }
544 }
545 }
546
547 private UrlResponseInfo prepareResponseInfoOnNetworkThread(int httpStatusCod e,
548 String negotiatedProtocol, String[] headers, long receivedBytesCount ) {
549 synchronized (mNativeStreamLock) {
550 if (mNativeStream == 0) {
551 return null;
552 }
553 }
554
555 ArrayList<String> urlChain = new ArrayList<>();
556 urlChain.add(mInitialUrl);
557
558 UrlResponseInfo responseInfo = new UrlResponseInfo(urlChain, httpStatusC ode, "",
559 headersListFromStrings(headers), false, negotiatedProtocol, null );
560
561 responseInfo.setReceivedBytesCount(receivedBytesCount);
562 return responseInfo;
563 }
564
565 @GuardedBy("mNativeStreamLock")
566 private void destroyNativeStreamLocked(boolean sendOnCanceled) {
567 Log.i(CronetUrlRequestContext.LOG_TAG, "destroyNativeStreamLocked " + th is.toString());
568 if (mNativeStream == 0) {
569 return;
570 }
571 nativeDestroy(mNativeStream, sendOnCanceled);
572 mNativeStream = 0;
573 mRequestContext.onRequestDestroyed();
574 if (mOnDestroyedCallbackForTesting != null) {
575 mOnDestroyedCallbackForTesting.run();
576 }
577 }
578
579 /**
580 * Fails the stream with an exception. Only called on the Executor.
581 */
582 private void failWithExceptionOnExecutor(CronetException e) {
583 // Do not call into listener if request is complete.
584 synchronized (mNativeStreamLock) {
585 if (isDoneLocked()) {
586 return;
587 }
588 mReadState = mWriteState = State.ERROR;
589 destroyNativeStreamLocked(false);
590 }
591 try {
592 mCallback.onFailed(this, mResponseInfo, e);
593 } catch (Exception failException) {
594 Log.e(CronetUrlRequestContext.LOG_TAG, "Exception notifying of faile d request",
595 failException);
596 }
597 }
598
599 /**
600 * If callback method throws an exception, stream gets canceled
601 * and exception is reported via onFailed callback.
602 * Only called on the Executor.
603 */
604 private void onCallbackException(Exception e) {
605 CronetException streamError =
606 new CronetException("CalledByNative method has thrown an excepti on", e);
607 Log.e(CronetUrlRequestContext.LOG_TAG, "Exception in CalledByNative meth od", e);
608 failWithExceptionOnExecutor(streamError);
609 }
610
611 /**
612 * Fails the stream with an exception. Can be called on any thread.
613 */
614 private void failWithException(final CronetException exception) {
615 postTaskToExecutor(new Runnable() {
616 public void run() {
617 failWithExceptionOnExecutor(exception);
618 }
619 });
620 }
621
622 // Native methods are implemented in cronet_bidirectional_stream_adapter.cc.
623 private native long nativeCreateBidirectionalStream(long urlRequestContextAd apter);
624
625 @NativeClassQualifiedName("CronetBidirectionalStreamAdapter")
626 private native int nativeStart(long nativePtr, String url, int priority, Str ing method,
627 String[] headers, boolean endOfStream);
628
629 @NativeClassQualifiedName("CronetBidirectionalStreamAdapter")
630 private native boolean nativeReadData(
631 long nativePtr, ByteBuffer byteBuffer, int position, int limit);
632
633 @NativeClassQualifiedName("CronetBidirectionalStreamAdapter")
634 private native boolean nativeWriteData(
635 long nativePtr, ByteBuffer byteBuffer, int position, int limit, bool ean endOfStream);
636
637 @NativeClassQualifiedName("CronetBidirectionalStreamAdapter")
638 private native void nativeDestroy(long nativePtr, boolean sendOnCanceled);
639 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698