Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(54)

Side by Side Diff: components/cronet/android/java/src/org/chromium/net/CronetBidirectionalStream.java

Issue 1856073002: Coalesce small buffers in net::BidirectionalStream (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: self review Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package org.chromium.net; 5 package org.chromium.net;
6 6
7 import org.chromium.base.Log; 7 import org.chromium.base.Log;
8 import org.chromium.base.VisibleForTesting; 8 import org.chromium.base.VisibleForTesting;
9 import org.chromium.base.annotations.CalledByNative; 9 import org.chromium.base.annotations.CalledByNative;
10 import org.chromium.base.annotations.JNINamespace; 10 import org.chromium.base.annotations.JNINamespace;
11 import org.chromium.base.annotations.NativeClassQualifiedName; 11 import org.chromium.base.annotations.NativeClassQualifiedName;
12 12
13 import java.nio.ByteBuffer; 13 import java.nio.ByteBuffer;
14 import java.util.AbstractMap; 14 import java.util.AbstractMap;
15 import java.util.ArrayList; 15 import java.util.ArrayList;
16 import java.util.Arrays; 16 import java.util.Arrays;
17 import java.util.LinkedList;
17 import java.util.List; 18 import java.util.List;
18 import java.util.Map; 19 import java.util.Map;
19 import java.util.concurrent.Executor; 20 import java.util.concurrent.Executor;
20 import java.util.concurrent.RejectedExecutionException; 21 import java.util.concurrent.RejectedExecutionException;
21 22
22 import javax.annotation.concurrent.GuardedBy; 23 import javax.annotation.concurrent.GuardedBy;
23 24
24 /** 25 /**
25 * {@link BidirectionalStream} implementation using Chromium network stack. 26 * {@link BidirectionalStream} implementation using Chromium network stack.
26 * All @CalledByNative methods are called on the native network thread 27 * All @CalledByNative methods are called on the native network thread
(...skipping 20 matching lines...) Expand all
47 /* Reading from the remote, {@code onReadCompleted()} callback will be c alled when done. */ 48 /* Reading from the remote, {@code onReadCompleted()} callback will be c alled when done. */
48 READING, 49 READING,
49 /* There is no more data to read and stream is half-closed by the remote side. */ 50 /* There is no more data to read and stream is half-closed by the remote side. */
50 READING_DONE, 51 READING_DONE,
51 /* Stream is canceled. */ 52 /* Stream is canceled. */
52 CANCELED, 53 CANCELED,
53 /* Error has occured, stream is closed. */ 54 /* Error has occured, stream is closed. */
54 ERROR, 55 ERROR,
55 /* Reading and writing are done, and the stream is closed successfully. */ 56 /* Reading and writing are done, and the stream is closed successfully. */
56 SUCCESS, 57 SUCCESS,
57 /* Waiting for {@code write()} to be called. */ 58 /* Waiting for {@code nativeWritevData()} to be called. */
58 WAITING_FOR_WRITE, 59 WAITING_FOR_WRITE,
59 /* Writing to the remote, {@code onWriteCompleted()} callback will be ca lled when done. */ 60 /* Writing to the remote, {@code onWritevCompleted()} callback will be c alled when done. */
60 WRITING, 61 WRITING,
61 /* There is no more data to write and stream is half-closed by the local side. */ 62 /* There is no more data to write and stream is half-closed by the local side. */
62 WRITING_DONE, 63 WRITING_DONE,
63 } 64 }
64 65
65 private final CronetUrlRequestContext mRequestContext; 66 private final CronetUrlRequestContext mRequestContext;
66 private final Executor mExecutor; 67 private final Executor mExecutor;
67 private final Callback mCallback; 68 private final Callback mCallback;
68 private final String mInitialUrl; 69 private final String mInitialUrl;
69 private final int mInitialPriority; 70 private final int mInitialPriority;
70 private final String mInitialMethod; 71 private final String mInitialMethod;
71 private final String mRequestHeaders[]; 72 private final String mRequestHeaders[];
73 private final boolean mDisableAutoFlush;
72 74
73 /* 75 /*
74 * Synchronizes access to mNativeStream, mReadState and mWriteState. 76 * Synchronizes access to mNativeStream, mReadState and mWriteState.
75 */ 77 */
76 private final Object mNativeStreamLock = new Object(); 78 private final Object mNativeStreamLock = new Object();
77 79
80 @GuardedBy("mNativeStreamLock")
81 // Pending write data.
82 private LinkedList<ByteBuffer> mPendingData;
83
84 @GuardedBy("mNativeStreamLock")
85 // Flush data queue that should be pushed to the native stack when the previ ous
86 // nativeWritevData completes.
87 private LinkedList<ByteBuffer> mFlushData;
88
89 @GuardedBy("mNativeStreamLock")
90 // Whether an end-of-stream flag is passed in through write().
91 private boolean mEndOfStreamWritten;
92
78 /* Native BidirectionalStream object, owned by CronetBidirectionalStream. */ 93 /* Native BidirectionalStream object, owned by CronetBidirectionalStream. */
79 @GuardedBy("mNativeStreamLock") 94 @GuardedBy("mNativeStreamLock")
80 private long mNativeStream; 95 private long mNativeStream;
81 96
82 /** 97 /**
83 * Read state is tracking reading flow. 98 * Read state is tracking reading flow.
84 * / <--- READING <--- \ 99 * / <--- READING <--- \
85 * | | 100 * | |
86 * \ / 101 * \ /
87 * NOT_STARTED -> STARTED --> WAITING_FOR_READ -> READING_DONE -> SUCCESS 102 * NOT_STARTED -> STARTED --> WAITING_FOR_READ -> READING_DONE -> SUCCESS
88 */ 103 */
89 @GuardedBy("mNativeStreamLock") 104 @GuardedBy("mNativeStreamLock")
90 private State mReadState = State.NOT_STARTED; 105 private State mReadState = State.NOT_STARTED;
91 106
92 /** 107 /**
93 * Write state is tracking writing flow. 108 * Write state is tracking writing flow.
94 * / <--- WRITING <--- \ 109 * / <--- WRITING <--- \
95 * | | 110 * | |
96 * \ / 111 * \ /
97 * NOT_STARTED -> STARTED --> WAITING_FOR_WRITE -> WRITING_DONE -> SUCCESS 112 * NOT_STARTED -> STARTED --> WAITING_FOR_WRITE -> WRITING_DONE -> SUCCESS
98 */ 113 */
99 @GuardedBy("mNativeStreamLock") 114 @GuardedBy("mNativeStreamLock")
100 private State mWriteState = State.NOT_STARTED; 115 private State mWriteState = State.NOT_STARTED;
101 116
117 // Only modified on the network thread.
102 private UrlResponseInfo mResponseInfo; 118 private UrlResponseInfo mResponseInfo;
103 119
104 /* 120 /*
105 * OnReadCompleted callback is repeatedly invoked when each read is complete d, so it 121 * OnReadCompleted callback is repeatedly invoked when each read is complete d, so it
106 * is cached as a member variable. 122 * is cached as a member variable.
107 */ 123 */
124 // Only modified on the network thread.
108 private OnReadCompletedRunnable mOnReadCompletedTask; 125 private OnReadCompletedRunnable mOnReadCompletedTask;
109 126
110 /*
111 * OnWriteCompleted callback is repeatedly invoked when each write is comple ted, so it
112 * is cached as a member variable.
113 */
114 private OnWriteCompletedRunnable mOnWriteCompletedTask;
115
116 private Runnable mOnDestroyedCallbackForTesting; 127 private Runnable mOnDestroyedCallbackForTesting;
117 128
118 private final class OnReadCompletedRunnable implements Runnable { 129 private final class OnReadCompletedRunnable implements Runnable {
119 // Buffer passed back from current invocation of onReadCompleted. 130 // Buffer passed back from current invocation of onReadCompleted.
120 ByteBuffer mByteBuffer; 131 ByteBuffer mByteBuffer;
121 // End of stream flag from current invocation of onReadCompleted. 132 // End of stream flag from current invocation of onReadCompleted.
122 boolean mEndOfStream; 133 boolean mEndOfStream;
123 134
124 @Override 135 @Override
125 public void run() { 136 public void run() {
126 try { 137 try {
127 // Null out mByteBuffer, to pass buffer ownership to callback or release if done. 138 // Null out mByteBuffer, to pass buffer ownership to callback or release if done.
128 ByteBuffer buffer = mByteBuffer; 139 ByteBuffer buffer = mByteBuffer;
129 mByteBuffer = null; 140 mByteBuffer = null;
141 boolean maybeOnSucceeded = false;
130 synchronized (mNativeStreamLock) { 142 synchronized (mNativeStreamLock) {
131 if (isDoneLocked()) { 143 if (isDoneLocked()) {
132 return; 144 return;
133 } 145 }
134 if (mEndOfStream) { 146 if (mEndOfStream) {
135 mReadState = State.READING_DONE; 147 mReadState = State.READING_DONE;
136 if (maybeSucceedLocked()) { 148 maybeOnSucceeded = (mWriteState == State.WRITING_DONE);
137 return;
138 }
139 } else { 149 } else {
140 mReadState = State.WAITING_FOR_READ; 150 mReadState = State.WAITING_FOR_READ;
141 } 151 }
142 } 152 }
143 mCallback.onReadCompleted(CronetBidirectionalStream.this, mRespo nseInfo, buffer); 153 mCallback.onReadCompleted(
154 CronetBidirectionalStream.this, mResponseInfo, buffer, m EndOfStream);
155 if (maybeOnSucceeded) {
156 maybeOnSucceededOnExecutor();
157 }
144 } catch (Exception e) { 158 } catch (Exception e) {
145 onCallbackException(e); 159 onCallbackException(e);
146 } 160 }
147 } 161 }
148 } 162 }
149 163
150 private final class OnWriteCompletedRunnable implements Runnable { 164 private final class OnWriteCompletedRunnable implements Runnable {
151 // Buffer passed back from current invocation of onWriteCompleted. 165 // Buffer passed back from current invocation of onWriteCompleted.
152 ByteBuffer mByteBuffer; 166 private ByteBuffer mByteBuffer;
153 // End of stream flag from current call to write. 167 // End of stream flag from current call to write.
154 boolean mEndOfStream; 168 private final boolean mEndOfStream;
169
170 public OnWriteCompletedRunnable(ByteBuffer buffer, boolean endOfStream) {
kapishnikov 2016/04/20 22:05:42 Don't need to be public.
xunjieli 2016/04/21 14:56:24 Done.
171 mByteBuffer = buffer;
172 mEndOfStream = endOfStream;
173 }
155 174
156 @Override 175 @Override
157 public void run() { 176 public void run() {
158 try { 177 try {
159 // Null out mByteBuffer, to pass buffer ownership to callback or release if done. 178 // Null out mByteBuffer, to pass buffer ownership to callback or release if done.
160 ByteBuffer buffer = mByteBuffer; 179 ByteBuffer buffer = mByteBuffer;
161 mByteBuffer = null; 180 mByteBuffer = null;
181 boolean maybeOnSucceeded = false;
162 synchronized (mNativeStreamLock) { 182 synchronized (mNativeStreamLock) {
163 if (isDoneLocked()) { 183 if (isDoneLocked()) {
164 return; 184 return;
165 } 185 }
166 if (mEndOfStream) { 186 if (mEndOfStream) {
167 mWriteState = State.WRITING_DONE; 187 mWriteState = State.WRITING_DONE;
168 if (maybeSucceedLocked()) { 188 maybeOnSucceeded = (mReadState == State.READING_DONE);
169 return;
170 }
171 } else {
172 mWriteState = State.WAITING_FOR_WRITE;
173 } 189 }
174 } 190 }
175 mCallback.onWriteCompleted(CronetBidirectionalStream.this, mResp onseInfo, buffer); 191 mCallback.onWriteCompleted(
192 CronetBidirectionalStream.this, mResponseInfo, buffer, m EndOfStream);
193 if (maybeOnSucceeded) {
194 maybeOnSucceededOnExecutor();
195 }
176 } catch (Exception e) { 196 } catch (Exception e) {
177 onCallbackException(e); 197 onCallbackException(e);
178 } 198 }
179 } 199 }
180 } 200 }
181 201
182 CronetBidirectionalStream(CronetUrlRequestContext requestContext, String url , 202 CronetBidirectionalStream(CronetUrlRequestContext requestContext, String url ,
183 @BidirectionalStream.Builder.StreamPriority int priority, Callback c allback, 203 @BidirectionalStream.Builder.StreamPriority int priority, Callback c allback,
184 Executor executor, String httpMethod, List<Map.Entry<String, String> > requestHeaders) { 204 Executor executor, String httpMethod, List<Map.Entry<String, String> > requestHeaders,
205 boolean disableAutoFlush) {
185 mRequestContext = requestContext; 206 mRequestContext = requestContext;
186 mInitialUrl = url; 207 mInitialUrl = url;
187 mInitialPriority = convertStreamPriority(priority); 208 mInitialPriority = convertStreamPriority(priority);
188 mCallback = callback; 209 mCallback = callback;
189 mExecutor = executor; 210 mExecutor = executor;
190 mInitialMethod = httpMethod; 211 mInitialMethod = httpMethod;
191 mRequestHeaders = stringsFromHeaderList(requestHeaders); 212 mRequestHeaders = stringsFromHeaderList(requestHeaders);
213 mDisableAutoFlush = disableAutoFlush;
214 mPendingData = new LinkedList<ByteBuffer>();
kapishnikov 2016/04/20 22:05:42 Can be replaced with mPendingData = new LinkedList
xunjieli 2016/04/21 14:56:24 Done.
215 mFlushData = new LinkedList<ByteBuffer>();
192 } 216 }
193 217
194 @Override 218 @Override
195 public void start() { 219 public void start() {
196 synchronized (mNativeStreamLock) { 220 synchronized (mNativeStreamLock) {
197 if (mReadState != State.NOT_STARTED) { 221 if (mReadState != State.NOT_STARTED) {
198 throw new IllegalStateException("Stream is already started."); 222 throw new IllegalStateException("Stream is already started.");
199 } 223 }
200 try { 224 try {
201 mNativeStream = nativeCreateBidirectionalStream( 225 mNativeStream = nativeCreateBidirectionalStream(
202 mRequestContext.getUrlRequestContextAdapter()); 226 mRequestContext.getUrlRequestContextAdapter(), mDisableA utoFlush);
203 mRequestContext.onRequestStarted(); 227 mRequestContext.onRequestStarted();
204 // Non-zero startResult means an argument error. 228 // Non-zero startResult means an argument error.
205 int startResult = nativeStart(mNativeStream, mInitialUrl, mIniti alPriority, 229 int startResult = nativeStart(mNativeStream, mInitialUrl, mIniti alPriority,
206 mInitialMethod, mRequestHeaders, !doesMethodAllowWriteDa ta(mInitialMethod)); 230 mInitialMethod, mRequestHeaders, !doesMethodAllowWriteDa ta(mInitialMethod));
207 if (startResult == -1) { 231 if (startResult == -1) {
208 throw new IllegalArgumentException("Invalid http method " + mInitialMethod); 232 throw new IllegalArgumentException("Invalid http method " + mInitialMethod);
209 } 233 }
210 if (startResult > 0) { 234 if (startResult > 0) {
211 int headerPos = startResult - 1; 235 int headerPos = startResult - 1;
212 throw new IllegalArgumentException("Invalid header " 236 throw new IllegalArgumentException("Invalid header "
(...skipping 33 matching lines...) Expand 10 before | Expand all | Expand 10 after
246 } 270 }
247 } 271 }
248 272
249 @Override 273 @Override
250 public void write(ByteBuffer buffer, boolean endOfStream) { 274 public void write(ByteBuffer buffer, boolean endOfStream) {
251 synchronized (mNativeStreamLock) { 275 synchronized (mNativeStreamLock) {
252 Preconditions.checkDirect(buffer); 276 Preconditions.checkDirect(buffer);
253 if (!buffer.hasRemaining() && !endOfStream) { 277 if (!buffer.hasRemaining() && !endOfStream) {
254 throw new IllegalArgumentException("Empty buffer before end of s tream."); 278 throw new IllegalArgumentException("Empty buffer before end of s tream.");
255 } 279 }
256 if (mWriteState != State.WAITING_FOR_WRITE) { 280 if (mEndOfStreamWritten) {
257 throw new IllegalStateException("Unexpected write attempt."); 281 throw new IllegalArgumentException("Write after writing end of s tream.");
258 } 282 }
259 if (isDoneLocked()) { 283 if (isDoneLocked()) {
260 return; 284 return;
261 } 285 }
262 if (mOnWriteCompletedTask == null) { 286 mPendingData.add(buffer);
263 mOnWriteCompletedTask = new OnWriteCompletedRunnable(); 287 if (endOfStream) {
288 mEndOfStreamWritten = true;
264 } 289 }
265 mOnWriteCompletedTask.mEndOfStream = endOfStream; 290 if (!mDisableAutoFlush) {
266 mWriteState = State.WRITING; 291 flushLocked();
267 if (!nativeWriteData(
268 mNativeStream, buffer, buffer.position(), buffer.limit() , endOfStream)) {
269 // Still waiting on write. This is just to have consistent
270 // behavior with the other error cases.
271 mWriteState = State.WAITING_FOR_WRITE;
272 throw new IllegalArgumentException("Unable to call native write" );
273 } 292 }
274 } 293 }
275 } 294 }
276 295
277 @Override 296 @Override
297 public void flush() {
298 synchronized (mNativeStreamLock) {
299 flushLocked();
300 }
301 }
302
303 @SuppressWarnings("GuardedByChecker")
304 private void flushLocked() {
kapishnikov 2016/04/20 22:05:42 Should we call isDoneLocked() here to make sure th
xunjieli 2016/04/21 14:56:24 Done. Good catch! Not checking the adapter is defi
305 if (mPendingData.isEmpty() && mFlushData.isEmpty()) {
306 // No-op if there is nothing to write.
307 return;
308 }
309
310 // Move buffers from mPendingData to the flushing queue.
311 if (!mPendingData.isEmpty()) {
312 mFlushData.addAll(mPendingData);
313 mPendingData.clear();
314 }
315
316 if (mWriteState == State.WRITING) {
317 // If there is a write already pending, wait until onWritevCompleted is
318 // called before pushing data to the native stack.
319 return;
320 }
321 int size = mFlushData.size();
322 ByteBuffer[] buffers = new ByteBuffer[size];
323 int[] positions = new int[size];
324 int[] limits = new int[size];
325 for (int i = 0; i < size; i++) {
326 ByteBuffer buffer = mFlushData.poll();
327 buffers[i] = buffer;
328 positions[i] = buffer.position();
329 limits[i] = buffer.limit();
330 }
331 assert mFlushData.isEmpty();
332 mWriteState = State.WRITING;
333 if (!nativeWritevData(mNativeStream, buffers, positions, limits, mEndOfS treamWritten)) {
334 // Still waiting on write. This is just to have consistent
335 // behavior with the other error cases.
336 mWriteState = State.WAITING_FOR_WRITE;
kapishnikov 2016/04/20 22:05:42 Just to double check. Is it correct to change the
xunjieli 2016/04/21 14:56:24 I don't fully understand the comment. I have retai
kapishnikov 2016/04/21 15:24:04 I see. We can address it in another CL. Just two t
xunjieli 2016/04/21 16:10:27 This is a really good point! Although the exceptio
kapishnikov 2016/04/21 17:06:11 Since IllegalArgumentException is unchecked (Runti
xunjieli 2016/04/21 17:16:46 Do we want the client to recover from the error? I
kapishnikov 2016/04/21 19:41:25 Talked to Helen offline. We should revisit the mWr
xunjieli 2016/04/21 19:47:52 Great. Thanks, Andrei. I will file a crbug to addr
337 throw new IllegalArgumentException("Unable to call native writev.");
338 }
339 }
340
341 @Override
278 public void ping(PingCallback callback, Executor executor) { 342 public void ping(PingCallback callback, Executor executor) {
279 // TODO(mef): May be last thing to be implemented on Android. 343 // TODO(mef): May be last thing to be implemented on Android.
280 throw new UnsupportedOperationException("ping is not supported yet."); 344 throw new UnsupportedOperationException("ping is not supported yet.");
281 } 345 }
282 346
283 @Override 347 @Override
284 public void windowUpdate(int windowSizeIncrement) { 348 public void windowUpdate(int windowSizeIncrement) {
285 // TODO(mef): Understand the needs and semantics of this method. 349 // TODO(mef): Understand the needs and semantics of this method.
286 throw new UnsupportedOperationException("windowUpdate is not supported y et."); 350 throw new UnsupportedOperationException("windowUpdate is not supported y et.");
287 } 351 }
(...skipping 14 matching lines...) Expand all
302 synchronized (mNativeStreamLock) { 366 synchronized (mNativeStreamLock) {
303 return isDoneLocked(); 367 return isDoneLocked();
304 } 368 }
305 } 369 }
306 370
307 @GuardedBy("mNativeStreamLock") 371 @GuardedBy("mNativeStreamLock")
308 private boolean isDoneLocked() { 372 private boolean isDoneLocked() {
309 return mReadState != State.NOT_STARTED && mNativeStream == 0; 373 return mReadState != State.NOT_STARTED && mNativeStream == 0;
310 } 374 }
311 375
376 /*
377 * Runs an onSucceeded callback if both Read and Write sides are closed.
378 */
379 private void maybeOnSucceededOnExecutor() {
380 synchronized (mNativeStreamLock) {
381 if (isDoneLocked()) {
382 return;
383 }
384 if (!(mWriteState == State.WRITING_DONE && mReadState == State.READI NG_DONE)) {
385 return;
386 }
387 mReadState = mWriteState = State.SUCCESS;
388 // Destroy native stream first, so UrlRequestContext could be shut
389 // down from the listener.
390 destroyNativeStreamLocked(false);
391 }
392 try {
393 mCallback.onSucceeded(CronetBidirectionalStream.this, mResponseInfo) ;
394 } catch (Exception e) {
395 Log.e(CronetUrlRequestContext.LOG_TAG, "Exception in onSucceeded met hod", e);
396 }
397 }
398
312 @SuppressWarnings("unused") 399 @SuppressWarnings("unused")
313 @CalledByNative 400 @CalledByNative
314 private void onRequestHeadersSent() { 401 private void onStreamReady() {
kapishnikov 2016/04/20 22:05:42 Should onStreamReady() set mReadState to WAITING_F
xunjieli 2016/04/21 14:56:25 Done. Good point! I added a test as well.
315 postTaskToExecutor(new Runnable() { 402 postTaskToExecutor(new Runnable() {
316 public void run() { 403 public void run() {
317 synchronized (mNativeStreamLock) { 404 synchronized (mNativeStreamLock) {
318 if (isDoneLocked()) { 405 if (isDoneLocked()) {
319 return; 406 return;
320 } 407 }
321 if (doesMethodAllowWriteData(mInitialMethod)) { 408 if (doesMethodAllowWriteData(mInitialMethod)) {
322 mWriteState = State.WAITING_FOR_WRITE; 409 mWriteState = State.WAITING_FOR_WRITE;
323 } else { 410 } else {
324 mWriteState = State.WRITING_DONE; 411 mWriteState = State.WRITING_DONE;
325 } 412 }
326 } 413 }
327 414
328 try { 415 try {
329 mCallback.onRequestHeadersSent(CronetBidirectionalStream.thi s); 416 mCallback.onStreamReady(CronetBidirectionalStream.this);
330 } catch (Exception e) { 417 } catch (Exception e) {
331 onCallbackException(e); 418 onCallbackException(e);
332 } 419 }
333 } 420 }
334 }); 421 });
335 } 422 }
336 423
337 /** 424 /**
338 * Called when the final set of headers, after all redirects, 425 * Called when the final set of headers, after all redirects,
339 * is received. Can only be called once for each stream. 426 * is received. Can only be called once for each stream.
(...skipping 44 matching lines...) Expand 10 before | Expand all | Expand 10 after
384 } 471 }
385 byteBuffer.position(initialPosition + bytesRead); 472 byteBuffer.position(initialPosition + bytesRead);
386 assert mOnReadCompletedTask.mByteBuffer == null; 473 assert mOnReadCompletedTask.mByteBuffer == null;
387 mOnReadCompletedTask.mByteBuffer = byteBuffer; 474 mOnReadCompletedTask.mByteBuffer = byteBuffer;
388 mOnReadCompletedTask.mEndOfStream = (bytesRead == 0); 475 mOnReadCompletedTask.mEndOfStream = (bytesRead == 0);
389 postTaskToExecutor(mOnReadCompletedTask); 476 postTaskToExecutor(mOnReadCompletedTask);
390 } 477 }
391 478
392 @SuppressWarnings("unused") 479 @SuppressWarnings("unused")
393 @CalledByNative 480 @CalledByNative
394 private void onWriteCompleted( 481 private void onWritevCompleted(final ByteBuffer[] byteBuffers, int[] initial Positions,
395 final ByteBuffer byteBuffer, int initialPosition, int initialLimit) { 482 int[] initialLimits, boolean endOfStream) {
396 if (byteBuffer.position() != initialPosition || byteBuffer.limit() != in itialLimit) { 483 assert byteBuffers.length == initialPositions.length;
397 failWithException( 484 assert byteBuffers.length == initialLimits.length;
398 new CronetException("ByteBuffer modified externally during w rite", null)); 485 assert initialPositions.length == initialLimits.length;
kapishnikov 2016/04/20 22:05:42 This assert is not required because if both variab
xunjieli 2016/04/21 14:56:24 Done.
399 return; 486 synchronized (mNativeStreamLock) {
487 mWriteState = State.WAITING_FOR_WRITE;
488 // Flush if there is anything in the flush queue mFlushData.
489 if (!mFlushData.isEmpty()) {
490 flushLocked();
491 }
400 } 492 }
401 // Current implementation always writes the complete buffer. 493 for (int i = 0; i < byteBuffers.length; i++) {
402 byteBuffer.position(byteBuffer.limit()); 494 ByteBuffer buffer = byteBuffers[i];
403 assert mOnWriteCompletedTask.mByteBuffer == null; 495 if (buffer.position() != initialPositions[i] || buffer.limit() != in itialLimits[i]) {
404 mOnWriteCompletedTask.mByteBuffer = byteBuffer; 496 failWithException(
405 postTaskToExecutor(mOnWriteCompletedTask); 497 new CronetException("ByteBuffer modified externally duri ng write", null));
498 return;
499 }
500 // Current implementation always writes the complete buffer.
501 buffer.position(buffer.limit());
502 postTaskToExecutor(new OnWriteCompletedRunnable(buffer, endOfStream) );
503 }
406 } 504 }
407 505
408 @SuppressWarnings("unused") 506 @SuppressWarnings("unused")
409 @CalledByNative 507 @CalledByNative
410 private void onResponseTrailersReceived(String[] trailers) { 508 private void onResponseTrailersReceived(String[] trailers) {
411 final UrlResponseInfo.HeaderBlock trailersBlock = 509 final UrlResponseInfo.HeaderBlock trailersBlock =
412 new UrlResponseInfo.HeaderBlock(headersListFromStrings(trailers) ); 510 new UrlResponseInfo.HeaderBlock(headersListFromStrings(trailers) );
413 postTaskToExecutor(new Runnable() { 511 postTaskToExecutor(new Runnable() {
414 public void run() { 512 public void run() {
415 synchronized (mNativeStreamLock) { 513 synchronized (mNativeStreamLock) {
(...skipping 78 matching lines...) Expand 10 before | Expand all | Expand 10 after
494 case Builder.STREAM_PRIORITY_MEDIUM: 592 case Builder.STREAM_PRIORITY_MEDIUM:
495 return RequestPriority.MEDIUM; 593 return RequestPriority.MEDIUM;
496 case Builder.STREAM_PRIORITY_HIGHEST: 594 case Builder.STREAM_PRIORITY_HIGHEST:
497 return RequestPriority.HIGHEST; 595 return RequestPriority.HIGHEST;
498 default: 596 default:
499 throw new IllegalArgumentException("Invalid stream priority."); 597 throw new IllegalArgumentException("Invalid stream priority.");
500 } 598 }
501 } 599 }
502 600
503 /** 601 /**
504 * Checks whether reading and writing are done.
505 * @return false if either reading or writing is not done. If both reading a nd writing
506 * are done, then posts cleanup task and returns true.
507 */
508 @GuardedBy("mNativeStreamLock")
509 private boolean maybeSucceedLocked() {
510 if (mReadState != State.READING_DONE || mWriteState != State.WRITING_DON E) {
511 return false;
512 }
513
514 mReadState = mWriteState = State.SUCCESS;
515 postTaskToExecutor(new Runnable() {
516 public void run() {
517 synchronized (mNativeStreamLock) {
518 if (isDoneLocked()) {
519 return;
520 }
521 // Destroy native stream first, so UrlRequestContext could b e shut
522 // down from the listener.
523 destroyNativeStreamLocked(false);
524 }
525 try {
526 mCallback.onSucceeded(CronetBidirectionalStream.this, mRespo nseInfo);
527 } catch (Exception e) {
528 Log.e(CronetUrlRequestContext.LOG_TAG, "Exception in onSucce eded method", e);
529 }
530 }
531 });
532 return true;
533 }
534
535 /**
536 * Posts task to application Executor. Used for callbacks 602 * Posts task to application Executor. Used for callbacks
537 * and other tasks that should not be executed on network thread. 603 * and other tasks that should not be executed on network thread.
538 */ 604 */
539 private void postTaskToExecutor(Runnable task) { 605 private void postTaskToExecutor(Runnable task) {
540 try { 606 try {
541 mExecutor.execute(task); 607 mExecutor.execute(task);
542 } catch (RejectedExecutionException failException) { 608 } catch (RejectedExecutionException failException) {
543 Log.e(CronetUrlRequestContext.LOG_TAG, "Exception posting task to ex ecutor", 609 Log.e(CronetUrlRequestContext.LOG_TAG, "Exception posting task to ex ecutor",
544 failException); 610 failException);
545 // If posting a task throws an exception, then there is no choice 611 // If posting a task throws an exception, then there is no choice
(...skipping 25 matching lines...) Expand all
571 mRequestContext.onRequestDestroyed(); 637 mRequestContext.onRequestDestroyed();
572 if (mOnDestroyedCallbackForTesting != null) { 638 if (mOnDestroyedCallbackForTesting != null) {
573 mOnDestroyedCallbackForTesting.run(); 639 mOnDestroyedCallbackForTesting.run();
574 } 640 }
575 } 641 }
576 642
577 /** 643 /**
578 * Fails the stream with an exception. Only called on the Executor. 644 * Fails the stream with an exception. Only called on the Executor.
579 */ 645 */
580 private void failWithExceptionOnExecutor(CronetException e) { 646 private void failWithExceptionOnExecutor(CronetException e) {
581 // Do not call into listener if request is complete. 647 // Do not call into mCallback if request is complete.
582 synchronized (mNativeStreamLock) { 648 synchronized (mNativeStreamLock) {
583 if (isDoneLocked()) { 649 if (isDoneLocked()) {
584 return; 650 return;
585 } 651 }
586 mReadState = mWriteState = State.ERROR; 652 mReadState = mWriteState = State.ERROR;
587 destroyNativeStreamLocked(false); 653 destroyNativeStreamLocked(false);
588 } 654 }
589 try { 655 try {
590 mCallback.onFailed(this, mResponseInfo, e); 656 mCallback.onFailed(this, mResponseInfo, e);
591 } catch (Exception failException) { 657 } catch (Exception failException) {
(...skipping 19 matching lines...) Expand all
611 */ 677 */
612 private void failWithException(final CronetException exception) { 678 private void failWithException(final CronetException exception) {
613 postTaskToExecutor(new Runnable() { 679 postTaskToExecutor(new Runnable() {
614 public void run() { 680 public void run() {
615 failWithExceptionOnExecutor(exception); 681 failWithExceptionOnExecutor(exception);
616 } 682 }
617 }); 683 });
618 } 684 }
619 685
620 // Native methods are implemented in cronet_bidirectional_stream_adapter.cc. 686 // Native methods are implemented in cronet_bidirectional_stream_adapter.cc.
621 private native long nativeCreateBidirectionalStream(long urlRequestContextAd apter); 687 private native long nativeCreateBidirectionalStream(
688 long urlRequestContextAdapter, boolean disableAutoFlush);
622 689
623 @NativeClassQualifiedName("CronetBidirectionalStreamAdapter") 690 @NativeClassQualifiedName("CronetBidirectionalStreamAdapter")
624 private native int nativeStart(long nativePtr, String url, int priority, Str ing method, 691 private native int nativeStart(long nativePtr, String url, int priority, Str ing method,
625 String[] headers, boolean endOfStream); 692 String[] headers, boolean endOfStream);
626 693
627 @NativeClassQualifiedName("CronetBidirectionalStreamAdapter") 694 @NativeClassQualifiedName("CronetBidirectionalStreamAdapter")
628 private native boolean nativeReadData( 695 private native boolean nativeReadData(
629 long nativePtr, ByteBuffer byteBuffer, int position, int limit); 696 long nativePtr, ByteBuffer byteBuffer, int position, int limit);
630 697
631 @NativeClassQualifiedName("CronetBidirectionalStreamAdapter") 698 @NativeClassQualifiedName("CronetBidirectionalStreamAdapter")
632 private native boolean nativeWriteData( 699 private native boolean nativeWritevData(long nativePtr, ByteBuffer[] buffers , int[] positions,
633 long nativePtr, ByteBuffer byteBuffer, int position, int limit, bool ean endOfStream); 700 int[] limits, boolean endOfStream);
634 701
635 @NativeClassQualifiedName("CronetBidirectionalStreamAdapter") 702 @NativeClassQualifiedName("CronetBidirectionalStreamAdapter")
636 private native void nativeDestroy(long nativePtr, boolean sendOnCanceled); 703 private native void nativeDestroy(long nativePtr, boolean sendOnCanceled);
637 } 704 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698