Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(165)

Side by Side Diff: components/cronet/android/test/javatests/src/org/chromium/net/TestBidirectionalStreamCallback.java

Issue 1856073002: Coalesce small buffers in net::BidirectionalStream (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Address Andrei's comments Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package org.chromium.net; 5 package org.chromium.net;
6 6
7 import android.os.ConditionVariable; 7 import android.os.ConditionVariable;
8 8
9 import static junit.framework.Assert.assertEquals; 9 import static junit.framework.Assert.assertEquals;
10 import static junit.framework.Assert.assertFalse; 10 import static junit.framework.Assert.assertFalse;
11 import static junit.framework.Assert.assertNull; 11 import static junit.framework.Assert.assertNull;
12 import static junit.framework.Assert.assertTrue; 12 import static junit.framework.Assert.assertTrue;
13 13
14 import java.nio.ByteBuffer; 14 import java.nio.ByteBuffer;
15 import java.util.ArrayList; 15 import java.util.ArrayList;
16 import java.util.Iterator;
16 import java.util.concurrent.Executor; 17 import java.util.concurrent.Executor;
17 import java.util.concurrent.ExecutorService; 18 import java.util.concurrent.ExecutorService;
18 import java.util.concurrent.Executors; 19 import java.util.concurrent.Executors;
19 import java.util.concurrent.ThreadFactory; 20 import java.util.concurrent.ThreadFactory;
20 21
21 /** 22 /**
22 * Callback that tracks information from different callbacks and and has a 23 * Callback that tracks information from different callbacks and and has a
23 * method to block thread until the stream completes on another thread. 24 * method to block thread until the stream completes on another thread.
24 * Allows to cancel, block stream or throw an exception from an arbitrary step. 25 * Allows to cancel, block stream or throw an exception from an arbitrary step.
25 */ 26 */
26 public class TestBidirectionalStreamCallback extends BidirectionalStream.Callbac k { 27 public class TestBidirectionalStreamCallback extends BidirectionalStream.Callbac k {
27 public UrlResponseInfo mResponseInfo; 28 public UrlResponseInfo mResponseInfo;
28 public CronetException mError; 29 public CronetException mError;
29 30
30 public ResponseStep mResponseStep = ResponseStep.NOTHING; 31 public ResponseStep mResponseStep = ResponseStep.NOTHING;
31 32
32 public boolean mOnErrorCalled = false; 33 public boolean mOnErrorCalled = false;
33 public boolean mOnCanceledCalled = false; 34 public boolean mOnCanceledCalled = false;
34 35
35 public int mHttpResponseDataLength = 0; 36 public int mHttpResponseDataLength = 0;
36 public String mResponseAsString = ""; 37 public String mResponseAsString = "";
37 38
38 public UrlResponseInfo.HeaderBlock mTrailers; 39 public UrlResponseInfo.HeaderBlock mTrailers;
39 40
40 private static final int READ_BUFFER_SIZE = 32 * 1024; 41 private static final int READ_BUFFER_SIZE = 32 * 1024;
41 42
43 private boolean mHandleFlush;
42 // When false, the consumer is responsible for all calls into the stream 44 // When false, the consumer is responsible for all calls into the stream
43 // that advance it. 45 // that advance it.
44 private boolean mAutoAdvance = true; 46 private boolean mAutoAdvance = true;
45 47
46 // Conditionally fail on certain steps. 48 // Conditionally fail on certain steps.
47 private FailureType mFailureType = FailureType.NONE; 49 private FailureType mFailureType = FailureType.NONE;
48 private ResponseStep mFailureStep = ResponseStep.NOTHING; 50 private ResponseStep mFailureStep = ResponseStep.NOTHING;
49 51
50 // Signals when the stream is done either successfully or not. 52 // Signals when the stream is done either successfully or not.
51 private final ConditionVariable mDone = new ConditionVariable(); 53 private final ConditionVariable mDone = new ConditionVariable();
52 54
53 // Signaled on each step when mAutoAdvance is false. 55 // Signaled on each step when mAutoAdvance is false.
54 private final ConditionVariable mReadStepBlock = new ConditionVariable(); 56 private final ConditionVariable mReadStepBlock = new ConditionVariable();
55 private final ConditionVariable mWriteStepBlock = new ConditionVariable(); 57 private final ConditionVariable mWriteStepBlock = new ConditionVariable();
56 58
57 // Executor Service for Cronet callbacks. 59 // Executor Service for Cronet callbacks.
58 private final ExecutorService mExecutorService = 60 private final ExecutorService mExecutorService =
59 Executors.newSingleThreadExecutor(new ExecutorThreadFactory()); 61 Executors.newSingleThreadExecutor(new ExecutorThreadFactory());
60 private Thread mExecutorThread; 62 private Thread mExecutorThread;
61 63
62 // position() of ByteBuffer prior to read() call. 64 // position() of ByteBuffer prior to read() call.
63 private int mBufferPositionBeforeRead; 65 private int mBufferPositionBeforeRead;
64 66
65 // Data to write. 67 // Data to write.
66 private ArrayList<ByteBuffer> mWriteBuffers = new ArrayList<ByteBuffer>(); 68 private ArrayList<WriteBuffer> mWriteBuffers = new ArrayList<WriteBuffer>();
69
70 // Buffers that we yet to receive the corresponding onWriteCompleted callbac k.
71 private ArrayList<WriteBuffer> mWriteBuffersToBeAcked = new ArrayList<WriteB uffer>();
67 72
68 private class ExecutorThreadFactory implements ThreadFactory { 73 private class ExecutorThreadFactory implements ThreadFactory {
69 public Thread newThread(Runnable r) { 74 public Thread newThread(Runnable r) {
70 mExecutorThread = new Thread(r); 75 mExecutorThread = new Thread(r);
71 return mExecutorThread; 76 return mExecutorThread;
72 } 77 }
73 } 78 }
74 79
80 private static class WriteBuffer {
81 final ByteBuffer mBuffer;
82 final boolean mFlush;
83 public WriteBuffer(ByteBuffer buffer, boolean flush) {
84 mBuffer = buffer;
85 mFlush = flush;
86 }
87 }
88
75 public enum ResponseStep { 89 public enum ResponseStep {
76 NOTHING, 90 NOTHING,
77 ON_REQUEST_HEADERS_SENT, 91 ON_STREAM_READY,
78 ON_RESPONSE_STARTED, 92 ON_RESPONSE_STARTED,
79 ON_READ_COMPLETED, 93 ON_READ_COMPLETED,
80 ON_WRITE_COMPLETED, 94 ON_WRITE_COMPLETED,
81 ON_TRAILERS, 95 ON_TRAILERS,
82 ON_CANCELED, 96 ON_CANCELED,
83 ON_FAILED, 97 ON_FAILED,
84 ON_SUCCEEDED 98 ON_SUCCEEDED
85 } 99 }
86 100
87 public enum FailureType { 101 public enum FailureType {
88 NONE, 102 NONE,
89 CANCEL_SYNC, 103 CANCEL_SYNC,
90 CANCEL_ASYNC, 104 CANCEL_ASYNC,
91 // Same as above, but continues to advance the stream after posting 105 // Same as above, but continues to advance the stream after posting
92 // the cancellation task. 106 // the cancellation task.
93 CANCEL_ASYNC_WITHOUT_PAUSE, 107 CANCEL_ASYNC_WITHOUT_PAUSE,
94 THROW_SYNC 108 THROW_SYNC
95 } 109 }
96 110
111 public void setHandleFlush() {
112 mHandleFlush = true;
113 }
114
97 public void setAutoAdvance(boolean autoAdvance) { 115 public void setAutoAdvance(boolean autoAdvance) {
98 mAutoAdvance = autoAdvance; 116 mAutoAdvance = autoAdvance;
99 } 117 }
100 118
101 public void setFailure(FailureType failureType, ResponseStep failureStep) { 119 public void setFailure(FailureType failureType, ResponseStep failureStep) {
102 mFailureStep = failureStep; 120 mFailureStep = failureStep;
103 mFailureType = failureType; 121 mFailureType = failureType;
104 } 122 }
105 123
106 public void blockForDone() { 124 public void blockForDone() {
(...skipping 12 matching lines...) Expand all
119 137
120 public Executor getExecutor() { 138 public Executor getExecutor() {
121 return mExecutorService; 139 return mExecutorService;
122 } 140 }
123 141
124 public void shutdownExecutor() { 142 public void shutdownExecutor() {
125 mExecutorService.shutdown(); 143 mExecutorService.shutdown();
126 } 144 }
127 145
128 public void addWriteData(byte[] data) { 146 public void addWriteData(byte[] data) {
147 addWriteData(data, false);
148 }
149
150 public void addWriteData(byte[] data, boolean flush) {
129 ByteBuffer writeBuffer = ByteBuffer.allocateDirect(data.length); 151 ByteBuffer writeBuffer = ByteBuffer.allocateDirect(data.length);
130 writeBuffer.put(data); 152 writeBuffer.put(data);
131 writeBuffer.flip(); 153 writeBuffer.flip();
132 mWriteBuffers.add(writeBuffer); 154 if (flush && !mHandleFlush) {
kapishnikov 2016/04/12 19:27:39 Related to the previous comment. I think that shou
xunjieli 2016/04/12 19:59:03 Done.
155 throw new IllegalArgumentException("cannot add flush when auto flush is on");
156 }
157 mWriteBuffers.add(new WriteBuffer(writeBuffer, flush));
158 mWriteBuffersToBeAcked.add(new WriteBuffer(writeBuffer, flush));
133 } 159 }
134 160
135 @Override 161 @Override
136 public void onRequestHeadersSent(BidirectionalStream stream) { 162 public void onStreamReady(BidirectionalStream stream) {
137 assertEquals(mExecutorThread, Thread.currentThread()); 163 assertEquals(mExecutorThread, Thread.currentThread());
138 assertFalse(stream.isDone()); 164 assertFalse(stream.isDone());
139 assertEquals(ResponseStep.NOTHING, mResponseStep); 165 assertEquals(ResponseStep.NOTHING, mResponseStep);
140 assertNull(mError); 166 assertNull(mError);
141 167
142 mResponseStep = ResponseStep.ON_REQUEST_HEADERS_SENT; 168 mResponseStep = ResponseStep.ON_STREAM_READY;
143 if (maybeThrowCancelOrPause(stream, mWriteStepBlock)) { 169 if (maybeThrowCancelOrPause(stream, mWriteStepBlock)) {
144 return; 170 return;
145 } 171 }
146 startNextWrite(stream); 172 startNextWrite(stream);
147 } 173 }
148 174
149 @Override 175 @Override
150 public void onResponseHeadersReceived(BidirectionalStream stream, UrlRespons eInfo info) { 176 public void onResponseHeadersReceived(BidirectionalStream stream, UrlRespons eInfo info) {
151 assertEquals(mExecutorThread, Thread.currentThread()); 177 assertEquals(mExecutorThread, Thread.currentThread());
152 assertFalse(stream.isDone()); 178 assertFalse(stream.isDone());
153 assertTrue(mResponseStep == ResponseStep.NOTHING 179 assertTrue(mResponseStep == ResponseStep.NOTHING
154 || mResponseStep == ResponseStep.ON_REQUEST_HEADERS_SENT 180 || mResponseStep == ResponseStep.ON_STREAM_READY
155 || mResponseStep == ResponseStep.ON_WRITE_COMPLETED); 181 || mResponseStep == ResponseStep.ON_WRITE_COMPLETED);
156 assertNull(mError); 182 assertNull(mError);
157 183
158 mResponseStep = ResponseStep.ON_RESPONSE_STARTED; 184 mResponseStep = ResponseStep.ON_RESPONSE_STARTED;
159 mResponseInfo = info; 185 mResponseInfo = info;
160 if (maybeThrowCancelOrPause(stream, mReadStepBlock)) { 186 if (maybeThrowCancelOrPause(stream, mReadStepBlock)) {
161 return; 187 return;
162 } 188 }
163 startNextRead(stream); 189 startNextRead(stream);
164 } 190 }
(...skipping 28 matching lines...) Expand all
193 startNextRead(stream); 219 startNextRead(stream);
194 } 220 }
195 221
196 @Override 222 @Override
197 public void onWriteCompleted( 223 public void onWriteCompleted(
198 BidirectionalStream stream, UrlResponseInfo info, ByteBuffer buffer) { 224 BidirectionalStream stream, UrlResponseInfo info, ByteBuffer buffer) {
199 assertEquals(mExecutorThread, Thread.currentThread()); 225 assertEquals(mExecutorThread, Thread.currentThread());
200 assertFalse(stream.isDone()); 226 assertFalse(stream.isDone());
201 assertNull(mError); 227 assertNull(mError);
202 mResponseStep = ResponseStep.ON_WRITE_COMPLETED; 228 mResponseStep = ResponseStep.ON_WRITE_COMPLETED;
203 if (!mWriteBuffers.isEmpty()) { 229 if (!mWriteBuffersToBeAcked.isEmpty()) {
204 assertEquals(buffer, mWriteBuffers.get(0)); 230 assertEquals(buffer, mWriteBuffersToBeAcked.get(0).mBuffer);
205 mWriteBuffers.remove(0); 231 mWriteBuffersToBeAcked.remove(0);
206 } 232 }
207 if (maybeThrowCancelOrPause(stream, mWriteStepBlock)) { 233 if (maybeThrowCancelOrPause(stream, mWriteStepBlock)) {
208 return; 234 return;
209 } 235 }
210 startNextWrite(stream); 236 startNextWrite(stream);
211 } 237 }
212 238
213 @Override 239 @Override
214 public void onResponseTrailersReceived(BidirectionalStream stream, UrlRespon seInfo info, 240 public void onResponseTrailersReceived(BidirectionalStream stream, UrlRespon seInfo info,
215 UrlResponseInfo.HeaderBlock trailers) { 241 UrlResponseInfo.HeaderBlock trailers) {
(...skipping 11 matching lines...) Expand all
227 public void onSucceeded(BidirectionalStream stream, UrlResponseInfo info) { 253 public void onSucceeded(BidirectionalStream stream, UrlResponseInfo info) {
228 assertEquals(mExecutorThread, Thread.currentThread()); 254 assertEquals(mExecutorThread, Thread.currentThread());
229 assertTrue(stream.isDone()); 255 assertTrue(stream.isDone());
230 assertTrue(mResponseStep == ResponseStep.ON_RESPONSE_STARTED 256 assertTrue(mResponseStep == ResponseStep.ON_RESPONSE_STARTED
231 || mResponseStep == ResponseStep.ON_READ_COMPLETED 257 || mResponseStep == ResponseStep.ON_READ_COMPLETED
232 || mResponseStep == ResponseStep.ON_WRITE_COMPLETED 258 || mResponseStep == ResponseStep.ON_WRITE_COMPLETED
233 || mResponseStep == ResponseStep.ON_TRAILERS); 259 || mResponseStep == ResponseStep.ON_TRAILERS);
234 assertFalse(mOnErrorCalled); 260 assertFalse(mOnErrorCalled);
235 assertFalse(mOnCanceledCalled); 261 assertFalse(mOnCanceledCalled);
236 assertNull(mError); 262 assertNull(mError);
263 assertEquals(0, mWriteBuffers.size());
264 assertEquals(0, mWriteBuffersToBeAcked.size());
237 265
238 mResponseStep = ResponseStep.ON_SUCCEEDED; 266 mResponseStep = ResponseStep.ON_SUCCEEDED;
239 mResponseInfo = info; 267 mResponseInfo = info;
240 openDone(); 268 openDone();
241 maybeThrowCancelOrPause(stream, mReadStepBlock); 269 maybeThrowCancelOrPause(stream, mReadStepBlock);
242 } 270 }
243 271
244 @Override 272 @Override
245 public void onFailed(BidirectionalStream stream, UrlResponseInfo info, Crone tException error) { 273 public void onFailed(BidirectionalStream stream, UrlResponseInfo info, Crone tException error) {
246 assertEquals(mExecutorThread, Thread.currentThread()); 274 assertEquals(mExecutorThread, Thread.currentThread());
(...skipping 31 matching lines...) Expand 10 before | Expand all | Expand 10 after
278 startNextRead(stream, ByteBuffer.allocateDirect(READ_BUFFER_SIZE)); 306 startNextRead(stream, ByteBuffer.allocateDirect(READ_BUFFER_SIZE));
279 } 307 }
280 308
281 public void startNextRead(BidirectionalStream stream, ByteBuffer buffer) { 309 public void startNextRead(BidirectionalStream stream, ByteBuffer buffer) {
282 mBufferPositionBeforeRead = buffer.position(); 310 mBufferPositionBeforeRead = buffer.position();
283 stream.read(buffer); 311 stream.read(buffer);
284 } 312 }
285 313
286 public void startNextWrite(BidirectionalStream stream) { 314 public void startNextWrite(BidirectionalStream stream) {
287 if (!mWriteBuffers.isEmpty()) { 315 if (!mWriteBuffers.isEmpty()) {
288 boolean isLastBuffer = mWriteBuffers.size() == 1; 316 if (mHandleFlush) {
289 stream.write(mWriteBuffers.get(0), isLastBuffer); 317 Iterator<WriteBuffer> iterator = mWriteBuffers.iterator();
318 while (iterator.hasNext()) {
319 WriteBuffer b = iterator.next();
320 stream.write(b.mBuffer, !iterator.hasNext());
321 iterator.remove();
322 if (b.mFlush) {
323 stream.flush();
324 break;
325 }
326 }
327 } else {
328 boolean isLastBuffer = (mWriteBuffers.size() == 1);
329 stream.write(mWriteBuffers.get(0).mBuffer, isLastBuffer);
330 mWriteBuffers.remove(0);
331 }
290 } 332 }
291 } 333 }
292 334
293 public boolean isDone() { 335 public boolean isDone() {
294 // It's not mentioned by the Android docs, but block(0) seems to block 336 // It's not mentioned by the Android docs, but block(0) seems to block
295 // indefinitely, so have to block for one millisecond to get state 337 // indefinitely, so have to block for one millisecond to get state
296 // without blocking. 338 // without blocking.
297 return mDone.block(1); 339 return mDone.block(1);
298 } 340 }
299 341
(...skipping 25 matching lines...) Expand all
325 }; 367 };
326 if (mFailureType == FailureType.CANCEL_ASYNC 368 if (mFailureType == FailureType.CANCEL_ASYNC
327 || mFailureType == FailureType.CANCEL_ASYNC_WITHOUT_PAUSE) { 369 || mFailureType == FailureType.CANCEL_ASYNC_WITHOUT_PAUSE) {
328 getExecutor().execute(task); 370 getExecutor().execute(task);
329 } else { 371 } else {
330 task.run(); 372 task.run();
331 } 373 }
332 return mFailureType != FailureType.CANCEL_ASYNC_WITHOUT_PAUSE; 374 return mFailureType != FailureType.CANCEL_ASYNC_WITHOUT_PAUSE;
333 } 375 }
334 } 376 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698