Index: sdk/lib/io/http_parser.dart |
diff --git a/sdk/lib/io/http_parser.dart b/sdk/lib/io/http_parser.dart |
index a6662cf87a577b3c256da7eb246dc7300e79c5a0..db6adfcfaecaf60928f9173a55ed404c983aa93d 100644 |
--- a/sdk/lib/io/http_parser.dart |
+++ b/sdk/lib/io/http_parser.dart |
@@ -106,8 +106,10 @@ class _HttpDetachedIncoming extends Stream<List<int>> { |
List<int> this.carryOverData, |
Completer oldResumeCompleter) { |
controller = new StreamController<List<int>>( |
- onSubscriptionStateChange: onSubscriptionStateChange, |
- onPauseStateChange: onPauseStateChange); |
+ onListen: resume, |
+ onPause: pause, |
+ onResume: resume, |
+ onCancel: () => subscription.cancel()); |
if (subscription == null) { |
// Socket was already closed. |
if (carryOverData != null) controller.add(carryOverData); |
@@ -156,22 +158,6 @@ class _HttpDetachedIncoming extends Stream<List<int>> { |
subscription.pause(resumeCompleter.future); |
} |
} |
- |
- void onPauseStateChange() { |
- if (controller.isPaused) { |
- pause(); |
- } else { |
- resume(); |
- } |
- } |
- |
- void onSubscriptionStateChange() { |
- if (controller.hasListener) { |
- resume(); |
- } else { |
- subscription.cancel(); |
- } |
- } |
} |
@@ -209,8 +195,10 @@ class _HttpParser |
_HttpParser._(this._requestParser) { |
_controller = new StreamController<_HttpIncoming>( |
- onSubscriptionStateChange: _updateParsePauseState, |
- onPauseStateChange: _updateParsePauseState); |
+ onListen: _updateParsePauseState, |
+ onPause: _updateParsePauseState, |
+ onResume: _updateParsePauseState, |
+ onCancel: _updateParsePauseState); |
_reset(); |
} |
@@ -878,8 +866,10 @@ class _HttpParser |
assert(_incoming == null); |
assert(_bodyController == null); |
_bodyController = new StreamController<List<int>>( |
- onSubscriptionStateChange: _bodySubscriptionStateChange, |
- onPauseStateChange: _updateParsePauseState); |
+ onListen: _bodySubscriptionStateChange, |
+ onPause: _updateParsePauseState, |
+ onResume: _updateParsePauseState, |
+ onCancel: _bodySubscriptionStateChange); |
_incoming = new _HttpIncoming( |
_headers, transferLength, _bodyController.stream); |
_pauseParsing(); // Needed to handle detaching - don't start on the body! |