Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(343)

Side by Side Diff: sdk/lib/io/http_parser.dart

Issue 14196003: Change StreamController constructor. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Fix some bugs. Created 7 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.io; 5 part of dart.io;
6 6
7 // Global constants. 7 // Global constants.
8 class _Const { 8 class _Const {
9 // Bytes for "HTTP". 9 // Bytes for "HTTP".
10 static const HTTP = const [72, 84, 84, 80]; 10 static const HTTP = const [72, 84, 84, 80];
(...skipping 88 matching lines...) Expand 10 before | Expand all | Expand 10 after
99 99
100 List<int> carryOverData; 100 List<int> carryOverData;
101 bool paused; 101 bool paused;
102 102
103 Completer resumeCompleter; 103 Completer resumeCompleter;
104 104
105 _HttpDetachedIncoming(StreamSubscription this.subscription, 105 _HttpDetachedIncoming(StreamSubscription this.subscription,
106 List<int> this.carryOverData, 106 List<int> this.carryOverData,
107 Completer oldResumeCompleter) { 107 Completer oldResumeCompleter) {
108 controller = new StreamController<List<int>>( 108 controller = new StreamController<List<int>>(
109 onSubscriptionStateChange: onSubscriptionStateChange, 109 onListen: resume,
110 onPauseStateChange: onPauseStateChange); 110 onPause: pause,
111 onResume: resume,
112 onCancel: () => subscription.cancel());
111 if (subscription == null) { 113 if (subscription == null) {
112 // Socket was already closed. 114 // Socket was already closed.
113 if (carryOverData != null) controller.add(carryOverData); 115 if (carryOverData != null) controller.add(carryOverData);
114 controller.close(); 116 controller.close();
115 } else { 117 } else {
116 pause(); 118 pause();
117 if (oldResumeCompleter != null) oldResumeCompleter.complete(); 119 if (oldResumeCompleter != null) oldResumeCompleter.complete();
118 subscription.resume(); 120 subscription.resume();
119 subscription.onData(controller.add); 121 subscription.onData(controller.add);
120 subscription.onDone(controller.close); 122 subscription.onDone(controller.close);
(...skipping 28 matching lines...) Expand all
149 } 151 }
150 } 152 }
151 153
152 void pause() { 154 void pause() {
153 paused = true; 155 paused = true;
154 if (resumeCompleter == null) { 156 if (resumeCompleter == null) {
155 resumeCompleter = new Completer(); 157 resumeCompleter = new Completer();
156 subscription.pause(resumeCompleter.future); 158 subscription.pause(resumeCompleter.future);
157 } 159 }
158 } 160 }
159
160 void onPauseStateChange() {
161 if (controller.isPaused) {
162 pause();
163 } else {
164 resume();
165 }
166 }
167
168 void onSubscriptionStateChange() {
169 if (controller.hasListener) {
170 resume();
171 } else {
172 subscription.cancel();
173 }
174 }
175 } 161 }
176 162
177 163
178 /** 164 /**
179 * HTTP parser which parses the data stream given to [consume]. 165 * HTTP parser which parses the data stream given to [consume].
180 * 166 *
181 * If an HTTP parser error occours, the parser will signal an error to either 167 * If an HTTP parser error occours, the parser will signal an error to either
182 * the current _HttpIncoming or the _parser itself. 168 * the current _HttpIncoming or the _parser itself.
183 * 169 *
184 * The connection upgrades (e.g. switching from HTTP/1.1 to the 170 * The connection upgrades (e.g. switching from HTTP/1.1 to the
(...skipping 17 matching lines...) Expand all
202 factory _HttpParser.requestParser() { 188 factory _HttpParser.requestParser() {
203 return new _HttpParser._(true); 189 return new _HttpParser._(true);
204 } 190 }
205 191
206 factory _HttpParser.responseParser() { 192 factory _HttpParser.responseParser() {
207 return new _HttpParser._(false); 193 return new _HttpParser._(false);
208 } 194 }
209 195
210 _HttpParser._(this._requestParser) { 196 _HttpParser._(this._requestParser) {
211 _controller = new StreamController<_HttpIncoming>( 197 _controller = new StreamController<_HttpIncoming>(
212 onSubscriptionStateChange: _updateParsePauseState, 198 onListen: _updateParsePauseState,
213 onPauseStateChange: _updateParsePauseState); 199 onPause: _updateParsePauseState,
200 onResume: _updateParsePauseState,
201 onCancel: _updateParsePauseState);
214 _reset(); 202 _reset();
215 } 203 }
216 204
217 205
218 StreamSubscription<_HttpIncoming> listen(void onData(_HttpIncoming event), 206 StreamSubscription<_HttpIncoming> listen(void onData(_HttpIncoming event),
219 {void onError(AsyncError error), 207 {void onError(AsyncError error),
220 void onDone(), 208 void onDone(),
221 bool unsubscribeOnError}) { 209 bool unsubscribeOnError}) {
222 return _controller.stream.listen(onData, 210 return _controller.stream.listen(onData,
223 onError: onError, 211 onError: onError,
(...skipping 651 matching lines...) Expand 10 before | Expand all | Expand 10 after
875 return byte - 0x61 + 10; // a - f 863 return byte - 0x61 + 10; // a - f
876 } else { 864 } else {
877 throw new HttpParserException("Failed to parse HTTP"); 865 throw new HttpParserException("Failed to parse HTTP");
878 } 866 }
879 } 867 }
880 868
881 void _createIncoming(int transferLength) { 869 void _createIncoming(int transferLength) {
882 assert(_incoming == null); 870 assert(_incoming == null);
883 assert(_bodyController == null); 871 assert(_bodyController == null);
884 _bodyController = new StreamController<List<int>>( 872 _bodyController = new StreamController<List<int>>(
885 onSubscriptionStateChange: _bodySubscriptionStateChange, 873 onListen: _bodySubscriptionStateChange,
886 onPauseStateChange: _updateParsePauseState); 874 onPause: _updateParsePauseState,
875 onResume: _updateParsePauseState,
876 onCancel: _bodySubscriptionStateChange);
887 _incoming = new _HttpIncoming( 877 _incoming = new _HttpIncoming(
888 _headers, transferLength, _bodyController.stream); 878 _headers, transferLength, _bodyController.stream);
889 _pauseParsing(); // Needed to handle detaching - don't start on the body! 879 _pauseParsing(); // Needed to handle detaching - don't start on the body!
890 } 880 }
891 881
892 void _closeIncoming() { 882 void _closeIncoming() {
893 // Ignore multiple close (can happend in re-entrance). 883 // Ignore multiple close (can happend in re-entrance).
894 if (_incoming == null) return; 884 if (_incoming == null) return;
895 var tmp = _incoming; 885 var tmp = _incoming;
896 _incoming = null; 886 _incoming = null;
(...skipping 82 matching lines...) Expand 10 before | Expand all | Expand 10 after
979 StreamController<_HttpIncoming> _controller; 969 StreamController<_HttpIncoming> _controller;
980 StreamController<List<int>> _bodyController; 970 StreamController<List<int>> _bodyController;
981 } 971 }
982 972
983 973
984 class HttpParserException implements Exception { 974 class HttpParserException implements Exception {
985 const HttpParserException([String this.message = ""]); 975 const HttpParserException([String this.message = ""]);
986 String toString() => "HttpParserException: $message"; 976 String toString() => "HttpParserException: $message";
987 final String message; 977 final String message;
988 } 978 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698