OLD | NEW |
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 part of dart.io; | 5 part of dart.io; |
6 | 6 |
7 // Global constants. | 7 // Global constants. |
8 class _Const { | 8 class _Const { |
9 // Bytes for "HTTP". | 9 // Bytes for "HTTP". |
10 static const HTTP = const [72, 84, 84, 80]; | 10 static const HTTP = const [72, 84, 84, 80]; |
(...skipping 78 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
89 static const int HTTP11 = 2; | 89 static const int HTTP11 = 2; |
90 } | 90 } |
91 | 91 |
92 // States of the HTTP parser state machine. | 92 // States of the HTTP parser state machine. |
93 class _MessageType { | 93 class _MessageType { |
94 static const int UNDETERMINED = 0; | 94 static const int UNDETERMINED = 0; |
95 static const int REQUEST = 1; | 95 static const int REQUEST = 1; |
96 static const int RESPONSE = 0; | 96 static const int RESPONSE = 0; |
97 } | 97 } |
98 | 98 |
99 class _HttpDetachedIncoming extends Stream<List<int>> { | |
100 StreamController<List<int>> controller; | |
101 final StreamSubscription subscription; | |
102 | 99 |
103 List<int> bufferedData; | 100 /** |
104 bool paused; | 101 * The _HttpDetachedStreamSubscription takes a subscription and some extra data, |
| 102 * and makes it possible to "inject" the data in from of other data events |
| 103 * from the subscription. |
| 104 * |
| 105 * It does so by overriding pause/resume, so that once the |
| 106 * _HttpDetachedStreamSubscription is resumed, it'll deliver the data before |
| 107 * resuming the underlaying subscription. |
| 108 */ |
| 109 class _HttpDetachedStreamSubscription implements StreamSubscription<List<int>> { |
| 110 StreamSubscription<List<int>> _subscription; |
| 111 List<int> _injectData; |
| 112 bool _isCanceled = false; |
| 113 int _pauseCount = 1; |
| 114 Function _userOnData; |
| 115 bool _scheduled = false; |
105 | 116 |
106 Completer resumeCompleter; | 117 _HttpDetachedStreamSubscription(this._subscription, |
| 118 this._injectData, |
| 119 this._userOnData); |
107 | 120 |
108 _HttpDetachedIncoming(this.subscription, this.bufferedData) { | 121 bool get isPaused => _subscription.isPaused; |
109 controller = new StreamController<List<int>>( | 122 |
110 sync: true, | 123 Future asFuture([futureValue]) => _subscription.asFuture(futureValue); |
111 onListen: resume, | 124 |
112 onPause: pause, | 125 Future cancel() { |
113 onResume: resume, | 126 _isCanceled = true; |
114 onCancel: () => subscription.cancel()); | 127 _injectData = null; |
115 if (subscription == null) { | 128 return _subscription.cancel(); |
116 // Socket was already closed. | 129 } |
117 if (bufferedData != null) controller.add(bufferedData); | 130 |
118 controller.close(); | 131 void onData(void handleData(List<int> data)) { |
| 132 _userOnData = handleData; |
| 133 _subscription.onData(handleData); |
| 134 } |
| 135 |
| 136 void onDone(void handleDone()) { |
| 137 _subscription.onDone(handleDone); |
| 138 } |
| 139 |
| 140 void onError(Function handleError) { |
| 141 _subscription.onError(handleError); |
| 142 } |
| 143 |
| 144 void pause([Future resumeSignal]) { |
| 145 if (_injectData == null) { |
| 146 _subscription.pause(resumeSignal); |
119 } else { | 147 } else { |
120 pause(); | 148 _pauseCount++; |
121 subscription | 149 if (resumeSignal != null) { |
122 ..resume() | 150 resumeSignal.whenComplete(resume); |
123 ..onData(controller.add) | 151 } |
124 ..onDone(controller.close) | |
125 ..onError(controller.addError); | |
126 } | 152 } |
127 } | 153 } |
128 | 154 |
| 155 void resume() { |
| 156 if (_injectData == null) { |
| 157 _subscription.resume(); |
| 158 } else { |
| 159 _pauseCount--; |
| 160 _maybeScheduleData(); |
| 161 } |
| 162 } |
| 163 |
| 164 void _maybeScheduleData() { |
| 165 if (_scheduled) return; |
| 166 if (_pauseCount != 0) return; |
| 167 _scheduled = true; |
| 168 scheduleMicrotask(() { |
| 169 _scheduled = false; |
| 170 if (_pauseCount > 0 || _isCanceled) return; |
| 171 var data = _injectData; |
| 172 _injectData = null; |
| 173 // To ensure that 'subscription.isPaused' is false, we resume the |
| 174 // subscription here. This is fine as potential events are delayed. |
| 175 _subscription.resume(); |
| 176 if (_userOnData != null) { |
| 177 _userOnData(data); |
| 178 } |
| 179 }); |
| 180 } |
| 181 } |
| 182 |
| 183 |
| 184 class _HttpDetachedIncoming extends Stream<List<int>> { |
| 185 final StreamSubscription subscription; |
| 186 final List<int> bufferedData; |
| 187 |
| 188 _HttpDetachedIncoming(this.subscription, this.bufferedData); |
| 189 |
129 StreamSubscription<List<int>> listen(void onData(List<int> event), | 190 StreamSubscription<List<int>> listen(void onData(List<int> event), |
130 {Function onError, | 191 {Function onError, |
131 void onDone(), | 192 void onDone(), |
132 bool cancelOnError}) { | 193 bool cancelOnError}) { |
133 return controller.stream.listen( | 194 if (subscription != null) { |
134 onData, | 195 subscription |
135 onError: onError, | 196 ..onData(onData) |
136 onDone: onDone, | 197 ..onError(onError) |
137 cancelOnError: cancelOnError); | 198 ..onDone(onDone); |
138 } | 199 if (bufferedData == null) { |
139 | 200 return subscription..resume(); |
140 void resume() { | 201 } |
141 paused = false; | 202 return new _HttpDetachedStreamSubscription(subscription, |
142 if (bufferedData != null) { | 203 bufferedData, |
143 var data = bufferedData; | 204 onData) |
144 bufferedData = null; | 205 ..resume(); |
145 controller.add(data); | 206 } else { |
146 // If the consumer pauses again after the carry-over data, we'll not | 207 return new Stream.fromIterable(bufferedData) |
147 // continue our subscriber until the next resume. | 208 .listen(onData, |
148 if (paused) return; | 209 onError: onError, |
149 } | 210 onDone: onDone, |
150 if (resumeCompleter != null) { | 211 cancelOnError: cancelOnError); |
151 resumeCompleter.complete(); | |
152 resumeCompleter = null; | |
153 } | |
154 } | |
155 | |
156 void pause() { | |
157 paused = true; | |
158 if (resumeCompleter == null) { | |
159 resumeCompleter = new Completer(); | |
160 subscription.pause(resumeCompleter.future); | |
161 } | 212 } |
162 } | 213 } |
163 } | 214 } |
164 | 215 |
165 | 216 |
166 /** | 217 /** |
167 * HTTP parser which parses the data stream given to [consume]. | 218 * HTTP parser which parses the data stream given to [consume]. |
168 * | 219 * |
169 * If an HTTP parser error occours, the parser will signal an error to either | 220 * If an HTTP parser error occours, the parser will signal an error to either |
170 * the current _HttpIncoming or the _parser itself. | 221 * the current _HttpIncoming or the _parser itself. |
(...skipping 814 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
985 } | 1036 } |
986 } | 1037 } |
987 | 1038 |
988 void _reportError(error, [stackTrace]) { | 1039 void _reportError(error, [stackTrace]) { |
989 if (_socketSubscription != null) _socketSubscription.cancel(); | 1040 if (_socketSubscription != null) _socketSubscription.cancel(); |
990 _state = _State.FAILURE; | 1041 _state = _State.FAILURE; |
991 _controller.addError(error, stackTrace); | 1042 _controller.addError(error, stackTrace); |
992 _controller.close(); | 1043 _controller.close(); |
993 } | 1044 } |
994 } | 1045 } |
OLD | NEW |