Index: sdk/lib/io/http_impl.dart |
diff --git a/sdk/lib/io/http_impl.dart b/sdk/lib/io/http_impl.dart |
index 5bc5b97cae902e5b01162391de71b196bc53bc52..7990e05ffeb4ae4de9ef0e413f9dd0d693ba9ec1 100644 |
--- a/sdk/lib/io/http_impl.dart |
+++ b/sdk/lib/io/http_impl.dart |
@@ -4,7 +4,7 @@ |
part of dart.io; |
-const int _HEADERS_BUFFER_SIZE = 8 * 1024; |
+const int _OUTGOING_BUFFER_SIZE = 8 * 1024; |
class _HttpIncoming extends Stream<List<int>> { |
final int _transferLength; |
@@ -406,28 +406,25 @@ class _HttpClientResponse |
} |
-abstract class _HttpOutboundMessage<T> implements IOSink { |
+abstract class _HttpOutboundMessage<T> extends _IOSinkImpl { |
// Used to mark when the body should be written. This is used for HEAD |
// requests and in error handling. |
bool _ignoreBody = false; |
bool _headersWritten = false; |
- bool _asGZip = false; |
- IOSink _headersSink; |
- IOSink _dataSink; |
- |
- final _HttpOutgoing _outgoing; |
final Uri _uri; |
+ final _HttpOutgoing _outgoing; |
final _HttpHeaders headers; |
_HttpOutboundMessage(this._uri, |
String protocolVersion, |
- _HttpOutgoing outgoing) |
- : _outgoing = outgoing, |
- _headersSink = new IOSink(outgoing, encoding: ASCII), |
+ this._outgoing) |
+ : super(new _HttpOutboundConsumer(), null), |
headers = new _HttpHeaders(protocolVersion) { |
- _dataSink = new IOSink(new _HttpOutboundConsumer(this)); |
+ _outgoing.outbound = this; |
+ (_target as _HttpOutboundConsumer).outbound = this; |
+ _encodingMutable = false; |
} |
int get contentLength => headers.contentLength; |
@@ -450,60 +447,30 @@ abstract class _HttpOutboundMessage<T> implements IOSink { |
return Encoding.getByName(charset); |
} |
- void set encoding(Encoding value) { |
- throw new StateError("IOSink encoding is not mutable"); |
- } |
- |
- void write(Object obj) { |
- if (!_headersWritten) _dataSink.encoding = encoding; |
- _dataSink.write(obj); |
- } |
- |
- void writeAll(Iterable objects, [String separator = ""]) { |
- if (!_headersWritten) _dataSink.encoding = encoding; |
- _dataSink.writeAll(objects, separator); |
- } |
- |
- void writeln([Object obj = ""]) { |
- if (!_headersWritten) _dataSink.encoding = encoding; |
- _dataSink.writeln(obj); |
- } |
- |
- void writeCharCode(int charCode) { |
- if (!_headersWritten) _dataSink.encoding = encoding; |
- _dataSink.writeCharCode(charCode); |
- } |
- |
void add(List<int> data) { |
if (data.length == 0) return; |
- _dataSink.add(data); |
+ super.add(data); |
} |
- void addError(error, [StackTrace stackTrace]) => |
- _dataSink.addError(error, stackTrace); |
- |
- Future<T> addStream(Stream<List<int>> stream) => _dataSink.addStream(stream); |
- |
- Future flush() => _dataSink.flush(); |
- |
- Future close() => _dataSink.close(); |
- |
- Future<T> get done => _dataSink.done; |
- |
- Future _writeHeaders({bool drainRequest: true}) { |
- void write() { |
+ Future _writeHeaders({bool drainRequest: true, |
+ bool setOutgoing: true}) { |
+ // TODO(ajohnsen): Avoid excessive futures in this method. |
+ write() { |
try { |
_writeHeader(); |
- } catch (error) { |
+ } catch (_) { |
// Headers too large. |
throw new HttpException( |
- "Headers size exceeded the of '$_HEADERS_BUFFER_SIZE' bytes"); |
+ "Headers size exceeded the of '$_OUTGOING_BUFFER_SIZE'" |
+ " bytes"); |
} |
+ return this; |
} |
- if (_headersWritten) return new Future.value(); |
+ if (_headersWritten) return new Future.value(this); |
_headersWritten = true; |
- _dataSink.encoding = encoding; |
+ Future drainFuture; |
bool isServerSide = this is _HttpResponse; |
+ bool gzip = false; |
if (isServerSide) { |
var response = this; |
if (headers.chunkedTransferEncoding) { |
@@ -516,42 +483,51 @@ abstract class _HttpOutboundMessage<T> implements IOSink { |
.any((encoding) => encoding.trim().toLowerCase() == "gzip") && |
contentEncoding == null) { |
headers.set(HttpHeaders.CONTENT_ENCODING, "gzip"); |
- _asGZip = true; |
+ gzip = true; |
} |
} |
if (drainRequest && !response._httpRequest._incoming.hasSubscriber) { |
- return response._httpRequest.drain() |
- // TODO(ajohnsen): Timeout on drain? |
- .catchError((_) {}) // Ignore errors. |
- .then((_) => write()); |
+ drainFuture = response._httpRequest.drain().catchError((_) {}); |
} |
+ } else { |
+ drainRequest = false; |
+ } |
+ if (_ignoreBody) { |
+ return new Future.sync(write).then((_) => _outgoing.close()); |
+ } |
+ if (setOutgoing) { |
+ int contentLength = headers.contentLength; |
+ if (headers.chunkedTransferEncoding) { |
+ _outgoing.chunked = true; |
+ if (gzip) _outgoing.gzip = true; |
+ } else if (contentLength >= 0) { |
+ _outgoing.contentLength = contentLength; |
+ } |
+ } |
+ if (drainFuture != null) { |
+ return drainFuture.then((_) => write()); |
} |
return new Future.sync(write); |
} |
Future _addStream(Stream<List<int>> stream) { |
- return _writeHeaders() |
- .then((_) { |
- int contentLength = headers.contentLength; |
- if (_ignoreBody) { |
- stream.drain().catchError((_) {}); |
- return _headersSink.close(); |
- } |
- stream = stream.transform(const _BufferTransformer()); |
- if (headers.chunkedTransferEncoding) { |
- if (_asGZip) { |
- stream = stream.transform(GZIP.encoder); |
- } |
- stream = stream.transform(const _ChunkedTransformer()); |
- } else if (contentLength >= 0) { |
- stream = stream.transform( |
- new _ContentLengthValidator(contentLength, _uri)); |
- } |
- return _headersSink.addStream(stream); |
- }); |
+ // TODO(ajohnsen): Merge into _HttpOutgoing. |
+ if (_ignoreBody) { |
+ stream.drain().catchError((_) {}); |
+ return _writeHeaders(); |
+ } |
+ if (_headersWritten) { |
+ return _outgoing.addStream(stream); |
+ } else { |
+ var completer = new Completer.sync(); |
+ var future = _outgoing.addStream(stream, completer.future); |
+ _writeHeaders().then(completer.complete); |
+ return future; |
+ } |
} |
Future _close() { |
+ // TODO(ajohnsen): Merge into _HttpOutgoing. |
if (!_headersWritten) { |
if (!_ignoreBody && headers.contentLength == -1) { |
// If no body was written, _ignoreBody is false (it's not a HEAD |
@@ -560,14 +536,14 @@ abstract class _HttpOutboundMessage<T> implements IOSink { |
headers.chunkedTransferEncoding = false; |
headers.contentLength = 0; |
} else if (!_ignoreBody && headers.contentLength > 0) { |
- _headersSink.addError(new HttpException( |
- "No content while contentLength was specified to be greater " |
- "than 0: ${headers.contentLength}.", |
- uri: _uri)); |
- return _headersSink.done; |
+ return _outgoing.addStream( |
+ new Stream.fromFuture(new Future.error(new HttpException( |
+ "No content even though contentLength was specified to be " |
+ "greater than 0: ${headers.contentLength}.", |
+ uri: _uri)))); |
} |
} |
- return _writeHeaders().whenComplete(_headersSink.close); |
+ return _writeHeaders().whenComplete(_outgoing.close); |
} |
void _writeHeader(); |
@@ -575,146 +551,13 @@ abstract class _HttpOutboundMessage<T> implements IOSink { |
class _HttpOutboundConsumer implements StreamConsumer { |
- final _HttpOutboundMessage _outbound; |
- StreamController _controller; |
- StreamSubscription _subscription; |
- Completer _closeCompleter = new Completer(); |
- Completer _completer; |
- bool _socketError = false; |
- |
- _HttpOutboundConsumer(this._outbound); |
- |
- void _cancel() { |
- if (_subscription != null) { |
- StreamSubscription subscription = _subscription; |
- _subscription = null; |
- subscription.cancel(); |
- } |
- } |
- |
- bool _ignoreError(error) |
- => (error is SocketException || error is TlsException) && |
- _outbound is HttpResponse; |
- |
- _ensureController() { |
- if (_controller != null) return; |
- _controller = new StreamController(sync: true, |
- onPause: () => _subscription.pause(), |
- onResume: () => _subscription.resume(), |
- onListen: () => _subscription.resume(), |
- onCancel: _cancel); |
- _outbound._addStream(_controller.stream) |
- .then((_) { |
- _cancel(); |
- _done(); |
- _closeCompleter.complete(_outbound); |
- }, |
- onError: (error, [StackTrace stackTrace]) { |
- _socketError = true; |
- if (_ignoreError(error)) { |
- _cancel(); |
- _done(); |
- _closeCompleter.complete(_outbound); |
- } else { |
- if (!_done(error)) { |
- _closeCompleter.completeError(error, stackTrace); |
- } |
- } |
- }); |
- } |
+ // TODO(ajohnsen): Once _addStream and _close is merged into _HttpOutgoing, |
+ // this class can be removed. |
+ _HttpOutboundMessage outbound; |
+ _HttpOutboundConsumer(); |
- bool _done([error, StackTrace stackTrace]) { |
- if (_completer == null) return false; |
- if (error != null) { |
- _completer.completeError(error, stackTrace); |
- } else { |
- _completer.complete(_outbound); |
- } |
- _completer = null; |
- return true; |
- } |
- |
- Future addStream(var stream) { |
- // If we saw a socket error subscribe and then cancel, to ignore any data |
- // on the stream. |
- if (_socketError) { |
- stream.listen(null).cancel(); |
- return new Future.value(_outbound); |
- } |
- _completer = new Completer(); |
- _subscription = stream.listen( |
- (data) => _controller.add(data), |
- onDone: _done, |
- onError: (e, s) => _controller.addError(e, s), |
- cancelOnError: true); |
- // Pause the first request. |
- if (_controller == null) _subscription.pause(); |
- _ensureController(); |
- return _completer.future; |
- } |
- |
- Future close() { |
- Future closeOutbound() { |
- if (_socketError) return new Future.value(_outbound); |
- return _outbound._close() |
- .catchError((_) {}, test: _ignoreError) |
- .then((_) => _outbound); |
- } |
- if (_controller == null) return closeOutbound(); |
- _controller.close(); |
- return _closeCompleter.future.then((_) => closeOutbound()); |
- } |
-} |
- |
- |
-class _BufferTransformerSink implements EventSink<List<int>> { |
- static const int MIN_CHUNK_SIZE = 4 * 1024; |
- static const int MAX_BUFFER_SIZE = 16 * 1024; |
- |
- final BytesBuilder _builder = new BytesBuilder(); |
- final EventSink<List<int>> _outSink; |
- |
- _BufferTransformerSink(this._outSink); |
- |
- void add(List<int> data) { |
- // TODO(ajohnsen): Use timeout? |
- if (data.length == 0) return; |
- if (data.length >= MIN_CHUNK_SIZE) { |
- flush(); |
- _outSink.add(data); |
- } else { |
- _builder.add(data); |
- if (_builder.length >= MAX_BUFFER_SIZE) { |
- flush(); |
- } |
- } |
- } |
- |
- void addError(Object error, [StackTrace stackTrace]) { |
- _outSink.addError(error, stackTrace); |
- } |
- |
- void close() { |
- flush(); |
- _outSink.close(); |
- } |
- |
- void flush() { |
- if (_builder.length > 0) { |
- // takeBytes will clear the BytesBuilder. |
- _outSink.add(_builder.takeBytes()); |
- } |
- } |
-} |
- |
-class _BufferTransformer implements StreamTransformer<List<int>, List<int>> { |
- const _BufferTransformer(); |
- |
- Stream<List<int>> bind(Stream<List<int>> stream) { |
- return new Stream<List<int>>.eventTransformed( |
- stream, |
- (EventSink outSink) => new _BufferTransformerSink(outSink)); |
- } |
+ Future addStream(var stream) => outbound._addStream(stream); |
+ Future close() => outbound._close(); |
} |
@@ -763,7 +606,8 @@ class _HttpResponse extends _HttpOutboundMessage<HttpResponse> |
if (_headersWritten) throw new StateError("Headers already sent"); |
deadline = null; // Be sure to stop any deadline. |
var future = _httpRequest._httpConnection.detachSocket(); |
- _writeHeaders(drainRequest: false).then((_) => close()); |
+ _writeHeaders(drainRequest: false, |
+ setOutgoing: false).then((_) => close()); |
// Close connection so the socket is 'free'. |
close(); |
done.catchError((_) { |
@@ -783,12 +627,13 @@ class _HttpResponse extends _HttpOutboundMessage<HttpResponse> |
if (_deadline == null) return; |
_deadlineTimer = new Timer(_deadline, () { |
+ _outgoing._socketError = true; |
_outgoing.socket.destroy(); |
}); |
} |
void _writeHeader() { |
- Uint8List buffer = _httpRequest._httpConnection._headersBuffer; |
+ Uint8List buffer = new Uint8List(_OUTGOING_BUFFER_SIZE); |
int offset = 0; |
void write(List<int> bytes) { |
@@ -847,7 +692,7 @@ class _HttpResponse extends _HttpOutboundMessage<HttpResponse> |
offset = headers._write(buffer, offset); |
buffer[offset++] = _CharCode.CR; |
buffer[offset++] = _CharCode.LF; |
- _headersSink.add(new Uint8List.view(buffer.buffer, 0, offset)); |
+ _outgoing.setHeader(buffer, offset); |
} |
String _findReasonPhrase(int statusCode) { |
@@ -1036,7 +881,7 @@ class _HttpClientRequest extends _HttpOutboundMessage<HttpClientResponse> |
} |
void _writeHeader() { |
- Uint8List buffer = _httpClientConnection._headersBuffer; |
+ Uint8List buffer = new Uint8List(_OUTGOING_BUFFER_SIZE); |
int offset = 0; |
void write(List<int> bytes) { |
@@ -1074,41 +919,268 @@ class _HttpClientRequest extends _HttpOutboundMessage<HttpClientResponse> |
offset = headers._write(buffer, offset); |
buffer[offset++] = _CharCode.CR; |
buffer[offset++] = _CharCode.LF; |
- _headersSink.add(new Uint8List.view(buffer.buffer, 0, offset)); |
+ _outgoing.setHeader(buffer, offset); |
} |
} |
+// Used by _HttpOutgoing as a target of a chunked converter for gzip |
+// compression. |
+class _HttpGZipSink extends ByteConversionSink { |
+ final Function _consume; |
+ _HttpGZipSink(this._consume); |
+ |
+ void add(List<int> chunk) { |
+ _consume(chunk); |
+ } |
-class _ChunkedTransformerSink implements EventSink<List<int>> { |
+ void addSlice(Uint8List chunk, int start, int end, bool isLast) { |
+ _consume(new Uint8List.view(chunk.buffer, start, end - start)); |
+ } |
- int _pendingFooter = 0; |
- final EventSink<List<int>> _outSink; |
+ void close() {} |
+} |
- _ChunkedTransformerSink(this._outSink); |
- void add(List<int> data) { |
- _outSink.add(_chunkHeader(data.length)); |
- if (data.length > 0) _outSink.add(data); |
- _pendingFooter = 2; |
+// The _HttpOutgoing handles all of the following: |
+// - Buffering |
+// - GZip compressionm |
+// - Content-Length validation. |
+// - Errors. |
+// |
+// Most notable is the GZip compression, that uses a double-buffering system, |
+// one before gzip (_gzipBuffer) and one after (_buffer). |
+class _HttpOutgoing |
+ implements StreamConsumer<List<int>> { |
+ static const List<int> _footerAndChunk0Length = |
+ const [_CharCode.CR, _CharCode.LF, 0x30, _CharCode.CR, _CharCode.LF, |
+ _CharCode.CR, _CharCode.LF]; |
+ |
+ static const List<int> _chunk0Length = |
+ const [0x30, _CharCode.CR, _CharCode.LF, _CharCode.CR, _CharCode.LF]; |
+ |
+ final Completer _doneCompleter = new Completer(); |
+ final Socket socket; |
+ |
+ Uint8List _buffer; |
+ int _length = 0; |
+ |
+ Future _closeFuture; |
+ |
+ bool chunked = false; |
+ int _pendingChunkedFooter = 0; |
+ |
+ int contentLength; |
+ int _bytesWritten = 0; |
+ |
+ bool _gzip = false; |
+ ByteConversionSink _gzipSink; |
+ // _gzipAdd is set iff the sink is being added to. It's used to specify where |
+ // gzipped data should be taken (sometimes a controller, sometimes a socket). |
+ Function _gzipAdd; |
+ Uint8List _gzipBuffer; |
+ int _gzipBufferLength = 0; |
+ |
+ bool _socketError = false; |
+ |
+ _HttpOutboundMessage outbound; |
+ |
+ bool _ignoreError(error) |
+ => (error is SocketException || error is TlsException) && |
+ outbound is HttpResponse; |
+ |
+ _HttpOutgoing(this.socket); |
+ |
+ Future addStream(Stream<List<int>> stream, [Future pauseFuture]) { |
+ if (_socketError) { |
+ stream.listen(null).cancel(); |
+ return new Future.value(outbound); |
+ } |
+ var sub; |
+ var controller; |
+ // Use new stream so we are able to pause (see below listen). The |
+ // alternative is to use stream.extand, but that won't give us a way of |
+ // pausing. |
+ controller = new StreamController( |
+ onPause: () => sub.pause(), |
+ onResume: () => sub.resume(), |
+ sync: true); |
+ |
+ void onData(data) { |
+ if (_socketError) return; |
+ if (data.length == 0) return; |
+ if (chunked) { |
+ if (_gzip) { |
+ _gzipAdd = controller.add; |
+ _addGZipChunk(data, _gzipSink.add); |
+ _gzipAdd = null; |
+ return; |
+ } |
+ _addChunk(_chunkHeader(data.length), controller.add); |
+ _pendingChunkedFooter = 2; |
+ } else { |
+ if (contentLength != null) { |
+ _bytesWritten += data.length; |
+ if (_bytesWritten > contentLength) { |
+ controller.addError(new HttpException( |
+ "Content size exceeds specified contentLength. " |
+ "$_bytesWritten bytes written while expected " |
+ "$contentLength. " |
+ "[${new String.fromCharCodes(data)}]")); |
+ return; |
+ } |
+ } |
+ } |
+ _addChunk(data, controller.add); |
+ } |
+ |
+ sub = stream.listen( |
+ onData, |
+ onError: controller.addError, |
+ onDone: controller.close, |
+ cancelOnError: true); |
+ |
+ // While incoming is being drained, the pauseFuture is non-null. Pause |
+ // output until it's drained. |
+ if (pauseFuture != null) { |
+ sub.pause(pauseFuture); |
+ } |
+ |
+ return socket.addStream(controller.stream) |
+ .then((_) { |
+ return outbound; |
+ }, onError: (error) { |
+ // Be sure to close it in case of an error. |
+ if (_gzip) _gzipSink.close(); |
+ _socketError = true; |
+ _doneCompleter.completeError(error); |
+ if (_ignoreError(error)) { |
+ return outbound; |
+ } else { |
+ throw error; |
+ } |
+ }); |
} |
- void addError(Object error, [StackTrace stackTrace]) { |
- _outSink.addError(error, stackTrace); |
+ Future close() { |
+ // If we are already closed, return that future. |
+ if (_closeFuture != null) return _closeFuture; |
+ // If we earlier saw an error, return immidiate. The notification to |
+ // _Http*Connection is already done. |
+ if (_socketError) return new Future.value(outbound); |
+ // If contentLength was specified, validate it. |
+ if (contentLength != null) { |
+ if (_bytesWritten < contentLength) { |
+ var error = new HttpException( |
+ "Content size below specified contentLength. " |
+ " $_bytesWritten bytes written but expected " |
+ "$contentLength."); |
+ _doneCompleter.completeError(error); |
+ return _closeFuture = new Future.error(error); |
+ } |
+ } |
+ // In case of chunked encoding (and gzip), handle remaining gzip data and |
+ // append the 'footer' for chunked encoding. |
+ if (chunked) { |
+ if (_gzip) { |
+ _gzipAdd = socket.add; |
+ if (_gzipBufferLength > 0) { |
+ _gzipSink.add(new Uint8List.view( |
+ _gzipBuffer.buffer, 0, _gzipBufferLength)); |
+ } |
+ _gzipBuffer = null; |
+ _gzipSink.close(); |
+ _gzipAdd = null; |
+ } |
+ _addChunk(_chunkHeader(0), socket.add); |
+ } |
+ // Add any remaining data in the buffer. |
+ if (_length > 0) { |
+ socket.add(new Uint8List.view(_buffer.buffer, 0, _length)); |
+ } |
+ // Clear references, for better GC. |
+ _buffer = null; |
+ // And finally flush it. As we support keep-alive, never close it from here. |
+ // Once the socket is flushed, we'll be able to reuse it (signaled by the |
+ // 'done' future). |
+ return _closeFuture = socket.flush() |
+ .then((_) { |
+ _doneCompleter.complete(socket); |
+ return outbound; |
+ }, onError: (error) { |
+ _doneCompleter.completeError(error); |
+ if (_ignoreError(error)) { |
+ return outbound; |
+ } else { |
+ throw error; |
+ } |
+ }); |
} |
- void close() { |
- add(const []); |
- _outSink.close(); |
+ Future get done => _doneCompleter.future; |
+ |
+ void setHeader(List<int> data, int length) { |
+ assert(_length == 0); |
+ assert(data.length == _OUTGOING_BUFFER_SIZE); |
+ _buffer = data; |
+ _length = length; |
+ } |
+ |
+ void set gzip(bool value) { |
+ _gzip = value; |
+ if (_gzip) { |
+ _gzipBuffer = new Uint8List(_OUTGOING_BUFFER_SIZE); |
+ assert(_gzipSink == null); |
+ _gzipSink = new ZLibEncoder(gzip: true) |
+ .startChunkedConversion( |
+ new _HttpGZipSink((data) { |
+ // We are closing down prematurely, due to an error. Discard. |
+ if (_gzipAdd == null) return; |
+ _addChunk(_chunkHeader(data.length), _gzipAdd); |
+ _pendingChunkedFooter = 2; |
+ _addChunk(data, _gzipAdd); |
+ })); |
+ } |
+ } |
+ |
+ void _addGZipChunk(chunk, void add(List<int> data)) { |
+ if (chunk.length > _gzipBuffer.length - _gzipBufferLength) { |
+ add(new Uint8List.view( |
+ _gzipBuffer.buffer, 0, _gzipBufferLength)); |
+ _gzipBuffer = new Uint8List(_OUTGOING_BUFFER_SIZE); |
+ _gzipBufferLength = 0; |
+ } |
+ if (chunk.length > _OUTGOING_BUFFER_SIZE) { |
+ add(chunk); |
+ } else { |
+ _gzipBuffer.setRange(_gzipBufferLength, |
+ _gzipBufferLength + chunk.length, |
+ chunk); |
+ _gzipBufferLength += chunk.length; |
+ } |
+ } |
+ |
+ void _addChunk(chunk, void add(List<int> data)) { |
+ if (chunk.length > _buffer.length - _length) { |
+ add(new Uint8List.view(_buffer.buffer, 0, _length)); |
+ _buffer = new Uint8List(_OUTGOING_BUFFER_SIZE); |
+ _length = 0; |
+ } |
+ if (chunk.length > _OUTGOING_BUFFER_SIZE) { |
+ add(chunk); |
+ } else { |
+ _buffer.setRange(_length, _length + chunk.length, chunk); |
+ _length += chunk.length; |
+ } |
} |
List<int> _chunkHeader(int length) { |
const hexDigits = const [0x30, 0x31, 0x32, 0x33, 0x34, 0x35, 0x36, 0x37, |
0x38, 0x39, 0x41, 0x42, 0x43, 0x44, 0x45, 0x46]; |
if (length == 0) { |
- if (_pendingFooter == 2) return _footerAndChunk0Length; |
+ if (_pendingChunkedFooter == 2) return _footerAndChunk0Length; |
return _chunk0Length; |
} |
- int size = _pendingFooter; |
+ int size = _pendingChunkedFooter; |
int len = length; |
// Compute a fast integer version of (log(length + 1) / log(16)).ceil(). |
while (len > 0) { |
@@ -1116,12 +1188,12 @@ class _ChunkedTransformerSink implements EventSink<List<int>> { |
len >>= 4; |
} |
var footerAndHeader = new Uint8List(size + 2); |
- if (_pendingFooter == 2) { |
+ if (_pendingChunkedFooter == 2) { |
footerAndHeader[0] = _CharCode.CR; |
footerAndHeader[1] = _CharCode.LF; |
} |
int index = size; |
- while (index > _pendingFooter) { |
+ while (index > _pendingChunkedFooter) { |
footerAndHeader[--index] = hexDigits[length & 15]; |
length = length >> 4; |
} |
@@ -1129,102 +1201,6 @@ class _ChunkedTransformerSink implements EventSink<List<int>> { |
footerAndHeader[size + 1] = _CharCode.LF; |
return footerAndHeader; |
} |
- |
- static List<int> get _footerAndChunk0Length => new Uint8List.fromList( |
- const [_CharCode.CR, _CharCode.LF, 0x30, _CharCode.CR, _CharCode.LF, |
- _CharCode.CR, _CharCode.LF]); |
- |
- static List<int> get _chunk0Length => new Uint8List.fromList( |
- const [0x30, _CharCode.CR, _CharCode.LF, _CharCode.CR, _CharCode.LF]); |
-} |
- |
-// Transformer that transforms data to HTTP Chunked Encoding. |
-class _ChunkedTransformer implements StreamTransformer<List<int>, List<int>> { |
- const _ChunkedTransformer(); |
- |
- Stream<List<int>> bind(Stream<List<int>> stream) { |
- return new Stream<List<int>>.eventTransformed( |
- stream, |
- (EventSink<List<int>> sink) => new _ChunkedTransformerSink(sink)); |
- } |
-} |
- |
-// Transformer that validates the content length. |
-class _ContentLengthValidator |
- implements StreamTransformer<List<int>, List<int>>, EventSink<List<int>> { |
- final int expectedContentLength; |
- final Uri uri; |
- int _bytesWritten = 0; |
- |
- EventSink<List<int>> _outSink; |
- |
- _ContentLengthValidator(this.expectedContentLength, this.uri); |
- |
- Stream<List<int>> bind(Stream<List<int>> stream) { |
- return new Stream.eventTransformed( |
- stream, |
- (EventSink sink) { |
- if (_outSink != null) { |
- throw new StateError("Validator transformer already used"); |
- } |
- _outSink = sink; |
- return this; |
- }); |
- } |
- |
- void add(List<int> data) { |
- _bytesWritten += data.length; |
- if (_bytesWritten > expectedContentLength) { |
- _outSink.addError(new HttpException( |
- "Content size exceeds specified contentLength. " |
- "$_bytesWritten bytes written while expected " |
- "$expectedContentLength. " |
- "[${new String.fromCharCodes(data)}]", |
- uri: uri)); |
- _outSink.close(); |
- } else { |
- _outSink.add(data); |
- } |
- } |
- |
- void addError(Object error, [StackTrace stackTrace]) { |
- _outSink.addError(error, stackTrace); |
- } |
- |
- void close() { |
- if (_bytesWritten < expectedContentLength) { |
- _outSink.addError(new HttpException( |
- "Content size below specified contentLength. " |
- " $_bytesWritten bytes written while expected " |
- "$expectedContentLength.", |
- uri: uri)); |
- } |
- _outSink.close(); |
- } |
-} |
- |
- |
-// Extends StreamConsumer as this is an internal type, only used to pipe to. |
-class _HttpOutgoing implements StreamConsumer<List<int>> { |
- final Completer _doneCompleter = new Completer(); |
- final Socket socket; |
- |
- _HttpOutgoing(this.socket); |
- |
- Future addStream(Stream<List<int>> stream) { |
- return socket.addStream(stream) |
- .catchError((error) { |
- _doneCompleter.completeError(error); |
- throw error; |
- }); |
- } |
- |
- Future close() { |
- _doneCompleter.complete(socket); |
- return new Future.value(); |
- } |
- |
- Future get done => _doneCompleter.future; |
} |
class _HttpClientConnection { |
@@ -1238,7 +1214,6 @@ class _HttpClientConnection { |
Timer _idleTimer; |
bool closed = false; |
Uri _currentUri; |
- final Uint8List _headersBuffer = new Uint8List(_HEADERS_BUFFER_SIZE); |
Completer<_HttpIncoming> _nextResponseCompleter; |
Future _streamFuture; |
@@ -1901,7 +1876,6 @@ class _HttpConnection extends LinkedListEntry<_HttpConnection> { |
int _state = _IDLE; |
StreamSubscription _subscription; |
Timer _idleTimer; |
- final Uint8List _headersBuffer = new Uint8List(_HEADERS_BUFFER_SIZE); |
bool _idleMark = false; |
Future _streamFuture; |