Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(91)

Unified Diff: sdk/lib/io/http_impl.dart

Issue 185543004: Merge all http-outgoing transfomers into _HttpOutgoing, including simpler(better) buffering. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Cleanup Created 6 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « runtime/bin/socket_patch.dart ('k') | sdk/lib/io/io_sink.dart » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: sdk/lib/io/http_impl.dart
diff --git a/sdk/lib/io/http_impl.dart b/sdk/lib/io/http_impl.dart
index 5bc5b97cae902e5b01162391de71b196bc53bc52..7990e05ffeb4ae4de9ef0e413f9dd0d693ba9ec1 100644
--- a/sdk/lib/io/http_impl.dart
+++ b/sdk/lib/io/http_impl.dart
@@ -4,7 +4,7 @@
part of dart.io;
-const int _HEADERS_BUFFER_SIZE = 8 * 1024;
+const int _OUTGOING_BUFFER_SIZE = 8 * 1024;
class _HttpIncoming extends Stream<List<int>> {
final int _transferLength;
@@ -406,28 +406,25 @@ class _HttpClientResponse
}
-abstract class _HttpOutboundMessage<T> implements IOSink {
+abstract class _HttpOutboundMessage<T> extends _IOSinkImpl {
// Used to mark when the body should be written. This is used for HEAD
// requests and in error handling.
bool _ignoreBody = false;
bool _headersWritten = false;
- bool _asGZip = false;
- IOSink _headersSink;
- IOSink _dataSink;
-
- final _HttpOutgoing _outgoing;
final Uri _uri;
+ final _HttpOutgoing _outgoing;
final _HttpHeaders headers;
_HttpOutboundMessage(this._uri,
String protocolVersion,
- _HttpOutgoing outgoing)
- : _outgoing = outgoing,
- _headersSink = new IOSink(outgoing, encoding: ASCII),
+ this._outgoing)
+ : super(new _HttpOutboundConsumer(), null),
headers = new _HttpHeaders(protocolVersion) {
- _dataSink = new IOSink(new _HttpOutboundConsumer(this));
+ _outgoing.outbound = this;
+ (_target as _HttpOutboundConsumer).outbound = this;
+ _encodingMutable = false;
}
int get contentLength => headers.contentLength;
@@ -450,60 +447,30 @@ abstract class _HttpOutboundMessage<T> implements IOSink {
return Encoding.getByName(charset);
}
- void set encoding(Encoding value) {
- throw new StateError("IOSink encoding is not mutable");
- }
-
- void write(Object obj) {
- if (!_headersWritten) _dataSink.encoding = encoding;
- _dataSink.write(obj);
- }
-
- void writeAll(Iterable objects, [String separator = ""]) {
- if (!_headersWritten) _dataSink.encoding = encoding;
- _dataSink.writeAll(objects, separator);
- }
-
- void writeln([Object obj = ""]) {
- if (!_headersWritten) _dataSink.encoding = encoding;
- _dataSink.writeln(obj);
- }
-
- void writeCharCode(int charCode) {
- if (!_headersWritten) _dataSink.encoding = encoding;
- _dataSink.writeCharCode(charCode);
- }
-
void add(List<int> data) {
if (data.length == 0) return;
- _dataSink.add(data);
+ super.add(data);
}
- void addError(error, [StackTrace stackTrace]) =>
- _dataSink.addError(error, stackTrace);
-
- Future<T> addStream(Stream<List<int>> stream) => _dataSink.addStream(stream);
-
- Future flush() => _dataSink.flush();
-
- Future close() => _dataSink.close();
-
- Future<T> get done => _dataSink.done;
-
- Future _writeHeaders({bool drainRequest: true}) {
- void write() {
+ Future _writeHeaders({bool drainRequest: true,
+ bool setOutgoing: true}) {
+ // TODO(ajohnsen): Avoid excessive futures in this method.
+ write() {
try {
_writeHeader();
- } catch (error) {
+ } catch (_) {
// Headers too large.
throw new HttpException(
- "Headers size exceeded the of '$_HEADERS_BUFFER_SIZE' bytes");
+ "Headers size exceeded the of '$_OUTGOING_BUFFER_SIZE'"
+ " bytes");
}
+ return this;
}
- if (_headersWritten) return new Future.value();
+ if (_headersWritten) return new Future.value(this);
_headersWritten = true;
- _dataSink.encoding = encoding;
+ Future drainFuture;
bool isServerSide = this is _HttpResponse;
+ bool gzip = false;
if (isServerSide) {
var response = this;
if (headers.chunkedTransferEncoding) {
@@ -516,42 +483,51 @@ abstract class _HttpOutboundMessage<T> implements IOSink {
.any((encoding) => encoding.trim().toLowerCase() == "gzip") &&
contentEncoding == null) {
headers.set(HttpHeaders.CONTENT_ENCODING, "gzip");
- _asGZip = true;
+ gzip = true;
}
}
if (drainRequest && !response._httpRequest._incoming.hasSubscriber) {
- return response._httpRequest.drain()
- // TODO(ajohnsen): Timeout on drain?
- .catchError((_) {}) // Ignore errors.
- .then((_) => write());
+ drainFuture = response._httpRequest.drain().catchError((_) {});
}
+ } else {
+ drainRequest = false;
+ }
+ if (_ignoreBody) {
+ return new Future.sync(write).then((_) => _outgoing.close());
+ }
+ if (setOutgoing) {
+ int contentLength = headers.contentLength;
+ if (headers.chunkedTransferEncoding) {
+ _outgoing.chunked = true;
+ if (gzip) _outgoing.gzip = true;
+ } else if (contentLength >= 0) {
+ _outgoing.contentLength = contentLength;
+ }
+ }
+ if (drainFuture != null) {
+ return drainFuture.then((_) => write());
}
return new Future.sync(write);
}
Future _addStream(Stream<List<int>> stream) {
- return _writeHeaders()
- .then((_) {
- int contentLength = headers.contentLength;
- if (_ignoreBody) {
- stream.drain().catchError((_) {});
- return _headersSink.close();
- }
- stream = stream.transform(const _BufferTransformer());
- if (headers.chunkedTransferEncoding) {
- if (_asGZip) {
- stream = stream.transform(GZIP.encoder);
- }
- stream = stream.transform(const _ChunkedTransformer());
- } else if (contentLength >= 0) {
- stream = stream.transform(
- new _ContentLengthValidator(contentLength, _uri));
- }
- return _headersSink.addStream(stream);
- });
+ // TODO(ajohnsen): Merge into _HttpOutgoing.
+ if (_ignoreBody) {
+ stream.drain().catchError((_) {});
+ return _writeHeaders();
+ }
+ if (_headersWritten) {
+ return _outgoing.addStream(stream);
+ } else {
+ var completer = new Completer.sync();
+ var future = _outgoing.addStream(stream, completer.future);
+ _writeHeaders().then(completer.complete);
+ return future;
+ }
}
Future _close() {
+ // TODO(ajohnsen): Merge into _HttpOutgoing.
if (!_headersWritten) {
if (!_ignoreBody && headers.contentLength == -1) {
// If no body was written, _ignoreBody is false (it's not a HEAD
@@ -560,14 +536,14 @@ abstract class _HttpOutboundMessage<T> implements IOSink {
headers.chunkedTransferEncoding = false;
headers.contentLength = 0;
} else if (!_ignoreBody && headers.contentLength > 0) {
- _headersSink.addError(new HttpException(
- "No content while contentLength was specified to be greater "
- "than 0: ${headers.contentLength}.",
- uri: _uri));
- return _headersSink.done;
+ return _outgoing.addStream(
+ new Stream.fromFuture(new Future.error(new HttpException(
+ "No content even though contentLength was specified to be "
+ "greater than 0: ${headers.contentLength}.",
+ uri: _uri))));
}
}
- return _writeHeaders().whenComplete(_headersSink.close);
+ return _writeHeaders().whenComplete(_outgoing.close);
}
void _writeHeader();
@@ -575,146 +551,13 @@ abstract class _HttpOutboundMessage<T> implements IOSink {
class _HttpOutboundConsumer implements StreamConsumer {
- final _HttpOutboundMessage _outbound;
- StreamController _controller;
- StreamSubscription _subscription;
- Completer _closeCompleter = new Completer();
- Completer _completer;
- bool _socketError = false;
-
- _HttpOutboundConsumer(this._outbound);
-
- void _cancel() {
- if (_subscription != null) {
- StreamSubscription subscription = _subscription;
- _subscription = null;
- subscription.cancel();
- }
- }
-
- bool _ignoreError(error)
- => (error is SocketException || error is TlsException) &&
- _outbound is HttpResponse;
-
- _ensureController() {
- if (_controller != null) return;
- _controller = new StreamController(sync: true,
- onPause: () => _subscription.pause(),
- onResume: () => _subscription.resume(),
- onListen: () => _subscription.resume(),
- onCancel: _cancel);
- _outbound._addStream(_controller.stream)
- .then((_) {
- _cancel();
- _done();
- _closeCompleter.complete(_outbound);
- },
- onError: (error, [StackTrace stackTrace]) {
- _socketError = true;
- if (_ignoreError(error)) {
- _cancel();
- _done();
- _closeCompleter.complete(_outbound);
- } else {
- if (!_done(error)) {
- _closeCompleter.completeError(error, stackTrace);
- }
- }
- });
- }
+ // TODO(ajohnsen): Once _addStream and _close is merged into _HttpOutgoing,
+ // this class can be removed.
+ _HttpOutboundMessage outbound;
+ _HttpOutboundConsumer();
- bool _done([error, StackTrace stackTrace]) {
- if (_completer == null) return false;
- if (error != null) {
- _completer.completeError(error, stackTrace);
- } else {
- _completer.complete(_outbound);
- }
- _completer = null;
- return true;
- }
-
- Future addStream(var stream) {
- // If we saw a socket error subscribe and then cancel, to ignore any data
- // on the stream.
- if (_socketError) {
- stream.listen(null).cancel();
- return new Future.value(_outbound);
- }
- _completer = new Completer();
- _subscription = stream.listen(
- (data) => _controller.add(data),
- onDone: _done,
- onError: (e, s) => _controller.addError(e, s),
- cancelOnError: true);
- // Pause the first request.
- if (_controller == null) _subscription.pause();
- _ensureController();
- return _completer.future;
- }
-
- Future close() {
- Future closeOutbound() {
- if (_socketError) return new Future.value(_outbound);
- return _outbound._close()
- .catchError((_) {}, test: _ignoreError)
- .then((_) => _outbound);
- }
- if (_controller == null) return closeOutbound();
- _controller.close();
- return _closeCompleter.future.then((_) => closeOutbound());
- }
-}
-
-
-class _BufferTransformerSink implements EventSink<List<int>> {
- static const int MIN_CHUNK_SIZE = 4 * 1024;
- static const int MAX_BUFFER_SIZE = 16 * 1024;
-
- final BytesBuilder _builder = new BytesBuilder();
- final EventSink<List<int>> _outSink;
-
- _BufferTransformerSink(this._outSink);
-
- void add(List<int> data) {
- // TODO(ajohnsen): Use timeout?
- if (data.length == 0) return;
- if (data.length >= MIN_CHUNK_SIZE) {
- flush();
- _outSink.add(data);
- } else {
- _builder.add(data);
- if (_builder.length >= MAX_BUFFER_SIZE) {
- flush();
- }
- }
- }
-
- void addError(Object error, [StackTrace stackTrace]) {
- _outSink.addError(error, stackTrace);
- }
-
- void close() {
- flush();
- _outSink.close();
- }
-
- void flush() {
- if (_builder.length > 0) {
- // takeBytes will clear the BytesBuilder.
- _outSink.add(_builder.takeBytes());
- }
- }
-}
-
-class _BufferTransformer implements StreamTransformer<List<int>, List<int>> {
- const _BufferTransformer();
-
- Stream<List<int>> bind(Stream<List<int>> stream) {
- return new Stream<List<int>>.eventTransformed(
- stream,
- (EventSink outSink) => new _BufferTransformerSink(outSink));
- }
+ Future addStream(var stream) => outbound._addStream(stream);
+ Future close() => outbound._close();
}
@@ -763,7 +606,8 @@ class _HttpResponse extends _HttpOutboundMessage<HttpResponse>
if (_headersWritten) throw new StateError("Headers already sent");
deadline = null; // Be sure to stop any deadline.
var future = _httpRequest._httpConnection.detachSocket();
- _writeHeaders(drainRequest: false).then((_) => close());
+ _writeHeaders(drainRequest: false,
+ setOutgoing: false).then((_) => close());
// Close connection so the socket is 'free'.
close();
done.catchError((_) {
@@ -783,12 +627,13 @@ class _HttpResponse extends _HttpOutboundMessage<HttpResponse>
if (_deadline == null) return;
_deadlineTimer = new Timer(_deadline, () {
+ _outgoing._socketError = true;
_outgoing.socket.destroy();
});
}
void _writeHeader() {
- Uint8List buffer = _httpRequest._httpConnection._headersBuffer;
+ Uint8List buffer = new Uint8List(_OUTGOING_BUFFER_SIZE);
int offset = 0;
void write(List<int> bytes) {
@@ -847,7 +692,7 @@ class _HttpResponse extends _HttpOutboundMessage<HttpResponse>
offset = headers._write(buffer, offset);
buffer[offset++] = _CharCode.CR;
buffer[offset++] = _CharCode.LF;
- _headersSink.add(new Uint8List.view(buffer.buffer, 0, offset));
+ _outgoing.setHeader(buffer, offset);
}
String _findReasonPhrase(int statusCode) {
@@ -1036,7 +881,7 @@ class _HttpClientRequest extends _HttpOutboundMessage<HttpClientResponse>
}
void _writeHeader() {
- Uint8List buffer = _httpClientConnection._headersBuffer;
+ Uint8List buffer = new Uint8List(_OUTGOING_BUFFER_SIZE);
int offset = 0;
void write(List<int> bytes) {
@@ -1074,41 +919,268 @@ class _HttpClientRequest extends _HttpOutboundMessage<HttpClientResponse>
offset = headers._write(buffer, offset);
buffer[offset++] = _CharCode.CR;
buffer[offset++] = _CharCode.LF;
- _headersSink.add(new Uint8List.view(buffer.buffer, 0, offset));
+ _outgoing.setHeader(buffer, offset);
}
}
+// Used by _HttpOutgoing as a target of a chunked converter for gzip
+// compression.
+class _HttpGZipSink extends ByteConversionSink {
+ final Function _consume;
+ _HttpGZipSink(this._consume);
+
+ void add(List<int> chunk) {
+ _consume(chunk);
+ }
-class _ChunkedTransformerSink implements EventSink<List<int>> {
+ void addSlice(Uint8List chunk, int start, int end, bool isLast) {
+ _consume(new Uint8List.view(chunk.buffer, start, end - start));
+ }
- int _pendingFooter = 0;
- final EventSink<List<int>> _outSink;
+ void close() {}
+}
- _ChunkedTransformerSink(this._outSink);
- void add(List<int> data) {
- _outSink.add(_chunkHeader(data.length));
- if (data.length > 0) _outSink.add(data);
- _pendingFooter = 2;
+// The _HttpOutgoing handles all of the following:
+// - Buffering
+// - GZip compressionm
+// - Content-Length validation.
+// - Errors.
+//
+// Most notable is the GZip compression, that uses a double-buffering system,
+// one before gzip (_gzipBuffer) and one after (_buffer).
+class _HttpOutgoing
+ implements StreamConsumer<List<int>> {
+ static const List<int> _footerAndChunk0Length =
+ const [_CharCode.CR, _CharCode.LF, 0x30, _CharCode.CR, _CharCode.LF,
+ _CharCode.CR, _CharCode.LF];
+
+ static const List<int> _chunk0Length =
+ const [0x30, _CharCode.CR, _CharCode.LF, _CharCode.CR, _CharCode.LF];
+
+ final Completer _doneCompleter = new Completer();
+ final Socket socket;
+
+ Uint8List _buffer;
+ int _length = 0;
+
+ Future _closeFuture;
+
+ bool chunked = false;
+ int _pendingChunkedFooter = 0;
+
+ int contentLength;
+ int _bytesWritten = 0;
+
+ bool _gzip = false;
+ ByteConversionSink _gzipSink;
+ // _gzipAdd is set iff the sink is being added to. It's used to specify where
+ // gzipped data should be taken (sometimes a controller, sometimes a socket).
+ Function _gzipAdd;
+ Uint8List _gzipBuffer;
+ int _gzipBufferLength = 0;
+
+ bool _socketError = false;
+
+ _HttpOutboundMessage outbound;
+
+ bool _ignoreError(error)
+ => (error is SocketException || error is TlsException) &&
+ outbound is HttpResponse;
+
+ _HttpOutgoing(this.socket);
+
+ Future addStream(Stream<List<int>> stream, [Future pauseFuture]) {
+ if (_socketError) {
+ stream.listen(null).cancel();
+ return new Future.value(outbound);
+ }
+ var sub;
+ var controller;
+ // Use new stream so we are able to pause (see below listen). The
+ // alternative is to use stream.extand, but that won't give us a way of
+ // pausing.
+ controller = new StreamController(
+ onPause: () => sub.pause(),
+ onResume: () => sub.resume(),
+ sync: true);
+
+ void onData(data) {
+ if (_socketError) return;
+ if (data.length == 0) return;
+ if (chunked) {
+ if (_gzip) {
+ _gzipAdd = controller.add;
+ _addGZipChunk(data, _gzipSink.add);
+ _gzipAdd = null;
+ return;
+ }
+ _addChunk(_chunkHeader(data.length), controller.add);
+ _pendingChunkedFooter = 2;
+ } else {
+ if (contentLength != null) {
+ _bytesWritten += data.length;
+ if (_bytesWritten > contentLength) {
+ controller.addError(new HttpException(
+ "Content size exceeds specified contentLength. "
+ "$_bytesWritten bytes written while expected "
+ "$contentLength. "
+ "[${new String.fromCharCodes(data)}]"));
+ return;
+ }
+ }
+ }
+ _addChunk(data, controller.add);
+ }
+
+ sub = stream.listen(
+ onData,
+ onError: controller.addError,
+ onDone: controller.close,
+ cancelOnError: true);
+
+ // While incoming is being drained, the pauseFuture is non-null. Pause
+ // output until it's drained.
+ if (pauseFuture != null) {
+ sub.pause(pauseFuture);
+ }
+
+ return socket.addStream(controller.stream)
+ .then((_) {
+ return outbound;
+ }, onError: (error) {
+ // Be sure to close it in case of an error.
+ if (_gzip) _gzipSink.close();
+ _socketError = true;
+ _doneCompleter.completeError(error);
+ if (_ignoreError(error)) {
+ return outbound;
+ } else {
+ throw error;
+ }
+ });
}
- void addError(Object error, [StackTrace stackTrace]) {
- _outSink.addError(error, stackTrace);
+ Future close() {
+ // If we are already closed, return that future.
+ if (_closeFuture != null) return _closeFuture;
+ // If we earlier saw an error, return immidiate. The notification to
+ // _Http*Connection is already done.
+ if (_socketError) return new Future.value(outbound);
+ // If contentLength was specified, validate it.
+ if (contentLength != null) {
+ if (_bytesWritten < contentLength) {
+ var error = new HttpException(
+ "Content size below specified contentLength. "
+ " $_bytesWritten bytes written but expected "
+ "$contentLength.");
+ _doneCompleter.completeError(error);
+ return _closeFuture = new Future.error(error);
+ }
+ }
+ // In case of chunked encoding (and gzip), handle remaining gzip data and
+ // append the 'footer' for chunked encoding.
+ if (chunked) {
+ if (_gzip) {
+ _gzipAdd = socket.add;
+ if (_gzipBufferLength > 0) {
+ _gzipSink.add(new Uint8List.view(
+ _gzipBuffer.buffer, 0, _gzipBufferLength));
+ }
+ _gzipBuffer = null;
+ _gzipSink.close();
+ _gzipAdd = null;
+ }
+ _addChunk(_chunkHeader(0), socket.add);
+ }
+ // Add any remaining data in the buffer.
+ if (_length > 0) {
+ socket.add(new Uint8List.view(_buffer.buffer, 0, _length));
+ }
+ // Clear references, for better GC.
+ _buffer = null;
+ // And finally flush it. As we support keep-alive, never close it from here.
+ // Once the socket is flushed, we'll be able to reuse it (signaled by the
+ // 'done' future).
+ return _closeFuture = socket.flush()
+ .then((_) {
+ _doneCompleter.complete(socket);
+ return outbound;
+ }, onError: (error) {
+ _doneCompleter.completeError(error);
+ if (_ignoreError(error)) {
+ return outbound;
+ } else {
+ throw error;
+ }
+ });
}
- void close() {
- add(const []);
- _outSink.close();
+ Future get done => _doneCompleter.future;
+
+ void setHeader(List<int> data, int length) {
+ assert(_length == 0);
+ assert(data.length == _OUTGOING_BUFFER_SIZE);
+ _buffer = data;
+ _length = length;
+ }
+
+ void set gzip(bool value) {
+ _gzip = value;
+ if (_gzip) {
+ _gzipBuffer = new Uint8List(_OUTGOING_BUFFER_SIZE);
+ assert(_gzipSink == null);
+ _gzipSink = new ZLibEncoder(gzip: true)
+ .startChunkedConversion(
+ new _HttpGZipSink((data) {
+ // We are closing down prematurely, due to an error. Discard.
+ if (_gzipAdd == null) return;
+ _addChunk(_chunkHeader(data.length), _gzipAdd);
+ _pendingChunkedFooter = 2;
+ _addChunk(data, _gzipAdd);
+ }));
+ }
+ }
+
+ void _addGZipChunk(chunk, void add(List<int> data)) {
+ if (chunk.length > _gzipBuffer.length - _gzipBufferLength) {
+ add(new Uint8List.view(
+ _gzipBuffer.buffer, 0, _gzipBufferLength));
+ _gzipBuffer = new Uint8List(_OUTGOING_BUFFER_SIZE);
+ _gzipBufferLength = 0;
+ }
+ if (chunk.length > _OUTGOING_BUFFER_SIZE) {
+ add(chunk);
+ } else {
+ _gzipBuffer.setRange(_gzipBufferLength,
+ _gzipBufferLength + chunk.length,
+ chunk);
+ _gzipBufferLength += chunk.length;
+ }
+ }
+
+ void _addChunk(chunk, void add(List<int> data)) {
+ if (chunk.length > _buffer.length - _length) {
+ add(new Uint8List.view(_buffer.buffer, 0, _length));
+ _buffer = new Uint8List(_OUTGOING_BUFFER_SIZE);
+ _length = 0;
+ }
+ if (chunk.length > _OUTGOING_BUFFER_SIZE) {
+ add(chunk);
+ } else {
+ _buffer.setRange(_length, _length + chunk.length, chunk);
+ _length += chunk.length;
+ }
}
List<int> _chunkHeader(int length) {
const hexDigits = const [0x30, 0x31, 0x32, 0x33, 0x34, 0x35, 0x36, 0x37,
0x38, 0x39, 0x41, 0x42, 0x43, 0x44, 0x45, 0x46];
if (length == 0) {
- if (_pendingFooter == 2) return _footerAndChunk0Length;
+ if (_pendingChunkedFooter == 2) return _footerAndChunk0Length;
return _chunk0Length;
}
- int size = _pendingFooter;
+ int size = _pendingChunkedFooter;
int len = length;
// Compute a fast integer version of (log(length + 1) / log(16)).ceil().
while (len > 0) {
@@ -1116,12 +1188,12 @@ class _ChunkedTransformerSink implements EventSink<List<int>> {
len >>= 4;
}
var footerAndHeader = new Uint8List(size + 2);
- if (_pendingFooter == 2) {
+ if (_pendingChunkedFooter == 2) {
footerAndHeader[0] = _CharCode.CR;
footerAndHeader[1] = _CharCode.LF;
}
int index = size;
- while (index > _pendingFooter) {
+ while (index > _pendingChunkedFooter) {
footerAndHeader[--index] = hexDigits[length & 15];
length = length >> 4;
}
@@ -1129,102 +1201,6 @@ class _ChunkedTransformerSink implements EventSink<List<int>> {
footerAndHeader[size + 1] = _CharCode.LF;
return footerAndHeader;
}
-
- static List<int> get _footerAndChunk0Length => new Uint8List.fromList(
- const [_CharCode.CR, _CharCode.LF, 0x30, _CharCode.CR, _CharCode.LF,
- _CharCode.CR, _CharCode.LF]);
-
- static List<int> get _chunk0Length => new Uint8List.fromList(
- const [0x30, _CharCode.CR, _CharCode.LF, _CharCode.CR, _CharCode.LF]);
-}
-
-// Transformer that transforms data to HTTP Chunked Encoding.
-class _ChunkedTransformer implements StreamTransformer<List<int>, List<int>> {
- const _ChunkedTransformer();
-
- Stream<List<int>> bind(Stream<List<int>> stream) {
- return new Stream<List<int>>.eventTransformed(
- stream,
- (EventSink<List<int>> sink) => new _ChunkedTransformerSink(sink));
- }
-}
-
-// Transformer that validates the content length.
-class _ContentLengthValidator
- implements StreamTransformer<List<int>, List<int>>, EventSink<List<int>> {
- final int expectedContentLength;
- final Uri uri;
- int _bytesWritten = 0;
-
- EventSink<List<int>> _outSink;
-
- _ContentLengthValidator(this.expectedContentLength, this.uri);
-
- Stream<List<int>> bind(Stream<List<int>> stream) {
- return new Stream.eventTransformed(
- stream,
- (EventSink sink) {
- if (_outSink != null) {
- throw new StateError("Validator transformer already used");
- }
- _outSink = sink;
- return this;
- });
- }
-
- void add(List<int> data) {
- _bytesWritten += data.length;
- if (_bytesWritten > expectedContentLength) {
- _outSink.addError(new HttpException(
- "Content size exceeds specified contentLength. "
- "$_bytesWritten bytes written while expected "
- "$expectedContentLength. "
- "[${new String.fromCharCodes(data)}]",
- uri: uri));
- _outSink.close();
- } else {
- _outSink.add(data);
- }
- }
-
- void addError(Object error, [StackTrace stackTrace]) {
- _outSink.addError(error, stackTrace);
- }
-
- void close() {
- if (_bytesWritten < expectedContentLength) {
- _outSink.addError(new HttpException(
- "Content size below specified contentLength. "
- " $_bytesWritten bytes written while expected "
- "$expectedContentLength.",
- uri: uri));
- }
- _outSink.close();
- }
-}
-
-
-// Extends StreamConsumer as this is an internal type, only used to pipe to.
-class _HttpOutgoing implements StreamConsumer<List<int>> {
- final Completer _doneCompleter = new Completer();
- final Socket socket;
-
- _HttpOutgoing(this.socket);
-
- Future addStream(Stream<List<int>> stream) {
- return socket.addStream(stream)
- .catchError((error) {
- _doneCompleter.completeError(error);
- throw error;
- });
- }
-
- Future close() {
- _doneCompleter.complete(socket);
- return new Future.value();
- }
-
- Future get done => _doneCompleter.future;
}
class _HttpClientConnection {
@@ -1238,7 +1214,6 @@ class _HttpClientConnection {
Timer _idleTimer;
bool closed = false;
Uri _currentUri;
- final Uint8List _headersBuffer = new Uint8List(_HEADERS_BUFFER_SIZE);
Completer<_HttpIncoming> _nextResponseCompleter;
Future _streamFuture;
@@ -1901,7 +1876,6 @@ class _HttpConnection extends LinkedListEntry<_HttpConnection> {
int _state = _IDLE;
StreamSubscription _subscription;
Timer _idleTimer;
- final Uint8List _headersBuffer = new Uint8List(_HEADERS_BUFFER_SIZE);
bool _idleMark = false;
Future _streamFuture;
« no previous file with comments | « runtime/bin/socket_patch.dart ('k') | sdk/lib/io/io_sink.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698