Index: sdk/lib/io/http_impl.dart |
diff --git a/sdk/lib/io/http_impl.dart b/sdk/lib/io/http_impl.dart |
index 55445491b8d6c28110af30ee02d3b9e8ca1ea0a7..f2204bf0cca80140e7125b2f91b7da00b4e616c9 100644 |
--- a/sdk/lib/io/http_impl.dart |
+++ b/sdk/lib/io/http_impl.dart |
@@ -449,6 +449,7 @@ abstract class _HttpOutboundMessage<T> implements IOSink { |
ioSink.close(); |
return stream.reduce(null, (x, y) {}).then((_) => this); |
} |
+ stream = stream.transform(new _BufferTransformer()); |
if (headers.chunkedTransferEncoding) { |
if (asGZip) { |
stream = stream.transform(new ZLibDeflater(gzip: true, level: 6)); |
@@ -476,6 +477,40 @@ class _HttpOutboundConsumer implements StreamConsumer { |
} |
+class _BufferTransformer extends StreamEventTransformer<List<int>, List<int>> { |
+ const int MIN_CHUNK_SIZE = 4 * 1024; |
+ const int MAX_BUFFER_SIZE = 16 * 1024; |
+ |
+ final _BufferList _buffer = new _BufferList(); |
+ |
+ void handleData(List<int> data, EventSink<List<int>> sink) { |
+ // TODO(ajohnsen): Use timeout? |
+ if (data.length == 0) return; |
+ if (data.length >= MIN_CHUNK_SIZE) { |
+ flush(sink); |
+ sink.add(data); |
+ } else { |
+ _buffer.add(data); |
+ if (_buffer.length >= MAX_BUFFER_SIZE) { |
+ flush(sink); |
+ } |
+ } |
+ } |
+ |
+ void handleDone(EventSink<List<int>> sink) { |
+ flush(sink); |
+ sink.close(); |
+ } |
+ |
+ void flush(EventSink<List<int>> sink) { |
+ if (_buffer.length > 0) { |
+ sink.add(_buffer.readBytes()); |
+ _buffer.clear(); |
+ } |
+ } |
+} |
+ |
+ |
class _HttpResponse extends _HttpOutboundMessage<HttpResponse> |
implements HttpResponse { |
int statusCode = 200; |
@@ -514,19 +549,20 @@ class _HttpResponse extends _HttpOutboundMessage<HttpResponse> |
HttpConnectionInfo get connectionInfo => _httpRequest.connectionInfo; |
void _writeHeader() { |
- writeSP() => _ioSink.writeBytes([_CharCode.SP]); |
- writeCRLF() => _ioSink.writeBytes([_CharCode.CR, _CharCode.LF]); |
+ var buffer = new _BufferList(); |
+ writeSP() => buffer.add(const [_CharCode.SP]); |
+ writeCRLF() => buffer.add(const [_CharCode.CR, _CharCode.LF]); |
// Write status line. |
if (headers.protocolVersion == "1.1") { |
- _ioSink.writeBytes(_Const.HTTP11); |
+ buffer.add(_Const.HTTP11); |
} else { |
- _ioSink.writeBytes(_Const.HTTP10); |
+ buffer.add(_Const.HTTP10); |
} |
writeSP(); |
- _ioSink.write(statusCode.toString()); |
+ buffer.add(statusCode.toString().codeUnits); |
writeSP(); |
- _ioSink.write(reasonPhrase); |
+ buffer.add(reasonPhrase.codeUnits); |
writeCRLF(); |
var session = _httpRequest._session; |
@@ -560,8 +596,9 @@ class _HttpResponse extends _HttpOutboundMessage<HttpResponse> |
headers._finalize(); |
// Write headers. |
- headers._write(_ioSink); |
+ headers._write(buffer); |
writeCRLF(); |
+ _ioSink.writeBytes(buffer.readBytes()); |
} |
String _findReasonPhrase(int statusCode) { |
@@ -710,10 +747,11 @@ class _HttpClientRequest extends _HttpOutboundMessage<HttpClientRequest> |
} |
void _writeHeader() { |
- writeSP() => _ioSink.writeBytes([_CharCode.SP]); |
- writeCRLF() => _ioSink.writeBytes([_CharCode.CR, _CharCode.LF]); |
+ var buffer = new _BufferList(); |
+ writeSP() => buffer.add(const [_CharCode.SP]); |
+ writeCRLF() => buffer.add(const [_CharCode.CR, _CharCode.LF]); |
- _ioSink.write(method); |
+ buffer.add(method.codeUnits); |
writeSP(); |
// Send the path for direct connections and the whole URL for |
// proxy connections. |
@@ -727,12 +765,12 @@ class _HttpClientRequest extends _HttpOutboundMessage<HttpClientRequest> |
path = "${path}?${uri.query}"; |
} |
} |
- _ioSink.write(path); |
+ buffer.add(path.codeUnits); |
} else { |
- _ioSink.write(uri.toString()); |
+ buffer.add(uri.toString().codeUnits); |
} |
writeSP(); |
- _ioSink.writeBytes(_Const.HTTP11); |
+ buffer.add(_Const.HTTP11); |
writeCRLF(); |
// Add the cookies to the headers. |
@@ -750,8 +788,9 @@ class _HttpClientRequest extends _HttpOutboundMessage<HttpClientRequest> |
headers._finalize(); |
// Write headers. |
- headers._write(_ioSink); |
+ headers._write(buffer); |
writeCRLF(); |
+ _ioSink.writeBytes(buffer.readBytes()); |
} |
} |