OLD | NEW |
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 part of dart.io; | 5 part of dart.io; |
6 | 6 |
7 // Global constants. | 7 // Global constants. |
8 class _Const { | 8 class _Const { |
9 // Bytes for "HTTP". | 9 // Bytes for "HTTP". |
10 static const HTTP = const [72, 84, 84, 80]; | 10 static const HTTP = const [72, 84, 84, 80]; |
(...skipping 88 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
99 | 99 |
100 List<int> carryOverData; | 100 List<int> carryOverData; |
101 bool paused; | 101 bool paused; |
102 | 102 |
103 Completer resumeCompleter; | 103 Completer resumeCompleter; |
104 | 104 |
105 _HttpDetachedIncoming(StreamSubscription this.subscription, | 105 _HttpDetachedIncoming(StreamSubscription this.subscription, |
106 List<int> this.carryOverData, | 106 List<int> this.carryOverData, |
107 Completer oldResumeCompleter) { | 107 Completer oldResumeCompleter) { |
108 controller = new StreamController<List<int>>( | 108 controller = new StreamController<List<int>>( |
109 onSubscriptionStateChange: onSubscriptionStateChange, | 109 onListen: resume, |
110 onPauseStateChange: onPauseStateChange); | 110 onPause: pause, |
| 111 onResume: resume, |
| 112 onCancel: () => subscription.cancel()); |
111 if (subscription == null) { | 113 if (subscription == null) { |
112 // Socket was already closed. | 114 // Socket was already closed. |
113 if (carryOverData != null) controller.add(carryOverData); | 115 if (carryOverData != null) controller.add(carryOverData); |
114 controller.close(); | 116 controller.close(); |
115 } else { | 117 } else { |
116 pause(); | 118 pause(); |
117 if (oldResumeCompleter != null) oldResumeCompleter.complete(); | 119 if (oldResumeCompleter != null) oldResumeCompleter.complete(); |
118 subscription.resume(); | 120 subscription.resume(); |
119 subscription.onData(controller.add); | 121 subscription.onData(controller.add); |
120 subscription.onDone(controller.close); | 122 subscription.onDone(controller.close); |
(...skipping 28 matching lines...) Expand all Loading... |
149 } | 151 } |
150 } | 152 } |
151 | 153 |
152 void pause() { | 154 void pause() { |
153 paused = true; | 155 paused = true; |
154 if (resumeCompleter == null) { | 156 if (resumeCompleter == null) { |
155 resumeCompleter = new Completer(); | 157 resumeCompleter = new Completer(); |
156 subscription.pause(resumeCompleter.future); | 158 subscription.pause(resumeCompleter.future); |
157 } | 159 } |
158 } | 160 } |
159 | |
160 void onPauseStateChange() { | |
161 if (controller.isPaused) { | |
162 pause(); | |
163 } else { | |
164 resume(); | |
165 } | |
166 } | |
167 | |
168 void onSubscriptionStateChange() { | |
169 if (controller.hasListener) { | |
170 resume(); | |
171 } else { | |
172 subscription.cancel(); | |
173 } | |
174 } | |
175 } | 161 } |
176 | 162 |
177 | 163 |
178 /** | 164 /** |
179 * HTTP parser which parses the data stream given to [consume]. | 165 * HTTP parser which parses the data stream given to [consume]. |
180 * | 166 * |
181 * If an HTTP parser error occours, the parser will signal an error to either | 167 * If an HTTP parser error occours, the parser will signal an error to either |
182 * the current _HttpIncoming or the _parser itself. | 168 * the current _HttpIncoming or the _parser itself. |
183 * | 169 * |
184 * The connection upgrades (e.g. switching from HTTP/1.1 to the | 170 * The connection upgrades (e.g. switching from HTTP/1.1 to the |
(...skipping 17 matching lines...) Expand all Loading... |
202 factory _HttpParser.requestParser() { | 188 factory _HttpParser.requestParser() { |
203 return new _HttpParser._(true); | 189 return new _HttpParser._(true); |
204 } | 190 } |
205 | 191 |
206 factory _HttpParser.responseParser() { | 192 factory _HttpParser.responseParser() { |
207 return new _HttpParser._(false); | 193 return new _HttpParser._(false); |
208 } | 194 } |
209 | 195 |
210 _HttpParser._(this._requestParser) { | 196 _HttpParser._(this._requestParser) { |
211 _controller = new StreamController<_HttpIncoming>( | 197 _controller = new StreamController<_HttpIncoming>( |
212 onSubscriptionStateChange: _updateParsePauseState, | 198 onListen: _updateParsePauseState, |
213 onPauseStateChange: _updateParsePauseState); | 199 onPause: _updateParsePauseState, |
| 200 onResume: _updateParsePauseState, |
| 201 onCancel: _updateParsePauseState); |
214 _reset(); | 202 _reset(); |
215 } | 203 } |
216 | 204 |
217 | 205 |
218 StreamSubscription<_HttpIncoming> listen(void onData(_HttpIncoming event), | 206 StreamSubscription<_HttpIncoming> listen(void onData(_HttpIncoming event), |
219 {void onError(AsyncError error), | 207 {void onError(AsyncError error), |
220 void onDone(), | 208 void onDone(), |
221 bool cancelOnError}) { | 209 bool cancelOnError}) { |
222 return _controller.stream.listen(onData, | 210 return _controller.stream.listen(onData, |
223 onError: onError, | 211 onError: onError, |
(...skipping 647 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
871 return byte - 0x61 + 10; // a - f | 859 return byte - 0x61 + 10; // a - f |
872 } else { | 860 } else { |
873 throw new HttpParserException("Failed to parse HTTP"); | 861 throw new HttpParserException("Failed to parse HTTP"); |
874 } | 862 } |
875 } | 863 } |
876 | 864 |
877 void _createIncoming(int transferLength) { | 865 void _createIncoming(int transferLength) { |
878 assert(_incoming == null); | 866 assert(_incoming == null); |
879 assert(_bodyController == null); | 867 assert(_bodyController == null); |
880 _bodyController = new StreamController<List<int>>( | 868 _bodyController = new StreamController<List<int>>( |
881 onSubscriptionStateChange: _bodySubscriptionStateChange, | 869 onListen: _bodySubscriptionStateChange, |
882 onPauseStateChange: _updateParsePauseState); | 870 onPause: _updateParsePauseState, |
| 871 onResume: _updateParsePauseState, |
| 872 onCancel: _bodySubscriptionStateChange); |
883 _incoming = new _HttpIncoming( | 873 _incoming = new _HttpIncoming( |
884 _headers, transferLength, _bodyController.stream); | 874 _headers, transferLength, _bodyController.stream); |
885 _pauseParsing(); // Needed to handle detaching - don't start on the body! | 875 _pauseParsing(); // Needed to handle detaching - don't start on the body! |
886 } | 876 } |
887 | 877 |
888 void _closeIncoming() { | 878 void _closeIncoming() { |
889 // Ignore multiple close (can happend in re-entrance). | 879 // Ignore multiple close (can happend in re-entrance). |
890 if (_incoming == null) return; | 880 if (_incoming == null) return; |
891 var tmp = _incoming; | 881 var tmp = _incoming; |
892 _incoming = null; | 882 _incoming = null; |
(...skipping 82 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
975 StreamController<_HttpIncoming> _controller; | 965 StreamController<_HttpIncoming> _controller; |
976 StreamController<List<int>> _bodyController; | 966 StreamController<List<int>> _bodyController; |
977 } | 967 } |
978 | 968 |
979 | 969 |
980 class HttpParserException implements Exception { | 970 class HttpParserException implements Exception { |
981 const HttpParserException([String this.message = ""]); | 971 const HttpParserException([String this.message = ""]); |
982 String toString() => "HttpParserException: $message"; | 972 String toString() => "HttpParserException: $message"; |
983 final String message; | 973 final String message; |
984 } | 974 } |
OLD | NEW |