Index: sdk/lib/io/http_impl.dart |
diff --git a/sdk/lib/io/http_impl.dart b/sdk/lib/io/http_impl.dart |
index cea3e11e9144c54473916a32031e89736f8b2b0d..de86c08d351bb4aeb9039f88ad01417b70806f66 100644 |
--- a/sdk/lib/io/http_impl.dart |
+++ b/sdk/lib/io/http_impl.dart |
@@ -326,16 +326,22 @@ abstract class _HttpOutboundMessage<T> implements IOSink { |
// requests and in error handling. |
bool _ignoreBody = false; |
bool _headersWritten = false; |
+ bool _asGZip = false; |
+ |
+ IOSink _headersSink; |
+ IOSink _dataSink; |
- IOSink _ioSink; |
final _HttpOutgoing _outgoing; |
final _HttpHeaders headers; |
_HttpOutboundMessage(String protocolVersion, _HttpOutgoing outgoing) |
: _outgoing = outgoing, |
- _ioSink = new IOSink(outgoing, encoding: Encoding.ASCII), |
- headers = new _HttpHeaders(protocolVersion); |
+ _headersSink = new IOSink(outgoing, encoding: Encoding.ASCII), |
+ headers = new _HttpHeaders(protocolVersion) { |
+ _dataSink = new IOSink( |
+ new _HttpOutboundConsumer(_headersSink, _addStream, this)); |
+ } |
int get contentLength => headers.contentLength; |
void set contentLength(int contentLength) { |
@@ -376,7 +382,7 @@ abstract class _HttpOutboundMessage<T> implements IOSink { |
} |
} |
if (string.isEmpty) return; |
- _ioSink.write(string); |
+ _dataSink.write(string); |
} |
void writeAll(Iterable objects, [String separator = ""]) { |
@@ -403,26 +409,17 @@ abstract class _HttpOutboundMessage<T> implements IOSink { |
void add(List<int> data) { |
_writeHeaders(); |
if (data.length == 0) return; |
- _ioSink.add(data); |
+ _dataSink.add(data); |
} |
void addError(AsyncError error) { |
_writeHeaders(); |
- _ioSink.addError(error); |
- } |
- |
- Future<T> consume(Stream<List<int>> stream) { |
- _writeHeaders(); |
- return _ioSink.consume(stream); |
+ _dataSink.addError(error); |
} |
Future<T> addStream(Stream<List<int>> stream) { |
_writeHeaders(); |
- return _ioSink.writeStream(stream).then((_) => this); |
- } |
- |
- Future<T> writeStream(Stream<List<int>> stream) { |
- return addStream(stream); |
+ return _dataSink.addStream(stream); |
} |
Future close() { |
@@ -435,20 +432,15 @@ abstract class _HttpOutboundMessage<T> implements IOSink { |
headers.contentLength = 0; |
} |
_writeHeaders(); |
- return _ioSink.close(); |
+ return _dataSink.close(); |
} |
- Future<T> get done { |
- _writeHeaders(); |
- return _ioSink.done; |
- } |
+ Future<T> get done => _dataSink.done.then((_) => this); |
void _writeHeaders() { |
if (_headersWritten) return; |
_headersWritten = true; |
- _ioSink.encoding = Encoding.ASCII; |
headers._synchronize(); // Be sure the 'chunked' option is updated. |
- bool asGZip = false; |
bool isServerSide = this is _HttpResponse; |
if (isServerSide && headers.chunkedTransferEncoding) { |
var response = this; |
@@ -461,30 +453,28 @@ abstract class _HttpOutboundMessage<T> implements IOSink { |
.any((encoding) => encoding.trim().toLowerCase() == "gzip") && |
contentEncoding == null) { |
headers.set(HttpHeaders.CONTENT_ENCODING, "gzip"); |
- asGZip = true; |
+ _asGZip = true; |
} |
} |
_writeHeader(); |
- _ioSink = new IOSink(new _HttpOutboundConsumer(_ioSink, _consume, asGZip)); |
- _ioSink.encoding = encoding; |
} |
- Future _consume(IOSink ioSink, Stream<List<int>> stream, bool asGZip) { |
+ Future _addStream(IOSink ioSink, Stream<List<int>> stream) { |
int contentLength = headers.contentLength; |
if (_ignoreBody) { |
- ioSink.close(); |
- return stream.fold(null, (x, y) {}).then((_) => this); |
+ stream.fold(null, (x, y) {}).catchError((_) {}); |
+ return ioSink.close().then((_) => this); |
} |
stream = stream.transform(new _BufferTransformer()); |
if (headers.chunkedTransferEncoding) { |
- if (asGZip) { |
+ if (_asGZip) { |
stream = stream.transform(new ZLibDeflater(gzip: true, level: 6)); |
} |
stream = stream.transform(new _ChunkedTransformer()); |
} else if (contentLength >= 0) { |
stream = stream.transform(new _ContentLengthValidator(contentLength)); |
} |
- return stream.pipe(ioSink).then((_) => this); |
+ return ioSink.addStream(stream).then((_) => this); |
} |
void _writeHeader(); // TODO(ajohnsen): Better name. |
@@ -492,21 +482,83 @@ abstract class _HttpOutboundMessage<T> implements IOSink { |
class _HttpOutboundConsumer implements StreamConsumer { |
- Function _consume; |
+ StreamController _controller; |
+ StreamSubscription _subscription; |
+ Function _addStream; |
IOSink _ioSink; |
- bool _asGZip; |
+ Completer _closeCompleter = new Completer(); |
+ Completer _completer; |
+ |
+ _HttpOutboundMessage _outbound; |
_HttpOutboundConsumer(IOSink this._ioSink, |
- Function this._consume, |
- bool this._asGZip); |
+ Function this._addStream, |
+ _HttpOutboundMessage this._outbound); |
- Future consume(var stream) => _consume(_ioSink, stream, _asGZip); |
+ void _onPause() { |
+ if (_controller.isPaused) { |
+ _subscription.pause(); |
+ } else { |
+ _subscription.resume(); |
+ } |
+ } |
+ |
+ void _onListen() { |
+ if (!_controller.hasListener && _subscription != null) { |
+ _subscription.cancel(); |
+ } |
+ } |
+ |
+ _ensureController() { |
+ if (_controller != null) return; |
+ _controller = new StreamController(onPauseStateChange: _onPause, |
+ onSubscriptionStateChange: _onListen); |
+ _addStream(_ioSink, _controller.stream) |
+ .then((_) { |
+ _done(); |
+ _closeCompleter.complete(_outbound); |
+ }, |
+ onError: (error) { |
+ if (!_done(error)) { |
+ _closeCompleter.completeError(error); |
+ } |
+ }); |
+ } |
+ |
+ bool _done([error]) { |
+ if (_completer == null) return false; |
+ var tmp = _completer; |
+ _completer = null; |
+ if (error != null) { |
+ tmp.completeError(error); |
+ } else { |
+ tmp.complete(_outbound); |
+ } |
+ return true; |
+ } |
Future addStream(var stream) { |
- throw new UnimplementedError("_HttpOutboundConsumer.addStream"); |
+ _ensureController(); |
+ _completer = new Completer(); |
+ _subscription = stream.listen( |
+ (data) { |
+ _controller.add(data); |
+ }, |
+ onDone: () { |
+ _done(); |
+ }, |
+ onError: (error) { |
+ _done(error); |
+ }, |
+ unsubscribeOnError: true); |
+ return _completer.future; |
} |
Future close() { |
- throw new UnimplementedError("_HttpOutboundConsumer.close"); |
+ _ensureController(); |
+ _controller.close(); |
+ return _closeCompleter.future.then((_) { |
+ return _ioSink.close().then((_) => _outbound); |
+ }); |
} |
} |
@@ -574,8 +626,8 @@ class _HttpResponse extends _HttpOutboundMessage<HttpResponse> |
// Close connection so the socket is 'free'. |
close(); |
done.catchError((_) { |
- // Catch any error on done, as they automatically will be propegated to |
- // the websocket. |
+ // Catch any error on done, as they automatically will be |
+ // propagated to the websocket. |
}); |
return future; |
} |
@@ -632,7 +684,7 @@ class _HttpResponse extends _HttpOutboundMessage<HttpResponse> |
// Write headers. |
headers._write(buffer); |
writeCRLF(); |
- _ioSink.add(buffer.readBytes()); |
+ _headersSink.add(buffer.readBytes()); |
} |
String _findReasonPhrase(int statusCode) { |
@@ -731,17 +783,13 @@ class _HttpClientRequest extends _HttpOutboundMessage<HttpClientResponse> |
Future<HttpClientResponse> get done { |
if (_response == null) { |
- _response = Future.wait([_responseCompleter.future, super.done]) |
+ _response = Future.wait([_responseCompleter.future, |
+ super.done]) |
.then((list) => list[0]); |
} |
return _response; |
} |
- Future<HttpClientResponse> consume(Stream<List<int>> stream) { |
- super.consume(stream); |
- return done; |
- } |
- |
Future<HttpClientResponse> close() { |
super.close(); |
return done; |
@@ -837,7 +885,7 @@ class _HttpClientRequest extends _HttpOutboundMessage<HttpClientResponse> |
// Write headers. |
headers._write(buffer); |
writeCRLF(); |
- _ioSink.add(buffer.readBytes()); |
+ _headersSink.add(buffer.readBytes()); |
} |
} |
@@ -913,29 +961,25 @@ class _ContentLengthValidator |
// Extends StreamConsumer as this is an internal type, only used to pipe to. |
class _HttpOutgoing implements StreamConsumer<List<int>> { |
- Function _onStream; |
- final Completer _consumeCompleter = new Completer(); |
+ final Completer _doneCompleter = new Completer(); |
+ final StreamConsumer _consumer; |
- Future onStream(Future callback(Stream<List<int>> stream)) { |
- _onStream = callback; |
- return _consumeCompleter.future; |
- } |
- |
- Future consume(Stream<List<int>> stream) { |
- _onStream(stream) |
- .then((_) => _consumeCompleter.complete(), |
- onError: _consumeCompleter.completeError); |
- // Use .then to ensure a Future branch. |
- return _consumeCompleter.future.then((_) => this); |
- } |
+ _HttpOutgoing(StreamConsumer this._consumer); |
Future addStream(Stream<List<int>> stream) { |
- throw new UnimplementedError("_HttpOutgoing.addStream"); |
+ return _consumer.addStream(stream) |
+ .catchError((error) { |
+ _doneCompleter.completeError(error); |
+ throw error; |
+ }); |
} |
Future close() { |
- throw new UnimplementedError("_HttpOutgoing.close"); |
+ _doneCompleter.complete(_consumer); |
+ return new Future.immediate(null); |
} |
+ |
+ Future get done => _doneCompleter.future; |
} |
@@ -954,7 +998,6 @@ class _HttpClientConnection { |
_HttpClient this._httpClient) |
: _httpParser = new _HttpParser.responseParser() { |
_socket.pipe(_httpParser); |
- _socket.done.catchError((e) { destroy(); }); |
// Set up handlers on the parser here, so we are sure to get 'onDone' from |
// the parser. |
@@ -983,7 +1026,7 @@ class _HttpClientConnection { |
_HttpClientRequest send(Uri uri, int port, String method, bool isDirect) { |
// Start with pausing the parser. |
_subscription.pause(); |
- var outgoing = new _HttpOutgoing(); |
+ var outgoing = new _HttpOutgoing(_socket); |
// Create new request object, wrapping the outgoing connection. |
var request = new _HttpClientRequest(outgoing, |
uri, |
@@ -1010,56 +1053,52 @@ class _HttpClientConnection { |
// Start sending the request (lazy, delayed until the user provides |
// data). |
_httpParser.responseToMethod = method; |
- _streamFuture = outgoing.onStream((stream) { |
- return _socket.writeStream(stream) |
- .then((s) { |
- // Request sent, set up response completer. |
- _nextResponseCompleter = new Completer(); |
- |
- // Listen for response. |
- _nextResponseCompleter.future |
- .then((incoming) { |
- incoming.dataDone.then((_) { |
- if (incoming.headers.persistentConnection && |
- request.persistentConnection) { |
- // Return connection, now we are done. |
- _httpClient._returnConnection(this); |
- _subscription.resume(); |
- } else { |
- destroy(); |
- } |
- }); |
- request._onIncoming(incoming); |
- }) |
- // If we see a state error, we failed to get the 'first' |
- // element. |
- // Transform the error to a HttpParserException, for |
- // consistency. |
- .catchError((error) { |
- throw new HttpParserException( |
- "Connection closed before data was received"); |
- }, test: (error) => error is StateError) |
- .catchError((error) { |
- // We are done with the socket. |
+ _streamFuture = outgoing.done |
+ .then((s) { |
+ // Request sent, set up response completer. |
+ _nextResponseCompleter = new Completer(); |
+ |
+ // Listen for response. |
+ _nextResponseCompleter.future |
+ .then((incoming) { |
+ incoming.dataDone.then((_) { |
+ if (incoming.headers.persistentConnection && |
+ request.persistentConnection) { |
+ // Return connection, now we are done. |
+ _httpClient._returnConnection(this); |
+ _subscription.resume(); |
+ } else { |
destroy(); |
- request._onError(error); |
- }); |
- |
- // Resume the parser now we have a handler. |
- _subscription.resume(); |
- return s; |
- }, onError: (e) { |
- destroy(); |
- throw e; |
- }); |
- }); |
+ } |
+ }); |
+ request._onIncoming(incoming); |
+ }) |
+ // If we see a state error, we failed to get the 'first' |
+ // element. |
+ // Transform the error to a HttpParserException, for |
+ // consistency. |
+ .catchError((error) { |
+ throw new HttpParserException( |
+ "Connection closed before data was received"); |
+ }, test: (error) => error is StateError) |
+ .catchError((error) { |
+ // We are done with the socket. |
+ destroy(); |
+ request._onError(error); |
+ }); |
+ |
+ // Resume the parser now we have a handler. |
+ _subscription.resume(); |
+ return s; |
+ }, onError: (e) { |
+ destroy(); |
+ }); |
return request; |
} |
Future<Socket> detachSocket() { |
- return _streamFuture |
- .then((_) => new _DetachedSocket(_socket, _httpParser.detachIncoming()), |
- onError: (_) {}); |
+ return _streamFuture.then( |
+ (_) => new _DetachedSocket(_socket, _httpParser.detachIncoming())); |
} |
void destroy() { |
@@ -1071,8 +1110,7 @@ class _HttpClientConnection { |
_httpClient._connectionClosed(this); |
_streamFuture |
// TODO(ajohnsen): Add timeout. |
- .then((_) => _socket.destroy(), |
- onError: (_) {}); |
+ .then((_) => _socket.destroy()); |
} |
HttpConnectionInfo get connectionInfo => _HttpConnectionInfo.create(_socket); |
@@ -1377,39 +1415,35 @@ class _HttpConnection { |
_HttpConnection(Socket this._socket, _HttpServer this._httpServer) |
: _httpParser = new _HttpParser.requestParser() { |
_socket.pipe(_httpParser); |
- _socket.done.catchError((e) => destroy()); |
_subscription = _httpParser.listen( |
(incoming) { |
// Only handle one incoming request at the time. Keep the |
// stream paused until the request has been send. |
_subscription.pause(); |
_state = _ACTIVE; |
- var outgoing = new _HttpOutgoing(); |
+ var outgoing = new _HttpOutgoing(_socket); |
var response = new _HttpResponse(incoming.headers.protocolVersion, |
outgoing); |
var request = new _HttpRequest(response, incoming, _httpServer, this); |
- outgoing.onStream((stream) { |
- return _streamFuture = _socket.writeStream(stream) |
- .then((_) { |
- if (_state == _DETACHED) return; |
- if (response.persistentConnection && |
- request.persistentConnection && |
- incoming.fullBodyRead) { |
- _state = _IDLE; |
- // Resume the subscription for incoming requests as the |
- // request is now processed. |
- _subscription.resume(); |
- } else { |
- // Close socket, keep-alive not used or body sent before |
- // received data was handled. |
- destroy(); |
- } |
- }) |
- .catchError((e) { |
+ _streamFuture = outgoing.done |
+ .then((_) { |
+ if (_state == _DETACHED) return; |
+ if (response.persistentConnection && |
+ request.persistentConnection && |
+ incoming.fullBodyRead) { |
+ _state = _IDLE; |
+ // Resume the subscription for incoming requests as the |
+ // request is now processed. |
+ _subscription.resume(); |
+ } else { |
+ // Close socket, keep-alive not used or body sent before |
+ // received data was handled. |
destroy(); |
- throw e; |
- }); |
- }); |
+ } |
+ }) |
+ .catchError((e) { |
+ destroy(); |
+ }); |
response._ignoreBody = request.method == "HEAD"; |
response._httpRequest = request; |
_httpServer._handleRequest(request); |
@@ -1691,18 +1725,10 @@ class _DetachedSocket extends Stream<List<int>> implements Socket { |
void addError(AsyncError error) => _socket.addError(error); |
- Future<Socket> consume(Stream<List<int>> stream) { |
- return _socket.consume(stream); |
- } |
- |
Future<Socket> addStream(Stream<List<int>> stream) { |
return _socket.addStream(stream); |
} |
- Future<Socket> writeStream(Stream<List<int>> stream) { |
- return _socket.writeStream(stream); |
- } |
- |
void destroy() => _socket.destroy(); |
Future close() => _socket.close(); |