Chromium Code Reviews| Index: sdk/lib/io/http_parser.dart |
| diff --git a/sdk/lib/io/http_parser.dart b/sdk/lib/io/http_parser.dart |
| index 8856c103df260d6941f7fd3030594fb952218345..e6483a8e59600fa6afcb9faf9d99f84af2450a62 100644 |
| --- a/sdk/lib/io/http_parser.dart |
| +++ b/sdk/lib/io/http_parser.dart |
| @@ -96,68 +96,108 @@ class _MessageType { |
| static const int RESPONSE = 0; |
| } |
| -class _HttpDetachedIncoming extends Stream<List<int>> { |
| - StreamController<List<int>> controller; |
| - final StreamSubscription subscription; |
|
Søren Gjesse
2014/02/04 10:17:24
Please add a short description of this subscriptio
Anders Johnsen
2014/02/04 10:24:51
Done.
|
| - List<int> bufferedData; |
| - bool paused; |
| +class _HttpDetachedStreamSubscription implements StreamSubscription<List<int>> { |
| + StreamSubscription<List<int>> _subscription; |
|
Søren Gjesse
2014/02/04 10:17:24
Can we find a better name that _data? It is pretty
Anders Johnsen
2014/02/04 10:24:51
Done.
|
| + List<int> _data; |
| + bool _isCanceled = false; |
| + int _pauseCount = 1; |
| + Function _userOnData; |
| + bool _scheduled = false; |
| - Completer resumeCompleter; |
| + _HttpDetachedStreamSubscription(this._subscription, |
| + this._data, |
| + this._userOnData); |
| - _HttpDetachedIncoming(this.subscription, this.bufferedData) { |
| - controller = new StreamController<List<int>>( |
| - sync: true, |
| - onListen: resume, |
| - onPause: pause, |
| - onResume: resume, |
| - onCancel: () => subscription.cancel()); |
| - if (subscription == null) { |
| - // Socket was already closed. |
| - if (bufferedData != null) controller.add(bufferedData); |
| - controller.close(); |
| - } else { |
| - pause(); |
| - subscription |
| - ..resume() |
| - ..onData(controller.add) |
| - ..onDone(controller.close) |
| - ..onError(controller.addError); |
| - } |
| + bool get isPaused => _subscription.isPaused; |
| + |
| + Future asFuture([futureValue]) => _subscription.asFuture(futureValue); |
| + |
| + Future cancel() { |
| + _isCanceled = true; |
| + _data = null; |
| + return _subscription.cancel(); |
| } |
| - StreamSubscription<List<int>> listen(void onData(List<int> event), |
| - {Function onError, |
| - void onDone(), |
| - bool cancelOnError}) { |
| - return controller.stream.listen( |
| - onData, |
| - onError: onError, |
| - onDone: onDone, |
| - cancelOnError: cancelOnError); |
| + void onData(void handleData(List<int> data)) { |
| + _userOnData = handleData; |
| + _subscription.onData(handleData); |
| } |
| - void resume() { |
| - paused = false; |
| - if (bufferedData != null) { |
| - var data = bufferedData; |
| - bufferedData = null; |
| - controller.add(data); |
| - // If the consumer pauses again after the carry-over data, we'll not |
| - // continue our subscriber until the next resume. |
| - if (paused) return; |
| + void onDone(void handleDone()) { |
| + _subscription.onDone(handleDone); |
| + } |
| + |
| + void onError(Function handleError) { |
| + _subscription.onError(handleError); |
| + } |
| + |
| + void pause([Future resumeSignal]) { |
| + if (_data == null) { |
| + _subscription.pause(resumeSignal); |
| + } else { |
| + _pauseCount++; |
| + if (resumeSignal != null) { |
| + resumeSignal.whenComplete(resume); |
| + } |
| } |
| - if (resumeCompleter != null) { |
| - resumeCompleter.complete(); |
| - resumeCompleter = null; |
| + } |
| + |
| + void resume() { |
| + if (_data == null) { |
| + _subscription.resume(); |
| + } else { |
| + _pauseCount--; |
| + _scheduleData(); |
| } |
| } |
| - void pause() { |
| - paused = true; |
| - if (resumeCompleter == null) { |
| - resumeCompleter = new Completer(); |
| - subscription.pause(resumeCompleter.future); |
| + void _scheduleData() { |
|
Søren Gjesse
2014/02/04 10:17:24
This method does not necessarily schedule the data
Anders Johnsen
2014/02/04 10:24:51
Done.
|
| + if (_scheduled) return; |
| + if (_pauseCount != 0) return; |
| + _scheduled = true; |
| + scheduleMicrotask(() { |
| + _scheduled = false; |
| + if (_pauseCount > 0 || _isCanceled) return; |
| + var data = _data; |
| + _data = null; |
|
Søren Gjesse
2014/02/04 10:17:24
Please add a comment here on why the underlying su
Anders Johnsen
2014/02/04 10:24:51
Done.
|
| + _subscription.resume(); |
| + if (_userOnData != null) { |
| + _userOnData(data); |
| + } |
| + }); |
| + } |
| +} |
| + |
| + |
| +class _HttpDetachedIncoming extends Stream<List<int>> { |
| + final StreamSubscription subscription; |
| + final List<int> bufferedData; |
| + |
| + _HttpDetachedIncoming(this.subscription, this.bufferedData); |
| + |
| + StreamSubscription<List<int>> listen(void onData(List<int> event), |
| + {Function onError, |
| + void onDone(), |
| + bool cancelOnError}) { |
| + if (subscription != null) { |
| + subscription |
| + ..onData(onData) |
| + ..onError(onError) |
| + ..onDone(onDone); |
| + if (bufferedData == null) { |
| + return subscription..resume(); |
| + } |
| + return new _HttpDetachedStreamSubscription(subscription, |
| + bufferedData, |
| + onData) |
| + ..resume(); |
| + } else { |
| + return new Stream.fromIterable(bufferedData) |
| + .listen(onData, |
| + onError: onError, |
| + onDone: onDone, |
| + cancelOnError: cancelOnError); |
| } |
| } |
| } |