| Index: sdk/lib/io/http_impl.dart
|
| diff --git a/sdk/lib/io/http_impl.dart b/sdk/lib/io/http_impl.dart
|
| index cea3e11e9144c54473916a32031e89736f8b2b0d..de86c08d351bb4aeb9039f88ad01417b70806f66 100644
|
| --- a/sdk/lib/io/http_impl.dart
|
| +++ b/sdk/lib/io/http_impl.dart
|
| @@ -326,16 +326,22 @@ abstract class _HttpOutboundMessage<T> implements IOSink {
|
| // requests and in error handling.
|
| bool _ignoreBody = false;
|
| bool _headersWritten = false;
|
| + bool _asGZip = false;
|
| +
|
| + IOSink _headersSink;
|
| + IOSink _dataSink;
|
|
|
| - IOSink _ioSink;
|
| final _HttpOutgoing _outgoing;
|
|
|
| final _HttpHeaders headers;
|
|
|
| _HttpOutboundMessage(String protocolVersion, _HttpOutgoing outgoing)
|
| : _outgoing = outgoing,
|
| - _ioSink = new IOSink(outgoing, encoding: Encoding.ASCII),
|
| - headers = new _HttpHeaders(protocolVersion);
|
| + _headersSink = new IOSink(outgoing, encoding: Encoding.ASCII),
|
| + headers = new _HttpHeaders(protocolVersion) {
|
| + _dataSink = new IOSink(
|
| + new _HttpOutboundConsumer(_headersSink, _addStream, this));
|
| + }
|
|
|
| int get contentLength => headers.contentLength;
|
| void set contentLength(int contentLength) {
|
| @@ -376,7 +382,7 @@ abstract class _HttpOutboundMessage<T> implements IOSink {
|
| }
|
| }
|
| if (string.isEmpty) return;
|
| - _ioSink.write(string);
|
| + _dataSink.write(string);
|
| }
|
|
|
| void writeAll(Iterable objects, [String separator = ""]) {
|
| @@ -403,26 +409,17 @@ abstract class _HttpOutboundMessage<T> implements IOSink {
|
| void add(List<int> data) {
|
| _writeHeaders();
|
| if (data.length == 0) return;
|
| - _ioSink.add(data);
|
| + _dataSink.add(data);
|
| }
|
|
|
| void addError(AsyncError error) {
|
| _writeHeaders();
|
| - _ioSink.addError(error);
|
| - }
|
| -
|
| - Future<T> consume(Stream<List<int>> stream) {
|
| - _writeHeaders();
|
| - return _ioSink.consume(stream);
|
| + _dataSink.addError(error);
|
| }
|
|
|
| Future<T> addStream(Stream<List<int>> stream) {
|
| _writeHeaders();
|
| - return _ioSink.writeStream(stream).then((_) => this);
|
| - }
|
| -
|
| - Future<T> writeStream(Stream<List<int>> stream) {
|
| - return addStream(stream);
|
| + return _dataSink.addStream(stream);
|
| }
|
|
|
| Future close() {
|
| @@ -435,20 +432,15 @@ abstract class _HttpOutboundMessage<T> implements IOSink {
|
| headers.contentLength = 0;
|
| }
|
| _writeHeaders();
|
| - return _ioSink.close();
|
| + return _dataSink.close();
|
| }
|
|
|
| - Future<T> get done {
|
| - _writeHeaders();
|
| - return _ioSink.done;
|
| - }
|
| + Future<T> get done => _dataSink.done.then((_) => this);
|
|
|
| void _writeHeaders() {
|
| if (_headersWritten) return;
|
| _headersWritten = true;
|
| - _ioSink.encoding = Encoding.ASCII;
|
| headers._synchronize(); // Be sure the 'chunked' option is updated.
|
| - bool asGZip = false;
|
| bool isServerSide = this is _HttpResponse;
|
| if (isServerSide && headers.chunkedTransferEncoding) {
|
| var response = this;
|
| @@ -461,30 +453,28 @@ abstract class _HttpOutboundMessage<T> implements IOSink {
|
| .any((encoding) => encoding.trim().toLowerCase() == "gzip") &&
|
| contentEncoding == null) {
|
| headers.set(HttpHeaders.CONTENT_ENCODING, "gzip");
|
| - asGZip = true;
|
| + _asGZip = true;
|
| }
|
| }
|
| _writeHeader();
|
| - _ioSink = new IOSink(new _HttpOutboundConsumer(_ioSink, _consume, asGZip));
|
| - _ioSink.encoding = encoding;
|
| }
|
|
|
| - Future _consume(IOSink ioSink, Stream<List<int>> stream, bool asGZip) {
|
| + Future _addStream(IOSink ioSink, Stream<List<int>> stream) {
|
| int contentLength = headers.contentLength;
|
| if (_ignoreBody) {
|
| - ioSink.close();
|
| - return stream.fold(null, (x, y) {}).then((_) => this);
|
| + stream.fold(null, (x, y) {}).catchError((_) {});
|
| + return ioSink.close().then((_) => this);
|
| }
|
| stream = stream.transform(new _BufferTransformer());
|
| if (headers.chunkedTransferEncoding) {
|
| - if (asGZip) {
|
| + if (_asGZip) {
|
| stream = stream.transform(new ZLibDeflater(gzip: true, level: 6));
|
| }
|
| stream = stream.transform(new _ChunkedTransformer());
|
| } else if (contentLength >= 0) {
|
| stream = stream.transform(new _ContentLengthValidator(contentLength));
|
| }
|
| - return stream.pipe(ioSink).then((_) => this);
|
| + return ioSink.addStream(stream).then((_) => this);
|
| }
|
|
|
| void _writeHeader(); // TODO(ajohnsen): Better name.
|
| @@ -492,21 +482,83 @@ abstract class _HttpOutboundMessage<T> implements IOSink {
|
|
|
|
|
| class _HttpOutboundConsumer implements StreamConsumer {
|
| - Function _consume;
|
| + StreamController _controller;
|
| + StreamSubscription _subscription;
|
| + Function _addStream;
|
| IOSink _ioSink;
|
| - bool _asGZip;
|
| + Completer _closeCompleter = new Completer();
|
| + Completer _completer;
|
| +
|
| + _HttpOutboundMessage _outbound;
|
| _HttpOutboundConsumer(IOSink this._ioSink,
|
| - Function this._consume,
|
| - bool this._asGZip);
|
| + Function this._addStream,
|
| + _HttpOutboundMessage this._outbound);
|
|
|
| - Future consume(var stream) => _consume(_ioSink, stream, _asGZip);
|
| + void _onPause() {
|
| + if (_controller.isPaused) {
|
| + _subscription.pause();
|
| + } else {
|
| + _subscription.resume();
|
| + }
|
| + }
|
| +
|
| + void _onListen() {
|
| + if (!_controller.hasListener && _subscription != null) {
|
| + _subscription.cancel();
|
| + }
|
| + }
|
| +
|
| + _ensureController() {
|
| + if (_controller != null) return;
|
| + _controller = new StreamController(onPauseStateChange: _onPause,
|
| + onSubscriptionStateChange: _onListen);
|
| + _addStream(_ioSink, _controller.stream)
|
| + .then((_) {
|
| + _done();
|
| + _closeCompleter.complete(_outbound);
|
| + },
|
| + onError: (error) {
|
| + if (!_done(error)) {
|
| + _closeCompleter.completeError(error);
|
| + }
|
| + });
|
| + }
|
| +
|
| + bool _done([error]) {
|
| + if (_completer == null) return false;
|
| + var tmp = _completer;
|
| + _completer = null;
|
| + if (error != null) {
|
| + tmp.completeError(error);
|
| + } else {
|
| + tmp.complete(_outbound);
|
| + }
|
| + return true;
|
| + }
|
|
|
| Future addStream(var stream) {
|
| - throw new UnimplementedError("_HttpOutboundConsumer.addStream");
|
| + _ensureController();
|
| + _completer = new Completer();
|
| + _subscription = stream.listen(
|
| + (data) {
|
| + _controller.add(data);
|
| + },
|
| + onDone: () {
|
| + _done();
|
| + },
|
| + onError: (error) {
|
| + _done(error);
|
| + },
|
| + unsubscribeOnError: true);
|
| + return _completer.future;
|
| }
|
|
|
| Future close() {
|
| - throw new UnimplementedError("_HttpOutboundConsumer.close");
|
| + _ensureController();
|
| + _controller.close();
|
| + return _closeCompleter.future.then((_) {
|
| + return _ioSink.close().then((_) => _outbound);
|
| + });
|
| }
|
| }
|
|
|
| @@ -574,8 +626,8 @@ class _HttpResponse extends _HttpOutboundMessage<HttpResponse>
|
| // Close connection so the socket is 'free'.
|
| close();
|
| done.catchError((_) {
|
| - // Catch any error on done, as they automatically will be propegated to
|
| - // the websocket.
|
| + // Catch any error on done, as they automatically will be
|
| + // propagated to the websocket.
|
| });
|
| return future;
|
| }
|
| @@ -632,7 +684,7 @@ class _HttpResponse extends _HttpOutboundMessage<HttpResponse>
|
| // Write headers.
|
| headers._write(buffer);
|
| writeCRLF();
|
| - _ioSink.add(buffer.readBytes());
|
| + _headersSink.add(buffer.readBytes());
|
| }
|
|
|
| String _findReasonPhrase(int statusCode) {
|
| @@ -731,17 +783,13 @@ class _HttpClientRequest extends _HttpOutboundMessage<HttpClientResponse>
|
|
|
| Future<HttpClientResponse> get done {
|
| if (_response == null) {
|
| - _response = Future.wait([_responseCompleter.future, super.done])
|
| + _response = Future.wait([_responseCompleter.future,
|
| + super.done])
|
| .then((list) => list[0]);
|
| }
|
| return _response;
|
| }
|
|
|
| - Future<HttpClientResponse> consume(Stream<List<int>> stream) {
|
| - super.consume(stream);
|
| - return done;
|
| - }
|
| -
|
| Future<HttpClientResponse> close() {
|
| super.close();
|
| return done;
|
| @@ -837,7 +885,7 @@ class _HttpClientRequest extends _HttpOutboundMessage<HttpClientResponse>
|
| // Write headers.
|
| headers._write(buffer);
|
| writeCRLF();
|
| - _ioSink.add(buffer.readBytes());
|
| + _headersSink.add(buffer.readBytes());
|
| }
|
| }
|
|
|
| @@ -913,29 +961,25 @@ class _ContentLengthValidator
|
|
|
| // Extends StreamConsumer as this is an internal type, only used to pipe to.
|
| class _HttpOutgoing implements StreamConsumer<List<int>> {
|
| - Function _onStream;
|
| - final Completer _consumeCompleter = new Completer();
|
| + final Completer _doneCompleter = new Completer();
|
| + final StreamConsumer _consumer;
|
|
|
| - Future onStream(Future callback(Stream<List<int>> stream)) {
|
| - _onStream = callback;
|
| - return _consumeCompleter.future;
|
| - }
|
| -
|
| - Future consume(Stream<List<int>> stream) {
|
| - _onStream(stream)
|
| - .then((_) => _consumeCompleter.complete(),
|
| - onError: _consumeCompleter.completeError);
|
| - // Use .then to ensure a Future branch.
|
| - return _consumeCompleter.future.then((_) => this);
|
| - }
|
| + _HttpOutgoing(StreamConsumer this._consumer);
|
|
|
| Future addStream(Stream<List<int>> stream) {
|
| - throw new UnimplementedError("_HttpOutgoing.addStream");
|
| + return _consumer.addStream(stream)
|
| + .catchError((error) {
|
| + _doneCompleter.completeError(error);
|
| + throw error;
|
| + });
|
| }
|
|
|
| Future close() {
|
| - throw new UnimplementedError("_HttpOutgoing.close");
|
| + _doneCompleter.complete(_consumer);
|
| + return new Future.immediate(null);
|
| }
|
| +
|
| + Future get done => _doneCompleter.future;
|
| }
|
|
|
|
|
| @@ -954,7 +998,6 @@ class _HttpClientConnection {
|
| _HttpClient this._httpClient)
|
| : _httpParser = new _HttpParser.responseParser() {
|
| _socket.pipe(_httpParser);
|
| - _socket.done.catchError((e) { destroy(); });
|
|
|
| // Set up handlers on the parser here, so we are sure to get 'onDone' from
|
| // the parser.
|
| @@ -983,7 +1026,7 @@ class _HttpClientConnection {
|
| _HttpClientRequest send(Uri uri, int port, String method, bool isDirect) {
|
| // Start with pausing the parser.
|
| _subscription.pause();
|
| - var outgoing = new _HttpOutgoing();
|
| + var outgoing = new _HttpOutgoing(_socket);
|
| // Create new request object, wrapping the outgoing connection.
|
| var request = new _HttpClientRequest(outgoing,
|
| uri,
|
| @@ -1010,56 +1053,52 @@ class _HttpClientConnection {
|
| // Start sending the request (lazy, delayed until the user provides
|
| // data).
|
| _httpParser.responseToMethod = method;
|
| - _streamFuture = outgoing.onStream((stream) {
|
| - return _socket.writeStream(stream)
|
| - .then((s) {
|
| - // Request sent, set up response completer.
|
| - _nextResponseCompleter = new Completer();
|
| -
|
| - // Listen for response.
|
| - _nextResponseCompleter.future
|
| - .then((incoming) {
|
| - incoming.dataDone.then((_) {
|
| - if (incoming.headers.persistentConnection &&
|
| - request.persistentConnection) {
|
| - // Return connection, now we are done.
|
| - _httpClient._returnConnection(this);
|
| - _subscription.resume();
|
| - } else {
|
| - destroy();
|
| - }
|
| - });
|
| - request._onIncoming(incoming);
|
| - })
|
| - // If we see a state error, we failed to get the 'first'
|
| - // element.
|
| - // Transform the error to a HttpParserException, for
|
| - // consistency.
|
| - .catchError((error) {
|
| - throw new HttpParserException(
|
| - "Connection closed before data was received");
|
| - }, test: (error) => error is StateError)
|
| - .catchError((error) {
|
| - // We are done with the socket.
|
| + _streamFuture = outgoing.done
|
| + .then((s) {
|
| + // Request sent, set up response completer.
|
| + _nextResponseCompleter = new Completer();
|
| +
|
| + // Listen for response.
|
| + _nextResponseCompleter.future
|
| + .then((incoming) {
|
| + incoming.dataDone.then((_) {
|
| + if (incoming.headers.persistentConnection &&
|
| + request.persistentConnection) {
|
| + // Return connection, now we are done.
|
| + _httpClient._returnConnection(this);
|
| + _subscription.resume();
|
| + } else {
|
| destroy();
|
| - request._onError(error);
|
| - });
|
| -
|
| - // Resume the parser now we have a handler.
|
| - _subscription.resume();
|
| - return s;
|
| - }, onError: (e) {
|
| - destroy();
|
| - throw e;
|
| - });
|
| - });
|
| + }
|
| + });
|
| + request._onIncoming(incoming);
|
| + })
|
| + // If we see a state error, we failed to get the 'first'
|
| + // element.
|
| + // Transform the error to a HttpParserException, for
|
| + // consistency.
|
| + .catchError((error) {
|
| + throw new HttpParserException(
|
| + "Connection closed before data was received");
|
| + }, test: (error) => error is StateError)
|
| + .catchError((error) {
|
| + // We are done with the socket.
|
| + destroy();
|
| + request._onError(error);
|
| + });
|
| +
|
| + // Resume the parser now we have a handler.
|
| + _subscription.resume();
|
| + return s;
|
| + }, onError: (e) {
|
| + destroy();
|
| + });
|
| return request;
|
| }
|
|
|
| Future<Socket> detachSocket() {
|
| - return _streamFuture
|
| - .then((_) => new _DetachedSocket(_socket, _httpParser.detachIncoming()),
|
| - onError: (_) {});
|
| + return _streamFuture.then(
|
| + (_) => new _DetachedSocket(_socket, _httpParser.detachIncoming()));
|
| }
|
|
|
| void destroy() {
|
| @@ -1071,8 +1110,7 @@ class _HttpClientConnection {
|
| _httpClient._connectionClosed(this);
|
| _streamFuture
|
| // TODO(ajohnsen): Add timeout.
|
| - .then((_) => _socket.destroy(),
|
| - onError: (_) {});
|
| + .then((_) => _socket.destroy());
|
| }
|
|
|
| HttpConnectionInfo get connectionInfo => _HttpConnectionInfo.create(_socket);
|
| @@ -1377,39 +1415,35 @@ class _HttpConnection {
|
| _HttpConnection(Socket this._socket, _HttpServer this._httpServer)
|
| : _httpParser = new _HttpParser.requestParser() {
|
| _socket.pipe(_httpParser);
|
| - _socket.done.catchError((e) => destroy());
|
| _subscription = _httpParser.listen(
|
| (incoming) {
|
| // Only handle one incoming request at the time. Keep the
|
| // stream paused until the request has been send.
|
| _subscription.pause();
|
| _state = _ACTIVE;
|
| - var outgoing = new _HttpOutgoing();
|
| + var outgoing = new _HttpOutgoing(_socket);
|
| var response = new _HttpResponse(incoming.headers.protocolVersion,
|
| outgoing);
|
| var request = new _HttpRequest(response, incoming, _httpServer, this);
|
| - outgoing.onStream((stream) {
|
| - return _streamFuture = _socket.writeStream(stream)
|
| - .then((_) {
|
| - if (_state == _DETACHED) return;
|
| - if (response.persistentConnection &&
|
| - request.persistentConnection &&
|
| - incoming.fullBodyRead) {
|
| - _state = _IDLE;
|
| - // Resume the subscription for incoming requests as the
|
| - // request is now processed.
|
| - _subscription.resume();
|
| - } else {
|
| - // Close socket, keep-alive not used or body sent before
|
| - // received data was handled.
|
| - destroy();
|
| - }
|
| - })
|
| - .catchError((e) {
|
| + _streamFuture = outgoing.done
|
| + .then((_) {
|
| + if (_state == _DETACHED) return;
|
| + if (response.persistentConnection &&
|
| + request.persistentConnection &&
|
| + incoming.fullBodyRead) {
|
| + _state = _IDLE;
|
| + // Resume the subscription for incoming requests as the
|
| + // request is now processed.
|
| + _subscription.resume();
|
| + } else {
|
| + // Close socket, keep-alive not used or body sent before
|
| + // received data was handled.
|
| destroy();
|
| - throw e;
|
| - });
|
| - });
|
| + }
|
| + })
|
| + .catchError((e) {
|
| + destroy();
|
| + });
|
| response._ignoreBody = request.method == "HEAD";
|
| response._httpRequest = request;
|
| _httpServer._handleRequest(request);
|
| @@ -1691,18 +1725,10 @@ class _DetachedSocket extends Stream<List<int>> implements Socket {
|
|
|
| void addError(AsyncError error) => _socket.addError(error);
|
|
|
| - Future<Socket> consume(Stream<List<int>> stream) {
|
| - return _socket.consume(stream);
|
| - }
|
| -
|
| Future<Socket> addStream(Stream<List<int>> stream) {
|
| return _socket.addStream(stream);
|
| }
|
|
|
| - Future<Socket> writeStream(Stream<List<int>> stream) {
|
| - return _socket.writeStream(stream);
|
| - }
|
| -
|
| void destroy() => _socket.destroy();
|
|
|
| Future close() => _socket.close();
|
|
|