Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(292)

Side by Side Diff: sdk/lib/io/http_parser.dart

Issue 141553014: Optmize detached socket to not contain a extra stream controller/subscription, but simly forward ca… (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Created 6 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « no previous file | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.io; 5 part of dart.io;
6 6
7 // Global constants. 7 // Global constants.
8 class _Const { 8 class _Const {
9 // Bytes for "HTTP". 9 // Bytes for "HTTP".
10 static const HTTP = const [72, 84, 84, 80]; 10 static const HTTP = const [72, 84, 84, 80];
(...skipping 78 matching lines...) Expand 10 before | Expand all | Expand 10 after
89 static const int HTTP11 = 2; 89 static const int HTTP11 = 2;
90 } 90 }
91 91
92 // States of the HTTP parser state machine. 92 // States of the HTTP parser state machine.
93 class _MessageType { 93 class _MessageType {
94 static const int UNDETERMINED = 0; 94 static const int UNDETERMINED = 0;
95 static const int REQUEST = 1; 95 static const int REQUEST = 1;
96 static const int RESPONSE = 0; 96 static const int RESPONSE = 0;
97 } 97 }
98 98
99 class _HttpDetachedIncoming extends Stream<List<int>> {
100 StreamController<List<int>> controller;
101 final StreamSubscription subscription;
102 99
103 List<int> bufferedData; 100 /**
104 bool paused; 101 * The _HttpDetachedStreamSubscription takes a subscription and some extra data,
102 * and makes it possible to "inject" the data in from of other data events
103 * from the subscription.
104 *
105 * It does so by overriding pause/resume, so that once the
106 * _HttpDetachedStreamSubscription is resumed, it'll deliver the data before
107 * resuming the underlaying subscription.
108 */
109 class _HttpDetachedStreamSubscription implements StreamSubscription<List<int>> {
110 StreamSubscription<List<int>> _subscription;
111 List<int> _injectData;
112 bool _isCanceled = false;
113 int _pauseCount = 1;
114 Function _userOnData;
115 bool _scheduled = false;
105 116
106 Completer resumeCompleter; 117 _HttpDetachedStreamSubscription(this._subscription,
118 this._injectData,
119 this._userOnData);
107 120
108 _HttpDetachedIncoming(this.subscription, this.bufferedData) { 121 bool get isPaused => _subscription.isPaused;
109 controller = new StreamController<List<int>>( 122
110 sync: true, 123 Future asFuture([futureValue]) => _subscription.asFuture(futureValue);
111 onListen: resume, 124
112 onPause: pause, 125 Future cancel() {
113 onResume: resume, 126 _isCanceled = true;
114 onCancel: () => subscription.cancel()); 127 _injectData = null;
115 if (subscription == null) { 128 return _subscription.cancel();
116 // Socket was already closed. 129 }
117 if (bufferedData != null) controller.add(bufferedData); 130
118 controller.close(); 131 void onData(void handleData(List<int> data)) {
132 _userOnData = handleData;
133 _subscription.onData(handleData);
134 }
135
136 void onDone(void handleDone()) {
137 _subscription.onDone(handleDone);
138 }
139
140 void onError(Function handleError) {
141 _subscription.onError(handleError);
142 }
143
144 void pause([Future resumeSignal]) {
145 if (_injectData == null) {
146 _subscription.pause(resumeSignal);
119 } else { 147 } else {
120 pause(); 148 _pauseCount++;
121 subscription 149 if (resumeSignal != null) {
122 ..resume() 150 resumeSignal.whenComplete(resume);
123 ..onData(controller.add) 151 }
124 ..onDone(controller.close)
125 ..onError(controller.addError);
126 } 152 }
127 } 153 }
128 154
155 void resume() {
156 if (_injectData == null) {
157 _subscription.resume();
158 } else {
159 _pauseCount--;
160 _maybeScheduleData();
161 }
162 }
163
164 void _maybeScheduleData() {
165 if (_scheduled) return;
166 if (_pauseCount != 0) return;
167 _scheduled = true;
168 scheduleMicrotask(() {
169 _scheduled = false;
170 if (_pauseCount > 0 || _isCanceled) return;
171 var data = _injectData;
172 _injectData = null;
173 // To ensure that 'subscription.isPaused' is false, we resume the
174 // subscription here. This is fine as potential events are delayed.
175 _subscription.resume();
176 if (_userOnData != null) {
177 _userOnData(data);
178 }
179 });
180 }
181 }
182
183
184 class _HttpDetachedIncoming extends Stream<List<int>> {
185 final StreamSubscription subscription;
186 final List<int> bufferedData;
187
188 _HttpDetachedIncoming(this.subscription, this.bufferedData);
189
129 StreamSubscription<List<int>> listen(void onData(List<int> event), 190 StreamSubscription<List<int>> listen(void onData(List<int> event),
130 {Function onError, 191 {Function onError,
131 void onDone(), 192 void onDone(),
132 bool cancelOnError}) { 193 bool cancelOnError}) {
133 return controller.stream.listen( 194 if (subscription != null) {
134 onData, 195 subscription
135 onError: onError, 196 ..onData(onData)
136 onDone: onDone, 197 ..onError(onError)
137 cancelOnError: cancelOnError); 198 ..onDone(onDone);
138 } 199 if (bufferedData == null) {
139 200 return subscription..resume();
140 void resume() { 201 }
141 paused = false; 202 return new _HttpDetachedStreamSubscription(subscription,
142 if (bufferedData != null) { 203 bufferedData,
143 var data = bufferedData; 204 onData)
144 bufferedData = null; 205 ..resume();
145 controller.add(data); 206 } else {
146 // If the consumer pauses again after the carry-over data, we'll not 207 return new Stream.fromIterable(bufferedData)
147 // continue our subscriber until the next resume. 208 .listen(onData,
148 if (paused) return; 209 onError: onError,
149 } 210 onDone: onDone,
150 if (resumeCompleter != null) { 211 cancelOnError: cancelOnError);
151 resumeCompleter.complete();
152 resumeCompleter = null;
153 }
154 }
155
156 void pause() {
157 paused = true;
158 if (resumeCompleter == null) {
159 resumeCompleter = new Completer();
160 subscription.pause(resumeCompleter.future);
161 } 212 }
162 } 213 }
163 } 214 }
164 215
165 216
166 /** 217 /**
167 * HTTP parser which parses the data stream given to [consume]. 218 * HTTP parser which parses the data stream given to [consume].
168 * 219 *
169 * If an HTTP parser error occours, the parser will signal an error to either 220 * If an HTTP parser error occours, the parser will signal an error to either
170 * the current _HttpIncoming or the _parser itself. 221 * the current _HttpIncoming or the _parser itself.
(...skipping 814 matching lines...) Expand 10 before | Expand all | Expand 10 after
985 } 1036 }
986 } 1037 }
987 1038
988 void _reportError(error, [stackTrace]) { 1039 void _reportError(error, [stackTrace]) {
989 if (_socketSubscription != null) _socketSubscription.cancel(); 1040 if (_socketSubscription != null) _socketSubscription.cancel();
990 _state = _State.FAILURE; 1041 _state = _State.FAILURE;
991 _controller.addError(error, stackTrace); 1042 _controller.addError(error, stackTrace);
992 _controller.close(); 1043 _controller.close();
993 } 1044 }
994 } 1045 }
OLDNEW
« no previous file with comments | « no previous file | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698