Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(91)

Side by Side Diff: components/cronet/android/java/src/org/chromium/net/CronetBidirectionalStream.java

Issue 1992953004: [Cronet] Make delaying sending request headers explicit in bidirectional stream (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Address Andrei's comment and self review Created 4 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package org.chromium.net; 5 package org.chromium.net;
6 6
7 import org.chromium.base.Log; 7 import org.chromium.base.Log;
8 import org.chromium.base.VisibleForTesting; 8 import org.chromium.base.VisibleForTesting;
9 import org.chromium.base.annotations.CalledByNative; 9 import org.chromium.base.annotations.CalledByNative;
10 import org.chromium.base.annotations.JNINamespace; 10 import org.chromium.base.annotations.JNINamespace;
(...skipping 23 matching lines...) Expand all
34 * States of BidirectionalStream are tracked in mReadState and mWriteState. 34 * States of BidirectionalStream are tracked in mReadState and mWriteState.
35 * The write state is separated out as it changes independently of the read state. 35 * The write state is separated out as it changes independently of the read state.
36 * There is one initial state: State.NOT_STARTED. There is one normal final state: 36 * There is one initial state: State.NOT_STARTED. There is one normal final state:
37 * State.SUCCESS, reached after State.READING_DONE and State.WRITING_DONE. T here are two 37 * State.SUCCESS, reached after State.READING_DONE and State.WRITING_DONE. T here are two
38 * exceptional final states: State.CANCELED and State.ERROR, which can be re ached from 38 * exceptional final states: State.CANCELED and State.ERROR, which can be re ached from
39 * any other non-final state. 39 * any other non-final state.
40 */ 40 */
41 private enum State { 41 private enum State {
42 /* Initial state, stream not started. */ 42 /* Initial state, stream not started. */
43 NOT_STARTED, 43 NOT_STARTED,
44 /* Stream started, request headers are being sent. */ 44 /*
45 * Stream started, request headers are being sent if mDelayRequestHeader sUntilNextFlush
46 * is not set to true.
47 */
45 STARTED, 48 STARTED,
46 /* Waiting for {@code read()} to be called. */ 49 /* Waiting for {@code read()} to be called. */
47 WAITING_FOR_READ, 50 WAITING_FOR_READ,
48 /* Reading from the remote, {@code onReadCompleted()} callback will be c alled when done. */ 51 /* Reading from the remote, {@code onReadCompleted()} callback will be c alled when done. */
49 READING, 52 READING,
50 /* There is no more data to read and stream is half-closed by the remote side. */ 53 /* There is no more data to read and stream is half-closed by the remote side. */
51 READING_DONE, 54 READING_DONE,
52 /* Stream is canceled. */ 55 /* Stream is canceled. */
53 CANCELED, 56 CANCELED,
54 /* Error has occured, stream is closed. */ 57 /* Error has occured, stream is closed. */
55 ERROR, 58 ERROR,
56 /* Reading and writing are done, and the stream is closed successfully. */ 59 /* Reading and writing are done, and the stream is closed successfully. */
57 SUCCESS, 60 SUCCESS,
58 /* Waiting for {@code nativeWritevData()} to be called. */ 61 /* Waiting for {@code nativeSendRequestHeaders()} or {@code nativeWritev Data()} to be
59 WAITING_FOR_WRITE, 62 called. */
63 WAITING_FOR_FLUSH,
60 /* Writing to the remote, {@code onWritevCompleted()} callback will be c alled when done. */ 64 /* Writing to the remote, {@code onWritevCompleted()} callback will be c alled when done. */
61 WRITING, 65 WRITING,
62 /* There is no more data to write and stream is half-closed by the local side. */ 66 /* There is no more data to write and stream is half-closed by the local side. */
63 WRITING_DONE, 67 WRITING_DONE,
64 } 68 }
65 69
66 private final CronetUrlRequestContext mRequestContext; 70 private final CronetUrlRequestContext mRequestContext;
67 private final Executor mExecutor; 71 private final Executor mExecutor;
68 private final Callback mCallback; 72 private final Callback mCallback;
69 private final String mInitialUrl; 73 private final String mInitialUrl;
70 private final int mInitialPriority; 74 private final int mInitialPriority;
71 private final String mInitialMethod; 75 private final String mInitialMethod;
72 private final String mRequestHeaders[]; 76 private final String mRequestHeaders[];
73 private final boolean mDisableAutoFlush; 77 private final boolean mDisableAutoFlush;
78 private final boolean mDelayHeadersUntilNextFlush;
74 79
75 /* 80 /*
76 * Synchronizes access to mNativeStream, mReadState and mWriteState. 81 * Synchronizes access to mNativeStream, mReadState and mWriteState.
77 */ 82 */
78 private final Object mNativeStreamLock = new Object(); 83 private final Object mNativeStreamLock = new Object();
79 84
80 @GuardedBy("mNativeStreamLock") 85 @GuardedBy("mNativeStreamLock")
81 // Pending write data. 86 // Pending write data.
82 private LinkedList<ByteBuffer> mPendingData; 87 private LinkedList<ByteBuffer> mPendingData;
83 88
84 @GuardedBy("mNativeStreamLock") 89 @GuardedBy("mNativeStreamLock")
85 // Flush data queue that should be pushed to the native stack when the previ ous 90 // Flush data queue that should be pushed to the native stack when the previ ous
86 // nativeWritevData completes. 91 // nativeWritevData completes.
87 private LinkedList<ByteBuffer> mFlushData; 92 private LinkedList<ByteBuffer> mFlushData;
88 93
89 @GuardedBy("mNativeStreamLock") 94 @GuardedBy("mNativeStreamLock")
90 // Whether an end-of-stream flag is passed in through write(). 95 // Whether an end-of-stream flag is passed in through write().
91 private boolean mEndOfStreamWritten; 96 private boolean mEndOfStreamWritten;
92 97
98 @GuardedBy("mNativeStreamLock")
99 // Whether request headers have been flushed.
100 private boolean mRequestHeadersFlushed;
mef 2016/06/01 21:33:21 nit: requestHeadersFlushed => requestHeadersSent t
xunjieli 2016/06/01 22:27:16 Done.
101
93 /* Native BidirectionalStream object, owned by CronetBidirectionalStream. */ 102 /* Native BidirectionalStream object, owned by CronetBidirectionalStream. */
94 @GuardedBy("mNativeStreamLock") 103 @GuardedBy("mNativeStreamLock")
95 private long mNativeStream; 104 private long mNativeStream;
96 105
97 /** 106 /**
98 * Read state is tracking reading flow. 107 * Read state is tracking reading flow.
99 * / <--- READING <--- \ 108 * / <--- READING <--- \
100 * | | 109 * | |
101 * \ / 110 * \ /
102 * NOT_STARTED -> STARTED --> WAITING_FOR_READ -> READING_DONE -> SUCCESS 111 * NOT_STARTED -> STARTED --> WAITING_FOR_READ -> READING_DONE -> SUCCESS
103 */ 112 */
104 @GuardedBy("mNativeStreamLock") 113 @GuardedBy("mNativeStreamLock")
105 private State mReadState = State.NOT_STARTED; 114 private State mReadState = State.NOT_STARTED;
106 115
107 /** 116 /**
108 * Write state is tracking writing flow. 117 * Write state is tracking writing flow.
109 * / <--- WRITING <--- \ 118 * / <--- WRITING <--- \
110 * | | 119 * | |
111 * \ / 120 * \ /
112 * NOT_STARTED -> STARTED --> WAITING_FOR_WRITE -> WRITING_DONE -> SUCCESS 121 * NOT_STARTED -> STARTED --> WAITING_FOR_FLUSH -> WRITING_DONE -> SUCCESS
113 */ 122 */
114 @GuardedBy("mNativeStreamLock") 123 @GuardedBy("mNativeStreamLock")
115 private State mWriteState = State.NOT_STARTED; 124 private State mWriteState = State.NOT_STARTED;
116 125
117 // Only modified on the network thread. 126 // Only modified on the network thread.
118 private UrlResponseInfo mResponseInfo; 127 private UrlResponseInfo mResponseInfo;
119 128
120 /* 129 /*
121 * OnReadCompleted callback is repeatedly invoked when each read is complete d, so it 130 * OnReadCompleted callback is repeatedly invoked when each read is complete d, so it
122 * is cached as a member variable. 131 * is cached as a member variable.
(...skipping 72 matching lines...) Expand 10 before | Expand all | Expand 10 after
195 } 204 }
196 } catch (Exception e) { 205 } catch (Exception e) {
197 onCallbackException(e); 206 onCallbackException(e);
198 } 207 }
199 } 208 }
200 } 209 }
201 210
202 CronetBidirectionalStream(CronetUrlRequestContext requestContext, String url , 211 CronetBidirectionalStream(CronetUrlRequestContext requestContext, String url ,
203 @BidirectionalStream.Builder.StreamPriority int priority, Callback c allback, 212 @BidirectionalStream.Builder.StreamPriority int priority, Callback c allback,
204 Executor executor, String httpMethod, List<Map.Entry<String, String> > requestHeaders, 213 Executor executor, String httpMethod, List<Map.Entry<String, String> > requestHeaders,
205 boolean disableAutoFlush) { 214 boolean disableAutoFlush, boolean delayRequestHeadersUntilNextFlush) {
206 mRequestContext = requestContext; 215 mRequestContext = requestContext;
207 mInitialUrl = url; 216 mInitialUrl = url;
208 mInitialPriority = convertStreamPriority(priority); 217 mInitialPriority = convertStreamPriority(priority);
209 mCallback = callback; 218 mCallback = callback;
210 mExecutor = executor; 219 mExecutor = executor;
211 mInitialMethod = httpMethod; 220 mInitialMethod = httpMethod;
212 mRequestHeaders = stringsFromHeaderList(requestHeaders); 221 mRequestHeaders = stringsFromHeaderList(requestHeaders);
213 mDisableAutoFlush = disableAutoFlush; 222 mDisableAutoFlush = disableAutoFlush;
223 mDelayHeadersUntilNextFlush = delayRequestHeadersUntilNextFlush;
214 mPendingData = new LinkedList<>(); 224 mPendingData = new LinkedList<>();
215 mFlushData = new LinkedList<>(); 225 mFlushData = new LinkedList<>();
216 } 226 }
217 227
218 @Override 228 @Override
219 public void start() { 229 public void start() {
220 synchronized (mNativeStreamLock) { 230 synchronized (mNativeStreamLock) {
221 if (mReadState != State.NOT_STARTED) { 231 if (mReadState != State.NOT_STARTED) {
222 throw new IllegalStateException("Stream is already started."); 232 throw new IllegalStateException("Stream is already started.");
223 } 233 }
224 try { 234 try {
225 mNativeStream = nativeCreateBidirectionalStream( 235 mNativeStream = nativeCreateBidirectionalStream(
226 mRequestContext.getUrlRequestContextAdapter(), mDisableA utoFlush); 236 mRequestContext.getUrlRequestContextAdapter(),
237 !mDelayHeadersUntilNextFlush);
227 mRequestContext.onRequestStarted(); 238 mRequestContext.onRequestStarted();
228 // Non-zero startResult means an argument error. 239 // Non-zero startResult means an argument error.
229 int startResult = nativeStart(mNativeStream, mInitialUrl, mIniti alPriority, 240 int startResult = nativeStart(mNativeStream, mInitialUrl, mIniti alPriority,
230 mInitialMethod, mRequestHeaders, !doesMethodAllowWriteDa ta(mInitialMethod)); 241 mInitialMethod, mRequestHeaders, !doesMethodAllowWriteDa ta(mInitialMethod));
231 if (startResult == -1) { 242 if (startResult == -1) {
232 throw new IllegalArgumentException("Invalid http method " + mInitialMethod); 243 throw new IllegalArgumentException("Invalid http method " + mInitialMethod);
233 } 244 }
234 if (startResult > 0) { 245 if (startResult > 0) {
235 int headerPos = startResult - 1; 246 int headerPos = startResult - 1;
236 throw new IllegalArgumentException("Invalid header " 247 throw new IllegalArgumentException("Invalid header "
(...skipping 58 matching lines...) Expand 10 before | Expand all | Expand 10 after
295 306
296 @Override 307 @Override
297 public void flush() { 308 public void flush() {
298 synchronized (mNativeStreamLock) { 309 synchronized (mNativeStreamLock) {
299 flushLocked(); 310 flushLocked();
300 } 311 }
301 } 312 }
302 313
303 @SuppressWarnings("GuardedByChecker") 314 @SuppressWarnings("GuardedByChecker")
304 private void flushLocked() { 315 private void flushLocked() {
305 if (isDoneLocked()) { 316 if (isDoneLocked()
317 || (mWriteState != State.WAITING_FOR_FLUSH && mWriteState != Sta te.WRITING)) {
306 return; 318 return;
307 } 319 }
308 if (mPendingData.isEmpty() && mFlushData.isEmpty()) { 320 if (mPendingData.isEmpty() && mFlushData.isEmpty()) {
309 // No-op if there is nothing to write. 321 // If there is no pending write when flush() is called, see if
322 // request headers need to be flushed.
323 if (!mRequestHeadersFlushed) {
324 mRequestHeadersFlushed = true;
325 nativeSendRequestHeaders(mNativeStream);
326 if (!doesMethodAllowWriteData(mInitialMethod)) {
327 mWriteState = State.WRITING_DONE;
328 }
329 }
310 return; 330 return;
311 } 331 }
312 332
333 assert !(mPendingData.isEmpty() && mFlushData.isEmpty());
mef 2016/06/01 21:33:20 nit: maybe restate as "assert !mPendingData.isEmpt
xunjieli 2016/06/01 22:27:16 Done.
334
313 // Move buffers from mPendingData to the flushing queue. 335 // Move buffers from mPendingData to the flushing queue.
314 if (!mPendingData.isEmpty()) { 336 if (!mPendingData.isEmpty()) {
315 mFlushData.addAll(mPendingData); 337 mFlushData.addAll(mPendingData);
316 mPendingData.clear(); 338 mPendingData.clear();
317 } 339 }
318 340
319 if (mWriteState == State.WRITING) { 341 if (mWriteState == State.WRITING) {
320 // If there is a write already pending, wait until onWritevCompleted is 342 // If there is a write already pending, wait until onWritevCompleted is
321 // called before pushing data to the native stack. 343 // called before pushing data to the native stack.
322 return; 344 return;
323 } 345 }
346 assert mWriteState == State.WAITING_FOR_FLUSH;
324 int size = mFlushData.size(); 347 int size = mFlushData.size();
325 ByteBuffer[] buffers = new ByteBuffer[size]; 348 ByteBuffer[] buffers = new ByteBuffer[size];
326 int[] positions = new int[size]; 349 int[] positions = new int[size];
327 int[] limits = new int[size]; 350 int[] limits = new int[size];
328 for (int i = 0; i < size; i++) { 351 for (int i = 0; i < size; i++) {
329 ByteBuffer buffer = mFlushData.poll(); 352 ByteBuffer buffer = mFlushData.poll();
330 buffers[i] = buffer; 353 buffers[i] = buffer;
331 positions[i] = buffer.position(); 354 positions[i] = buffer.position();
332 limits[i] = buffer.limit(); 355 limits[i] = buffer.limit();
333 } 356 }
334 assert mFlushData.isEmpty(); 357 assert mFlushData.isEmpty();
358 assert buffers.length >= 1;
335 mWriteState = State.WRITING; 359 mWriteState = State.WRITING;
336 if (!nativeWritevData(mNativeStream, buffers, positions, limits, mEndOfS treamWritten)) { 360 if (!nativeWritevData(mNativeStream, buffers, positions, limits, mEndOfS treamWritten)) {
337 // Still waiting on write. This is just to have consistent 361 // Still waiting on write. This is just to have consistent
mef 2016/06/01 21:33:21 nit: on write -> for flush
xunjieli 2016/06/01 22:27:16 Done.
338 // behavior with the other error cases. 362 // behavior with the other error cases.
339 mWriteState = State.WAITING_FOR_WRITE; 363 mWriteState = State.WAITING_FOR_FLUSH;
340 throw new IllegalArgumentException("Unable to call native writev."); 364 throw new IllegalArgumentException("Unable to call native writev.");
341 } 365 }
342 } 366 }
343 367
344 @Override 368 @Override
345 public void ping(PingCallback callback, Executor executor) { 369 public void ping(PingCallback callback, Executor executor) {
346 // TODO(mef): May be last thing to be implemented on Android. 370 // TODO(mef): May be last thing to be implemented on Android.
347 throw new UnsupportedOperationException("ping is not supported yet."); 371 throw new UnsupportedOperationException("ping is not supported yet.");
348 } 372 }
349 373
(...skipping 44 matching lines...) Expand 10 before | Expand all | Expand 10 after
394 } 418 }
395 try { 419 try {
396 mCallback.onSucceeded(CronetBidirectionalStream.this, mResponseInfo) ; 420 mCallback.onSucceeded(CronetBidirectionalStream.this, mResponseInfo) ;
397 } catch (Exception e) { 421 } catch (Exception e) {
398 Log.e(CronetUrlRequestContext.LOG_TAG, "Exception in onSucceeded met hod", e); 422 Log.e(CronetUrlRequestContext.LOG_TAG, "Exception in onSucceeded met hod", e);
399 } 423 }
400 } 424 }
401 425
402 @SuppressWarnings("unused") 426 @SuppressWarnings("unused")
403 @CalledByNative 427 @CalledByNative
404 private void onStreamReady() { 428 private void onStreamReady(final boolean requestHeadersFlushed) {
405 postTaskToExecutor(new Runnable() { 429 postTaskToExecutor(new Runnable() {
406 public void run() { 430 public void run() {
407 synchronized (mNativeStreamLock) { 431 synchronized (mNativeStreamLock) {
408 if (isDoneLocked()) { 432 if (isDoneLocked()) {
409 return; 433 return;
410 } 434 }
411 if (doesMethodAllowWriteData(mInitialMethod)) { 435 mRequestHeadersFlushed = requestHeadersFlushed;
412 mWriteState = State.WAITING_FOR_WRITE; 436 mReadState = State.WAITING_FOR_READ;
413 mReadState = State.WAITING_FOR_READ; 437 if (!doesMethodAllowWriteData(mInitialMethod) && mRequestHea dersFlushed) {
438 mWriteState = State.WRITING_DONE;
414 } else { 439 } else {
415 mWriteState = State.WRITING_DONE; 440 mWriteState = State.WAITING_FOR_FLUSH;
416 } 441 }
417 } 442 }
418 443
419 try { 444 try {
420 mCallback.onStreamReady(CronetBidirectionalStream.this); 445 mCallback.onStreamReady(CronetBidirectionalStream.this);
421 } catch (Exception e) { 446 } catch (Exception e) {
422 onCallbackException(e); 447 onCallbackException(e);
423 } 448 }
424 } 449 }
425 }); 450 });
(...skipping 54 matching lines...) Expand 10 before | Expand all | Expand 10 after
480 postTaskToExecutor(mOnReadCompletedTask); 505 postTaskToExecutor(mOnReadCompletedTask);
481 } 506 }
482 507
483 @SuppressWarnings("unused") 508 @SuppressWarnings("unused")
484 @CalledByNative 509 @CalledByNative
485 private void onWritevCompleted(final ByteBuffer[] byteBuffers, int[] initial Positions, 510 private void onWritevCompleted(final ByteBuffer[] byteBuffers, int[] initial Positions,
486 int[] initialLimits, boolean endOfStream) { 511 int[] initialLimits, boolean endOfStream) {
487 assert byteBuffers.length == initialPositions.length; 512 assert byteBuffers.length == initialPositions.length;
488 assert byteBuffers.length == initialLimits.length; 513 assert byteBuffers.length == initialLimits.length;
489 synchronized (mNativeStreamLock) { 514 synchronized (mNativeStreamLock) {
490 mWriteState = State.WAITING_FOR_WRITE; 515 mWriteState = State.WAITING_FOR_FLUSH;
491 // Flush if there is anything in the flush queue mFlushData. 516 // Flush if there is anything in the flush queue mFlushData.
492 if (!mFlushData.isEmpty()) { 517 if (!mFlushData.isEmpty()) {
493 flushLocked(); 518 flushLocked();
494 } 519 }
495 } 520 }
496 for (int i = 0; i < byteBuffers.length; i++) { 521 for (int i = 0; i < byteBuffers.length; i++) {
497 ByteBuffer buffer = byteBuffers[i]; 522 ByteBuffer buffer = byteBuffers[i];
498 if (buffer.position() != initialPositions[i] || buffer.limit() != in itialLimits[i]) { 523 if (buffer.position() != initialPositions[i] || buffer.limit() != in itialLimits[i]) {
499 failWithException( 524 failWithException(
500 new CronetException("ByteBuffer modified externally duri ng write", null)); 525 new CronetException("ByteBuffer modified externally duri ng write", null));
(...skipping 180 matching lines...) Expand 10 before | Expand all | Expand 10 after
681 private void failWithException(final CronetException exception) { 706 private void failWithException(final CronetException exception) {
682 postTaskToExecutor(new Runnable() { 707 postTaskToExecutor(new Runnable() {
683 public void run() { 708 public void run() {
684 failWithExceptionOnExecutor(exception); 709 failWithExceptionOnExecutor(exception);
685 } 710 }
686 }); 711 });
687 } 712 }
688 713
689 // Native methods are implemented in cronet_bidirectional_stream_adapter.cc. 714 // Native methods are implemented in cronet_bidirectional_stream_adapter.cc.
690 private native long nativeCreateBidirectionalStream( 715 private native long nativeCreateBidirectionalStream(
691 long urlRequestContextAdapter, boolean disableAutoFlush); 716 long urlRequestContextAdapter, boolean sendRequestHeadersAutomatical ly);
692 717
693 @NativeClassQualifiedName("CronetBidirectionalStreamAdapter") 718 @NativeClassQualifiedName("CronetBidirectionalStreamAdapter")
694 private native int nativeStart(long nativePtr, String url, int priority, Str ing method, 719 private native int nativeStart(long nativePtr, String url, int priority, Str ing method,
695 String[] headers, boolean endOfStream); 720 String[] headers, boolean endOfStream);
696 721
697 @NativeClassQualifiedName("CronetBidirectionalStreamAdapter") 722 @NativeClassQualifiedName("CronetBidirectionalStreamAdapter")
723 private native void nativeSendRequestHeaders(long nativePtr);
724
725 @NativeClassQualifiedName("CronetBidirectionalStreamAdapter")
698 private native boolean nativeReadData( 726 private native boolean nativeReadData(
699 long nativePtr, ByteBuffer byteBuffer, int position, int limit); 727 long nativePtr, ByteBuffer byteBuffer, int position, int limit);
700 728
701 @NativeClassQualifiedName("CronetBidirectionalStreamAdapter") 729 @NativeClassQualifiedName("CronetBidirectionalStreamAdapter")
702 private native boolean nativeWritevData(long nativePtr, ByteBuffer[] buffers , int[] positions, 730 private native boolean nativeWritevData(long nativePtr, ByteBuffer[] buffers , int[] positions,
703 int[] limits, boolean endOfStream); 731 int[] limits, boolean endOfStream);
704 732
705 @NativeClassQualifiedName("CronetBidirectionalStreamAdapter") 733 @NativeClassQualifiedName("CronetBidirectionalStreamAdapter")
706 private native void nativeDestroy(long nativePtr, boolean sendOnCanceled); 734 private native void nativeDestroy(long nativePtr, boolean sendOnCanceled);
707 } 735 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698