Index: pkg/http/lib/src/base_request.dart |
diff --git a/pkg/http/lib/src/base_request.dart b/pkg/http/lib/src/base_request.dart |
index 41f04917d93a17af2686f0410243987b57a6d126..aec4cff9f34b78dfdd9cd924a5b40dbb1bcd2b33 100644 |
--- a/pkg/http/lib/src/base_request.dart |
+++ b/pkg/http/lib/src/base_request.dart |
@@ -9,8 +9,10 @@ import 'dart:io'; |
import 'dart:isolate'; |
import 'dart:uri'; |
+import 'byte_stream.dart'; |
import 'client.dart'; |
import 'streamed_response.dart'; |
+import 'utils.dart'; |
/// The base class for HTTP requests. |
/// |
@@ -83,15 +85,15 @@ abstract class BaseRequest { |
: headers = <String>{}; |
/// Finalizes the HTTP request in preparation for it being sent. This freezes |
- /// all mutable fields and returns an [InputStream] that should emit the body |
- /// of the request. The stream may be closed to indicate a request with no |
- /// body. |
+ /// all mutable fields and returns a single-subscription [ByteStream] that |
+ /// emits the body of the request. |
/// |
- /// The base implementation of this returns null rather than an [InputStream]; |
- /// subclasses are responsible for creating the return value. They should also |
+ /// The base implementation of this returns null rather than a [ByteStream]; |
+ /// subclasses are responsible for creating the return value, which should be |
+ /// single-subscription to ensure that no data is dropped. They should also |
/// freeze any additional mutable fields they add that don't make sense to |
/// change after the request headers are sent. |
- InputStream finalize() { |
+ ByteStream finalize() { |
// TODO(nweiz): freeze headers |
if (finalized) throw new StateError("Can't finalize a finalized Request."); |
_finalized = true; |
@@ -107,18 +109,20 @@ abstract class BaseRequest { |
Future<StreamedResponse> send() { |
var client = new Client(); |
return client.send(this).then((response) { |
- // TODO(nweiz): This makes me sick to my stomach, but it's currently the |
- // best way to listen for the response stream being closed. Kill it with |
- // fire once issue 4202 is fixed. |
- new Timer.repeating(100, (timer) { |
- if (response.stream.closed) { |
- client.close(); |
- timer.cancel(); |
- } |
- }); |
- |
- return response; |
- }).catchError((_) { client.close(); }); |
+ var stream = onDone(response.stream, client.close); |
+ return new StreamedResponse( |
+ new ByteStream(stream), |
+ response.statusCode, |
+ response.contentLength, |
+ request: response.request, |
+ headers: response.headers, |
+ isRedirect: response.isRedirect, |
+ persistentConnection: response.persistentConnection, |
+ reasonPhrase: response.reasonPhrase); |
+ }).catchError((e) { |
+ client.close(); |
+ throw e; |
+ }); |
} |
/// Throws an error if this request has been finalized. |