| OLD | NEW |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 package org.chromium.net; | 5 package org.chromium.net; |
| 6 | 6 |
| 7 import android.os.ConditionVariable; | 7 import android.os.ConditionVariable; |
| 8 | 8 |
| 9 import static junit.framework.Assert.assertEquals; | 9 import static junit.framework.Assert.assertEquals; |
| 10 import static junit.framework.Assert.assertFalse; | 10 import static junit.framework.Assert.assertFalse; |
| 11 import static junit.framework.Assert.assertNull; | 11 import static junit.framework.Assert.assertNull; |
| 12 import static junit.framework.Assert.assertTrue; | 12 import static junit.framework.Assert.assertTrue; |
| 13 | 13 |
| 14 import java.nio.ByteBuffer; | 14 import java.nio.ByteBuffer; |
| 15 import java.util.ArrayList; | 15 import java.util.ArrayList; |
| 16 import java.util.Iterator; |
| 16 import java.util.concurrent.Executor; | 17 import java.util.concurrent.Executor; |
| 17 import java.util.concurrent.ExecutorService; | 18 import java.util.concurrent.ExecutorService; |
| 18 import java.util.concurrent.Executors; | 19 import java.util.concurrent.Executors; |
| 19 import java.util.concurrent.ThreadFactory; | 20 import java.util.concurrent.ThreadFactory; |
| 20 | 21 |
| 21 /** | 22 /** |
| 22 * Callback that tracks information from different callbacks and and has a | 23 * Callback that tracks information from different callbacks and and has a |
| 23 * method to block thread until the stream completes on another thread. | 24 * method to block thread until the stream completes on another thread. |
| 24 * Allows to cancel, block stream or throw an exception from an arbitrary step. | 25 * Allows to cancel, block stream or throw an exception from an arbitrary step. |
| 25 */ | 26 */ |
| 26 public class TestBidirectionalStreamCallback extends BidirectionalStream.Callbac
k { | 27 public class TestBidirectionalStreamCallback extends BidirectionalStream.Callbac
k { |
| 27 public UrlResponseInfo mResponseInfo; | 28 public UrlResponseInfo mResponseInfo; |
| 28 public CronetException mError; | 29 public CronetException mError; |
| 29 | 30 |
| 30 public ResponseStep mResponseStep = ResponseStep.NOTHING; | 31 public ResponseStep mResponseStep = ResponseStep.NOTHING; |
| 31 | 32 |
| 32 public boolean mOnErrorCalled = false; | 33 public boolean mOnErrorCalled = false; |
| 33 public boolean mOnCanceledCalled = false; | 34 public boolean mOnCanceledCalled = false; |
| 34 | 35 |
| 35 public int mHttpResponseDataLength = 0; | 36 public int mHttpResponseDataLength = 0; |
| 36 public String mResponseAsString = ""; | 37 public String mResponseAsString = ""; |
| 37 | 38 |
| 38 public UrlResponseInfo.HeaderBlock mTrailers; | 39 public UrlResponseInfo.HeaderBlock mTrailers; |
| 39 | 40 |
| 40 private static final int READ_BUFFER_SIZE = 32 * 1024; | 41 private static final int READ_BUFFER_SIZE = 32 * 1024; |
| 41 | 42 |
| 43 private boolean mHandleFlush; |
| 42 // When false, the consumer is responsible for all calls into the stream | 44 // When false, the consumer is responsible for all calls into the stream |
| 43 // that advance it. | 45 // that advance it. |
| 44 private boolean mAutoAdvance = true; | 46 private boolean mAutoAdvance = true; |
| 45 | 47 |
| 46 // Conditionally fail on certain steps. | 48 // Conditionally fail on certain steps. |
| 47 private FailureType mFailureType = FailureType.NONE; | 49 private FailureType mFailureType = FailureType.NONE; |
| 48 private ResponseStep mFailureStep = ResponseStep.NOTHING; | 50 private ResponseStep mFailureStep = ResponseStep.NOTHING; |
| 49 | 51 |
| 50 // Signals when the stream is done either successfully or not. | 52 // Signals when the stream is done either successfully or not. |
| 51 private final ConditionVariable mDone = new ConditionVariable(); | 53 private final ConditionVariable mDone = new ConditionVariable(); |
| 52 | 54 |
| 53 // Signaled on each step when mAutoAdvance is false. | 55 // Signaled on each step when mAutoAdvance is false. |
| 54 private final ConditionVariable mReadStepBlock = new ConditionVariable(); | 56 private final ConditionVariable mReadStepBlock = new ConditionVariable(); |
| 55 private final ConditionVariable mWriteStepBlock = new ConditionVariable(); | 57 private final ConditionVariable mWriteStepBlock = new ConditionVariable(); |
| 56 | 58 |
| 57 // Executor Service for Cronet callbacks. | 59 // Executor Service for Cronet callbacks. |
| 58 private final ExecutorService mExecutorService = | 60 private final ExecutorService mExecutorService = |
| 59 Executors.newSingleThreadExecutor(new ExecutorThreadFactory()); | 61 Executors.newSingleThreadExecutor(new ExecutorThreadFactory()); |
| 60 private Thread mExecutorThread; | 62 private Thread mExecutorThread; |
| 61 | 63 |
| 62 // position() of ByteBuffer prior to read() call. | 64 // position() of ByteBuffer prior to read() call. |
| 63 private int mBufferPositionBeforeRead; | 65 private int mBufferPositionBeforeRead; |
| 64 | 66 |
| 65 // Data to write. | 67 // Data to write. |
| 66 private ArrayList<ByteBuffer> mWriteBuffers = new ArrayList<ByteBuffer>(); | 68 private ArrayList<WriteBuffer> mWriteBuffers = new ArrayList<WriteBuffer>(); |
| 69 |
| 70 // Buffers that we yet to receive the corresponding onWriteCompleted callbac
k. |
| 71 private ArrayList<WriteBuffer> mWriteBuffersToBeAcked = new ArrayList<WriteB
uffer>(); |
| 67 | 72 |
| 68 private class ExecutorThreadFactory implements ThreadFactory { | 73 private class ExecutorThreadFactory implements ThreadFactory { |
| 69 public Thread newThread(Runnable r) { | 74 public Thread newThread(Runnable r) { |
| 70 mExecutorThread = new Thread(r); | 75 mExecutorThread = new Thread(r); |
| 71 return mExecutorThread; | 76 return mExecutorThread; |
| 72 } | 77 } |
| 73 } | 78 } |
| 74 | 79 |
| 80 private static class WriteBuffer { |
| 81 final ByteBuffer mBuffer; |
| 82 final boolean mFlush; |
| 83 public WriteBuffer(ByteBuffer buffer, boolean flush) { |
| 84 mBuffer = buffer; |
| 85 mFlush = flush; |
| 86 } |
| 87 } |
| 88 |
| 75 public enum ResponseStep { | 89 public enum ResponseStep { |
| 76 NOTHING, | 90 NOTHING, |
| 77 ON_REQUEST_HEADERS_SENT, | 91 ON_STREAM_READY, |
| 78 ON_RESPONSE_STARTED, | 92 ON_RESPONSE_STARTED, |
| 79 ON_READ_COMPLETED, | 93 ON_READ_COMPLETED, |
| 80 ON_WRITE_COMPLETED, | 94 ON_WRITE_COMPLETED, |
| 81 ON_TRAILERS, | 95 ON_TRAILERS, |
| 82 ON_CANCELED, | 96 ON_CANCELED, |
| 83 ON_FAILED, | 97 ON_FAILED, |
| 84 ON_SUCCEEDED | 98 ON_SUCCEEDED |
| 85 } | 99 } |
| 86 | 100 |
| 87 public enum FailureType { | 101 public enum FailureType { |
| 88 NONE, | 102 NONE, |
| 89 CANCEL_SYNC, | 103 CANCEL_SYNC, |
| 90 CANCEL_ASYNC, | 104 CANCEL_ASYNC, |
| 91 // Same as above, but continues to advance the stream after posting | 105 // Same as above, but continues to advance the stream after posting |
| 92 // the cancellation task. | 106 // the cancellation task. |
| 93 CANCEL_ASYNC_WITHOUT_PAUSE, | 107 CANCEL_ASYNC_WITHOUT_PAUSE, |
| 94 THROW_SYNC | 108 THROW_SYNC |
| 95 } | 109 } |
| 96 | 110 |
| 111 public void setHandleFlush() { |
| 112 mHandleFlush = true; |
| 113 } |
| 114 |
| 97 public void setAutoAdvance(boolean autoAdvance) { | 115 public void setAutoAdvance(boolean autoAdvance) { |
| 98 mAutoAdvance = autoAdvance; | 116 mAutoAdvance = autoAdvance; |
| 99 } | 117 } |
| 100 | 118 |
| 101 public void setFailure(FailureType failureType, ResponseStep failureStep) { | 119 public void setFailure(FailureType failureType, ResponseStep failureStep) { |
| 102 mFailureStep = failureStep; | 120 mFailureStep = failureStep; |
| 103 mFailureType = failureType; | 121 mFailureType = failureType; |
| 104 } | 122 } |
| 105 | 123 |
| 106 public void blockForDone() { | 124 public void blockForDone() { |
| (...skipping 12 matching lines...) Expand all Loading... |
| 119 | 137 |
| 120 public Executor getExecutor() { | 138 public Executor getExecutor() { |
| 121 return mExecutorService; | 139 return mExecutorService; |
| 122 } | 140 } |
| 123 | 141 |
| 124 public void shutdownExecutor() { | 142 public void shutdownExecutor() { |
| 125 mExecutorService.shutdown(); | 143 mExecutorService.shutdown(); |
| 126 } | 144 } |
| 127 | 145 |
| 128 public void addWriteData(byte[] data) { | 146 public void addWriteData(byte[] data) { |
| 147 addWriteData(data, false); |
| 148 } |
| 149 |
| 150 public void addWriteData(byte[] data, boolean flush) { |
| 129 ByteBuffer writeBuffer = ByteBuffer.allocateDirect(data.length); | 151 ByteBuffer writeBuffer = ByteBuffer.allocateDirect(data.length); |
| 130 writeBuffer.put(data); | 152 writeBuffer.put(data); |
| 131 writeBuffer.flip(); | 153 writeBuffer.flip(); |
| 132 mWriteBuffers.add(writeBuffer); | 154 if (flush && !mHandleFlush) { |
| 155 throw new IllegalArgumentException("cannot add flush when auto flush
is on"); |
| 156 } |
| 157 mWriteBuffers.add(new WriteBuffer(writeBuffer, flush)); |
| 158 mWriteBuffersToBeAcked.add(new WriteBuffer(writeBuffer, flush)); |
| 133 } | 159 } |
| 134 | 160 |
| 135 @Override | 161 @Override |
| 136 public void onRequestHeadersSent(BidirectionalStream stream) { | 162 public void onStreamReady(BidirectionalStream stream) { |
| 137 assertEquals(mExecutorThread, Thread.currentThread()); | 163 assertEquals(mExecutorThread, Thread.currentThread()); |
| 138 assertFalse(stream.isDone()); | 164 assertFalse(stream.isDone()); |
| 139 assertEquals(ResponseStep.NOTHING, mResponseStep); | 165 assertEquals(ResponseStep.NOTHING, mResponseStep); |
| 140 assertNull(mError); | 166 assertNull(mError); |
| 141 | 167 |
| 142 mResponseStep = ResponseStep.ON_REQUEST_HEADERS_SENT; | 168 mResponseStep = ResponseStep.ON_STREAM_READY; |
| 143 if (maybeThrowCancelOrPause(stream, mWriteStepBlock)) { | 169 if (maybeThrowCancelOrPause(stream, mWriteStepBlock)) { |
| 144 return; | 170 return; |
| 145 } | 171 } |
| 146 startNextWrite(stream); | 172 startNextWrite(stream); |
| 147 } | 173 } |
| 148 | 174 |
| 149 @Override | 175 @Override |
| 150 public void onResponseHeadersReceived(BidirectionalStream stream, UrlRespons
eInfo info) { | 176 public void onResponseHeadersReceived(BidirectionalStream stream, UrlRespons
eInfo info) { |
| 151 assertEquals(mExecutorThread, Thread.currentThread()); | 177 assertEquals(mExecutorThread, Thread.currentThread()); |
| 152 assertFalse(stream.isDone()); | 178 assertFalse(stream.isDone()); |
| 153 assertTrue(mResponseStep == ResponseStep.NOTHING | 179 assertTrue(mResponseStep == ResponseStep.NOTHING |
| 154 || mResponseStep == ResponseStep.ON_REQUEST_HEADERS_SENT | 180 || mResponseStep == ResponseStep.ON_STREAM_READY |
| 155 || mResponseStep == ResponseStep.ON_WRITE_COMPLETED); | 181 || mResponseStep == ResponseStep.ON_WRITE_COMPLETED); |
| 156 assertNull(mError); | 182 assertNull(mError); |
| 157 | 183 |
| 158 mResponseStep = ResponseStep.ON_RESPONSE_STARTED; | 184 mResponseStep = ResponseStep.ON_RESPONSE_STARTED; |
| 159 mResponseInfo = info; | 185 mResponseInfo = info; |
| 160 if (maybeThrowCancelOrPause(stream, mReadStepBlock)) { | 186 if (maybeThrowCancelOrPause(stream, mReadStepBlock)) { |
| 161 return; | 187 return; |
| 162 } | 188 } |
| 163 startNextRead(stream); | 189 startNextRead(stream); |
| 164 } | 190 } |
| (...skipping 28 matching lines...) Expand all Loading... |
| 193 startNextRead(stream); | 219 startNextRead(stream); |
| 194 } | 220 } |
| 195 | 221 |
| 196 @Override | 222 @Override |
| 197 public void onWriteCompleted( | 223 public void onWriteCompleted( |
| 198 BidirectionalStream stream, UrlResponseInfo info, ByteBuffer buffer)
{ | 224 BidirectionalStream stream, UrlResponseInfo info, ByteBuffer buffer)
{ |
| 199 assertEquals(mExecutorThread, Thread.currentThread()); | 225 assertEquals(mExecutorThread, Thread.currentThread()); |
| 200 assertFalse(stream.isDone()); | 226 assertFalse(stream.isDone()); |
| 201 assertNull(mError); | 227 assertNull(mError); |
| 202 mResponseStep = ResponseStep.ON_WRITE_COMPLETED; | 228 mResponseStep = ResponseStep.ON_WRITE_COMPLETED; |
| 203 if (!mWriteBuffers.isEmpty()) { | 229 if (!mWriteBuffersToBeAcked.isEmpty()) { |
| 204 assertEquals(buffer, mWriteBuffers.get(0)); | 230 assertEquals(buffer, mWriteBuffersToBeAcked.get(0).mBuffer); |
| 205 mWriteBuffers.remove(0); | 231 mWriteBuffersToBeAcked.remove(0); |
| 206 } | 232 } |
| 207 if (maybeThrowCancelOrPause(stream, mWriteStepBlock)) { | 233 if (maybeThrowCancelOrPause(stream, mWriteStepBlock)) { |
| 208 return; | 234 return; |
| 209 } | 235 } |
| 210 startNextWrite(stream); | 236 startNextWrite(stream); |
| 211 } | 237 } |
| 212 | 238 |
| 213 @Override | 239 @Override |
| 214 public void onResponseTrailersReceived(BidirectionalStream stream, UrlRespon
seInfo info, | 240 public void onResponseTrailersReceived(BidirectionalStream stream, UrlRespon
seInfo info, |
| 215 UrlResponseInfo.HeaderBlock trailers) { | 241 UrlResponseInfo.HeaderBlock trailers) { |
| (...skipping 11 matching lines...) Expand all Loading... |
| 227 public void onSucceeded(BidirectionalStream stream, UrlResponseInfo info) { | 253 public void onSucceeded(BidirectionalStream stream, UrlResponseInfo info) { |
| 228 assertEquals(mExecutorThread, Thread.currentThread()); | 254 assertEquals(mExecutorThread, Thread.currentThread()); |
| 229 assertTrue(stream.isDone()); | 255 assertTrue(stream.isDone()); |
| 230 assertTrue(mResponseStep == ResponseStep.ON_RESPONSE_STARTED | 256 assertTrue(mResponseStep == ResponseStep.ON_RESPONSE_STARTED |
| 231 || mResponseStep == ResponseStep.ON_READ_COMPLETED | 257 || mResponseStep == ResponseStep.ON_READ_COMPLETED |
| 232 || mResponseStep == ResponseStep.ON_WRITE_COMPLETED | 258 || mResponseStep == ResponseStep.ON_WRITE_COMPLETED |
| 233 || mResponseStep == ResponseStep.ON_TRAILERS); | 259 || mResponseStep == ResponseStep.ON_TRAILERS); |
| 234 assertFalse(mOnErrorCalled); | 260 assertFalse(mOnErrorCalled); |
| 235 assertFalse(mOnCanceledCalled); | 261 assertFalse(mOnCanceledCalled); |
| 236 assertNull(mError); | 262 assertNull(mError); |
| 263 assertEquals(0, mWriteBuffers.size()); |
| 264 assertEquals(0, mWriteBuffersToBeAcked.size()); |
| 237 | 265 |
| 238 mResponseStep = ResponseStep.ON_SUCCEEDED; | 266 mResponseStep = ResponseStep.ON_SUCCEEDED; |
| 239 mResponseInfo = info; | 267 mResponseInfo = info; |
| 240 openDone(); | 268 openDone(); |
| 241 maybeThrowCancelOrPause(stream, mReadStepBlock); | 269 maybeThrowCancelOrPause(stream, mReadStepBlock); |
| 242 } | 270 } |
| 243 | 271 |
| 244 @Override | 272 @Override |
| 245 public void onFailed(BidirectionalStream stream, UrlResponseInfo info, Crone
tException error) { | 273 public void onFailed(BidirectionalStream stream, UrlResponseInfo info, Crone
tException error) { |
| 246 assertEquals(mExecutorThread, Thread.currentThread()); | 274 assertEquals(mExecutorThread, Thread.currentThread()); |
| (...skipping 31 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 278 startNextRead(stream, ByteBuffer.allocateDirect(READ_BUFFER_SIZE)); | 306 startNextRead(stream, ByteBuffer.allocateDirect(READ_BUFFER_SIZE)); |
| 279 } | 307 } |
| 280 | 308 |
| 281 public void startNextRead(BidirectionalStream stream, ByteBuffer buffer) { | 309 public void startNextRead(BidirectionalStream stream, ByteBuffer buffer) { |
| 282 mBufferPositionBeforeRead = buffer.position(); | 310 mBufferPositionBeforeRead = buffer.position(); |
| 283 stream.read(buffer); | 311 stream.read(buffer); |
| 284 } | 312 } |
| 285 | 313 |
| 286 public void startNextWrite(BidirectionalStream stream) { | 314 public void startNextWrite(BidirectionalStream stream) { |
| 287 if (!mWriteBuffers.isEmpty()) { | 315 if (!mWriteBuffers.isEmpty()) { |
| 288 boolean isLastBuffer = mWriteBuffers.size() == 1; | 316 if (mHandleFlush) { |
| 289 stream.write(mWriteBuffers.get(0), isLastBuffer); | 317 Iterator<WriteBuffer> iterator = mWriteBuffers.iterator(); |
| 318 while (iterator.hasNext()) { |
| 319 WriteBuffer b = iterator.next(); |
| 320 stream.write(b.mBuffer, !iterator.hasNext()); |
| 321 iterator.remove(); |
| 322 if (b.mFlush) { |
| 323 stream.flush(); |
| 324 break; |
| 325 } |
| 326 } |
| 327 } else { |
| 328 boolean isLastBuffer = (mWriteBuffers.size() == 1); |
| 329 stream.write(mWriteBuffers.get(0).mBuffer, isLastBuffer); |
| 330 mWriteBuffers.remove(0); |
| 331 } |
| 290 } | 332 } |
| 291 } | 333 } |
| 292 | 334 |
| 293 public boolean isDone() { | 335 public boolean isDone() { |
| 294 // It's not mentioned by the Android docs, but block(0) seems to block | 336 // It's not mentioned by the Android docs, but block(0) seems to block |
| 295 // indefinitely, so have to block for one millisecond to get state | 337 // indefinitely, so have to block for one millisecond to get state |
| 296 // without blocking. | 338 // without blocking. |
| 297 return mDone.block(1); | 339 return mDone.block(1); |
| 298 } | 340 } |
| 299 | 341 |
| (...skipping 25 matching lines...) Expand all Loading... |
| 325 }; | 367 }; |
| 326 if (mFailureType == FailureType.CANCEL_ASYNC | 368 if (mFailureType == FailureType.CANCEL_ASYNC |
| 327 || mFailureType == FailureType.CANCEL_ASYNC_WITHOUT_PAUSE) { | 369 || mFailureType == FailureType.CANCEL_ASYNC_WITHOUT_PAUSE) { |
| 328 getExecutor().execute(task); | 370 getExecutor().execute(task); |
| 329 } else { | 371 } else { |
| 330 task.run(); | 372 task.run(); |
| 331 } | 373 } |
| 332 return mFailureType != FailureType.CANCEL_ASYNC_WITHOUT_PAUSE; | 374 return mFailureType != FailureType.CANCEL_ASYNC_WITHOUT_PAUSE; |
| 333 } | 375 } |
| 334 } | 376 } |
| OLD | NEW |