| Index: sdk/lib/io/http_impl.dart
|
| diff --git a/sdk/lib/io/http_impl.dart b/sdk/lib/io/http_impl.dart
|
| index 5bc5b97cae902e5b01162391de71b196bc53bc52..7990e05ffeb4ae4de9ef0e413f9dd0d693ba9ec1 100644
|
| --- a/sdk/lib/io/http_impl.dart
|
| +++ b/sdk/lib/io/http_impl.dart
|
| @@ -4,7 +4,7 @@
|
|
|
| part of dart.io;
|
|
|
| -const int _HEADERS_BUFFER_SIZE = 8 * 1024;
|
| +const int _OUTGOING_BUFFER_SIZE = 8 * 1024;
|
|
|
| class _HttpIncoming extends Stream<List<int>> {
|
| final int _transferLength;
|
| @@ -406,28 +406,25 @@ class _HttpClientResponse
|
| }
|
|
|
|
|
| -abstract class _HttpOutboundMessage<T> implements IOSink {
|
| +abstract class _HttpOutboundMessage<T> extends _IOSinkImpl {
|
| // Used to mark when the body should be written. This is used for HEAD
|
| // requests and in error handling.
|
| bool _ignoreBody = false;
|
| bool _headersWritten = false;
|
| - bool _asGZip = false;
|
|
|
| - IOSink _headersSink;
|
| - IOSink _dataSink;
|
| -
|
| - final _HttpOutgoing _outgoing;
|
| final Uri _uri;
|
| + final _HttpOutgoing _outgoing;
|
|
|
| final _HttpHeaders headers;
|
|
|
| _HttpOutboundMessage(this._uri,
|
| String protocolVersion,
|
| - _HttpOutgoing outgoing)
|
| - : _outgoing = outgoing,
|
| - _headersSink = new IOSink(outgoing, encoding: ASCII),
|
| + this._outgoing)
|
| + : super(new _HttpOutboundConsumer(), null),
|
| headers = new _HttpHeaders(protocolVersion) {
|
| - _dataSink = new IOSink(new _HttpOutboundConsumer(this));
|
| + _outgoing.outbound = this;
|
| + (_target as _HttpOutboundConsumer).outbound = this;
|
| + _encodingMutable = false;
|
| }
|
|
|
| int get contentLength => headers.contentLength;
|
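For orientation, a minimal stand-alone sketch (illustrative names only, not SDK code) of the delegation this hunk sets up: the outbound message is itself the sink users write to, and its target consumer simply hands addStream/close back to the message, so headers can be emitted before any body data.

import 'dart:async';

// All names below are illustrative stand-ins for _HttpOutboundMessage and
// _HttpOutboundConsumer; the list stands in for the socket.
class OutboundConsumer implements StreamConsumer<List<int>> {
  OutboundMessage outbound;  // Wired up right after construction, as above.
  Future addStream(Stream<List<int>> stream) => outbound._addStream(stream);
  Future close() => outbound._close();
}

class OutboundMessage {
  final OutboundConsumer _target = new OutboundConsumer();
  final List<int> _wire = [];
  bool _headersWritten = false;

  OutboundMessage() {
    _target.outbound = this;
  }

  // Every write path ends up here, so headers always precede body data.
  Future _addStream(Stream<List<int>> stream) {
    if (!_headersWritten) {
      _headersWritten = true;
      _wire.addAll("HTTP/1.1 200 OK\r\n\r\n".codeUnits);
    }
    return stream.forEach(_wire.addAll);
  }

  Future _close() => new Future.value(this);
}

void main() {
  var message = new OutboundMessage();
  var body = new Stream.fromIterable([[0x68, 0x69]]);  // "hi"
  message._target.addStream(body)
      .then((_) => message._target.close())
      .then((_) => print(new String.fromCharCodes(message._wire)));
}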
| @@ -450,60 +447,30 @@ abstract class _HttpOutboundMessage<T> implements IOSink {
|
| return Encoding.getByName(charset);
|
| }
|
|
|
| - void set encoding(Encoding value) {
|
| - throw new StateError("IOSink encoding is not mutable");
|
| - }
|
| -
|
| - void write(Object obj) {
|
| - if (!_headersWritten) _dataSink.encoding = encoding;
|
| - _dataSink.write(obj);
|
| - }
|
| -
|
| - void writeAll(Iterable objects, [String separator = ""]) {
|
| - if (!_headersWritten) _dataSink.encoding = encoding;
|
| - _dataSink.writeAll(objects, separator);
|
| - }
|
| -
|
| - void writeln([Object obj = ""]) {
|
| - if (!_headersWritten) _dataSink.encoding = encoding;
|
| - _dataSink.writeln(obj);
|
| - }
|
| -
|
| - void writeCharCode(int charCode) {
|
| - if (!_headersWritten) _dataSink.encoding = encoding;
|
| - _dataSink.writeCharCode(charCode);
|
| - }
|
| -
|
| void add(List<int> data) {
|
| if (data.length == 0) return;
|
| - _dataSink.add(data);
|
| + super.add(data);
|
| }
|
|
|
| - void addError(error, [StackTrace stackTrace]) =>
|
| - _dataSink.addError(error, stackTrace);
|
| -
|
| - Future<T> addStream(Stream<List<int>> stream) => _dataSink.addStream(stream);
|
| -
|
| - Future flush() => _dataSink.flush();
|
| -
|
| - Future close() => _dataSink.close();
|
| -
|
| - Future<T> get done => _dataSink.done;
|
| -
|
| - Future _writeHeaders({bool drainRequest: true}) {
|
| - void write() {
|
| + Future _writeHeaders({bool drainRequest: true,
|
| + bool setOutgoing: true}) {
|
| + // TODO(ajohnsen): Avoid excessive futures in this method.
|
| + write() {
|
| try {
|
| _writeHeader();
|
| - } catch (error) {
|
| + } catch (_) {
|
| // Headers too large.
|
| throw new HttpException(
|
| - "Headers size exceeded the of '$_HEADERS_BUFFER_SIZE' bytes");
|
| + "Headers size exceeded the of '$_OUTGOING_BUFFER_SIZE'"
|
| + " bytes");
|
| }
|
| + return this;
|
| }
|
| - if (_headersWritten) return new Future.value();
|
| + if (_headersWritten) return new Future.value(this);
|
| _headersWritten = true;
|
| - _dataSink.encoding = encoding;
|
| + Future drainFuture;
|
| bool isServerSide = this is _HttpResponse;
|
| + bool gzip = false;
|
| if (isServerSide) {
|
| var response = this;
|
| if (headers.chunkedTransferEncoding) {
|
| @@ -516,42 +483,51 @@ abstract class _HttpOutboundMessage<T> implements IOSink {
|
| .any((encoding) => encoding.trim().toLowerCase() == "gzip") &&
|
| contentEncoding == null) {
|
| headers.set(HttpHeaders.CONTENT_ENCODING, "gzip");
|
| - _asGZip = true;
|
| + gzip = true;
|
| }
|
| }
|
| if (drainRequest && !response._httpRequest._incoming.hasSubscriber) {
|
| - return response._httpRequest.drain()
|
| - // TODO(ajohnsen): Timeout on drain?
|
| - .catchError((_) {}) // Ignore errors.
|
| - .then((_) => write());
|
| + drainFuture = response._httpRequest.drain().catchError((_) {});
|
| }
|
| + } else {
|
| + drainRequest = false;
|
| + }
|
| + if (_ignoreBody) {
|
| + return new Future.sync(write).then((_) => _outgoing.close());
|
| + }
|
| + if (setOutgoing) {
|
| + int contentLength = headers.contentLength;
|
| + if (headers.chunkedTransferEncoding) {
|
| + _outgoing.chunked = true;
|
| + if (gzip) _outgoing.gzip = true;
|
| + } else if (contentLength >= 0) {
|
| + _outgoing.contentLength = contentLength;
|
| + }
|
| + }
|
| + if (drainFuture != null) {
|
| + return drainFuture.then((_) => write());
|
| }
|
| return new Future.sync(write);
|
| }
|
|
|
| Future _addStream(Stream<List<int>> stream) {
|
| - return _writeHeaders()
|
| - .then((_) {
|
| - int contentLength = headers.contentLength;
|
| - if (_ignoreBody) {
|
| - stream.drain().catchError((_) {});
|
| - return _headersSink.close();
|
| - }
|
| - stream = stream.transform(const _BufferTransformer());
|
| - if (headers.chunkedTransferEncoding) {
|
| - if (_asGZip) {
|
| - stream = stream.transform(GZIP.encoder);
|
| - }
|
| - stream = stream.transform(const _ChunkedTransformer());
|
| - } else if (contentLength >= 0) {
|
| - stream = stream.transform(
|
| - new _ContentLengthValidator(contentLength, _uri));
|
| - }
|
| - return _headersSink.addStream(stream);
|
| - });
|
| + // TODO(ajohnsen): Merge into _HttpOutgoing.
|
| + if (_ignoreBody) {
|
| + stream.drain().catchError((_) {});
|
| + return _writeHeaders();
|
| + }
|
| + if (_headersWritten) {
|
| + return _outgoing.addStream(stream);
|
| + } else {
|
| + var completer = new Completer.sync();
|
| + var future = _outgoing.addStream(stream, completer.future);
|
| + _writeHeaders().then(completer.complete);
|
| + return future;
|
| + }
|
| }
|
|
|
| Future _close() {
|
| + // TODO(ajohnsen): Merge into _HttpOutgoing.
|
| if (!_headersWritten) {
|
| if (!_ignoreBody && headers.contentLength == -1) {
|
| // If no body was written, _ignoreBody is false (it's not a HEAD
|
| @@ -560,14 +536,14 @@ abstract class _HttpOutboundMessage<T> implements IOSink {
|
| headers.chunkedTransferEncoding = false;
|
| headers.contentLength = 0;
|
| } else if (!_ignoreBody && headers.contentLength > 0) {
|
| - _headersSink.addError(new HttpException(
|
| - "No content while contentLength was specified to be greater "
|
| - "than 0: ${headers.contentLength}.",
|
| - uri: _uri));
|
| - return _headersSink.done;
|
| + return _outgoing.addStream(
|
| + new Stream.fromFuture(new Future.error(new HttpException(
|
| + "No content even though contentLength was specified to be "
|
| + "greater than 0: ${headers.contentLength}.",
|
| + uri: _uri))));
|
| }
|
| }
|
| - return _writeHeaders().whenComplete(_headersSink.close);
|
| + return _writeHeaders().whenComplete(_outgoing.close);
|
| }
|
|
|
| void _writeHeader();
|
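The reworked _addStream above relies on StreamSubscription.pause accepting a resume-signal Future: body data is held back until the header write completes. A minimal stand-alone sketch of that mechanism (not SDK code; names are illustrative):

import 'dart:async';

void main() {
  var headersWritten = new Completer();      // Stands in for _writeHeaders().
  var body = new StreamController<List<int>>();

  var sub = body.stream.listen((chunk) {
    print('body chunk: ${chunk.length} bytes');
  });

  // Pause until the resume-signal future completes, as _HttpOutgoing.addStream
  // does with the pauseFuture handed in from _HttpOutboundMessage._addStream.
  sub.pause(headersWritten.future);

  body.add([1, 2, 3]);        // Buffered by the controller while paused.
  headersWritten.complete();  // Headers are "out"; the chunk is now delivered.
  body.close();
}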
| @@ -575,146 +551,13 @@ abstract class _HttpOutboundMessage<T> implements IOSink {
|
|
|
|
|
| class _HttpOutboundConsumer implements StreamConsumer {
|
| - final _HttpOutboundMessage _outbound;
|
| - StreamController _controller;
|
| - StreamSubscription _subscription;
|
| - Completer _closeCompleter = new Completer();
|
| - Completer _completer;
|
| - bool _socketError = false;
|
| -
|
| - _HttpOutboundConsumer(this._outbound);
|
| -
|
| - void _cancel() {
|
| - if (_subscription != null) {
|
| - StreamSubscription subscription = _subscription;
|
| - _subscription = null;
|
| - subscription.cancel();
|
| - }
|
| - }
|
| -
|
| - bool _ignoreError(error)
|
| - => (error is SocketException || error is TlsException) &&
|
| - _outbound is HttpResponse;
|
| -
|
| - _ensureController() {
|
| - if (_controller != null) return;
|
| - _controller = new StreamController(sync: true,
|
| - onPause: () => _subscription.pause(),
|
| - onResume: () => _subscription.resume(),
|
| - onListen: () => _subscription.resume(),
|
| - onCancel: _cancel);
|
| - _outbound._addStream(_controller.stream)
|
| - .then((_) {
|
| - _cancel();
|
| - _done();
|
| - _closeCompleter.complete(_outbound);
|
| - },
|
| - onError: (error, [StackTrace stackTrace]) {
|
| - _socketError = true;
|
| - if (_ignoreError(error)) {
|
| - _cancel();
|
| - _done();
|
| - _closeCompleter.complete(_outbound);
|
| - } else {
|
| - if (!_done(error)) {
|
| - _closeCompleter.completeError(error, stackTrace);
|
| - }
|
| - }
|
| - });
|
| - }
|
| + // TODO(ajohnsen): Once _addStream and _close is merged into _HttpOutgoing,
|
| + // this class can be removed.
|
| + _HttpOutboundMessage outbound;
|
| + _HttpOutboundConsumer();
|
|
|
| - bool _done([error, StackTrace stackTrace]) {
|
| - if (_completer == null) return false;
|
| - if (error != null) {
|
| - _completer.completeError(error, stackTrace);
|
| - } else {
|
| - _completer.complete(_outbound);
|
| - }
|
| - _completer = null;
|
| - return true;
|
| - }
|
| -
|
| - Future addStream(var stream) {
|
| - // If we saw a socket error subscribe and then cancel, to ignore any data
|
| - // on the stream.
|
| - if (_socketError) {
|
| - stream.listen(null).cancel();
|
| - return new Future.value(_outbound);
|
| - }
|
| - _completer = new Completer();
|
| - _subscription = stream.listen(
|
| - (data) => _controller.add(data),
|
| - onDone: _done,
|
| - onError: (e, s) => _controller.addError(e, s),
|
| - cancelOnError: true);
|
| - // Pause the first request.
|
| - if (_controller == null) _subscription.pause();
|
| - _ensureController();
|
| - return _completer.future;
|
| - }
|
| -
|
| - Future close() {
|
| - Future closeOutbound() {
|
| - if (_socketError) return new Future.value(_outbound);
|
| - return _outbound._close()
|
| - .catchError((_) {}, test: _ignoreError)
|
| - .then((_) => _outbound);
|
| - }
|
| - if (_controller == null) return closeOutbound();
|
| - _controller.close();
|
| - return _closeCompleter.future.then((_) => closeOutbound());
|
| - }
|
| -}
|
| -
|
| -
|
| -class _BufferTransformerSink implements EventSink<List<int>> {
|
| - static const int MIN_CHUNK_SIZE = 4 * 1024;
|
| - static const int MAX_BUFFER_SIZE = 16 * 1024;
|
| -
|
| - final BytesBuilder _builder = new BytesBuilder();
|
| - final EventSink<List<int>> _outSink;
|
| -
|
| - _BufferTransformerSink(this._outSink);
|
| -
|
| - void add(List<int> data) {
|
| - // TODO(ajohnsen): Use timeout?
|
| - if (data.length == 0) return;
|
| - if (data.length >= MIN_CHUNK_SIZE) {
|
| - flush();
|
| - _outSink.add(data);
|
| - } else {
|
| - _builder.add(data);
|
| - if (_builder.length >= MAX_BUFFER_SIZE) {
|
| - flush();
|
| - }
|
| - }
|
| - }
|
| -
|
| - void addError(Object error, [StackTrace stackTrace]) {
|
| - _outSink.addError(error, stackTrace);
|
| - }
|
| -
|
| - void close() {
|
| - flush();
|
| - _outSink.close();
|
| - }
|
| -
|
| - void flush() {
|
| - if (_builder.length > 0) {
|
| - // takeBytes will clear the BytesBuilder.
|
| - _outSink.add(_builder.takeBytes());
|
| - }
|
| - }
|
| -}
|
| -
|
| -class _BufferTransformer implements StreamTransformer<List<int>, List<int>> {
|
| - const _BufferTransformer();
|
| -
|
| - Stream<List<int>> bind(Stream<List<int>> stream) {
|
| - return new Stream<List<int>>.eventTransformed(
|
| - stream,
|
| - (EventSink outSink) => new _BufferTransformerSink(outSink));
|
| - }
|
| + Future addStream(var stream) => outbound._addStream(stream);
|
| + Future close() => outbound._close();
|
| }
|
|
|
|
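The removed _BufferTransformer is superseded by the copy-or-pass-through buffering in _HttpOutgoing._addChunk later in this patch: small writes are coalesced into a fixed 8 KB buffer, while writes larger than the buffer are flushed ahead of and then passed straight through. A simplified stand-alone sketch of that strategy (illustrative names, assuming the same 8 KB size):

import 'dart:typed_data';

const int BUFFER_SIZE = 8 * 1024;  // Stands in for _OUTGOING_BUFFER_SIZE.

class CoalescingWriter {
  Uint8List _buffer = new Uint8List(BUFFER_SIZE);
  int _length = 0;
  final List<List<int>> output = [];  // Stands in for the socket.

  // Same shape as _HttpOutgoing._addChunk: flush when the chunk does not fit,
  // pass oversized chunks through untouched, otherwise copy into the buffer.
  void add(List<int> chunk) {
    if (chunk.length > _buffer.length - _length) {
      output.add(new Uint8List.view(_buffer.buffer, 0, _length));
      _buffer = new Uint8List(BUFFER_SIZE);
      _length = 0;
    }
    if (chunk.length > BUFFER_SIZE) {
      output.add(chunk);
    } else {
      _buffer.setRange(_length, _length + chunk.length, chunk);
      _length += chunk.length;
    }
  }

  void close() {
    if (_length > 0) {
      output.add(new Uint8List.view(_buffer.buffer, 0, _length));
    }
    _buffer = null;
  }
}

void main() {
  var writer = new CoalescingWriter();
  writer.add(new Uint8List(100));        // Coalesced into the buffer.
  writer.add(new Uint8List(100));        // Coalesced into the buffer.
  writer.add(new Uint8List(32 * 1024));  // Flushes 200 bytes, then passes through.
  writer.close();
  print(writer.output.map((c) => c.length).toList());  // [200, 32768]
}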
|
| @@ -763,7 +606,8 @@ class _HttpResponse extends _HttpOutboundMessage<HttpResponse>
|
| if (_headersWritten) throw new StateError("Headers already sent");
|
| deadline = null; // Be sure to stop any deadline.
|
| var future = _httpRequest._httpConnection.detachSocket();
|
| - _writeHeaders(drainRequest: false).then((_) => close());
|
| + _writeHeaders(drainRequest: false,
|
| + setOutgoing: false).then((_) => close());
|
| // Close connection so the socket is 'free'.
|
| close();
|
| done.catchError((_) {
|
| @@ -783,12 +627,13 @@ class _HttpResponse extends _HttpOutboundMessage<HttpResponse>
|
|
|
| if (_deadline == null) return;
|
| _deadlineTimer = new Timer(_deadline, () {
|
| + _outgoing._socketError = true;
|
| _outgoing.socket.destroy();
|
| });
|
| }
|
|
|
| void _writeHeader() {
|
| - Uint8List buffer = _httpRequest._httpConnection._headersBuffer;
|
| + Uint8List buffer = new Uint8List(_OUTGOING_BUFFER_SIZE);
|
| int offset = 0;
|
|
|
| void write(List<int> bytes) {
|
| @@ -847,7 +692,7 @@ class _HttpResponse extends _HttpOutboundMessage<HttpResponse>
|
| offset = headers._write(buffer, offset);
|
| buffer[offset++] = _CharCode.CR;
|
| buffer[offset++] = _CharCode.LF;
|
| - _headersSink.add(new Uint8List.view(buffer.buffer, 0, offset));
|
| + _outgoing.setHeader(buffer, offset);
|
| }
|
|
|
| String _findReasonPhrase(int statusCode) {
|
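The hunk above makes an overrun deadline mark the outgoing connection as having a socket error before destroying it, so the teardown is not reported as an ordinary write failure. From the public API this path is driven by HttpResponse.deadline; a hedged usage sketch (port and body are arbitrary):

import 'dart:io';

// Sketch: serve a response with a 30 second deadline. If producing the body
// takes longer than that, the _deadlineTimer logic above destroys the socket.
void main() {
  HttpServer.bind(InternetAddress.LOOPBACK_IP_V4, 8080).then((server) {
    server.listen((HttpRequest request) {
      request.response.deadline = const Duration(seconds: 30);
      request.response.write('hello');
      request.response.close();
    });
  });
}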
| @@ -1036,7 +881,7 @@ class _HttpClientRequest extends _HttpOutboundMessage<HttpClientResponse>
|
| }
|
|
|
| void _writeHeader() {
|
| - Uint8List buffer = _httpClientConnection._headersBuffer;
|
| + Uint8List buffer = new Uint8List(_OUTGOING_BUFFER_SIZE);
|
| int offset = 0;
|
|
|
| void write(List<int> bytes) {
|
| @@ -1074,41 +919,268 @@ class _HttpClientRequest extends _HttpOutboundMessage<HttpClientResponse>
|
| offset = headers._write(buffer, offset);
|
| buffer[offset++] = _CharCode.CR;
|
| buffer[offset++] = _CharCode.LF;
|
| - _headersSink.add(new Uint8List.view(buffer.buffer, 0, offset));
|
| + _outgoing.setHeader(buffer, offset);
|
| }
|
| }
|
|
|
| +// Used by _HttpOutgoing as a target of a chunked converter for gzip
|
| +// compression.
|
| +class _HttpGZipSink extends ByteConversionSink {
|
| + final Function _consume;
|
| + _HttpGZipSink(this._consume);
|
| +
|
| + void add(List<int> chunk) {
|
| + _consume(chunk);
|
| + }
|
|
|
| -class _ChunkedTransformerSink implements EventSink<List<int>> {
|
| + void addSlice(Uint8List chunk, int start, int end, bool isLast) {
|
| + _consume(new Uint8List.view(chunk.buffer, start, end - start));
|
| + }
|
|
|
| - int _pendingFooter = 0;
|
| - final EventSink<List<int>> _outSink;
|
| + void close() {}
|
| +}
|
|
|
| - _ChunkedTransformerSink(this._outSink);
|
|
|
| - void add(List<int> data) {
|
| - _outSink.add(_chunkHeader(data.length));
|
| - if (data.length > 0) _outSink.add(data);
|
| - _pendingFooter = 2;
|
| +// The _HttpOutgoing handles all of the following:
|
| +// - Buffering
|
| +// - GZip compression.
|
| +// - Content-Length validation.
|
| +// - Errors.
|
| +//
|
| +// Most notable is the GZip compression, which uses a double-buffering system,
|
| +// one before gzip (_gzipBuffer) and one after (_buffer).
|
| +class _HttpOutgoing
|
| + implements StreamConsumer<List<int>> {
|
| + static const List<int> _footerAndChunk0Length =
|
| + const [_CharCode.CR, _CharCode.LF, 0x30, _CharCode.CR, _CharCode.LF,
|
| + _CharCode.CR, _CharCode.LF];
|
| +
|
| + static const List<int> _chunk0Length =
|
| + const [0x30, _CharCode.CR, _CharCode.LF, _CharCode.CR, _CharCode.LF];
|
| +
|
| + final Completer _doneCompleter = new Completer();
|
| + final Socket socket;
|
| +
|
| + Uint8List _buffer;
|
| + int _length = 0;
|
| +
|
| + Future _closeFuture;
|
| +
|
| + bool chunked = false;
|
| + int _pendingChunkedFooter = 0;
|
| +
|
| + int contentLength;
|
| + int _bytesWritten = 0;
|
| +
|
| + bool _gzip = false;
|
| + ByteConversionSink _gzipSink;
|
| + // _gzipAdd is set iff the sink is being added to. It's used to specify where
|
| + // gzipped data should be sent (sometimes to a controller, sometimes to a socket).
|
| + Function _gzipAdd;
|
| + Uint8List _gzipBuffer;
|
| + int _gzipBufferLength = 0;
|
| +
|
| + bool _socketError = false;
|
| +
|
| + _HttpOutboundMessage outbound;
|
| +
|
| + bool _ignoreError(error)
|
| + => (error is SocketException || error is TlsException) &&
|
| + outbound is HttpResponse;
|
| +
|
| + _HttpOutgoing(this.socket);
|
| +
|
| + Future addStream(Stream<List<int>> stream, [Future pauseFuture]) {
|
| + if (_socketError) {
|
| + stream.listen(null).cancel();
|
| + return new Future.value(outbound);
|
| + }
|
| + var sub;
|
| + var controller;
|
| + // Use a new stream so we are able to pause (see the listen call below). The
|
| + // alternative is to use stream.expand, but that won't give us a way of
|
| + // pausing.
|
| + controller = new StreamController(
|
| + onPause: () => sub.pause(),
|
| + onResume: () => sub.resume(),
|
| + sync: true);
|
| +
|
| + void onData(data) {
|
| + if (_socketError) return;
|
| + if (data.length == 0) return;
|
| + if (chunked) {
|
| + if (_gzip) {
|
| + _gzipAdd = controller.add;
|
| + _addGZipChunk(data, _gzipSink.add);
|
| + _gzipAdd = null;
|
| + return;
|
| + }
|
| + _addChunk(_chunkHeader(data.length), controller.add);
|
| + _pendingChunkedFooter = 2;
|
| + } else {
|
| + if (contentLength != null) {
|
| + _bytesWritten += data.length;
|
| + if (_bytesWritten > contentLength) {
|
| + controller.addError(new HttpException(
|
| + "Content size exceeds specified contentLength. "
|
| + "$_bytesWritten bytes written while expected "
|
| + "$contentLength. "
|
| + "[${new String.fromCharCodes(data)}]"));
|
| + return;
|
| + }
|
| + }
|
| + }
|
| + _addChunk(data, controller.add);
|
| + }
|
| +
|
| + sub = stream.listen(
|
| + onData,
|
| + onError: controller.addError,
|
| + onDone: controller.close,
|
| + cancelOnError: true);
|
| +
|
| + // While incoming is being drained, the pauseFuture is non-null. Pause
|
| + // output until it's drained.
|
| + if (pauseFuture != null) {
|
| + sub.pause(pauseFuture);
|
| + }
|
| +
|
| + return socket.addStream(controller.stream)
|
| + .then((_) {
|
| + return outbound;
|
| + }, onError: (error) {
|
| + // Be sure to close it in case of an error.
|
| + if (_gzip) _gzipSink.close();
|
| + _socketError = true;
|
| + _doneCompleter.completeError(error);
|
| + if (_ignoreError(error)) {
|
| + return outbound;
|
| + } else {
|
| + throw error;
|
| + }
|
| + });
|
| }
|
|
|
| - void addError(Object error, [StackTrace stackTrace]) {
|
| - _outSink.addError(error, stackTrace);
|
| + Future close() {
|
| + // If we are already closed, return that future.
|
| + if (_closeFuture != null) return _closeFuture;
|
| + // If we earlier saw an error, return immediately. The notification to
|
| + // _Http*Connection is already done.
|
| + if (_socketError) return new Future.value(outbound);
|
| + // If contentLength was specified, validate it.
|
| + if (contentLength != null) {
|
| + if (_bytesWritten < contentLength) {
|
| + var error = new HttpException(
|
| + "Content size below specified contentLength. "
|
| + " $_bytesWritten bytes written but expected "
|
| + "$contentLength.");
|
| + _doneCompleter.completeError(error);
|
| + return _closeFuture = new Future.error(error);
|
| + }
|
| + }
|
| + // In case of chunked encoding (and gzip), handle remaining gzip data and
|
| + // append the 'footer' for chunked encoding.
|
| + if (chunked) {
|
| + if (_gzip) {
|
| + _gzipAdd = socket.add;
|
| + if (_gzipBufferLength > 0) {
|
| + _gzipSink.add(new Uint8List.view(
|
| + _gzipBuffer.buffer, 0, _gzipBufferLength));
|
| + }
|
| + _gzipBuffer = null;
|
| + _gzipSink.close();
|
| + _gzipAdd = null;
|
| + }
|
| + _addChunk(_chunkHeader(0), socket.add);
|
| + }
|
| + // Add any remaining data in the buffer.
|
| + if (_length > 0) {
|
| + socket.add(new Uint8List.view(_buffer.buffer, 0, _length));
|
| + }
|
| + // Clear references, for better GC.
|
| + _buffer = null;
|
| + // And finally flush it. As we support keep-alive, never close it from here.
|
| + // Once the socket is flushed, we'll be able to reuse it (signaled by the
|
| + // 'done' future).
|
| + return _closeFuture = socket.flush()
|
| + .then((_) {
|
| + _doneCompleter.complete(socket);
|
| + return outbound;
|
| + }, onError: (error) {
|
| + _doneCompleter.completeError(error);
|
| + if (_ignoreError(error)) {
|
| + return outbound;
|
| + } else {
|
| + throw error;
|
| + }
|
| + });
|
| }
|
|
|
| - void close() {
|
| - add(const []);
|
| - _outSink.close();
|
| + Future get done => _doneCompleter.future;
|
| +
|
| + void setHeader(List<int> data, int length) {
|
| + assert(_length == 0);
|
| + assert(data.length == _OUTGOING_BUFFER_SIZE);
|
| + _buffer = data;
|
| + _length = length;
|
| + }
|
| +
|
| + void set gzip(bool value) {
|
| + _gzip = value;
|
| + if (_gzip) {
|
| + _gzipBuffer = new Uint8List(_OUTGOING_BUFFER_SIZE);
|
| + assert(_gzipSink == null);
|
| + _gzipSink = new ZLibEncoder(gzip: true)
|
| + .startChunkedConversion(
|
| + new _HttpGZipSink((data) {
|
| + // We are closing down prematurely, due to an error. Discard.
|
| + if (_gzipAdd == null) return;
|
| + _addChunk(_chunkHeader(data.length), _gzipAdd);
|
| + _pendingChunkedFooter = 2;
|
| + _addChunk(data, _gzipAdd);
|
| + }));
|
| + }
|
| + }
|
| +
|
| + void _addGZipChunk(chunk, void add(List<int> data)) {
|
| + if (chunk.length > _gzipBuffer.length - _gzipBufferLength) {
|
| + add(new Uint8List.view(
|
| + _gzipBuffer.buffer, 0, _gzipBufferLength));
|
| + _gzipBuffer = new Uint8List(_OUTGOING_BUFFER_SIZE);
|
| + _gzipBufferLength = 0;
|
| + }
|
| + if (chunk.length > _OUTGOING_BUFFER_SIZE) {
|
| + add(chunk);
|
| + } else {
|
| + _gzipBuffer.setRange(_gzipBufferLength,
|
| + _gzipBufferLength + chunk.length,
|
| + chunk);
|
| + _gzipBufferLength += chunk.length;
|
| + }
|
| + }
|
| +
|
| + void _addChunk(chunk, void add(List<int> data)) {
|
| + if (chunk.length > _buffer.length - _length) {
|
| + add(new Uint8List.view(_buffer.buffer, 0, _length));
|
| + _buffer = new Uint8List(_OUTGOING_BUFFER_SIZE);
|
| + _length = 0;
|
| + }
|
| + if (chunk.length > _OUTGOING_BUFFER_SIZE) {
|
| + add(chunk);
|
| + } else {
|
| + _buffer.setRange(_length, _length + chunk.length, chunk);
|
| + _length += chunk.length;
|
| + }
|
| }
|
|
|
| List<int> _chunkHeader(int length) {
|
| const hexDigits = const [0x30, 0x31, 0x32, 0x33, 0x34, 0x35, 0x36, 0x37,
|
| 0x38, 0x39, 0x41, 0x42, 0x43, 0x44, 0x45, 0x46];
|
| if (length == 0) {
|
| - if (_pendingFooter == 2) return _footerAndChunk0Length;
|
| + if (_pendingChunkedFooter == 2) return _footerAndChunk0Length;
|
| return _chunk0Length;
|
| }
|
| - int size = _pendingFooter;
|
| + int size = _pendingChunkedFooter;
|
| int len = length;
|
| // Compute a fast integer version of (log(length + 1) / log(16)).ceil().
|
| while (len > 0) {
|
| @@ -1116,12 +1188,12 @@ class _ChunkedTransformerSink implements EventSink<List<int>> {
|
| len >>= 4;
|
| }
|
| var footerAndHeader = new Uint8List(size + 2);
|
| - if (_pendingFooter == 2) {
|
| + if (_pendingChunkedFooter == 2) {
|
| footerAndHeader[0] = _CharCode.CR;
|
| footerAndHeader[1] = _CharCode.LF;
|
| }
|
| int index = size;
|
| - while (index > _pendingFooter) {
|
| + while (index > _pendingChunkedFooter) {
|
| footerAndHeader[--index] = hexDigits[length & 15];
|
| length = length >> 4;
|
| }
|
| @@ -1129,102 +1201,6 @@ class _ChunkedTransformerSink implements EventSink<List<int>> {
|
| footerAndHeader[size + 1] = _CharCode.LF;
|
| return footerAndHeader;
|
| }
|
| -
|
| - static List<int> get _footerAndChunk0Length => new Uint8List.fromList(
|
| - const [_CharCode.CR, _CharCode.LF, 0x30, _CharCode.CR, _CharCode.LF,
|
| - _CharCode.CR, _CharCode.LF]);
|
| -
|
| - static List<int> get _chunk0Length => new Uint8List.fromList(
|
| - const [0x30, _CharCode.CR, _CharCode.LF, _CharCode.CR, _CharCode.LF]);
|
| -}
|
| -
|
| -// Transformer that transforms data to HTTP Chunked Encoding.
|
| -class _ChunkedTransformer implements StreamTransformer<List<int>, List<int>> {
|
| - const _ChunkedTransformer();
|
| -
|
| - Stream<List<int>> bind(Stream<List<int>> stream) {
|
| - return new Stream<List<int>>.eventTransformed(
|
| - stream,
|
| - (EventSink<List<int>> sink) => new _ChunkedTransformerSink(sink));
|
| - }
|
| -}
|
| -
|
| -// Transformer that validates the content length.
|
| -class _ContentLengthValidator
|
| - implements StreamTransformer<List<int>, List<int>>, EventSink<List<int>> {
|
| - final int expectedContentLength;
|
| - final Uri uri;
|
| - int _bytesWritten = 0;
|
| -
|
| - EventSink<List<int>> _outSink;
|
| -
|
| - _ContentLengthValidator(this.expectedContentLength, this.uri);
|
| -
|
| - Stream<List<int>> bind(Stream<List<int>> stream) {
|
| - return new Stream.eventTransformed(
|
| - stream,
|
| - (EventSink sink) {
|
| - if (_outSink != null) {
|
| - throw new StateError("Validator transformer already used");
|
| - }
|
| - _outSink = sink;
|
| - return this;
|
| - });
|
| - }
|
| -
|
| - void add(List<int> data) {
|
| - _bytesWritten += data.length;
|
| - if (_bytesWritten > expectedContentLength) {
|
| - _outSink.addError(new HttpException(
|
| - "Content size exceeds specified contentLength. "
|
| - "$_bytesWritten bytes written while expected "
|
| - "$expectedContentLength. "
|
| - "[${new String.fromCharCodes(data)}]",
|
| - uri: uri));
|
| - _outSink.close();
|
| - } else {
|
| - _outSink.add(data);
|
| - }
|
| - }
|
| -
|
| - void addError(Object error, [StackTrace stackTrace]) {
|
| - _outSink.addError(error, stackTrace);
|
| - }
|
| -
|
| - void close() {
|
| - if (_bytesWritten < expectedContentLength) {
|
| - _outSink.addError(new HttpException(
|
| - "Content size below specified contentLength. "
|
| - " $_bytesWritten bytes written while expected "
|
| - "$expectedContentLength.",
|
| - uri: uri));
|
| - }
|
| - _outSink.close();
|
| - }
|
| -}
|
| -
|
| -
|
| -// Extends StreamConsumer as this is an internal type, only used to pipe to.
|
| -class _HttpOutgoing implements StreamConsumer<List<int>> {
|
| - final Completer _doneCompleter = new Completer();
|
| - final Socket socket;
|
| -
|
| - _HttpOutgoing(this.socket);
|
| -
|
| - Future addStream(Stream<List<int>> stream) {
|
| - return socket.addStream(stream)
|
| - .catchError((error) {
|
| - _doneCompleter.completeError(error);
|
| - throw error;
|
| - });
|
| - }
|
| -
|
| - Future close() {
|
| - _doneCompleter.complete(socket);
|
| - return new Future.value();
|
| - }
|
| -
|
| - Future get done => _doneCompleter.future;
|
| }
|
|
|
| class _HttpClientConnection {
|
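For reference, the framing that _chunkHeader produces (and that the removed _ChunkedTransformer used to produce) is plain HTTP/1.1 chunked encoding: a hexadecimal byte count, CRLF, the data, and a trailing CRLF that _HttpOutgoing defers into the next header via _pendingChunkedFooter. A simplified stand-alone illustration without that pending-footer optimization (not SDK code; lowercase hex is equally valid on the wire):

// Each chunk carries its own trailing CRLF here instead of deferring it.
List<int> frameChunk(List<int> data) {
  var out =
      new List<int>.from('${data.length.toRadixString(16)}\r\n'.codeUnits);
  out.addAll(data);
  out.addAll('\r\n'.codeUnits);
  return out;
}

void main() {
  // A 500 byte chunk is framed as "1f4\r\n" + 500 data bytes + "\r\n";
  // the three hex digits match the integer log-base-16 count computed in
  // _chunkHeader above.
  print(frameChunk(new List.filled(500, 0x61)).length);  // 5 + 500 + 2 = 507
  // The stream ends with a zero-length chunk and an empty trailer: "0\r\n\r\n".
  print(new String.fromCharCodes(frameChunk([])));
}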
| @@ -1238,7 +1214,6 @@ class _HttpClientConnection {
|
| Timer _idleTimer;
|
| bool closed = false;
|
| Uri _currentUri;
|
| - final Uint8List _headersBuffer = new Uint8List(_HEADERS_BUFFER_SIZE);
|
|
|
| Completer<_HttpIncoming> _nextResponseCompleter;
|
| Future _streamFuture;
|
| @@ -1901,7 +1876,6 @@ class _HttpConnection extends LinkedListEntry<_HttpConnection> {
|
| int _state = _IDLE;
|
| StreamSubscription _subscription;
|
| Timer _idleTimer;
|
| - final Uint8List _headersBuffer = new Uint8List(_HEADERS_BUFFER_SIZE);
|
| bool _idleMark = false;
|
| Future _streamFuture;
|
|
|
|
|
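With the content-length validation now folded into _HttpOutgoing (addStream checks for overruns, close checks for underruns), a mismatch between headers.contentLength and the bytes actually written still surfaces as an HttpException. A hedged usage sketch against the public API (the exact error routing through done/close is as described in the hunks above, not verified here):

import 'dart:io';

// Sketch: declaring contentLength = 10 but writing only 5 bytes makes the
// new _HttpOutgoing.close() fail with "Content size below specified
// contentLength", which shows up on response.close() / response.done.
void main() {
  HttpServer.bind(InternetAddress.LOOPBACK_IP_V4, 0).then((server) {
    server.listen((HttpRequest request) {
      request.response.headers.contentLength = 10;
      request.response.add([1, 2, 3, 4, 5]);
      request.response.close().catchError((error) {
        print(error);  // HttpException: Content size below specified ...
      });
    });
    // Trigger one request against the server (port chosen by the OS).
    new HttpClient().get('127.0.0.1', server.port, '/')
        .then((req) => req.close())
        .catchError((_) {});
  });
}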