OLD | NEW |
---|---|
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 part of dart.io; | 5 part of dart.io; |
6 | 6 |
7 // Global constants. | 7 // Global constants. |
8 class _Const { | 8 class _Const { |
9 // Bytes for "HTTP". | 9 // Bytes for "HTTP". |
10 static const HTTP = const [72, 84, 84, 80]; | 10 static const HTTP = const [72, 84, 84, 80]; |
(...skipping 78 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
89 static const int HTTP11 = 2; | 89 static const int HTTP11 = 2; |
90 } | 90 } |
91 | 91 |
92 // States of the HTTP parser state machine. | 92 // States of the HTTP parser state machine. |
93 class _MessageType { | 93 class _MessageType { |
94 static const int UNDETERMINED = 0; | 94 static const int UNDETERMINED = 0; |
95 static const int REQUEST = 1; | 95 static const int REQUEST = 1; |
96 static const int RESPONSE = 0; | 96 static const int RESPONSE = 0; |
97 } | 97 } |
98 | 98 |
99 class _HttpDetachedIncoming extends Stream<List<int>> { | |
100 StreamController<List<int>> controller; | |
101 final StreamSubscription subscription; | |
102 | 99 |
Søren Gjesse
2014/02/04 10:17:24
Please add a short description of this subscriptio
Anders Johnsen
2014/02/04 10:24:51
Done.
| |
103 List<int> bufferedData; | 100 class _HttpDetachedStreamSubscription implements StreamSubscription<List<int>> { |
104 bool paused; | 101 StreamSubscription<List<int>> _subscription; |
Søren Gjesse
2014/02/04 10:17:24
Can we find a better name that _data? It is pretty
Anders Johnsen
2014/02/04 10:24:51
Done.
| |
102 List<int> _data; | |
103 bool _isCanceled = false; | |
104 int _pauseCount = 1; | |
105 Function _userOnData; | |
106 bool _scheduled = false; | |
105 | 107 |
106 Completer resumeCompleter; | 108 _HttpDetachedStreamSubscription(this._subscription, |
109 this._data, | |
110 this._userOnData); | |
107 | 111 |
108 _HttpDetachedIncoming(this.subscription, this.bufferedData) { | 112 bool get isPaused => _subscription.isPaused; |
109 controller = new StreamController<List<int>>( | 113 |
110 sync: true, | 114 Future asFuture([futureValue]) => _subscription.asFuture(futureValue); |
111 onListen: resume, | 115 |
112 onPause: pause, | 116 Future cancel() { |
113 onResume: resume, | 117 _isCanceled = true; |
114 onCancel: () => subscription.cancel()); | 118 _data = null; |
115 if (subscription == null) { | 119 return _subscription.cancel(); |
116 // Socket was already closed. | 120 } |
117 if (bufferedData != null) controller.add(bufferedData); | 121 |
118 controller.close(); | 122 void onData(void handleData(List<int> data)) { |
123 _userOnData = handleData; | |
124 _subscription.onData(handleData); | |
125 } | |
126 | |
127 void onDone(void handleDone()) { | |
128 _subscription.onDone(handleDone); | |
129 } | |
130 | |
131 void onError(Function handleError) { | |
132 _subscription.onError(handleError); | |
133 } | |
134 | |
135 void pause([Future resumeSignal]) { | |
136 if (_data == null) { | |
137 _subscription.pause(resumeSignal); | |
119 } else { | 138 } else { |
120 pause(); | 139 _pauseCount++; |
121 subscription | 140 if (resumeSignal != null) { |
122 ..resume() | 141 resumeSignal.whenComplete(resume); |
123 ..onData(controller.add) | 142 } |
124 ..onDone(controller.close) | |
125 ..onError(controller.addError); | |
126 } | 143 } |
127 } | 144 } |
128 | 145 |
146 void resume() { | |
147 if (_data == null) { | |
148 _subscription.resume(); | |
149 } else { | |
150 _pauseCount--; | |
151 _scheduleData(); | |
152 } | |
153 } | |
154 | |
155 void _scheduleData() { | |
Søren Gjesse
2014/02/04 10:17:24
This method does not necessarily schedule the data
Anders Johnsen
2014/02/04 10:24:51
Done.
| |
156 if (_scheduled) return; | |
157 if (_pauseCount != 0) return; | |
158 _scheduled = true; | |
159 scheduleMicrotask(() { | |
160 _scheduled = false; | |
161 if (_pauseCount > 0 || _isCanceled) return; | |
162 var data = _data; | |
163 _data = null; | |
Søren Gjesse
2014/02/04 10:17:24
Please add a comment here on why the underlying su
Anders Johnsen
2014/02/04 10:24:51
Done.
| |
164 _subscription.resume(); | |
165 if (_userOnData != null) { | |
166 _userOnData(data); | |
167 } | |
168 }); | |
169 } | |
170 } | |
171 | |
172 | |
173 class _HttpDetachedIncoming extends Stream<List<int>> { | |
174 final StreamSubscription subscription; | |
175 final List<int> bufferedData; | |
176 | |
177 _HttpDetachedIncoming(this.subscription, this.bufferedData); | |
178 | |
129 StreamSubscription<List<int>> listen(void onData(List<int> event), | 179 StreamSubscription<List<int>> listen(void onData(List<int> event), |
130 {Function onError, | 180 {Function onError, |
131 void onDone(), | 181 void onDone(), |
132 bool cancelOnError}) { | 182 bool cancelOnError}) { |
133 return controller.stream.listen( | 183 if (subscription != null) { |
134 onData, | 184 subscription |
135 onError: onError, | 185 ..onData(onData) |
136 onDone: onDone, | 186 ..onError(onError) |
137 cancelOnError: cancelOnError); | 187 ..onDone(onDone); |
138 } | 188 if (bufferedData == null) { |
139 | 189 return subscription..resume(); |
140 void resume() { | 190 } |
141 paused = false; | 191 return new _HttpDetachedStreamSubscription(subscription, |
142 if (bufferedData != null) { | 192 bufferedData, |
143 var data = bufferedData; | 193 onData) |
144 bufferedData = null; | 194 ..resume(); |
145 controller.add(data); | 195 } else { |
146 // If the consumer pauses again after the carry-over data, we'll not | 196 return new Stream.fromIterable(bufferedData) |
147 // continue our subscriber until the next resume. | 197 .listen(onData, |
148 if (paused) return; | 198 onError: onError, |
149 } | 199 onDone: onDone, |
150 if (resumeCompleter != null) { | 200 cancelOnError: cancelOnError); |
151 resumeCompleter.complete(); | |
152 resumeCompleter = null; | |
153 } | |
154 } | |
155 | |
156 void pause() { | |
157 paused = true; | |
158 if (resumeCompleter == null) { | |
159 resumeCompleter = new Completer(); | |
160 subscription.pause(resumeCompleter.future); | |
161 } | 201 } |
162 } | 202 } |
163 } | 203 } |
164 | 204 |
165 | 205 |
166 /** | 206 /** |
167 * HTTP parser which parses the data stream given to [consume]. | 207 * HTTP parser which parses the data stream given to [consume]. |
168 * | 208 * |
169 * If an HTTP parser error occours, the parser will signal an error to either | 209 * If an HTTP parser error occours, the parser will signal an error to either |
170 * the current _HttpIncoming or the _parser itself. | 210 * the current _HttpIncoming or the _parser itself. |
(...skipping 814 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
985 } | 1025 } |
986 } | 1026 } |
987 | 1027 |
988 void _reportError(error, [stackTrace]) { | 1028 void _reportError(error, [stackTrace]) { |
989 if (_socketSubscription != null) _socketSubscription.cancel(); | 1029 if (_socketSubscription != null) _socketSubscription.cancel(); |
990 _state = _State.FAILURE; | 1030 _state = _State.FAILURE; |
991 _controller.addError(error, stackTrace); | 1031 _controller.addError(error, stackTrace); |
992 _controller.close(); | 1032 _controller.close(); |
993 } | 1033 } |
994 } | 1034 } |
OLD | NEW |