OLD | NEW |
1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 package org.chromium.net.urlconnection; | 5 package org.chromium.net.urlconnection; |
6 | 6 |
7 import org.chromium.net.UploadDataProvider; | 7 import org.chromium.net.UploadDataProvider; |
8 import org.chromium.net.UploadDataSink; | 8 import org.chromium.net.UploadDataSink; |
9 | 9 |
10 import java.io.IOException; | 10 import java.io.IOException; |
11 import java.net.HttpRetryException; | 11 import java.net.HttpRetryException; |
12 import java.nio.ByteBuffer; | 12 import java.nio.ByteBuffer; |
13 | 13 |
14 /** | 14 /** |
15 * An implementation of {@link java.io.OutputStream} to send data to a server, | 15 * An implementation of {@link java.io.OutputStream} to send data to a server, |
16 * when {@link CronetHttpURLConnection#setChunkedStreamingMode} is used. | 16 * when {@link CronetHttpURLConnection#setChunkedStreamingMode} is used. |
17 * This implementation does not buffer the entire request body in memory. | 17 * This implementation does not buffer the entire request body in memory. |
18 * It does not support rewind. Note that {@link #write} should only be called | 18 * It does not support rewind. Note that {@link #write} should only be called |
19 * from the thread on which the {@link #mConnection} is created. | 19 * from the thread on which the {@link #mConnection} is created. |
20 */ | 20 */ |
21 final class CronetChunkedOutputStream extends CronetOutputStream { | 21 final class CronetChunkedOutputStream extends CronetOutputStream { |
22 private final CronetHttpURLConnection mConnection; | 22 private final CronetHttpURLConnection mConnection; |
23 private final MessageLoop mMessageLoop; | 23 private final MessageLoop mMessageLoop; |
24 private final ByteBuffer mBuffer; | 24 private final ByteBuffer mBuffer; |
25 private final UploadDataProvider mUploadDataProvider = new UploadDataProvide
rImpl(); | 25 private final UploadDataProvider mUploadDataProvider = new UploadDataProvide
rImpl(); |
26 private long mBytesWritten; | |
27 private boolean mLastChunk = false; | 26 private boolean mLastChunk = false; |
28 | 27 |
29 /** | 28 /** |
30 * Package protected constructor. | 29 * Package protected constructor. |
31 * @param connection The CronetHttpURLConnection object. | 30 * @param connection The CronetHttpURLConnection object. |
32 * @param contentLength The content length of the request body. Non-zero for | 31 * @param chunkLength The chunk length of the request body in bytes. It must |
33 * non-chunked upload. | 32 * be a positive number. |
34 */ | 33 */ |
35 CronetChunkedOutputStream( | 34 CronetChunkedOutputStream( |
36 CronetHttpURLConnection connection, int chunkLength, MessageLoop mes
sageLoop) { | 35 CronetHttpURLConnection connection, int chunkLength, MessageLoop mes
sageLoop) { |
37 if (connection == null) { | 36 if (connection == null) { |
38 throw new NullPointerException(); | 37 throw new NullPointerException(); |
39 } | 38 } |
40 if (chunkLength <= 0) { | 39 if (chunkLength <= 0) { |
41 throw new IllegalArgumentException("chunkLength should be greater th
an 0"); | 40 throw new IllegalArgumentException("chunkLength should be greater th
an 0"); |
42 } | 41 } |
43 mBuffer = ByteBuffer.allocate(chunkLength); | 42 mBuffer = ByteBuffer.allocate(chunkLength); |
44 mConnection = connection; | 43 mConnection = connection; |
45 mMessageLoop = messageLoop; | 44 mMessageLoop = messageLoop; |
46 mBytesWritten = 0; | |
47 } | 45 } |
48 | 46 |
49 @Override | 47 @Override |
50 public void write(int oneByte) throws IOException { | 48 public void write(int oneByte) throws IOException { |
51 checkNotClosed(); | 49 ensureBufferHasRemaining(); |
52 while (mBuffer.position() == mBuffer.limit()) { | |
53 // Wait until buffer is consumed. | |
54 mMessageLoop.loop(); | |
55 } | |
56 mBuffer.put((byte) oneByte); | 50 mBuffer.put((byte) oneByte); |
57 mBytesWritten++; | |
58 } | 51 } |
59 | 52 |
60 @Override | 53 @Override |
61 public void write(byte[] buffer, int offset, int count) throws IOException { | 54 public void write(byte[] buffer, int offset, int count) throws IOException { |
62 checkNotClosed(); | 55 checkNotClosed(); |
63 if (buffer.length - offset < count || offset < 0 || count < 0) { | 56 if (buffer.length - offset < count || offset < 0 || count < 0) { |
64 throw new IndexOutOfBoundsException(); | 57 throw new IndexOutOfBoundsException(); |
65 } | 58 } |
66 if (count == 0) { | |
67 return; | |
68 } | |
69 int toSend = count; | 59 int toSend = count; |
70 // TODO(xunjieli): refactor commond code with CronetFixedModeOutputStrea
m. | |
71 while (toSend > 0) { | 60 while (toSend > 0) { |
72 if (mBuffer.position() == mBuffer.limit()) { | 61 int sent = Math.min(toSend, mBuffer.remaining()); |
73 checkNotClosed(); | |
74 // Wait until buffer is consumed. | |
75 mMessageLoop.loop(); | |
76 } | |
77 int sent = Math.min(toSend, mBuffer.limit() - mBuffer.position()); | |
78 mBuffer.put(buffer, offset + count - toSend, sent); | 62 mBuffer.put(buffer, offset + count - toSend, sent); |
79 toSend -= sent; | 63 toSend -= sent; |
| 64 // Upload mBuffer now if an entire chunk is written. |
| 65 ensureBufferHasRemaining(); |
80 } | 66 } |
81 mBytesWritten += count; | |
82 } | 67 } |
83 | 68 |
84 @Override | 69 @Override |
85 public void close() throws IOException { | 70 public void close() throws IOException { |
86 super.close(); | 71 super.close(); |
87 // Last chunk is written. | 72 if (!mLastChunk) { |
88 mLastChunk = true; | 73 // Consumer can only call close() when message loop is not running. |
| 74 // Set mLastChunk to be true and flip mBuffer to upload its contents
. |
| 75 mLastChunk = true; |
| 76 mBuffer.flip(); |
| 77 } |
89 } | 78 } |
90 | 79 |
91 // Below are CronetOutputStream implementations: | 80 // Below are CronetOutputStream implementations: |
92 | 81 |
93 @Override | 82 @Override |
94 void setConnected() throws IOException { | 83 void setConnected() throws IOException { |
95 // Do nothing. | 84 // Do nothing. |
96 } | 85 } |
97 | 86 |
98 @Override | 87 @Override |
99 void checkReceivedEnoughContent() throws IOException { | 88 void checkReceivedEnoughContent() throws IOException { |
100 // Do nothing. | 89 // Do nothing. |
101 } | 90 } |
102 | 91 |
103 @Override | 92 @Override |
104 UploadDataProvider getUploadDataProvider() { | 93 UploadDataProvider getUploadDataProvider() { |
105 return mUploadDataProvider; | 94 return mUploadDataProvider; |
106 } | 95 } |
107 | 96 |
108 private class UploadDataProviderImpl extends UploadDataProvider { | 97 private class UploadDataProviderImpl extends UploadDataProvider { |
109 @Override | 98 @Override |
110 public long getLength() { | 99 public long getLength() { |
111 return -1; | 100 return -1; |
112 } | 101 } |
113 | 102 |
114 @Override | 103 @Override |
115 public void read(final UploadDataSink uploadDataSink, final ByteBuffer b
yteBuffer) { | 104 public void read(final UploadDataSink uploadDataSink, final ByteBuffer b
yteBuffer) { |
116 final int availableSpace = byteBuffer.remaining(); | 105 if (byteBuffer.remaining() >= mBuffer.remaining()) { |
117 if (availableSpace < mBuffer.position()) { | |
118 // byteBuffer does not have enough capacity, so only put a porti
on | |
119 // of mBuffer in it. | |
120 byteBuffer.put(mBuffer.array(), 0, availableSpace); | |
121 mBuffer.position(availableSpace); | |
122 // Move remaining buffer to the head of the buffer for use in th
e | |
123 // next read call. | |
124 mBuffer.compact(); | |
125 uploadDataSink.onReadSucceeded(false); | |
126 } else { | |
127 // byteBuffer has enough capacity to hold the content of mBuffer
. | |
128 mBuffer.flip(); | |
129 byteBuffer.put(mBuffer); | 106 byteBuffer.put(mBuffer); |
130 // Reuse this buffer. | |
131 mBuffer.clear(); | 107 mBuffer.clear(); |
132 uploadDataSink.onReadSucceeded(mLastChunk); | 108 uploadDataSink.onReadSucceeded(mLastChunk); |
133 if (!mLastChunk) { | 109 if (!mLastChunk) { |
134 // Quit message loop so embedder can write more data. | 110 // Quit message loop so embedder can write more data. |
135 mMessageLoop.quit(); | 111 mMessageLoop.quit(); |
136 } | 112 } |
| 113 } else { |
| 114 int oldLimit = mBuffer.limit(); |
| 115 mBuffer.limit(mBuffer.position() + byteBuffer.remaining()); |
| 116 byteBuffer.put(mBuffer); |
| 117 mBuffer.limit(oldLimit); |
| 118 uploadDataSink.onReadSucceeded(false); |
137 } | 119 } |
138 } | 120 } |
139 | 121 |
140 @Override | 122 @Override |
141 public void rewind(UploadDataSink uploadDataSink) { | 123 public void rewind(UploadDataSink uploadDataSink) { |
142 uploadDataSink.onRewindError( | 124 uploadDataSink.onRewindError( |
143 new HttpRetryException("Cannot retry streamed Http body", -1
)); | 125 new HttpRetryException("Cannot retry streamed Http body", -1
)); |
144 } | 126 } |
145 } | 127 } |
| 128 |
| 129 /** |
| 130 * If {@code mBuffer} is full, wait until it is consumed and there is |
| 131 * space to write more data to it. |
| 132 */ |
| 133 private void ensureBufferHasRemaining() throws IOException { |
| 134 if (!mBuffer.hasRemaining()) { |
| 135 uploadBufferInternal(); |
| 136 } |
| 137 } |
| 138 |
| 139 /** |
| 140 * Helper function to upload {@code mBuffer} to the native stack. This |
| 141 * function blocks until {@code mBuffer} is consumed and there is space to |
| 142 * write more data. |
| 143 */ |
| 144 private void uploadBufferInternal() throws IOException { |
| 145 checkNotClosed(); |
| 146 mBuffer.flip(); |
| 147 mMessageLoop.loop(); |
| 148 } |
146 } | 149 } |
OLD | NEW |