Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 part of dart.io; | 5 part of dart.io; |
| 6 | 6 |
| 7 // Global constants. | 7 // Global constants. |
| 8 class _Const { | 8 class _Const { |
| 9 // Bytes for "HTTP". | 9 // Bytes for "HTTP". |
| 10 static const HTTP = const [72, 84, 84, 80]; | 10 static const HTTP = const [72, 84, 84, 80]; |
| (...skipping 78 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 89 static const int HTTP11 = 2; | 89 static const int HTTP11 = 2; |
| 90 } | 90 } |
| 91 | 91 |
| 92 // States of the HTTP parser state machine. | 92 // States of the HTTP parser state machine. |
| 93 class _MessageType { | 93 class _MessageType { |
| 94 static const int UNDETERMINED = 0; | 94 static const int UNDETERMINED = 0; |
| 95 static const int REQUEST = 1; | 95 static const int REQUEST = 1; |
| 96 static const int RESPONSE = 0; | 96 static const int RESPONSE = 0; |
| 97 } | 97 } |
| 98 | 98 |
| 99 class _HttpDetachedIncoming extends Stream<List<int>> { | |
| 100 StreamController<List<int>> controller; | |
| 101 final StreamSubscription subscription; | |
| 102 | 99 |
|
Søren Gjesse
2014/02/04 10:17:24
Please add a short description of this subscriptio
Anders Johnsen
2014/02/04 10:24:51
Done.
| |
| 103 List<int> bufferedData; | 100 class _HttpDetachedStreamSubscription implements StreamSubscription<List<int>> { |
| 104 bool paused; | 101 StreamSubscription<List<int>> _subscription; |
|
Søren Gjesse
2014/02/04 10:17:24
Can we find a better name that _data? It is pretty
Anders Johnsen
2014/02/04 10:24:51
Done.
| |
| 102 List<int> _data; | |
| 103 bool _isCanceled = false; | |
| 104 int _pauseCount = 1; | |
| 105 Function _userOnData; | |
| 106 bool _scheduled = false; | |
| 105 | 107 |
| 106 Completer resumeCompleter; | 108 _HttpDetachedStreamSubscription(this._subscription, |
| 109 this._data, | |
| 110 this._userOnData); | |
| 107 | 111 |
| 108 _HttpDetachedIncoming(this.subscription, this.bufferedData) { | 112 bool get isPaused => _subscription.isPaused; |
| 109 controller = new StreamController<List<int>>( | 113 |
| 110 sync: true, | 114 Future asFuture([futureValue]) => _subscription.asFuture(futureValue); |
| 111 onListen: resume, | 115 |
| 112 onPause: pause, | 116 Future cancel() { |
| 113 onResume: resume, | 117 _isCanceled = true; |
| 114 onCancel: () => subscription.cancel()); | 118 _data = null; |
| 115 if (subscription == null) { | 119 return _subscription.cancel(); |
| 116 // Socket was already closed. | 120 } |
| 117 if (bufferedData != null) controller.add(bufferedData); | 121 |
| 118 controller.close(); | 122 void onData(void handleData(List<int> data)) { |
| 123 _userOnData = handleData; | |
| 124 _subscription.onData(handleData); | |
| 125 } | |
| 126 | |
| 127 void onDone(void handleDone()) { | |
| 128 _subscription.onDone(handleDone); | |
| 129 } | |
| 130 | |
| 131 void onError(Function handleError) { | |
| 132 _subscription.onError(handleError); | |
| 133 } | |
| 134 | |
| 135 void pause([Future resumeSignal]) { | |
| 136 if (_data == null) { | |
| 137 _subscription.pause(resumeSignal); | |
| 119 } else { | 138 } else { |
| 120 pause(); | 139 _pauseCount++; |
| 121 subscription | 140 if (resumeSignal != null) { |
| 122 ..resume() | 141 resumeSignal.whenComplete(resume); |
| 123 ..onData(controller.add) | 142 } |
| 124 ..onDone(controller.close) | |
| 125 ..onError(controller.addError); | |
| 126 } | 143 } |
| 127 } | 144 } |
| 128 | 145 |
| 146 void resume() { | |
| 147 if (_data == null) { | |
| 148 _subscription.resume(); | |
| 149 } else { | |
| 150 _pauseCount--; | |
| 151 _scheduleData(); | |
| 152 } | |
| 153 } | |
| 154 | |
| 155 void _scheduleData() { | |
|
Søren Gjesse
2014/02/04 10:17:24
This method does not necessarily schedule the data
Anders Johnsen
2014/02/04 10:24:51
Done.
| |
| 156 if (_scheduled) return; | |
| 157 if (_pauseCount != 0) return; | |
| 158 _scheduled = true; | |
| 159 scheduleMicrotask(() { | |
| 160 _scheduled = false; | |
| 161 if (_pauseCount > 0 || _isCanceled) return; | |
| 162 var data = _data; | |
| 163 _data = null; | |
|
Søren Gjesse
2014/02/04 10:17:24
Please add a comment here on why the underlying su
Anders Johnsen
2014/02/04 10:24:51
Done.
| |
| 164 _subscription.resume(); | |
| 165 if (_userOnData != null) { | |
| 166 _userOnData(data); | |
| 167 } | |
| 168 }); | |
| 169 } | |
| 170 } | |
| 171 | |
| 172 | |
| 173 class _HttpDetachedIncoming extends Stream<List<int>> { | |
| 174 final StreamSubscription subscription; | |
| 175 final List<int> bufferedData; | |
| 176 | |
| 177 _HttpDetachedIncoming(this.subscription, this.bufferedData); | |
| 178 | |
| 129 StreamSubscription<List<int>> listen(void onData(List<int> event), | 179 StreamSubscription<List<int>> listen(void onData(List<int> event), |
| 130 {Function onError, | 180 {Function onError, |
| 131 void onDone(), | 181 void onDone(), |
| 132 bool cancelOnError}) { | 182 bool cancelOnError}) { |
| 133 return controller.stream.listen( | 183 if (subscription != null) { |
| 134 onData, | 184 subscription |
| 135 onError: onError, | 185 ..onData(onData) |
| 136 onDone: onDone, | 186 ..onError(onError) |
| 137 cancelOnError: cancelOnError); | 187 ..onDone(onDone); |
| 138 } | 188 if (bufferedData == null) { |
| 139 | 189 return subscription..resume(); |
| 140 void resume() { | 190 } |
| 141 paused = false; | 191 return new _HttpDetachedStreamSubscription(subscription, |
| 142 if (bufferedData != null) { | 192 bufferedData, |
| 143 var data = bufferedData; | 193 onData) |
| 144 bufferedData = null; | 194 ..resume(); |
| 145 controller.add(data); | 195 } else { |
| 146 // If the consumer pauses again after the carry-over data, we'll not | 196 return new Stream.fromIterable(bufferedData) |
| 147 // continue our subscriber until the next resume. | 197 .listen(onData, |
| 148 if (paused) return; | 198 onError: onError, |
| 149 } | 199 onDone: onDone, |
| 150 if (resumeCompleter != null) { | 200 cancelOnError: cancelOnError); |
| 151 resumeCompleter.complete(); | |
| 152 resumeCompleter = null; | |
| 153 } | |
| 154 } | |
| 155 | |
| 156 void pause() { | |
| 157 paused = true; | |
| 158 if (resumeCompleter == null) { | |
| 159 resumeCompleter = new Completer(); | |
| 160 subscription.pause(resumeCompleter.future); | |
| 161 } | 201 } |
| 162 } | 202 } |
| 163 } | 203 } |
| 164 | 204 |
| 165 | 205 |
| 166 /** | 206 /** |
| 167 * HTTP parser which parses the data stream given to [consume]. | 207 * HTTP parser which parses the data stream given to [consume]. |
| 168 * | 208 * |
| 169 * If an HTTP parser error occours, the parser will signal an error to either | 209 * If an HTTP parser error occours, the parser will signal an error to either |
| 170 * the current _HttpIncoming or the _parser itself. | 210 * the current _HttpIncoming or the _parser itself. |
| (...skipping 814 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 985 } | 1025 } |
| 986 } | 1026 } |
| 987 | 1027 |
| 988 void _reportError(error, [stackTrace]) { | 1028 void _reportError(error, [stackTrace]) { |
| 989 if (_socketSubscription != null) _socketSubscription.cancel(); | 1029 if (_socketSubscription != null) _socketSubscription.cancel(); |
| 990 _state = _State.FAILURE; | 1030 _state = _State.FAILURE; |
| 991 _controller.addError(error, stackTrace); | 1031 _controller.addError(error, stackTrace); |
| 992 _controller.close(); | 1032 _controller.close(); |
| 993 } | 1033 } |
| 994 } | 1034 } |
| OLD | NEW |