Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(919)

Unified Diff: sdk/lib/io/http_impl.dart

Issue 14028017: Remove .writeStream, .consume and rewrite IOSink to correctly implement a (sane) well-defined behav… (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Review comments. Created 7 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « sdk/lib/io/http.dart ('k') | sdk/lib/io/http_parser.dart » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: sdk/lib/io/http_impl.dart
diff --git a/sdk/lib/io/http_impl.dart b/sdk/lib/io/http_impl.dart
index cea3e11e9144c54473916a32031e89736f8b2b0d..de86c08d351bb4aeb9039f88ad01417b70806f66 100644
--- a/sdk/lib/io/http_impl.dart
+++ b/sdk/lib/io/http_impl.dart
@@ -326,16 +326,22 @@ abstract class _HttpOutboundMessage<T> implements IOSink {
// requests and in error handling.
bool _ignoreBody = false;
bool _headersWritten = false;
+ bool _asGZip = false;
+
+ IOSink _headersSink;
+ IOSink _dataSink;
- IOSink _ioSink;
final _HttpOutgoing _outgoing;
final _HttpHeaders headers;
_HttpOutboundMessage(String protocolVersion, _HttpOutgoing outgoing)
: _outgoing = outgoing,
- _ioSink = new IOSink(outgoing, encoding: Encoding.ASCII),
- headers = new _HttpHeaders(protocolVersion);
+ _headersSink = new IOSink(outgoing, encoding: Encoding.ASCII),
+ headers = new _HttpHeaders(protocolVersion) {
+ _dataSink = new IOSink(
+ new _HttpOutboundConsumer(_headersSink, _addStream, this));
+ }
int get contentLength => headers.contentLength;
void set contentLength(int contentLength) {
@@ -376,7 +382,7 @@ abstract class _HttpOutboundMessage<T> implements IOSink {
}
}
if (string.isEmpty) return;
- _ioSink.write(string);
+ _dataSink.write(string);
}
void writeAll(Iterable objects, [String separator = ""]) {
@@ -403,26 +409,17 @@ abstract class _HttpOutboundMessage<T> implements IOSink {
void add(List<int> data) {
_writeHeaders();
if (data.length == 0) return;
- _ioSink.add(data);
+ _dataSink.add(data);
}
void addError(AsyncError error) {
_writeHeaders();
- _ioSink.addError(error);
- }
-
- Future<T> consume(Stream<List<int>> stream) {
- _writeHeaders();
- return _ioSink.consume(stream);
+ _dataSink.addError(error);
}
Future<T> addStream(Stream<List<int>> stream) {
_writeHeaders();
- return _ioSink.writeStream(stream).then((_) => this);
- }
-
- Future<T> writeStream(Stream<List<int>> stream) {
- return addStream(stream);
+ return _dataSink.addStream(stream);
}
Future close() {
@@ -435,20 +432,15 @@ abstract class _HttpOutboundMessage<T> implements IOSink {
headers.contentLength = 0;
}
_writeHeaders();
- return _ioSink.close();
+ return _dataSink.close();
}
- Future<T> get done {
- _writeHeaders();
- return _ioSink.done;
- }
+ Future<T> get done => _dataSink.done.then((_) => this);
void _writeHeaders() {
if (_headersWritten) return;
_headersWritten = true;
- _ioSink.encoding = Encoding.ASCII;
headers._synchronize(); // Be sure the 'chunked' option is updated.
- bool asGZip = false;
bool isServerSide = this is _HttpResponse;
if (isServerSide && headers.chunkedTransferEncoding) {
var response = this;
@@ -461,30 +453,28 @@ abstract class _HttpOutboundMessage<T> implements IOSink {
.any((encoding) => encoding.trim().toLowerCase() == "gzip") &&
contentEncoding == null) {
headers.set(HttpHeaders.CONTENT_ENCODING, "gzip");
- asGZip = true;
+ _asGZip = true;
}
}
_writeHeader();
- _ioSink = new IOSink(new _HttpOutboundConsumer(_ioSink, _consume, asGZip));
- _ioSink.encoding = encoding;
}
- Future _consume(IOSink ioSink, Stream<List<int>> stream, bool asGZip) {
+ Future _addStream(IOSink ioSink, Stream<List<int>> stream) {
int contentLength = headers.contentLength;
if (_ignoreBody) {
- ioSink.close();
- return stream.fold(null, (x, y) {}).then((_) => this);
+ stream.fold(null, (x, y) {}).catchError((_) {});
+ return ioSink.close().then((_) => this);
}
stream = stream.transform(new _BufferTransformer());
if (headers.chunkedTransferEncoding) {
- if (asGZip) {
+ if (_asGZip) {
stream = stream.transform(new ZLibDeflater(gzip: true, level: 6));
}
stream = stream.transform(new _ChunkedTransformer());
} else if (contentLength >= 0) {
stream = stream.transform(new _ContentLengthValidator(contentLength));
}
- return stream.pipe(ioSink).then((_) => this);
+ return ioSink.addStream(stream).then((_) => this);
}
void _writeHeader(); // TODO(ajohnsen): Better name.
@@ -492,21 +482,83 @@ abstract class _HttpOutboundMessage<T> implements IOSink {
class _HttpOutboundConsumer implements StreamConsumer {
- Function _consume;
+ StreamController _controller;
+ StreamSubscription _subscription;
+ Function _addStream;
IOSink _ioSink;
- bool _asGZip;
+ Completer _closeCompleter = new Completer();
+ Completer _completer;
+
+ _HttpOutboundMessage _outbound;
_HttpOutboundConsumer(IOSink this._ioSink,
- Function this._consume,
- bool this._asGZip);
+ Function this._addStream,
+ _HttpOutboundMessage this._outbound);
- Future consume(var stream) => _consume(_ioSink, stream, _asGZip);
+ void _onPause() {
+ if (_controller.isPaused) {
+ _subscription.pause();
+ } else {
+ _subscription.resume();
+ }
+ }
+
+ void _onListen() {
+ if (!_controller.hasListener && _subscription != null) {
+ _subscription.cancel();
+ }
+ }
+
+ _ensureController() {
+ if (_controller != null) return;
+ _controller = new StreamController(onPauseStateChange: _onPause,
+ onSubscriptionStateChange: _onListen);
+ _addStream(_ioSink, _controller.stream)
+ .then((_) {
+ _done();
+ _closeCompleter.complete(_outbound);
+ },
+ onError: (error) {
+ if (!_done(error)) {
+ _closeCompleter.completeError(error);
+ }
+ });
+ }
+
+ bool _done([error]) {
+ if (_completer == null) return false;
+ var tmp = _completer;
+ _completer = null;
+ if (error != null) {
+ tmp.completeError(error);
+ } else {
+ tmp.complete(_outbound);
+ }
+ return true;
+ }
Future addStream(var stream) {
- throw new UnimplementedError("_HttpOutboundConsumer.addStream");
+ _ensureController();
+ _completer = new Completer();
+ _subscription = stream.listen(
+ (data) {
+ _controller.add(data);
+ },
+ onDone: () {
+ _done();
+ },
+ onError: (error) {
+ _done(error);
+ },
+ unsubscribeOnError: true);
+ return _completer.future;
}
Future close() {
- throw new UnimplementedError("_HttpOutboundConsumer.close");
+ _ensureController();
+ _controller.close();
+ return _closeCompleter.future.then((_) {
+ return _ioSink.close().then((_) => _outbound);
+ });
}
}
@@ -574,8 +626,8 @@ class _HttpResponse extends _HttpOutboundMessage<HttpResponse>
// Close connection so the socket is 'free'.
close();
done.catchError((_) {
- // Catch any error on done, as they automatically will be propegated to
- // the websocket.
+ // Catch any error on done, as they automatically will be
+ // propagated to the websocket.
});
return future;
}
@@ -632,7 +684,7 @@ class _HttpResponse extends _HttpOutboundMessage<HttpResponse>
// Write headers.
headers._write(buffer);
writeCRLF();
- _ioSink.add(buffer.readBytes());
+ _headersSink.add(buffer.readBytes());
}
String _findReasonPhrase(int statusCode) {
@@ -731,17 +783,13 @@ class _HttpClientRequest extends _HttpOutboundMessage<HttpClientResponse>
Future<HttpClientResponse> get done {
if (_response == null) {
- _response = Future.wait([_responseCompleter.future, super.done])
+ _response = Future.wait([_responseCompleter.future,
+ super.done])
.then((list) => list[0]);
}
return _response;
}
- Future<HttpClientResponse> consume(Stream<List<int>> stream) {
- super.consume(stream);
- return done;
- }
-
Future<HttpClientResponse> close() {
super.close();
return done;
@@ -837,7 +885,7 @@ class _HttpClientRequest extends _HttpOutboundMessage<HttpClientResponse>
// Write headers.
headers._write(buffer);
writeCRLF();
- _ioSink.add(buffer.readBytes());
+ _headersSink.add(buffer.readBytes());
}
}
@@ -913,29 +961,25 @@ class _ContentLengthValidator
// Extends StreamConsumer as this is an internal type, only used to pipe to.
class _HttpOutgoing implements StreamConsumer<List<int>> {
- Function _onStream;
- final Completer _consumeCompleter = new Completer();
+ final Completer _doneCompleter = new Completer();
+ final StreamConsumer _consumer;
- Future onStream(Future callback(Stream<List<int>> stream)) {
- _onStream = callback;
- return _consumeCompleter.future;
- }
-
- Future consume(Stream<List<int>> stream) {
- _onStream(stream)
- .then((_) => _consumeCompleter.complete(),
- onError: _consumeCompleter.completeError);
- // Use .then to ensure a Future branch.
- return _consumeCompleter.future.then((_) => this);
- }
+ _HttpOutgoing(StreamConsumer this._consumer);
Future addStream(Stream<List<int>> stream) {
- throw new UnimplementedError("_HttpOutgoing.addStream");
+ return _consumer.addStream(stream)
+ .catchError((error) {
+ _doneCompleter.completeError(error);
+ throw error;
+ });
}
Future close() {
- throw new UnimplementedError("_HttpOutgoing.close");
+ _doneCompleter.complete(_consumer);
+ return new Future.immediate(null);
}
+
+ Future get done => _doneCompleter.future;
}
@@ -954,7 +998,6 @@ class _HttpClientConnection {
_HttpClient this._httpClient)
: _httpParser = new _HttpParser.responseParser() {
_socket.pipe(_httpParser);
- _socket.done.catchError((e) { destroy(); });
// Set up handlers on the parser here, so we are sure to get 'onDone' from
// the parser.
@@ -983,7 +1026,7 @@ class _HttpClientConnection {
_HttpClientRequest send(Uri uri, int port, String method, bool isDirect) {
// Start with pausing the parser.
_subscription.pause();
- var outgoing = new _HttpOutgoing();
+ var outgoing = new _HttpOutgoing(_socket);
// Create new request object, wrapping the outgoing connection.
var request = new _HttpClientRequest(outgoing,
uri,
@@ -1010,56 +1053,52 @@ class _HttpClientConnection {
// Start sending the request (lazy, delayed until the user provides
// data).
_httpParser.responseToMethod = method;
- _streamFuture = outgoing.onStream((stream) {
- return _socket.writeStream(stream)
- .then((s) {
- // Request sent, set up response completer.
- _nextResponseCompleter = new Completer();
-
- // Listen for response.
- _nextResponseCompleter.future
- .then((incoming) {
- incoming.dataDone.then((_) {
- if (incoming.headers.persistentConnection &&
- request.persistentConnection) {
- // Return connection, now we are done.
- _httpClient._returnConnection(this);
- _subscription.resume();
- } else {
- destroy();
- }
- });
- request._onIncoming(incoming);
- })
- // If we see a state error, we failed to get the 'first'
- // element.
- // Transform the error to a HttpParserException, for
- // consistency.
- .catchError((error) {
- throw new HttpParserException(
- "Connection closed before data was received");
- }, test: (error) => error is StateError)
- .catchError((error) {
- // We are done with the socket.
+ _streamFuture = outgoing.done
+ .then((s) {
+ // Request sent, set up response completer.
+ _nextResponseCompleter = new Completer();
+
+ // Listen for response.
+ _nextResponseCompleter.future
+ .then((incoming) {
+ incoming.dataDone.then((_) {
+ if (incoming.headers.persistentConnection &&
+ request.persistentConnection) {
+ // Return connection, now we are done.
+ _httpClient._returnConnection(this);
+ _subscription.resume();
+ } else {
destroy();
- request._onError(error);
- });
-
- // Resume the parser now we have a handler.
- _subscription.resume();
- return s;
- }, onError: (e) {
- destroy();
- throw e;
- });
- });
+ }
+ });
+ request._onIncoming(incoming);
+ })
+ // If we see a state error, we failed to get the 'first'
+ // element.
+ // Transform the error to a HttpParserException, for
+ // consistency.
+ .catchError((error) {
+ throw new HttpParserException(
+ "Connection closed before data was received");
+ }, test: (error) => error is StateError)
+ .catchError((error) {
+ // We are done with the socket.
+ destroy();
+ request._onError(error);
+ });
+
+ // Resume the parser now we have a handler.
+ _subscription.resume();
+ return s;
+ }, onError: (e) {
+ destroy();
+ });
return request;
}
Future<Socket> detachSocket() {
- return _streamFuture
- .then((_) => new _DetachedSocket(_socket, _httpParser.detachIncoming()),
- onError: (_) {});
+ return _streamFuture.then(
+ (_) => new _DetachedSocket(_socket, _httpParser.detachIncoming()));
}
void destroy() {
@@ -1071,8 +1110,7 @@ class _HttpClientConnection {
_httpClient._connectionClosed(this);
_streamFuture
// TODO(ajohnsen): Add timeout.
- .then((_) => _socket.destroy(),
- onError: (_) {});
+ .then((_) => _socket.destroy());
}
HttpConnectionInfo get connectionInfo => _HttpConnectionInfo.create(_socket);
@@ -1377,39 +1415,35 @@ class _HttpConnection {
_HttpConnection(Socket this._socket, _HttpServer this._httpServer)
: _httpParser = new _HttpParser.requestParser() {
_socket.pipe(_httpParser);
- _socket.done.catchError((e) => destroy());
_subscription = _httpParser.listen(
(incoming) {
// Only handle one incoming request at the time. Keep the
// stream paused until the request has been send.
_subscription.pause();
_state = _ACTIVE;
- var outgoing = new _HttpOutgoing();
+ var outgoing = new _HttpOutgoing(_socket);
var response = new _HttpResponse(incoming.headers.protocolVersion,
outgoing);
var request = new _HttpRequest(response, incoming, _httpServer, this);
- outgoing.onStream((stream) {
- return _streamFuture = _socket.writeStream(stream)
- .then((_) {
- if (_state == _DETACHED) return;
- if (response.persistentConnection &&
- request.persistentConnection &&
- incoming.fullBodyRead) {
- _state = _IDLE;
- // Resume the subscription for incoming requests as the
- // request is now processed.
- _subscription.resume();
- } else {
- // Close socket, keep-alive not used or body sent before
- // received data was handled.
- destroy();
- }
- })
- .catchError((e) {
+ _streamFuture = outgoing.done
+ .then((_) {
+ if (_state == _DETACHED) return;
+ if (response.persistentConnection &&
+ request.persistentConnection &&
+ incoming.fullBodyRead) {
+ _state = _IDLE;
+ // Resume the subscription for incoming requests as the
+ // request is now processed.
+ _subscription.resume();
+ } else {
+ // Close socket, keep-alive not used or body sent before
+ // received data was handled.
destroy();
- throw e;
- });
- });
+ }
+ })
+ .catchError((e) {
+ destroy();
+ });
response._ignoreBody = request.method == "HEAD";
response._httpRequest = request;
_httpServer._handleRequest(request);
@@ -1691,18 +1725,10 @@ class _DetachedSocket extends Stream<List<int>> implements Socket {
void addError(AsyncError error) => _socket.addError(error);
- Future<Socket> consume(Stream<List<int>> stream) {
- return _socket.consume(stream);
- }
-
Future<Socket> addStream(Stream<List<int>> stream) {
return _socket.addStream(stream);
}
- Future<Socket> writeStream(Stream<List<int>> stream) {
- return _socket.writeStream(stream);
- }
-
void destroy() => _socket.destroy();
Future close() => _socket.close();
« no previous file with comments | « sdk/lib/io/http.dart ('k') | sdk/lib/io/http_parser.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698