Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(645)

Side by Side Diff: components/cronet/android/test/javatests/src/org/chromium/net/TestBidirectionalStreamCallback.java

Issue 1412243012: Initial implementation of CronetBidirectionalStream. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Log the exception. Created 4 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package org.chromium.net;
6
7 import android.os.ConditionVariable;
8
9 import static junit.framework.Assert.assertEquals;
10 import static junit.framework.Assert.assertFalse;
11 import static junit.framework.Assert.assertNull;
12 import static junit.framework.Assert.assertTrue;
13
14 import java.nio.ByteBuffer;
15 import java.util.ArrayList;
16 import java.util.concurrent.Executor;
17 import java.util.concurrent.ExecutorService;
18 import java.util.concurrent.Executors;
19 import java.util.concurrent.ThreadFactory;
20
21 /**
22 * Callback that tracks information from different callbacks and and has a
23 * method to block thread until the stream completes on another thread.
24 * Allows to cancel, block stream or throw an exception from an arbitrary step.
25 */
26 class TestBidirectionalStreamCallback extends BidirectionalStream.Callback {
xunjieli 2016/01/27 16:16:38 nit: add public?
mef 2016/01/27 19:06:47 Done.
27 public UrlResponseInfo mResponseInfo;
28 public CronetException mError;
29
30 public ResponseStep mResponseStep = ResponseStep.NOTHING;
31
32 public boolean mOnErrorCalled = false;
33 public boolean mOnCanceledCalled = false;
34
35 public int mHttpResponseDataLength = 0;
36 public String mResponseAsString = "";
37
38 public UrlResponseInfo.HeaderBlock mTrailers;
39
40 private static final int READ_BUFFER_SIZE = 32 * 1024;
41
42 // When false, the consumer is responsible for all calls into the stream
43 // that advance it.
44 private boolean mAutoAdvance = true;
45
46 // Conditionally fail on certain steps.
47 private FailureType mFailureType = FailureType.NONE;
48 private ResponseStep mFailureStep = ResponseStep.NOTHING;
49
50 // Signals when the stream is done either successfully or not.
51 private final ConditionVariable mDone = new ConditionVariable();
52
53 // Signaled on each step when mAutoAdvance is false.
54 private final ConditionVariable mReadStepBlock = new ConditionVariable();
55 private final ConditionVariable mWriteStepBlock = new ConditionVariable();
56
57 // Blocks direct executor for testing of double reads and writes
58 private final ConditionVariable mDirectExecutorBlock = new ConditionVariable (true);
59
60 // Executor Service for Cronet callbacks.
61 private final ExecutorService mExecutorService =
62 Executors.newSingleThreadExecutor(new ExecutorThreadFactory());
63 private Thread mExecutorThread;
64
65 // position() of ByteBuffer prior to read() call.
66 private int mBufferPositionBeforeRead;
67
68 // Data to write.
69 private ArrayList<ByteBuffer> mWriteBuffers = new ArrayList<ByteBuffer>();
70
71 private class ExecutorThreadFactory implements ThreadFactory {
72 public Thread newThread(Runnable r) {
73 mExecutorThread = new Thread(r);
74 return mExecutorThread;
75 }
76 }
77
78 public enum ResponseStep {
79 NOTHING,
80 ON_REQUEST_HEADERS_SENT,
81 ON_RESPONSE_STARTED,
82 ON_READ_COMPLETED,
83 ON_WRITE_COMPLETED,
84 ON_TRAILERS,
85 ON_CANCELED,
86 ON_FAILED,
87 ON_SUCCEEDED
88 }
89
90 public enum FailureType {
91 NONE,
92 CANCEL_SYNC,
93 CANCEL_ASYNC,
94 // Same as above, but continues to advance the stream after posting
95 // the cancellation task.
96 CANCEL_ASYNC_WITHOUT_PAUSE,
97 THROW_SYNC
98 }
99
100 public void setAutoAdvance(boolean autoAdvance) {
101 mAutoAdvance = autoAdvance;
102 }
103
104 public void setFailure(FailureType failureType, ResponseStep failureStep) {
105 mFailureStep = failureStep;
106 mFailureType = failureType;
107 }
108
109 public void blockForDone() {
110 mDone.block();
111 }
112
113 public void waitForNextReadStep() {
114 mReadStepBlock.block();
115 mReadStepBlock.close();
116 }
117
118 public void waitForNextWriteStep() {
119 mWriteStepBlock.block();
120 mWriteStepBlock.close();
121 }
122
123 public Executor getExecutor() {
124 return mExecutorService;
125 }
126
127 public void shutdownExecutor() {
128 mExecutorService.shutdown();
129 }
130
131 public Executor getBlockingDirectExecutor() {
132 return new Executor() {
133 @Override
134 public void execute(Runnable command) {
135 if (mExecutorThread == null) {
136 mExecutorThread = Thread.currentThread();
137 }
138 mDirectExecutorBlock.block();
139 command.run();
140 }
141 };
142 }
143
144 public void closeDirectExecutorBlock() {
145 mDirectExecutorBlock.close();
146 }
147
148 public void openDirectExecutorBlock() {
149 mDirectExecutorBlock.open();
150 }
151
152 public void addWriteData(byte[] data) {
153 ByteBuffer writeBuffer = ByteBuffer.allocateDirect(data.length);
154 writeBuffer.put(data);
155 writeBuffer.flip();
156 mWriteBuffers.add(writeBuffer);
157 }
158
159 @Override
160 public void onRequestHeadersSent(BidirectionalStream stream) {
161 assertEquals(mExecutorThread, Thread.currentThread());
162 assertFalse(stream.isDone());
163 assertEquals(ResponseStep.NOTHING, mResponseStep);
164 assertNull(mError);
165
166 mResponseStep = ResponseStep.ON_REQUEST_HEADERS_SENT;
167 if (maybeThrowCancelOrPause(stream, mWriteStepBlock)) {
168 return;
169 }
170 startNextWrite(stream);
171 }
172
173 @Override
174 public void onResponseHeadersReceived(BidirectionalStream stream, UrlRespons eInfo info) {
175 assertEquals(mExecutorThread, Thread.currentThread());
176 assertFalse(stream.isDone());
177 assertTrue(mResponseStep == ResponseStep.NOTHING
178 || mResponseStep == ResponseStep.ON_REQUEST_HEADERS_SENT
179 || mResponseStep == ResponseStep.ON_WRITE_COMPLETED);
180 assertNull(mError);
181
182 mResponseStep = ResponseStep.ON_RESPONSE_STARTED;
183 mResponseInfo = info;
184 if (maybeThrowCancelOrPause(stream, mReadStepBlock)) {
185 return;
186 }
187 startNextRead(stream);
188 }
189
190 @Override
191 public void onReadCompleted(
192 BidirectionalStream stream, UrlResponseInfo info, ByteBuffer byteBuf fer) {
193 assertEquals(mExecutorThread, Thread.currentThread());
194 assertFalse(stream.isDone());
195 assertTrue(mResponseStep == ResponseStep.ON_RESPONSE_STARTED
196 || mResponseStep == ResponseStep.ON_READ_COMPLETED
197 || mResponseStep == ResponseStep.ON_WRITE_COMPLETED
198 || mResponseStep == ResponseStep.ON_TRAILERS);
199 assertNull(mError);
200
201 mResponseStep = ResponseStep.ON_READ_COMPLETED;
202
203 final byte[] lastDataReceivedAsBytes;
204 final int bytesRead = byteBuffer.position() - mBufferPositionBeforeRead;
205 mHttpResponseDataLength += bytesRead;
206 lastDataReceivedAsBytes = new byte[bytesRead];
207 // Rewind byteBuffer.position() to pre-read() position.
208 byteBuffer.position(mBufferPositionBeforeRead);
209 // This restores byteBuffer.position() to its value on entrance to
210 // this function.
211 byteBuffer.get(lastDataReceivedAsBytes);
212
213 mResponseAsString += new String(lastDataReceivedAsBytes);
214
215 if (maybeThrowCancelOrPause(stream, mReadStepBlock)) {
216 return;
217 }
218 startNextRead(stream);
219 }
220
221 @Override
222 public void onWriteCompleted(
223 BidirectionalStream stream, UrlResponseInfo info, ByteBuffer buffer) {
224 assertEquals(mExecutorThread, Thread.currentThread());
225 assertFalse(stream.isDone());
226 assertNull(mError);
227 mResponseStep = ResponseStep.ON_WRITE_COMPLETED;
228 if (!mWriteBuffers.isEmpty()) {
229 assertEquals(buffer, mWriteBuffers.get(0));
230 mWriteBuffers.remove(0);
231 }
232 if (maybeThrowCancelOrPause(stream, mWriteStepBlock)) {
233 return;
234 }
235 startNextWrite(stream);
236 }
237
238 @Override
239 public void onResponseTrailersReceived(BidirectionalStream stream, UrlRespon seInfo info,
240 UrlResponseInfo.HeaderBlock trailers) {
241 assertEquals(mExecutorThread, Thread.currentThread());
242 assertFalse(stream.isDone());
243 assertNull(mError);
244 mResponseStep = ResponseStep.ON_TRAILERS;
245 mTrailers = trailers;
246 if (maybeThrowCancelOrPause(stream, mReadStepBlock)) {
247 return;
248 }
249 }
250
251 @Override
252 public void onSucceeded(BidirectionalStream stream, UrlResponseInfo info) {
253 assertEquals(mExecutorThread, Thread.currentThread());
254 assertTrue(stream.isDone());
255 assertTrue(mResponseStep == ResponseStep.ON_RESPONSE_STARTED
256 || mResponseStep == ResponseStep.ON_READ_COMPLETED
257 || mResponseStep == ResponseStep.ON_WRITE_COMPLETED
258 || mResponseStep == ResponseStep.ON_TRAILERS);
259 assertFalse(mOnErrorCalled);
260 assertFalse(mOnCanceledCalled);
261 assertNull(mError);
262
263 mResponseStep = ResponseStep.ON_SUCCEEDED;
264 mResponseInfo = info;
265 openDone();
266 maybeThrowCancelOrPause(stream, mReadStepBlock);
267 }
268
269 @Override
270 public void onFailed(BidirectionalStream stream, UrlResponseInfo info, Crone tException error) {
271 assertEquals(mExecutorThread, Thread.currentThread());
272 assertTrue(stream.isDone());
273 // Shouldn't happen after success.
274 assertTrue(mResponseStep != ResponseStep.ON_SUCCEEDED);
275 // Should happen at most once for a single stream.
276 assertFalse(mOnErrorCalled);
277 assertFalse(mOnCanceledCalled);
278 assertNull(mError);
279 mResponseStep = ResponseStep.ON_FAILED;
280
281 mOnErrorCalled = true;
282 mError = error;
283 openDone();
284 maybeThrowCancelOrPause(stream, mReadStepBlock);
285 }
286
287 @Override
288 public void onCanceled(BidirectionalStream stream, UrlResponseInfo info) {
289 assertEquals(mExecutorThread, Thread.currentThread());
290 assertTrue(stream.isDone());
291 // Should happen at most once for a single stream.
292 assertFalse(mOnCanceledCalled);
293 assertFalse(mOnErrorCalled);
294 assertNull(mError);
295 mResponseStep = ResponseStep.ON_CANCELED;
296
297 mOnCanceledCalled = true;
298 openDone();
299 maybeThrowCancelOrPause(stream, mReadStepBlock);
300 }
301
302 public void startNextRead(BidirectionalStream stream) {
303 startNextRead(stream, ByteBuffer.allocateDirect(READ_BUFFER_SIZE));
304 }
305
306 public void startNextRead(BidirectionalStream stream, ByteBuffer buffer) {
307 mBufferPositionBeforeRead = buffer.position();
308 stream.read(buffer);
309 }
310
311 public void startNextWrite(BidirectionalStream stream) {
312 if (!mWriteBuffers.isEmpty()) {
313 boolean isLastBuffer = mWriteBuffers.size() == 1;
314 stream.write(mWriteBuffers.get(0), isLastBuffer);
315 }
316 }
317
318 public boolean isDone() {
319 // It's not mentioned by the Android docs, but block(0) seems to block
320 // indefinitely, so have to block for one millisecond to get state
321 // without blocking.
322 return mDone.block(1);
323 }
324
325 protected void openDone() {
326 mDone.open();
327 }
328
329 /**
330 * Returns {@code false} if the listener should continue to advance the
xunjieli 2016/01/26 21:41:10 nit: there is no longer a listener.
mef 2016/01/27 19:06:47 Done.
331 * stream.
332 */
333 private boolean maybeThrowCancelOrPause(
334 final BidirectionalStream stream, ConditionVariable stepBlock) {
335 if (mResponseStep != mFailureStep || mFailureType == FailureType.NONE) {
336 if (!mAutoAdvance) {
337 stepBlock.open();
338 return true;
339 }
340 return false;
341 }
342
343 if (mFailureType == FailureType.THROW_SYNC) {
344 throw new IllegalStateException("Callback Exception.");
345 }
346 Runnable task = new Runnable() {
347 public void run() {
348 stream.cancel();
349 }
350 };
351 if (mFailureType == FailureType.CANCEL_ASYNC
352 || mFailureType == FailureType.CANCEL_ASYNC_WITHOUT_PAUSE) {
353 getExecutor().execute(task);
354 } else {
355 task.run();
356 }
357 return mFailureType != FailureType.CANCEL_ASYNC_WITHOUT_PAUSE;
358 }
359 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698