OLD | NEW |
---|---|
1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 package org.chromium.net; | 5 package org.chromium.net; |
6 | 6 |
7 import android.os.ConditionVariable; | 7 import android.os.ConditionVariable; |
8 | 8 |
9 import static junit.framework.Assert.assertEquals; | 9 import static junit.framework.Assert.assertEquals; |
10 import static junit.framework.Assert.assertFalse; | 10 import static junit.framework.Assert.assertFalse; |
11 import static junit.framework.Assert.assertNull; | 11 import static junit.framework.Assert.assertNull; |
12 import static junit.framework.Assert.assertTrue; | 12 import static junit.framework.Assert.assertTrue; |
13 | 13 |
14 import java.nio.ByteBuffer; | 14 import java.nio.ByteBuffer; |
15 import java.util.ArrayList; | 15 import java.util.ArrayList; |
16 import java.util.Iterator; | |
16 import java.util.concurrent.Executor; | 17 import java.util.concurrent.Executor; |
17 import java.util.concurrent.ExecutorService; | 18 import java.util.concurrent.ExecutorService; |
18 import java.util.concurrent.Executors; | 19 import java.util.concurrent.Executors; |
19 import java.util.concurrent.ThreadFactory; | 20 import java.util.concurrent.ThreadFactory; |
20 | 21 |
21 /** | 22 /** |
22 * Callback that tracks information from different callbacks and and has a | 23 * Callback that tracks information from different callbacks and and has a |
23 * method to block thread until the stream completes on another thread. | 24 * method to block thread until the stream completes on another thread. |
24 * Allows to cancel, block stream or throw an exception from an arbitrary step. | 25 * Allows to cancel, block stream or throw an exception from an arbitrary step. |
25 */ | 26 */ |
26 public class TestBidirectionalStreamCallback extends BidirectionalStream.Callbac k { | 27 public class TestBidirectionalStreamCallback extends BidirectionalStream.Callbac k { |
27 public UrlResponseInfo mResponseInfo; | 28 public UrlResponseInfo mResponseInfo; |
28 public CronetException mError; | 29 public CronetException mError; |
29 | 30 |
30 public ResponseStep mResponseStep = ResponseStep.NOTHING; | 31 public ResponseStep mResponseStep = ResponseStep.NOTHING; |
31 | 32 |
32 public boolean mOnErrorCalled = false; | 33 public boolean mOnErrorCalled = false; |
33 public boolean mOnCanceledCalled = false; | 34 public boolean mOnCanceledCalled = false; |
34 | 35 |
35 public int mHttpResponseDataLength = 0; | 36 public int mHttpResponseDataLength = 0; |
36 public String mResponseAsString = ""; | 37 public String mResponseAsString = ""; |
37 | 38 |
38 public UrlResponseInfo.HeaderBlock mTrailers; | 39 public UrlResponseInfo.HeaderBlock mTrailers; |
39 | 40 |
40 private static final int READ_BUFFER_SIZE = 32 * 1024; | 41 private static final int READ_BUFFER_SIZE = 32 * 1024; |
41 | 42 |
43 private boolean mHandleFlush; | |
42 // When false, the consumer is responsible for all calls into the stream | 44 // When false, the consumer is responsible for all calls into the stream |
43 // that advance it. | 45 // that advance it. |
44 private boolean mAutoAdvance = true; | 46 private boolean mAutoAdvance = true; |
45 | 47 |
46 // Conditionally fail on certain steps. | 48 // Conditionally fail on certain steps. |
47 private FailureType mFailureType = FailureType.NONE; | 49 private FailureType mFailureType = FailureType.NONE; |
48 private ResponseStep mFailureStep = ResponseStep.NOTHING; | 50 private ResponseStep mFailureStep = ResponseStep.NOTHING; |
49 | 51 |
50 // Signals when the stream is done either successfully or not. | 52 // Signals when the stream is done either successfully or not. |
51 private final ConditionVariable mDone = new ConditionVariable(); | 53 private final ConditionVariable mDone = new ConditionVariable(); |
52 | 54 |
53 // Signaled on each step when mAutoAdvance is false. | 55 // Signaled on each step when mAutoAdvance is false. |
54 private final ConditionVariable mReadStepBlock = new ConditionVariable(); | 56 private final ConditionVariable mReadStepBlock = new ConditionVariable(); |
55 private final ConditionVariable mWriteStepBlock = new ConditionVariable(); | 57 private final ConditionVariable mWriteStepBlock = new ConditionVariable(); |
56 | 58 |
57 // Executor Service for Cronet callbacks. | 59 // Executor Service for Cronet callbacks. |
58 private final ExecutorService mExecutorService = | 60 private final ExecutorService mExecutorService = |
59 Executors.newSingleThreadExecutor(new ExecutorThreadFactory()); | 61 Executors.newSingleThreadExecutor(new ExecutorThreadFactory()); |
60 private Thread mExecutorThread; | 62 private Thread mExecutorThread; |
61 | 63 |
62 // position() of ByteBuffer prior to read() call. | 64 // position() of ByteBuffer prior to read() call. |
63 private int mBufferPositionBeforeRead; | 65 private int mBufferPositionBeforeRead; |
64 | 66 |
65 // Data to write. | 67 // Data to write. |
66 private ArrayList<ByteBuffer> mWriteBuffers = new ArrayList<ByteBuffer>(); | 68 private ArrayList<WriteBuffer> mWriteBuffers = new ArrayList<WriteBuffer>(); |
69 | |
70 // Buffers that we yet to receive the corresponding onWriteCompleted callbac k. | |
71 private ArrayList<WriteBuffer> mWriteBuffersToBeAcked = new ArrayList<WriteB uffer>(); | |
67 | 72 |
68 private class ExecutorThreadFactory implements ThreadFactory { | 73 private class ExecutorThreadFactory implements ThreadFactory { |
69 public Thread newThread(Runnable r) { | 74 public Thread newThread(Runnable r) { |
70 mExecutorThread = new Thread(r); | 75 mExecutorThread = new Thread(r); |
71 return mExecutorThread; | 76 return mExecutorThread; |
72 } | 77 } |
73 } | 78 } |
74 | 79 |
80 private static class WriteBuffer { | |
81 final ByteBuffer mBuffer; | |
82 final boolean mFlush; | |
83 public WriteBuffer(ByteBuffer buffer, boolean flush) { | |
84 mBuffer = buffer; | |
85 mFlush = flush; | |
86 } | |
87 } | |
88 | |
75 public enum ResponseStep { | 89 public enum ResponseStep { |
76 NOTHING, | 90 NOTHING, |
77 ON_REQUEST_HEADERS_SENT, | 91 ON_STREAM_READY, |
78 ON_RESPONSE_STARTED, | 92 ON_RESPONSE_STARTED, |
79 ON_READ_COMPLETED, | 93 ON_READ_COMPLETED, |
80 ON_WRITE_COMPLETED, | 94 ON_WRITE_COMPLETED, |
81 ON_TRAILERS, | 95 ON_TRAILERS, |
82 ON_CANCELED, | 96 ON_CANCELED, |
83 ON_FAILED, | 97 ON_FAILED, |
84 ON_SUCCEEDED | 98 ON_SUCCEEDED |
85 } | 99 } |
86 | 100 |
87 public enum FailureType { | 101 public enum FailureType { |
88 NONE, | 102 NONE, |
89 CANCEL_SYNC, | 103 CANCEL_SYNC, |
90 CANCEL_ASYNC, | 104 CANCEL_ASYNC, |
91 // Same as above, but continues to advance the stream after posting | 105 // Same as above, but continues to advance the stream after posting |
92 // the cancellation task. | 106 // the cancellation task. |
93 CANCEL_ASYNC_WITHOUT_PAUSE, | 107 CANCEL_ASYNC_WITHOUT_PAUSE, |
94 THROW_SYNC | 108 THROW_SYNC |
95 } | 109 } |
96 | 110 |
111 public void setHandleFlush() { | |
112 mHandleFlush = true; | |
113 } | |
114 | |
97 public void setAutoAdvance(boolean autoAdvance) { | 115 public void setAutoAdvance(boolean autoAdvance) { |
98 mAutoAdvance = autoAdvance; | 116 mAutoAdvance = autoAdvance; |
99 } | 117 } |
100 | 118 |
101 public void setFailure(FailureType failureType, ResponseStep failureStep) { | 119 public void setFailure(FailureType failureType, ResponseStep failureStep) { |
102 mFailureStep = failureStep; | 120 mFailureStep = failureStep; |
103 mFailureType = failureType; | 121 mFailureType = failureType; |
104 } | 122 } |
105 | 123 |
106 public void blockForDone() { | 124 public void blockForDone() { |
(...skipping 12 matching lines...) Expand all Loading... | |
119 | 137 |
120 public Executor getExecutor() { | 138 public Executor getExecutor() { |
121 return mExecutorService; | 139 return mExecutorService; |
122 } | 140 } |
123 | 141 |
124 public void shutdownExecutor() { | 142 public void shutdownExecutor() { |
125 mExecutorService.shutdown(); | 143 mExecutorService.shutdown(); |
126 } | 144 } |
127 | 145 |
128 public void addWriteData(byte[] data) { | 146 public void addWriteData(byte[] data) { |
147 addWriteData(data, false); | |
148 } | |
149 | |
150 public void addWriteData(byte[] data, boolean flush) { | |
129 ByteBuffer writeBuffer = ByteBuffer.allocateDirect(data.length); | 151 ByteBuffer writeBuffer = ByteBuffer.allocateDirect(data.length); |
130 writeBuffer.put(data); | 152 writeBuffer.put(data); |
131 writeBuffer.flip(); | 153 writeBuffer.flip(); |
132 mWriteBuffers.add(writeBuffer); | 154 if (flush && !mHandleFlush) { |
kapishnikov
2016/04/12 19:27:39
Related to the previous comment. I think that shou
xunjieli
2016/04/12 19:59:03
Done.
| |
155 throw new IllegalArgumentException("cannot add flush when auto flush is on"); | |
156 } | |
157 mWriteBuffers.add(new WriteBuffer(writeBuffer, flush)); | |
158 mWriteBuffersToBeAcked.add(new WriteBuffer(writeBuffer, flush)); | |
133 } | 159 } |
134 | 160 |
135 @Override | 161 @Override |
136 public void onRequestHeadersSent(BidirectionalStream stream) { | 162 public void onStreamReady(BidirectionalStream stream) { |
137 assertEquals(mExecutorThread, Thread.currentThread()); | 163 assertEquals(mExecutorThread, Thread.currentThread()); |
138 assertFalse(stream.isDone()); | 164 assertFalse(stream.isDone()); |
139 assertEquals(ResponseStep.NOTHING, mResponseStep); | 165 assertEquals(ResponseStep.NOTHING, mResponseStep); |
140 assertNull(mError); | 166 assertNull(mError); |
141 | 167 |
142 mResponseStep = ResponseStep.ON_REQUEST_HEADERS_SENT; | 168 mResponseStep = ResponseStep.ON_STREAM_READY; |
143 if (maybeThrowCancelOrPause(stream, mWriteStepBlock)) { | 169 if (maybeThrowCancelOrPause(stream, mWriteStepBlock)) { |
144 return; | 170 return; |
145 } | 171 } |
146 startNextWrite(stream); | 172 startNextWrite(stream); |
147 } | 173 } |
148 | 174 |
149 @Override | 175 @Override |
150 public void onResponseHeadersReceived(BidirectionalStream stream, UrlRespons eInfo info) { | 176 public void onResponseHeadersReceived(BidirectionalStream stream, UrlRespons eInfo info) { |
151 assertEquals(mExecutorThread, Thread.currentThread()); | 177 assertEquals(mExecutorThread, Thread.currentThread()); |
152 assertFalse(stream.isDone()); | 178 assertFalse(stream.isDone()); |
153 assertTrue(mResponseStep == ResponseStep.NOTHING | 179 assertTrue(mResponseStep == ResponseStep.NOTHING |
154 || mResponseStep == ResponseStep.ON_REQUEST_HEADERS_SENT | 180 || mResponseStep == ResponseStep.ON_STREAM_READY |
155 || mResponseStep == ResponseStep.ON_WRITE_COMPLETED); | 181 || mResponseStep == ResponseStep.ON_WRITE_COMPLETED); |
156 assertNull(mError); | 182 assertNull(mError); |
157 | 183 |
158 mResponseStep = ResponseStep.ON_RESPONSE_STARTED; | 184 mResponseStep = ResponseStep.ON_RESPONSE_STARTED; |
159 mResponseInfo = info; | 185 mResponseInfo = info; |
160 if (maybeThrowCancelOrPause(stream, mReadStepBlock)) { | 186 if (maybeThrowCancelOrPause(stream, mReadStepBlock)) { |
161 return; | 187 return; |
162 } | 188 } |
163 startNextRead(stream); | 189 startNextRead(stream); |
164 } | 190 } |
(...skipping 28 matching lines...) Expand all Loading... | |
193 startNextRead(stream); | 219 startNextRead(stream); |
194 } | 220 } |
195 | 221 |
196 @Override | 222 @Override |
197 public void onWriteCompleted( | 223 public void onWriteCompleted( |
198 BidirectionalStream stream, UrlResponseInfo info, ByteBuffer buffer) { | 224 BidirectionalStream stream, UrlResponseInfo info, ByteBuffer buffer) { |
199 assertEquals(mExecutorThread, Thread.currentThread()); | 225 assertEquals(mExecutorThread, Thread.currentThread()); |
200 assertFalse(stream.isDone()); | 226 assertFalse(stream.isDone()); |
201 assertNull(mError); | 227 assertNull(mError); |
202 mResponseStep = ResponseStep.ON_WRITE_COMPLETED; | 228 mResponseStep = ResponseStep.ON_WRITE_COMPLETED; |
203 if (!mWriteBuffers.isEmpty()) { | 229 if (!mWriteBuffersToBeAcked.isEmpty()) { |
204 assertEquals(buffer, mWriteBuffers.get(0)); | 230 assertEquals(buffer, mWriteBuffersToBeAcked.get(0).mBuffer); |
205 mWriteBuffers.remove(0); | 231 mWriteBuffersToBeAcked.remove(0); |
206 } | 232 } |
207 if (maybeThrowCancelOrPause(stream, mWriteStepBlock)) { | 233 if (maybeThrowCancelOrPause(stream, mWriteStepBlock)) { |
208 return; | 234 return; |
209 } | 235 } |
210 startNextWrite(stream); | 236 startNextWrite(stream); |
211 } | 237 } |
212 | 238 |
213 @Override | 239 @Override |
214 public void onResponseTrailersReceived(BidirectionalStream stream, UrlRespon seInfo info, | 240 public void onResponseTrailersReceived(BidirectionalStream stream, UrlRespon seInfo info, |
215 UrlResponseInfo.HeaderBlock trailers) { | 241 UrlResponseInfo.HeaderBlock trailers) { |
(...skipping 11 matching lines...) Expand all Loading... | |
227 public void onSucceeded(BidirectionalStream stream, UrlResponseInfo info) { | 253 public void onSucceeded(BidirectionalStream stream, UrlResponseInfo info) { |
228 assertEquals(mExecutorThread, Thread.currentThread()); | 254 assertEquals(mExecutorThread, Thread.currentThread()); |
229 assertTrue(stream.isDone()); | 255 assertTrue(stream.isDone()); |
230 assertTrue(mResponseStep == ResponseStep.ON_RESPONSE_STARTED | 256 assertTrue(mResponseStep == ResponseStep.ON_RESPONSE_STARTED |
231 || mResponseStep == ResponseStep.ON_READ_COMPLETED | 257 || mResponseStep == ResponseStep.ON_READ_COMPLETED |
232 || mResponseStep == ResponseStep.ON_WRITE_COMPLETED | 258 || mResponseStep == ResponseStep.ON_WRITE_COMPLETED |
233 || mResponseStep == ResponseStep.ON_TRAILERS); | 259 || mResponseStep == ResponseStep.ON_TRAILERS); |
234 assertFalse(mOnErrorCalled); | 260 assertFalse(mOnErrorCalled); |
235 assertFalse(mOnCanceledCalled); | 261 assertFalse(mOnCanceledCalled); |
236 assertNull(mError); | 262 assertNull(mError); |
263 assertEquals(0, mWriteBuffers.size()); | |
264 assertEquals(0, mWriteBuffersToBeAcked.size()); | |
237 | 265 |
238 mResponseStep = ResponseStep.ON_SUCCEEDED; | 266 mResponseStep = ResponseStep.ON_SUCCEEDED; |
239 mResponseInfo = info; | 267 mResponseInfo = info; |
240 openDone(); | 268 openDone(); |
241 maybeThrowCancelOrPause(stream, mReadStepBlock); | 269 maybeThrowCancelOrPause(stream, mReadStepBlock); |
242 } | 270 } |
243 | 271 |
244 @Override | 272 @Override |
245 public void onFailed(BidirectionalStream stream, UrlResponseInfo info, Crone tException error) { | 273 public void onFailed(BidirectionalStream stream, UrlResponseInfo info, Crone tException error) { |
246 assertEquals(mExecutorThread, Thread.currentThread()); | 274 assertEquals(mExecutorThread, Thread.currentThread()); |
(...skipping 31 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
278 startNextRead(stream, ByteBuffer.allocateDirect(READ_BUFFER_SIZE)); | 306 startNextRead(stream, ByteBuffer.allocateDirect(READ_BUFFER_SIZE)); |
279 } | 307 } |
280 | 308 |
281 public void startNextRead(BidirectionalStream stream, ByteBuffer buffer) { | 309 public void startNextRead(BidirectionalStream stream, ByteBuffer buffer) { |
282 mBufferPositionBeforeRead = buffer.position(); | 310 mBufferPositionBeforeRead = buffer.position(); |
283 stream.read(buffer); | 311 stream.read(buffer); |
284 } | 312 } |
285 | 313 |
286 public void startNextWrite(BidirectionalStream stream) { | 314 public void startNextWrite(BidirectionalStream stream) { |
287 if (!mWriteBuffers.isEmpty()) { | 315 if (!mWriteBuffers.isEmpty()) { |
288 boolean isLastBuffer = mWriteBuffers.size() == 1; | 316 if (mHandleFlush) { |
289 stream.write(mWriteBuffers.get(0), isLastBuffer); | 317 Iterator<WriteBuffer> iterator = mWriteBuffers.iterator(); |
318 while (iterator.hasNext()) { | |
319 WriteBuffer b = iterator.next(); | |
320 stream.write(b.mBuffer, !iterator.hasNext()); | |
321 iterator.remove(); | |
322 if (b.mFlush) { | |
323 stream.flush(); | |
324 break; | |
325 } | |
326 } | |
327 } else { | |
328 boolean isLastBuffer = (mWriteBuffers.size() == 1); | |
329 stream.write(mWriteBuffers.get(0).mBuffer, isLastBuffer); | |
330 mWriteBuffers.remove(0); | |
331 } | |
290 } | 332 } |
291 } | 333 } |
292 | 334 |
293 public boolean isDone() { | 335 public boolean isDone() { |
294 // It's not mentioned by the Android docs, but block(0) seems to block | 336 // It's not mentioned by the Android docs, but block(0) seems to block |
295 // indefinitely, so have to block for one millisecond to get state | 337 // indefinitely, so have to block for one millisecond to get state |
296 // without blocking. | 338 // without blocking. |
297 return mDone.block(1); | 339 return mDone.block(1); |
298 } | 340 } |
299 | 341 |
(...skipping 25 matching lines...) Expand all Loading... | |
325 }; | 367 }; |
326 if (mFailureType == FailureType.CANCEL_ASYNC | 368 if (mFailureType == FailureType.CANCEL_ASYNC |
327 || mFailureType == FailureType.CANCEL_ASYNC_WITHOUT_PAUSE) { | 369 || mFailureType == FailureType.CANCEL_ASYNC_WITHOUT_PAUSE) { |
328 getExecutor().execute(task); | 370 getExecutor().execute(task); |
329 } else { | 371 } else { |
330 task.run(); | 372 task.run(); |
331 } | 373 } |
332 return mFailureType != FailureType.CANCEL_ASYNC_WITHOUT_PAUSE; | 374 return mFailureType != FailureType.CANCEL_ASYNC_WITHOUT_PAUSE; |
333 } | 375 } |
334 } | 376 } |
OLD | NEW |