Chromium Code Reviews| OLD | NEW | 
|---|---|
| (Empty) | |
| 1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file | |
| 2 // for details. All rights reserved. Use of this source code is governed by a | |
| 3 // BSD-style license that can be found in the LICENSE file. | |
| 4 | |
| 5 library async.streams.stream_events; | |
| 6 | |
| 7 import 'dart:async'; | |
| 8 import 'dart:collection'; | |
| 9 | |
| 10 import "subscription_stream.dart"; | |
| 11 import "stream_completer.dart"; | |
| 12 | |
| 13 /// An asynchronous pull-based interface for accessing stream events. | |
| 14 /// | |
| 15 /// Wraps a stream and makes individual events available on request. | |
| 16 /// | |
| 17 /// The individual requests are served in the order they are requested. | |
| 
 
nweiz
2015/06/12 01:24:25
Explain how this is different from a [StreamIterat
 
Lasse Reichstein Nielsen
2015/06/17 11:08:29
Done.
 
 | |
| 18 /// | |
| 19 /// Example: | |
| 20 /// | |
| 21 /// var events = new StreamEvents<String>(someStreamOfLines); | |
| 22 /// var first = await events.next; | |
| 23 /// while (first.startsWith('#')) { | |
| 
 
Søren Gjesse
2015/06/11 07:57:06
Maybe add an isDone check in this sample.
 
Lasse Reichstein Nielsen
2015/06/12 13:04:22
Hmm. Not good for the flow, but I guess I can try
 
 | |
| 24 /// // Skip comments. | |
| 25 /// first = await events.next; | |
| 26 /// } | |
| 27 /// | |
| 28 /// if (first.startsWith(MAGIC_MARKER)) { | |
| 29 /// var headerCount = | |
| 30 /// first.parseInt(first.substring(MAGIC_MARKER.length + 1)); | |
| 31 /// handleMessage(headers: await events.take(headerCount), | |
| 32 /// body: events.rest); | |
| 33 /// return; | |
| 34 /// } | |
| 35 /// // Error handling. | |
| 36 /// | |
| 37 class StreamEvents<T> { | |
| 
 
nweiz
2015/06/12 01:24:26
Plural class names always seem weird to me. What d
 
Lasse Reichstein Nielsen
2015/06/12 13:04:22
It's not a stream, so "PullStream" won't work.
The
 
nweiz
2015/06/16 01:05:23
Bob suggested "StreamPump", using the water analog
 
Lasse Reichstein Nielsen
2015/06/16 13:05:45
I'm not a great fan of analogies in nameing - it o
 
nweiz
2015/06/16 22:34:11
All names in programming are analogies at some lev
 
Lasse Reichstein Nielsen
2015/06/17 11:08:29
I'm trying out StreamQueue now. It is slightly irk
 
 | |
| 38 /// In the initial state, the stream has not been listened to yet. | |
| 
 
nweiz
2015/06/12 01:24:25
Nit: The first paragraph of a doc comment should b
 
 | |
| 39 /// It will be listened to when the first event is requested. | |
| 40 /// The `stateData` field holds the stream and the request queue is empty. | |
| 41 static const int _INITIAL = 0; | |
| 
 
nweiz
2015/06/12 01:24:25
Nit: since these fields aren't right up next to on
 
Lasse Reichstein Nielsen
2015/06/12 13:04:22
In my editor it still works very well to show the
 
 | |
| 42 | |
| 43 /// Listening on the stream. | |
| 44 /// If the request queue is empty and the subscription isn't done, | |
| 45 /// the subscription is paused. | |
| 46 /// The `stateData` field holds the subscription. | |
| 47 static const int _LISTENING = 1; | |
| 48 | |
| 49 /// The stream has completed. | |
| 50 /// The `stateData` field is `null` and the request queue is empty. | |
| 51 static const int _DONE = 2; | |
| 52 | |
| 
 
Søren Gjesse
2015/06/11 07:57:06
Maybe put an additional comment that 4 is reserved
 
Lasse Reichstein Nielsen
2015/06/12 13:04:21
Will do.
 
 | |
| 53 /// Flag set when [close] is called. | |
| 54 /// The `StreamEvents` is closed and no further events can be requested. | |
| 55 /// The last request in the queue, if any, | |
| 56 /// is completed and is witing to clean up. | |
| 
 
nweiz
2015/06/12 01:24:24
"witing" -> "waiting"
 
Lasse Reichstein Nielsen
2015/06/12 13:04:21
Done.
 
 | |
| 57 static const int _CLOSED = 8; | |
| 58 | |
| 59 /// Flag set when [rest] is called. | |
| 60 /// Only used for error reporting, otherwise equivalent to [_CLOSED]. | |
| 61 static const int _CLOSED_REST = 12; | |
| 62 | |
| 63 /// Current state. | |
| 64 /// | |
| 65 /// Use getters below to check if the state is [_isListening] or [_isDone], | |
| 66 /// and whether the stream events object [_isClosed]. | |
| 67 int _state = _INITIAL; | |
| 68 | |
| 69 /// Value depending on state. Use getters below to get the value and assert | |
| 70 /// the expected state. | |
| 71 var _stateData; | |
| 72 | |
| 73 /// Queue of pending requests while state is [_LISTENING]. | |
| 74 /// Access through methods below to ensure consistency. | |
| 75 Queue<_EventRequest> _requestQueue = new Queue(); | |
| 
 
nweiz
2015/06/12 01:24:25
Make this final.
Personally, I prefer to omit the
 
Lasse Reichstein Nielsen
2015/06/12 13:04:21
Made final.
I prefer type annotations on all field
 
 | |
| 76 | |
| 77 StreamEvents(Stream source) : _stateData = source; | |
| 78 | |
| 79 bool get _isListening => (_state & _LISTENING) != 0; | |
| 80 bool get _isClosed => (_state & _CLOSED) != 0; | |
| 81 bool get _isDone => (_state & _DONE) != 0; | |
| 82 | |
| 83 /// Whether the underlying stream is spent. | |
| 
 
nweiz
2015/06/12 01:24:24
"spent" -> "done"
Just to use consistent terminol
 
Lasse Reichstein Nielsen
2015/06/12 13:04:22
Done.
 
 | |
| 84 /// | |
| 85 /// This may return true before all events have been delivered. | |
| 86 /// Requesting a new event when [isDone] returns true, | |
| 87 /// for example using [next], will always fail. | |
| 88 bool get isDone => _isDone; | |
| 
 
nweiz
2015/06/12 01:24:25
What's the use case for exposing this to the user?
 
Lasse Reichstein Nielsen
2015/06/12 13:04:23
It might be a little speculative because it doesn'
 
nweiz
2015/06/16 01:05:23
The version of this I wrote for ScheduledTest has
 
Lasse Reichstein Nielsen
2015/06/16 13:05:45
It doesn't solve the problem of what to do with th
 
nweiz
2015/06/16 22:34:11
It's unlikely that someone will call [hasNext] fol
 
 | |
| 89 | |
| 90 /// Return the stream subscription while state is listening. | |
| 
 
nweiz
2015/06/12 01:24:25
Nit: use present tense (e.g. "Returns" rather than
 
Lasse Reichstein Nielsen
2015/06/12 13:04:22
Done.
 
 | |
| 91 StreamSubscription get _subscription { | |
| 92 assert(_isListening); | |
| 93 return _stateData; | |
| 94 } | |
| 95 | |
| 96 /// Return the source stream while state is initial. | |
| 97 Stream get _sourceStream { | |
| 98 assert(!_isListening); | |
| 99 assert(!_isDone); | |
| 100 return _stateData; | |
| 101 } | |
| 102 | |
| 103 // Set the subscription and transition to listening state. | |
| 104 void set _subscription(StreamSubscription subscription) { | |
| 105 assert(!_isListening); | |
| 106 assert(!_isDone); | |
| 107 _stateData = subscription; | |
| 108 _state |= _LISTENING; | |
| 109 } | |
| 110 | |
| 111 void _setDone() { | |
| 112 assert(!_isDone); | |
| 113 _state = (_state & _CLOSED_REST) | _DONE; | |
| 114 _stateData = null; | |
| 115 } | |
| 116 | |
| 117 /// Request the next (yet unrequested) event from the stream. | |
| 
 
nweiz
2015/06/12 01:24:26
Clarify in this documentation that it's valid to h
 
Lasse Reichstein Nielsen
2015/06/12 13:04:21
Done.
 
 | |
| 118 /// | |
| 119 /// When the requested event arrives, the returned future is completed with | |
| 120 /// the event. This is independent of whether the event is a data event or | |
| 121 /// an error event. | |
| 
 
nweiz
2015/06/12 01:24:26
I'd write this as: "If the event is a value event,
 
Lasse Reichstein Nielsen
2015/06/12 13:04:22
Done.
 
 | |
| 122 /// | |
| 123 /// If the stream closed before an event arrives, the future is completed | |
| 
 
nweiz
2015/06/12 01:24:24
"closed" -> "closes", "is completed" -> "completes
 
Lasse Reichstein Nielsen
2015/06/12 13:04:22
Done.
 
 | |
| 124 /// with a [StateError]. | |
| 125 Future<T> get next { | |
| 126 if (!_isClosed) { | |
| 127 Completer completer = new Completer<T>(); | |
| 128 _addRequest(new _NextRequest(completer)); | |
| 129 return completer.future; | |
| 130 } | |
| 131 throw _failClosed(); | |
| 132 } | |
| 133 | |
| 134 /// Request a stream of all the remaning events of the source stream. | |
| 
 
nweiz
2015/06/12 01:24:25
"Request" -> "Returns"
 
Lasse Reichstein Nielsen
2015/06/12 13:04:21
Done.
 
 | |
| 135 /// | |
| 136 /// All requested [next], [skip] or [take] operations are completed | |
| 137 /// first, and then any remaining events are provided as events of | |
| 138 /// the returned stream. | |
| 139 /// | |
| 140 /// Using `rest` closes the stream events object. After getting the | |
| 141 /// `rest` it is no longer allowed to request other events, like | |
| 
 
nweiz
2015/06/12 01:24:25
"it is no longer allowed to" -> "the caller may no
 
Lasse Reichstein Nielsen
2015/06/12 13:04:22
Done.
 
 | |
| 142 /// after calling [close]. | |
| 143 Stream<T> get rest { | |
| 144 if (!_isClosed) { | |
| 
 
nweiz
2015/06/12 01:24:25
Nit: Short-circuit if it's closed to save on inden
 
Lasse Reichstein Nielsen
2015/06/12 13:04:22
ACK.
This is not going to be in any inner loops, s
 
 | |
| 145 _state |= _CLOSED_REST; | |
| 146 if (_isListening) { | |
| 147 // We have an active subscription that we want to take over. | |
| 148 var delayStream = new StreamCompleter<T>(); | |
| 149 _addRequest(new _RestRequest<T>(delayStream, this)); | |
| 150 return delayStream.stream; | |
| 151 } | |
| 152 assert(_requestQueue.isEmpty); | |
| 153 if (isDone) { | |
| 154 // TODO(lrn): Add Stream.empty() constructor. | |
| 155 return new Stream<T>.fromIterable(const []); | |
| 156 } | |
| 157 // We have never listened the source stream, so just return that directly. | |
| 
 
nweiz
2015/06/12 01:24:26
"listened to"
 
Lasse Reichstein Nielsen
2015/06/12 13:04:22
Done.
 
 | |
| 158 Stream result = _sourceStream; | |
| 159 _setDone(); | |
| 160 return result; | |
| 161 } | |
| 162 throw _failClosed(); | |
| 163 } | |
| 164 | |
| 165 /// Requests to skip the next [count] *data* events. | |
| 
 
nweiz
2015/06/12 01:24:24
"Requests to skip" -> "Skips"
 
 | |
| 166 /// | |
| 167 /// The [count] value must be non-negative. | |
| 168 /// | |
| 169 /// When successful, this is equivalent to using [take] | |
| 170 /// and ignoring the result. | |
| 171 /// | |
| 172 /// If an error occurs before `count` data events has | |
| 
 
nweiz
2015/06/12 01:24:25
"has" -> "have"
 
 | |
| 173 /// been skipped, the returned future completes with | |
| 174 /// that error instead. | |
| 175 /// | |
| 176 /// If the stream closes before `count` data events, | |
| 177 /// the remaining unskipped event count is returned. | |
| 178 /// If the returned future completes with the integer `0`, | |
| 179 /// then all events were succssfully skipped. If the value | |
| 180 /// is greater than zero then the stream ended early. | |
| 181 Future<int> skip(int count) { | |
| 182 if (count < 0) throw new RangeError.range(count, 0, null, "count"); | |
| 183 if (!_isClosed) { | |
| 184 Completer completer = new Completer<int>(); | |
| 185 _addRequest(new _SkipRequest(completer, count)); | |
| 186 return completer.future; | |
| 187 } | |
| 188 throw _failClosed(); | |
| 189 } | |
| 190 | |
| 191 /// Requests the next [count] data events as a list. | |
| 192 /// | |
| 193 /// The [count] value must be non-negative. | |
| 
 
nweiz
2015/06/12 01:24:25
"The [count] value" -> "[count]"
 
Lasse Reichstein Nielsen
2015/06/12 13:04:21
-> "The [count]".
Otherwise it would start the sen
 
 | |
| 194 /// | |
| 195 /// Equivalent to calling [next] `count` times and | |
| 196 /// storing the data values in a list. | |
| 197 /// | |
| 198 /// If an error occurs before `count` data events has | |
| 199 /// been collected, the returned future completes with | |
| 200 /// that error instead. | |
| 201 /// | |
| 202 /// If the stream closes before `count` data events, | |
| 203 /// the returned future completes with the list | |
| 204 /// of data collected so far. That is, the returned | |
| 205 /// list may have fewer than [count] elements. | |
| 206 Future<List<T>> take(int count) { | |
| 207 if (count < 0) throw new RangeError.range(count, 0, null, "count"); | |
| 208 if (!_isClosed) { | |
| 209 Completer completer = new Completer<List<T>>(); | |
| 210 _addRequest(new _TakeRequest(completer, count)); | |
| 211 return completer.future; | |
| 212 } | |
| 213 throw _failClosed(); | |
| 214 } | |
| 215 | |
| 216 /// Release the underlying stream subscription. | |
| 
 
nweiz
2015/06/12 01:24:26
"Release" -> "Cancels"
 
Lasse Reichstein Nielsen
2015/06/12 13:04:22
Done.
 
 | |
| 217 /// | |
| 218 /// The close operation waits until all previously requested | |
| 219 /// events have been processed, then it cancels the subscription | |
| 220 /// providing the events. | |
| 221 /// | |
| 222 /// The returned future completes with the result of calling | |
| 223 /// `cancel`. | |
| 224 /// | |
| 225 /// After calling `close`, no further events can be requested. | |
| 226 /// None of [next], [rest], [skip], [take] or [close] may be | |
| 227 /// called again. | |
| 228 Future close() { | |
| 
 
nweiz
2015/06/12 01:24:24
Consider calling this [cancel], to match [StreamSu
 
Lasse Reichstein Nielsen
2015/06/12 13:04:22
Good idea!
 
 | |
| 229 if (!_isClosed) { | |
| 230 _state |= _CLOSED; | |
| 231 if (!_isListening) { | |
| 232 assert(_requestQueue.isEmpty); | |
| 
 
nweiz
2015/06/12 01:24:24
Add a comment explaining why the request queue wil
 
Lasse Reichstein Nielsen
2015/06/12 13:04:22
Done.
 
 | |
| 233 if (!_isDone) _setDone(); | |
| 234 return new Future.value(); | |
| 235 } | |
| 236 Completer completer = new Completer(); | |
| 237 _addRequest(new _CloseRequest(completer, this)); | |
| 238 return completer.future; | |
| 239 } | |
| 240 throw _failClosed(); | |
| 
 
nweiz
2015/06/12 01:24:26
Everything in the core libraries that's closable a
 
Lasse Reichstein Nielsen
2015/06/12 13:04:22
True.
One reason why I didn't allow that was that
 
 | |
| 241 } | |
| 242 | |
| 243 /// Reused error message. | |
| 
 
nweiz
2015/06/12 01:24:24
Expand on this.
 
Lasse Reichstein Nielsen
2015/06/12 13:04:21
Done.
 
 | |
| 244 Error _failClosed() { | |
| 245 String cause = | |
| 246 ((_state & _CLOSED_REST) == _CLOSED_REST) ? "rest" : "close"; | |
| 247 return new StateError("Already closed by a call to $cause"); | |
| 248 } | |
| 249 | |
| 250 /// Called when requesting an event when the requeust queue is empty. | |
| 251 /// The underlying subscription is paused in that case, except the very | |
| 252 /// first time when the subscription needs to be created. | |
| 253 void _listen() { | |
| 
 
nweiz
2015/06/12 01:24:24
Methods like this and [_pause] that are only calle
 
Lasse Reichstein Nielsen
2015/06/12 13:04:23
Done.
 
 | |
| 254 if (_isListening) { | |
| 255 _subscription.resume(); | |
| 256 } else if (!_isDone) { | |
| 257 _subscription = | |
| 258 _sourceStream.listen(_onData, onError: _onError, onDone: _onDone); | |
| 259 } | |
| 260 } | |
| 261 | |
| 262 /// Pauses the underlying subscription. | |
| 263 /// Called when the request queue is empty and the subscription isn't closed. | |
| 264 void _pause() { | |
| 265 assert(_isListening); | |
| 266 _subscription.pause(); | |
| 267 } | |
| 268 | |
| 269 // Callbacks receiving the events of the source stream. | |
| 
 
nweiz
2015/06/12 01:24:25
Nit: If this is referring to a group of methods, a
 
Lasse Reichstein Nielsen
2015/06/12 13:04:21
Done.
 
 | |
| 270 void _onData(T data) { | |
| 271 assert(_requestQueue.isNotEmpty); | |
| 272 _EventRequest action = _nextAction; | |
| 273 action.add(data); | |
| 274 _checkCompleted(); | |
| 275 } | |
| 276 | |
| 277 void _onError(error, StackTrace stack) { | |
| 278 assert(_requestQueue.isNotEmpty); | |
| 279 _EventRequest action = _nextAction; | |
| 280 action.addError(error, stack); | |
| 281 _checkCompleted(); | |
| 282 } | |
| 283 | |
| 284 void _onDone() { | |
| 285 _setDone(); | |
| 286 _closeAllRequests(); | |
| 287 } | |
| 288 | |
| 289 // Request queue management. | |
| 290 | |
| 291 /// Get the next action in the queue, but don't remove it. | |
| 292 _EventRequest get _nextAction { | |
| 293 assert(_requestQueue.isNotEmpty); | |
| 
 
nweiz
2015/06/12 01:24:24
This assertion seems redundant, since the line bel
 
Lasse Reichstein Nielsen
2015/06/12 13:04:22
True.
 
 | |
| 294 return _requestQueue.first; | |
| 295 } | |
| 296 | |
| 297 /// Add a new request to the queue. | |
| 298 void _addRequest(_EventRequest action) { | |
| 299 if (_isDone) { | |
| 300 action.close(); | |
| 301 return; | |
| 302 } | |
| 303 if (_requestQueue.isEmpty) { | |
| 304 if (action.isComplete) { | |
| 
 
nweiz
2015/06/12 01:24:24
Explain under what circumstances an action that wa
 
Lasse Reichstein Nielsen
2015/06/12 13:04:22
Done.
 
 | |
| 305 // Skip listening and complete this immediately. | |
| 306 action.close(); | |
| 307 return; | |
| 308 } | |
| 309 _listen(); | |
| 310 } | |
| 311 _requestQueue.add(action); | |
| 312 } | |
| 313 | |
| 314 /// Remove all requests and call their `done` event. | |
| 315 /// | |
| 316 /// Used when the source stream dries up. | |
| 317 void _closeAllRequests() { | |
| 318 assert(_isDone); | |
| 319 while (_requestQueue.isNotEmpty) { | |
| 320 _requestQueue.removeFirst().close(); | |
| 321 } | |
| 322 } | |
| 323 | |
| 324 /// Check whether the next actions in the queue are complete. | |
| 325 /// | |
| 326 /// If so, remove them and call their `complete` method. | |
| 327 void _checkCompleted() { | |
| 328 // Close-actions are executed immediately when they become the | |
| 329 // next (and last) event in the queue. | |
| 330 // When _isClosed and the queue is not empty, the last element | |
| 331 // of the queue is the close action. | |
| 332 while (_requestQueue.isNotEmpty) { | |
| 333 if (!_requestQueue.first.isComplete) { | |
| 334 return; | |
| 335 } | |
| 336 _requestQueue.removeFirst().close(); | |
| 337 } | |
| 338 assert(_requestQueue.isEmpty); | |
| 339 if (!_isDone) _pause(); | |
| 340 } | |
| 341 | |
| 342 /// Extracts the subscription and makes the events object unusable. | |
| 343 /// | |
| 344 /// Can only be used by the very last request. | |
| 345 StreamSubscription _dispose() { | |
| 346 assert(_isClosed); | |
| 347 assert(_isListening); | |
| 348 assert(_requestQueue.isEmpty); | |
| 349 StreamSubscription subscription = _subscription; | |
| 350 _setDone(); | |
| 351 return subscription; | |
| 352 } | |
| 353 } | |
| 354 | |
| 
 
nweiz
2015/06/12 01:24:25
Nit: extra newline.
 
 | |
| 355 | |
| 356 /// Action to take when a requested event arrives. | |
| 357 abstract class _EventRequest implements EventSink { | |
| 
 
nweiz
2015/06/12 01:24:25
It's kind of weird that the documentation for thes
 
Lasse Reichstein Nielsen
2015/06/12 13:04:22
Agree, I switched to "request" halfway through, an
 
 | |
| 358 bool get isComplete; | |
| 
 
Søren Gjesse
2015/06/11 07:57:06
Add close method here as well
I think the 'close'
 
nweiz
2015/06/12 01:24:25
This should also have [add] and [addError] abstrac
 
Lasse Reichstein Nielsen
2015/06/12 13:04:22
It's inherited from EventSink.
I picked EventSink
 
 | |
| 359 } | |
| 360 | |
| 361 /// Action completing a [StreamEvents.next] request. | |
| 362 class _NextRequest implements _EventRequest { | |
| 363 Completer completer; | |
| 
 
nweiz
2015/06/12 01:24:25
Even though it doesn't do anything at the language
 
 | |
| 364 _NextRequest(this.completer); | |
| 
 
nweiz
2015/06/12 01:24:24
Rather than passing in a completer, consider makin
 
Lasse Reichstein Nielsen
2015/06/12 13:04:23
Good idea. Will do.
The type parameter needs to go
 
 | |
| 365 | |
| 366 void add(data) { | |
| 367 completer.complete(data); | |
| 368 completer = null; | |
| 
 
nweiz
2015/06/12 01:24:24
Does nulling this out buy you anything? It seems l
 
Lasse Reichstein Nielsen
2015/06/12 13:04:23
We have an isCompleted getter? Whoa!
I'm too used
 
 | |
| 369 } | |
| 370 | |
| 371 void addError(error, [StackTrace stack]) { | |
| 372 completer.completeError(error, stack); | |
| 373 completer = null; | |
| 374 } | |
| 375 | |
| 376 void close() { | |
| 377 if (!isComplete) { | |
| 378 completer.completeError(new StateError("no elements")); | |
| 
 
nweiz
2015/06/12 01:24:25
If you do continue to check against null, null out
 
Lasse Reichstein Nielsen
2015/06/12 13:04:22
Technically not necessary since "close" is always
 
 | |
| 379 } | |
| 380 } | |
| 381 | |
| 382 bool get isComplete => completer == null; | |
| 383 } | |
| 384 | |
| 385 /// Action completing a [StreamEvents.skip] request. | |
| 386 class _SkipRequest implements _EventRequest { | |
| 387 final Completer completer; | |
| 388 int count; | |
| 
 
nweiz
2015/06/12 01:24:24
Document this. Also consider calling it something
 
Lasse Reichstein Nielsen
2015/06/15 15:46:23
Done.
 
 | |
| 389 _SkipRequest(this.completer, this.count); | |
| 390 | |
| 391 void add(data) { | |
| 392 count--; | |
| 393 } | |
| 394 | |
| 395 void addError(error, [StackTrace stack]) { | |
| 396 completer.completeError(error, stack); | |
| 397 count = 0; | |
| 398 } | |
| 399 | |
| 400 void close() { | |
| 401 if (!completer.isCompleted) { | |
| 402 completer.complete(count); | |
| 403 count = 0; | |
| 404 } | |
| 405 } | |
| 406 | |
| 407 bool get isComplete { | |
| 408 return count == 0; | |
| 409 } | |
| 410 } | |
| 411 | |
| 412 /// Action completing a [StreamEvents.take] request. | |
| 413 class _TakeRequest<T> implements _EventRequest { | |
| 414 final Completer completer; | |
| 415 final List list = <T>[]; | |
| 416 int count; | |
| 417 _TakeRequest(this.completer, this.count); | |
| 418 | |
| 419 void add(data) { | |
| 420 list.add(data); | |
| 421 count--; | |
| 422 } | |
| 423 | |
| 424 void addError(error, [StackTrace stack]) { | |
| 425 completer.completeError(error, stack); | |
| 426 count = 0; | |
| 427 } | |
| 428 | |
| 429 void close() { | |
| 430 if (!completer.isCompleted) { | |
| 431 completer.complete(list); | |
| 432 } | |
| 433 } | |
| 434 | |
| 435 bool get isComplete => count == 0; | |
| 436 } | |
| 437 | |
| 438 /// Action completing a [StreamEvents.close] request. | |
| 439 class _CloseRequest implements _EventRequest { | |
| 440 final Completer completer; | |
| 441 StreamEvents events; | |
| 442 | |
| 443 _CloseRequest(this.completer, this.events); | |
| 444 | |
| 445 void add(data) { | |
| 446 throw new UnsupportedError("event"); | |
| 447 } | |
| 448 | |
| 449 void addError(error, [StackTrace stack]) { | |
| 450 throw new UnsupportedError("event"); | |
| 
 
nweiz
2015/06/12 01:24:24
Shouldn't this be "error"?
 
Lasse Reichstein Nielsen
2015/06/12 13:04:22
No, we don't support receiving events, that's the
 
 | |
| 451 } | |
| 452 | |
| 453 void close() { | |
| 454 if (events._isListening) { | |
| 455 completer.complete(events._dispose().cancel()); | |
| 456 } else { | |
| 457 completer.complete(); | |
| 458 } | |
| 459 } | |
| 460 | |
| 461 bool get isComplete => true; | |
| 462 } | |
| 463 | |
| 464 /// Action completing a [StreamEvents.rest] request. | |
| 465 class _RestRequest<T> implements _EventRequest { | |
| 466 final StreamCompleter delayStream; | |
| 467 final StreamEvents events; | |
| 468 _RestRequest(this.delayStream, this.events); | |
| 469 | |
| 470 void add(data) { | |
| 471 throw new UnsupportedError("event"); | |
| 472 } | |
| 473 | |
| 474 void addError(error, [StackTrace stack]) { | |
| 475 throw new UnsupportedError("event"); | |
| 
 
nweiz
2015/06/12 01:24:26
Ditto.
 
Lasse Reichstein Nielsen
2015/06/15 15:46:23
Done.
 
 | |
| 476 } | |
| 477 | |
| 478 void close() { | |
| 479 if (events._isListening) { | |
| 480 StreamSubscription subscription = events._dispose(); | |
| 481 delayStream.setSourceStream(new SubscriptionStream<T>(subscription)); | |
| 482 if (subscription.isPaused) subscription.resume(); | |
| 483 } else { | |
| 484 assert(events._isDone); | |
| 485 delayStream.setEmpty(); | |
| 486 } | |
| 487 } | |
| 488 | |
| 489 bool get isComplete => true; | |
| 490 } | |
| OLD | NEW |