| Index: sdk/lib/io/http_impl.dart
|
| diff --git a/sdk/lib/io/http_impl.dart b/sdk/lib/io/http_impl.dart
|
| index dec363306776c1f0c380a365b476abd655bd748f..7e16e5fe4662e3a908d7faf1c39a9227a049bb89 100644
|
| --- a/sdk/lib/io/http_impl.dart
|
| +++ b/sdk/lib/io/http_impl.dart
|
| @@ -23,6 +23,8 @@ class _HttpIncoming extends Stream<List<int>> {
|
| String method;
|
| Uri uri;
|
|
|
| + bool hasSubscriber = false;
|
| +
|
| // The transfer length if the length of the message body as it
|
| // appears in the message (RFC 2616 section 4.4). This can be -1 if
|
| // the length of the massage body is not known due to transfer
|
| @@ -38,6 +40,7 @@ class _HttpIncoming extends Stream<List<int>> {
|
| {void onError(error),
|
| void onDone(),
|
| bool cancelOnError}) {
|
| + hasSubscriber = true;
|
| return _stream.listen(onData,
|
| onError: onError,
|
| onDone: onDone,
|
| @@ -49,6 +52,7 @@ class _HttpIncoming extends Stream<List<int>> {
|
|
|
| void close(bool closing) {
|
| fullBodyRead = true;
|
| + hasSubscriber = true;
|
| _dataCompleter.complete(closing);
|
| }
|
| }
|
| @@ -274,7 +278,7 @@ class _HttpClientResponse
|
| Future<HttpClientResponse> _authenticate(bool proxyAuth) {
|
| Future<HttpClientResponse> retry() {
|
| // Drain body and retry.
|
| - return fold(null, (x, y) {}).then((_) {
|
| + return drain().then((_) {
|
| return _httpClient._openUrlFromRequest(_httpRequest.method,
|
| _httpRequest.uri,
|
| _httpRequest)
|
| @@ -470,45 +474,56 @@ abstract class _HttpOutboundMessage<T> implements IOSink {
|
|
|
| Future<T> get done => _dataSink.done;
|
|
|
| - void _writeHeaders() {
|
| - if (_headersWritten) return;
|
| + Future _writeHeaders({drainRequest: true}) {
|
| + if (_headersWritten) return new Future.value();
|
| _headersWritten = true;
|
| headers._synchronize(); // Be sure the 'chunked' option is updated.
|
| bool isServerSide = this is _HttpResponse;
|
| - if (isServerSide && headers.chunkedTransferEncoding) {
|
| + if (isServerSide) {
|
| var response = this;
|
| - List acceptEncodings =
|
| - response._httpRequest.headers[HttpHeaders.ACCEPT_ENCODING];
|
| - List contentEncoding = headers[HttpHeaders.CONTENT_ENCODING];
|
| - if (acceptEncodings != null &&
|
| - acceptEncodings
|
| - .expand((list) => list.split(","))
|
| - .any((encoding) => encoding.trim().toLowerCase() == "gzip") &&
|
| - contentEncoding == null) {
|
| - headers.set(HttpHeaders.CONTENT_ENCODING, "gzip");
|
| - _asGZip = true;
|
| + if (headers.chunkedTransferEncoding) {
|
| + List acceptEncodings =
|
| + response._httpRequest.headers[HttpHeaders.ACCEPT_ENCODING];
|
| + List contentEncoding = headers[HttpHeaders.CONTENT_ENCODING];
|
| + if (acceptEncodings != null &&
|
| + acceptEncodings
|
| + .expand((list) => list.split(","))
|
| + .any((encoding) => encoding.trim().toLowerCase() == "gzip") &&
|
| + contentEncoding == null) {
|
| + headers.set(HttpHeaders.CONTENT_ENCODING, "gzip");
|
| + _asGZip = true;
|
| + }
|
| + }
|
| + if (drainRequest && !response._httpRequest._incoming.hasSubscriber) {
|
| + return response._httpRequest.drain()
|
| + // TODO(ajohnsen): Timeout on drain?
|
| + .catchError((_) {}) // Ignore errors.
|
| + .then((_) => _writeHeader());
|
| }
|
| }
|
| - _writeHeader();
|
| + return new Future.sync(_writeHeader);
|
| }
|
|
|
| Future _addStream(Stream<List<int>> stream) {
|
| - _writeHeaders();
|
| - int contentLength = headers.contentLength;
|
| - if (_ignoreBody) {
|
| - stream.fold(null, (x, y) {}).catchError((_) {});
|
| - return _headersSink.close();
|
| - }
|
| - stream = stream.transform(new _BufferTransformer());
|
| - if (headers.chunkedTransferEncoding) {
|
| - if (_asGZip) {
|
| - stream = stream.transform(new ZLibDeflater(gzip: true, level: 6));
|
| - }
|
| - stream = stream.transform(new _ChunkedTransformer());
|
| - } else if (contentLength >= 0) {
|
| - stream = stream.transform(new _ContentLengthValidator(contentLength));
|
| - }
|
| - return _headersSink.addStream(stream);
|
| + return _writeHeaders()
|
| + .then((_) {
|
| + int contentLength = headers.contentLength;
|
| + if (_ignoreBody) {
|
| + stream.drain().catchError((_) {});
|
| + return _headersSink.close();
|
| + }
|
| + stream = stream.transform(new _BufferTransformer());
|
| + if (headers.chunkedTransferEncoding) {
|
| + if (_asGZip) {
|
| + stream = stream.transform(new ZLibDeflater(gzip: true, level: 6));
|
| + }
|
| + stream = stream.transform(new _ChunkedTransformer());
|
| + } else if (contentLength >= 0) {
|
| + stream = stream.transform(
|
| + new _ContentLengthValidator(contentLength));
|
| + }
|
| + return _headersSink.addStream(stream);
|
| + });
|
| }
|
|
|
| Future _close() {
|
| @@ -528,8 +543,7 @@ abstract class _HttpOutboundMessage<T> implements IOSink {
|
| " than 0: ${headers.contentLength}."));
|
| }
|
| }
|
| - _writeHeaders();
|
| - return _headersSink.close();
|
| + return _writeHeaders().then((_) => _headersSink.close());
|
| }
|
|
|
| void _writeHeader(); // TODO(ajohnsen): Better name.
|
| @@ -556,6 +570,7 @@ class _HttpOutboundConsumer implements StreamConsumer {
|
| if (_controller != null) return;
|
| _controller = new StreamController(onPause: () => _subscription.pause(),
|
| onResume: () => _subscription.resume(),
|
| + onListen: () => _subscription.resume(),
|
| onCancel: _cancel);
|
| _outbound._addStream(_controller.stream)
|
| .then((_) {
|
| @@ -608,6 +623,8 @@ class _HttpOutboundConsumer implements StreamConsumer {
|
| _done(error);
|
| },
|
| cancelOnError: true);
|
| + // Pause the first request.
|
| + if (_controller == null) _subscription.pause();
|
| _ensureController();
|
| return _completer.future;
|
| }
|
| @@ -682,8 +699,8 @@ class _HttpResponse extends _HttpOutboundMessage<HttpResponse>
|
|
|
| Future<Socket> detachSocket() {
|
| if (_headersWritten) throw new StateError("Headers already sent");
|
| - _writeHeaders();
|
| var future = _httpRequest._httpConnection.detachSocket();
|
| + _writeHeaders(drainRequest: false).then((_) => close());
|
| // Close connection so the socket is 'free'.
|
| close();
|
| done.catchError((_) {
|
| @@ -878,11 +895,11 @@ class _HttpClientRequest extends _HttpOutboundMessage<HttpClientResponse>
|
| if (followRedirects && response.isRedirect) {
|
| if (response.redirects.length < maxRedirects) {
|
| // Redirect and drain response.
|
| - future = response.fold(null, (x, y) {})
|
| + future = response.drain()
|
| .then((_) => response.redirect());
|
| } else {
|
| // End with exception, too many redirects.
|
| - future = response.fold(null, (x, y) {})
|
| + future = response.drain()
|
| .then((_) => new Future.error(
|
| new RedirectLimitExceededException(response.redirects)));
|
| }
|
|
|