Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(348)

Side by Side Diff: components/cronet/android/java/src/org/chromium/net/CronetBidirectionalStream.java

Issue 1412243012: Initial implementation of CronetBidirectionalStream. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Little cleanup. Created 5 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package org.chromium.net;
6
7 import android.util.Log;
8
9 import org.chromium.base.VisibleForTesting;
10 import org.chromium.base.annotations.CalledByNative;
11 import org.chromium.base.annotations.JNINamespace;
12 import org.chromium.base.annotations.NativeClassQualifiedName;
13
14 import java.nio.ByteBuffer;
15 import java.util.AbstractMap;
16 import java.util.ArrayList;
17 import java.util.List;
18 import java.util.Map;
19 import java.util.concurrent.Executor;
20 import java.util.concurrent.RejectedExecutionException;
21
22 import javax.annotation.concurrent.GuardedBy;
23
24 /**
25 * BidirectionalStream implementation using Chromium network stack.
26 * All @CallByNative methods are called on native network thread
27 * and post tasks with callback calls onto Executor. Upon return from callback n ative
28 * stream is called on executor thread and posts native tasks to native network thread.
29 */
30 @JNINamespace("cronet")
31 class CronetBidirectionalStream extends BidirectionalStream {
32 /* Native BidirectionalStream object, owned by CronetBidirectionalStream. */
33 @GuardedBy("mNativeStreamLock") private long mNativeStream;
xunjieli 2015/12/08 19:07:20 nit: Maybe reorder these fields. Static fields bef
mef 2015/12/11 21:28:25 Done.
34
35 private static final int STATE_NOT_STARTED = 0;
36 private static final int STATE_STARTED = 1;
37 private static final int STATE_WAITING_ON_READ = 2;
38 private static final int STATE_READING = 3;
39 private static final int STATE_READING_DONE = 4;
40 private static final int STATE_CANCELED = 5;
41 private static final int STATE_ERROR = 6;
42 private static final int STATE_SUCCESS = 7;
43 /**
44 * Stream state covering common and reading state flow.
45 * NOT_STARTED -> STARTED -> WAITING_ON_READ -> READING -> WAITING_ON_READ - >
46 * READING -> READING_DONE -> SUCCESS
47 */
48 @GuardedBy("mNativeStreamLock") private int mStreamState = STATE_NOT_STARTED ;
49
50 private static final int STATE_WAITING_ON_WRITE = 10;
51 private static final int STATE_WRITING = 11;
52 private static final int STATE_WRITING_END_OF_STREAM = 12;
53 private static final int STATE_WRITING_DONE = 13;
xunjieli 2015/12/08 19:07:19 Need a comment for each of these states. It's uncl
mef 2015/12/11 21:28:26 Done.
54 /**
55 * Write state covering writing state flow.
56 * NOT_STARTED -> WAITING_ON_WRITE -> WRITING -> WAITING_ON_WRITE ->
57 * WRITING_END_OF_STREAM -> WRITING_DONE -> SUCCESS
xunjieli 2015/12/08 19:07:20 Maybe consider combining the read state flow with
mef 2015/12/11 21:28:26 Yeah, I'm not sure what's a good way to express th
58 */
59 @GuardedBy("mNativeStreamLock") private int mWriteState = STATE_NOT_STARTED;
60
61 /*
62 * Synchronize access to mNativeStream, mStreamState and mWriteState.
63 */
64 private final Object mNativeStreamLock = new Object();
65 private final CronetUrlRequestContext mRequestContext;
66 private final Executor mExecutor;
67
68 private final Callback mCallback;
69 private final String mInitialUrl;
70 private String mInitialMethod;
xunjieli 2015/12/08 17:59:56 nit: final field.
mef 2015/12/11 21:28:26 Done.
71
72 private final ArrayList<Map.Entry<String, String>> mRequestHeaders;
73
74 private UrlResponseInfo mResponseInfo;
75
76 /*
77 * OnReadCompleted callback is repeatedly invoked when each read is complete d, so it
78 * is cached as a member variable.
79 */
80 private OnReadCompletedRunnable mOnReadCompletedTask;
81
82 /*
83 * OnWriteCompleted callback is repeatedly invoked when each write is comple ted, so it
84 * is cached as a member variable.
85 */
86 private OnWriteCompletedRunnable mOnWriteCompletedTask;
87
88 private Runnable mOnDestroyedCallbackForTests;
89
90 private final class OnReadCompletedRunnable implements Runnable {
91 ByteBuffer mByteBuffer;
92 boolean mEndOfStream;
93
94 @Override
95 public void run() {
96 Log.e(CronetUrlRequestContext.LOG_TAG,
97 "OnReadCompletedRunnable:" + mByteBuffer.toString());
xunjieli 2015/12/08 19:07:19 nit: This probably shouldn't be Log.e. Maybe we sh
mef 2015/12/11 21:28:26 Done. Used those for debugging.
98
99 if (isDone()) {
100 return;
101 }
102 try {
103 synchronized (mNativeStreamLock) {
104 if (mNativeStream == 0) {
105 return;
106 }
107 if (mEndOfStream) {
108 mStreamState = STATE_READING_DONE;
109 if (maybeSucceeded()) return;
110 } else {
111 mStreamState = STATE_WAITING_ON_READ;
112 }
113 }
114 // Null out mByteBuffer, out of paranoia. Has to be done before
115 // mCallback call, to avoid any race when there are multiple
116 // executor threads.
117 ByteBuffer buffer = mByteBuffer;
118 mByteBuffer = null;
119 mCallback.onReadCompleted(CronetBidirectionalStream.this, mRespo nseInfo, buffer);
120 } catch (Exception e) {
121 onCallbackException(e);
122 }
123 }
124 }
125
126 private final class OnWriteCompletedRunnable implements Runnable {
127 ByteBuffer mByteBuffer;
128
129 @Override
130 public void run() {
131 if (isDone()) {
132 return;
133 }
134 try {
135 synchronized (mNativeStreamLock) {
136 if (mNativeStream == 0) {
137 return;
138 }
139 if (mWriteState == STATE_WRITING_END_OF_STREAM) {
140 mWriteState = STATE_WRITING_DONE;
141 if (maybeSucceeded()) return;
142 } else {
143 mWriteState = STATE_WAITING_ON_WRITE;
144 }
145 }
146 // Null out mByteBuffer, out of paranoia. Has to be done before
147 // mCallback call, to avoid any race when there are multiple
148 // executor threads.
149 ByteBuffer buffer = mByteBuffer;
150 mByteBuffer = null;
151 mCallback.onWriteCompleted(CronetBidirectionalStream.this, mResp onseInfo, buffer);
152 } catch (Exception e) {
153 onCallbackException(e);
154 }
155 }
156 }
157
158 @GuardedBy("nativeStreamLock")
159 private boolean maybeSucceeded() {
160 if (mStreamState == STATE_READING_DONE && mWriteState == STATE_WRITING_D ONE) {
161 mStreamState = STATE_SUCCESS;
162 onSucceeded();
xunjieli 2015/12/08 17:59:56 maybe consider inline onSucceeded? since the metho
mef 2015/12/11 21:28:25 Done.
163 return true;
164 }
165 return false;
166 }
167
168 CronetBidirectionalStream(CronetUrlRequestContext requestContext, long urlRe questContextAdapter,
169 String url, Callback callback, Executor executor, String httpMethod,
170 List<Map.Entry<String, String>> requestHeaders) {
171 mRequestContext = requestContext;
172 mInitialUrl = url;
173 mCallback = callback;
174 mExecutor = executor;
175 mInitialMethod = httpMethod;
176 mRequestHeaders = new ArrayList<Map.Entry<String, String>>(requestHeader s);
177 }
178
179 @Override
180 public void start() {
181 synchronized (mNativeStreamLock) {
182 if (mStreamState != STATE_NOT_STARTED) {
183 throw new IllegalStateException("Stream is already started.");
184 }
185 try {
186 mNativeStream = nativeCreateBidirectionalStream(
187 mRequestContext.getUrlRequestContextAdapter());
188 mRequestContext.onRequestStarted();
189 String headers[] = stringsFromHeaderList(mRequestHeaders);
190 // Non-zero startResult means an argument error.
191 int startResult = nativeStart(mNativeStream, mInitialUrl, mIniti alMethod, headers);
192 if (startResult == -1) {
193 throw new IllegalArgumentException("Invalid http method " + mInitialMethod);
194 }
195 if (startResult > 0) {
196 int headerPos = startResult - 1;
197 throw new IllegalArgumentException(
198 "Invalid header " + headers[headerPos] + "=" + heade rs[headerPos + 1]);
199 }
200 mStreamState = STATE_STARTED;
201 } catch (RuntimeException e) {
202 // If there's an exception, cleanup and then throw the
203 // exception to the caller.
204 destroyNativeStream(false);
205 throw e;
206 }
207 }
208 }
209
210 @Override
211 public void read(ByteBuffer buffer) {
212 synchronized (mNativeStreamLock) {
213 if (!buffer.hasRemaining()) {
214 throw new IllegalArgumentException("ByteBuffer is already full." );
215 }
216 if (mStreamState != STATE_WAITING_ON_READ) {
217 throw new IllegalStateException("Unexpected read attempt.");
218 }
219 if (isDone()) {
220 return;
221 }
222 mStreamState = STATE_READING;
223 if (!nativeReadData(mNativeStream, buffer, buffer.position(), buffer .limit())) {
224 // Still waiting on read. This is just to have consistent
225 // behavior with the other error cases.
226 mStreamState = STATE_WAITING_ON_READ;
227 // Since accessing byteBuffer's memory failed, it's presumably
228 // not a direct ByteBuffer.
229 throw new IllegalArgumentException("byteBuffer must be a direct ByteBuffer.");
230 }
231 }
232 }
233
234 @Override
235 public void write(ByteBuffer buffer, boolean endOfStream) {
236 synchronized (mNativeStreamLock) {
237 if (!buffer.hasRemaining() && !endOfStream) {
238 throw new IllegalArgumentException("Empty buffer before end of s tream.");
239 }
240 if (mWriteState != STATE_WAITING_ON_WRITE) {
241 throw new IllegalStateException("Unexpected write attempt.");
242 }
243 if (isDone()) {
244 return;
245 }
246 mWriteState = endOfStream ? STATE_WRITING_END_OF_STREAM : STATE_WRIT ING;
247 if (!nativeWriteData(
248 mNativeStream, buffer, buffer.position(), buffer.limit() , endOfStream)) {
249 // Still waiting on write. This is just to have consistent
250 // behavior with the other error cases.
251 mWriteState = STATE_WAITING_ON_WRITE;
252 // Since accessing byteBuffer's memory failed, it's presumably
253 // not a direct ByteBuffer.
254 throw new IllegalArgumentException("byteBuffer must be a direct ByteBuffer.");
255 }
256 }
257 }
258
259 @Override
260 public void ping(PingCallback callback, Executor executor) {
261 // TODO(mef): May be last thing to be implemented on Android.
262 }
263
264 @Override
265 public void windowUpdate(int windowSizeIncrement) {
266 // TODO(mef): Understand the needs and semantics of this method.
267 }
268
269 /**
270 * Cancels the stream. Can be called at any time after {@link #start}.
271 * {@link Callback#onCanceled} will be invoked when cancelation
272 * is complete and no further callback methods will be invoked. If the
273 * stream has completed or has not started, calling {@code cancel()} has no
274 * effect and {@code onCanceled} will not be invoked. If the
275 * {@link Executor} passed in during {@code BidirectionalStream} constructio n runs
276 * tasks on a single thread, and {@code cancel()} is called on that thread,
277 * no listener methods (besides {@code onCanceled()}) will be invoked after
278 * {@code cancel()} is called. Otherwise, at most one callback method may be
279 * invoked after {@code cancel()} has completed.
280 */
281 public void cancel() {
282 synchronized (mNativeStreamLock) {
283 if (isDone() || mStreamState == STATE_NOT_STARTED) {
284 return;
285 }
286 mStreamState = STATE_CANCELED;
287 destroyNativeStream(true);
288 }
289 }
290
291 @Override
292 public boolean isDone() {
293 synchronized (mNativeStreamLock) {
294 return mStreamState != STATE_NOT_STARTED && mNativeStream == 0;
295 }
296 }
297
298 @SuppressWarnings("unused")
299 @CalledByNative
300 private void onRequestHeadersSent() {
301 Runnable task = new Runnable() {
302 public void run() {
303 synchronized (mNativeStreamLock) {
304 if (isDone()) {
305 return;
306 }
307 if (mInitialMethod == "GET") {
308 mWriteState = STATE_WRITING_DONE;
xunjieli 2015/12/08 17:59:56 After applying your latest suggested edit, client
mef 2015/12/11 21:28:26 Per our discussion requiring at least one write is
309 } else {
310 mWriteState = STATE_WAITING_ON_WRITE;
311 }
312 }
313
314 try {
315 mCallback.onRequestHeadersSent(CronetBidirectionalStream.thi s);
316 } catch (Exception e) {
317 onCallbackException(e);
318 }
319 }
320 };
321 postTaskToExecutor(task);
322 }
323
324 /**
325 * Called when the final set of headers, after all redirects,
326 * is received. Can only be called once for each stream.
327 */
328 @SuppressWarnings("unused")
329 @CalledByNative
330 private void onResponseHeadersReceived(int httpStatusCode, String[] headers) {
331 mResponseInfo = prepareResponseInfoOnNetworkThread(httpStatusCode, heade rs);
332 Runnable task = new Runnable() {
333 public void run() {
334 synchronized (mNativeStreamLock) {
335 if (isDone()) {
336 return;
337 }
338 mStreamState = STATE_WAITING_ON_READ;
339 }
340
341 try {
342 mCallback.onResponseHeadersReceived(
343 CronetBidirectionalStream.this, mResponseInfo);
344 } catch (Exception e) {
345 onCallbackException(e);
346 }
347 }
348 };
349 postTaskToExecutor(task);
350 }
351
352 @SuppressWarnings("unused")
353 @CalledByNative
354 private void onReadCompleted(final ByteBuffer byteBuffer, int bytesRead, int initialPosition,
355 long receivedBytesCount) {
356 mResponseInfo.setReceivedBytesCount(receivedBytesCount);
357 if (byteBuffer.position() != initialPosition) {
358 failWithException(
359 new CronetException("ByteBuffer modified externally during r ead", null));
360 return;
361 }
362 if (bytesRead < 0 || initialPosition + bytesRead > byteBuffer.limit()) {
363 failWithException(new CronetException("Invalid number of bytes read" , null));
364 return;
365 }
366 if (mOnReadCompletedTask == null) {
367 mOnReadCompletedTask = new OnReadCompletedRunnable();
368 }
369 byteBuffer.position(initialPosition + bytesRead);
370 mOnReadCompletedTask.mByteBuffer = byteBuffer;
371 mOnReadCompletedTask.mEndOfStream = (bytesRead == 0);
372 postTaskToExecutor(mOnReadCompletedTask);
373 }
374
375 @SuppressWarnings("unused")
376 @CalledByNative
377 private void onWriteCompleted(final ByteBuffer byteBuffer, int initialPositi on) {
378 if (byteBuffer.position() != initialPosition) {
379 failWithException(
380 new CronetException("ByteBuffer modified externally during w rite", null));
381 return;
382 }
383 if (mOnWriteCompletedTask == null) {
384 mOnWriteCompletedTask = new OnWriteCompletedRunnable();
385 }
386 // Current implementation always writes the complete buffer.
387 byteBuffer.position(byteBuffer.limit());
388 mOnWriteCompletedTask.mByteBuffer = byteBuffer;
389 postTaskToExecutor(mOnWriteCompletedTask);
390 }
391
392 @SuppressWarnings("unused")
393 @CalledByNative
394 private void onResponseTrailersReceived(String[] trailers) {
395 final UrlResponseInfo.HeaderBlock trailersBlock =
396 new UrlResponseInfo.HeaderBlock(headersListFromStrings(trailers) );
397 Runnable task = new Runnable() {
398 public void run() {
399 synchronized (mNativeStreamLock) {
400 if (isDone()) {
401 return;
402 }
403 }
404 try {
405 mCallback.onResponseTrailersReceived(
406 CronetBidirectionalStream.this, mResponseInfo, trail ersBlock);
407 } catch (Exception e) {
408 onCallbackException(e);
409 }
410 }
411 };
412 postTaskToExecutor(task);
413 }
414
415 private void onSucceeded() {
416 Runnable task = new Runnable() {
417 public void run() {
418 synchronized (mNativeStreamLock) {
419 if (isDone()) {
420 return;
421 }
422 // Destroy native stream first, so request context could be shut
423 // down from the listener.
424 destroyNativeStream(false);
425 }
426 try {
427 mCallback.onSucceeded(CronetBidirectionalStream.this, mRespo nseInfo);
428 } catch (Exception e) {
429 Log.e(CronetUrlRequestContext.LOG_TAG, "Exception in onSucce eded method", e);
430 }
431 }
432 };
433 postTaskToExecutor(task);
434 }
435
436 @SuppressWarnings("unused")
437 @CalledByNative
438 private void onError(final int nativeError, final String errorString, long r eceivedBytesCount) {
439 if (mResponseInfo != null) {
440 mResponseInfo.setReceivedBytesCount(receivedBytesCount);
441 }
442 failWithException(new CronetException(
443 "Exception in BidirectionalStream: " + errorString, nativeError) );
444 }
445
446 /**
447 * Called when request is canceled, no callbacks will be called afterwards.
448 */
449 @SuppressWarnings("unused")
450 @CalledByNative
451 private void onCanceled() {
452 Runnable task = new Runnable() {
453 public void run() {
454 try {
455 mCallback.onCanceled(CronetBidirectionalStream.this, mRespon seInfo);
456 } catch (Exception e) {
457 Log.e(CronetUrlRequestContext.LOG_TAG, "Exception in onCance led method", e);
458 }
459 }
460 };
461 postTaskToExecutor(task);
462 }
463
464 @VisibleForTesting
465 public void setOnDestroyedCallbackForTests(Runnable onDestroyedCallbackForTe sts) {
466 mOnDestroyedCallbackForTests = onDestroyedCallbackForTests;
467 }
468
469 /**
470 * Posts task to application Executor. Used for callbacks
471 * and other tasks that should not be executed on network thread.
472 */
473 private void postTaskToExecutor(Runnable task) {
474 try {
475 mExecutor.execute(task);
476 } catch (RejectedExecutionException failException) {
477 Log.e(CronetUrlRequestContext.LOG_TAG, "Exception posting task to ex ecutor",
478 failException);
479 // If posting a task throws an exception, then there is no choice
480 // but to cancel the stream.
481 cancel();
482 }
483 }
484
485 private ArrayList<Map.Entry<String, String>> headersListFromStrings(String[] headers) {
486 ArrayList<Map.Entry<String, String>> headersList =
487 new ArrayList<Map.Entry<String, String>>();
488 for (int i = 0; i < headers.length; i += 2) {
489 headersList.add(new AbstractMap.SimpleImmutableEntry<String, String> (
490 headers[i], headers[i + 1]));
491 }
492 return headersList;
493 }
494
495 private String[] stringsFromHeaderList(ArrayList<Map.Entry<String, String>> headersList) {
496 String headersArray[] = new String[headersList.size() * 2];
497 int i = 0;
498 for (Map.Entry<String, String> requestHeader : headersList) {
499 headersArray[i++] = requestHeader.getKey();
500 headersArray[i++] = requestHeader.getValue();
501 }
502 return headersArray;
503 }
504
505 private UrlResponseInfo prepareResponseInfoOnNetworkThread(
506 int httpStatusCode, String[] headers) {
507 long nativeStream;
508 synchronized (mNativeStreamLock) {
509 if (mNativeStream == 0) {
510 return null;
511 }
512 // This method is running on network thread, so even if
513 // mUrlRequestAdapter is set to 0 from another thread the actual
514 // deletion of the adapter is posted to network thread, so it is
515 // safe to preserve and use urlRequestAdapter outside the lock.
516 nativeStream = mNativeStream;
517 }
518
519 ArrayList<String> urlChain = new ArrayList<String>();
520 urlChain.add(mInitialUrl);
521
522 boolean wasCached = false;
523 String httpStatusText = "";
524 String negotiatedProtocol = nativeGetNegotiatedProtocol(nativeStream);
525 String proxyServer = null;
526
527 UrlResponseInfo responseInfo = new UrlResponseInfo(urlChain, httpStatusC ode, httpStatusText,
528 headersListFromStrings(headers), wasCached, negotiatedProtocol, proxyServer);
529 return responseInfo;
530 }
531
532 private void destroyNativeStream(boolean sendOnCanceled) {
533 synchronized (mNativeStreamLock) {
534 Log.i(CronetUrlRequestContext.LOG_TAG, "destroyNativeStream " + this .toString());
535 if (mNativeStream == 0) {
536 return;
537 }
538 nativeDestroy(mNativeStream, sendOnCanceled);
539 mRequestContext.onRequestDestroyed();
540 mNativeStream = 0;
541 if (mOnDestroyedCallbackForTests != null) {
542 mOnDestroyedCallbackForTests.run();
543 }
544 }
545 }
546
547 /**
548 * If callback method throws an exception, stream gets canceled
549 * and exception is reported via onFailed callback.
550 * Only called on the Executor.
551 */
552 private void onCallbackException(Exception e) {
553 CronetException streamError =
554 new CronetException("CalledByNative method has thrown an excepti on", e);
555 Log.e(CronetUrlRequestContext.LOG_TAG, "Exception in CalledByNative meth od", e);
556 // Do not call into listener if request is complete.
557 synchronized (mNativeStreamLock) {
558 if (isDone()) {
559 return;
560 }
561 destroyNativeStream(false);
562 }
563 try {
564 mCallback.onFailed(this, mResponseInfo, streamError);
565 } catch (Exception failException) {
566 Log.e(CronetUrlRequestContext.LOG_TAG, "Exception notifying of faile d request",
567 failException);
568 }
569 }
570
571 /**
572 * Fails the stream with an exception. Can be called on any thread.
573 */
574 private void failWithException(final CronetException exception) {
575 Runnable task = new Runnable() {
576 public void run() {
577 synchronized (mNativeStreamLock) {
578 if (isDone()) {
579 return;
580 }
581 mStreamState = STATE_ERROR;
582 destroyNativeStream(false);
583 }
584 try {
585 mCallback.onFailed(CronetBidirectionalStream.this, mResponse Info, exception);
586 } catch (Exception e) {
587 Log.e(CronetUrlRequestContext.LOG_TAG, "Exception in onError method", e);
588 }
589 }
590 };
591 postTaskToExecutor(task);
592 }
593
594 // Native methods are implemented in cronet_bidirectional_stream.cc.
595 private native long nativeCreateBidirectionalStream(long urlRequestContextAd apter);
596
597 @NativeClassQualifiedName("CronetBidirectionalStream")
598 private native int nativeStart(long nativePtr, String url, String method, St ring[] headers);
599
600 @NativeClassQualifiedName("CronetBidirectionalStream")
601 private native boolean nativeReadData(
602 long nativePtr, ByteBuffer byteBuffer, int position, int capacity);
603
604 @NativeClassQualifiedName("CronetBidirectionalStream")
605 private native boolean nativeWriteData(
606 long nativePtr, ByteBuffer byteBuffer, int position, int capacity, b oolean endOfStream);
607
608 @NativeClassQualifiedName("CronetBidirectionalStream")
609 private native void nativeDestroy(long nativePtr, boolean sendOnCanceled);
610
611 @NativeClassQualifiedName("CronetBidirectionalStream")
612 private native String nativeGetNegotiatedProtocol(long nativePtr);
613 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698