OLD | NEW |
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 part of dart.io; | 5 part of dart.io; |
6 | 6 |
7 // Global constants. | 7 // Global constants. |
8 class _Const { | 8 class _Const { |
9 // Bytes for "HTTP". | 9 // Bytes for "HTTP". |
10 static const HTTP = const [72, 84, 84, 80]; | 10 static const HTTP = const [72, 84, 84, 80]; |
(...skipping 88 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
99 | 99 |
100 List<int> carryOverData; | 100 List<int> carryOverData; |
101 bool paused; | 101 bool paused; |
102 | 102 |
103 Completer resumeCompleter; | 103 Completer resumeCompleter; |
104 | 104 |
105 _HttpDetachedIncoming(StreamSubscription this.subscription, | 105 _HttpDetachedIncoming(StreamSubscription this.subscription, |
106 List<int> this.carryOverData, | 106 List<int> this.carryOverData, |
107 Completer oldResumeCompleter) { | 107 Completer oldResumeCompleter) { |
108 controller = new StreamController<List<int>>( | 108 controller = new StreamController<List<int>>( |
109 onSubscriptionStateChange: onSubscriptionStateChange, | 109 onListen: resume, |
110 onPauseStateChange: onPauseStateChange); | 110 onPause: pause, |
| 111 onResume: resume, |
| 112 onCancel: () => subscription.cancel()); |
111 if (subscription == null) { | 113 if (subscription == null) { |
112 // Socket was already closed. | 114 // Socket was already closed. |
113 if (carryOverData != null) controller.add(carryOverData); | 115 if (carryOverData != null) controller.add(carryOverData); |
114 controller.close(); | 116 controller.close(); |
115 } else { | 117 } else { |
116 pause(); | 118 pause(); |
117 if (oldResumeCompleter != null) oldResumeCompleter.complete(); | 119 if (oldResumeCompleter != null) oldResumeCompleter.complete(); |
118 subscription.resume(); | 120 subscription.resume(); |
119 subscription.onData(controller.add); | 121 subscription.onData(controller.add); |
120 subscription.onDone(controller.close); | 122 subscription.onDone(controller.close); |
(...skipping 28 matching lines...) Expand all Loading... |
149 } | 151 } |
150 } | 152 } |
151 | 153 |
152 void pause() { | 154 void pause() { |
153 paused = true; | 155 paused = true; |
154 if (resumeCompleter == null) { | 156 if (resumeCompleter == null) { |
155 resumeCompleter = new Completer(); | 157 resumeCompleter = new Completer(); |
156 subscription.pause(resumeCompleter.future); | 158 subscription.pause(resumeCompleter.future); |
157 } | 159 } |
158 } | 160 } |
159 | |
160 void onPauseStateChange() { | |
161 if (controller.isPaused) { | |
162 pause(); | |
163 } else { | |
164 resume(); | |
165 } | |
166 } | |
167 | |
168 void onSubscriptionStateChange() { | |
169 if (controller.hasListener) { | |
170 resume(); | |
171 } else { | |
172 subscription.cancel(); | |
173 } | |
174 } | |
175 } | 161 } |
176 | 162 |
177 | 163 |
178 /** | 164 /** |
179 * HTTP parser which parses the data stream given to [consume]. | 165 * HTTP parser which parses the data stream given to [consume]. |
180 * | 166 * |
181 * If an HTTP parser error occours, the parser will signal an error to either | 167 * If an HTTP parser error occours, the parser will signal an error to either |
182 * the current _HttpIncoming or the _parser itself. | 168 * the current _HttpIncoming or the _parser itself. |
183 * | 169 * |
184 * The connection upgrades (e.g. switching from HTTP/1.1 to the | 170 * The connection upgrades (e.g. switching from HTTP/1.1 to the |
(...skipping 17 matching lines...) Expand all Loading... |
202 factory _HttpParser.requestParser() { | 188 factory _HttpParser.requestParser() { |
203 return new _HttpParser._(true); | 189 return new _HttpParser._(true); |
204 } | 190 } |
205 | 191 |
206 factory _HttpParser.responseParser() { | 192 factory _HttpParser.responseParser() { |
207 return new _HttpParser._(false); | 193 return new _HttpParser._(false); |
208 } | 194 } |
209 | 195 |
210 _HttpParser._(this._requestParser) { | 196 _HttpParser._(this._requestParser) { |
211 _controller = new StreamController<_HttpIncoming>( | 197 _controller = new StreamController<_HttpIncoming>( |
212 onSubscriptionStateChange: _updateParsePauseState, | 198 onListen: _updateParsePauseState, |
213 onPauseStateChange: _updateParsePauseState); | 199 onPause: _updateParsePauseState, |
| 200 onResume: _updateParsePauseState, |
| 201 onCancel: _updateParsePauseState); |
214 _reset(); | 202 _reset(); |
215 } | 203 } |
216 | 204 |
217 | 205 |
218 StreamSubscription<_HttpIncoming> listen(void onData(_HttpIncoming event), | 206 StreamSubscription<_HttpIncoming> listen(void onData(_HttpIncoming event), |
219 {void onError(AsyncError error), | 207 {void onError(AsyncError error), |
220 void onDone(), | 208 void onDone(), |
221 bool unsubscribeOnError}) { | 209 bool unsubscribeOnError}) { |
222 return _controller.stream.listen(onData, | 210 return _controller.stream.listen(onData, |
223 onError: onError, | 211 onError: onError, |
(...skipping 651 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
875 return byte - 0x61 + 10; // a - f | 863 return byte - 0x61 + 10; // a - f |
876 } else { | 864 } else { |
877 throw new HttpParserException("Failed to parse HTTP"); | 865 throw new HttpParserException("Failed to parse HTTP"); |
878 } | 866 } |
879 } | 867 } |
880 | 868 |
881 void _createIncoming(int transferLength) { | 869 void _createIncoming(int transferLength) { |
882 assert(_incoming == null); | 870 assert(_incoming == null); |
883 assert(_bodyController == null); | 871 assert(_bodyController == null); |
884 _bodyController = new StreamController<List<int>>( | 872 _bodyController = new StreamController<List<int>>( |
885 onSubscriptionStateChange: _bodySubscriptionStateChange, | 873 onListen: _bodySubscriptionStateChange, |
886 onPauseStateChange: _updateParsePauseState); | 874 onPause: _updateParsePauseState, |
| 875 onResume: _updateParsePauseState, |
| 876 onCancel: _bodySubscriptionStateChange); |
887 _incoming = new _HttpIncoming( | 877 _incoming = new _HttpIncoming( |
888 _headers, transferLength, _bodyController.stream); | 878 _headers, transferLength, _bodyController.stream); |
889 _pauseParsing(); // Needed to handle detaching - don't start on the body! | 879 _pauseParsing(); // Needed to handle detaching - don't start on the body! |
890 } | 880 } |
891 | 881 |
892 void _closeIncoming() { | 882 void _closeIncoming() { |
893 // Ignore multiple close (can happend in re-entrance). | 883 // Ignore multiple close (can happend in re-entrance). |
894 if (_incoming == null) return; | 884 if (_incoming == null) return; |
895 var tmp = _incoming; | 885 var tmp = _incoming; |
896 _incoming = null; | 886 _incoming = null; |
(...skipping 82 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
979 StreamController<_HttpIncoming> _controller; | 969 StreamController<_HttpIncoming> _controller; |
980 StreamController<List<int>> _bodyController; | 970 StreamController<List<int>> _bodyController; |
981 } | 971 } |
982 | 972 |
983 | 973 |
984 class HttpParserException implements Exception { | 974 class HttpParserException implements Exception { |
985 const HttpParserException([String this.message = ""]); | 975 const HttpParserException([String this.message = ""]); |
986 String toString() => "HttpParserException: $message"; | 976 String toString() => "HttpParserException: $message"; |
987 final String message; | 977 final String message; |
988 } | 978 } |
OLD | NEW |