Index: sdk/lib/io/http_parser.dart |
diff --git a/sdk/lib/io/http_parser.dart b/sdk/lib/io/http_parser.dart |
index 8856c103df260d6941f7fd3030594fb952218345..dee7371687749e0f3e0ae4bb8b65e22eaf254879 100644 |
--- a/sdk/lib/io/http_parser.dart |
+++ b/sdk/lib/io/http_parser.dart |
@@ -96,68 +96,119 @@ class _MessageType { |
static const int RESPONSE = 0; |
} |
-class _HttpDetachedIncoming extends Stream<List<int>> { |
- StreamController<List<int>> controller; |
- final StreamSubscription subscription; |
- List<int> bufferedData; |
- bool paused; |
+/** |
+ * The _HttpDetachedStreamSubscription takes a subscription and some extra data, |
+ * and makes it possible to "inject" the data in from of other data events |
+ * from the subscription. |
+ * |
+ * It does so by overriding pause/resume, so that once the |
+ * _HttpDetachedStreamSubscription is resumed, it'll deliver the data before |
+ * resuming the underlaying subscription. |
+ */ |
+class _HttpDetachedStreamSubscription implements StreamSubscription<List<int>> { |
+ StreamSubscription<List<int>> _subscription; |
+ List<int> _injectData; |
+ bool _isCanceled = false; |
+ int _pauseCount = 1; |
+ Function _userOnData; |
+ bool _scheduled = false; |
+ |
+ _HttpDetachedStreamSubscription(this._subscription, |
+ this._injectData, |
+ this._userOnData); |
+ |
+ bool get isPaused => _subscription.isPaused; |
+ |
+ Future asFuture([futureValue]) => _subscription.asFuture(futureValue); |
+ |
+ Future cancel() { |
+ _isCanceled = true; |
+ _injectData = null; |
+ return _subscription.cancel(); |
+ } |
- Completer resumeCompleter; |
+ void onData(void handleData(List<int> data)) { |
+ _userOnData = handleData; |
+ _subscription.onData(handleData); |
+ } |
- _HttpDetachedIncoming(this.subscription, this.bufferedData) { |
- controller = new StreamController<List<int>>( |
- sync: true, |
- onListen: resume, |
- onPause: pause, |
- onResume: resume, |
- onCancel: () => subscription.cancel()); |
- if (subscription == null) { |
- // Socket was already closed. |
- if (bufferedData != null) controller.add(bufferedData); |
- controller.close(); |
- } else { |
- pause(); |
- subscription |
- ..resume() |
- ..onData(controller.add) |
- ..onDone(controller.close) |
- ..onError(controller.addError); |
- } |
+ void onDone(void handleDone()) { |
+ _subscription.onDone(handleDone); |
} |
- StreamSubscription<List<int>> listen(void onData(List<int> event), |
- {Function onError, |
- void onDone(), |
- bool cancelOnError}) { |
- return controller.stream.listen( |
- onData, |
- onError: onError, |
- onDone: onDone, |
- cancelOnError: cancelOnError); |
+ void onError(Function handleError) { |
+ _subscription.onError(handleError); |
} |
- void resume() { |
- paused = false; |
- if (bufferedData != null) { |
- var data = bufferedData; |
- bufferedData = null; |
- controller.add(data); |
- // If the consumer pauses again after the carry-over data, we'll not |
- // continue our subscriber until the next resume. |
- if (paused) return; |
+ void pause([Future resumeSignal]) { |
+ if (_injectData == null) { |
+ _subscription.pause(resumeSignal); |
+ } else { |
+ _pauseCount++; |
+ if (resumeSignal != null) { |
+ resumeSignal.whenComplete(resume); |
+ } |
} |
- if (resumeCompleter != null) { |
- resumeCompleter.complete(); |
- resumeCompleter = null; |
+ } |
+ |
+ void resume() { |
+ if (_injectData == null) { |
+ _subscription.resume(); |
+ } else { |
+ _pauseCount--; |
+ _maybeScheduleData(); |
} |
} |
- void pause() { |
- paused = true; |
- if (resumeCompleter == null) { |
- resumeCompleter = new Completer(); |
- subscription.pause(resumeCompleter.future); |
+ void _maybeScheduleData() { |
+ if (_scheduled) return; |
+ if (_pauseCount != 0) return; |
+ _scheduled = true; |
+ scheduleMicrotask(() { |
+ _scheduled = false; |
+ if (_pauseCount > 0 || _isCanceled) return; |
+ var data = _injectData; |
+ _injectData = null; |
+ // To ensure that 'subscription.isPaused' is false, we resume the |
+ // subscription here. This is fine as potential events are delayed. |
+ _subscription.resume(); |
+ if (_userOnData != null) { |
+ _userOnData(data); |
+ } |
+ }); |
+ } |
+} |
+ |
+ |
+class _HttpDetachedIncoming extends Stream<List<int>> { |
+ final StreamSubscription subscription; |
+ final List<int> bufferedData; |
+ |
+ _HttpDetachedIncoming(this.subscription, this.bufferedData); |
+ |
+ StreamSubscription<List<int>> listen(void onData(List<int> event), |
+ {Function onError, |
+ void onDone(), |
+ bool cancelOnError}) { |
+ if (subscription != null) { |
+ subscription |
+ ..onData(onData) |
+ ..onError(onError) |
+ ..onDone(onDone); |
+ if (bufferedData == null) { |
+ return subscription..resume(); |
+ } |
+ return new _HttpDetachedStreamSubscription(subscription, |
+ bufferedData, |
+ onData) |
+ ..resume(); |
+ } else { |
+ return new Stream.fromIterable(bufferedData) |
+ .listen(onData, |
+ onError: onError, |
+ onDone: onDone, |
+ cancelOnError: cancelOnError); |
} |
} |
} |