Chromium Code Reviews| Index: sdk/lib/io/http_impl.dart |
| diff --git a/sdk/lib/io/http_impl.dart b/sdk/lib/io/http_impl.dart |
| index cea3e11e9144c54473916a32031e89736f8b2b0d..603c41a2d08ec34ecf1f8f6c048a6e3b8a837ae5 100644 |
| --- a/sdk/lib/io/http_impl.dart |
| +++ b/sdk/lib/io/http_impl.dart |
| @@ -326,16 +326,22 @@ abstract class _HttpOutboundMessage<T> implements IOSink { |
| // requests and in error handling. |
| bool _ignoreBody = false; |
| bool _headersWritten = false; |
| + bool _asGZip = false; |
| + |
| + IOSink _headersSink; |
| + IOSink _dataSink; |
| - IOSink _ioSink; |
| final _HttpOutgoing _outgoing; |
| final _HttpHeaders headers; |
| _HttpOutboundMessage(String protocolVersion, _HttpOutgoing outgoing) |
| : _outgoing = outgoing, |
| - _ioSink = new IOSink(outgoing, encoding: Encoding.ASCII), |
| - headers = new _HttpHeaders(protocolVersion); |
| + _headersSink = new IOSink(outgoing, encoding: Encoding.ASCII), |
| + headers = new _HttpHeaders(protocolVersion) { |
| + _dataSink = new IOSink( |
| + new _HttpOutboundConsumer(_headersSink, _addStream, this)); |
| + } |
| int get contentLength => headers.contentLength; |
| void set contentLength(int contentLength) { |
| @@ -376,7 +382,7 @@ abstract class _HttpOutboundMessage<T> implements IOSink { |
| } |
| } |
| if (string.isEmpty) return; |
| - _ioSink.write(string); |
| + _dataSink.write(string); |
| } |
| void writeAll(Iterable objects, [String separator = ""]) { |
| @@ -403,26 +409,17 @@ abstract class _HttpOutboundMessage<T> implements IOSink { |
| void add(List<int> data) { |
| _writeHeaders(); |
| if (data.length == 0) return; |
| - _ioSink.add(data); |
| + _dataSink.add(data); |
| } |
| void addError(AsyncError error) { |
| _writeHeaders(); |
| - _ioSink.addError(error); |
| - } |
| - |
| - Future<T> consume(Stream<List<int>> stream) { |
| - _writeHeaders(); |
| - return _ioSink.consume(stream); |
| + _dataSink.addError(error); |
| } |
| Future<T> addStream(Stream<List<int>> stream) { |
| _writeHeaders(); |
| - return _ioSink.writeStream(stream).then((_) => this); |
| - } |
| - |
| - Future<T> writeStream(Stream<List<int>> stream) { |
| - return addStream(stream); |
| + return _dataSink.addStream(stream); |
| } |
| Future close() { |
| @@ -435,20 +432,15 @@ abstract class _HttpOutboundMessage<T> implements IOSink { |
| headers.contentLength = 0; |
| } |
| _writeHeaders(); |
| - return _ioSink.close(); |
| + return _dataSink.close(); |
| } |
| - Future<T> get done { |
| - _writeHeaders(); |
| - return _ioSink.done; |
| - } |
| + Future<T> get done => _dataSink.done.then((_) => this); |
| void _writeHeaders() { |
| if (_headersWritten) return; |
| _headersWritten = true; |
| - _ioSink.encoding = Encoding.ASCII; |
| headers._synchronize(); // Be sure the 'chunked' option is updated. |
| - bool asGZip = false; |
| bool isServerSide = this is _HttpResponse; |
| if (isServerSide && headers.chunkedTransferEncoding) { |
| var response = this; |
| @@ -461,30 +453,28 @@ abstract class _HttpOutboundMessage<T> implements IOSink { |
| .any((encoding) => encoding.trim().toLowerCase() == "gzip") && |
| contentEncoding == null) { |
| headers.set(HttpHeaders.CONTENT_ENCODING, "gzip"); |
| - asGZip = true; |
| + _asGZip = true; |
| } |
| } |
| _writeHeader(); |
| - _ioSink = new IOSink(new _HttpOutboundConsumer(_ioSink, _consume, asGZip)); |
| - _ioSink.encoding = encoding; |
| } |
| - Future _consume(IOSink ioSink, Stream<List<int>> stream, bool asGZip) { |
| + Future _addStream(IOSink ioSink, Stream<List<int>> stream) { |
| int contentLength = headers.contentLength; |
| if (_ignoreBody) { |
| - ioSink.close(); |
| - return stream.fold(null, (x, y) {}).then((_) => this); |
| + stream.fold(null, (x, y) {}).catchError((_) {}); |
| + return ioSink.close().then((_) => this); |
| } |
| stream = stream.transform(new _BufferTransformer()); |
| if (headers.chunkedTransferEncoding) { |
| - if (asGZip) { |
| + if (_asGZip) { |
| stream = stream.transform(new ZLibDeflater(gzip: true, level: 6)); |
| } |
| stream = stream.transform(new _ChunkedTransformer()); |
| } else if (contentLength >= 0) { |
| stream = stream.transform(new _ContentLengthValidator(contentLength)); |
| } |
| - return stream.pipe(ioSink).then((_) => this); |
| + return ioSink.addStream(stream).then((_) => this); |
| } |
| void _writeHeader(); // TODO(ajohnsen): Better name. |
| @@ -492,21 +482,83 @@ abstract class _HttpOutboundMessage<T> implements IOSink { |
| class _HttpOutboundConsumer implements StreamConsumer { |
| - Function _consume; |
| + StreamController _controller; |
| + StreamSubscription _subscription; |
| + Function _addStream; |
| IOSink _ioSink; |
| - bool _asGZip; |
| + Completer _closeCompleter = new Completer(); |
| + Completer _completer; |
| + |
| + _HttpOutboundMessage _outbound; |
| _HttpOutboundConsumer(IOSink this._ioSink, |
| - Function this._consume, |
| - bool this._asGZip); |
| + Function this._addStream, |
| + _HttpOutboundMessage this._outbound); |
| - Future consume(var stream) => _consume(_ioSink, stream, _asGZip); |
| + void _onPause() { |
| + if (_controller.isPaused) { |
| + _subscription.pause(); |
| + } else { |
| + _subscription.resume(); |
| + } |
| + } |
| + |
| + void _onListen() { |
| + if (!_controller.hasListener && _subscription != null) { |
| + _subscription.cancel(); |
| + } |
| + } |
| + |
| + _ensureController() { |
| + if (_controller != null) return; |
| + _controller = new StreamController(onPauseStateChange: _onPause, |
| + onSubscriptionStateChange: _onListen); |
| + _addStream(_ioSink, _controller.stream) |
| + .then((v) { |
|
Søren Gjesse
2013/04/15 06:56:30
v -> _
Anders Johnsen
2013/04/15 07:35:20
Done.
|
| + _done(); |
| + _closeCompleter.complete(_outbound); |
| + }, |
| + onError: (error) { |
| + if (!_done(error)) { |
| + _closeCompleter.completeError(error); |
| + } |
| + }); |
| + } |
| + |
| + bool _done([error]) { |
| + if (_completer == null) return false; |
| + var tmp = _completer; |
| + _completer = null; |
| + if (error != null) { |
| + tmp.completeError(error); |
| + } else { |
| + tmp.complete(_outbound); |
| + } |
| + return true; |
| + } |
| Future addStream(var stream) { |
| - throw new UnimplementedError("_HttpOutboundConsumer.addStream"); |
| + _ensureController(); |
| + _completer = new Completer(); |
| + _subscription = stream.listen( |
| + (data) { |
| + _controller.add(data); |
| + }, |
| + onDone: () { |
| + _done(); |
| + }, |
| + onError: (error) { |
| + _done(error); |
| + }, |
| + unsubscribeOnError: true); |
| + return _completer.future; |
| } |
| Future close() { |
| - throw new UnimplementedError("_HttpOutboundConsumer.close"); |
| + _ensureController(); |
| + _controller.close(); |
| + return _closeCompleter.future.then((_) { |
| + return _ioSink.close().then((_) => _outbound); |
| + }); |
| } |
| } |
| @@ -538,7 +590,8 @@ class _BufferTransformer extends StreamEventTransformer<List<int>, List<int>> { |
| void flush(EventSink<List<int>> sink) { |
| if (_buffer.length > 0) { |
| - sink.add(_buffer.readBytes()); |
| + var data = _buffer.readBytes(); |
|
Søren Gjesse
2013/04/15 06:56:30
Why this change?
Anders Johnsen
2013/04/15 07:35:20
Done.
|
| + sink.add(data); |
| _buffer.clear(); |
| } |
| } |
| @@ -574,8 +627,8 @@ class _HttpResponse extends _HttpOutboundMessage<HttpResponse> |
| // Close connection so the socket is 'free'. |
| close(); |
| done.catchError((_) { |
| - // Catch any error on done, as they automatically will be propegated to |
| - // the websocket. |
| + // Catch any error on done, as they automatically will be |
| + // propegated to the websocket. |
|
Søren Gjesse
2013/04/15 06:56:30
propagated
Anders Johnsen
2013/04/15 07:35:20
Done.
|
| }); |
| return future; |
| } |
| @@ -632,7 +685,7 @@ class _HttpResponse extends _HttpOutboundMessage<HttpResponse> |
| // Write headers. |
| headers._write(buffer); |
| writeCRLF(); |
| - _ioSink.add(buffer.readBytes()); |
| + _headersSink.add(buffer.readBytes()); |
| } |
| String _findReasonPhrase(int statusCode) { |
| @@ -731,17 +784,13 @@ class _HttpClientRequest extends _HttpOutboundMessage<HttpClientResponse> |
| Future<HttpClientResponse> get done { |
| if (_response == null) { |
| - _response = Future.wait([_responseCompleter.future, super.done]) |
| + _response = Future.wait([_responseCompleter.future, |
| + super.done]) |
| .then((list) => list[0]); |
| } |
| return _response; |
| } |
| - Future<HttpClientResponse> consume(Stream<List<int>> stream) { |
| - super.consume(stream); |
| - return done; |
| - } |
| - |
| Future<HttpClientResponse> close() { |
| super.close(); |
| return done; |
| @@ -837,7 +886,7 @@ class _HttpClientRequest extends _HttpOutboundMessage<HttpClientResponse> |
| // Write headers. |
| headers._write(buffer); |
| writeCRLF(); |
| - _ioSink.add(buffer.readBytes()); |
| + _headersSink.add(buffer.readBytes()); |
| } |
| } |
| @@ -913,29 +962,25 @@ class _ContentLengthValidator |
| // Extends StreamConsumer as this is an internal type, only used to pipe to. |
| class _HttpOutgoing implements StreamConsumer<List<int>> { |
| - Function _onStream; |
| - final Completer _consumeCompleter = new Completer(); |
| - |
| - Future onStream(Future callback(Stream<List<int>> stream)) { |
| - _onStream = callback; |
| - return _consumeCompleter.future; |
| - } |
| + final Completer _doneCompleter = new Completer(); |
| + final StreamConsumer _consumer; |
| - Future consume(Stream<List<int>> stream) { |
| - _onStream(stream) |
| - .then((_) => _consumeCompleter.complete(), |
| - onError: _consumeCompleter.completeError); |
| - // Use .then to ensure a Future branch. |
| - return _consumeCompleter.future.then((_) => this); |
| - } |
| + _HttpOutgoing(StreamConsumer this._consumer); |
| Future addStream(Stream<List<int>> stream) { |
| - throw new UnimplementedError("_HttpOutgoing.addStream"); |
| + return _consumer.addStream(stream) |
| + .catchError((error) { |
| + _doneCompleter.completeError(error); |
| + throw error; |
| + }); |
| } |
| Future close() { |
| - throw new UnimplementedError("_HttpOutgoing.close"); |
| + _doneCompleter.complete(_consumer); |
| + return new Future.immediate(null); |
| } |
| + |
| + Future get done => _doneCompleter.future; |
| } |
| @@ -954,7 +999,6 @@ class _HttpClientConnection { |
| _HttpClient this._httpClient) |
| : _httpParser = new _HttpParser.responseParser() { |
| _socket.pipe(_httpParser); |
| - _socket.done.catchError((e) { destroy(); }); |
| // Set up handlers on the parser here, so we are sure to get 'onDone' from |
| // the parser. |
| @@ -983,7 +1027,7 @@ class _HttpClientConnection { |
| _HttpClientRequest send(Uri uri, int port, String method, bool isDirect) { |
| // Start with pausing the parser. |
| _subscription.pause(); |
| - var outgoing = new _HttpOutgoing(); |
| + var outgoing = new _HttpOutgoing(_socket); |
| // Create new request object, wrapping the outgoing connection. |
| var request = new _HttpClientRequest(outgoing, |
| uri, |
| @@ -1010,56 +1054,52 @@ class _HttpClientConnection { |
| // Start sending the request (lazy, delayed until the user provides |
| // data). |
| _httpParser.responseToMethod = method; |
| - _streamFuture = outgoing.onStream((stream) { |
| - return _socket.writeStream(stream) |
| - .then((s) { |
| - // Request sent, set up response completer. |
| - _nextResponseCompleter = new Completer(); |
| - |
| - // Listen for response. |
| - _nextResponseCompleter.future |
| - .then((incoming) { |
| - incoming.dataDone.then((_) { |
| - if (incoming.headers.persistentConnection && |
| - request.persistentConnection) { |
| - // Return connection, now we are done. |
| - _httpClient._returnConnection(this); |
| - _subscription.resume(); |
| - } else { |
| - destroy(); |
| - } |
| - }); |
| - request._onIncoming(incoming); |
| - }) |
| - // If we see a state error, we failed to get the 'first' |
| - // element. |
| - // Transform the error to a HttpParserException, for |
| - // consistency. |
| - .catchError((error) { |
| - throw new HttpParserException( |
| - "Connection closed before data was received"); |
| - }, test: (error) => error is StateError) |
| - .catchError((error) { |
| - // We are done with the socket. |
| + _streamFuture = outgoing.done |
| + .then((s) { |
| + // Request sent, set up response completer. |
| + _nextResponseCompleter = new Completer(); |
| + |
| + // Listen for response. |
| + _nextResponseCompleter.future |
| + .then((incoming) { |
| + incoming.dataDone.then((_) { |
| + if (incoming.headers.persistentConnection && |
| + request.persistentConnection) { |
| + // Return connection, now we are done. |
| + _httpClient._returnConnection(this); |
| + _subscription.resume(); |
| + } else { |
| destroy(); |
| - request._onError(error); |
| - }); |
| - |
| - // Resume the parser now we have a handler. |
| - _subscription.resume(); |
| - return s; |
| - }, onError: (e) { |
| - destroy(); |
| - throw e; |
| - }); |
| - }); |
| + } |
| + }); |
| + request._onIncoming(incoming); |
| + }) |
| + // If we see a state error, we failed to get the 'first' |
| + // element. |
| + // Transform the error to a HttpParserException, for |
| + // consistency. |
| + .catchError((error) { |
| + throw new HttpParserException( |
| + "Connection closed before data was received"); |
| + }, test: (error) => error is StateError) |
| + .catchError((error) { |
| + // We are done with the socket. |
| + destroy(); |
| + request._onError(error); |
| + }); |
| + |
| + // Resume the parser now we have a handler. |
| + _subscription.resume(); |
| + return s; |
| + }, onError: (e) { |
| + destroy(); |
| + }); |
| return request; |
| } |
| Future<Socket> detachSocket() { |
| - return _streamFuture |
| - .then((_) => new _DetachedSocket(_socket, _httpParser.detachIncoming()), |
| - onError: (_) {}); |
| + return _streamFuture.then( |
| + (_) => new _DetachedSocket(_socket, _httpParser.detachIncoming())); |
| } |
| void destroy() { |
| @@ -1071,8 +1111,7 @@ class _HttpClientConnection { |
| _httpClient._connectionClosed(this); |
| _streamFuture |
| // TODO(ajohnsen): Add timeout. |
| - .then((_) => _socket.destroy(), |
| - onError: (_) {}); |
| + .then((_) => _socket.destroy()); |
| } |
| HttpConnectionInfo get connectionInfo => _HttpConnectionInfo.create(_socket); |
| @@ -1377,39 +1416,35 @@ class _HttpConnection { |
| _HttpConnection(Socket this._socket, _HttpServer this._httpServer) |
| : _httpParser = new _HttpParser.requestParser() { |
| _socket.pipe(_httpParser); |
| - _socket.done.catchError((e) => destroy()); |
| _subscription = _httpParser.listen( |
| (incoming) { |
| // Only handle one incoming request at the time. Keep the |
| // stream paused until the request has been send. |
| _subscription.pause(); |
| _state = _ACTIVE; |
| - var outgoing = new _HttpOutgoing(); |
| + var outgoing = new _HttpOutgoing(_socket); |
| var response = new _HttpResponse(incoming.headers.protocolVersion, |
| outgoing); |
| var request = new _HttpRequest(response, incoming, _httpServer, this); |
| - outgoing.onStream((stream) { |
| - return _streamFuture = _socket.writeStream(stream) |
| - .then((_) { |
| - if (_state == _DETACHED) return; |
| - if (response.persistentConnection && |
| - request.persistentConnection && |
| - incoming.fullBodyRead) { |
| - _state = _IDLE; |
| - // Resume the subscription for incoming requests as the |
| - // request is now processed. |
| - _subscription.resume(); |
| - } else { |
| - // Close socket, keep-alive not used or body sent before |
| - // received data was handled. |
| - destroy(); |
| - } |
| - }) |
| - .catchError((e) { |
| + _streamFuture = outgoing.done |
| + .then((_) { |
| + if (_state == _DETACHED) return; |
| + if (response.persistentConnection && |
| + request.persistentConnection && |
| + incoming.fullBodyRead) { |
| + _state = _IDLE; |
| + // Resume the subscription for incoming requests as the |
| + // request is now processed. |
| + _subscription.resume(); |
| + } else { |
| + // Close socket, keep-alive not used or body sent before |
| + // received data was handled. |
| destroy(); |
| - throw e; |
| - }); |
| - }); |
| + } |
| + }) |
| + .catchError((e) { |
| + destroy(); |
| + }); |
| response._ignoreBody = request.method == "HEAD"; |
| response._httpRequest = request; |
| _httpServer._handleRequest(request); |
| @@ -1691,18 +1726,10 @@ class _DetachedSocket extends Stream<List<int>> implements Socket { |
| void addError(AsyncError error) => _socket.addError(error); |
| - Future<Socket> consume(Stream<List<int>> stream) { |
| - return _socket.consume(stream); |
| - } |
| - |
| Future<Socket> addStream(Stream<List<int>> stream) { |
| return _socket.addStream(stream); |
| } |
| - Future<Socket> writeStream(Stream<List<int>> stream) { |
| - return _socket.writeStream(stream); |
| - } |
| - |
| void destroy() => _socket.destroy(); |
| Future close() => _socket.close(); |