Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(38)

Side by Side Diff: components/cronet/android/java/src/org/chromium/net/CronetBidirectionalStream.java

Issue 1856073002: Coalesce small buffers in net::BidirectionalStream (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Fix javadoc Created 4 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package org.chromium.net; 5 package org.chromium.net;
6 6
7 import org.chromium.base.Log; 7 import org.chromium.base.Log;
8 import org.chromium.base.VisibleForTesting; 8 import org.chromium.base.VisibleForTesting;
9 import org.chromium.base.annotations.CalledByNative; 9 import org.chromium.base.annotations.CalledByNative;
10 import org.chromium.base.annotations.JNINamespace; 10 import org.chromium.base.annotations.JNINamespace;
11 import org.chromium.base.annotations.NativeClassQualifiedName; 11 import org.chromium.base.annotations.NativeClassQualifiedName;
12 12
13 import java.nio.ByteBuffer; 13 import java.nio.ByteBuffer;
14 import java.util.AbstractMap; 14 import java.util.AbstractMap;
15 import java.util.ArrayList; 15 import java.util.ArrayList;
16 import java.util.Arrays; 16 import java.util.Arrays;
17 import java.util.LinkedList;
17 import java.util.List; 18 import java.util.List;
18 import java.util.Map; 19 import java.util.Map;
19 import java.util.concurrent.Executor; 20 import java.util.concurrent.Executor;
20 import java.util.concurrent.RejectedExecutionException; 21 import java.util.concurrent.RejectedExecutionException;
21 22
22 import javax.annotation.concurrent.GuardedBy; 23 import javax.annotation.concurrent.GuardedBy;
23 24
24 /** 25 /**
25 * {@link BidirectionalStream} implementation using Chromium network stack. 26 * {@link BidirectionalStream} implementation using Chromium network stack.
26 * All @CalledByNative methods are called on the native network thread 27 * All @CalledByNative methods are called on the native network thread
(...skipping 20 matching lines...) Expand all
47 /* Reading from the remote, {@code onReadCompleted()} callback will be c alled when done. */ 48 /* Reading from the remote, {@code onReadCompleted()} callback will be c alled when done. */
48 READING, 49 READING,
49 /* There is no more data to read and stream is half-closed by the remote side. */ 50 /* There is no more data to read and stream is half-closed by the remote side. */
50 READING_DONE, 51 READING_DONE,
51 /* Stream is canceled. */ 52 /* Stream is canceled. */
52 CANCELED, 53 CANCELED,
53 /* Error has occured, stream is closed. */ 54 /* Error has occured, stream is closed. */
54 ERROR, 55 ERROR,
55 /* Reading and writing are done, and the stream is closed successfully. */ 56 /* Reading and writing are done, and the stream is closed successfully. */
56 SUCCESS, 57 SUCCESS,
57 /* Waiting for {@code write()} to be called. */ 58 /* Waiting for {@code nativeWritevData()} to be called. */
58 WAITING_FOR_WRITE, 59 WAITING_FOR_WRITE,
59 /* Writing to the remote, {@code onWriteCompleted()} callback will be ca lled when done. */ 60 /* Writing to the remote, {@code onWritevCompleted()} callback will be c alled when done. */
60 WRITING, 61 WRITING,
61 /* There is no more data to write and stream is half-closed by the local side. */ 62 /* There is no more data to write and stream is half-closed by the local side. */
62 WRITING_DONE, 63 WRITING_DONE,
63 } 64 }
64 65
65 private final CronetUrlRequestContext mRequestContext; 66 private final CronetUrlRequestContext mRequestContext;
66 private final Executor mExecutor; 67 private final Executor mExecutor;
67 private final Callback mCallback; 68 private final Callback mCallback;
68 private final String mInitialUrl; 69 private final String mInitialUrl;
69 private final int mInitialPriority; 70 private final int mInitialPriority;
70 private final String mInitialMethod; 71 private final String mInitialMethod;
71 private final String mRequestHeaders[]; 72 private final String mRequestHeaders[];
73 private final boolean mDisableAutoFlush;
72 74
73 /* 75 /*
74 * Synchronizes access to mNativeStream, mReadState and mWriteState. 76 * Synchronizes access to mNativeStream, mReadState and mWriteState.
75 */ 77 */
76 private final Object mNativeStreamLock = new Object(); 78 private final Object mNativeStreamLock = new Object();
77 79
80 @GuardedBy("mNativeStreamLock")
81 // Pending write data.
82 private LinkedList<ByteBuffer> mPendingData;
83
84 @GuardedBy("mNativeStreamLock")
85 // Flush data queue that should be pushed to the native stack when the previ ous
86 // nativeWritevData completes.
87 private LinkedList<ByteBuffer> mFlushData;
88
89 @GuardedBy("mNativeStreamLock")
90 // Whether an end-of-stream flag is passed in through write().
91 private boolean mEndOfStreamWritten;
92
78 /* Native BidirectionalStream object, owned by CronetBidirectionalStream. */ 93 /* Native BidirectionalStream object, owned by CronetBidirectionalStream. */
79 @GuardedBy("mNativeStreamLock") 94 @GuardedBy("mNativeStreamLock")
80 private long mNativeStream; 95 private long mNativeStream;
81 96
82 /** 97 /**
83 * Read state is tracking reading flow. 98 * Read state is tracking reading flow.
84 * / <--- READING <--- \ 99 * / <--- READING <--- \
85 * | | 100 * | |
86 * \ / 101 * \ /
87 * NOT_STARTED -> STARTED --> WAITING_FOR_READ -> READING_DONE -> SUCCESS 102 * NOT_STARTED -> STARTED --> WAITING_FOR_READ -> READING_DONE -> SUCCESS
88 */ 103 */
89 @GuardedBy("mNativeStreamLock") 104 @GuardedBy("mNativeStreamLock")
90 private State mReadState = State.NOT_STARTED; 105 private State mReadState = State.NOT_STARTED;
91 106
92 /** 107 /**
93 * Write state is tracking writing flow. 108 * Write state is tracking writing flow.
94 * / <--- WRITING <--- \ 109 * / <--- WRITING <--- \
95 * | | 110 * | |
96 * \ / 111 * \ /
97 * NOT_STARTED -> STARTED --> WAITING_FOR_WRITE -> WRITING_DONE -> SUCCESS 112 * NOT_STARTED -> STARTED --> WAITING_FOR_WRITE -> WRITING_DONE -> SUCCESS
98 */ 113 */
99 @GuardedBy("mNativeStreamLock") 114 @GuardedBy("mNativeStreamLock")
100 private State mWriteState = State.NOT_STARTED; 115 private State mWriteState = State.NOT_STARTED;
101 116
117 // Only modified on the network thread.
102 private UrlResponseInfo mResponseInfo; 118 private UrlResponseInfo mResponseInfo;
103 119
104 /* 120 /*
105 * OnReadCompleted callback is repeatedly invoked when each read is complete d, so it 121 * OnReadCompleted callback is repeatedly invoked when each read is complete d, so it
106 * is cached as a member variable. 122 * is cached as a member variable.
107 */ 123 */
124 // Only modified on the network thread.
108 private OnReadCompletedRunnable mOnReadCompletedTask; 125 private OnReadCompletedRunnable mOnReadCompletedTask;
109 126
110 /*
111 * OnWriteCompleted callback is repeatedly invoked when each write is comple ted, so it
112 * is cached as a member variable.
113 */
114 private OnWriteCompletedRunnable mOnWriteCompletedTask;
115
116 private Runnable mOnDestroyedCallbackForTesting; 127 private Runnable mOnDestroyedCallbackForTesting;
117 128
118 private final class OnReadCompletedRunnable implements Runnable { 129 private final class OnReadCompletedRunnable implements Runnable {
119 // Buffer passed back from current invocation of onReadCompleted. 130 // Buffer passed back from current invocation of onReadCompleted.
120 ByteBuffer mByteBuffer; 131 ByteBuffer mByteBuffer;
121 // End of stream flag from current invocation of onReadCompleted. 132 // End of stream flag from current invocation of onReadCompleted.
122 boolean mEndOfStream; 133 boolean mEndOfStream;
123 134
124 @Override 135 @Override
125 public void run() { 136 public void run() {
126 try { 137 try {
127 // Null out mByteBuffer, to pass buffer ownership to callback or release if done. 138 // Null out mByteBuffer, to pass buffer ownership to callback or release if done.
128 ByteBuffer buffer = mByteBuffer; 139 ByteBuffer buffer = mByteBuffer;
129 mByteBuffer = null; 140 mByteBuffer = null;
141 boolean maybeOnSucceeded = false;
130 synchronized (mNativeStreamLock) { 142 synchronized (mNativeStreamLock) {
131 if (isDoneLocked()) { 143 if (isDoneLocked()) {
132 return; 144 return;
133 } 145 }
134 if (mEndOfStream) { 146 if (mEndOfStream) {
135 mReadState = State.READING_DONE; 147 mReadState = State.READING_DONE;
136 if (maybeSucceedLocked()) { 148 maybeOnSucceeded = (mWriteState == State.WRITING_DONE);
137 return;
138 }
139 } else { 149 } else {
140 mReadState = State.WAITING_FOR_READ; 150 mReadState = State.WAITING_FOR_READ;
141 } 151 }
142 } 152 }
143 mCallback.onReadCompleted(CronetBidirectionalStream.this, mRespo nseInfo, buffer); 153 mCallback.onReadCompleted(
154 CronetBidirectionalStream.this, mResponseInfo, buffer, m EndOfStream);
155 if (maybeOnSucceeded) {
156 maybeOnSucceededOnExecutor();
157 }
144 } catch (Exception e) { 158 } catch (Exception e) {
145 onCallbackException(e); 159 onCallbackException(e);
146 } 160 }
147 } 161 }
148 } 162 }
149 163
150 private final class OnWriteCompletedRunnable implements Runnable { 164 private final class OnWriteCompletedRunnable implements Runnable {
151 // Buffer passed back from current invocation of onWriteCompleted. 165 // Buffer passed back from current invocation of onWriteCompleted.
152 ByteBuffer mByteBuffer; 166 private ByteBuffer mByteBuffer;
153 // End of stream flag from current call to write. 167 // End of stream flag from current call to write.
154 boolean mEndOfStream; 168 private final boolean mEndOfStream;
169
170 OnWriteCompletedRunnable(ByteBuffer buffer, boolean endOfStream) {
171 mByteBuffer = buffer;
172 mEndOfStream = endOfStream;
173 }
155 174
156 @Override 175 @Override
157 public void run() { 176 public void run() {
158 try { 177 try {
159 // Null out mByteBuffer, to pass buffer ownership to callback or release if done. 178 // Null out mByteBuffer, to pass buffer ownership to callback or release if done.
160 ByteBuffer buffer = mByteBuffer; 179 ByteBuffer buffer = mByteBuffer;
161 mByteBuffer = null; 180 mByteBuffer = null;
181 boolean maybeOnSucceeded = false;
162 synchronized (mNativeStreamLock) { 182 synchronized (mNativeStreamLock) {
163 if (isDoneLocked()) { 183 if (isDoneLocked()) {
164 return; 184 return;
165 } 185 }
166 if (mEndOfStream) { 186 if (mEndOfStream) {
167 mWriteState = State.WRITING_DONE; 187 mWriteState = State.WRITING_DONE;
168 if (maybeSucceedLocked()) { 188 maybeOnSucceeded = (mReadState == State.READING_DONE);
169 return;
170 }
171 } else {
172 mWriteState = State.WAITING_FOR_WRITE;
173 } 189 }
174 } 190 }
175 mCallback.onWriteCompleted(CronetBidirectionalStream.this, mResp onseInfo, buffer); 191 mCallback.onWriteCompleted(
192 CronetBidirectionalStream.this, mResponseInfo, buffer, m EndOfStream);
193 if (maybeOnSucceeded) {
194 maybeOnSucceededOnExecutor();
195 }
176 } catch (Exception e) { 196 } catch (Exception e) {
177 onCallbackException(e); 197 onCallbackException(e);
178 } 198 }
179 } 199 }
180 } 200 }
181 201
182 CronetBidirectionalStream(CronetUrlRequestContext requestContext, String url , 202 CronetBidirectionalStream(CronetUrlRequestContext requestContext, String url ,
183 @BidirectionalStream.Builder.StreamPriority int priority, Callback c allback, 203 @BidirectionalStream.Builder.StreamPriority int priority, Callback c allback,
184 Executor executor, String httpMethod, List<Map.Entry<String, String> > requestHeaders) { 204 Executor executor, String httpMethod, List<Map.Entry<String, String> > requestHeaders,
205 boolean disableAutoFlush) {
185 mRequestContext = requestContext; 206 mRequestContext = requestContext;
186 mInitialUrl = url; 207 mInitialUrl = url;
187 mInitialPriority = convertStreamPriority(priority); 208 mInitialPriority = convertStreamPriority(priority);
188 mCallback = callback; 209 mCallback = callback;
189 mExecutor = executor; 210 mExecutor = executor;
190 mInitialMethod = httpMethod; 211 mInitialMethod = httpMethod;
191 mRequestHeaders = stringsFromHeaderList(requestHeaders); 212 mRequestHeaders = stringsFromHeaderList(requestHeaders);
213 mDisableAutoFlush = disableAutoFlush;
214 mPendingData = new LinkedList<>();
215 mFlushData = new LinkedList<>();
192 } 216 }
193 217
194 @Override 218 @Override
195 public void start() { 219 public void start() {
196 synchronized (mNativeStreamLock) { 220 synchronized (mNativeStreamLock) {
197 if (mReadState != State.NOT_STARTED) { 221 if (mReadState != State.NOT_STARTED) {
198 throw new IllegalStateException("Stream is already started."); 222 throw new IllegalStateException("Stream is already started.");
199 } 223 }
200 try { 224 try {
201 mNativeStream = nativeCreateBidirectionalStream( 225 mNativeStream = nativeCreateBidirectionalStream(
202 mRequestContext.getUrlRequestContextAdapter()); 226 mRequestContext.getUrlRequestContextAdapter(), mDisableA utoFlush);
203 mRequestContext.onRequestStarted(); 227 mRequestContext.onRequestStarted();
204 // Non-zero startResult means an argument error. 228 // Non-zero startResult means an argument error.
205 int startResult = nativeStart(mNativeStream, mInitialUrl, mIniti alPriority, 229 int startResult = nativeStart(mNativeStream, mInitialUrl, mIniti alPriority,
206 mInitialMethod, mRequestHeaders, !doesMethodAllowWriteDa ta(mInitialMethod)); 230 mInitialMethod, mRequestHeaders, !doesMethodAllowWriteDa ta(mInitialMethod));
207 if (startResult == -1) { 231 if (startResult == -1) {
208 throw new IllegalArgumentException("Invalid http method " + mInitialMethod); 232 throw new IllegalArgumentException("Invalid http method " + mInitialMethod);
209 } 233 }
210 if (startResult > 0) { 234 if (startResult > 0) {
211 int headerPos = startResult - 1; 235 int headerPos = startResult - 1;
212 throw new IllegalArgumentException("Invalid header " 236 throw new IllegalArgumentException("Invalid header "
(...skipping 33 matching lines...) Expand 10 before | Expand all | Expand 10 after
246 } 270 }
247 } 271 }
248 272
249 @Override 273 @Override
250 public void write(ByteBuffer buffer, boolean endOfStream) { 274 public void write(ByteBuffer buffer, boolean endOfStream) {
251 synchronized (mNativeStreamLock) { 275 synchronized (mNativeStreamLock) {
252 Preconditions.checkDirect(buffer); 276 Preconditions.checkDirect(buffer);
253 if (!buffer.hasRemaining() && !endOfStream) { 277 if (!buffer.hasRemaining() && !endOfStream) {
254 throw new IllegalArgumentException("Empty buffer before end of s tream."); 278 throw new IllegalArgumentException("Empty buffer before end of s tream.");
255 } 279 }
256 if (mWriteState != State.WAITING_FOR_WRITE) { 280 if (mEndOfStreamWritten) {
257 throw new IllegalStateException("Unexpected write attempt."); 281 throw new IllegalArgumentException("Write after writing end of s tream.");
258 } 282 }
259 if (isDoneLocked()) { 283 if (isDoneLocked()) {
260 return; 284 return;
261 } 285 }
262 if (mOnWriteCompletedTask == null) { 286 mPendingData.add(buffer);
263 mOnWriteCompletedTask = new OnWriteCompletedRunnable(); 287 if (endOfStream) {
288 mEndOfStreamWritten = true;
264 } 289 }
265 mOnWriteCompletedTask.mEndOfStream = endOfStream; 290 if (!mDisableAutoFlush) {
266 mWriteState = State.WRITING; 291 flushLocked();
267 if (!nativeWriteData(
268 mNativeStream, buffer, buffer.position(), buffer.limit() , endOfStream)) {
269 // Still waiting on write. This is just to have consistent
270 // behavior with the other error cases.
271 mWriteState = State.WAITING_FOR_WRITE;
272 throw new IllegalArgumentException("Unable to call native write" );
273 } 292 }
274 } 293 }
275 } 294 }
276 295
277 @Override 296 @Override
297 public void flush() {
298 synchronized (mNativeStreamLock) {
299 flushLocked();
300 }
301 }
302
303 @SuppressWarnings("GuardedByChecker")
304 private void flushLocked() {
305 if (isDoneLocked()) {
306 return;
307 }
308 if (mPendingData.isEmpty() && mFlushData.isEmpty()) {
309 // No-op if there is nothing to write.
310 return;
311 }
312
313 // Move buffers from mPendingData to the flushing queue.
314 if (!mPendingData.isEmpty()) {
315 mFlushData.addAll(mPendingData);
316 mPendingData.clear();
317 }
318
319 if (mWriteState == State.WRITING) {
320 // If there is a write already pending, wait until onWritevCompleted is
321 // called before pushing data to the native stack.
322 return;
323 }
324 int size = mFlushData.size();
325 ByteBuffer[] buffers = new ByteBuffer[size];
326 int[] positions = new int[size];
327 int[] limits = new int[size];
328 for (int i = 0; i < size; i++) {
329 ByteBuffer buffer = mFlushData.poll();
330 buffers[i] = buffer;
331 positions[i] = buffer.position();
332 limits[i] = buffer.limit();
333 }
334 assert mFlushData.isEmpty();
335 mWriteState = State.WRITING;
336 if (!nativeWritevData(mNativeStream, buffers, positions, limits, mEndOfS treamWritten)) {
337 // Still waiting on write. This is just to have consistent
338 // behavior with the other error cases.
339 mWriteState = State.WAITING_FOR_WRITE;
340 throw new IllegalArgumentException("Unable to call native writev.");
341 }
342 }
343
344 @Override
278 public void ping(PingCallback callback, Executor executor) { 345 public void ping(PingCallback callback, Executor executor) {
279 // TODO(mef): May be last thing to be implemented on Android. 346 // TODO(mef): May be last thing to be implemented on Android.
280 throw new UnsupportedOperationException("ping is not supported yet."); 347 throw new UnsupportedOperationException("ping is not supported yet.");
281 } 348 }
282 349
283 @Override 350 @Override
284 public void windowUpdate(int windowSizeIncrement) { 351 public void windowUpdate(int windowSizeIncrement) {
285 // TODO(mef): Understand the needs and semantics of this method. 352 // TODO(mef): Understand the needs and semantics of this method.
286 throw new UnsupportedOperationException("windowUpdate is not supported y et."); 353 throw new UnsupportedOperationException("windowUpdate is not supported y et.");
287 } 354 }
(...skipping 14 matching lines...) Expand all
302 synchronized (mNativeStreamLock) { 369 synchronized (mNativeStreamLock) {
303 return isDoneLocked(); 370 return isDoneLocked();
304 } 371 }
305 } 372 }
306 373
307 @GuardedBy("mNativeStreamLock") 374 @GuardedBy("mNativeStreamLock")
308 private boolean isDoneLocked() { 375 private boolean isDoneLocked() {
309 return mReadState != State.NOT_STARTED && mNativeStream == 0; 376 return mReadState != State.NOT_STARTED && mNativeStream == 0;
310 } 377 }
311 378
379 /*
380 * Runs an onSucceeded callback if both Read and Write sides are closed.
381 */
382 private void maybeOnSucceededOnExecutor() {
383 synchronized (mNativeStreamLock) {
384 if (isDoneLocked()) {
385 return;
386 }
387 if (!(mWriteState == State.WRITING_DONE && mReadState == State.READI NG_DONE)) {
388 return;
389 }
390 mReadState = mWriteState = State.SUCCESS;
391 // Destroy native stream first, so UrlRequestContext could be shut
392 // down from the listener.
393 destroyNativeStreamLocked(false);
394 }
395 try {
396 mCallback.onSucceeded(CronetBidirectionalStream.this, mResponseInfo) ;
397 } catch (Exception e) {
398 Log.e(CronetUrlRequestContext.LOG_TAG, "Exception in onSucceeded met hod", e);
399 }
400 }
401
312 @SuppressWarnings("unused") 402 @SuppressWarnings("unused")
313 @CalledByNative 403 @CalledByNative
314 private void onRequestHeadersSent() { 404 private void onStreamReady() {
315 postTaskToExecutor(new Runnable() { 405 postTaskToExecutor(new Runnable() {
316 public void run() { 406 public void run() {
317 synchronized (mNativeStreamLock) { 407 synchronized (mNativeStreamLock) {
318 if (isDoneLocked()) { 408 if (isDoneLocked()) {
319 return; 409 return;
320 } 410 }
321 if (doesMethodAllowWriteData(mInitialMethod)) { 411 if (doesMethodAllowWriteData(mInitialMethod)) {
322 mWriteState = State.WAITING_FOR_WRITE; 412 mWriteState = State.WAITING_FOR_WRITE;
413 mReadState = State.WAITING_FOR_READ;
323 } else { 414 } else {
324 mWriteState = State.WRITING_DONE; 415 mWriteState = State.WRITING_DONE;
325 } 416 }
326 } 417 }
327 418
328 try { 419 try {
329 mCallback.onRequestHeadersSent(CronetBidirectionalStream.thi s); 420 mCallback.onStreamReady(CronetBidirectionalStream.this);
330 } catch (Exception e) { 421 } catch (Exception e) {
331 onCallbackException(e); 422 onCallbackException(e);
332 } 423 }
333 } 424 }
334 }); 425 });
335 } 426 }
336 427
337 /** 428 /**
338 * Called when the final set of headers, after all redirects, 429 * Called when the final set of headers, after all redirects,
339 * is received. Can only be called once for each stream. 430 * is received. Can only be called once for each stream.
(...skipping 44 matching lines...) Expand 10 before | Expand all | Expand 10 after
384 } 475 }
385 byteBuffer.position(initialPosition + bytesRead); 476 byteBuffer.position(initialPosition + bytesRead);
386 assert mOnReadCompletedTask.mByteBuffer == null; 477 assert mOnReadCompletedTask.mByteBuffer == null;
387 mOnReadCompletedTask.mByteBuffer = byteBuffer; 478 mOnReadCompletedTask.mByteBuffer = byteBuffer;
388 mOnReadCompletedTask.mEndOfStream = (bytesRead == 0); 479 mOnReadCompletedTask.mEndOfStream = (bytesRead == 0);
389 postTaskToExecutor(mOnReadCompletedTask); 480 postTaskToExecutor(mOnReadCompletedTask);
390 } 481 }
391 482
392 @SuppressWarnings("unused") 483 @SuppressWarnings("unused")
393 @CalledByNative 484 @CalledByNative
394 private void onWriteCompleted( 485 private void onWritevCompleted(final ByteBuffer[] byteBuffers, int[] initial Positions,
395 final ByteBuffer byteBuffer, int initialPosition, int initialLimit) { 486 int[] initialLimits, boolean endOfStream) {
396 if (byteBuffer.position() != initialPosition || byteBuffer.limit() != in itialLimit) { 487 assert byteBuffers.length == initialPositions.length;
397 failWithException( 488 assert byteBuffers.length == initialLimits.length;
398 new CronetException("ByteBuffer modified externally during w rite", null)); 489 synchronized (mNativeStreamLock) {
399 return; 490 mWriteState = State.WAITING_FOR_WRITE;
491 // Flush if there is anything in the flush queue mFlushData.
492 if (!mFlushData.isEmpty()) {
493 flushLocked();
494 }
400 } 495 }
401 // Current implementation always writes the complete buffer. 496 for (int i = 0; i < byteBuffers.length; i++) {
402 byteBuffer.position(byteBuffer.limit()); 497 ByteBuffer buffer = byteBuffers[i];
403 assert mOnWriteCompletedTask.mByteBuffer == null; 498 if (buffer.position() != initialPositions[i] || buffer.limit() != in itialLimits[i]) {
404 mOnWriteCompletedTask.mByteBuffer = byteBuffer; 499 failWithException(
405 postTaskToExecutor(mOnWriteCompletedTask); 500 new CronetException("ByteBuffer modified externally duri ng write", null));
501 return;
502 }
503 // Current implementation always writes the complete buffer.
504 buffer.position(buffer.limit());
505 postTaskToExecutor(new OnWriteCompletedRunnable(buffer, endOfStream) );
506 }
406 } 507 }
407 508
408 @SuppressWarnings("unused") 509 @SuppressWarnings("unused")
409 @CalledByNative 510 @CalledByNative
410 private void onResponseTrailersReceived(String[] trailers) { 511 private void onResponseTrailersReceived(String[] trailers) {
411 final UrlResponseInfo.HeaderBlock trailersBlock = 512 final UrlResponseInfo.HeaderBlock trailersBlock =
412 new UrlResponseInfo.HeaderBlock(headersListFromStrings(trailers) ); 513 new UrlResponseInfo.HeaderBlock(headersListFromStrings(trailers) );
413 postTaskToExecutor(new Runnable() { 514 postTaskToExecutor(new Runnable() {
414 public void run() { 515 public void run() {
415 synchronized (mNativeStreamLock) { 516 synchronized (mNativeStreamLock) {
(...skipping 78 matching lines...) Expand 10 before | Expand all | Expand 10 after
494 case Builder.STREAM_PRIORITY_MEDIUM: 595 case Builder.STREAM_PRIORITY_MEDIUM:
495 return RequestPriority.MEDIUM; 596 return RequestPriority.MEDIUM;
496 case Builder.STREAM_PRIORITY_HIGHEST: 597 case Builder.STREAM_PRIORITY_HIGHEST:
497 return RequestPriority.HIGHEST; 598 return RequestPriority.HIGHEST;
498 default: 599 default:
499 throw new IllegalArgumentException("Invalid stream priority."); 600 throw new IllegalArgumentException("Invalid stream priority.");
500 } 601 }
501 } 602 }
502 603
503 /** 604 /**
504 * Checks whether reading and writing are done.
505 * @return false if either reading or writing is not done. If both reading a nd writing
506 * are done, then posts cleanup task and returns true.
507 */
508 @GuardedBy("mNativeStreamLock")
509 private boolean maybeSucceedLocked() {
510 if (mReadState != State.READING_DONE || mWriteState != State.WRITING_DON E) {
511 return false;
512 }
513
514 mReadState = mWriteState = State.SUCCESS;
515 postTaskToExecutor(new Runnable() {
516 public void run() {
517 synchronized (mNativeStreamLock) {
518 if (isDoneLocked()) {
519 return;
520 }
521 // Destroy native stream first, so UrlRequestContext could b e shut
522 // down from the listener.
523 destroyNativeStreamLocked(false);
524 }
525 try {
526 mCallback.onSucceeded(CronetBidirectionalStream.this, mRespo nseInfo);
527 } catch (Exception e) {
528 Log.e(CronetUrlRequestContext.LOG_TAG, "Exception in onSucce eded method", e);
529 }
530 }
531 });
532 return true;
533 }
534
535 /**
536 * Posts task to application Executor. Used for callbacks 605 * Posts task to application Executor. Used for callbacks
537 * and other tasks that should not be executed on network thread. 606 * and other tasks that should not be executed on network thread.
538 */ 607 */
539 private void postTaskToExecutor(Runnable task) { 608 private void postTaskToExecutor(Runnable task) {
540 try { 609 try {
541 mExecutor.execute(task); 610 mExecutor.execute(task);
542 } catch (RejectedExecutionException failException) { 611 } catch (RejectedExecutionException failException) {
543 Log.e(CronetUrlRequestContext.LOG_TAG, "Exception posting task to ex ecutor", 612 Log.e(CronetUrlRequestContext.LOG_TAG, "Exception posting task to ex ecutor",
544 failException); 613 failException);
545 // If posting a task throws an exception, then there is no choice 614 // If posting a task throws an exception, then there is no choice
(...skipping 25 matching lines...) Expand all
571 mRequestContext.onRequestDestroyed(); 640 mRequestContext.onRequestDestroyed();
572 if (mOnDestroyedCallbackForTesting != null) { 641 if (mOnDestroyedCallbackForTesting != null) {
573 mOnDestroyedCallbackForTesting.run(); 642 mOnDestroyedCallbackForTesting.run();
574 } 643 }
575 } 644 }
576 645
577 /** 646 /**
578 * Fails the stream with an exception. Only called on the Executor. 647 * Fails the stream with an exception. Only called on the Executor.
579 */ 648 */
580 private void failWithExceptionOnExecutor(CronetException e) { 649 private void failWithExceptionOnExecutor(CronetException e) {
581 // Do not call into listener if request is complete. 650 // Do not call into mCallback if request is complete.
582 synchronized (mNativeStreamLock) { 651 synchronized (mNativeStreamLock) {
583 if (isDoneLocked()) { 652 if (isDoneLocked()) {
584 return; 653 return;
585 } 654 }
586 mReadState = mWriteState = State.ERROR; 655 mReadState = mWriteState = State.ERROR;
587 destroyNativeStreamLocked(false); 656 destroyNativeStreamLocked(false);
588 } 657 }
589 try { 658 try {
590 mCallback.onFailed(this, mResponseInfo, e); 659 mCallback.onFailed(this, mResponseInfo, e);
591 } catch (Exception failException) { 660 } catch (Exception failException) {
(...skipping 19 matching lines...) Expand all
611 */ 680 */
612 private void failWithException(final CronetException exception) { 681 private void failWithException(final CronetException exception) {
613 postTaskToExecutor(new Runnable() { 682 postTaskToExecutor(new Runnable() {
614 public void run() { 683 public void run() {
615 failWithExceptionOnExecutor(exception); 684 failWithExceptionOnExecutor(exception);
616 } 685 }
617 }); 686 });
618 } 687 }
619 688
620 // Native methods are implemented in cronet_bidirectional_stream_adapter.cc. 689 // Native methods are implemented in cronet_bidirectional_stream_adapter.cc.
621 private native long nativeCreateBidirectionalStream(long urlRequestContextAd apter); 690 private native long nativeCreateBidirectionalStream(
691 long urlRequestContextAdapter, boolean disableAutoFlush);
622 692
623 @NativeClassQualifiedName("CronetBidirectionalStreamAdapter") 693 @NativeClassQualifiedName("CronetBidirectionalStreamAdapter")
624 private native int nativeStart(long nativePtr, String url, int priority, Str ing method, 694 private native int nativeStart(long nativePtr, String url, int priority, Str ing method,
625 String[] headers, boolean endOfStream); 695 String[] headers, boolean endOfStream);
626 696
627 @NativeClassQualifiedName("CronetBidirectionalStreamAdapter") 697 @NativeClassQualifiedName("CronetBidirectionalStreamAdapter")
628 private native boolean nativeReadData( 698 private native boolean nativeReadData(
629 long nativePtr, ByteBuffer byteBuffer, int position, int limit); 699 long nativePtr, ByteBuffer byteBuffer, int position, int limit);
630 700
631 @NativeClassQualifiedName("CronetBidirectionalStreamAdapter") 701 @NativeClassQualifiedName("CronetBidirectionalStreamAdapter")
632 private native boolean nativeWriteData( 702 private native boolean nativeWritevData(long nativePtr, ByteBuffer[] buffers , int[] positions,
633 long nativePtr, ByteBuffer byteBuffer, int position, int limit, bool ean endOfStream); 703 int[] limits, boolean endOfStream);
634 704
635 @NativeClassQualifiedName("CronetBidirectionalStreamAdapter") 705 @NativeClassQualifiedName("CronetBidirectionalStreamAdapter")
636 private native void nativeDestroy(long nativePtr, boolean sendOnCanceled); 706 private native void nativeDestroy(long nativePtr, boolean sendOnCanceled);
637 } 707 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698