Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(184)

Unified Diff: sdk/lib/io/http_impl.dart

Issue 15842004: Support auto-drain of HttpRequest data. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Add documentation. Created 7 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « sdk/lib/io/http.dart ('k') | sdk/lib/io/http_parser.dart » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: sdk/lib/io/http_impl.dart
diff --git a/sdk/lib/io/http_impl.dart b/sdk/lib/io/http_impl.dart
index dec363306776c1f0c380a365b476abd655bd748f..7e16e5fe4662e3a908d7faf1c39a9227a049bb89 100644
--- a/sdk/lib/io/http_impl.dart
+++ b/sdk/lib/io/http_impl.dart
@@ -23,6 +23,8 @@ class _HttpIncoming extends Stream<List<int>> {
String method;
Uri uri;
+ bool hasSubscriber = false;
+
// The transfer length if the length of the message body as it
// appears in the message (RFC 2616 section 4.4). This can be -1 if
// the length of the massage body is not known due to transfer
@@ -38,6 +40,7 @@ class _HttpIncoming extends Stream<List<int>> {
{void onError(error),
void onDone(),
bool cancelOnError}) {
+ hasSubscriber = true;
return _stream.listen(onData,
onError: onError,
onDone: onDone,
@@ -49,6 +52,7 @@ class _HttpIncoming extends Stream<List<int>> {
void close(bool closing) {
fullBodyRead = true;
+ hasSubscriber = true;
_dataCompleter.complete(closing);
}
}
@@ -274,7 +278,7 @@ class _HttpClientResponse
Future<HttpClientResponse> _authenticate(bool proxyAuth) {
Future<HttpClientResponse> retry() {
// Drain body and retry.
- return fold(null, (x, y) {}).then((_) {
+ return drain().then((_) {
return _httpClient._openUrlFromRequest(_httpRequest.method,
_httpRequest.uri,
_httpRequest)
@@ -470,45 +474,56 @@ abstract class _HttpOutboundMessage<T> implements IOSink {
Future<T> get done => _dataSink.done;
- void _writeHeaders() {
- if (_headersWritten) return;
+ Future _writeHeaders({drainRequest: true}) {
+ if (_headersWritten) return new Future.value();
_headersWritten = true;
headers._synchronize(); // Be sure the 'chunked' option is updated.
bool isServerSide = this is _HttpResponse;
- if (isServerSide && headers.chunkedTransferEncoding) {
+ if (isServerSide) {
var response = this;
- List acceptEncodings =
- response._httpRequest.headers[HttpHeaders.ACCEPT_ENCODING];
- List contentEncoding = headers[HttpHeaders.CONTENT_ENCODING];
- if (acceptEncodings != null &&
- acceptEncodings
- .expand((list) => list.split(","))
- .any((encoding) => encoding.trim().toLowerCase() == "gzip") &&
- contentEncoding == null) {
- headers.set(HttpHeaders.CONTENT_ENCODING, "gzip");
- _asGZip = true;
+ if (headers.chunkedTransferEncoding) {
+ List acceptEncodings =
+ response._httpRequest.headers[HttpHeaders.ACCEPT_ENCODING];
+ List contentEncoding = headers[HttpHeaders.CONTENT_ENCODING];
+ if (acceptEncodings != null &&
+ acceptEncodings
+ .expand((list) => list.split(","))
+ .any((encoding) => encoding.trim().toLowerCase() == "gzip") &&
+ contentEncoding == null) {
+ headers.set(HttpHeaders.CONTENT_ENCODING, "gzip");
+ _asGZip = true;
+ }
+ }
+ if (drainRequest && !response._httpRequest._incoming.hasSubscriber) {
+ return response._httpRequest.drain()
+ // TODO(ajohnsen): Timeout on drain?
+ .catchError((_) {}) // Ignore errors.
+ .then((_) => _writeHeader());
}
}
- _writeHeader();
+ return new Future.sync(_writeHeader);
}
Future _addStream(Stream<List<int>> stream) {
- _writeHeaders();
- int contentLength = headers.contentLength;
- if (_ignoreBody) {
- stream.fold(null, (x, y) {}).catchError((_) {});
- return _headersSink.close();
- }
- stream = stream.transform(new _BufferTransformer());
- if (headers.chunkedTransferEncoding) {
- if (_asGZip) {
- stream = stream.transform(new ZLibDeflater(gzip: true, level: 6));
- }
- stream = stream.transform(new _ChunkedTransformer());
- } else if (contentLength >= 0) {
- stream = stream.transform(new _ContentLengthValidator(contentLength));
- }
- return _headersSink.addStream(stream);
+ return _writeHeaders()
+ .then((_) {
+ int contentLength = headers.contentLength;
+ if (_ignoreBody) {
+ stream.drain().catchError((_) {});
+ return _headersSink.close();
+ }
+ stream = stream.transform(new _BufferTransformer());
+ if (headers.chunkedTransferEncoding) {
+ if (_asGZip) {
+ stream = stream.transform(new ZLibDeflater(gzip: true, level: 6));
+ }
+ stream = stream.transform(new _ChunkedTransformer());
+ } else if (contentLength >= 0) {
+ stream = stream.transform(
+ new _ContentLengthValidator(contentLength));
+ }
+ return _headersSink.addStream(stream);
+ });
}
Future _close() {
@@ -528,8 +543,7 @@ abstract class _HttpOutboundMessage<T> implements IOSink {
" than 0: ${headers.contentLength}."));
}
}
- _writeHeaders();
- return _headersSink.close();
+ return _writeHeaders().then((_) => _headersSink.close());
}
void _writeHeader(); // TODO(ajohnsen): Better name.
@@ -556,6 +570,7 @@ class _HttpOutboundConsumer implements StreamConsumer {
if (_controller != null) return;
_controller = new StreamController(onPause: () => _subscription.pause(),
onResume: () => _subscription.resume(),
+ onListen: () => _subscription.resume(),
onCancel: _cancel);
_outbound._addStream(_controller.stream)
.then((_) {
@@ -608,6 +623,8 @@ class _HttpOutboundConsumer implements StreamConsumer {
_done(error);
},
cancelOnError: true);
+ // Pause the first request.
+ if (_controller == null) _subscription.pause();
_ensureController();
return _completer.future;
}
@@ -682,8 +699,8 @@ class _HttpResponse extends _HttpOutboundMessage<HttpResponse>
Future<Socket> detachSocket() {
if (_headersWritten) throw new StateError("Headers already sent");
- _writeHeaders();
var future = _httpRequest._httpConnection.detachSocket();
+ _writeHeaders(drainRequest: false).then((_) => close());
// Close connection so the socket is 'free'.
close();
done.catchError((_) {
@@ -878,11 +895,11 @@ class _HttpClientRequest extends _HttpOutboundMessage<HttpClientResponse>
if (followRedirects && response.isRedirect) {
if (response.redirects.length < maxRedirects) {
// Redirect and drain response.
- future = response.fold(null, (x, y) {})
+ future = response.drain()
.then((_) => response.redirect());
} else {
// End with exception, too many redirects.
- future = response.fold(null, (x, y) {})
+ future = response.drain()
.then((_) => new Future.error(
new RedirectLimitExceededException(response.redirects)));
}
« no previous file with comments | « sdk/lib/io/http.dart ('k') | sdk/lib/io/http_parser.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698