Index: sdk/lib/io/http_impl.dart |
diff --git a/sdk/lib/io/http_impl.dart b/sdk/lib/io/http_impl.dart |
index dec363306776c1f0c380a365b476abd655bd748f..7e16e5fe4662e3a908d7faf1c39a9227a049bb89 100644 |
--- a/sdk/lib/io/http_impl.dart |
+++ b/sdk/lib/io/http_impl.dart |
@@ -23,6 +23,8 @@ class _HttpIncoming extends Stream<List<int>> { |
String method; |
Uri uri; |
+ bool hasSubscriber = false; |
+ |
// The transfer length if the length of the message body as it |
// appears in the message (RFC 2616 section 4.4). This can be -1 if |
// the length of the massage body is not known due to transfer |
@@ -38,6 +40,7 @@ class _HttpIncoming extends Stream<List<int>> { |
{void onError(error), |
void onDone(), |
bool cancelOnError}) { |
+ hasSubscriber = true; |
return _stream.listen(onData, |
onError: onError, |
onDone: onDone, |
@@ -49,6 +52,7 @@ class _HttpIncoming extends Stream<List<int>> { |
void close(bool closing) { |
fullBodyRead = true; |
+ hasSubscriber = true; |
_dataCompleter.complete(closing); |
} |
} |
@@ -274,7 +278,7 @@ class _HttpClientResponse |
Future<HttpClientResponse> _authenticate(bool proxyAuth) { |
Future<HttpClientResponse> retry() { |
// Drain body and retry. |
- return fold(null, (x, y) {}).then((_) { |
+ return drain().then((_) { |
return _httpClient._openUrlFromRequest(_httpRequest.method, |
_httpRequest.uri, |
_httpRequest) |
@@ -470,45 +474,56 @@ abstract class _HttpOutboundMessage<T> implements IOSink { |
Future<T> get done => _dataSink.done; |
- void _writeHeaders() { |
- if (_headersWritten) return; |
+ Future _writeHeaders({drainRequest: true}) { |
+ if (_headersWritten) return new Future.value(); |
_headersWritten = true; |
headers._synchronize(); // Be sure the 'chunked' option is updated. |
bool isServerSide = this is _HttpResponse; |
- if (isServerSide && headers.chunkedTransferEncoding) { |
+ if (isServerSide) { |
var response = this; |
- List acceptEncodings = |
- response._httpRequest.headers[HttpHeaders.ACCEPT_ENCODING]; |
- List contentEncoding = headers[HttpHeaders.CONTENT_ENCODING]; |
- if (acceptEncodings != null && |
- acceptEncodings |
- .expand((list) => list.split(",")) |
- .any((encoding) => encoding.trim().toLowerCase() == "gzip") && |
- contentEncoding == null) { |
- headers.set(HttpHeaders.CONTENT_ENCODING, "gzip"); |
- _asGZip = true; |
+ if (headers.chunkedTransferEncoding) { |
+ List acceptEncodings = |
+ response._httpRequest.headers[HttpHeaders.ACCEPT_ENCODING]; |
+ List contentEncoding = headers[HttpHeaders.CONTENT_ENCODING]; |
+ if (acceptEncodings != null && |
+ acceptEncodings |
+ .expand((list) => list.split(",")) |
+ .any((encoding) => encoding.trim().toLowerCase() == "gzip") && |
+ contentEncoding == null) { |
+ headers.set(HttpHeaders.CONTENT_ENCODING, "gzip"); |
+ _asGZip = true; |
+ } |
+ } |
+ if (drainRequest && !response._httpRequest._incoming.hasSubscriber) { |
+ return response._httpRequest.drain() |
+ // TODO(ajohnsen): Timeout on drain? |
+ .catchError((_) {}) // Ignore errors. |
+ .then((_) => _writeHeader()); |
} |
} |
- _writeHeader(); |
+ return new Future.sync(_writeHeader); |
} |
Future _addStream(Stream<List<int>> stream) { |
- _writeHeaders(); |
- int contentLength = headers.contentLength; |
- if (_ignoreBody) { |
- stream.fold(null, (x, y) {}).catchError((_) {}); |
- return _headersSink.close(); |
- } |
- stream = stream.transform(new _BufferTransformer()); |
- if (headers.chunkedTransferEncoding) { |
- if (_asGZip) { |
- stream = stream.transform(new ZLibDeflater(gzip: true, level: 6)); |
- } |
- stream = stream.transform(new _ChunkedTransformer()); |
- } else if (contentLength >= 0) { |
- stream = stream.transform(new _ContentLengthValidator(contentLength)); |
- } |
- return _headersSink.addStream(stream); |
+ return _writeHeaders() |
+ .then((_) { |
+ int contentLength = headers.contentLength; |
+ if (_ignoreBody) { |
+ stream.drain().catchError((_) {}); |
+ return _headersSink.close(); |
+ } |
+ stream = stream.transform(new _BufferTransformer()); |
+ if (headers.chunkedTransferEncoding) { |
+ if (_asGZip) { |
+ stream = stream.transform(new ZLibDeflater(gzip: true, level: 6)); |
+ } |
+ stream = stream.transform(new _ChunkedTransformer()); |
+ } else if (contentLength >= 0) { |
+ stream = stream.transform( |
+ new _ContentLengthValidator(contentLength)); |
+ } |
+ return _headersSink.addStream(stream); |
+ }); |
} |
Future _close() { |
@@ -528,8 +543,7 @@ abstract class _HttpOutboundMessage<T> implements IOSink { |
" than 0: ${headers.contentLength}.")); |
} |
} |
- _writeHeaders(); |
- return _headersSink.close(); |
+ return _writeHeaders().then((_) => _headersSink.close()); |
} |
void _writeHeader(); // TODO(ajohnsen): Better name. |
@@ -556,6 +570,7 @@ class _HttpOutboundConsumer implements StreamConsumer { |
if (_controller != null) return; |
_controller = new StreamController(onPause: () => _subscription.pause(), |
onResume: () => _subscription.resume(), |
+ onListen: () => _subscription.resume(), |
onCancel: _cancel); |
_outbound._addStream(_controller.stream) |
.then((_) { |
@@ -608,6 +623,8 @@ class _HttpOutboundConsumer implements StreamConsumer { |
_done(error); |
}, |
cancelOnError: true); |
+ // Pause the first request. |
+ if (_controller == null) _subscription.pause(); |
_ensureController(); |
return _completer.future; |
} |
@@ -682,8 +699,8 @@ class _HttpResponse extends _HttpOutboundMessage<HttpResponse> |
Future<Socket> detachSocket() { |
if (_headersWritten) throw new StateError("Headers already sent"); |
- _writeHeaders(); |
var future = _httpRequest._httpConnection.detachSocket(); |
+ _writeHeaders(drainRequest: false).then((_) => close()); |
// Close connection so the socket is 'free'. |
close(); |
done.catchError((_) { |
@@ -878,11 +895,11 @@ class _HttpClientRequest extends _HttpOutboundMessage<HttpClientResponse> |
if (followRedirects && response.isRedirect) { |
if (response.redirects.length < maxRedirects) { |
// Redirect and drain response. |
- future = response.fold(null, (x, y) {}) |
+ future = response.drain() |
.then((_) => response.redirect()); |
} else { |
// End with exception, too many redirects. |
- future = response.fold(null, (x, y) {}) |
+ future = response.drain() |
.then((_) => new Future.error( |
new RedirectLimitExceededException(response.redirects))); |
} |