Chromium Code Reviews| Index: sdk/lib/io/http_impl.dart |
| diff --git a/sdk/lib/io/http_impl.dart b/sdk/lib/io/http_impl.dart |
| index 5bc5b97cae902e5b01162391de71b196bc53bc52..6ad7168f14d8ecb50773eb9a508db9fd52fb7b82 100644 |
| --- a/sdk/lib/io/http_impl.dart |
| +++ b/sdk/lib/io/http_impl.dart |
| @@ -4,7 +4,7 @@ |
| part of dart.io; |
| -const int _HEADERS_BUFFER_SIZE = 8 * 1024; |
| +const int _OUTGOING_BUFFER_SIZE = 8 * 1024; |
| class _HttpIncoming extends Stream<List<int>> { |
| final int _transferLength; |
| @@ -406,28 +406,25 @@ class _HttpClientResponse |
| } |
| -abstract class _HttpOutboundMessage<T> implements IOSink { |
| +abstract class _HttpOutboundMessage<T> extends _IOSinkImpl { |
| // Used to mark when the body should be written. This is used for HEAD |
| // requests and in error handling. |
| bool _ignoreBody = false; |
| bool _headersWritten = false; |
| - bool _asGZip = false; |
| - IOSink _headersSink; |
| - IOSink _dataSink; |
| - |
| - final _HttpOutgoing _outgoing; |
| final Uri _uri; |
| + final _HttpOutgoing _outgoing; |
| final _HttpHeaders headers; |
| _HttpOutboundMessage(this._uri, |
| String protocolVersion, |
| - _HttpOutgoing outgoing) |
| - : _outgoing = outgoing, |
| - _headersSink = new IOSink(outgoing, encoding: ASCII), |
| + this._outgoing) |
| + : super(new _HttpOutboundConsumer(), null), |
| headers = new _HttpHeaders(protocolVersion) { |
| - _dataSink = new IOSink(new _HttpOutboundConsumer(this)); |
| + _outgoing.outbound = this; |
| + (_target as _HttpOutboundConsumer).outbound = this; |
|
Lasse Reichstein Nielsen
2014/03/03 11:37:23
Consider just doing an assignment to a _HttpOutbou
Anders Johnsen
2014/03/03 19:43:14
I see. However, this is planned to go away, so I'l
|
| + _encodingMutable = false; |
| } |
| int get contentLength => headers.contentLength; |
| @@ -450,60 +447,30 @@ abstract class _HttpOutboundMessage<T> implements IOSink { |
| return Encoding.getByName(charset); |
| } |
| - void set encoding(Encoding value) { |
| - throw new StateError("IOSink encoding is not mutable"); |
| - } |
| - |
| - void write(Object obj) { |
| - if (!_headersWritten) _dataSink.encoding = encoding; |
| - _dataSink.write(obj); |
| - } |
| - |
| - void writeAll(Iterable objects, [String separator = ""]) { |
| - if (!_headersWritten) _dataSink.encoding = encoding; |
| - _dataSink.writeAll(objects, separator); |
| - } |
| - |
| - void writeln([Object obj = ""]) { |
| - if (!_headersWritten) _dataSink.encoding = encoding; |
| - _dataSink.writeln(obj); |
| - } |
| - |
| - void writeCharCode(int charCode) { |
| - if (!_headersWritten) _dataSink.encoding = encoding; |
| - _dataSink.writeCharCode(charCode); |
| - } |
| - |
| void add(List<int> data) { |
| if (data.length == 0) return; |
| - _dataSink.add(data); |
| + super.add(data); |
| } |
| - void addError(error, [StackTrace stackTrace]) => |
| - _dataSink.addError(error, stackTrace); |
| - |
| - Future<T> addStream(Stream<List<int>> stream) => _dataSink.addStream(stream); |
| - |
| - Future flush() => _dataSink.flush(); |
| - |
| - Future close() => _dataSink.close(); |
| - |
| - Future<T> get done => _dataSink.done; |
| - |
| - Future _writeHeaders({bool drainRequest: true}) { |
| - void write() { |
| + Future _writeHeaders({bool drainRequest: true, |
| + bool setOutgoing: true}) { |
| + // TODO(ajohnsen): Avoid excessive futures in this method. |
| + write() { |
| try { |
| _writeHeader(); |
| - } catch (error) { |
| + } catch (error, s) { |
| // Headers too large. |
| throw new HttpException( |
| - "Headers size exceeded the of '$_HEADERS_BUFFER_SIZE' bytes"); |
| + "Headers size exceeded the of '$_OUTGOING_BUFFER_SIZE'" |
| + " bytes"); |
| } |
| + return this; |
| } |
| - if (_headersWritten) return new Future.value(); |
| + if (_headersWritten) return new Future.value(this); |
| _headersWritten = true; |
| - _dataSink.encoding = encoding; |
| + Future drainFuture; |
| bool isServerSide = this is _HttpResponse; |
| + bool gzip = false; |
| if (isServerSide) { |
| var response = this; |
| if (headers.chunkedTransferEncoding) { |
| @@ -516,42 +483,51 @@ abstract class _HttpOutboundMessage<T> implements IOSink { |
| .any((encoding) => encoding.trim().toLowerCase() == "gzip") && |
| contentEncoding == null) { |
| headers.set(HttpHeaders.CONTENT_ENCODING, "gzip"); |
| - _asGZip = true; |
| + gzip = true; |
| } |
| } |
| if (drainRequest && !response._httpRequest._incoming.hasSubscriber) { |
| - return response._httpRequest.drain() |
| - // TODO(ajohnsen): Timeout on drain? |
| - .catchError((_) {}) // Ignore errors. |
| - .then((_) => write()); |
| + drainFuture = response._httpRequest.drain().catchError((_) {}); |
| } |
| + } else { |
| + drainRequest = false; |
| + } |
| + if (_ignoreBody) { |
| + return new Future.sync(write).then((_) => _outgoing.close()); |
| + } |
| + if (setOutgoing) { |
| + int contentLength = headers.contentLength; |
| + if (headers.chunkedTransferEncoding) { |
| + _outgoing.chunked = true; |
| + if (gzip) _outgoing.gzip = true; |
| + } else if (contentLength >= 0) { |
| + _outgoing.contentLength = contentLength; |
| + } |
| + } |
| + if (drainFuture != null) { |
| + return drainFuture.then((_) => write()); |
| } |
| return new Future.sync(write); |
| } |
| Future _addStream(Stream<List<int>> stream) { |
| - return _writeHeaders() |
| - .then((_) { |
| - int contentLength = headers.contentLength; |
| - if (_ignoreBody) { |
| - stream.drain().catchError((_) {}); |
| - return _headersSink.close(); |
| - } |
| - stream = stream.transform(const _BufferTransformer()); |
| - if (headers.chunkedTransferEncoding) { |
| - if (_asGZip) { |
| - stream = stream.transform(GZIP.encoder); |
| - } |
| - stream = stream.transform(const _ChunkedTransformer()); |
| - } else if (contentLength >= 0) { |
| - stream = stream.transform( |
| - new _ContentLengthValidator(contentLength, _uri)); |
| - } |
| - return _headersSink.addStream(stream); |
| - }); |
| + // TODO(ajohnsen): Merge into _HttpOutgoing. |
| + if (_ignoreBody) { |
| + stream.drain().catchError((_) {}); |
|
Lasse Reichstein Nielsen
2014/03/03 11:37:23
The "drain" method just ignores input and waits fo
Anders Johnsen
2014/03/03 19:43:14
No. The remote user may see that as write failing,
|
| + return _writeHeaders(); |
| + } |
| + if (_headersWritten) { |
| + return _outgoing.addStream(stream); |
| + } else { |
| + var completer = new Completer.sync(); |
| + var future = _outgoing.addStream(stream, completer.future); |
| + _writeHeaders().then(completer.complete); |
| + return future; |
| + } |
| } |
| Future _close() { |
| + // TODO(ajohnsen): Merge into _HttpOutgoing. |
| if (!_headersWritten) { |
| if (!_ignoreBody && headers.contentLength == -1) { |
| // If no body was written, _ignoreBody is false (it's not a HEAD |
| @@ -560,14 +536,14 @@ abstract class _HttpOutboundMessage<T> implements IOSink { |
| headers.chunkedTransferEncoding = false; |
| headers.contentLength = 0; |
| } else if (!_ignoreBody && headers.contentLength > 0) { |
| - _headersSink.addError(new HttpException( |
| - "No content while contentLength was specified to be greater " |
| - "than 0: ${headers.contentLength}.", |
| - uri: _uri)); |
| - return _headersSink.done; |
| + return _outgoing.addStream( |
| + new Stream.fromFuture(new Future.error(new HttpException( |
| + "No content while contentLength was specified to be greater " |
|
Lasse Reichstein Nielsen
2014/03/03 11:37:23
while -> and
Or perhaps "even though".
Anders Johnsen
2014/03/03 19:43:14
Done.
|
| + "than 0: ${headers.contentLength}.", |
| + uri: _uri)))); |
| } |
| } |
| - return _writeHeaders().whenComplete(_headersSink.close); |
| + return _writeHeaders().whenComplete(_outgoing.close); |
| } |
| void _writeHeader(); |
| @@ -575,146 +551,13 @@ abstract class _HttpOutboundMessage<T> implements IOSink { |
| class _HttpOutboundConsumer implements StreamConsumer { |
| - final _HttpOutboundMessage _outbound; |
| - StreamController _controller; |
| - StreamSubscription _subscription; |
| - Completer _closeCompleter = new Completer(); |
| - Completer _completer; |
| - bool _socketError = false; |
| + // TODO(ajohnsen): Once _addStream and _close is merged into _HttpOutgoing, |
| + // this class can be removed. |
| + _HttpOutboundMessage outbound; |
| + _HttpOutboundConsumer(); |
| - _HttpOutboundConsumer(this._outbound); |
| - |
| - void _cancel() { |
| - if (_subscription != null) { |
| - StreamSubscription subscription = _subscription; |
| - _subscription = null; |
| - subscription.cancel(); |
| - } |
| - } |
| - |
| - bool _ignoreError(error) |
| - => (error is SocketException || error is TlsException) && |
| - _outbound is HttpResponse; |
| - |
| - _ensureController() { |
| - if (_controller != null) return; |
| - _controller = new StreamController(sync: true, |
| - onPause: () => _subscription.pause(), |
| - onResume: () => _subscription.resume(), |
| - onListen: () => _subscription.resume(), |
| - onCancel: _cancel); |
| - _outbound._addStream(_controller.stream) |
| - .then((_) { |
| - _cancel(); |
| - _done(); |
| - _closeCompleter.complete(_outbound); |
| - }, |
| - onError: (error, [StackTrace stackTrace]) { |
| - _socketError = true; |
| - if (_ignoreError(error)) { |
| - _cancel(); |
| - _done(); |
| - _closeCompleter.complete(_outbound); |
| - } else { |
| - if (!_done(error)) { |
| - _closeCompleter.completeError(error, stackTrace); |
| - } |
| - } |
| - }); |
| - } |
| - |
| - bool _done([error, StackTrace stackTrace]) { |
| - if (_completer == null) return false; |
| - if (error != null) { |
| - _completer.completeError(error, stackTrace); |
| - } else { |
| - _completer.complete(_outbound); |
| - } |
| - _completer = null; |
| - return true; |
| - } |
| - |
| - Future addStream(var stream) { |
| - // If we saw a socket error subscribe and then cancel, to ignore any data |
| - // on the stream. |
| - if (_socketError) { |
| - stream.listen(null).cancel(); |
| - return new Future.value(_outbound); |
| - } |
| - _completer = new Completer(); |
| - _subscription = stream.listen( |
| - (data) => _controller.add(data), |
| - onDone: _done, |
| - onError: (e, s) => _controller.addError(e, s), |
| - cancelOnError: true); |
| - // Pause the first request. |
| - if (_controller == null) _subscription.pause(); |
| - _ensureController(); |
| - return _completer.future; |
| - } |
| - |
| - Future close() { |
| - Future closeOutbound() { |
| - if (_socketError) return new Future.value(_outbound); |
| - return _outbound._close() |
| - .catchError((_) {}, test: _ignoreError) |
| - .then((_) => _outbound); |
| - } |
| - if (_controller == null) return closeOutbound(); |
| - _controller.close(); |
| - return _closeCompleter.future.then((_) => closeOutbound()); |
| - } |
| -} |
| - |
| - |
| -class _BufferTransformerSink implements EventSink<List<int>> { |
| - static const int MIN_CHUNK_SIZE = 4 * 1024; |
| - static const int MAX_BUFFER_SIZE = 16 * 1024; |
| - |
| - final BytesBuilder _builder = new BytesBuilder(); |
| - final EventSink<List<int>> _outSink; |
| - |
| - _BufferTransformerSink(this._outSink); |
| - |
| - void add(List<int> data) { |
| - // TODO(ajohnsen): Use timeout? |
| - if (data.length == 0) return; |
| - if (data.length >= MIN_CHUNK_SIZE) { |
| - flush(); |
| - _outSink.add(data); |
| - } else { |
| - _builder.add(data); |
| - if (_builder.length >= MAX_BUFFER_SIZE) { |
| - flush(); |
| - } |
| - } |
| - } |
| - |
| - void addError(Object error, [StackTrace stackTrace]) { |
| - _outSink.addError(error, stackTrace); |
| - } |
| - |
| - void close() { |
| - flush(); |
| - _outSink.close(); |
| - } |
| - |
| - void flush() { |
| - if (_builder.length > 0) { |
| - // takeBytes will clear the BytesBuilder. |
| - _outSink.add(_builder.takeBytes()); |
| - } |
| - } |
| -} |
| - |
| -class _BufferTransformer implements StreamTransformer<List<int>, List<int>> { |
| - const _BufferTransformer(); |
| - |
| - Stream<List<int>> bind(Stream<List<int>> stream) { |
| - return new Stream<List<int>>.eventTransformed( |
| - stream, |
| - (EventSink outSink) => new _BufferTransformerSink(outSink)); |
| - } |
| + Future addStream(var stream) => outbound._addStream(stream); |
| + Future close() => outbound._close(); |
| } |
| @@ -763,7 +606,8 @@ class _HttpResponse extends _HttpOutboundMessage<HttpResponse> |
| if (_headersWritten) throw new StateError("Headers already sent"); |
| deadline = null; // Be sure to stop any deadline. |
| var future = _httpRequest._httpConnection.detachSocket(); |
| - _writeHeaders(drainRequest: false).then((_) => close()); |
| + _writeHeaders(drainRequest: false, |
| + setOutgoing: false).then((_) => close()); |
| // Close connection so the socket is 'free'. |
| close(); |
| done.catchError((_) { |
| @@ -783,12 +627,13 @@ class _HttpResponse extends _HttpOutboundMessage<HttpResponse> |
| if (_deadline == null) return; |
| _deadlineTimer = new Timer(_deadline, () { |
| + _outgoing._socketError = true; |
| _outgoing.socket.destroy(); |
| }); |
| } |
| void _writeHeader() { |
| - Uint8List buffer = _httpRequest._httpConnection._headersBuffer; |
| + Uint8List buffer = new Uint8List(_OUTGOING_BUFFER_SIZE); |
| int offset = 0; |
| void write(List<int> bytes) { |
| @@ -847,7 +692,7 @@ class _HttpResponse extends _HttpOutboundMessage<HttpResponse> |
| offset = headers._write(buffer, offset); |
| buffer[offset++] = _CharCode.CR; |
| buffer[offset++] = _CharCode.LF; |
| - _headersSink.add(new Uint8List.view(buffer.buffer, 0, offset)); |
| + _outgoing.setHeader(buffer, offset); |
| } |
| String _findReasonPhrase(int statusCode) { |
| @@ -1036,7 +881,7 @@ class _HttpClientRequest extends _HttpOutboundMessage<HttpClientResponse> |
| } |
| void _writeHeader() { |
| - Uint8List buffer = _httpClientConnection._headersBuffer; |
| + Uint8List buffer = new Uint8List(_OUTGOING_BUFFER_SIZE); |
| int offset = 0; |
| void write(List<int> bytes) { |
| @@ -1074,41 +919,265 @@ class _HttpClientRequest extends _HttpOutboundMessage<HttpClientResponse> |
| offset = headers._write(buffer, offset); |
| buffer[offset++] = _CharCode.CR; |
| buffer[offset++] = _CharCode.LF; |
| - _headersSink.add(new Uint8List.view(buffer.buffer, 0, offset)); |
| + _outgoing.setHeader(buffer, offset); |
| } |
| } |
| +// Used by _HttpOutgoing as a target of a chunked converter for gzip |
| +// compression. |
| +class _HttpGZipSink extends ByteConversionSink { |
| + final Function _consume; |
| + _HttpGZipSink(this._consume); |
| -class _ChunkedTransformerSink implements EventSink<List<int>> { |
| + void add(List<int> chunk) { |
| + _consume(chunk); |
| + } |
| - int _pendingFooter = 0; |
| - final EventSink<List<int>> _outSink; |
| + void addSlice(Uint8List chunk, int start, int end, bool isLast) { |
| + _consume(new Uint8List.view(chunk.buffer, start, end - start)); |
| + } |
| - _ChunkedTransformerSink(this._outSink); |
| + void close() {} |
| +} |
| - void add(List<int> data) { |
| - _outSink.add(_chunkHeader(data.length)); |
| - if (data.length > 0) _outSink.add(data); |
| - _pendingFooter = 2; |
| + |
| +// The _HttpOutgoing handles all of the following: |
| +// - Buffering |
| +// - GZip compressionm |
| +// - Content-Length validation. |
| +// - Errors. |
| +// |
| +// Most notable is the GZip compression, that uses a double-buffering system, |
| +// one before gzip (_gzipBuffer) and one after (_buffer). |
| +class _HttpOutgoing |
| + implements StreamConsumer<List<int>> { |
| + static const List<int> _footerAndChunk0Length = |
| + const [_CharCode.CR, _CharCode.LF, 0x30, _CharCode.CR, _CharCode.LF, |
| + _CharCode.CR, _CharCode.LF]; |
| + |
| + static const List<int> _chunk0Length = |
| + const [0x30, _CharCode.CR, _CharCode.LF, _CharCode.CR, _CharCode.LF]; |
| + |
| + final Completer _doneCompleter = new Completer(); |
| + final Socket socket; |
| + |
| + Uint8List _buffer; |
| + int _length = 0; |
| + |
| + Future _closeFuture; |
| + |
| + bool chunked = false; |
| + int _pendingChunkedFooter = 0; |
| + |
| + int contentLength; |
| + int _bytesWritten = 0; |
| + |
| + bool _gzip = false; |
| + ByteConversionSink _gzipSink; |
| + // _gzipAdd is set iff the sink is being added to. It's used to specify where |
| + // gzipped data should be taken (sometimes a controller, sometimes a socket). |
| + Function _gzipAdd; |
| + Uint8List _gzipBuffer; |
| + int _gzipBufferLength = 0; |
| + |
| + bool _socketError = false; |
| + |
| + _HttpOutboundMessage outbound; |
| + |
| + bool _ignoreError(error) |
| + => (error is SocketException || error is TlsException) && |
| + outbound is HttpResponse; |
| + |
| + _HttpOutgoing(this.socket); |
| + |
| + Future addStream(Stream<List<int>> stream, [Future pauseFuture]) { |
| + if (_socketError) { |
| + stream.listen(null).cancel(); |
| + return new Future.value(outbound); |
| + } |
| + var sub; |
| + var controller; |
| + // Use new stream so we are able to pause (see below listen). The |
| + // alternative is to use stream.extand, but that won't give us a way of |
| + // pausing. |
| + controller = new StreamController( |
| + onPause: () => sub.pause(), |
| + onResume: () => sub.resume(), |
| + sync: true); |
| + sub = stream.listen( |
| + (data) { |
| + if (_socketError) return; |
| + if (data.length == 0) return; |
| + if (chunked) { |
| + if (_gzip) { |
| + _gzipAdd = controller.add; |
| + _addGZipChunk(data, _gzipSink.add); |
| + _gzipAdd = null; |
| + return; |
| + } |
| + _addChunk(_chunkHeader(data.length), controller.add); |
| + _pendingChunkedFooter = 2; |
| + } else { |
| + if (contentLength != null) { |
| + _bytesWritten += data.length; |
| + if (_bytesWritten > contentLength) { |
| + controller.addError(new HttpException( |
| + "Content size exceeds specified contentLength. " |
| + "$_bytesWritten bytes written while expected " |
| + "$contentLength. " |
| + "[${new String.fromCharCodes(data)}]")); |
| + return; |
| + } |
| + } |
| + } |
| + _addChunk(data, controller.add); |
| + }, |
| + onError: controller.addError, |
| + onDone: controller.close, |
| + cancelOnError: true); |
|
Lasse Reichstein Nielsen
2014/03/03 11:37:23
If you want to avoid extra closures, you can swap
Anders Johnsen
2014/03/03 19:43:14
I'm fine by doing it like this. Also, this will no
|
| + |
| + // If incoming is being drained, the pauseFuture is != null. Pause output |
|
Lasse Reichstein Nielsen
2014/03/03 11:37:23
If -> while? Is this the only case where pauseFutu
Anders Johnsen
2014/03/03 19:43:14
Done. No, it can be it in many cases (one of the r
|
| + // until it's drained. |
| + if (pauseFuture != null) { |
| + sub.pause(pauseFuture); |
| + } |
| + |
| + return socket.addStream(controller.stream) |
| + .then((_) { |
| + return outbound; |
| + }, onError: (error) { |
| + // Be sure to close it in case of an error. |
| + if (_gzip) _gzipSink.close(); |
| + _socketError = true; |
| + _doneCompleter.completeError(error); |
| + if (_ignoreError(error)) { |
| + return outbound; |
| + } else { |
| + throw error; |
| + } |
| + }); |
| + } |
| + |
| + Future close() { |
| + // If we was already closed, return that future. |
|
Lasse Reichstein Nielsen
2014/03/03 11:37:23
was -> are
Anders Johnsen
2014/03/03 19:43:14
Done.
|
| + if (_closeFuture != null) return _closeFuture; |
| + // If we earlier saw an error, return immidiate. The internals are already |
|
Lasse Reichstein Nielsen
2014/03/03 11:37:23
if we have seen an error, return immediately.
inte
Anders Johnsen
2014/03/03 19:43:14
Done.
|
| + // closed correctly. |
| + if (_socketError) return new Future.value(outbound); |
| + // If contentLength was specified, validate it. |
| + if (contentLength != null) { |
| + if (_bytesWritten < contentLength) { |
| + var error = new HttpException( |
| + "Content size below specified contentLength. " |
| + " $_bytesWritten bytes written while expected " |
|
Lasse Reichstein Nielsen
2014/03/03 11:37:23
while -> but.
Anders Johnsen
2014/03/03 19:43:14
Done.
|
| + "$contentLength."); |
| + _doneCompleter.completeError(error); |
| + return _closeFuture = new Future.error(error); |
| + } |
| + } |
| + // In case of chunked encoding (and gzip), handle remaining gzip data and |
| + // append the 'footer' for chunked encoding. |
| + if (chunked) { |
| + if (_gzip) { |
| + _gzipAdd = socket.add; |
| + if (_gzipBufferLength > 0) { |
| + _gzipSink.add(new Uint8List.view( |
| + _gzipBuffer.buffer, 0, _gzipBufferLength)); |
| + } |
| + _gzipBuffer = null; |
| + _gzipSink.close(); |
| + _gzipAdd = null; |
| + } |
| + _addChunk(_chunkHeader(0), socket.add); |
| + } |
| + // Add any remaining data in the buffer. |
| + if (_length > 0) { |
| + socket.add(new Uint8List.view(_buffer.buffer, 0, _length)); |
| + } |
| + // Clear references, for better GC. |
| + _buffer = null; |
| + // And finally flush it. As we support keep-alive, never close it from here. |
| + // Once the socket is flushed, we'll be able to reuse it (signaled by the |
| + // 'done' future). |
| + return _closeFuture = socket.flush() |
| + .then((_) { |
| + _doneCompleter.complete(socket); |
| + return outbound; |
| + }, onError: (error) { |
| + _doneCompleter.completeError(error); |
| + if (_ignoreError(error)) { |
| + return outbound; |
| + } else { |
| + throw error; |
| + } |
| + }); |
| } |
| - void addError(Object error, [StackTrace stackTrace]) { |
| - _outSink.addError(error, stackTrace); |
| + Future get done => _doneCompleter.future; |
| + |
| + void setHeader(List<int> data, int length) { |
| + assert(_length == 0); |
| + assert(data.length == _OUTGOING_BUFFER_SIZE); |
| + _buffer = data; |
| + _length = length; |
| + } |
| + |
| + void set gzip(bool value) { |
| + _gzip = value; |
| + if (_gzip) { |
| + _gzipBuffer = new Uint8List(_OUTGOING_BUFFER_SIZE); |
| + assert(_gzipSink == null); |
| + _gzipSink = new ZLibEncoder(gzip: true) |
| + .startChunkedConversion( |
| + new _HttpGZipSink((data) { |
| + // We are closing down prematurely, due to an error. Discard. |
| + if (_gzipAdd == null) return; |
| + _addChunk(_chunkHeader(data.length), _gzipAdd); |
| + _pendingChunkedFooter = 2; |
| + _addChunk(data, _gzipAdd); |
| + })); |
| + } |
| + } |
| + |
| + void _addGZipChunk(chunk, void add(List<int> data)) { |
| + if (chunk.length > _gzipBuffer.length - _gzipBufferLength) { |
| + add(new Uint8List.view( |
| + _gzipBuffer.buffer, 0, _gzipBufferLength)); |
| + _gzipBuffer = new Uint8List(_OUTGOING_BUFFER_SIZE); |
| + _gzipBufferLength = 0; |
| + } |
| + if (chunk.length > _OUTGOING_BUFFER_SIZE) { |
| + add(chunk); |
| + } else { |
| + _gzipBuffer.setRange(_gzipBufferLength, |
| + _gzipBufferLength + chunk.length, |
| + chunk); |
| + _gzipBufferLength += chunk.length; |
| + } |
| } |
| - void close() { |
| - add(const []); |
| - _outSink.close(); |
| + void _addChunk(chunk, void add(List<int> data)) { |
| + if (chunk.length > _buffer.length - _length) { |
| + add(new Uint8List.view(_buffer.buffer, 0, _length)); |
| + _buffer = new Uint8List(_OUTGOING_BUFFER_SIZE); |
| + _length = 0; |
| + } |
| + if (chunk.length > _OUTGOING_BUFFER_SIZE) { |
| + add(chunk); |
| + } else { |
| + _buffer.setRange(_length, _length + chunk.length, chunk); |
| + _length += chunk.length; |
| + } |
| } |
| List<int> _chunkHeader(int length) { |
| const hexDigits = const [0x30, 0x31, 0x32, 0x33, 0x34, 0x35, 0x36, 0x37, |
| 0x38, 0x39, 0x41, 0x42, 0x43, 0x44, 0x45, 0x46]; |
| if (length == 0) { |
| - if (_pendingFooter == 2) return _footerAndChunk0Length; |
| + if (_pendingChunkedFooter == 2) return _footerAndChunk0Length; |
| return _chunk0Length; |
| } |
| - int size = _pendingFooter; |
| + int size = _pendingChunkedFooter; |
| int len = length; |
| // Compute a fast integer version of (log(length + 1) / log(16)).ceil(). |
| while (len > 0) { |
| @@ -1116,12 +1185,12 @@ class _ChunkedTransformerSink implements EventSink<List<int>> { |
| len >>= 4; |
| } |
| var footerAndHeader = new Uint8List(size + 2); |
| - if (_pendingFooter == 2) { |
| + if (_pendingChunkedFooter == 2) { |
| footerAndHeader[0] = _CharCode.CR; |
| footerAndHeader[1] = _CharCode.LF; |
| } |
| int index = size; |
| - while (index > _pendingFooter) { |
| + while (index > _pendingChunkedFooter) { |
| footerAndHeader[--index] = hexDigits[length & 15]; |
| length = length >> 4; |
| } |
| @@ -1129,102 +1198,6 @@ class _ChunkedTransformerSink implements EventSink<List<int>> { |
| footerAndHeader[size + 1] = _CharCode.LF; |
| return footerAndHeader; |
| } |
| - |
| - static List<int> get _footerAndChunk0Length => new Uint8List.fromList( |
| - const [_CharCode.CR, _CharCode.LF, 0x30, _CharCode.CR, _CharCode.LF, |
| - _CharCode.CR, _CharCode.LF]); |
| - |
| - static List<int> get _chunk0Length => new Uint8List.fromList( |
| - const [0x30, _CharCode.CR, _CharCode.LF, _CharCode.CR, _CharCode.LF]); |
| -} |
| - |
| -// Transformer that transforms data to HTTP Chunked Encoding. |
| -class _ChunkedTransformer implements StreamTransformer<List<int>, List<int>> { |
| - const _ChunkedTransformer(); |
| - |
| - Stream<List<int>> bind(Stream<List<int>> stream) { |
| - return new Stream<List<int>>.eventTransformed( |
| - stream, |
| - (EventSink<List<int>> sink) => new _ChunkedTransformerSink(sink)); |
| - } |
| -} |
| - |
| -// Transformer that validates the content length. |
| -class _ContentLengthValidator |
| - implements StreamTransformer<List<int>, List<int>>, EventSink<List<int>> { |
| - final int expectedContentLength; |
| - final Uri uri; |
| - int _bytesWritten = 0; |
| - |
| - EventSink<List<int>> _outSink; |
| - |
| - _ContentLengthValidator(this.expectedContentLength, this.uri); |
| - |
| - Stream<List<int>> bind(Stream<List<int>> stream) { |
| - return new Stream.eventTransformed( |
| - stream, |
| - (EventSink sink) { |
| - if (_outSink != null) { |
| - throw new StateError("Validator transformer already used"); |
| - } |
| - _outSink = sink; |
| - return this; |
| - }); |
| - } |
| - |
| - void add(List<int> data) { |
| - _bytesWritten += data.length; |
| - if (_bytesWritten > expectedContentLength) { |
| - _outSink.addError(new HttpException( |
| - "Content size exceeds specified contentLength. " |
| - "$_bytesWritten bytes written while expected " |
| - "$expectedContentLength. " |
| - "[${new String.fromCharCodes(data)}]", |
| - uri: uri)); |
| - _outSink.close(); |
| - } else { |
| - _outSink.add(data); |
| - } |
| - } |
| - |
| - void addError(Object error, [StackTrace stackTrace]) { |
| - _outSink.addError(error, stackTrace); |
| - } |
| - |
| - void close() { |
| - if (_bytesWritten < expectedContentLength) { |
| - _outSink.addError(new HttpException( |
| - "Content size below specified contentLength. " |
| - " $_bytesWritten bytes written while expected " |
| - "$expectedContentLength.", |
| - uri: uri)); |
| - } |
| - _outSink.close(); |
| - } |
| -} |
| - |
| - |
| -// Extends StreamConsumer as this is an internal type, only used to pipe to. |
| -class _HttpOutgoing implements StreamConsumer<List<int>> { |
| - final Completer _doneCompleter = new Completer(); |
| - final Socket socket; |
| - |
| - _HttpOutgoing(this.socket); |
| - |
| - Future addStream(Stream<List<int>> stream) { |
| - return socket.addStream(stream) |
| - .catchError((error) { |
| - _doneCompleter.completeError(error); |
| - throw error; |
| - }); |
| - } |
| - |
| - Future close() { |
| - _doneCompleter.complete(socket); |
| - return new Future.value(); |
| - } |
| - |
| - Future get done => _doneCompleter.future; |
| } |
| class _HttpClientConnection { |
| @@ -1238,7 +1211,6 @@ class _HttpClientConnection { |
| Timer _idleTimer; |
| bool closed = false; |
| Uri _currentUri; |
| - final Uint8List _headersBuffer = new Uint8List(_HEADERS_BUFFER_SIZE); |
| Completer<_HttpIncoming> _nextResponseCompleter; |
| Future _streamFuture; |
| @@ -1901,7 +1873,6 @@ class _HttpConnection extends LinkedListEntry<_HttpConnection> { |
| int _state = _IDLE; |
| StreamSubscription _subscription; |
| Timer _idleTimer; |
| - final Uint8List _headersBuffer = new Uint8List(_HEADERS_BUFFER_SIZE); |
| bool _idleMark = false; |
| Future _streamFuture; |