Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1161)

Unified Diff: sdk/lib/io/http_parser.dart

Issue 141553014: Optmize detached socket to not contain a extra stream controller/subscription, but simly forward ca… (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Created 6 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « no previous file | no next file » | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: sdk/lib/io/http_parser.dart
diff --git a/sdk/lib/io/http_parser.dart b/sdk/lib/io/http_parser.dart
index 8856c103df260d6941f7fd3030594fb952218345..dee7371687749e0f3e0ae4bb8b65e22eaf254879 100644
--- a/sdk/lib/io/http_parser.dart
+++ b/sdk/lib/io/http_parser.dart
@@ -96,68 +96,119 @@ class _MessageType {
static const int RESPONSE = 0;
}
-class _HttpDetachedIncoming extends Stream<List<int>> {
- StreamController<List<int>> controller;
- final StreamSubscription subscription;
- List<int> bufferedData;
- bool paused;
+/**
+ * The _HttpDetachedStreamSubscription takes a subscription and some extra data,
+ * and makes it possible to "inject" the data in from of other data events
+ * from the subscription.
+ *
+ * It does so by overriding pause/resume, so that once the
+ * _HttpDetachedStreamSubscription is resumed, it'll deliver the data before
+ * resuming the underlaying subscription.
+ */
+class _HttpDetachedStreamSubscription implements StreamSubscription<List<int>> {
+ StreamSubscription<List<int>> _subscription;
+ List<int> _injectData;
+ bool _isCanceled = false;
+ int _pauseCount = 1;
+ Function _userOnData;
+ bool _scheduled = false;
+
+ _HttpDetachedStreamSubscription(this._subscription,
+ this._injectData,
+ this._userOnData);
+
+ bool get isPaused => _subscription.isPaused;
+
+ Future asFuture([futureValue]) => _subscription.asFuture(futureValue);
+
+ Future cancel() {
+ _isCanceled = true;
+ _injectData = null;
+ return _subscription.cancel();
+ }
- Completer resumeCompleter;
+ void onData(void handleData(List<int> data)) {
+ _userOnData = handleData;
+ _subscription.onData(handleData);
+ }
- _HttpDetachedIncoming(this.subscription, this.bufferedData) {
- controller = new StreamController<List<int>>(
- sync: true,
- onListen: resume,
- onPause: pause,
- onResume: resume,
- onCancel: () => subscription.cancel());
- if (subscription == null) {
- // Socket was already closed.
- if (bufferedData != null) controller.add(bufferedData);
- controller.close();
- } else {
- pause();
- subscription
- ..resume()
- ..onData(controller.add)
- ..onDone(controller.close)
- ..onError(controller.addError);
- }
+ void onDone(void handleDone()) {
+ _subscription.onDone(handleDone);
}
- StreamSubscription<List<int>> listen(void onData(List<int> event),
- {Function onError,
- void onDone(),
- bool cancelOnError}) {
- return controller.stream.listen(
- onData,
- onError: onError,
- onDone: onDone,
- cancelOnError: cancelOnError);
+ void onError(Function handleError) {
+ _subscription.onError(handleError);
}
- void resume() {
- paused = false;
- if (bufferedData != null) {
- var data = bufferedData;
- bufferedData = null;
- controller.add(data);
- // If the consumer pauses again after the carry-over data, we'll not
- // continue our subscriber until the next resume.
- if (paused) return;
+ void pause([Future resumeSignal]) {
+ if (_injectData == null) {
+ _subscription.pause(resumeSignal);
+ } else {
+ _pauseCount++;
+ if (resumeSignal != null) {
+ resumeSignal.whenComplete(resume);
+ }
}
- if (resumeCompleter != null) {
- resumeCompleter.complete();
- resumeCompleter = null;
+ }
+
+ void resume() {
+ if (_injectData == null) {
+ _subscription.resume();
+ } else {
+ _pauseCount--;
+ _maybeScheduleData();
}
}
- void pause() {
- paused = true;
- if (resumeCompleter == null) {
- resumeCompleter = new Completer();
- subscription.pause(resumeCompleter.future);
+ void _maybeScheduleData() {
+ if (_scheduled) return;
+ if (_pauseCount != 0) return;
+ _scheduled = true;
+ scheduleMicrotask(() {
+ _scheduled = false;
+ if (_pauseCount > 0 || _isCanceled) return;
+ var data = _injectData;
+ _injectData = null;
+ // To ensure that 'subscription.isPaused' is false, we resume the
+ // subscription here. This is fine as potential events are delayed.
+ _subscription.resume();
+ if (_userOnData != null) {
+ _userOnData(data);
+ }
+ });
+ }
+}
+
+
+class _HttpDetachedIncoming extends Stream<List<int>> {
+ final StreamSubscription subscription;
+ final List<int> bufferedData;
+
+ _HttpDetachedIncoming(this.subscription, this.bufferedData);
+
+ StreamSubscription<List<int>> listen(void onData(List<int> event),
+ {Function onError,
+ void onDone(),
+ bool cancelOnError}) {
+ if (subscription != null) {
+ subscription
+ ..onData(onData)
+ ..onError(onError)
+ ..onDone(onDone);
+ if (bufferedData == null) {
+ return subscription..resume();
+ }
+ return new _HttpDetachedStreamSubscription(subscription,
+ bufferedData,
+ onData)
+ ..resume();
+ } else {
+ return new Stream.fromIterable(bufferedData)
+ .listen(onData,
+ onError: onError,
+ onDone: onDone,
+ cancelOnError: cancelOnError);
}
}
}
« no previous file with comments | « no previous file | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698