Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(591)

Side by Side Diff: sdk/lib/io/http_parser.dart

Issue 14196003: Change StreamController constructor. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Address comments and rebase. Created 7 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « sdk/lib/io/http_impl.dart ('k') | sdk/lib/io/secure_server_socket.dart » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.io; 5 part of dart.io;
6 6
7 // Global constants. 7 // Global constants.
8 class _Const { 8 class _Const {
9 // Bytes for "HTTP". 9 // Bytes for "HTTP".
10 static const HTTP = const [72, 84, 84, 80]; 10 static const HTTP = const [72, 84, 84, 80];
(...skipping 88 matching lines...) Expand 10 before | Expand all | Expand 10 after
99 99
100 List<int> carryOverData; 100 List<int> carryOverData;
101 bool paused; 101 bool paused;
102 102
103 Completer resumeCompleter; 103 Completer resumeCompleter;
104 104
105 _HttpDetachedIncoming(StreamSubscription this.subscription, 105 _HttpDetachedIncoming(StreamSubscription this.subscription,
106 List<int> this.carryOverData, 106 List<int> this.carryOverData,
107 Completer oldResumeCompleter) { 107 Completer oldResumeCompleter) {
108 controller = new StreamController<List<int>>( 108 controller = new StreamController<List<int>>(
109 onSubscriptionStateChange: onSubscriptionStateChange, 109 onListen: resume,
110 onPauseStateChange: onPauseStateChange); 110 onPause: pause,
111 onResume: resume,
112 onCancel: () => subscription.cancel());
111 if (subscription == null) { 113 if (subscription == null) {
112 // Socket was already closed. 114 // Socket was already closed.
113 if (carryOverData != null) controller.add(carryOverData); 115 if (carryOverData != null) controller.add(carryOverData);
114 controller.close(); 116 controller.close();
115 } else { 117 } else {
116 pause(); 118 pause();
117 if (oldResumeCompleter != null) oldResumeCompleter.complete(); 119 if (oldResumeCompleter != null) oldResumeCompleter.complete();
118 subscription.resume(); 120 subscription.resume();
119 subscription.onData(controller.add); 121 subscription.onData(controller.add);
120 subscription.onDone(controller.close); 122 subscription.onDone(controller.close);
(...skipping 28 matching lines...) Expand all
149 } 151 }
150 } 152 }
151 153
152 void pause() { 154 void pause() {
153 paused = true; 155 paused = true;
154 if (resumeCompleter == null) { 156 if (resumeCompleter == null) {
155 resumeCompleter = new Completer(); 157 resumeCompleter = new Completer();
156 subscription.pause(resumeCompleter.future); 158 subscription.pause(resumeCompleter.future);
157 } 159 }
158 } 160 }
159
160 void onPauseStateChange() {
161 if (controller.isPaused) {
162 pause();
163 } else {
164 resume();
165 }
166 }
167
168 void onSubscriptionStateChange() {
169 if (controller.hasListener) {
170 resume();
171 } else {
172 subscription.cancel();
173 }
174 }
175 } 161 }
176 162
177 163
178 /** 164 /**
179 * HTTP parser which parses the data stream given to [consume]. 165 * HTTP parser which parses the data stream given to [consume].
180 * 166 *
181 * If an HTTP parser error occours, the parser will signal an error to either 167 * If an HTTP parser error occours, the parser will signal an error to either
182 * the current _HttpIncoming or the _parser itself. 168 * the current _HttpIncoming or the _parser itself.
183 * 169 *
184 * The connection upgrades (e.g. switching from HTTP/1.1 to the 170 * The connection upgrades (e.g. switching from HTTP/1.1 to the
(...skipping 17 matching lines...) Expand all
202 factory _HttpParser.requestParser() { 188 factory _HttpParser.requestParser() {
203 return new _HttpParser._(true); 189 return new _HttpParser._(true);
204 } 190 }
205 191
206 factory _HttpParser.responseParser() { 192 factory _HttpParser.responseParser() {
207 return new _HttpParser._(false); 193 return new _HttpParser._(false);
208 } 194 }
209 195
210 _HttpParser._(this._requestParser) { 196 _HttpParser._(this._requestParser) {
211 _controller = new StreamController<_HttpIncoming>( 197 _controller = new StreamController<_HttpIncoming>(
212 onSubscriptionStateChange: _updateParsePauseState, 198 onListen: _updateParsePauseState,
213 onPauseStateChange: _updateParsePauseState); 199 onPause: _updateParsePauseState,
200 onResume: _updateParsePauseState,
201 onCancel: _updateParsePauseState);
214 _reset(); 202 _reset();
215 } 203 }
216 204
217 205
218 StreamSubscription<_HttpIncoming> listen(void onData(_HttpIncoming event), 206 StreamSubscription<_HttpIncoming> listen(void onData(_HttpIncoming event),
219 {void onError(AsyncError error), 207 {void onError(AsyncError error),
220 void onDone(), 208 void onDone(),
221 bool cancelOnError}) { 209 bool cancelOnError}) {
222 return _controller.stream.listen(onData, 210 return _controller.stream.listen(onData,
223 onError: onError, 211 onError: onError,
(...skipping 647 matching lines...) Expand 10 before | Expand all | Expand 10 after
871 return byte - 0x61 + 10; // a - f 859 return byte - 0x61 + 10; // a - f
872 } else { 860 } else {
873 throw new HttpParserException("Failed to parse HTTP"); 861 throw new HttpParserException("Failed to parse HTTP");
874 } 862 }
875 } 863 }
876 864
877 void _createIncoming(int transferLength) { 865 void _createIncoming(int transferLength) {
878 assert(_incoming == null); 866 assert(_incoming == null);
879 assert(_bodyController == null); 867 assert(_bodyController == null);
880 _bodyController = new StreamController<List<int>>( 868 _bodyController = new StreamController<List<int>>(
881 onSubscriptionStateChange: _bodySubscriptionStateChange, 869 onListen: _bodySubscriptionStateChange,
882 onPauseStateChange: _updateParsePauseState); 870 onPause: _updateParsePauseState,
871 onResume: _updateParsePauseState,
872 onCancel: _bodySubscriptionStateChange);
883 _incoming = new _HttpIncoming( 873 _incoming = new _HttpIncoming(
884 _headers, transferLength, _bodyController.stream); 874 _headers, transferLength, _bodyController.stream);
885 _pauseParsing(); // Needed to handle detaching - don't start on the body! 875 _pauseParsing(); // Needed to handle detaching - don't start on the body!
886 } 876 }
887 877
888 void _closeIncoming() { 878 void _closeIncoming() {
889 // Ignore multiple close (can happend in re-entrance). 879 // Ignore multiple close (can happend in re-entrance).
890 if (_incoming == null) return; 880 if (_incoming == null) return;
891 var tmp = _incoming; 881 var tmp = _incoming;
892 _incoming = null; 882 _incoming = null;
(...skipping 82 matching lines...) Expand 10 before | Expand all | Expand 10 after
975 StreamController<_HttpIncoming> _controller; 965 StreamController<_HttpIncoming> _controller;
976 StreamController<List<int>> _bodyController; 966 StreamController<List<int>> _bodyController;
977 } 967 }
978 968
979 969
980 class HttpParserException implements Exception { 970 class HttpParserException implements Exception {
981 const HttpParserException([String this.message = ""]); 971 const HttpParserException([String this.message = ""]);
982 String toString() => "HttpParserException: $message"; 972 String toString() => "HttpParserException: $message";
983 final String message; 973 final String message;
984 } 974 }
OLDNEW
« no previous file with comments | « sdk/lib/io/http_impl.dart ('k') | sdk/lib/io/secure_server_socket.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698