| OLD | NEW |
| 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 part of dart.io; | 5 part of dart.io; |
| 6 | 6 |
| 7 // Global constants. | 7 // Global constants. |
| 8 class _Const { | 8 class _Const { |
| 9 // Bytes for "HTTP". | 9 // Bytes for "HTTP". |
| 10 static const HTTP = const [72, 84, 84, 80]; | 10 static const HTTP = const [72, 84, 84, 80]; |
| (...skipping 88 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 99 | 99 |
| 100 List<int> carryOverData; | 100 List<int> carryOverData; |
| 101 bool paused; | 101 bool paused; |
| 102 | 102 |
| 103 Completer resumeCompleter; | 103 Completer resumeCompleter; |
| 104 | 104 |
| 105 _HttpDetachedIncoming(StreamSubscription this.subscription, | 105 _HttpDetachedIncoming(StreamSubscription this.subscription, |
| 106 List<int> this.carryOverData, | 106 List<int> this.carryOverData, |
| 107 Completer oldResumeCompleter) { | 107 Completer oldResumeCompleter) { |
| 108 controller = new StreamController<List<int>>( | 108 controller = new StreamController<List<int>>( |
| 109 onSubscriptionStateChange: onSubscriptionStateChange, | 109 onListen: resume, |
| 110 onPauseStateChange: onPauseStateChange); | 110 onPause: pause, |
| 111 onResume: resume, |
| 112 onCancel: () => subscription.cancel()); |
| 111 if (subscription == null) { | 113 if (subscription == null) { |
| 112 // Socket was already closed. | 114 // Socket was already closed. |
| 113 if (carryOverData != null) controller.add(carryOverData); | 115 if (carryOverData != null) controller.add(carryOverData); |
| 114 controller.close(); | 116 controller.close(); |
| 115 } else { | 117 } else { |
| 116 pause(); | 118 pause(); |
| 117 if (oldResumeCompleter != null) oldResumeCompleter.complete(); | 119 if (oldResumeCompleter != null) oldResumeCompleter.complete(); |
| 118 subscription.resume(); | 120 subscription.resume(); |
| 119 subscription.onData(controller.add); | 121 subscription.onData(controller.add); |
| 120 subscription.onDone(controller.close); | 122 subscription.onDone(controller.close); |
| (...skipping 28 matching lines...) Expand all Loading... |
| 149 } | 151 } |
| 150 } | 152 } |
| 151 | 153 |
| 152 void pause() { | 154 void pause() { |
| 153 paused = true; | 155 paused = true; |
| 154 if (resumeCompleter == null) { | 156 if (resumeCompleter == null) { |
| 155 resumeCompleter = new Completer(); | 157 resumeCompleter = new Completer(); |
| 156 subscription.pause(resumeCompleter.future); | 158 subscription.pause(resumeCompleter.future); |
| 157 } | 159 } |
| 158 } | 160 } |
| 159 | |
| 160 void onPauseStateChange() { | |
| 161 if (controller.isPaused) { | |
| 162 pause(); | |
| 163 } else { | |
| 164 resume(); | |
| 165 } | |
| 166 } | |
| 167 | |
| 168 void onSubscriptionStateChange() { | |
| 169 if (controller.hasListener) { | |
| 170 resume(); | |
| 171 } else { | |
| 172 subscription.cancel(); | |
| 173 } | |
| 174 } | |
| 175 } | 161 } |
| 176 | 162 |
| 177 | 163 |
| 178 /** | 164 /** |
| 179 * HTTP parser which parses the data stream given to [consume]. | 165 * HTTP parser which parses the data stream given to [consume]. |
| 180 * | 166 * |
| 181 * If an HTTP parser error occours, the parser will signal an error to either | 167 * If an HTTP parser error occours, the parser will signal an error to either |
| 182 * the current _HttpIncoming or the _parser itself. | 168 * the current _HttpIncoming or the _parser itself. |
| 183 * | 169 * |
| 184 * The connection upgrades (e.g. switching from HTTP/1.1 to the | 170 * The connection upgrades (e.g. switching from HTTP/1.1 to the |
| (...skipping 17 matching lines...) Expand all Loading... |
| 202 factory _HttpParser.requestParser() { | 188 factory _HttpParser.requestParser() { |
| 203 return new _HttpParser._(true); | 189 return new _HttpParser._(true); |
| 204 } | 190 } |
| 205 | 191 |
| 206 factory _HttpParser.responseParser() { | 192 factory _HttpParser.responseParser() { |
| 207 return new _HttpParser._(false); | 193 return new _HttpParser._(false); |
| 208 } | 194 } |
| 209 | 195 |
| 210 _HttpParser._(this._requestParser) { | 196 _HttpParser._(this._requestParser) { |
| 211 _controller = new StreamController<_HttpIncoming>( | 197 _controller = new StreamController<_HttpIncoming>( |
| 212 onSubscriptionStateChange: _updateParsePauseState, | 198 onListen: _updateParsePauseState, |
| 213 onPauseStateChange: _updateParsePauseState); | 199 onPause: _updateParsePauseState, |
| 200 onResume: _updateParsePauseState, |
| 201 onCancel: _updateParsePauseState); |
| 214 _reset(); | 202 _reset(); |
| 215 } | 203 } |
| 216 | 204 |
| 217 | 205 |
| 218 StreamSubscription<_HttpIncoming> listen(void onData(_HttpIncoming event), | 206 StreamSubscription<_HttpIncoming> listen(void onData(_HttpIncoming event), |
| 219 {void onError(AsyncError error), | 207 {void onError(AsyncError error), |
| 220 void onDone(), | 208 void onDone(), |
| 221 bool unsubscribeOnError}) { | 209 bool unsubscribeOnError}) { |
| 222 return _controller.stream.listen(onData, | 210 return _controller.stream.listen(onData, |
| 223 onError: onError, | 211 onError: onError, |
| (...skipping 651 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 875 return byte - 0x61 + 10; // a - f | 863 return byte - 0x61 + 10; // a - f |
| 876 } else { | 864 } else { |
| 877 throw new HttpParserException("Failed to parse HTTP"); | 865 throw new HttpParserException("Failed to parse HTTP"); |
| 878 } | 866 } |
| 879 } | 867 } |
| 880 | 868 |
| 881 void _createIncoming(int transferLength) { | 869 void _createIncoming(int transferLength) { |
| 882 assert(_incoming == null); | 870 assert(_incoming == null); |
| 883 assert(_bodyController == null); | 871 assert(_bodyController == null); |
| 884 _bodyController = new StreamController<List<int>>( | 872 _bodyController = new StreamController<List<int>>( |
| 885 onSubscriptionStateChange: _bodySubscriptionStateChange, | 873 onListen: _bodySubscriptionStateChange, |
| 886 onPauseStateChange: _updateParsePauseState); | 874 onPause: _updateParsePauseState, |
| 875 onResume: _updateParsePauseState, |
| 876 onCancel: _bodySubscriptionStateChange); |
| 887 _incoming = new _HttpIncoming( | 877 _incoming = new _HttpIncoming( |
| 888 _headers, transferLength, _bodyController.stream); | 878 _headers, transferLength, _bodyController.stream); |
| 889 _pauseParsing(); // Needed to handle detaching - don't start on the body! | 879 _pauseParsing(); // Needed to handle detaching - don't start on the body! |
| 890 } | 880 } |
| 891 | 881 |
| 892 void _closeIncoming() { | 882 void _closeIncoming() { |
| 893 // Ignore multiple close (can happend in re-entrance). | 883 // Ignore multiple close (can happend in re-entrance). |
| 894 if (_incoming == null) return; | 884 if (_incoming == null) return; |
| 895 var tmp = _incoming; | 885 var tmp = _incoming; |
| 896 _incoming = null; | 886 _incoming = null; |
| (...skipping 82 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 979 StreamController<_HttpIncoming> _controller; | 969 StreamController<_HttpIncoming> _controller; |
| 980 StreamController<List<int>> _bodyController; | 970 StreamController<List<int>> _bodyController; |
| 981 } | 971 } |
| 982 | 972 |
| 983 | 973 |
| 984 class HttpParserException implements Exception { | 974 class HttpParserException implements Exception { |
| 985 const HttpParserException([String this.message = ""]); | 975 const HttpParserException([String this.message = ""]); |
| 986 String toString() => "HttpParserException: $message"; | 976 String toString() => "HttpParserException: $message"; |
| 987 final String message; | 977 final String message; |
| 988 } | 978 } |
| OLD | NEW |