OLD | NEW |
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 part of dart.io; | 5 part of dart.io; |
6 | 6 |
7 // Global constants. | 7 // Global constants. |
8 class _Const { | 8 class _Const { |
9 // Bytes for "HTTP". | 9 // Bytes for "HTTP". |
10 static const HTTP = const [72, 84, 84, 80]; | 10 static const HTTP = const [72, 84, 84, 80]; |
(...skipping 89 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
100 final StreamSubscription subscription; | 100 final StreamSubscription subscription; |
101 | 101 |
102 List<int> carryOverData; | 102 List<int> carryOverData; |
103 bool paused; | 103 bool paused; |
104 | 104 |
105 Completer resumeCompleter; | 105 Completer resumeCompleter; |
106 | 106 |
107 _HttpDetachedIncoming(StreamSubscription this.subscription, | 107 _HttpDetachedIncoming(StreamSubscription this.subscription, |
108 List<int> this.carryOverData) { | 108 List<int> this.carryOverData) { |
109 controller = new StreamController<List<int>>( | 109 controller = new StreamController<List<int>>( |
| 110 sync: true, |
110 onListen: resume, | 111 onListen: resume, |
111 onPause: pause, | 112 onPause: pause, |
112 onResume: resume, | 113 onResume: resume, |
113 onCancel: () => subscription.cancel()); | 114 onCancel: () => subscription.cancel()); |
114 if (subscription == null) { | 115 if (subscription == null) { |
115 // Socket was already closed. | 116 // Socket was already closed. |
116 if (carryOverData != null) controller.add(carryOverData); | 117 if (carryOverData != null) controller.add(carryOverData); |
117 controller.close(); | 118 controller.close(); |
118 } else { | 119 } else { |
119 pause(); | 120 pause(); |
(...skipping 68 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
188 factory _HttpParser.requestParser() { | 189 factory _HttpParser.requestParser() { |
189 return new _HttpParser._(true); | 190 return new _HttpParser._(true); |
190 } | 191 } |
191 | 192 |
192 factory _HttpParser.responseParser() { | 193 factory _HttpParser.responseParser() { |
193 return new _HttpParser._(false); | 194 return new _HttpParser._(false); |
194 } | 195 } |
195 | 196 |
196 _HttpParser._(this._requestParser) { | 197 _HttpParser._(this._requestParser) { |
197 _controller = new StreamController<_HttpIncoming>( | 198 _controller = new StreamController<_HttpIncoming>( |
198 onListen: () { | 199 sync: true, |
199 _socketSubscription.resume(); | 200 onListen: () { |
200 _paused = false; | 201 _socketSubscription.resume(); |
201 }, | 202 _paused = false; |
202 onPause: () { | 203 }, |
203 _paused = true; | 204 onPause: () { |
204 _pauseStateChanged(); | 205 _paused = true; |
205 }, | 206 _pauseStateChanged(); |
206 onResume: () { | 207 }, |
207 _paused = false; | 208 onResume: () { |
208 _pauseStateChanged(); | 209 _paused = false; |
209 }, | 210 _pauseStateChanged(); |
210 onCancel: () { | 211 }, |
211 try { | 212 onCancel: () { |
212 _socketSubscription.cancel(); | 213 try { |
213 } catch (e) { | 214 _socketSubscription.cancel(); |
214 } | 215 } catch (e) { |
215 }); | 216 } |
| 217 }); |
216 _reset(); | 218 _reset(); |
217 } | 219 } |
218 | 220 |
219 | 221 |
220 StreamSubscription<_HttpIncoming> listen(void onData(_HttpIncoming event), | 222 StreamSubscription<_HttpIncoming> listen(void onData(_HttpIncoming event), |
221 {void onError(error), | 223 {void onError(error), |
222 void onDone(), | 224 void onDone(), |
223 bool cancelOnError}) { | 225 bool cancelOnError}) { |
224 return _controller.stream.listen(onData, | 226 return _controller.stream.listen(onData, |
225 onError: onError, | 227 onError: onError, |
(...skipping 650 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
876 throw new HttpParserException("Failed to parse HTTP"); | 878 throw new HttpParserException("Failed to parse HTTP"); |
877 } | 879 } |
878 } | 880 } |
879 | 881 |
880 void _createIncoming(int transferLength) { | 882 void _createIncoming(int transferLength) { |
881 assert(_incoming == null); | 883 assert(_incoming == null); |
882 assert(_bodyController == null); | 884 assert(_bodyController == null); |
883 assert(!_bodyPaused); | 885 assert(!_bodyPaused); |
884 var incoming; | 886 var incoming; |
885 _bodyController = new StreamController<List<int>>( | 887 _bodyController = new StreamController<List<int>>( |
| 888 sync: true, |
886 onListen: () { | 889 onListen: () { |
887 if (incoming != _incoming) return; | 890 if (incoming != _incoming) return; |
888 assert(_bodyPaused); | 891 assert(_bodyPaused); |
889 _bodyPaused = false; | 892 _bodyPaused = false; |
890 _pauseStateChanged(); | 893 _pauseStateChanged(); |
891 }, | 894 }, |
892 onPause: () { | 895 onPause: () { |
893 if (incoming != _incoming) return; | 896 if (incoming != _incoming) return; |
894 assert(!_bodyPaused); | 897 assert(!_bodyPaused); |
895 _bodyPaused = true; | 898 _bodyPaused = true; |
(...skipping 89 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
985 StreamController<_HttpIncoming> _controller; | 988 StreamController<_HttpIncoming> _controller; |
986 StreamController<List<int>> _bodyController; | 989 StreamController<List<int>> _bodyController; |
987 } | 990 } |
988 | 991 |
989 | 992 |
990 class HttpParserException implements Exception { | 993 class HttpParserException implements Exception { |
991 const HttpParserException([String this.message = ""]); | 994 const HttpParserException([String this.message = ""]); |
992 String toString() => "HttpParserException: $message"; | 995 String toString() => "HttpParserException: $message"; |
993 final String message; | 996 final String message; |
994 } | 997 } |
OLD | NEW |