| Index: sdk/lib/io/http_parser.dart
|
| diff --git a/sdk/lib/io/http_parser.dart b/sdk/lib/io/http_parser.dart
|
| index 8856c103df260d6941f7fd3030594fb952218345..dee7371687749e0f3e0ae4bb8b65e22eaf254879 100644
|
| --- a/sdk/lib/io/http_parser.dart
|
| +++ b/sdk/lib/io/http_parser.dart
|
| @@ -96,68 +96,119 @@ class _MessageType {
|
| static const int RESPONSE = 0;
|
| }
|
|
|
| -class _HttpDetachedIncoming extends Stream<List<int>> {
|
| - StreamController<List<int>> controller;
|
| - final StreamSubscription subscription;
|
|
|
| - List<int> bufferedData;
|
| - bool paused;
|
| +/**
|
| + * The _HttpDetachedStreamSubscription takes a subscription and some extra data,
|
| + * and makes it possible to "inject" the data in from of other data events
|
| + * from the subscription.
|
| + *
|
| + * It does so by overriding pause/resume, so that once the
|
| + * _HttpDetachedStreamSubscription is resumed, it'll deliver the data before
|
| + * resuming the underlaying subscription.
|
| + */
|
| +class _HttpDetachedStreamSubscription implements StreamSubscription<List<int>> {
|
| + StreamSubscription<List<int>> _subscription;
|
| + List<int> _injectData;
|
| + bool _isCanceled = false;
|
| + int _pauseCount = 1;
|
| + Function _userOnData;
|
| + bool _scheduled = false;
|
| +
|
| + _HttpDetachedStreamSubscription(this._subscription,
|
| + this._injectData,
|
| + this._userOnData);
|
| +
|
| + bool get isPaused => _subscription.isPaused;
|
| +
|
| + Future asFuture([futureValue]) => _subscription.asFuture(futureValue);
|
| +
|
| + Future cancel() {
|
| + _isCanceled = true;
|
| + _injectData = null;
|
| + return _subscription.cancel();
|
| + }
|
|
|
| - Completer resumeCompleter;
|
| + void onData(void handleData(List<int> data)) {
|
| + _userOnData = handleData;
|
| + _subscription.onData(handleData);
|
| + }
|
|
|
| - _HttpDetachedIncoming(this.subscription, this.bufferedData) {
|
| - controller = new StreamController<List<int>>(
|
| - sync: true,
|
| - onListen: resume,
|
| - onPause: pause,
|
| - onResume: resume,
|
| - onCancel: () => subscription.cancel());
|
| - if (subscription == null) {
|
| - // Socket was already closed.
|
| - if (bufferedData != null) controller.add(bufferedData);
|
| - controller.close();
|
| - } else {
|
| - pause();
|
| - subscription
|
| - ..resume()
|
| - ..onData(controller.add)
|
| - ..onDone(controller.close)
|
| - ..onError(controller.addError);
|
| - }
|
| + void onDone(void handleDone()) {
|
| + _subscription.onDone(handleDone);
|
| }
|
|
|
| - StreamSubscription<List<int>> listen(void onData(List<int> event),
|
| - {Function onError,
|
| - void onDone(),
|
| - bool cancelOnError}) {
|
| - return controller.stream.listen(
|
| - onData,
|
| - onError: onError,
|
| - onDone: onDone,
|
| - cancelOnError: cancelOnError);
|
| + void onError(Function handleError) {
|
| + _subscription.onError(handleError);
|
| }
|
|
|
| - void resume() {
|
| - paused = false;
|
| - if (bufferedData != null) {
|
| - var data = bufferedData;
|
| - bufferedData = null;
|
| - controller.add(data);
|
| - // If the consumer pauses again after the carry-over data, we'll not
|
| - // continue our subscriber until the next resume.
|
| - if (paused) return;
|
| + void pause([Future resumeSignal]) {
|
| + if (_injectData == null) {
|
| + _subscription.pause(resumeSignal);
|
| + } else {
|
| + _pauseCount++;
|
| + if (resumeSignal != null) {
|
| + resumeSignal.whenComplete(resume);
|
| + }
|
| }
|
| - if (resumeCompleter != null) {
|
| - resumeCompleter.complete();
|
| - resumeCompleter = null;
|
| + }
|
| +
|
| + void resume() {
|
| + if (_injectData == null) {
|
| + _subscription.resume();
|
| + } else {
|
| + _pauseCount--;
|
| + _maybeScheduleData();
|
| }
|
| }
|
|
|
| - void pause() {
|
| - paused = true;
|
| - if (resumeCompleter == null) {
|
| - resumeCompleter = new Completer();
|
| - subscription.pause(resumeCompleter.future);
|
| + void _maybeScheduleData() {
|
| + if (_scheduled) return;
|
| + if (_pauseCount != 0) return;
|
| + _scheduled = true;
|
| + scheduleMicrotask(() {
|
| + _scheduled = false;
|
| + if (_pauseCount > 0 || _isCanceled) return;
|
| + var data = _injectData;
|
| + _injectData = null;
|
| + // To ensure that 'subscription.isPaused' is false, we resume the
|
| + // subscription here. This is fine as potential events are delayed.
|
| + _subscription.resume();
|
| + if (_userOnData != null) {
|
| + _userOnData(data);
|
| + }
|
| + });
|
| + }
|
| +}
|
| +
|
| +
|
| +class _HttpDetachedIncoming extends Stream<List<int>> {
|
| + final StreamSubscription subscription;
|
| + final List<int> bufferedData;
|
| +
|
| + _HttpDetachedIncoming(this.subscription, this.bufferedData);
|
| +
|
| + StreamSubscription<List<int>> listen(void onData(List<int> event),
|
| + {Function onError,
|
| + void onDone(),
|
| + bool cancelOnError}) {
|
| + if (subscription != null) {
|
| + subscription
|
| + ..onData(onData)
|
| + ..onError(onError)
|
| + ..onDone(onDone);
|
| + if (bufferedData == null) {
|
| + return subscription..resume();
|
| + }
|
| + return new _HttpDetachedStreamSubscription(subscription,
|
| + bufferedData,
|
| + onData)
|
| + ..resume();
|
| + } else {
|
| + return new Stream.fromIterable(bufferedData)
|
| + .listen(onData,
|
| + onError: onError,
|
| + onDone: onDone,
|
| + cancelOnError: cancelOnError);
|
| }
|
| }
|
| }
|
|
|