Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(212)

Side by Side Diff: components/cronet/android/test/javatests/src/org/chromium/net/TestBidirectionalStreamCallback.java

Issue 1856073002: Coalesce small buffers in net::BidirectionalStream (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Address Ryan's comments Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package org.chromium.net; 5 package org.chromium.net;
6 6
7 import android.os.ConditionVariable; 7 import android.os.ConditionVariable;
8 8
9 import static junit.framework.Assert.assertEquals; 9 import static junit.framework.Assert.assertEquals;
10 import static junit.framework.Assert.assertFalse; 10 import static junit.framework.Assert.assertFalse;
11 import static junit.framework.Assert.assertNull; 11 import static junit.framework.Assert.assertNull;
12 import static junit.framework.Assert.assertTrue; 12 import static junit.framework.Assert.assertTrue;
13 13
14 import java.nio.ByteBuffer; 14 import java.nio.ByteBuffer;
15 import java.util.ArrayList; 15 import java.util.ArrayList;
16 import java.util.Iterator;
16 import java.util.concurrent.Executor; 17 import java.util.concurrent.Executor;
17 import java.util.concurrent.ExecutorService; 18 import java.util.concurrent.ExecutorService;
18 import java.util.concurrent.Executors; 19 import java.util.concurrent.Executors;
19 import java.util.concurrent.ThreadFactory; 20 import java.util.concurrent.ThreadFactory;
20 21
21 /** 22 /**
22 * Callback that tracks information from different callbacks and and has a 23 * Callback that tracks information from different callbacks and and has a
23 * method to block thread until the stream completes on another thread. 24 * method to block thread until the stream completes on another thread.
24 * Allows to cancel, block stream or throw an exception from an arbitrary step. 25 * Allows to cancel, block stream or throw an exception from an arbitrary step.
25 */ 26 */
(...skipping 30 matching lines...) Expand all
56 57
57 // Executor Service for Cronet callbacks. 58 // Executor Service for Cronet callbacks.
58 private final ExecutorService mExecutorService = 59 private final ExecutorService mExecutorService =
59 Executors.newSingleThreadExecutor(new ExecutorThreadFactory()); 60 Executors.newSingleThreadExecutor(new ExecutorThreadFactory());
60 private Thread mExecutorThread; 61 private Thread mExecutorThread;
61 62
62 // position() of ByteBuffer prior to read() call. 63 // position() of ByteBuffer prior to read() call.
63 private int mBufferPositionBeforeRead; 64 private int mBufferPositionBeforeRead;
64 65
65 // Data to write. 66 // Data to write.
66 private ArrayList<ByteBuffer> mWriteBuffers = new ArrayList<ByteBuffer>(); 67 private ArrayList<WriteBuffer> mWriteBuffers = new ArrayList<WriteBuffer>();
68
69 // Buffers that we yet to receive the corresponding onWriteCompleted callbac k.
70 private ArrayList<WriteBuffer> mWriteBuffersToBeAcked = new ArrayList<WriteB uffer>();
67 71
68 private class ExecutorThreadFactory implements ThreadFactory { 72 private class ExecutorThreadFactory implements ThreadFactory {
69 public Thread newThread(Runnable r) { 73 public Thread newThread(Runnable r) {
70 mExecutorThread = new Thread(r); 74 mExecutorThread = new Thread(r);
71 return mExecutorThread; 75 return mExecutorThread;
72 } 76 }
73 } 77 }
74 78
79 private static class WriteBuffer {
80 final ByteBuffer mBuffer;
81 final boolean mFlush;
82 public WriteBuffer(ByteBuffer buffer, boolean flush) {
83 mBuffer = buffer;
84 mFlush = flush;
85 }
86 }
87
75 public enum ResponseStep { 88 public enum ResponseStep {
76 NOTHING, 89 NOTHING,
77 ON_REQUEST_HEADERS_SENT, 90 ON_STREAM_READY,
78 ON_RESPONSE_STARTED, 91 ON_RESPONSE_STARTED,
79 ON_READ_COMPLETED, 92 ON_READ_COMPLETED,
80 ON_WRITE_COMPLETED, 93 ON_WRITE_COMPLETED,
81 ON_TRAILERS, 94 ON_TRAILERS,
82 ON_CANCELED, 95 ON_CANCELED,
83 ON_FAILED, 96 ON_FAILED,
84 ON_SUCCEEDED 97 ON_SUCCEEDED
85 } 98 }
86 99
87 public enum FailureType { 100 public enum FailureType {
(...skipping 31 matching lines...) Expand 10 before | Expand all | Expand 10 after
119 132
120 public Executor getExecutor() { 133 public Executor getExecutor() {
121 return mExecutorService; 134 return mExecutorService;
122 } 135 }
123 136
124 public void shutdownExecutor() { 137 public void shutdownExecutor() {
125 mExecutorService.shutdown(); 138 mExecutorService.shutdown();
126 } 139 }
127 140
128 public void addWriteData(byte[] data) { 141 public void addWriteData(byte[] data) {
142 addWriteData(data, true);
143 }
144
145 public void addWriteData(byte[] data, boolean flush) {
129 ByteBuffer writeBuffer = ByteBuffer.allocateDirect(data.length); 146 ByteBuffer writeBuffer = ByteBuffer.allocateDirect(data.length);
130 writeBuffer.put(data); 147 writeBuffer.put(data);
131 writeBuffer.flip(); 148 writeBuffer.flip();
132 mWriteBuffers.add(writeBuffer); 149 mWriteBuffers.add(new WriteBuffer(writeBuffer, flush));
150 mWriteBuffersToBeAcked.add(new WriteBuffer(writeBuffer, flush));
133 } 151 }
134 152
135 @Override 153 @Override
136 public void onRequestHeadersSent(BidirectionalStream stream) { 154 public void onStreamReady(BidirectionalStream stream) {
137 assertEquals(mExecutorThread, Thread.currentThread()); 155 assertEquals(mExecutorThread, Thread.currentThread());
138 assertFalse(stream.isDone()); 156 assertFalse(stream.isDone());
139 assertEquals(ResponseStep.NOTHING, mResponseStep); 157 assertEquals(ResponseStep.NOTHING, mResponseStep);
140 assertNull(mError); 158 assertNull(mError);
141 159
142 mResponseStep = ResponseStep.ON_REQUEST_HEADERS_SENT; 160 mResponseStep = ResponseStep.ON_STREAM_READY;
143 if (maybeThrowCancelOrPause(stream, mWriteStepBlock)) { 161 if (maybeThrowCancelOrPause(stream, mWriteStepBlock)) {
144 return; 162 return;
145 } 163 }
146 startNextWrite(stream); 164 startNextWrite(stream);
147 } 165 }
148 166
149 @Override 167 @Override
150 public void onResponseHeadersReceived(BidirectionalStream stream, UrlRespons eInfo info) { 168 public void onResponseHeadersReceived(BidirectionalStream stream, UrlRespons eInfo info) {
151 assertEquals(mExecutorThread, Thread.currentThread()); 169 assertEquals(mExecutorThread, Thread.currentThread());
152 assertFalse(stream.isDone()); 170 assertFalse(stream.isDone());
153 assertTrue(mResponseStep == ResponseStep.NOTHING 171 assertTrue(mResponseStep == ResponseStep.NOTHING
154 || mResponseStep == ResponseStep.ON_REQUEST_HEADERS_SENT 172 || mResponseStep == ResponseStep.ON_STREAM_READY
155 || mResponseStep == ResponseStep.ON_WRITE_COMPLETED); 173 || mResponseStep == ResponseStep.ON_WRITE_COMPLETED);
156 assertNull(mError); 174 assertNull(mError);
157 175
158 mResponseStep = ResponseStep.ON_RESPONSE_STARTED; 176 mResponseStep = ResponseStep.ON_RESPONSE_STARTED;
159 mResponseInfo = info; 177 mResponseInfo = info;
160 if (maybeThrowCancelOrPause(stream, mReadStepBlock)) { 178 if (maybeThrowCancelOrPause(stream, mReadStepBlock)) {
161 return; 179 return;
162 } 180 }
163 startNextRead(stream); 181 startNextRead(stream);
164 } 182 }
(...skipping 28 matching lines...) Expand all
193 startNextRead(stream); 211 startNextRead(stream);
194 } 212 }
195 213
196 @Override 214 @Override
197 public void onWriteCompleted( 215 public void onWriteCompleted(
198 BidirectionalStream stream, UrlResponseInfo info, ByteBuffer buffer) { 216 BidirectionalStream stream, UrlResponseInfo info, ByteBuffer buffer) {
199 assertEquals(mExecutorThread, Thread.currentThread()); 217 assertEquals(mExecutorThread, Thread.currentThread());
200 assertFalse(stream.isDone()); 218 assertFalse(stream.isDone());
201 assertNull(mError); 219 assertNull(mError);
202 mResponseStep = ResponseStep.ON_WRITE_COMPLETED; 220 mResponseStep = ResponseStep.ON_WRITE_COMPLETED;
203 if (!mWriteBuffers.isEmpty()) { 221 if (!mWriteBuffersToBeAcked.isEmpty()) {
204 assertEquals(buffer, mWriteBuffers.get(0)); 222 assertEquals(buffer, mWriteBuffersToBeAcked.get(0).mBuffer);
205 mWriteBuffers.remove(0); 223 mWriteBuffersToBeAcked.remove(0);
206 } 224 }
207 if (maybeThrowCancelOrPause(stream, mWriteStepBlock)) { 225 if (maybeThrowCancelOrPause(stream, mWriteStepBlock)) {
208 return; 226 return;
209 } 227 }
210 startNextWrite(stream); 228 startNextWrite(stream);
211 } 229 }
212 230
213 @Override 231 @Override
214 public void onResponseTrailersReceived(BidirectionalStream stream, UrlRespon seInfo info, 232 public void onResponseTrailersReceived(BidirectionalStream stream, UrlRespon seInfo info,
215 UrlResponseInfo.HeaderBlock trailers) { 233 UrlResponseInfo.HeaderBlock trailers) {
(...skipping 11 matching lines...) Expand all
227 public void onSucceeded(BidirectionalStream stream, UrlResponseInfo info) { 245 public void onSucceeded(BidirectionalStream stream, UrlResponseInfo info) {
228 assertEquals(mExecutorThread, Thread.currentThread()); 246 assertEquals(mExecutorThread, Thread.currentThread());
229 assertTrue(stream.isDone()); 247 assertTrue(stream.isDone());
230 assertTrue(mResponseStep == ResponseStep.ON_RESPONSE_STARTED 248 assertTrue(mResponseStep == ResponseStep.ON_RESPONSE_STARTED
231 || mResponseStep == ResponseStep.ON_READ_COMPLETED 249 || mResponseStep == ResponseStep.ON_READ_COMPLETED
232 || mResponseStep == ResponseStep.ON_WRITE_COMPLETED 250 || mResponseStep == ResponseStep.ON_WRITE_COMPLETED
233 || mResponseStep == ResponseStep.ON_TRAILERS); 251 || mResponseStep == ResponseStep.ON_TRAILERS);
234 assertFalse(mOnErrorCalled); 252 assertFalse(mOnErrorCalled);
235 assertFalse(mOnCanceledCalled); 253 assertFalse(mOnCanceledCalled);
236 assertNull(mError); 254 assertNull(mError);
255 assertEquals(0, mWriteBuffers.size());
256 assertEquals(0, mWriteBuffersToBeAcked.size());
237 257
238 mResponseStep = ResponseStep.ON_SUCCEEDED; 258 mResponseStep = ResponseStep.ON_SUCCEEDED;
239 mResponseInfo = info; 259 mResponseInfo = info;
240 openDone(); 260 openDone();
241 maybeThrowCancelOrPause(stream, mReadStepBlock); 261 maybeThrowCancelOrPause(stream, mReadStepBlock);
242 } 262 }
243 263
244 @Override 264 @Override
245 public void onFailed(BidirectionalStream stream, UrlResponseInfo info, Crone tException error) { 265 public void onFailed(BidirectionalStream stream, UrlResponseInfo info, Crone tException error) {
246 assertEquals(mExecutorThread, Thread.currentThread()); 266 assertEquals(mExecutorThread, Thread.currentThread());
(...skipping 31 matching lines...) Expand 10 before | Expand all | Expand 10 after
278 startNextRead(stream, ByteBuffer.allocateDirect(READ_BUFFER_SIZE)); 298 startNextRead(stream, ByteBuffer.allocateDirect(READ_BUFFER_SIZE));
279 } 299 }
280 300
281 public void startNextRead(BidirectionalStream stream, ByteBuffer buffer) { 301 public void startNextRead(BidirectionalStream stream, ByteBuffer buffer) {
282 mBufferPositionBeforeRead = buffer.position(); 302 mBufferPositionBeforeRead = buffer.position();
283 stream.read(buffer); 303 stream.read(buffer);
284 } 304 }
285 305
286 public void startNextWrite(BidirectionalStream stream) { 306 public void startNextWrite(BidirectionalStream stream) {
287 if (!mWriteBuffers.isEmpty()) { 307 if (!mWriteBuffers.isEmpty()) {
288 boolean isLastBuffer = mWriteBuffers.size() == 1; 308 Iterator<WriteBuffer> iterator = mWriteBuffers.iterator();
289 stream.write(mWriteBuffers.get(0), isLastBuffer); 309 while (iterator.hasNext()) {
310 WriteBuffer b = iterator.next();
311 stream.write(b.mBuffer, !iterator.hasNext());
312 iterator.remove();
313 if (b.mFlush) {
314 stream.flush();
315 break;
316 }
317 }
290 } 318 }
291 } 319 }
292 320
293 public boolean isDone() { 321 public boolean isDone() {
294 // It's not mentioned by the Android docs, but block(0) seems to block 322 // It's not mentioned by the Android docs, but block(0) seems to block
295 // indefinitely, so have to block for one millisecond to get state 323 // indefinitely, so have to block for one millisecond to get state
296 // without blocking. 324 // without blocking.
297 return mDone.block(1); 325 return mDone.block(1);
298 } 326 }
299 327
(...skipping 25 matching lines...) Expand all
325 }; 353 };
326 if (mFailureType == FailureType.CANCEL_ASYNC 354 if (mFailureType == FailureType.CANCEL_ASYNC
327 || mFailureType == FailureType.CANCEL_ASYNC_WITHOUT_PAUSE) { 355 || mFailureType == FailureType.CANCEL_ASYNC_WITHOUT_PAUSE) {
328 getExecutor().execute(task); 356 getExecutor().execute(task);
329 } else { 357 } else {
330 task.run(); 358 task.run();
331 } 359 }
332 return mFailureType != FailureType.CANCEL_ASYNC_WITHOUT_PAUSE; 360 return mFailureType != FailureType.CANCEL_ASYNC_WITHOUT_PAUSE;
333 } 361 }
334 } 362 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698