Index: sdk/lib/io/http_impl.dart |
diff --git a/sdk/lib/io/http_impl.dart b/sdk/lib/io/http_impl.dart |
index 625154aceb8f6e1f30b63a1430f7dbec0d52cf07..55445491b8d6c28110af30ee02d3b9e8ca1ea0a7 100644 |
--- a/sdk/lib/io/http_impl.dart |
+++ b/sdk/lib/io/http_impl.dart |
@@ -322,16 +322,15 @@ abstract class _HttpOutboundMessage<T> implements IOSink { |
// requests and in error handling. |
bool _ignoreBody = false; |
bool _headersWritten = false; |
- bool _chunked = false; |
- final IOSink _ioSink; |
+ IOSink _ioSink; |
final _HttpOutgoing _outgoing; |
final _HttpHeaders headers; |
_HttpOutboundMessage(String protocolVersion, _HttpOutgoing outgoing) |
: _outgoing = outgoing, |
- _ioSink = new IOSink(outgoing, encoding: Encoding.ASCII), |
+ _ioSink = new IOSink(outgoing, encoding: Encoding.ASCII), |
headers = new _HttpHeaders(protocolVersion); |
int get contentLength => headers.contentLength; |
@@ -360,7 +359,6 @@ abstract class _HttpOutboundMessage<T> implements IOSink { |
void write(Object obj) { |
_writeHeaders(); |
- if (_ignoreBody) return; |
// This comment is copied from runtime/lib/string_buffer_patch.dart. |
// TODO(srdjan): The following four lines could be replaced by |
// '$obj', but apparently this is too slow on the Dart VM. |
@@ -374,12 +372,7 @@ abstract class _HttpOutboundMessage<T> implements IOSink { |
} |
} |
if (string.isEmpty) return; |
- if (_chunked) { |
- _ChunkedTransformer._addChunk(_encodeString(string, encoding), |
- _ioSink.writeBytes); |
- } else { |
- _ioSink.write(string); |
- } |
+ _ioSink.write(string); |
} |
void writeAll(Iterable objects) { |
@@ -397,31 +390,17 @@ abstract class _HttpOutboundMessage<T> implements IOSink { |
void writeBytes(List<int> data) { |
_writeHeaders(); |
- if (_ignoreBody || data.length == 0) return; |
- if (_chunked) { |
- _ChunkedTransformer._addChunk(data, _ioSink.writeBytes); |
- } else { |
- _ioSink.writeBytes(data); |
- } |
+ if (data.length == 0) return; |
+ _ioSink.writeBytes(data); |
} |
Future<T> consume(Stream<List<int>> stream) { |
_writeHeaders(); |
- if (_ignoreBody) return new Future.immediate(this); |
- if (_chunked) { |
- // Transform when chunked. |
- stream = stream.transform(new _ChunkedTransformer()); |
- } |
- return _ioSink.consume(stream).then((_) => this); |
+ return _ioSink.consume(stream); |
} |
Future<T> writeStream(Stream<List<int>> stream) { |
_writeHeaders(); |
- if (_ignoreBody) return new Future.immediate(this); |
- if (_chunked) { |
- // Transform when chunked. |
- stream = stream.transform(new _ChunkedTransformer(writeEnd: false)); |
- } |
return _ioSink.writeStream(stream).then((_) => this); |
} |
@@ -429,39 +408,74 @@ abstract class _HttpOutboundMessage<T> implements IOSink { |
if (!_headersWritten && !_ignoreBody && headers.chunkedTransferEncoding) { |
// If no body was written, _ignoreBody is false (it's not a HEAD |
// request) and the content-length is unspecified, set contentLength to 0. |
+ headers.chunkedTransferEncoding = false; |
headers.contentLength = 0; |
} |
_writeHeaders(); |
- if (!_ignoreBody) { |
- if (_chunked) { |
- _ChunkedTransformer._addChunk([], _ioSink.writeBytes); |
- } |
- } |
_ioSink.close(); |
} |
- Future<T> get done => _ioSink.done.then((_) => this); |
+ Future<T> get done { |
+ _writeHeaders(); |
+ return _ioSink.done; |
+ } |
void _writeHeaders() { |
if (_headersWritten) return; |
_headersWritten = true; |
_ioSink.encoding = Encoding.ASCII; |
+ headers._synchronize(); // Be sure the 'chunked' option is updated. |
+ bool asGZip = false; |
+ bool isServerSide = this is _HttpResponse; |
+ if (isServerSide && headers.chunkedTransferEncoding) { |
+ List acceptEncodings = |
+ _httpRequest.headers[HttpHeaders.ACCEPT_ENCODING]; |
+ List contentEncoding = headers[HttpHeaders.CONTENT_ENCODING]; |
+ if (acceptEncodings != null && |
+ acceptEncodings.any((encoding) => encoding.toLowerCase() == "gzip") && |
+ contentEncoding == null) { |
+ headers.set(HttpHeaders.CONTENT_ENCODING, "gzip"); |
+ asGZip = true; |
+ } |
+ } |
_writeHeader(); |
+ _ioSink = new IOSink(new _HttpOutboundConsumer(_ioSink, _consume, asGZip)); |
_ioSink.encoding = encoding; |
+ } |
+ |
+ Future _consume(IOSink ioSink, Stream<List<int>> stream, bool asGZip) { |
+ int contentLength = headers.contentLength; |
if (_ignoreBody) { |
- _ioSink.close(); |
- return; |
+ ioSink.close(); |
+ return stream.reduce(null, (x, y) {}).then((_) => this); |
} |
- _chunked = headers.chunkedTransferEncoding; |
- if (headers.contentLength >= 0) { |
- _outgoing.setTransferLength(headers.contentLength); |
+ if (headers.chunkedTransferEncoding) { |
+ if (asGZip) { |
+ stream = stream.transform(new ZLibDeflater(gzip: true, level: 6)); |
+ } |
+ stream = stream.transform(new _ChunkedTransformer()); |
+ } else if (contentLength >= 0) { |
+ stream = stream.transform(new _ContentLengthValidator(contentLength)); |
} |
+ return stream.pipe(ioSink).then((_) => this); |
} |
void _writeHeader(); // TODO(ajohnsen): Better name. |
} |
+class _HttpOutboundConsumer implements StreamConsumer { |
+ Function _consume; |
+ IOSink _ioSink; |
+ bool _asGZip; |
+ _HttpOutboundConsumer(IOSink this._ioSink, |
+ Function this._consume, |
+ bool this._asGZip); |
+ |
+ Future consume(var stream) => _consume(_ioSink, stream, _asGZip); |
+} |
+ |
+ |
class _HttpResponse extends _HttpOutboundMessage<HttpResponse> |
implements HttpResponse { |
int statusCode = 200; |
@@ -744,26 +758,17 @@ class _HttpClientRequest extends _HttpOutboundMessage<HttpClientRequest> |
// Transformer that transforms data to HTTP Chunked Encoding. |
class _ChunkedTransformer extends StreamEventTransformer<List<int>, List<int>> { |
- final bool writeEnd; |
- _ChunkedTransformer({this.writeEnd: true}); |
- |
void handleData(List<int> data, EventSink<List<int>> sink) { |
- _addChunk(data, sink.add); |
+ sink.add(_chunkHeader(data.length)); |
+ if (data.length > 0) sink.add(data); |
+ sink.add(_chunkFooter); |
} |
void handleDone(EventSink<List<int>> sink) { |
- if (writeEnd) { |
- _addChunk([], sink.add); |
- } |
+ handleData([], sink); |
sink.close(); |
} |
- static void _addChunk(List<int> data, void add(List<int> data)) { |
- add(_chunkHeader(data.length)); |
- if (data.length > 0) add(data); |
- add(_chunkFooter); |
- } |
- |
static List<int> _chunkHeader(int length) { |
const hexDigits = const [0x30, 0x31, 0x32, 0x33, 0x34, 0x35, 0x36, 0x37, |
0x38, 0x39, 0x41, 0x42, 0x43, 0x44, 0x45, 0x46]; |
@@ -786,77 +791,42 @@ class _ChunkedTransformer extends StreamEventTransformer<List<int>, List<int>> { |
} |
-// Transformer that invokes [_onDone] when completed. |
-class _DoneTransformer implements StreamTransformer<List<int>, List<int>> { |
- final StreamController<List<int>> _controller |
- = new StreamController<List<int>>(); |
- final Function _onDone; |
+// Transformer that validates the content length. |
+class _ContentLengthValidator |
+ extends StreamEventTransformer<List<int>, List<int>> { |
+ final int expectedContentLength; |
+ int _bytesWritten = 0; |
- _DoneTransformer(this._onDone); |
+ _ContentLengthValidator(int this.expectedContentLength); |
- Stream<List<int>> bind(Stream<List<int>> stream) { |
- var subscription = stream.listen( |
- _controller.add, |
- onError: _controller.addError, |
- onDone: () { |
- _onDone(); |
- _controller.close(); |
- }); |
- return _controller.stream; |
+ void handleData(List<int> data, EventSink<List<int>> sink) { |
+ _bytesWritten += data.length; |
+ if (_bytesWritten > expectedContentLength) { |
+ sink.addError(new AsyncError(new HttpException( |
+ "Content size exceeds specified contentLength. " |
+ "$_bytesWritten bytes written while expected " |
+ "$expectedContentLength. " |
+ "[${new String.fromCharCodes(data)}]"))); |
+ sink.close(); |
+ } else { |
+ sink.add(data); |
+ } |
} |
-} |
- |
-// Transformer that validates the data written. |
-class _DataValidatorTransformer |
- implements StreamTransformer<List<int>, List<int>> { |
- final StreamController<List<int>> _controller = |
- new StreamController<List<int>>(); |
- int _bytesWritten = 0; |
- int expectedTransferLength; |
- |
- Stream<List<int>> bind(Stream<List<int>> stream) { |
- var subscription; |
- subscription = stream.listen( |
- (data) { |
- if (expectedTransferLength != null) { |
- _bytesWritten += data.length; |
- if (_bytesWritten > expectedTransferLength) { |
- subscription.cancel(); |
- _controller.addError(new HttpException( |
- "Content size exceeds specified contentLength. " |
- "$_bytesWritten bytes written while expected " |
- "$expectedTransferLength. " |
- "[${new String.fromCharCodes(data)}]")); |
- _controller.close(); |
- return; |
- } |
- } |
- _controller.add(data); |
- }, |
- onError: (error) { |
- _controller.addError(error); |
- _controller.close(); |
- }, |
- onDone: () { |
- if (expectedTransferLength != null) { |
- if (_bytesWritten < expectedTransferLength) { |
- _controller.addError(new HttpException( |
- "Content size below specified contentLength. " |
- " $_bytesWritten bytes written while expected " |
- "$expectedTransferLength.")); |
- } |
- } |
- _controller.close(); |
- }, |
- unsubscribeOnError: true); |
- return _controller.stream; |
+ void handleDone(EventSink<List<int>> sink) { |
+ if (_bytesWritten < expectedContentLength) { |
+ sink.addError(new AsyncError(new HttpException( |
+ "Content size below specified contentLength. " |
+ " $_bytesWritten bytes written while expected " |
+ "$expectedContentLength."))); |
+ } |
+ sink.close(); |
} |
} |
+ |
// Extends StreamConsumer as this is an internal type, only used to pipe to. |
class _HttpOutgoing implements StreamConsumer<List<int>, dynamic> { |
- final _DataValidatorTransformer _validator = new _DataValidatorTransformer(); |
Function _onStream; |
final Completer _consumeCompleter = new Completer(); |
@@ -865,12 +835,8 @@ class _HttpOutgoing implements StreamConsumer<List<int>, dynamic> { |
return _consumeCompleter.future; |
} |
- void setTransferLength(int transferLength) { |
- _validator.expectedTransferLength = transferLength; |
- } |
- |
Future consume(Stream<List<int>> stream) { |
- _onStream(stream.transform(_validator)) |
+ _onStream(stream) |
.then((_) => _consumeCompleter.complete(), |
onError: _consumeCompleter.completeError); |
// Use .then to ensure a Future branch. |