Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 package org.chromium.net; | 5 package org.chromium.net; |
| 6 | 6 |
| 7 import org.chromium.base.Log; | 7 import org.chromium.base.Log; |
| 8 import org.chromium.base.VisibleForTesting; | 8 import org.chromium.base.VisibleForTesting; |
| 9 import org.chromium.base.annotations.CalledByNative; | 9 import org.chromium.base.annotations.CalledByNative; |
| 10 import org.chromium.base.annotations.JNINamespace; | 10 import org.chromium.base.annotations.JNINamespace; |
| (...skipping 66 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 77 private final boolean mDisableAutoFlush; | 77 private final boolean mDisableAutoFlush; |
| 78 private final boolean mDelayRequestHeadersUntilFirstFlush; | 78 private final boolean mDelayRequestHeadersUntilFirstFlush; |
| 79 | 79 |
| 80 /* | 80 /* |
| 81 * Synchronizes access to mNativeStream, mReadState and mWriteState. | 81 * Synchronizes access to mNativeStream, mReadState and mWriteState. |
| 82 */ | 82 */ |
| 83 private final Object mNativeStreamLock = new Object(); | 83 private final Object mNativeStreamLock = new Object(); |
| 84 | 84 |
| 85 @GuardedBy("mNativeStreamLock") | 85 @GuardedBy("mNativeStreamLock") |
| 86 // Pending write data. | 86 // Pending write data. |
| 87 private LinkedList<ByteBuffer> mPendingData; | 87 private LinkedList<ByteBuffer> mPendingQueue; |
|
mef
2016/06/21 21:12:27
Any particular reason for XyzData -> XyzQueue rena
xunjieli
2016/06/22 14:26:47
I felt "Data" is a bit overloaded. I changed it to
mef
2016/06/24 19:36:30
Yeah, unfortunately Queue is also very generic, an
| |
| 88 | 88 |
| 89 @GuardedBy("mNativeStreamLock") | 89 @GuardedBy("mNativeStreamLock") |
| 90 // Flush data queue that should be pushed to the native stack when the previ ous | 90 // Flush data queue that should be pushed to the native stack when the previ ous |
| 91 // nativeWritevData completes. | 91 // nativeWritevData completes. |
| 92 private LinkedList<ByteBuffer> mFlushData; | 92 private LinkedList<ByteBuffer> mFlushQueue; |
| 93 | 93 |
| 94 @GuardedBy("mNativeStreamLock") | 94 @GuardedBy("mNativeStreamLock") |
| 95 // Whether an end-of-stream flag is passed in through write(). | 95 // Whether an end-of-stream flag is passed in through write(). |
| 96 private boolean mEndOfStreamWritten; | 96 private boolean mEndOfStreamWritten; |
| 97 | 97 |
| 98 @GuardedBy("mNativeStreamLock") | 98 @GuardedBy("mNativeStreamLock") |
| 99 // Whether request headers have been sent. | 99 // Whether request headers have been sent. |
| 100 private boolean mRequestHeadersSent; | 100 private boolean mRequestHeadersSent; |
| 101 | 101 |
| 102 /* Native BidirectionalStream object, owned by CronetBidirectionalStream. */ | 102 /* Native BidirectionalStream object, owned by CronetBidirectionalStream. */ |
| (...skipping 111 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 214 boolean disableAutoFlush, boolean delayRequestHeadersUntilNextFlush) { | 214 boolean disableAutoFlush, boolean delayRequestHeadersUntilNextFlush) { |
| 215 mRequestContext = requestContext; | 215 mRequestContext = requestContext; |
| 216 mInitialUrl = url; | 216 mInitialUrl = url; |
| 217 mInitialPriority = convertStreamPriority(priority); | 217 mInitialPriority = convertStreamPriority(priority); |
| 218 mCallback = callback; | 218 mCallback = callback; |
| 219 mExecutor = executor; | 219 mExecutor = executor; |
| 220 mInitialMethod = httpMethod; | 220 mInitialMethod = httpMethod; |
| 221 mRequestHeaders = stringsFromHeaderList(requestHeaders); | 221 mRequestHeaders = stringsFromHeaderList(requestHeaders); |
| 222 mDisableAutoFlush = disableAutoFlush; | 222 mDisableAutoFlush = disableAutoFlush; |
| 223 mDelayRequestHeadersUntilFirstFlush = delayRequestHeadersUntilNextFlush; | 223 mDelayRequestHeadersUntilFirstFlush = delayRequestHeadersUntilNextFlush; |
| 224 mPendingData = new LinkedList<>(); | 224 mPendingQueue = new LinkedList<>(); |
| 225 mFlushData = new LinkedList<>(); | 225 mFlushQueue = new LinkedList<>(); |
| 226 } | 226 } |
| 227 | 227 |
| 228 @Override | 228 @Override |
| 229 public void start() { | 229 public void start() { |
| 230 synchronized (mNativeStreamLock) { | 230 synchronized (mNativeStreamLock) { |
| 231 if (mReadState != State.NOT_STARTED) { | 231 if (mReadState != State.NOT_STARTED) { |
| 232 throw new IllegalStateException("Stream is already started."); | 232 throw new IllegalStateException("Stream is already started."); |
| 233 } | 233 } |
| 234 try { | 234 try { |
| 235 mNativeStream = nativeCreateBidirectionalStream( | 235 mNativeStream = nativeCreateBidirectionalStream( |
| (...skipping 51 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 287 Preconditions.checkDirect(buffer); | 287 Preconditions.checkDirect(buffer); |
| 288 if (!buffer.hasRemaining() && !endOfStream) { | 288 if (!buffer.hasRemaining() && !endOfStream) { |
| 289 throw new IllegalArgumentException("Empty buffer before end of s tream."); | 289 throw new IllegalArgumentException("Empty buffer before end of s tream."); |
| 290 } | 290 } |
| 291 if (mEndOfStreamWritten) { | 291 if (mEndOfStreamWritten) { |
| 292 throw new IllegalArgumentException("Write after writing end of s tream."); | 292 throw new IllegalArgumentException("Write after writing end of s tream."); |
| 293 } | 293 } |
| 294 if (isDoneLocked()) { | 294 if (isDoneLocked()) { |
| 295 return; | 295 return; |
| 296 } | 296 } |
| 297 mPendingData.add(buffer); | 297 mPendingQueue.add(buffer); |
| 298 if (endOfStream) { | 298 if (endOfStream) { |
| 299 mEndOfStreamWritten = true; | 299 mEndOfStreamWritten = true; |
| 300 } | 300 } |
| 301 if (!mDisableAutoFlush) { | 301 if (!mDisableAutoFlush) { |
| 302 flushLocked(); | 302 flushLocked(); |
| 303 } | 303 } |
| 304 } | 304 } |
| 305 } | 305 } |
| 306 | 306 |
| 307 @Override | 307 @Override |
| 308 public void flush() { | 308 public void flush() { |
| 309 synchronized (mNativeStreamLock) { | 309 synchronized (mNativeStreamLock) { |
| 310 flushLocked(); | 310 flushLocked(); |
| 311 } | 311 } |
| 312 } | 312 } |
| 313 | 313 |
| 314 @SuppressWarnings("GuardedByChecker") | 314 @SuppressWarnings("GuardedByChecker") |
| 315 private void flushLocked() { | 315 private void flushLocked() { |
| 316 if (isDoneLocked() | 316 if (isDoneLocked() |
| 317 || (mWriteState != State.WAITING_FOR_FLUSH && mWriteState != Sta te.WRITING)) { | 317 || (mWriteState != State.WAITING_FOR_FLUSH && mWriteState != Sta te.WRITING)) { |
| 318 return; | 318 return; |
| 319 } | 319 } |
| 320 if (mPendingData.isEmpty() && mFlushData.isEmpty()) { | 320 if (mPendingQueue.isEmpty() && mFlushQueue.isEmpty()) { |
| 321 // If there is no pending write when flush() is called, see if | 321 // If there is no pending write when flush() is called, see if |
| 322 // request headers need to be flushed. | 322 // request headers need to be flushed. |
| 323 if (!mRequestHeadersSent) { | 323 if (!mRequestHeadersSent) { |
| 324 mRequestHeadersSent = true; | 324 mRequestHeadersSent = true; |
| 325 nativeSendRequestHeaders(mNativeStream); | 325 nativeSendRequestHeaders(mNativeStream); |
| 326 if (!doesMethodAllowWriteData(mInitialMethod)) { | 326 if (!doesMethodAllowWriteData(mInitialMethod)) { |
| 327 mWriteState = State.WRITING_DONE; | 327 mWriteState = State.WRITING_DONE; |
| 328 } | 328 } |
| 329 } | 329 } |
| 330 return; | 330 return; |
| 331 } | 331 } |
| 332 | 332 |
| 333 assert !mPendingData.isEmpty() || !mFlushData.isEmpty(); | 333 assert !mPendingQueue.isEmpty() || !mFlushQueue.isEmpty(); |
| 334 | 334 |
| 335 // Move buffers from mPendingData to the flushing queue. | 335 // Move buffers from mPendingQueue to the flushing queue. |
| 336 if (!mPendingData.isEmpty()) { | 336 if (!mPendingQueue.isEmpty()) { |
| 337 mFlushData.addAll(mPendingData); | 337 mFlushQueue.addAll(mPendingQueue); |
| 338 mPendingData.clear(); | 338 mPendingQueue.clear(); |
| 339 } | 339 } |
| 340 | 340 |
| 341 if (mWriteState == State.WRITING) { | 341 if (mWriteState == State.WRITING) { |
| 342 // If there is a write already pending, wait until onWritevCompleted is | 342 // If there is a write already pending, wait until onWritevCompleted is |
| 343 // called before pushing data to the native stack. | 343 // called before pushing data to the native stack. |
| 344 return; | 344 return; |
| 345 } | 345 } |
| 346 sendFlushDataLocked(); | |
| 347 } | |
| 348 | |
| 349 // Helper method to send buffers in mFlushQueue. Caller needs to acquire | |
| 350 // mNativeStreamLock and make sure mWriteState is WAITING_FOR_FLUSH and | |
| 351 // mFlushQueue queue isn't empty. | |
| 352 @SuppressWarnings("GuardedByChecker") | |
| 353 private void sendFlushDataLocked() { | |
| 346 assert mWriteState == State.WAITING_FOR_FLUSH; | 354 assert mWriteState == State.WAITING_FOR_FLUSH; |
| 347 int size = mFlushData.size(); | 355 int size = mFlushQueue.size(); |
| 348 ByteBuffer[] buffers = new ByteBuffer[size]; | 356 ByteBuffer[] buffers = new ByteBuffer[size]; |
| 349 int[] positions = new int[size]; | 357 int[] positions = new int[size]; |
| 350 int[] limits = new int[size]; | 358 int[] limits = new int[size]; |
| 351 for (int i = 0; i < size; i++) { | 359 for (int i = 0; i < size; i++) { |
| 352 ByteBuffer buffer = mFlushData.poll(); | 360 ByteBuffer buffer = mFlushQueue.poll(); |
| 353 buffers[i] = buffer; | 361 buffers[i] = buffer; |
| 354 positions[i] = buffer.position(); | 362 positions[i] = buffer.position(); |
| 355 limits[i] = buffer.limit(); | 363 limits[i] = buffer.limit(); |
| 356 } | 364 } |
| 357 assert mFlushData.isEmpty(); | 365 assert mFlushQueue.isEmpty(); |
| 358 assert buffers.length >= 1; | 366 assert buffers.length >= 1; |
| 359 mWriteState = State.WRITING; | 367 mWriteState = State.WRITING; |
| 360 if (!nativeWritevData(mNativeStream, buffers, positions, limits, mEndOfS treamWritten)) { | 368 if (!nativeWritevData(mNativeStream, buffers, positions, limits, |
| 369 mEndOfStreamWritten && mPendingQueue.isEmpty())) { | |
| 361 // Still waiting on flush. This is just to have consistent | 370 // Still waiting on flush. This is just to have consistent |
| 362 // behavior with the other error cases. | 371 // behavior with the other error cases. |
| 363 mWriteState = State.WAITING_FOR_FLUSH; | 372 mWriteState = State.WAITING_FOR_FLUSH; |
| 364 throw new IllegalArgumentException("Unable to call native writev."); | 373 throw new IllegalArgumentException("Unable to call native writev."); |
| 365 } | 374 } |
| 366 } | 375 } |
| 367 | 376 |
| 377 @VisibleForTesting | |
| 378 public int getPendingQueueSizeForTesting() { | |
| 379 synchronized (mNativeStreamLock) { | |
| 380 return mPendingQueue.size(); | |
| 381 } | |
| 382 } | |
| 383 | |
| 384 @VisibleForTesting | |
| 385 public int getFlushQueueSizeForTesting() { | |
| 386 synchronized (mNativeStreamLock) { | |
| 387 return mFlushQueue.size(); | |
| 388 } | |
| 389 } | |
| 390 | |
| 368 @Override | 391 @Override |
| 369 public void ping(PingCallback callback, Executor executor) { | 392 public void ping(PingCallback callback, Executor executor) { |
| 370 // TODO(mef): May be last thing to be implemented on Android. | 393 // TODO(mef): May be last thing to be implemented on Android. |
| 371 throw new UnsupportedOperationException("ping is not supported yet."); | 394 throw new UnsupportedOperationException("ping is not supported yet."); |
| 372 } | 395 } |
| 373 | 396 |
| 374 @Override | 397 @Override |
| 375 public void windowUpdate(int windowSizeIncrement) { | 398 public void windowUpdate(int windowSizeIncrement) { |
| 376 // TODO(mef): Understand the needs and semantics of this method. | 399 // TODO(mef): Understand the needs and semantics of this method. |
| 377 throw new UnsupportedOperationException("windowUpdate is not supported y et."); | 400 throw new UnsupportedOperationException("windowUpdate is not supported y et."); |
| (...skipping 128 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 506 } | 529 } |
| 507 | 530 |
| 508 @SuppressWarnings("unused") | 531 @SuppressWarnings("unused") |
| 509 @CalledByNative | 532 @CalledByNative |
| 510 private void onWritevCompleted(final ByteBuffer[] byteBuffers, int[] initial Positions, | 533 private void onWritevCompleted(final ByteBuffer[] byteBuffers, int[] initial Positions, |
| 511 int[] initialLimits, boolean endOfStream) { | 534 int[] initialLimits, boolean endOfStream) { |
| 512 assert byteBuffers.length == initialPositions.length; | 535 assert byteBuffers.length == initialPositions.length; |
| 513 assert byteBuffers.length == initialLimits.length; | 536 assert byteBuffers.length == initialLimits.length; |
| 514 synchronized (mNativeStreamLock) { | 537 synchronized (mNativeStreamLock) { |
| 515 mWriteState = State.WAITING_FOR_FLUSH; | 538 mWriteState = State.WAITING_FOR_FLUSH; |
| 516 // Flush if there is anything in the flush queue mFlushData. | 539 // Flush if there is anything in the flush queue mFlushQueue. |
| 517 if (!mFlushData.isEmpty()) { | 540 if (!mFlushQueue.isEmpty()) { |
| 518 flushLocked(); | 541 sendFlushDataLocked(); |
| 519 } | 542 } |
| 520 } | 543 } |
| 521 for (int i = 0; i < byteBuffers.length; i++) { | 544 for (int i = 0; i < byteBuffers.length; i++) { |
| 522 ByteBuffer buffer = byteBuffers[i]; | 545 ByteBuffer buffer = byteBuffers[i]; |
| 523 if (buffer.position() != initialPositions[i] || buffer.limit() != in itialLimits[i]) { | 546 if (buffer.position() != initialPositions[i] || buffer.limit() != in itialLimits[i]) { |
| 524 failWithException( | 547 failWithException( |
| 525 new CronetException("ByteBuffer modified externally duri ng write", null)); | 548 new CronetException("ByteBuffer modified externally duri ng write", null)); |
| 526 return; | 549 return; |
| 527 } | 550 } |
| 528 // Current implementation always writes the complete buffer. | 551 // Current implementation always writes the complete buffer. |
| 529 buffer.position(buffer.limit()); | 552 buffer.position(buffer.limit()); |
| 530 postTaskToExecutor(new OnWriteCompletedRunnable(buffer, endOfStream) ); | 553 postTaskToExecutor(new OnWriteCompletedRunnable( |
| 554 buffer, endOfStream && i == byteBuffers.length - 1)); | |
|
mef
2016/06/21 21:12:27
this is a bit hard to understand, maybe it deserve
xunjieli
2016/06/22 14:26:47
Done.
| |
| 531 } | 555 } |
| 532 } | 556 } |
| 533 | 557 |
| 534 @SuppressWarnings("unused") | 558 @SuppressWarnings("unused") |
| 535 @CalledByNative | 559 @CalledByNative |
| 536 private void onResponseTrailersReceived(String[] trailers) { | 560 private void onResponseTrailersReceived(String[] trailers) { |
| 537 final UrlResponseInfo.HeaderBlock trailersBlock = | 561 final UrlResponseInfo.HeaderBlock trailersBlock = |
| 538 new UrlResponseInfo.HeaderBlock(headersListFromStrings(trailers) ); | 562 new UrlResponseInfo.HeaderBlock(headersListFromStrings(trailers) ); |
| 539 postTaskToExecutor(new Runnable() { | 563 postTaskToExecutor(new Runnable() { |
| 540 public void run() { | 564 public void run() { |
| (...skipping 185 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 726 private native boolean nativeReadData( | 750 private native boolean nativeReadData( |
| 727 long nativePtr, ByteBuffer byteBuffer, int position, int limit); | 751 long nativePtr, ByteBuffer byteBuffer, int position, int limit); |
| 728 | 752 |
| 729 @NativeClassQualifiedName("CronetBidirectionalStreamAdapter") | 753 @NativeClassQualifiedName("CronetBidirectionalStreamAdapter") |
| 730 private native boolean nativeWritevData(long nativePtr, ByteBuffer[] buffers , int[] positions, | 754 private native boolean nativeWritevData(long nativePtr, ByteBuffer[] buffers , int[] positions, |
| 731 int[] limits, boolean endOfStream); | 755 int[] limits, boolean endOfStream); |
| 732 | 756 |
| 733 @NativeClassQualifiedName("CronetBidirectionalStreamAdapter") | 757 @NativeClassQualifiedName("CronetBidirectionalStreamAdapter") |
| 734 private native void nativeDestroy(long nativePtr, boolean sendOnCanceled); | 758 private native void nativeDestroy(long nativePtr, boolean sendOnCanceled); |
| 735 } | 759 } |
| OLD | NEW |