Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(47)

Side by Side Diff: components/cronet/android/test/javatests/src/org/chromium/net/TestBidirectionalStreamCallback.java

Issue 1412243012: Initial implementation of CronetBidirectionalStream. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Address Helen's comments. Created 4 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package org.chromium.net;
6
7 import android.os.ConditionVariable;
8
9 import static junit.framework.Assert.assertEquals;
10 import static junit.framework.Assert.assertFalse;
11 import static junit.framework.Assert.assertNull;
12 import static junit.framework.Assert.assertTrue;
13
14 import java.nio.ByteBuffer;
15 import java.util.ArrayList;
16 import java.util.concurrent.Executor;
17 import java.util.concurrent.ExecutorService;
18 import java.util.concurrent.Executors;
19 import java.util.concurrent.ThreadFactory;
20
21 /**
22 * Callback that tracks information from different callbacks and and has a
23 * method to block thread until the stream completes on another thread.
24 * Allows to cancel, block stream or throw an exception from an arbitrary step.
25 */
26 public class TestBidirectionalStreamCallback extends BidirectionalStream.Callbac k {
27 public UrlResponseInfo mResponseInfo;
28 public CronetException mError;
29
30 public ResponseStep mResponseStep = ResponseStep.NOTHING;
31
32 public boolean mOnErrorCalled = false;
33 public boolean mOnCanceledCalled = false;
34
35 public int mHttpResponseDataLength = 0;
36 public String mResponseAsString = "";
37
38 public UrlResponseInfo.HeaderBlock mTrailers;
39
40 private static final int READ_BUFFER_SIZE = 32 * 1024;
41
42 // When false, the consumer is responsible for all calls into the stream
43 // that advance it.
44 private boolean mAutoAdvance = true;
45
46 // Conditionally fail on certain steps.
47 private FailureType mFailureType = FailureType.NONE;
48 private ResponseStep mFailureStep = ResponseStep.NOTHING;
49
50 // Signals when the stream is done either successfully or not.
51 private final ConditionVariable mDone = new ConditionVariable();
52
53 // Signaled on each step when mAutoAdvance is false.
54 private final ConditionVariable mReadStepBlock = new ConditionVariable();
55 private final ConditionVariable mWriteStepBlock = new ConditionVariable();
56
57 // Executor Service for Cronet callbacks.
58 private final ExecutorService mExecutorService =
59 Executors.newSingleThreadExecutor(new ExecutorThreadFactory());
60 private Thread mExecutorThread;
61
62 // position() of ByteBuffer prior to read() call.
63 private int mBufferPositionBeforeRead;
64
65 // Data to write.
66 private ArrayList<ByteBuffer> mWriteBuffers = new ArrayList<ByteBuffer>();
67
68 private class ExecutorThreadFactory implements ThreadFactory {
69 public Thread newThread(Runnable r) {
70 mExecutorThread = new Thread(r);
71 return mExecutorThread;
72 }
73 }
74
75 public enum ResponseStep {
76 NOTHING,
77 ON_REQUEST_HEADERS_SENT,
78 ON_RESPONSE_STARTED,
79 ON_READ_COMPLETED,
80 ON_WRITE_COMPLETED,
81 ON_TRAILERS,
82 ON_CANCELED,
83 ON_FAILED,
84 ON_SUCCEEDED
85 }
86
87 public enum FailureType {
88 NONE,
89 CANCEL_SYNC,
90 CANCEL_ASYNC,
91 // Same as above, but continues to advance the stream after posting
92 // the cancellation task.
93 CANCEL_ASYNC_WITHOUT_PAUSE,
94 THROW_SYNC
95 }
96
97 public void setAutoAdvance(boolean autoAdvance) {
98 mAutoAdvance = autoAdvance;
99 }
100
101 public void setFailure(FailureType failureType, ResponseStep failureStep) {
102 mFailureStep = failureStep;
103 mFailureType = failureType;
104 }
105
106 public void blockForDone() {
107 mDone.block();
108 }
109
110 public void waitForNextReadStep() {
111 mReadStepBlock.block();
112 mReadStepBlock.close();
113 }
114
115 public void waitForNextWriteStep() {
116 mWriteStepBlock.block();
117 mWriteStepBlock.close();
118 }
119
120 public Executor getExecutor() {
121 return mExecutorService;
122 }
123
124 public void shutdownExecutor() {
125 mExecutorService.shutdown();
126 }
127
128 public void addWriteData(byte[] data) {
129 ByteBuffer writeBuffer = ByteBuffer.allocateDirect(data.length);
130 writeBuffer.put(data);
131 writeBuffer.flip();
132 mWriteBuffers.add(writeBuffer);
133 }
134
135 @Override
136 public void onRequestHeadersSent(BidirectionalStream stream) {
137 assertEquals(mExecutorThread, Thread.currentThread());
138 assertFalse(stream.isDone());
139 assertEquals(ResponseStep.NOTHING, mResponseStep);
140 assertNull(mError);
141
142 mResponseStep = ResponseStep.ON_REQUEST_HEADERS_SENT;
143 if (maybeThrowCancelOrPause(stream, mWriteStepBlock)) {
144 return;
145 }
146 startNextWrite(stream);
147 }
148
149 @Override
150 public void onResponseHeadersReceived(BidirectionalStream stream, UrlRespons eInfo info) {
151 assertEquals(mExecutorThread, Thread.currentThread());
152 assertFalse(stream.isDone());
153 assertTrue(mResponseStep == ResponseStep.NOTHING
154 || mResponseStep == ResponseStep.ON_REQUEST_HEADERS_SENT
155 || mResponseStep == ResponseStep.ON_WRITE_COMPLETED);
156 assertNull(mError);
157
158 mResponseStep = ResponseStep.ON_RESPONSE_STARTED;
159 mResponseInfo = info;
160 if (maybeThrowCancelOrPause(stream, mReadStepBlock)) {
161 return;
162 }
163 startNextRead(stream);
164 }
165
166 @Override
167 public void onReadCompleted(
168 BidirectionalStream stream, UrlResponseInfo info, ByteBuffer byteBuf fer) {
169 assertEquals(mExecutorThread, Thread.currentThread());
170 assertFalse(stream.isDone());
171 assertTrue(mResponseStep == ResponseStep.ON_RESPONSE_STARTED
172 || mResponseStep == ResponseStep.ON_READ_COMPLETED
173 || mResponseStep == ResponseStep.ON_WRITE_COMPLETED
174 || mResponseStep == ResponseStep.ON_TRAILERS);
175 assertNull(mError);
176
177 mResponseStep = ResponseStep.ON_READ_COMPLETED;
178
179 final byte[] lastDataReceivedAsBytes;
180 final int bytesRead = byteBuffer.position() - mBufferPositionBeforeRead;
181 mHttpResponseDataLength += bytesRead;
182 lastDataReceivedAsBytes = new byte[bytesRead];
183 // Rewind byteBuffer.position() to pre-read() position.
184 byteBuffer.position(mBufferPositionBeforeRead);
185 // This restores byteBuffer.position() to its value on entrance to
186 // this function.
187 byteBuffer.get(lastDataReceivedAsBytes);
188
189 mResponseAsString += new String(lastDataReceivedAsBytes);
190
191 if (maybeThrowCancelOrPause(stream, mReadStepBlock)) {
192 return;
193 }
194 startNextRead(stream);
195 }
196
197 @Override
198 public void onWriteCompleted(
199 BidirectionalStream stream, UrlResponseInfo info, ByteBuffer buffer) {
200 assertEquals(mExecutorThread, Thread.currentThread());
201 assertFalse(stream.isDone());
202 assertNull(mError);
203 mResponseStep = ResponseStep.ON_WRITE_COMPLETED;
204 if (!mWriteBuffers.isEmpty()) {
205 assertEquals(buffer, mWriteBuffers.get(0));
206 mWriteBuffers.remove(0);
207 }
208 if (maybeThrowCancelOrPause(stream, mWriteStepBlock)) {
209 return;
210 }
211 startNextWrite(stream);
212 }
213
214 @Override
215 public void onResponseTrailersReceived(BidirectionalStream stream, UrlRespon seInfo info,
216 UrlResponseInfo.HeaderBlock trailers) {
217 assertEquals(mExecutorThread, Thread.currentThread());
218 assertFalse(stream.isDone());
219 assertNull(mError);
220 mResponseStep = ResponseStep.ON_TRAILERS;
221 mTrailers = trailers;
222 if (maybeThrowCancelOrPause(stream, mReadStepBlock)) {
223 return;
224 }
225 }
226
227 @Override
228 public void onSucceeded(BidirectionalStream stream, UrlResponseInfo info) {
229 assertEquals(mExecutorThread, Thread.currentThread());
230 assertTrue(stream.isDone());
231 assertTrue(mResponseStep == ResponseStep.ON_RESPONSE_STARTED
232 || mResponseStep == ResponseStep.ON_READ_COMPLETED
233 || mResponseStep == ResponseStep.ON_WRITE_COMPLETED
234 || mResponseStep == ResponseStep.ON_TRAILERS);
235 assertFalse(mOnErrorCalled);
236 assertFalse(mOnCanceledCalled);
237 assertNull(mError);
238
239 mResponseStep = ResponseStep.ON_SUCCEEDED;
240 mResponseInfo = info;
241 openDone();
242 maybeThrowCancelOrPause(stream, mReadStepBlock);
243 }
244
245 @Override
246 public void onFailed(BidirectionalStream stream, UrlResponseInfo info, Crone tException error) {
247 assertEquals(mExecutorThread, Thread.currentThread());
248 assertTrue(stream.isDone());
249 // Shouldn't happen after success.
250 assertTrue(mResponseStep != ResponseStep.ON_SUCCEEDED);
251 // Should happen at most once for a single stream.
252 assertFalse(mOnErrorCalled);
253 assertFalse(mOnCanceledCalled);
254 assertNull(mError);
255 mResponseStep = ResponseStep.ON_FAILED;
256
257 mOnErrorCalled = true;
258 mError = error;
259 openDone();
260 maybeThrowCancelOrPause(stream, mReadStepBlock);
261 }
262
263 @Override
264 public void onCanceled(BidirectionalStream stream, UrlResponseInfo info) {
265 assertEquals(mExecutorThread, Thread.currentThread());
266 assertTrue(stream.isDone());
267 // Should happen at most once for a single stream.
268 assertFalse(mOnCanceledCalled);
269 assertFalse(mOnErrorCalled);
270 assertNull(mError);
271 mResponseStep = ResponseStep.ON_CANCELED;
272
273 mOnCanceledCalled = true;
274 openDone();
275 maybeThrowCancelOrPause(stream, mReadStepBlock);
276 }
277
278 public void startNextRead(BidirectionalStream stream) {
279 startNextRead(stream, ByteBuffer.allocateDirect(READ_BUFFER_SIZE));
280 }
281
282 public void startNextRead(BidirectionalStream stream, ByteBuffer buffer) {
283 mBufferPositionBeforeRead = buffer.position();
284 stream.read(buffer);
285 }
286
287 public void startNextWrite(BidirectionalStream stream) {
288 if (!mWriteBuffers.isEmpty()) {
289 boolean isLastBuffer = mWriteBuffers.size() == 1;
290 stream.write(mWriteBuffers.get(0), isLastBuffer);
291 }
292 }
293
294 public boolean isDone() {
295 // It's not mentioned by the Android docs, but block(0) seems to block
296 // indefinitely, so have to block for one millisecond to get state
297 // without blocking.
298 return mDone.block(1);
299 }
300
301 protected void openDone() {
302 mDone.open();
303 }
304
305 /**
306 * Returns {@code false} if the callback should continue to advance the
307 * stream.
308 */
309 private boolean maybeThrowCancelOrPause(
310 final BidirectionalStream stream, ConditionVariable stepBlock) {
311 if (mResponseStep != mFailureStep || mFailureType == FailureType.NONE) {
312 if (!mAutoAdvance) {
313 stepBlock.open();
314 return true;
315 }
316 return false;
317 }
318
319 if (mFailureType == FailureType.THROW_SYNC) {
320 throw new IllegalStateException("Callback Exception.");
321 }
322 Runnable task = new Runnable() {
323 public void run() {
324 stream.cancel();
325 }
326 };
327 if (mFailureType == FailureType.CANCEL_ASYNC
328 || mFailureType == FailureType.CANCEL_ASYNC_WITHOUT_PAUSE) {
329 getExecutor().execute(task);
330 } else {
331 task.run();
332 }
333 return mFailureType != FailureType.CANCEL_ASYNC_WITHOUT_PAUSE;
334 }
335 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698