OLD | NEW |
(Empty) | |
| 1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. |
| 4 |
| 5 library async.stream_events; |
| 6 |
| 7 import 'dart:async'; |
| 8 import 'dart:collection'; |
| 9 |
| 10 import "subscription_stream.dart"; |
| 11 import "stream_completer.dart"; |
| 12 |
| 13 /// An asynchronous pull-based interface for accessing stream events. |
| 14 /// |
| 15 /// Wraps a stream and makes individual events available on request. |
| 16 /// |
| 17 /// You can request (and reserve) one or more events from the stream, |
| 18 /// and after all previous requestes have been fulfilled, stream events |
| 19 /// go towards fulfilling your request. |
| 20 /// |
| 21 /// For example, if you ask for [next] two times, the returned futures |
| 22 /// will be completed by the next two unreserved events from the stream. |
| 23 /// |
| 24 /// The stream subscription is paused when there are no active |
| 25 /// requests. |
| 26 /// |
| 27 /// Some streams, including broadcast streams, will buffer |
| 28 /// events while paused, so waiting too long between requests may |
| 29 /// cause memory bloat somewhere else. |
| 30 /// |
| 31 /// The individual requests are served in the order they are requested, |
| 32 /// and the stream subscription is paused when there are no active requests. |
| 33 /// |
| 34 /// This is similar to, but more convenient than, a [StreamIterator]. |
| 35 /// A `StreamIterator` requires you to manually check when a new event is |
| 36 /// available and you can only access the value of that event until you |
| 37 /// check for the next one. A `StreamEvents` allows you to request, for example, |
| 38 /// three events at a time, either individually, as a group using [take] |
| 39 /// or [skip], or in any combination. |
| 40 /// |
| 41 /// You can also ask to have the [rest] of the stream provided as |
| 42 /// a new stream. This allows, for example, taking the first event |
| 43 /// out of a stream and continue using the rest of the stream as a stream. |
| 44 /// |
| 45 /// Example: |
| 46 /// |
| 47 /// var events = new StreamEvents<String>(someStreamOfLines); |
| 48 /// var first = await events.next; |
| 49 /// while (first.startsWith('#')) { |
| 50 /// // Skip comments. |
| 51 /// first = await events.next; |
| 52 /// } |
| 53 /// |
| 54 /// if (first.startsWith(MAGIC_MARKER)) { |
| 55 /// var headerCount = |
| 56 /// first.parseInt(first.substring(MAGIC_MARKER.length + 1)); |
| 57 /// handleMessage(headers: await events.take(headerCount), |
| 58 /// body: events.rest); |
| 59 /// return; |
| 60 /// } |
| 61 /// // Error handling. |
| 62 /// |
| 63 /// When you need no further events the `StreamEvents` should be closed |
| 64 /// using [cancel]. This releases the underlying stream subscription. |
| 65 /// |
| 66 /// The underlying stream subscription is paused when there |
| 67 /// are no requeusts. Some subscriptions, including those of broadcast streams, |
| 68 /// will still buffer events while paused. Creating a `StreamEvents` from |
| 69 /// such a stream and stopping to request events, will cause memory to fill up |
| 70 /// unnecessarily. |
| 71 class StreamEvents<T> { |
| 72 /// The initial state, where the stream has not been listened to yet. |
| 73 /// |
| 74 /// It will be listened to when the first event is requested. |
| 75 /// The `stateData` field holds the stream and the request queue is empty. |
| 76 static const int _stateInitial = 0; |
| 77 |
| 78 /// Listening on a stream that hasn't completed yet. |
| 79 /// |
| 80 /// If the request queue is empty, the subscription is paused. |
| 81 /// The `stateData` field holds the active subscription. |
| 82 static const int _stateListening = 1; |
| 83 |
| 84 /// The stream has completed. |
| 85 /// |
| 86 /// The `stateData` field is `null` and the request queue is empty. |
| 87 static const int _stateDone = 2; |
| 88 |
| 89 /// Flag set when [cancel] is called. |
| 90 /// The `StreamEvents` is closed and no further events can be requested. |
| 91 static const int _stateClosed = 4; |
| 92 |
| 93 /// Flag combined with [_stateClosed] to say it was closed using [rest]. |
| 94 /// Only used for error reporting purposes. |
| 95 static const int _restFlag = 8; |
| 96 |
| 97 /// Flag set when [rest] is called. |
| 98 /// Only used for error reporting, otherwise equivalent to [_stateClosed]. |
| 99 static const int _stateClosedRest = _stateClosed | _restFlag; |
| 100 |
| 101 /// Current state. |
| 102 /// |
| 103 /// Use getters below to check if the state is [_isListening] or [_isDone], |
| 104 /// and whether the stream events object [_isClosed]. |
| 105 int _state = _stateInitial; |
| 106 |
| 107 /// Value depending on state. Use getters below to get the value and assert |
| 108 /// the expected state. |
| 109 var _stateData; |
| 110 |
| 111 /// Queue of pending requests while state is [_stateListening]. |
| 112 /// Access through methods below to ensure consistency. |
| 113 final Queue<_EventRequest> _requestQueue = new Queue(); |
| 114 |
| 115 StreamEvents(Stream source) : _stateData = source; |
| 116 |
| 117 /// Whether we are currently listening on a stream subscription. |
| 118 bool get _isListening => (_state & _stateListening) != 0; |
| 119 |
| 120 /// Whether the underlying stream is done. |
| 121 /// |
| 122 /// This may return true before all events have been delivered. |
| 123 /// Requesting a new event when [_isDone] returns true, |
| 124 /// for example using [next], will always fail. |
| 125 bool get _isDone => (_state & _stateDone) != 0; |
| 126 |
| 127 /// Whether the stream events has been closed. |
| 128 /// |
| 129 /// While closed, no further requests can be made. |
| 130 bool get _isClosed => (_state & _stateClosed) != 0; |
| 131 |
| 132 /// Returns the stream subscription while in a listening state. |
| 133 StreamSubscription get _subscription { |
| 134 assert(_isListening); |
| 135 return _stateData; |
| 136 } |
| 137 |
| 138 /// Returns the source stream while in the initial state. |
| 139 Stream get _sourceStream { |
| 140 assert(!_isListening); |
| 141 assert(!_isDone); |
| 142 return _stateData; |
| 143 } |
| 144 |
| 145 /// Sets the subscription and transitions to listening state. |
| 146 void _setListening(StreamSubscription subscription) { |
| 147 assert(!_isListening); |
| 148 assert(!_isDone); |
| 149 _stateData = subscription; |
| 150 _state |= _stateListening; |
| 151 } |
| 152 |
| 153 /// Transitions to the done state. |
| 154 /// |
| 155 /// The stream is done, and no further events will arrive. |
| 156 void _setDone() { |
| 157 assert(!_isDone); |
| 158 _state = (_state & _stateClosedRest) | _stateDone; |
| 159 _stateData = null; |
| 160 } |
| 161 |
| 162 /// Request the next (yet unrequested) event from the stream. |
| 163 /// |
| 164 /// When the requested event arrives, the returned future is completed with |
| 165 /// the event. |
| 166 /// If the event is a data event, the returned future completes |
| 167 /// with its value. |
| 168 /// If the event is an error event, the returned future completes with |
| 169 /// its error and stack trace. |
| 170 /// If the stream closes before an event arrives, the returned future |
| 171 /// completes with a [StateError]. |
| 172 /// |
| 173 /// It's possible to have several pending [next] calls (or other requests), |
| 174 /// and they will be completed in the order they were requested, by the |
| 175 /// first events that were not used by previous requeusts. |
| 176 Future<T> get next { |
| 177 if (!_isClosed) { |
| 178 _NextRequest nextRequest = new _NextRequest<T>(); |
| 179 _addRequest(nextRequest); |
| 180 return nextRequest.future; |
| 181 } |
| 182 throw _failClosed(); |
| 183 } |
| 184 |
| 185 /// Returns a stream of all the remaning events of the source stream. |
| 186 /// |
| 187 /// All requested [next], [skip] or [take] operations are completed |
| 188 /// first, and then any remaining events are provided as events of |
| 189 /// the returned stream. |
| 190 /// |
| 191 /// Using `rest` closes the stream events object. After getting the |
| 192 /// `rest` the caller may no longer request other events, like |
| 193 /// after calling [cancel]. |
| 194 Stream<T> get rest { |
| 195 if (_isClosed) { |
| 196 throw _failClosed(); |
| 197 } |
| 198 _state |= _stateClosedRest; |
| 199 if (_isListening) { |
| 200 // We have an active subscription that we want to take over. |
| 201 var request = new _RestRequest<T>(this); |
| 202 _addRequest(request); |
| 203 return request.stream; |
| 204 } |
| 205 assert(_requestQueue.isEmpty); |
| 206 if (_isDone) { |
| 207 // TODO(lrn): Add Stream.empty() constructor. |
| 208 return new Stream<T>.fromIterable(const []); |
| 209 } |
| 210 // We have never listened to the source stream, |
| 211 // so just return that directly. |
| 212 Stream result = _sourceStream; |
| 213 _setDone(); |
| 214 return result; |
| 215 } |
| 216 |
| 217 /// Skips the next [count] *data* events. |
| 218 /// |
| 219 /// The [count] must be non-negative. |
| 220 /// |
| 221 /// When successful, this is equivalent to using [take] |
| 222 /// and ignoring the result. |
| 223 /// |
| 224 /// If an error occurs before `count` data events have been skipped, |
| 225 /// the returned future completes with that error instead. |
| 226 /// |
| 227 /// If the stream closes before `count` data events, |
| 228 /// the remaining unskipped event count is returned. |
| 229 /// If the returned future completes with the integer `0`, |
| 230 /// then all events were succssfully skipped. If the value |
| 231 /// is greater than zero then the stream ended early. |
| 232 Future<int> skip(int count) { |
| 233 if (count < 0) throw new RangeError.range(count, 0, null, "count"); |
| 234 if (!_isClosed) { |
| 235 var request = new _SkipRequest(count); |
| 236 _addRequest(request); |
| 237 return request.future; |
| 238 } |
| 239 throw _failClosed(); |
| 240 } |
| 241 |
| 242 /// Requests the next [count] data events as a list. |
| 243 /// |
| 244 /// The [count] must be non-negative. |
| 245 /// |
| 246 /// Equivalent to calling [next] `count` times and |
| 247 /// storing the data values in a list. |
| 248 /// |
| 249 /// If an error occurs before `count` data events has |
| 250 /// been collected, the returned future completes with |
| 251 /// that error instead. |
| 252 /// |
| 253 /// If the stream closes before `count` data events, |
| 254 /// the returned future completes with the list |
| 255 /// of data collected so far. That is, the returned |
| 256 /// list may have fewer than [count] elements. |
| 257 Future<List<T>> take(int count) { |
| 258 if (count < 0) throw new RangeError.range(count, 0, null, "count"); |
| 259 if (!_isClosed) { |
| 260 var request = new _TakeRequest<T>(count); |
| 261 _addRequest(request); |
| 262 return request.future; |
| 263 } |
| 264 throw _failClosed(); |
| 265 } |
| 266 |
| 267 /// Cancels the underlying stream subscription. |
| 268 /// |
| 269 /// The cancel operation waits until all previously requested |
| 270 /// events have been processed, then it cancels the subscription |
| 271 /// providing the events. |
| 272 /// |
| 273 /// The returned future completes with the result of calling |
| 274 /// `cancel`. |
| 275 /// |
| 276 /// After calling `cancel`, no further events can be requested. |
| 277 /// None of [next], [rest], [skip], [take] or [cancel] may be |
| 278 /// called again. |
| 279 Future cancel() { |
| 280 if (!_isClosed) { |
| 281 _state |= _stateClosed; |
| 282 if (!_isListening) { |
| 283 // The request queue is only non-empty while we are listening. |
| 284 assert(_requestQueue.isEmpty); |
| 285 if (!_isDone) _setDone(); |
| 286 return new Future.value(); |
| 287 } |
| 288 var request = new _CancelRequest(this); |
| 289 _addRequest(request); |
| 290 return request.future; |
| 291 } |
| 292 throw _failClosed(); |
| 293 } |
| 294 |
| 295 /// Returns an error for when a request is made after cancel. |
| 296 /// |
| 297 /// Returns a [StateError] with a message saying that either |
| 298 /// [cancel] or [rest] have already been called. |
| 299 Error _failClosed() { |
| 300 String cause = |
| 301 ((_state & _stateClosedRest) == _stateClosedRest) ? "rest" : "cancel"; |
| 302 return new StateError("Already cancelled by a call to $cause"); |
| 303 } |
| 304 |
| 305 // Callbacks receiving the events of the source stream. |
| 306 |
| 307 void _onData(T data) { |
| 308 assert(_requestQueue.isNotEmpty); |
| 309 _EventRequest request = _nextRequest; |
| 310 request.add(data); |
| 311 _checkCompleted(); |
| 312 } |
| 313 |
| 314 void _onError(error, StackTrace stack) { |
| 315 assert(_requestQueue.isNotEmpty); |
| 316 _EventRequest request = _nextRequest; |
| 317 request.addError(error, stack); |
| 318 _checkCompleted(); |
| 319 } |
| 320 |
| 321 void _onDone() { |
| 322 _setDone(); |
| 323 _closeAllRequests(); |
| 324 } |
| 325 |
| 326 // Request queue management. |
| 327 |
| 328 /// Returns the next request in the queue, but don't remove it. |
| 329 _EventRequest get _nextRequest { |
| 330 return _requestQueue.first; |
| 331 } |
| 332 |
| 333 /// Add a new request to the queue. |
| 334 void _addRequest(_EventRequest request) { |
| 335 if (_isDone) { |
| 336 request.close(); |
| 337 return; |
| 338 } |
| 339 if (_requestQueue.isEmpty) { |
| 340 if (request.isComplete) { |
| 341 // Some requests are complete without receiving any events. |
| 342 // This includes [cancel] and [rest] requests, as well as |
| 343 // [take] and [skip] events with zero count. |
| 344 |
| 345 // We can skip listening and jsut complete the request immediately. |
| 346 request.close(); |
| 347 return; |
| 348 } |
| 349 // Continue listening on the source stream. |
| 350 // The source stream is paused while the requeust queue is empty, |
| 351 // except at the beginning when it hasn't been listened to at all. |
| 352 if (_isListening) { |
| 353 _subscription.resume(); |
| 354 } else if (!_isDone) { |
| 355 _setListening( |
| 356 _sourceStream.listen(_onData, onError: _onError, onDone: _onDone)); |
| 357 } |
| 358 } |
| 359 _requestQueue.add(request); |
| 360 } |
| 361 |
| 362 /// Remove all requests and close them. |
| 363 /// |
| 364 /// Used when the source stream is done. |
| 365 void _closeAllRequests() { |
| 366 assert(_isDone); |
| 367 while (_requestQueue.isNotEmpty) { |
| 368 _requestQueue.removeFirst().close(); |
| 369 } |
| 370 } |
| 371 |
| 372 /// Check whether the next requests in the queue are complete. |
| 373 /// |
| 374 /// If so, remove them and call their `close` method. |
| 375 void _checkCompleted() { |
| 376 // Close-actions are executed immediately when they become the |
| 377 // next (and last) event in the queue. |
| 378 // When _isClosed and the queue is not empty, the last element |
| 379 // of the queue is the close action. |
| 380 while (_requestQueue.isNotEmpty) { |
| 381 if (!_requestQueue.first.isComplete) { |
| 382 return; |
| 383 } |
| 384 _requestQueue.removeFirst().close(); |
| 385 } |
| 386 assert(_requestQueue.isEmpty); |
| 387 if (!_isDone) { |
| 388 // Pause the underlying subscription. |
| 389 // Won't get here without adding a request, so we must be listening |
| 390 // already. |
| 391 assert(_isListening); |
| 392 _subscription.pause(); |
| 393 } |
| 394 } |
| 395 |
| 396 /// Extracts the subscription and makes the events object unusable. |
| 397 /// |
| 398 /// Can only be used by the very last request. |
| 399 StreamSubscription _dispose() { |
| 400 assert(_isClosed); |
| 401 assert(_isListening); |
| 402 assert(_requestQueue.isEmpty); |
| 403 StreamSubscription subscription = _subscription; |
| 404 _setDone(); |
| 405 return subscription; |
| 406 } |
| 407 } |
| 408 |
| 409 /// Request object that receives events when they arrive, until fulfilled. |
| 410 /// |
| 411 /// Each request that cannot be fulfilled immediately is represented by |
| 412 /// an `_EventRequest` object in the request queue. |
| 413 /// |
| 414 /// Events from the source stream are sent to the first request in the |
| 415 /// queue until it reports itself as [isComplete]. |
| 416 /// |
| 417 /// When the first request in the queue `isComplete`, either when becoming |
| 418 /// the first request or after receiving an event, its [close] methods is |
| 419 /// called. |
| 420 /// |
| 421 /// The [close] method is also called immediately when the source stream |
| 422 /// is done. |
| 423 abstract class _EventRequest implements EventSink { |
| 424 /// Handle a data event. |
| 425 void add(data); |
| 426 |
| 427 /// Handle an error event. |
| 428 void addError(error, [StackTrace stackTrace]); |
| 429 |
| 430 /// Complete the request. |
| 431 /// |
| 432 /// This may be called either when [isComplete] returns true, |
| 433 /// or if the source stream is done. |
| 434 void close(); |
| 435 |
| 436 /// Whether the request considers itself fulfilled. |
| 437 /// |
| 438 /// This is checked whenever a request becomes the first request |
| 439 /// in the request queue, and after it receives an event. |
| 440 /// |
| 441 /// When a request is complete, its [close] method is called and |
| 442 /// it's removed from the request queue. |
| 443 bool get isComplete; |
| 444 } |
| 445 |
| 446 /// Request for a [StreamEvents.next] call. |
| 447 /// |
| 448 /// Completes the returned future when receiving the first event, |
| 449 /// and is then complete. |
| 450 class _NextRequest<T> implements _EventRequest { |
| 451 /// Completer for the future returned by [StreamEvents.next]. |
| 452 /// |
| 453 /// Set to `null` when it is completed, to mark it as already complete. |
| 454 final Completer _completer; |
| 455 |
| 456 _NextRequest() : _completer = new Completer<T>(); |
| 457 |
| 458 Future<T> get future => _completer.future; |
| 459 |
| 460 void add(data) { |
| 461 _completer.complete(data); |
| 462 } |
| 463 |
| 464 void addError(error, [StackTrace stack]) { |
| 465 _completer.completeError(error, stack); |
| 466 } |
| 467 |
| 468 void close() { |
| 469 if (!_completer.isCompleted) { |
| 470 _completer.completeError(new StateError("no elements")); |
| 471 } |
| 472 } |
| 473 |
| 474 bool get isComplete => _completer.isCompleted; |
| 475 } |
| 476 |
| 477 /// Request for a [StreamEvents.skip] call. |
| 478 class _SkipRequest implements _EventRequest { |
| 479 /// Completer for the future returned by the skip call. |
| 480 final Completer _completer = new Completer<int>(); |
| 481 |
| 482 /// Number of remaining events to skip. |
| 483 /// |
| 484 /// The request [isComplete] when the values reaches zero. |
| 485 /// |
| 486 /// Decremented when an event is seen. |
| 487 /// Set to zero when an error is seen since errors abort the skip request. |
| 488 int _eventsToSkip; |
| 489 |
| 490 _SkipRequest(this._eventsToSkip); |
| 491 |
| 492 /// The future completed when the correct number of events have been skipped. |
| 493 Future get future => _completer.future; |
| 494 |
| 495 void add(data) { |
| 496 assert(_eventsToSkip > 0); |
| 497 _eventsToSkip--; |
| 498 } |
| 499 |
| 500 void addError(error, [StackTrace stackTrace]) { |
| 501 _eventsToSkip = 0; |
| 502 _completer.completeError(error, stackTrace); |
| 503 } |
| 504 |
| 505 void close() { |
| 506 if (!_completer.isCompleted) { |
| 507 _completer.complete(_eventsToSkip); |
| 508 } |
| 509 } |
| 510 |
| 511 bool get isComplete => _eventsToSkip == 0; |
| 512 } |
| 513 |
| 514 /// Request for a [StreamEvents.take] call. |
| 515 class _TakeRequest<T> implements _EventRequest { |
| 516 /// Completer for the future returned by the take call. |
| 517 final Completer _completer; |
| 518 |
| 519 /// List collecting events until enough have been seen. |
| 520 final List _list = <T>[]; |
| 521 |
| 522 /// Number of events to capture. |
| 523 /// |
| 524 /// The request [isComplete] when the length of [_list] reaches |
| 525 /// this value. |
| 526 final int _eventsToTake; |
| 527 |
| 528 _TakeRequest(this._eventsToTake) : _completer = new Completer<List<T>>(); |
| 529 |
| 530 /// The future completed when the correct number of events have been captured. |
| 531 Future get future => _completer.future; |
| 532 |
| 533 void add(data) { |
| 534 _list.add(data); |
| 535 } |
| 536 |
| 537 void addError(error, [StackTrace stack]) { |
| 538 _completer.completeError(error, stack); |
| 539 } |
| 540 |
| 541 void close() { |
| 542 if (!_completer.isCompleted) { |
| 543 _completer.complete(_list); |
| 544 } |
| 545 } |
| 546 |
| 547 bool get isComplete => |
| 548 _list.length == _eventsToTake || _completer.isCompleted; |
| 549 } |
| 550 |
| 551 /// Request for a [StreamEvents.cancel] call. |
| 552 /// |
| 553 /// The request is always complete, it just waits in the request queue |
| 554 /// until all previous events are fulfilled, then it cancels the stream events |
| 555 /// subscription. |
| 556 class _CancelRequest implements _EventRequest { |
| 557 /// Completer for the future returned by the `cancel` call. |
| 558 final Completer _completer = new Completer(); |
| 559 |
| 560 /// The [StreamEvents] object that has this request queued. |
| 561 /// |
| 562 /// When the event is completed, it needs to cancel the active subscription |
| 563 /// of the `StreamEvents` object, if any. |
| 564 final StreamEvents _events; |
| 565 |
| 566 _CancelRequest(this._events); |
| 567 |
| 568 /// The future completed when the cancel request is completed. |
| 569 Future get future => _completer.future; |
| 570 |
| 571 void add(data) { |
| 572 assert(false); // Unreachable. |
| 573 } |
| 574 |
| 575 void addError(error, [StackTrace stack]) { |
| 576 assert(false); // Unreachable. |
| 577 } |
| 578 |
| 579 void close() { |
| 580 if (_events._isListening) { |
| 581 _completer.complete(_events._dispose().cancel()); |
| 582 } else { |
| 583 _completer.complete(); |
| 584 } |
| 585 } |
| 586 |
| 587 bool get isComplete => true; |
| 588 } |
| 589 |
| 590 /// Request for a [StreamEvents.rest] call. |
| 591 /// |
| 592 /// The request is always complete, it just waits in the request queue |
| 593 /// until all previous events are fulfilled, then it takes over the |
| 594 /// stream events subscription and creates a stream from it. |
| 595 class _RestRequest<T> implements _EventRequest { |
| 596 /// Completer for the stream returned by the `rest` call. |
| 597 final StreamCompleter _completer; |
| 598 |
| 599 /// The [StreamEvents] object that has this request queued. |
| 600 /// |
| 601 /// When the event is completed, it needs to cancel the active subscription |
| 602 /// of the `StreamEvents` object, if any. |
| 603 final StreamEvents _events; |
| 604 _RestRequest(this._events) : _completer = new StreamCompleter<T>(); |
| 605 |
| 606 /// The future which will contain the remaining events of [_events]. |
| 607 Stream<T> get stream => _completer.stream; |
| 608 |
| 609 void add(data) { |
| 610 assert(false); // Unreachable. |
| 611 } |
| 612 |
| 613 void addError(error, [StackTrace stack]) { |
| 614 assert(false); // Unreachable. |
| 615 } |
| 616 |
| 617 void close() { |
| 618 if (_events._isListening) { |
| 619 StreamSubscription subscription = _events._dispose(); |
| 620 _completer.setSourceStream(new SubscriptionStream<T>(subscription)); |
| 621 if (subscription.isPaused) subscription.resume(); |
| 622 } else { |
| 623 assert(_events._isDone); |
| 624 _completer.setEmpty(); |
| 625 } |
| 626 } |
| 627 |
| 628 bool get isComplete => true; |
| 629 } |
OLD | NEW |