Chromium Code Reviews| OLD | NEW |
|---|---|
| (Empty) | |
| 1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file | |
| 2 // for details. All rights reserved. Use of this source code is governed by a | |
| 3 // BSD-style license that can be found in the LICENSE file. | |
| 4 | |
| 5 library async.streams.stream_events; | |
| 6 | |
| 7 import 'dart:async'; | |
| 8 import 'dart:collection'; | |
| 9 | |
| 10 import "subscription_stream.dart"; | |
| 11 import "stream_completer.dart"; | |
| 12 | |
| 13 /// An asynchronous pull-based interface for accessing stream events. | |
| 14 /// | |
| 15 /// Wraps a stream and makes individual events available on request. | |
| 16 class StreamEvents<T> { | |
|
ahe
2015/06/08 16:55:16
It took me quite some time to realize this is the
Lasse Reichstein Nielsen
2015/06/09 07:33:53
This is a little hard to name well.
PullStream wo
| |
| 17 /// In the initial state, the stream has not been listened to yet. | |
| 18 /// It will be listened to when the first event is requested. | |
| 19 /// The `stateData` field holds the stream and the request queue is empty. | |
| 20 static const int _INITIAL = 0; | |
| 21 | |
| 22 /// Listening on the stream. | |
| 23 /// If the request queue is empty and the subscription isn't done, | |
| 24 /// the subscription is paused. | |
| 25 /// The `stateData` field holds the subscription. | |
| 26 static const int _LISTENING = 1; | |
| 27 | |
| 28 /// The stream has completed. | |
| 29 /// The `stateData` field is `null` and the request queue is empty. | |
| 30 static const int _DONE = 2; | |
| 31 | |
| 32 /// Flag set when [close] is called. | |
| 33 /// The `StreamEvents` is closed and no further events can be requested. | |
| 34 /// While set, the last elmement of the request queue is an | |
| 35 /// [_EventCloseAction]. | |
| 36 static const int _CLOSED = 8; | |
| 37 | |
| 38 /// Flag set when [rest] is called. | |
| 39 /// Only used for error reporting, otherwise equivalent to [_CLOSED]. | |
| 40 static const int _CLOSED_REST = 12; | |
| 41 | |
| 42 /// Current state. | |
| 43 /// | |
| 44 /// Use getters below to check if the state is [_isListening] or [_isDone], | |
| 45 /// and whether the stream events object [_isClosed]. | |
| 46 int _state = _INITIAL; | |
| 47 | |
| 48 /// Value depending on state. Use getters below to get the value and assert | |
| 49 /// the expected state. | |
| 50 var _stateData; | |
| 51 | |
| 52 /// Queue of pending requests while state is [_LISTENING]. | |
| 53 /// Access through methods below to ensure consistency. | |
| 54 Queue<_EventAction> _requestQueue = new Queue(); | |
| 55 | |
| 56 StreamEvents(Stream source) : _stateData = source; | |
| 57 | |
| 58 bool get _isListening => (_state & _LISTENING) != 0; | |
| 59 bool get _isClosed => (_state & _CLOSED) != 0; | |
| 60 bool get _isDone => (_state & _DONE) != 0; | |
| 61 | |
| 62 /// Whether the underlying stream is spent. | |
| 63 /// | |
| 64 /// This may return true before all events have been delivered. | |
| 65 /// Requesting a new event when [isDone] returns true, | |
| 66 /// for example using [next], will always fail. | |
| 67 bool get isDone => _isDone; | |
| 68 | |
| 69 /// Return the stream subscription while state is listening. | |
| 70 StreamSubscription get _subscription { | |
| 71 assert(_isListening); | |
| 72 return _stateData; | |
| 73 } | |
| 74 | |
| 75 /// Return the source stream while state is initial. | |
| 76 Stream get _sourceStream { | |
| 77 assert(!_isListening); | |
| 78 assert(!_isDone); | |
| 79 return _stateData; | |
| 80 } | |
| 81 | |
| 82 // Set the subscription and transition to listening state. | |
| 83 void set _subscription(StreamSubscription subscription) { | |
| 84 assert(!_isListening); | |
| 85 assert(!_isDone); | |
| 86 _stateData = subscription; | |
| 87 _state |= _LISTENING; | |
| 88 } | |
| 89 | |
| 90 void _setDone() { | |
| 91 assert(!_isDone); | |
| 92 _state = (_state & _CLOSED_REST) | _DONE; | |
| 93 _stateData = null; | |
| 94 } | |
| 95 | |
| 96 /// Request the next (yet unrequested) event from the stream. | |
| 97 /// | |
| 98 /// When the requested event arrives, the returned future is completed with | |
| 99 /// the event. This is independent of whether the event is a data event or | |
| 100 /// an error event. | |
| 101 /// | |
| 102 /// If the stream closed before an event arrives, the future is completed | |
| 103 /// with a [StateError]. | |
| 104 Future<T> get next { | |
| 105 if (!_isClosed) { | |
| 106 Completer completer = new Completer<T>(); | |
| 107 _addAction(new _NextAction(completer)); | |
| 108 return completer.future; | |
| 109 } | |
| 110 throw _failClosed(); | |
| 111 } | |
| 112 | |
| 113 /// Request a stream of all the remaning events of the source stream. | |
| 114 /// | |
| 115 /// All requested [next], [skip] or [take] operations are completed | |
| 116 /// first, and then any remaining events are provided as events of | |
| 117 /// the returned stream. | |
| 118 /// | |
| 119 /// Using `rest` closes the stream events object. After getting the | |
| 120 /// `rest` it is no longer allowed to request other events, like | |
| 121 /// after calling [close]. | |
| 122 Stream<T> get rest { | |
| 123 if (!_isClosed) { | |
| 124 _state |= _CLOSED_REST; | |
| 125 if (_isListening) { | |
| 126 // We have an active subscription that we want to take over. | |
| 127 var delayStream = new StreamCompleter<T>(); | |
| 128 _addAction(new _RestAction<T>(delayStream)); | |
| 129 return delayStream.stream; | |
| 130 } | |
| 131 assert(_requestQueue.isEmpty); | |
| 132 if (isDone) { | |
| 133 // TODO(lrn): Add Stream.empty() constructor. | |
| 134 return new Stream<T>.fromIterable(const []); | |
| 135 } | |
| 136 // We have never listened the source stream, so just return that directly. | |
| 137 Stream result = _sourceStream; | |
| 138 _setDone(); | |
| 139 return result; | |
| 140 } | |
| 141 throw _failClosed(); | |
| 142 } | |
| 143 | |
| 144 /// Requests to skip the next [count] *data* events. | |
| 145 /// | |
| 146 /// The [count] value must be greater than zero. | |
| 147 /// | |
| 148 /// When successful, this is equivalent to using [take] | |
| 149 /// and ignoring the result. | |
| 150 /// | |
| 151 /// If an error occurs before `count` data events has | |
| 152 /// been skipped, the returned future completes with | |
| 153 /// that error instead. | |
| 154 /// | |
| 155 /// If the stream closes before `count` data events, | |
| 156 /// the remaining unskipped event count is returned. | |
| 157 /// If the returned future completes with the integer `0`, | |
| 158 /// then all events were succssfully skipped. If the value | |
| 159 /// is greater than zero then the stream ended early. | |
| 160 Future<int> skip(int count) { | |
| 161 if (count <= 0) throw new RangeError.range(count, 0, null, "count"); | |
| 162 if (!_isClosed) { | |
| 163 Completer completer = new Completer<int>(); | |
| 164 _addAction(new _SkipAction(completer, count)); | |
| 165 return completer.future; | |
| 166 } | |
| 167 throw _failClosed(); | |
| 168 } | |
| 169 | |
| 170 /// Requests the next [count] data events as a list. | |
| 171 /// | |
| 172 /// The [count] value must be greater than zero. | |
| 173 /// | |
| 174 /// Equivalent to calling [next] `count` times and | |
| 175 /// storing the data values in a list. | |
| 176 /// | |
| 177 /// If an error occurs before `count` data events has | |
| 178 /// been collected, the returned future completes with | |
| 179 /// that error instead. | |
| 180 /// | |
| 181 /// If the stream closes before `count` data events, | |
| 182 /// the returned future completes with the list | |
| 183 /// of data collected so far. That is, the returned | |
| 184 /// list may have fewer than [count] elements. | |
| 185 Future<List<T>> take(int count) { | |
| 186 if (count <= 0) throw new RangeError.range(count, 0, null, "count"); | |
| 187 if (!_isClosed) { | |
| 188 Completer completer = new Completer<List<T>>(); | |
| 189 _addAction(new _TakeAction(completer, count)); | |
| 190 return completer.future; | |
| 191 } | |
| 192 throw _failClosed(); | |
| 193 } | |
| 194 | |
| 195 /// Release the underlying stream subscription. | |
| 196 /// | |
| 197 /// The close operation waits until all previously requested | |
| 198 /// events have been processed, then it cancels the subscription | |
| 199 /// providing the events. | |
| 200 /// | |
| 201 /// The returned future completes with the result of calling | |
| 202 /// `cancel`. | |
| 203 /// | |
| 204 /// After calling `close`, no further events can be requested. | |
| 205 /// None of [next], [rest], [skip], [take] or [close] may be | |
| 206 /// called again. | |
| 207 Future close() { | |
| 208 if (!_isClosed) { | |
| 209 _state |= _CLOSED; | |
| 210 if (!_isListening) { | |
| 211 assert(_requestQueue.isEmpty); | |
| 212 if (!_isDone) _setDone(); | |
| 213 return new Future.value(); | |
| 214 } | |
| 215 Completer completer = new Completer(); | |
| 216 _addAction(new _CloseAction(completer)); | |
| 217 return completer.future; | |
| 218 } | |
| 219 throw _failClosed(); | |
| 220 } | |
| 221 | |
| 222 /// Reused error message. | |
| 223 Error _failClosed() { | |
| 224 String cause = | |
| 225 ((_state & _CLOSED_REST) == _CLOSED_REST) ? "rest" : "close"; | |
| 226 return new StateError("Already closed by a call to $cause"); | |
| 227 } | |
| 228 | |
| 229 /// Called when requesting an event when the requeust queue is empty. | |
| 230 /// The underlying subscription is paused in that case, except the very | |
| 231 /// first time when the subscription needs to be created. | |
| 232 void _listen() { | |
| 233 if (_isListening) { | |
| 234 _subscription.resume(); | |
| 235 } else if (!_isDone) { | |
| 236 _subscription = | |
| 237 _sourceStream.listen(_onData, onError: _onError, onDone: _onDone); | |
| 238 } | |
| 239 } | |
| 240 | |
| 241 /// Pauses the underlying subscription. | |
| 242 /// Called when the request queue is empty and the subscription isn't closed. | |
| 243 void _pause() { | |
| 244 assert(_isListening); | |
| 245 _subscription.pause(); | |
| 246 } | |
| 247 | |
| 248 // Callbacks receiving the events of the source stream. | |
| 249 void _onData(T data) { | |
| 250 assert(_requestQueue.isNotEmpty); | |
| 251 _EventAction action = _nextAction; | |
| 252 if (action.data(data)) { | |
| 253 _completeAction(); | |
| 254 } | |
| 255 } | |
| 256 | |
| 257 void _onError(error, StackTrace stack) { | |
| 258 assert(_requestQueue.isNotEmpty); | |
| 259 _EventAction action = _nextAction; | |
| 260 action.error(error, stack); | |
| 261 _completeAction(); | |
| 262 } | |
| 263 | |
| 264 void _onDone() { | |
| 265 _setDone(); | |
| 266 _flushQueue(); | |
| 267 } | |
| 268 | |
| 269 // Request queue management. | |
| 270 | |
| 271 /// Get the next action in the queue, but don't remove it. | |
| 272 _EventAction get _nextAction { | |
| 273 assert(_requestQueue.isNotEmpty); | |
| 274 return _requestQueue.first; | |
| 275 } | |
| 276 | |
| 277 /// Add a new request to the queue. | |
| 278 void _addAction(_EventAction action) { | |
| 279 if (_isDone) { | |
| 280 action.done(); | |
| 281 return; | |
| 282 } | |
| 283 if (_requestQueue.isEmpty) { | |
| 284 _listen(); | |
| 285 } | |
| 286 _requestQueue.add(action); | |
| 287 _checkClose(); | |
| 288 } | |
| 289 | |
| 290 /// Remove all requests and call their `done` event. | |
| 291 /// | |
| 292 /// Used when the source stream dries up. | |
| 293 void _flushQueue() { | |
| 294 while (_requestQueue.isNotEmpty) { | |
| 295 _requestQueue.removeFirst().done(); | |
| 296 } | |
| 297 } | |
| 298 | |
| 299 /// Remove a completed action from the queue. | |
| 300 /// | |
| 301 /// An action is complete when its `data` or `error` handler | |
| 302 /// returns true. For actions that use multiple | |
| 303 void _completeAction() { | |
| 304 _requestQueue.removeFirst(); | |
| 305 if (_requestQueue.isEmpty) { | |
| 306 _pause(); | |
| 307 } else { | |
| 308 _checkClose(); | |
| 309 } | |
| 310 } | |
| 311 | |
| 312 /// Check whether the only remaining action in the queue is a close action. | |
| 313 /// | |
| 314 /// If so, pass the subscription to the action and let it take over. | |
| 315 void _checkClose() { | |
| 316 // Close-actions are executed immediately when they become the | |
| 317 // next (and last) event in the queue. | |
| 318 // When _isClosed and the queue is not empty, the last element | |
| 319 // of the queue is the close action. | |
| 320 if (_isClosed && _requestQueue.length == 1) { | |
| 321 _EventCloseAction action = _requestQueue.removeFirst(); | |
| 322 StreamSubscription subscription = _subscription; | |
| 323 _setDone(); | |
| 324 action.close(subscription); | |
| 325 } | |
| 326 } | |
| 327 } | |
| 328 | |
| 329 | |
| 330 /// Action to take when a requested event arrives. | |
| 331 abstract class _EventAction { | |
| 332 bool data(data); | |
| 333 void error(error, StackTrace stack); | |
| 334 void done(); | |
| 335 } | |
| 336 | |
| 337 /// Action to take when closing the [StreamEvents] class. | |
| 338 abstract class _EventCloseAction implements _EventAction { | |
| 339 void close(StreamSubscription subscription); | |
| 340 } | |
| 341 | |
| 342 | |
| 343 /// Action completing a [StreamEvents.next] request. | |
| 344 class _NextAction implements _EventAction { | |
| 345 Completer completer; | |
| 346 _NextAction(this.completer); | |
| 347 | |
| 348 bool data(data) { | |
| 349 completer.complete(data); | |
| 350 return true; | |
| 351 } | |
| 352 | |
| 353 void error(error, StackTrace stack) { | |
| 354 completer.completeError(error, stack); | |
| 355 } | |
| 356 | |
| 357 void done() { | |
| 358 completer.completeError(new StateError("no elements")); | |
| 359 } | |
| 360 } | |
| 361 | |
| 362 /// Action completing a [StreamEvents.skip] request. | |
| 363 class _SkipAction implements _EventAction { | |
| 364 Completer completer; | |
| 365 int count; | |
| 366 _SkipAction(this.completer, this.count); | |
| 367 | |
| 368 bool data(data) { | |
| 369 count--; | |
| 370 if (count > 0) return false; | |
| 371 completer.complete(count); | |
| 372 return true; | |
| 373 } | |
| 374 | |
| 375 void error(error, StackTrace stack) { | |
| 376 completer.completeError(error, stack); | |
| 377 } | |
| 378 | |
| 379 void done() { | |
| 380 completer.complete(count); | |
| 381 } | |
| 382 } | |
| 383 | |
| 384 /// Action completing a [StreamEvents.take] request. | |
| 385 class _TakeAction<T> implements _EventAction { | |
| 386 final Completer completer; | |
| 387 final int count; | |
| 388 List list = <T>[]; | |
| 389 _TakeAction(this.completer, this.count); | |
| 390 | |
| 391 bool data(data) { | |
| 392 list.add(data); | |
| 393 if (list.length < count) return false; | |
| 394 completer.complete(list); | |
| 395 return true; | |
| 396 } | |
| 397 | |
| 398 void error(error, StackTrace stack) { | |
| 399 completer.completeError(error, stack); | |
| 400 } | |
| 401 | |
| 402 void done() { | |
| 403 completer.complete(list); | |
| 404 } | |
| 405 } | |
| 406 | |
| 407 /// Action completing a [StreamEvents.close] request. | |
| 408 class _CloseAction implements _EventCloseAction { | |
| 409 final Completer completer; | |
| 410 | |
| 411 _CloseAction(this.completer); | |
| 412 | |
| 413 bool data(data) { | |
| 414 throw new UnsupportedError("event"); | |
| 415 } | |
| 416 | |
| 417 void error(e, StackTrace stack) { | |
| 418 throw new UnsupportedError("event"); | |
| 419 } | |
| 420 | |
| 421 void done() { | |
| 422 completer.complete(); | |
| 423 } | |
| 424 | |
| 425 void close(StreamSubscription subscription) { | |
| 426 completer.complete(subscription.cancel()); | |
| 427 } | |
| 428 } | |
| 429 | |
| 430 /// Action completing a [StreamEvents.rest] request. | |
| 431 class _RestAction<T> implements _EventCloseAction { | |
| 432 final StreamCompleter delayStream; | |
| 433 _RestAction(this.delayStream); | |
| 434 | |
| 435 bool data(data) { | |
| 436 throw new UnsupportedError("event"); | |
| 437 } | |
| 438 | |
| 439 void error(e, StackTrace stack) { | |
| 440 throw new UnsupportedError("event"); | |
| 441 } | |
| 442 | |
| 443 void done() { | |
| 444 delayStream.setEmpty(); | |
| 445 } | |
| 446 | |
| 447 void close(StreamSubscription subscription) { | |
| 448 delayStream.setSourceStream(new SubscriptionStream<T>(subscription)); | |
| 449 } | |
| 450 } | |
| OLD | NEW |