Chromium Code Reviews| Index: sdk/lib/io/http_impl.dart |
| diff --git a/sdk/lib/io/http_impl.dart b/sdk/lib/io/http_impl.dart |
| index 625154aceb8f6e1f30b63a1430f7dbec0d52cf07..e87908b4cdf3fce4f2d85e3678b60394a750856d 100644 |
| --- a/sdk/lib/io/http_impl.dart |
| +++ b/sdk/lib/io/http_impl.dart |
| @@ -322,16 +322,15 @@ abstract class _HttpOutboundMessage<T> implements IOSink { |
| // requests and in error handling. |
| bool _ignoreBody = false; |
| bool _headersWritten = false; |
| - bool _chunked = false; |
| - final IOSink _ioSink; |
| + IOSink _ioSink; |
| final _HttpOutgoing _outgoing; |
| final _HttpHeaders headers; |
| _HttpOutboundMessage(String protocolVersion, _HttpOutgoing outgoing) |
| : _outgoing = outgoing, |
| - _ioSink = new IOSink(outgoing, encoding: Encoding.ASCII), |
| + _ioSink = new IOSink(outgoing, encoding: Encoding.ASCII), |
| headers = new _HttpHeaders(protocolVersion); |
| int get contentLength => headers.contentLength; |
| @@ -360,7 +359,6 @@ abstract class _HttpOutboundMessage<T> implements IOSink { |
| void write(Object obj) { |
| _writeHeaders(); |
| - if (_ignoreBody) return; |
| // This comment is copied from runtime/lib/string_buffer_patch.dart. |
| // TODO(srdjan): The following four lines could be replaced by |
| // '$obj', but apparently this is too slow on the Dart VM. |
| @@ -374,12 +372,7 @@ abstract class _HttpOutboundMessage<T> implements IOSink { |
| } |
| } |
| if (string.isEmpty) return; |
| - if (_chunked) { |
| - _ChunkedTransformer._addChunk(_encodeString(string, encoding), |
| - _ioSink.writeBytes); |
| - } else { |
| - _ioSink.write(string); |
| - } |
| + _ioSink.write(string); |
| } |
| void writeAll(Iterable objects) { |
| @@ -397,31 +390,17 @@ abstract class _HttpOutboundMessage<T> implements IOSink { |
| void writeBytes(List<int> data) { |
| _writeHeaders(); |
| - if (_ignoreBody || data.length == 0) return; |
| - if (_chunked) { |
| - _ChunkedTransformer._addChunk(data, _ioSink.writeBytes); |
| - } else { |
| - _ioSink.writeBytes(data); |
| - } |
| + if (data.length == 0) return; |
| + _ioSink.writeBytes(data); |
| } |
| Future<T> consume(Stream<List<int>> stream) { |
| _writeHeaders(); |
| - if (_ignoreBody) return new Future.immediate(this); |
| - if (_chunked) { |
| - // Transform when chunked. |
| - stream = stream.transform(new _ChunkedTransformer()); |
| - } |
| - return _ioSink.consume(stream).then((_) => this); |
| + return _ioSink.consume(stream); |
| } |
| Future<T> writeStream(Stream<List<int>> stream) { |
| _writeHeaders(); |
| - if (_ignoreBody) return new Future.immediate(this); |
| - if (_chunked) { |
| - // Transform when chunked. |
| - stream = stream.transform(new _ChunkedTransformer(writeEnd: false)); |
| - } |
| return _ioSink.writeStream(stream).then((_) => this); |
| } |
| @@ -432,36 +411,60 @@ abstract class _HttpOutboundMessage<T> implements IOSink { |
| headers.contentLength = 0; |
| } |
| _writeHeaders(); |
| - if (!_ignoreBody) { |
| - if (_chunked) { |
| - _ChunkedTransformer._addChunk([], _ioSink.writeBytes); |
| - } |
| - } |
| _ioSink.close(); |
| } |
| - Future<T> get done => _ioSink.done.then((_) => this); |
| + Future<T> get done { |
| + _writeHeaders(); |
| + return _ioSink.done; |
| + } |
| void _writeHeaders() { |
| if (_headersWritten) return; |
| _headersWritten = true; |
| _ioSink.encoding = Encoding.ASCII; |
| _writeHeader(); |
| + _ioSink = new IOSink(new _HttpOutboundConsumer(_ioSink, _consume)); |
| _ioSink.encoding = encoding; |
| + } |
| + |
| + Future _consume(IOSink ioSink, Stream<List<int>> stream) { |
| + int contentLength = headers.contentLength; |
| if (_ignoreBody) { |
| - _ioSink.close(); |
| - return; |
| + ioSink.close(); |
| + return stream.reduce(null, (x, y) {}).then((_) => this); |
| } |
| - _chunked = headers.chunkedTransferEncoding; |
| - if (headers.contentLength >= 0) { |
| - _outgoing.setTransferLength(headers.contentLength); |
| + if (headers.chunkedTransferEncoding) { |
| + bool isServerSide = this is _HttpResponse; |
| + if (isServerSide) { |
| + List acceptEncodings = |
| + _httpRequest.headers[HttpHeaders.ACCEPT_ENCODING]; |
| + bool canGZip = acceptEncodings != null && acceptEncodings.any( |
| + (encoding) => encoding.toLowerCase() == "gzip"); |
|
Søren Gjesse
2013/03/12 08:54:09
Here the header Content-Encoding should be set to
Anders Johnsen
2013/03/12 10:04:59
Done.
|
| + if (canGZip && headers.chunkedTransferEncoding) { |
| + stream = stream.transform(new ZLibDeflater(gzip: true, level: 6)); |
| + } |
| + } |
| + stream = stream.transform(new _ChunkedTransformer()); |
| + } else if (contentLength >= 0) { |
| + stream = stream.transform(new _ContentLengthValidator(contentLength)); |
| } |
| + return stream.pipe(ioSink).then((_) => this); |
| } |
| void _writeHeader(); // TODO(ajohnsen): Better name. |
| } |
| +class _HttpOutboundConsumer implements StreamConsumer { |
| + Function _consume; |
| + IOSink _ioSink; |
| + _HttpOutboundConsumer(IOSink this._ioSink, Function this._consume); |
| + |
| + Future consume(var stream) => _consume(_ioSink, stream); |
| +} |
| + |
| + |
| class _HttpResponse extends _HttpOutboundMessage<HttpResponse> |
| implements HttpResponse { |
| int statusCode = 200; |
| @@ -744,26 +747,17 @@ class _HttpClientRequest extends _HttpOutboundMessage<HttpClientRequest> |
| // Transformer that transforms data to HTTP Chunked Encoding. |
| class _ChunkedTransformer extends StreamEventTransformer<List<int>, List<int>> { |
| - final bool writeEnd; |
| - _ChunkedTransformer({this.writeEnd: true}); |
| - |
| void handleData(List<int> data, EventSink<List<int>> sink) { |
| - _addChunk(data, sink.add); |
| + sink.add(_chunkHeader(data.length)); |
| + if (data.length > 0) sink.add(data); |
| + sink.add(_chunkFooter); |
| } |
| void handleDone(EventSink<List<int>> sink) { |
| - if (writeEnd) { |
| - _addChunk([], sink.add); |
| - } |
| + handleData([], sink); |
| sink.close(); |
| } |
| - static void _addChunk(List<int> data, void add(List<int> data)) { |
| - add(_chunkHeader(data.length)); |
| - if (data.length > 0) add(data); |
| - add(_chunkFooter); |
| - } |
| - |
| static List<int> _chunkHeader(int length) { |
| const hexDigits = const [0x30, 0x31, 0x32, 0x33, 0x34, 0x35, 0x36, 0x37, |
| 0x38, 0x39, 0x41, 0x42, 0x43, 0x44, 0x45, 0x46]; |
| @@ -786,77 +780,42 @@ class _ChunkedTransformer extends StreamEventTransformer<List<int>, List<int>> { |
| } |
| -// Transformer that invokes [_onDone] when completed. |
| -class _DoneTransformer implements StreamTransformer<List<int>, List<int>> { |
| - final StreamController<List<int>> _controller |
| - = new StreamController<List<int>>(); |
| - final Function _onDone; |
| +// Transformer that validates the content length. |
| +class _ContentLengthValidator |
| + extends StreamEventTransformer<List<int>, List<int>> { |
| + final int expectedContentLength; |
| + int _bytesWritten = 0; |
| - _DoneTransformer(this._onDone); |
| + _ContentLengthValidator(int this.expectedContentLength); |
| - Stream<List<int>> bind(Stream<List<int>> stream) { |
| - var subscription = stream.listen( |
| - _controller.add, |
| - onError: _controller.addError, |
| - onDone: () { |
| - _onDone(); |
| - _controller.close(); |
| - }); |
| - return _controller.stream; |
| + void handleData(List<int> data, EventSink<List<int>> sink) { |
| + _bytesWritten += data.length; |
| + if (_bytesWritten > expectedContentLength) { |
| + sink.addError(new HttpException( |
| + "Content size exceeds specified contentLength. " |
| + "$_bytesWritten bytes written while expected " |
| + "$expectedContentLength. " |
| + "[${new String.fromCharCodes(data)}]")); |
| + sink.close(); |
| + } else { |
| + sink.add(data); |
| + } |
| } |
| -} |
| - |
| -// Transformer that validates the data written. |
| -class _DataValidatorTransformer |
| - implements StreamTransformer<List<int>, List<int>> { |
| - final StreamController<List<int>> _controller = |
| - new StreamController<List<int>>(); |
| - int _bytesWritten = 0; |
| - int expectedTransferLength; |
| - |
| - Stream<List<int>> bind(Stream<List<int>> stream) { |
| - var subscription; |
| - subscription = stream.listen( |
| - (data) { |
| - if (expectedTransferLength != null) { |
| - _bytesWritten += data.length; |
| - if (_bytesWritten > expectedTransferLength) { |
| - subscription.cancel(); |
| - _controller.addError(new HttpException( |
| - "Content size exceeds specified contentLength. " |
| - "$_bytesWritten bytes written while expected " |
| - "$expectedTransferLength. " |
| - "[${new String.fromCharCodes(data)}]")); |
| - _controller.close(); |
| - return; |
| - } |
| - } |
| - _controller.add(data); |
| - }, |
| - onError: (error) { |
| - _controller.addError(error); |
| - _controller.close(); |
| - }, |
| - onDone: () { |
| - if (expectedTransferLength != null) { |
| - if (_bytesWritten < expectedTransferLength) { |
| - _controller.addError(new HttpException( |
| - "Content size below specified contentLength. " |
| - " $_bytesWritten bytes written while expected " |
| - "$expectedTransferLength.")); |
| - } |
| - } |
| - _controller.close(); |
| - }, |
| - unsubscribeOnError: true); |
| - return _controller.stream; |
| + void handleDone(EventSink<List<int>> sink) { |
| + if (_bytesWritten < expectedContentLength) { |
| + sink.addError(new HttpException( |
| + "Content size below specified contentLength. " |
| + " $_bytesWritten bytes written while expected " |
| + "$expectedContentLength.")); |
| + } |
| + sink.close(); |
| } |
| } |
| + |
| // Extends StreamConsumer as this is an internal type, only used to pipe to. |
| class _HttpOutgoing implements StreamConsumer<List<int>, dynamic> { |
| - final _DataValidatorTransformer _validator = new _DataValidatorTransformer(); |
| Function _onStream; |
| final Completer _consumeCompleter = new Completer(); |
| @@ -865,12 +824,8 @@ class _HttpOutgoing implements StreamConsumer<List<int>, dynamic> { |
| return _consumeCompleter.future; |
| } |
| - void setTransferLength(int transferLength) { |
| - _validator.expectedTransferLength = transferLength; |
| - } |
| - |
| Future consume(Stream<List<int>> stream) { |
| - _onStream(stream.transform(_validator)) |
| + _onStream(stream) |
| .then((_) => _consumeCompleter.complete(), |
| onError: _consumeCompleter.completeError); |
| // Use .then to ensure a Future branch. |