Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(941)

Side by Side Diff: components/cronet/android/test/javatests/src/org/chromium/net/TestBidirectionalStreamCallback.java

Issue 1856073002: Coalesce small buffers in net::BidirectionalStream (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Fix javadoc Created 4 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package org.chromium.net; 5 package org.chromium.net;
6 6
7 import android.os.ConditionVariable; 7 import android.os.ConditionVariable;
8 8
9 import static junit.framework.Assert.assertEquals; 9 import static junit.framework.Assert.assertEquals;
10 import static junit.framework.Assert.assertFalse; 10 import static junit.framework.Assert.assertFalse;
11 import static junit.framework.Assert.assertNull; 11 import static junit.framework.Assert.assertNull;
12 import static junit.framework.Assert.assertTrue; 12 import static junit.framework.Assert.assertTrue;
13 13
14 import java.nio.ByteBuffer; 14 import java.nio.ByteBuffer;
15 import java.util.ArrayList; 15 import java.util.ArrayList;
16 import java.util.Iterator;
16 import java.util.concurrent.Executor; 17 import java.util.concurrent.Executor;
17 import java.util.concurrent.ExecutorService; 18 import java.util.concurrent.ExecutorService;
18 import java.util.concurrent.Executors; 19 import java.util.concurrent.Executors;
19 import java.util.concurrent.ThreadFactory; 20 import java.util.concurrent.ThreadFactory;
20 21
21 /** 22 /**
22 * Callback that tracks information from different callbacks and and has a 23 * Callback that tracks information from different callbacks and and has a
23 * method to block thread until the stream completes on another thread. 24 * method to block thread until the stream completes on another thread.
24 * Allows to cancel, block stream or throw an exception from an arbitrary step. 25 * Allows to cancel, block stream or throw an exception from an arbitrary step.
25 */ 26 */
(...skipping 30 matching lines...) Expand all
56 57
57 // Executor Service for Cronet callbacks. 58 // Executor Service for Cronet callbacks.
58 private final ExecutorService mExecutorService = 59 private final ExecutorService mExecutorService =
59 Executors.newSingleThreadExecutor(new ExecutorThreadFactory()); 60 Executors.newSingleThreadExecutor(new ExecutorThreadFactory());
60 private Thread mExecutorThread; 61 private Thread mExecutorThread;
61 62
62 // position() of ByteBuffer prior to read() call. 63 // position() of ByteBuffer prior to read() call.
63 private int mBufferPositionBeforeRead; 64 private int mBufferPositionBeforeRead;
64 65
65 // Data to write. 66 // Data to write.
66 private ArrayList<ByteBuffer> mWriteBuffers = new ArrayList<ByteBuffer>(); 67 private final ArrayList<WriteBuffer> mWriteBuffers = new ArrayList<WriteBuff er>();
68
69 // Buffers that we yet to receive the corresponding onWriteCompleted callbac k.
70 private final ArrayList<WriteBuffer> mWriteBuffersToBeAcked = new ArrayList< WriteBuffer>();
67 71
68 private class ExecutorThreadFactory implements ThreadFactory { 72 private class ExecutorThreadFactory implements ThreadFactory {
69 public Thread newThread(Runnable r) { 73 public Thread newThread(Runnable r) {
70 mExecutorThread = new Thread(r); 74 mExecutorThread = new Thread(r);
71 return mExecutorThread; 75 return mExecutorThread;
72 } 76 }
73 } 77 }
74 78
79 private static class WriteBuffer {
80 final ByteBuffer mBuffer;
81 final boolean mFlush;
82 public WriteBuffer(ByteBuffer buffer, boolean flush) {
83 mBuffer = buffer;
84 mFlush = flush;
85 }
86 }
87
75 public enum ResponseStep { 88 public enum ResponseStep {
76 NOTHING, 89 NOTHING,
77 ON_REQUEST_HEADERS_SENT, 90 ON_STREAM_READY,
78 ON_RESPONSE_STARTED, 91 ON_RESPONSE_STARTED,
79 ON_READ_COMPLETED, 92 ON_READ_COMPLETED,
80 ON_WRITE_COMPLETED, 93 ON_WRITE_COMPLETED,
81 ON_TRAILERS, 94 ON_TRAILERS,
82 ON_CANCELED, 95 ON_CANCELED,
83 ON_FAILED, 96 ON_FAILED,
84 ON_SUCCEEDED 97 ON_SUCCEEDED,
85 } 98 }
86 99
87 public enum FailureType { 100 public enum FailureType {
88 NONE, 101 NONE,
89 CANCEL_SYNC, 102 CANCEL_SYNC,
90 CANCEL_ASYNC, 103 CANCEL_ASYNC,
91 // Same as above, but continues to advance the stream after posting 104 // Same as above, but continues to advance the stream after posting
92 // the cancellation task. 105 // the cancellation task.
93 CANCEL_ASYNC_WITHOUT_PAUSE, 106 CANCEL_ASYNC_WITHOUT_PAUSE,
94 THROW_SYNC 107 THROW_SYNC
(...skipping 24 matching lines...) Expand all
119 132
120 public Executor getExecutor() { 133 public Executor getExecutor() {
121 return mExecutorService; 134 return mExecutorService;
122 } 135 }
123 136
124 public void shutdownExecutor() { 137 public void shutdownExecutor() {
125 mExecutorService.shutdown(); 138 mExecutorService.shutdown();
126 } 139 }
127 140
128 public void addWriteData(byte[] data) { 141 public void addWriteData(byte[] data) {
142 addWriteData(data, true);
143 }
144
145 public void addWriteData(byte[] data, boolean flush) {
129 ByteBuffer writeBuffer = ByteBuffer.allocateDirect(data.length); 146 ByteBuffer writeBuffer = ByteBuffer.allocateDirect(data.length);
130 writeBuffer.put(data); 147 writeBuffer.put(data);
131 writeBuffer.flip(); 148 writeBuffer.flip();
132 mWriteBuffers.add(writeBuffer); 149 mWriteBuffers.add(new WriteBuffer(writeBuffer, flush));
150 mWriteBuffersToBeAcked.add(new WriteBuffer(writeBuffer, flush));
133 } 151 }
134 152
135 @Override 153 @Override
136 public void onRequestHeadersSent(BidirectionalStream stream) { 154 public void onStreamReady(BidirectionalStream stream) {
137 assertEquals(mExecutorThread, Thread.currentThread()); 155 assertEquals(mExecutorThread, Thread.currentThread());
138 assertFalse(stream.isDone()); 156 assertFalse(stream.isDone());
139 assertEquals(ResponseStep.NOTHING, mResponseStep); 157 assertEquals(ResponseStep.NOTHING, mResponseStep);
140 assertNull(mError); 158 assertNull(mError);
141 159 mResponseStep = ResponseStep.ON_STREAM_READY;
142 mResponseStep = ResponseStep.ON_REQUEST_HEADERS_SENT;
143 if (maybeThrowCancelOrPause(stream, mWriteStepBlock)) { 160 if (maybeThrowCancelOrPause(stream, mWriteStepBlock)) {
144 return; 161 return;
145 } 162 }
146 startNextWrite(stream); 163 startNextWrite(stream);
147 } 164 }
148 165
149 @Override 166 @Override
150 public void onResponseHeadersReceived(BidirectionalStream stream, UrlRespons eInfo info) { 167 public void onResponseHeadersReceived(BidirectionalStream stream, UrlRespons eInfo info) {
151 assertEquals(mExecutorThread, Thread.currentThread()); 168 assertEquals(mExecutorThread, Thread.currentThread());
152 assertFalse(stream.isDone()); 169 assertFalse(stream.isDone());
153 assertTrue(mResponseStep == ResponseStep.NOTHING 170 assertTrue(mResponseStep == ResponseStep.NOTHING
154 || mResponseStep == ResponseStep.ON_REQUEST_HEADERS_SENT 171 || mResponseStep == ResponseStep.ON_STREAM_READY
155 || mResponseStep == ResponseStep.ON_WRITE_COMPLETED); 172 || mResponseStep == ResponseStep.ON_WRITE_COMPLETED);
156 assertNull(mError); 173 assertNull(mError);
157 174
158 mResponseStep = ResponseStep.ON_RESPONSE_STARTED; 175 mResponseStep = ResponseStep.ON_RESPONSE_STARTED;
159 mResponseInfo = info; 176 mResponseInfo = info;
160 if (maybeThrowCancelOrPause(stream, mReadStepBlock)) { 177 if (maybeThrowCancelOrPause(stream, mReadStepBlock)) {
161 return; 178 return;
162 } 179 }
163 startNextRead(stream); 180 startNextRead(stream);
164 } 181 }
165 182
166 @Override 183 @Override
167 public void onReadCompleted( 184 public void onReadCompleted(BidirectionalStream stream, UrlResponseInfo info ,
168 BidirectionalStream stream, UrlResponseInfo info, ByteBuffer byteBuf fer) { 185 ByteBuffer byteBuffer, boolean endOfStream) {
169 assertEquals(mExecutorThread, Thread.currentThread()); 186 assertEquals(mExecutorThread, Thread.currentThread());
170 assertFalse(stream.isDone()); 187 assertFalse(stream.isDone());
171 assertTrue(mResponseStep == ResponseStep.ON_RESPONSE_STARTED 188 assertTrue(mResponseStep == ResponseStep.ON_RESPONSE_STARTED
172 || mResponseStep == ResponseStep.ON_READ_COMPLETED 189 || mResponseStep == ResponseStep.ON_READ_COMPLETED
173 || mResponseStep == ResponseStep.ON_WRITE_COMPLETED 190 || mResponseStep == ResponseStep.ON_WRITE_COMPLETED
174 || mResponseStep == ResponseStep.ON_TRAILERS); 191 || mResponseStep == ResponseStep.ON_TRAILERS);
175 assertNull(mError); 192 assertNull(mError);
176 193
177 mResponseStep = ResponseStep.ON_READ_COMPLETED; 194 mResponseStep = ResponseStep.ON_READ_COMPLETED;
195 mResponseInfo = info;
178 196
179 final int bytesRead = byteBuffer.position() - mBufferPositionBeforeRead; 197 final int bytesRead = byteBuffer.position() - mBufferPositionBeforeRead;
180 mHttpResponseDataLength += bytesRead; 198 mHttpResponseDataLength += bytesRead;
181 final byte[] lastDataReceivedAsBytes = new byte[bytesRead]; 199 final byte[] lastDataReceivedAsBytes = new byte[bytesRead];
182 // Rewind byteBuffer.position() to pre-read() position. 200 // Rewind byteBuffer.position() to pre-read() position.
183 byteBuffer.position(mBufferPositionBeforeRead); 201 byteBuffer.position(mBufferPositionBeforeRead);
184 // This restores byteBuffer.position() to its value on entrance to 202 // This restores byteBuffer.position() to its value on entrance to
185 // this function. 203 // this function.
186 byteBuffer.get(lastDataReceivedAsBytes); 204 byteBuffer.get(lastDataReceivedAsBytes);
187 205
188 mResponseAsString += new String(lastDataReceivedAsBytes); 206 mResponseAsString += new String(lastDataReceivedAsBytes);
189 207
190 if (maybeThrowCancelOrPause(stream, mReadStepBlock)) { 208 if (maybeThrowCancelOrPause(stream, mReadStepBlock)) {
191 return; 209 return;
192 } 210 }
193 startNextRead(stream); 211 // Do not read if EOF has been reached.
212 if (!endOfStream) {
213 startNextRead(stream);
214 }
194 } 215 }
195 216
196 @Override 217 @Override
197 public void onWriteCompleted( 218 public void onWriteCompleted(BidirectionalStream stream, UrlResponseInfo inf o,
198 BidirectionalStream stream, UrlResponseInfo info, ByteBuffer buffer) { 219 ByteBuffer buffer, boolean endOfStream) {
199 assertEquals(mExecutorThread, Thread.currentThread()); 220 assertEquals(mExecutorThread, Thread.currentThread());
200 assertFalse(stream.isDone()); 221 assertFalse(stream.isDone());
201 assertNull(mError); 222 assertNull(mError);
202 mResponseStep = ResponseStep.ON_WRITE_COMPLETED; 223 mResponseStep = ResponseStep.ON_WRITE_COMPLETED;
203 if (!mWriteBuffers.isEmpty()) { 224 mResponseInfo = info;
204 assertEquals(buffer, mWriteBuffers.get(0)); 225 if (!mWriteBuffersToBeAcked.isEmpty()) {
205 mWriteBuffers.remove(0); 226 assertEquals(buffer, mWriteBuffersToBeAcked.get(0).mBuffer);
227 mWriteBuffersToBeAcked.remove(0);
206 } 228 }
207 if (maybeThrowCancelOrPause(stream, mWriteStepBlock)) { 229 if (maybeThrowCancelOrPause(stream, mWriteStepBlock)) {
208 return; 230 return;
209 } 231 }
210 startNextWrite(stream); 232 startNextWrite(stream);
211 } 233 }
212 234
213 @Override 235 @Override
214 public void onResponseTrailersReceived(BidirectionalStream stream, UrlRespon seInfo info, 236 public void onResponseTrailersReceived(BidirectionalStream stream, UrlRespon seInfo info,
215 UrlResponseInfo.HeaderBlock trailers) { 237 UrlResponseInfo.HeaderBlock trailers) {
216 assertEquals(mExecutorThread, Thread.currentThread()); 238 assertEquals(mExecutorThread, Thread.currentThread());
217 assertFalse(stream.isDone()); 239 assertFalse(stream.isDone());
218 assertNull(mError); 240 assertNull(mError);
219 mResponseStep = ResponseStep.ON_TRAILERS; 241 mResponseStep = ResponseStep.ON_TRAILERS;
242 mResponseInfo = info;
220 mTrailers = trailers; 243 mTrailers = trailers;
221 if (maybeThrowCancelOrPause(stream, mReadStepBlock)) { 244 if (maybeThrowCancelOrPause(stream, mReadStepBlock)) {
222 return; 245 return;
223 } 246 }
224 } 247 }
225 248
226 @Override 249 @Override
227 public void onSucceeded(BidirectionalStream stream, UrlResponseInfo info) { 250 public void onSucceeded(BidirectionalStream stream, UrlResponseInfo info) {
228 assertEquals(mExecutorThread, Thread.currentThread()); 251 assertEquals(mExecutorThread, Thread.currentThread());
229 assertTrue(stream.isDone()); 252 assertTrue(stream.isDone());
230 assertTrue(mResponseStep == ResponseStep.ON_RESPONSE_STARTED 253 assertTrue(mResponseStep == ResponseStep.ON_RESPONSE_STARTED
231 || mResponseStep == ResponseStep.ON_READ_COMPLETED 254 || mResponseStep == ResponseStep.ON_READ_COMPLETED
232 || mResponseStep == ResponseStep.ON_WRITE_COMPLETED 255 || mResponseStep == ResponseStep.ON_WRITE_COMPLETED
233 || mResponseStep == ResponseStep.ON_TRAILERS); 256 || mResponseStep == ResponseStep.ON_TRAILERS);
234 assertFalse(mOnErrorCalled); 257 assertFalse(mOnErrorCalled);
235 assertFalse(mOnCanceledCalled); 258 assertFalse(mOnCanceledCalled);
236 assertNull(mError); 259 assertNull(mError);
260 assertEquals(0, mWriteBuffers.size());
261 assertEquals(0, mWriteBuffersToBeAcked.size());
237 262
238 mResponseStep = ResponseStep.ON_SUCCEEDED; 263 mResponseStep = ResponseStep.ON_SUCCEEDED;
239 mResponseInfo = info; 264 mResponseInfo = info;
240 openDone(); 265 openDone();
241 maybeThrowCancelOrPause(stream, mReadStepBlock); 266 maybeThrowCancelOrPause(stream, mReadStepBlock);
242 } 267 }
243 268
244 @Override 269 @Override
245 public void onFailed(BidirectionalStream stream, UrlResponseInfo info, Crone tException error) { 270 public void onFailed(BidirectionalStream stream, UrlResponseInfo info, Crone tException error) {
246 assertEquals(mExecutorThread, Thread.currentThread()); 271 assertEquals(mExecutorThread, Thread.currentThread());
247 assertTrue(stream.isDone()); 272 assertTrue(stream.isDone());
248 // Shouldn't happen after success. 273 // Shouldn't happen after success.
249 assertTrue(mResponseStep != ResponseStep.ON_SUCCEEDED); 274 assertTrue(mResponseStep != ResponseStep.ON_SUCCEEDED);
250 // Should happen at most once for a single stream. 275 // Should happen at most once for a single stream.
251 assertFalse(mOnErrorCalled); 276 assertFalse(mOnErrorCalled);
252 assertFalse(mOnCanceledCalled); 277 assertFalse(mOnCanceledCalled);
253 assertNull(mError); 278 assertNull(mError);
254 mResponseStep = ResponseStep.ON_FAILED; 279 mResponseStep = ResponseStep.ON_FAILED;
280 mResponseInfo = info;
255 281
256 mOnErrorCalled = true; 282 mOnErrorCalled = true;
257 mError = error; 283 mError = error;
258 openDone(); 284 openDone();
259 maybeThrowCancelOrPause(stream, mReadStepBlock); 285 maybeThrowCancelOrPause(stream, mReadStepBlock);
260 } 286 }
261 287
262 @Override 288 @Override
263 public void onCanceled(BidirectionalStream stream, UrlResponseInfo info) { 289 public void onCanceled(BidirectionalStream stream, UrlResponseInfo info) {
264 assertEquals(mExecutorThread, Thread.currentThread()); 290 assertEquals(mExecutorThread, Thread.currentThread());
265 assertTrue(stream.isDone()); 291 assertTrue(stream.isDone());
266 // Should happen at most once for a single stream. 292 // Should happen at most once for a single stream.
267 assertFalse(mOnCanceledCalled); 293 assertFalse(mOnCanceledCalled);
268 assertFalse(mOnErrorCalled); 294 assertFalse(mOnErrorCalled);
269 assertNull(mError); 295 assertNull(mError);
270 mResponseStep = ResponseStep.ON_CANCELED; 296 mResponseStep = ResponseStep.ON_CANCELED;
297 mResponseInfo = info;
271 298
272 mOnCanceledCalled = true; 299 mOnCanceledCalled = true;
273 openDone(); 300 openDone();
274 maybeThrowCancelOrPause(stream, mReadStepBlock); 301 maybeThrowCancelOrPause(stream, mReadStepBlock);
275 } 302 }
276 303
277 public void startNextRead(BidirectionalStream stream) { 304 public void startNextRead(BidirectionalStream stream) {
278 startNextRead(stream, ByteBuffer.allocateDirect(READ_BUFFER_SIZE)); 305 startNextRead(stream, ByteBuffer.allocateDirect(READ_BUFFER_SIZE));
279 } 306 }
280 307
281 public void startNextRead(BidirectionalStream stream, ByteBuffer buffer) { 308 public void startNextRead(BidirectionalStream stream, ByteBuffer buffer) {
282 mBufferPositionBeforeRead = buffer.position(); 309 mBufferPositionBeforeRead = buffer.position();
283 stream.read(buffer); 310 stream.read(buffer);
284 } 311 }
285 312
286 public void startNextWrite(BidirectionalStream stream) { 313 public void startNextWrite(BidirectionalStream stream) {
287 if (!mWriteBuffers.isEmpty()) { 314 if (!mWriteBuffers.isEmpty()) {
288 boolean isLastBuffer = mWriteBuffers.size() == 1; 315 Iterator<WriteBuffer> iterator = mWriteBuffers.iterator();
289 stream.write(mWriteBuffers.get(0), isLastBuffer); 316 while (iterator.hasNext()) {
317 WriteBuffer b = iterator.next();
318 stream.write(b.mBuffer, !iterator.hasNext());
319 iterator.remove();
320 if (b.mFlush) {
321 stream.flush();
322 break;
323 }
324 }
290 } 325 }
291 } 326 }
292 327
293 public boolean isDone() { 328 public boolean isDone() {
294 // It's not mentioned by the Android docs, but block(0) seems to block 329 // It's not mentioned by the Android docs, but block(0) seems to block
295 // indefinitely, so have to block for one millisecond to get state 330 // indefinitely, so have to block for one millisecond to get state
296 // without blocking. 331 // without blocking.
297 return mDone.block(1); 332 return mDone.block(1);
298 } 333 }
299 334
335 /**
336 * Returns the number of pending Writes.
337 */
338 public int numPendingWrites() {
339 return mWriteBuffers.size();
340 }
341
300 protected void openDone() { 342 protected void openDone() {
301 mDone.open(); 343 mDone.open();
302 } 344 }
303 345
304 /** 346 /**
305 * Returns {@code false} if the callback should continue to advance the 347 * Returns {@code false} if the callback should continue to advance the
306 * stream. 348 * stream.
307 */ 349 */
308 private boolean maybeThrowCancelOrPause( 350 private boolean maybeThrowCancelOrPause(
309 final BidirectionalStream stream, ConditionVariable stepBlock) { 351 final BidirectionalStream stream, ConditionVariable stepBlock) {
(...skipping 15 matching lines...) Expand all
325 }; 367 };
326 if (mFailureType == FailureType.CANCEL_ASYNC 368 if (mFailureType == FailureType.CANCEL_ASYNC
327 || mFailureType == FailureType.CANCEL_ASYNC_WITHOUT_PAUSE) { 369 || mFailureType == FailureType.CANCEL_ASYNC_WITHOUT_PAUSE) {
328 getExecutor().execute(task); 370 getExecutor().execute(task);
329 } else { 371 } else {
330 task.run(); 372 task.run();
331 } 373 }
332 return mFailureType != FailureType.CANCEL_ASYNC_WITHOUT_PAUSE; 374 return mFailureType != FailureType.CANCEL_ASYNC_WITHOUT_PAUSE;
333 } 375 }
334 } 376 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698