| OLD | NEW |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 package org.chromium.net.urlconnection; | 5 package org.chromium.net.urlconnection; |
| 6 | 6 |
| 7 import org.chromium.net.UploadDataProvider; | 7 import org.chromium.net.UploadDataProvider; |
| 8 import org.chromium.net.UploadDataSink; | 8 import org.chromium.net.UploadDataSink; |
| 9 | 9 |
| 10 import java.io.IOException; | 10 import java.io.IOException; |
| 11 import java.net.HttpRetryException; | 11 import java.net.HttpRetryException; |
| 12 import java.nio.ByteBuffer; | 12 import java.nio.ByteBuffer; |
| 13 | 13 |
| 14 /** | 14 /** |
| 15 * An implementation of {@link java.io.OutputStream} to send data to a server, | 15 * An implementation of {@link java.io.OutputStream} to send data to a server, |
| 16 * when {@link CronetHttpURLConnection#setChunkedStreamingMode} is used. | 16 * when {@link CronetHttpURLConnection#setChunkedStreamingMode} is used. |
| 17 * This implementation does not buffer the entire request body in memory. | 17 * This implementation does not buffer the entire request body in memory. |
| 18 * It does not support rewind. Note that {@link #write} should only be called | 18 * It does not support rewind. Note that {@link #write} should only be called |
| 19 * from the thread on which the {@link #mConnection} is created. | 19 * from the thread on which the {@link #mConnection} is created. |
| 20 */ | 20 */ |
| 21 final class CronetChunkedOutputStream extends CronetOutputStream { | 21 final class CronetChunkedOutputStream extends CronetOutputStream { |
| 22 private final CronetHttpURLConnection mConnection; | 22 private final CronetHttpURLConnection mConnection; |
| 23 private final MessageLoop mMessageLoop; | 23 private final MessageLoop mMessageLoop; |
| 24 private final ByteBuffer mBuffer; | 24 private final ByteBuffer mBuffer; |
| 25 private final UploadDataProvider mUploadDataProvider = new UploadDataProvide
rImpl(); | 25 private final UploadDataProvider mUploadDataProvider = new UploadDataProvide
rImpl(); |
| 26 private long mBytesWritten; | |
| 27 private boolean mLastChunk = false; | 26 private boolean mLastChunk = false; |
| 28 | 27 |
| 29 /** | 28 /** |
| 30 * Package protected constructor. | 29 * Package protected constructor. |
| 31 * @param connection The CronetHttpURLConnection object. | 30 * @param connection The CronetHttpURLConnection object. |
| 32 * @param contentLength The content length of the request body. Non-zero for | 31 * @param chunkLength The chunk length of the request body in bytes. It must |
| 33 * non-chunked upload. | 32 * be a positive number. |
| 34 */ | 33 */ |
| 35 CronetChunkedOutputStream( | 34 CronetChunkedOutputStream( |
| 36 CronetHttpURLConnection connection, int chunkLength, MessageLoop mes
sageLoop) { | 35 CronetHttpURLConnection connection, int chunkLength, MessageLoop mes
sageLoop) { |
| 37 if (connection == null) { | 36 if (connection == null) { |
| 38 throw new NullPointerException(); | 37 throw new NullPointerException(); |
| 39 } | 38 } |
| 40 if (chunkLength <= 0) { | 39 if (chunkLength <= 0) { |
| 41 throw new IllegalArgumentException("chunkLength should be greater th
an 0"); | 40 throw new IllegalArgumentException("chunkLength should be greater th
an 0"); |
| 42 } | 41 } |
| 43 mBuffer = ByteBuffer.allocate(chunkLength); | 42 mBuffer = ByteBuffer.allocate(chunkLength); |
| 44 mConnection = connection; | 43 mConnection = connection; |
| 45 mMessageLoop = messageLoop; | 44 mMessageLoop = messageLoop; |
| 46 mBytesWritten = 0; | |
| 47 } | 45 } |
| 48 | 46 |
| 49 @Override | 47 @Override |
| 50 public void write(int oneByte) throws IOException { | 48 public void write(int oneByte) throws IOException { |
| 51 checkNotClosed(); | 49 ensureBufferHasRemaining(); |
| 52 while (mBuffer.position() == mBuffer.limit()) { | |
| 53 // Wait until buffer is consumed. | |
| 54 mMessageLoop.loop(); | |
| 55 } | |
| 56 mBuffer.put((byte) oneByte); | 50 mBuffer.put((byte) oneByte); |
| 57 mBytesWritten++; | |
| 58 } | 51 } |
| 59 | 52 |
| 60 @Override | 53 @Override |
| 61 public void write(byte[] buffer, int offset, int count) throws IOException { | 54 public void write(byte[] buffer, int offset, int count) throws IOException { |
| 62 checkNotClosed(); | 55 checkNotClosed(); |
| 63 if (buffer.length - offset < count || offset < 0 || count < 0) { | 56 if (buffer.length - offset < count || offset < 0 || count < 0) { |
| 64 throw new IndexOutOfBoundsException(); | 57 throw new IndexOutOfBoundsException(); |
| 65 } | 58 } |
| 66 if (count == 0) { | |
| 67 return; | |
| 68 } | |
| 69 int toSend = count; | 59 int toSend = count; |
| 70 // TODO(xunjieli): refactor commond code with CronetFixedModeOutputStrea
m. | |
| 71 while (toSend > 0) { | 60 while (toSend > 0) { |
| 72 if (mBuffer.position() == mBuffer.limit()) { | 61 int sent = Math.min(toSend, mBuffer.remaining()); |
| 73 checkNotClosed(); | |
| 74 // Wait until buffer is consumed. | |
| 75 mMessageLoop.loop(); | |
| 76 } | |
| 77 int sent = Math.min(toSend, mBuffer.limit() - mBuffer.position()); | |
| 78 mBuffer.put(buffer, offset + count - toSend, sent); | 62 mBuffer.put(buffer, offset + count - toSend, sent); |
| 79 toSend -= sent; | 63 toSend -= sent; |
| 64 // Upload mBuffer now if an entire chunk is written. |
| 65 ensureBufferHasRemaining(); |
| 80 } | 66 } |
| 81 mBytesWritten += count; | |
| 82 } | 67 } |
| 83 | 68 |
| 84 @Override | 69 @Override |
| 85 public void close() throws IOException { | 70 public void close() throws IOException { |
| 86 super.close(); | 71 super.close(); |
| 87 // Last chunk is written. | 72 if (!mLastChunk) { |
| 88 mLastChunk = true; | 73 // Consumer can only call close() when message loop is not running. |
| 74 // Set mLastChunk to be true and flip mBuffer to upload its contents
. |
| 75 mLastChunk = true; |
| 76 mBuffer.flip(); |
| 77 } |
| 89 } | 78 } |
| 90 | 79 |
| 91 // Below are CronetOutputStream implementations: | 80 // Below are CronetOutputStream implementations: |
| 92 | 81 |
| 93 @Override | 82 @Override |
| 94 void setConnected() throws IOException { | 83 void setConnected() throws IOException { |
| 95 // Do nothing. | 84 // Do nothing. |
| 96 } | 85 } |
| 97 | 86 |
| 98 @Override | 87 @Override |
| 99 void checkReceivedEnoughContent() throws IOException { | 88 void checkReceivedEnoughContent() throws IOException { |
| 100 // Do nothing. | 89 // Do nothing. |
| 101 } | 90 } |
| 102 | 91 |
| 103 @Override | 92 @Override |
| 104 UploadDataProvider getUploadDataProvider() { | 93 UploadDataProvider getUploadDataProvider() { |
| 105 return mUploadDataProvider; | 94 return mUploadDataProvider; |
| 106 } | 95 } |
| 107 | 96 |
| 108 private class UploadDataProviderImpl extends UploadDataProvider { | 97 private class UploadDataProviderImpl extends UploadDataProvider { |
| 109 @Override | 98 @Override |
| 110 public long getLength() { | 99 public long getLength() { |
| 111 return -1; | 100 return -1; |
| 112 } | 101 } |
| 113 | 102 |
| 114 @Override | 103 @Override |
| 115 public void read(final UploadDataSink uploadDataSink, final ByteBuffer b
yteBuffer) { | 104 public void read(final UploadDataSink uploadDataSink, final ByteBuffer b
yteBuffer) { |
| 116 final int availableSpace = byteBuffer.remaining(); | 105 if (byteBuffer.remaining() >= mBuffer.remaining()) { |
| 117 if (availableSpace < mBuffer.position()) { | |
| 118 // byteBuffer does not have enough capacity, so only put a porti
on | |
| 119 // of mBuffer in it. | |
| 120 byteBuffer.put(mBuffer.array(), 0, availableSpace); | |
| 121 mBuffer.position(availableSpace); | |
| 122 // Move remaining buffer to the head of the buffer for use in th
e | |
| 123 // next read call. | |
| 124 mBuffer.compact(); | |
| 125 uploadDataSink.onReadSucceeded(false); | |
| 126 } else { | |
| 127 // byteBuffer has enough capacity to hold the content of mBuffer
. | |
| 128 mBuffer.flip(); | |
| 129 byteBuffer.put(mBuffer); | 106 byteBuffer.put(mBuffer); |
| 130 // Reuse this buffer. | |
| 131 mBuffer.clear(); | 107 mBuffer.clear(); |
| 132 uploadDataSink.onReadSucceeded(mLastChunk); | 108 uploadDataSink.onReadSucceeded(mLastChunk); |
| 133 if (!mLastChunk) { | 109 if (!mLastChunk) { |
| 134 // Quit message loop so embedder can write more data. | 110 // Quit message loop so embedder can write more data. |
| 135 mMessageLoop.quit(); | 111 mMessageLoop.quit(); |
| 136 } | 112 } |
| 113 } else { |
| 114 int oldLimit = mBuffer.limit(); |
| 115 mBuffer.limit(mBuffer.position() + byteBuffer.remaining()); |
| 116 byteBuffer.put(mBuffer); |
| 117 mBuffer.limit(oldLimit); |
| 118 uploadDataSink.onReadSucceeded(false); |
| 137 } | 119 } |
| 138 } | 120 } |
| 139 | 121 |
| 140 @Override | 122 @Override |
| 141 public void rewind(UploadDataSink uploadDataSink) { | 123 public void rewind(UploadDataSink uploadDataSink) { |
| 142 uploadDataSink.onRewindError( | 124 uploadDataSink.onRewindError( |
| 143 new HttpRetryException("Cannot retry streamed Http body", -1
)); | 125 new HttpRetryException("Cannot retry streamed Http body", -1
)); |
| 144 } | 126 } |
| 145 } | 127 } |
| 128 |
| 129 /** |
| 130 * If {@code mBuffer} is full, wait until it is consumed and there is |
| 131 * space to write more data to it. |
| 132 */ |
| 133 private void ensureBufferHasRemaining() throws IOException { |
| 134 if (!mBuffer.hasRemaining()) { |
| 135 uploadBufferInternal(); |
| 136 } |
| 137 } |
| 138 |
| 139 /** |
| 140 * Helper function to upload {@code mBuffer} to the native stack. This |
| 141 * function blocks until {@code mBuffer} is consumed and there is space to |
| 142 * write more data. |
| 143 */ |
| 144 private void uploadBufferInternal() throws IOException { |
| 145 checkNotClosed(); |
| 146 mBuffer.flip(); |
| 147 mMessageLoop.loop(); |
| 148 } |
| 146 } | 149 } |
| OLD | NEW |