| OLD | NEW |
| 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 part of dart.io; | 5 part of dart.io; |
| 6 | 6 |
| 7 // Global constants. | 7 // Global constants. |
| 8 class _Const { | 8 class _Const { |
| 9 // Bytes for "HTTP". | 9 // Bytes for "HTTP". |
| 10 static const HTTP = const [72, 84, 84, 80]; | 10 static const HTTP = const [72, 84, 84, 80]; |
| (...skipping 88 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 99 | 99 |
| 100 List<int> carryOverData; | 100 List<int> carryOverData; |
| 101 bool paused; | 101 bool paused; |
| 102 | 102 |
| 103 Completer resumeCompleter; | 103 Completer resumeCompleter; |
| 104 | 104 |
| 105 _HttpDetachedIncoming(StreamSubscription this.subscription, | 105 _HttpDetachedIncoming(StreamSubscription this.subscription, |
| 106 List<int> this.carryOverData, | 106 List<int> this.carryOverData, |
| 107 Completer oldResumeCompleter) { | 107 Completer oldResumeCompleter) { |
| 108 controller = new StreamController<List<int>>( | 108 controller = new StreamController<List<int>>( |
| 109 onSubscriptionStateChange: onSubscriptionStateChange, | 109 onListen: resume, |
| 110 onPauseStateChange: onPauseStateChange); | 110 onPause: pause, |
| 111 onResume: resume, |
| 112 onCancel: () => subscription.cancel()); |
| 111 if (subscription == null) { | 113 if (subscription == null) { |
| 112 // Socket was already closed. | 114 // Socket was already closed. |
| 113 if (carryOverData != null) controller.add(carryOverData); | 115 if (carryOverData != null) controller.add(carryOverData); |
| 114 controller.close(); | 116 controller.close(); |
| 115 } else { | 117 } else { |
| 116 pause(); | 118 pause(); |
| 117 if (oldResumeCompleter != null) oldResumeCompleter.complete(); | 119 if (oldResumeCompleter != null) oldResumeCompleter.complete(); |
| 118 subscription.resume(); | 120 subscription.resume(); |
| 119 subscription.onData(controller.add); | 121 subscription.onData(controller.add); |
| 120 subscription.onDone(controller.close); | 122 subscription.onDone(controller.close); |
| (...skipping 28 matching lines...) Expand all Loading... |
| 149 } | 151 } |
| 150 } | 152 } |
| 151 | 153 |
| 152 void pause() { | 154 void pause() { |
| 153 paused = true; | 155 paused = true; |
| 154 if (resumeCompleter == null) { | 156 if (resumeCompleter == null) { |
| 155 resumeCompleter = new Completer(); | 157 resumeCompleter = new Completer(); |
| 156 subscription.pause(resumeCompleter.future); | 158 subscription.pause(resumeCompleter.future); |
| 157 } | 159 } |
| 158 } | 160 } |
| 159 | |
| 160 void onPauseStateChange() { | |
| 161 if (controller.isPaused) { | |
| 162 pause(); | |
| 163 } else { | |
| 164 resume(); | |
| 165 } | |
| 166 } | |
| 167 | |
| 168 void onSubscriptionStateChange() { | |
| 169 if (controller.hasListener) { | |
| 170 resume(); | |
| 171 } else { | |
| 172 subscription.cancel(); | |
| 173 } | |
| 174 } | |
| 175 } | 161 } |
| 176 | 162 |
| 177 | 163 |
| 178 /** | 164 /** |
| 179 * HTTP parser which parses the data stream given to [consume]. | 165 * HTTP parser which parses the data stream given to [consume]. |
| 180 * | 166 * |
| 181 * If an HTTP parser error occours, the parser will signal an error to either | 167 * If an HTTP parser error occours, the parser will signal an error to either |
| 182 * the current _HttpIncoming or the _parser itself. | 168 * the current _HttpIncoming or the _parser itself. |
| 183 * | 169 * |
| 184 * The connection upgrades (e.g. switching from HTTP/1.1 to the | 170 * The connection upgrades (e.g. switching from HTTP/1.1 to the |
| (...skipping 17 matching lines...) Expand all Loading... |
| 202 factory _HttpParser.requestParser() { | 188 factory _HttpParser.requestParser() { |
| 203 return new _HttpParser._(true); | 189 return new _HttpParser._(true); |
| 204 } | 190 } |
| 205 | 191 |
| 206 factory _HttpParser.responseParser() { | 192 factory _HttpParser.responseParser() { |
| 207 return new _HttpParser._(false); | 193 return new _HttpParser._(false); |
| 208 } | 194 } |
| 209 | 195 |
| 210 _HttpParser._(this._requestParser) { | 196 _HttpParser._(this._requestParser) { |
| 211 _controller = new StreamController<_HttpIncoming>( | 197 _controller = new StreamController<_HttpIncoming>( |
| 212 onSubscriptionStateChange: _updateParsePauseState, | 198 onListen: _updateParsePauseState, |
| 213 onPauseStateChange: _updateParsePauseState); | 199 onPause: _updateParsePauseState, |
| 200 onResume: _updateParsePauseState, |
| 201 onCancel: _updateParsePauseState); |
| 214 _reset(); | 202 _reset(); |
| 215 } | 203 } |
| 216 | 204 |
| 217 | 205 |
| 218 StreamSubscription<_HttpIncoming> listen(void onData(_HttpIncoming event), | 206 StreamSubscription<_HttpIncoming> listen(void onData(_HttpIncoming event), |
| 219 {void onError(AsyncError error), | 207 {void onError(AsyncError error), |
| 220 void onDone(), | 208 void onDone(), |
| 221 bool cancelOnError}) { | 209 bool cancelOnError}) { |
| 222 return _controller.stream.listen(onData, | 210 return _controller.stream.listen(onData, |
| 223 onError: onError, | 211 onError: onError, |
| (...skipping 647 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 871 return byte - 0x61 + 10; // a - f | 859 return byte - 0x61 + 10; // a - f |
| 872 } else { | 860 } else { |
| 873 throw new HttpParserException("Failed to parse HTTP"); | 861 throw new HttpParserException("Failed to parse HTTP"); |
| 874 } | 862 } |
| 875 } | 863 } |
| 876 | 864 |
| 877 void _createIncoming(int transferLength) { | 865 void _createIncoming(int transferLength) { |
| 878 assert(_incoming == null); | 866 assert(_incoming == null); |
| 879 assert(_bodyController == null); | 867 assert(_bodyController == null); |
| 880 _bodyController = new StreamController<List<int>>( | 868 _bodyController = new StreamController<List<int>>( |
| 881 onSubscriptionStateChange: _bodySubscriptionStateChange, | 869 onListen: _bodySubscriptionStateChange, |
| 882 onPauseStateChange: _updateParsePauseState); | 870 onPause: _updateParsePauseState, |
| 871 onResume: _updateParsePauseState, |
| 872 onCancel: _bodySubscriptionStateChange); |
| 883 _incoming = new _HttpIncoming( | 873 _incoming = new _HttpIncoming( |
| 884 _headers, transferLength, _bodyController.stream); | 874 _headers, transferLength, _bodyController.stream); |
| 885 _pauseParsing(); // Needed to handle detaching - don't start on the body! | 875 _pauseParsing(); // Needed to handle detaching - don't start on the body! |
| 886 } | 876 } |
| 887 | 877 |
| 888 void _closeIncoming() { | 878 void _closeIncoming() { |
| 889 // Ignore multiple close (can happend in re-entrance). | 879 // Ignore multiple close (can happend in re-entrance). |
| 890 if (_incoming == null) return; | 880 if (_incoming == null) return; |
| 891 var tmp = _incoming; | 881 var tmp = _incoming; |
| 892 _incoming = null; | 882 _incoming = null; |
| (...skipping 82 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 975 StreamController<_HttpIncoming> _controller; | 965 StreamController<_HttpIncoming> _controller; |
| 976 StreamController<List<int>> _bodyController; | 966 StreamController<List<int>> _bodyController; |
| 977 } | 967 } |
| 978 | 968 |
| 979 | 969 |
| 980 class HttpParserException implements Exception { | 970 class HttpParserException implements Exception { |
| 981 const HttpParserException([String this.message = ""]); | 971 const HttpParserException([String this.message = ""]); |
| 982 String toString() => "HttpParserException: $message"; | 972 String toString() => "HttpParserException: $message"; |
| 983 final String message; | 973 final String message; |
| 984 } | 974 } |
| OLD | NEW |