OLD | NEW |
(Empty) | |
| 1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. |
| 4 |
| 5 // TODO(nweiz): Get rid of this when https://codereview.chromium.org/1241723003/ |
| 6 // lands. |
| 7 library test.util.forkable_stream_queue; |
| 8 |
| 9 import 'dart:async'; |
| 10 import 'dart:collection'; |
| 11 |
| 12 import "package:async/async.dart" hide ForkableStream, StreamQueue; |
| 13 |
| 14 import "forkable_stream.dart"; |
| 15 |
| 16 /// An asynchronous pull-based interface for accessing stream events. |
| 17 /// |
| 18 /// Wraps a stream and makes individual events available on request. |
| 19 /// |
| 20 /// You can request (and reserve) one or more events from the stream, |
| 21 /// and after all previous requests have been fulfilled, stream events |
| 22 /// go towards fulfilling your request. |
| 23 /// |
| 24 /// For example, if you ask for [next] two times, the returned futures |
| 25 /// will be completed by the next two unrequested events from the stream. |
| 26 /// |
| 27 /// The stream subscription is paused when there are no active |
| 28 /// requests. |
| 29 /// |
| 30 /// Some streams, including broadcast streams, will buffer |
| 31 /// events while paused, so waiting too long between requests may |
| 32 /// cause memory bloat somewhere else. |
| 33 /// |
| 34 /// This is similar to, but more convenient than, a [StreamIterator]. |
| 35 /// A `StreamIterator` requires you to manually check when a new event is |
| 36 /// available and you can only access the value of that event until you |
| 37 /// check for the next one. A `StreamQueue` allows you to request, for example, |
| 38 /// three events at a time, either individually, as a group using [take] |
| 39 /// or [skip], or in any combination. |
| 40 /// |
| 41 /// You can also ask to have the [rest] of the stream provided as |
| 42 /// a new stream. This allows, for example, taking the first event |
| 43 /// out of a stream and continuing to use the rest of the stream as a stream. |
| 44 /// |
| 45 /// Example: |
| 46 /// |
| 47 /// var events = new StreamQueue<String>(someStreamOfLines); |
| 48 /// var first = await events.next; |
| 49 /// while (first.startsWith('#')) { |
| 50 /// // Skip comments. |
| 51 /// first = await events.next; |
| 52 /// } |
| 53 /// |
| 54 /// if (first.startsWith(MAGIC_MARKER)) { |
| 55 /// var headerCount = |
| 56 /// first.parseInt(first.substring(MAGIC_MARKER.length + 1)); |
| 57 /// handleMessage(headers: await events.take(headerCount), |
| 58 /// body: events.rest); |
| 59 /// return; |
| 60 /// } |
| 61 /// // Error handling. |
| 62 /// |
| 63 /// When you need no further events the `StreamQueue` should be closed |
| 64 /// using [cancel]. This releases the underlying stream subscription. |
| 65 class StreamQueue<T> { |
| 66 // This class maintains two queues: one of events and one of requests. |
| 67 // The active request (the one in front of the queue) is called with |
| 68 // the current event queue when it becomes active. |
| 69 // |
| 70 // If the request returns true, it's complete and will be removed from the |
| 71 // request queue. |
| 72 // If the request returns false, it needs more events, and will be called |
| 73 // again when new events are available. |
| 74 // The request can remove events that it uses, or keep them in the event |
| 75 // queue until it has all that it needs. |
| 76 // |
| 77 // This model is very flexible and easily extensible. |
| 78 // It allows requests that don't consume events (like [hasNext]) or |
| 79 // potentially a request that takes either five or zero events, determined |
| 80 // by the content of the fifth event. |
| 81 |
| 82 /// Source of events. |
| 83 final ForkableStream _sourceStream; |
| 84 |
| 85 /// Subscription on [_sourceStream] while listening for events. |
| 86 /// |
| 87 /// Set to subscription when listening, and set to `null` when the |
| 88 /// subscription is done (and [_isDone] is set to true). |
| 89 StreamSubscription _subscription; |
| 90 |
| 91 /// Whether we have listened on [_sourceStream] and the subscription is done. |
| 92 bool _isDone = false; |
| 93 |
| 94 /// Whether a closing operation has been performed on the stream queue. |
| 95 /// |
| 96 /// Closing operations are [cancel] and [rest]. |
| 97 bool _isClosed = false; |
| 98 |
| 99 /// Queue of events not used by a request yet. |
| 100 final Queue<Result> _eventQueue = new Queue(); |
| 101 |
| 102 /// Queue of pending requests. |
| 103 /// |
| 104 /// Access through methods below to ensure consistency. |
| 105 final Queue<_EventRequest> _requestQueue = new Queue(); |
| 106 |
| 107 /// Create a `StreamQueue` of the events of [source]. |
| 108 StreamQueue(Stream source) |
| 109 : _sourceStream = source is ForkableStream |
| 110 ? source |
| 111 : new ForkableStream(source); |
| 112 |
| 113 /// Asks if the stream has any more events. |
| 114 /// |
| 115 /// Returns a future that completes with `true` if the stream has any |
| 116 /// more events, whether data or error. |
| 117 /// If the stream closes without producing any more events, the returned |
| 118 /// future completes with `false`. |
| 119 /// |
| 120 /// Can be used before using [next] to avoid getting an error in the |
| 121 /// future returned by `next` in the case where there are no more events. |
| 122 Future<bool> get hasNext { |
| 123 if (!_isClosed) { |
| 124 var hasNextRequest = new _HasNextRequest(); |
| 125 _addRequest(hasNextRequest); |
| 126 return hasNextRequest.future; |
| 127 } |
| 128 throw _failClosed(); |
| 129 } |
| 130 |
| 131 /// Requests the next (yet unrequested) event from the stream. |
| 132 /// |
| 133 /// When the requested event arrives, the returned future is completed with |
| 134 /// the event. |
| 135 /// If the event is a data event, the returned future completes |
| 136 /// with its value. |
| 137 /// If the event is an error event, the returned future completes with |
| 138 /// its error and stack trace. |
| 139 /// If the stream closes before an event arrives, the returned future |
| 140 /// completes with a [StateError]. |
| 141 /// |
| 142 /// It's possible to have several pending [next] calls (or other requests), |
| 143 /// and they will be completed in the order they were requested, by the |
| 144 /// first events that were not consumed by previous requeusts. |
| 145 Future<T> get next { |
| 146 if (!_isClosed) { |
| 147 var nextRequest = new _NextRequest<T>(); |
| 148 _addRequest(nextRequest); |
| 149 return nextRequest.future; |
| 150 } |
| 151 throw _failClosed(); |
| 152 } |
| 153 |
| 154 /// Returns a stream of all the remaning events of the source stream. |
| 155 /// |
| 156 /// All requested [next], [skip] or [take] operations are completed |
| 157 /// first, and then any remaining events are provided as events of |
| 158 /// the returned stream. |
| 159 /// |
| 160 /// Using `rest` closes this stream queue. After getting the |
| 161 /// `rest` the caller may no longer request other events, like |
| 162 /// after calling [cancel]. |
| 163 Stream<T> get rest { |
| 164 if (_isClosed) { |
| 165 throw _failClosed(); |
| 166 } |
| 167 var request = new _RestRequest<T>(this); |
| 168 _isClosed = true; |
| 169 _addRequest(request); |
| 170 return request.stream; |
| 171 } |
| 172 |
| 173 /// Skips the next [count] *data* events. |
| 174 /// |
| 175 /// The [count] must be non-negative. |
| 176 /// |
| 177 /// When successful, this is equivalent to using [take] |
| 178 /// and ignoring the result. |
| 179 /// |
| 180 /// If an error occurs before `count` data events have been skipped, |
| 181 /// the returned future completes with that error instead. |
| 182 /// |
| 183 /// If the stream closes before `count` data events, |
| 184 /// the remaining unskipped event count is returned. |
| 185 /// If the returned future completes with the integer `0`, |
| 186 /// then all events were succssfully skipped. If the value |
| 187 /// is greater than zero then the stream ended early. |
| 188 Future<int> skip(int count) { |
| 189 if (count < 0) throw new RangeError.range(count, 0, null, "count"); |
| 190 if (!_isClosed) { |
| 191 var request = new _SkipRequest(count); |
| 192 _addRequest(request); |
| 193 return request.future; |
| 194 } |
| 195 throw _failClosed(); |
| 196 } |
| 197 |
| 198 /// Requests the next [count] data events as a list. |
| 199 /// |
| 200 /// The [count] must be non-negative. |
| 201 /// |
| 202 /// Equivalent to calling [next] `count` times and |
| 203 /// storing the data values in a list. |
| 204 /// |
| 205 /// If an error occurs before `count` data events has |
| 206 /// been collected, the returned future completes with |
| 207 /// that error instead. |
| 208 /// |
| 209 /// If the stream closes before `count` data events, |
| 210 /// the returned future completes with the list |
| 211 /// of data collected so far. That is, the returned |
| 212 /// list may have fewer than [count] elements. |
| 213 Future<List<T>> take(int count) { |
| 214 if (count < 0) throw new RangeError.range(count, 0, null, "count"); |
| 215 if (!_isClosed) { |
| 216 var request = new _TakeRequest<T>(count); |
| 217 _addRequest(request); |
| 218 return request.future; |
| 219 } |
| 220 throw _failClosed(); |
| 221 } |
| 222 |
| 223 /// Creates a new stream queue in the same position as this one. |
| 224 /// |
| 225 /// The fork is subscribed to the same underlying stream as this queue, but |
| 226 /// it's otherwise wholly independent. If requests are made on one, they don't |
| 227 /// move the other forward; if one is closed, the other is still open. |
| 228 /// |
| 229 /// The underlying stream will only be paused when all forks have no |
| 230 /// outstanding requests, and only canceled when all forks are canceled. |
| 231 StreamQueue<T> fork() { |
| 232 if (_isClosed) throw _failClosed(); |
| 233 |
| 234 var request = new _ForkRequest<T>(this); |
| 235 _addRequest(request); |
| 236 return request.queue; |
| 237 } |
| 238 |
| 239 /// Cancels the underlying stream subscription. |
| 240 /// |
| 241 /// If [immediate] is `false` (the default), the cancel operation waits until |
| 242 /// all previously requested events have been processed, then it cancels the |
| 243 /// subscription providing the events. |
| 244 /// |
| 245 /// If [immediate] is `true`, the subscription is instead canceled |
| 246 /// immediately. Any pending events complete with a 'closed'-event, as though |
| 247 /// the stream had closed by itself. |
| 248 /// |
| 249 /// The returned future completes with the result of calling |
| 250 /// `cancel`. |
| 251 /// |
| 252 /// After calling `cancel`, no further events can be requested. |
| 253 /// None of [next], [rest], [skip], [take] or [cancel] may be |
| 254 /// called again. |
| 255 Future cancel({bool immediate: false}) { |
| 256 if (_isClosed) throw _failClosed(); |
| 257 _isClosed = true; |
| 258 |
| 259 if (_isDone) return new Future.value(); |
| 260 if (_subscription == null) _subscription = _sourceStream.listen(null); |
| 261 |
| 262 if (!immediate) { |
| 263 var request = new _CancelRequest(this); |
| 264 _addRequest(request); |
| 265 return request.future; |
| 266 } |
| 267 |
| 268 var future = _subscription.cancel(); |
| 269 _onDone(); |
| 270 return future; |
| 271 } |
| 272 |
| 273 /// Returns an error for when a request is made after cancel. |
| 274 /// |
| 275 /// Returns a [StateError] with a message saying that either |
| 276 /// [cancel] or [rest] have already been called. |
| 277 Error _failClosed() { |
| 278 return new StateError("Already cancelled"); |
| 279 } |
| 280 |
| 281 // Callbacks receiving the events of the source stream. |
| 282 |
| 283 void _onData(T data) { |
| 284 _eventQueue.add(new Result.value(data)); |
| 285 _checkQueues(); |
| 286 } |
| 287 |
| 288 void _onError(error, StackTrace stack) { |
| 289 _eventQueue.add(new Result.error(error, stack)); |
| 290 _checkQueues(); |
| 291 } |
| 292 |
| 293 void _onDone() { |
| 294 _subscription = null; |
| 295 _isDone = true; |
| 296 _closeAllRequests(); |
| 297 } |
| 298 |
| 299 // Request queue management. |
| 300 |
| 301 /// Adds a new request to the queue. |
| 302 void _addRequest(_EventRequest request) { |
| 303 if (_isDone) { |
| 304 assert(_requestQueue.isEmpty); |
| 305 if (!request.addEvents(_eventQueue)) { |
| 306 request.close(_eventQueue); |
| 307 } |
| 308 return; |
| 309 } |
| 310 if (_requestQueue.isEmpty) { |
| 311 if (request.addEvents(_eventQueue)) return; |
| 312 _ensureListening(); |
| 313 } |
| 314 _requestQueue.add(request); |
| 315 } |
| 316 |
| 317 /// Ensures that we are listening on events from [_sourceStream]. |
| 318 /// |
| 319 /// Resumes subscription on [_sourceStream], or creates it if necessary. |
| 320 void _ensureListening() { |
| 321 assert(!_isDone); |
| 322 if (_subscription == null) { |
| 323 _subscription = |
| 324 _sourceStream.listen(_onData, onError: _onError, onDone: _onDone); |
| 325 } else { |
| 326 _subscription.resume(); |
| 327 } |
| 328 } |
| 329 |
| 330 /// Removes all requests and closes them. |
| 331 /// |
| 332 /// Used when the source stream is done. |
| 333 /// After this, no further requests will be added to the queue, |
| 334 /// requests are immediately served entirely by events already in the event |
| 335 /// queue, if any. |
| 336 void _closeAllRequests() { |
| 337 assert(_isDone); |
| 338 while (_requestQueue.isNotEmpty) { |
| 339 var request = _requestQueue.removeFirst(); |
| 340 if (!request.addEvents(_eventQueue)) { |
| 341 request.close(_eventQueue); |
| 342 } |
| 343 } |
| 344 } |
| 345 |
| 346 /// Matches events with requests. |
| 347 /// |
| 348 /// Called after receiving an event. |
| 349 void _checkQueues() { |
| 350 while (_requestQueue.isNotEmpty) { |
| 351 if (_requestQueue.first.addEvents(_eventQueue)) { |
| 352 _requestQueue.removeFirst(); |
| 353 } else { |
| 354 return; |
| 355 } |
| 356 } |
| 357 |
| 358 if (!_isDone) { |
| 359 _subscription.pause(); |
| 360 } |
| 361 } |
| 362 |
| 363 /// Extracts the subscription and makes this stream queue unusable. |
| 364 /// |
| 365 /// Can only be used by the very last request. |
| 366 StreamSubscription _dispose() { |
| 367 assert(_isClosed); |
| 368 var subscription = _subscription; |
| 369 _subscription = null; |
| 370 _isDone = true; |
| 371 return subscription; |
| 372 } |
| 373 } |
| 374 |
| 375 /// Request object that receives events when they arrive, until fulfilled. |
| 376 /// |
| 377 /// Each request that cannot be fulfilled immediately is represented by |
| 378 /// an `_EventRequest` object in the request queue. |
| 379 /// |
| 380 /// Events from the source stream are sent to the first request in the |
| 381 /// queue until it reports itself as [isComplete]. |
| 382 /// |
| 383 /// When the first request in the queue `isComplete`, either when becoming |
| 384 /// the first request or after receiving an event, its [close] methods is |
| 385 /// called. |
| 386 /// |
| 387 /// The [close] method is also called immediately when the source stream |
| 388 /// is done. |
| 389 abstract class _EventRequest { |
| 390 /// Handle available events. |
| 391 /// |
| 392 /// The available events are provided as a queue. The `addEvents` function |
| 393 /// should only remove events from the front of the event queue, e.g., |
| 394 /// using [removeFirst]. |
| 395 /// |
| 396 /// Returns `true` if the request is completed, or `false` if it needs |
| 397 /// more events. |
| 398 /// The call may keep events in the queue until the requeust is complete, |
| 399 /// or it may remove them immediately. |
| 400 /// |
| 401 /// If the method returns true, the request is considered fulfilled, and |
| 402 /// will never be called again. |
| 403 /// |
| 404 /// This method is called when a request reaches the front of the request |
| 405 /// queue, and if it returns `false`, it's called again every time a new event |
| 406 /// becomes available, or when the stream closes. |
| 407 bool addEvents(Queue<Result> events); |
| 408 |
| 409 /// Complete the request. |
| 410 /// |
| 411 /// This is called when the source stream is done before the request |
| 412 /// had a chance to receive all its events. That is, after a call |
| 413 /// to [addEvents] has returned `false`. |
| 414 /// If there are any unused events available, they are in the [events] queue. |
| 415 /// No further events will become available. |
| 416 /// |
| 417 /// The queue should only remove events from the front of the event queue, |
| 418 /// e.g., using [removeFirst]. |
| 419 /// |
| 420 /// If the request kept events in the queue after an [addEvents] call, |
| 421 /// this is the last chance to use them. |
| 422 void close(Queue<Result> events); |
| 423 } |
| 424 |
| 425 /// Request for a [StreamQueue.next] call. |
| 426 /// |
| 427 /// Completes the returned future when receiving the first event, |
| 428 /// and is then complete. |
| 429 class _NextRequest<T> implements _EventRequest { |
| 430 /// Completer for the future returned by [StreamQueue.next]. |
| 431 final Completer _completer; |
| 432 |
| 433 _NextRequest() : _completer = new Completer<T>(); |
| 434 |
| 435 Future<T> get future => _completer.future; |
| 436 |
| 437 bool addEvents(Queue<Result> events) { |
| 438 if (events.isEmpty) return false; |
| 439 events.removeFirst().complete(_completer); |
| 440 return true; |
| 441 } |
| 442 |
| 443 void close(Queue<Result> events) { |
| 444 var errorFuture = |
| 445 new Future.sync(() => throw new StateError("No elements")); |
| 446 _completer.complete(errorFuture); |
| 447 } |
| 448 } |
| 449 |
| 450 /// Request for a [StreamQueue.skip] call. |
| 451 class _SkipRequest implements _EventRequest { |
| 452 /// Completer for the future returned by the skip call. |
| 453 final Completer _completer = new Completer<int>(); |
| 454 |
| 455 /// Number of remaining events to skip. |
| 456 /// |
| 457 /// The request [isComplete] when the values reaches zero. |
| 458 /// |
| 459 /// Decremented when an event is seen. |
| 460 /// Set to zero when an error is seen since errors abort the skip request. |
| 461 int _eventsToSkip; |
| 462 |
| 463 _SkipRequest(this._eventsToSkip); |
| 464 |
| 465 /// The future completed when the correct number of events have been skipped. |
| 466 Future get future => _completer.future; |
| 467 |
| 468 bool addEvents(Queue<Result> events) { |
| 469 while (_eventsToSkip > 0) { |
| 470 if (events.isEmpty) return false; |
| 471 _eventsToSkip--; |
| 472 var event = events.removeFirst(); |
| 473 if (event.isError) { |
| 474 event.complete(_completer); |
| 475 return true; |
| 476 } |
| 477 } |
| 478 _completer.complete(0); |
| 479 return true; |
| 480 } |
| 481 |
| 482 void close(Queue<Result> events) { |
| 483 _completer.complete(_eventsToSkip); |
| 484 } |
| 485 } |
| 486 |
| 487 /// Request for a [StreamQueue.take] call. |
| 488 class _TakeRequest<T> implements _EventRequest { |
| 489 /// Completer for the future returned by the take call. |
| 490 final Completer _completer; |
| 491 |
| 492 /// List collecting events until enough have been seen. |
| 493 final List _list = <T>[]; |
| 494 |
| 495 /// Number of events to capture. |
| 496 /// |
| 497 /// The request [isComplete] when the length of [_list] reaches |
| 498 /// this value. |
| 499 final int _eventsToTake; |
| 500 |
| 501 _TakeRequest(this._eventsToTake) : _completer = new Completer<List<T>>(); |
| 502 |
| 503 /// The future completed when the correct number of events have been captured. |
| 504 Future get future => _completer.future; |
| 505 |
| 506 bool addEvents(Queue<Result> events) { |
| 507 while (_list.length < _eventsToTake) { |
| 508 if (events.isEmpty) return false; |
| 509 var result = events.removeFirst(); |
| 510 if (result.isError) { |
| 511 result.complete(_completer); |
| 512 return true; |
| 513 } |
| 514 _list.add(result.asValue.value); |
| 515 } |
| 516 _completer.complete(_list); |
| 517 return true; |
| 518 } |
| 519 |
| 520 void close(Queue<Result> events) { |
| 521 _completer.complete(_list); |
| 522 } |
| 523 } |
| 524 |
| 525 /// Request for a [StreamQueue.cancel] call. |
| 526 /// |
| 527 /// The request needs no events, it just waits in the request queue |
| 528 /// until all previous events are fulfilled, then it cancels the stream queue |
| 529 /// source subscription. |
| 530 class _CancelRequest implements _EventRequest { |
| 531 /// Completer for the future returned by the `cancel` call. |
| 532 final Completer _completer = new Completer(); |
| 533 |
| 534 /// The [StreamQueue] object that has this request queued. |
| 535 /// |
| 536 /// When the event is completed, it needs to cancel the active subscription |
| 537 /// of the `StreamQueue` object, if any. |
| 538 final StreamQueue _streamQueue; |
| 539 |
| 540 _CancelRequest(this._streamQueue); |
| 541 |
| 542 /// The future completed when the cancel request is completed. |
| 543 Future get future => _completer.future; |
| 544 |
| 545 bool addEvents(Queue<Result> events) { |
| 546 _shutdown(); |
| 547 return true; |
| 548 } |
| 549 |
| 550 void close(_) { |
| 551 _shutdown(); |
| 552 } |
| 553 |
| 554 void _shutdown() { |
| 555 if (_streamQueue._isDone) { |
| 556 _completer.complete(); |
| 557 } else { |
| 558 _streamQueue._ensureListening(); |
| 559 _completer.complete(_streamQueue._dispose().cancel()); |
| 560 } |
| 561 } |
| 562 } |
| 563 |
| 564 /// Request for a [StreamQueue.rest] call. |
| 565 /// |
| 566 /// The request is always complete, it just waits in the request queue |
| 567 /// until all previous events are fulfilled, then it takes over the |
| 568 /// stream events subscription and creates a stream from it. |
| 569 class _RestRequest<T> implements _EventRequest { |
| 570 /// Completer for the stream returned by the `rest` call. |
| 571 final StreamCompleter _completer = new StreamCompleter<T>(); |
| 572 |
| 573 /// The [StreamQueue] object that has this request queued. |
| 574 /// |
| 575 /// When the event is completed, it needs to cancel the active subscription |
| 576 /// of the `StreamQueue` object, if any. |
| 577 final StreamQueue _streamQueue; |
| 578 |
| 579 _RestRequest(this._streamQueue); |
| 580 |
| 581 /// The stream which will contain the remaining events of [_streamQueue]. |
| 582 Stream<T> get stream => _completer.stream; |
| 583 |
| 584 bool addEvents(Queue<Result> events) { |
| 585 _completeStream(events); |
| 586 return true; |
| 587 } |
| 588 |
| 589 void close(Queue<Result> events) { |
| 590 _completeStream(events); |
| 591 } |
| 592 |
| 593 void _completeStream(Queue<Result> events) { |
| 594 if (events.isEmpty) { |
| 595 if (_streamQueue._isDone) { |
| 596 _completer.setEmpty(); |
| 597 } else { |
| 598 _completer.setSourceStream(_getRestStream()); |
| 599 } |
| 600 } else { |
| 601 // There are prefetched events which needs to be added before the |
| 602 // remaining stream. |
| 603 var controller = new StreamController<T>(); |
| 604 for (var event in events) { |
| 605 event.addTo(controller); |
| 606 } |
| 607 controller.addStream(_getRestStream(), cancelOnError: false) |
| 608 .whenComplete(controller.close); |
| 609 _completer.setSourceStream(controller.stream); |
| 610 } |
| 611 } |
| 612 |
| 613 /// Create a stream from the rest of [_streamQueue]'s subscription. |
| 614 Stream _getRestStream() { |
| 615 if (_streamQueue._isDone) { |
| 616 var controller = new StreamController<T>()..close(); |
| 617 return controller.stream; |
| 618 // TODO(lrn). Use the following when 1.11 is released. |
| 619 // return new Stream<T>.empty(); |
| 620 } |
| 621 if (_streamQueue._subscription == null) { |
| 622 return _streamQueue._sourceStream; |
| 623 } |
| 624 var subscription = _streamQueue._dispose(); |
| 625 subscription.resume(); |
| 626 return new SubscriptionStream<T>(subscription); |
| 627 } |
| 628 } |
| 629 |
| 630 /// Request for a [StreamQueue.hasNext] call. |
| 631 /// |
| 632 /// Completes the [future] with `true` if it sees any event, |
| 633 /// but doesn't consume the event. |
| 634 /// If the request is closed without seeing an event, then |
| 635 /// the [future] is completed with `false`. |
| 636 class _HasNextRequest<T> implements _EventRequest { |
| 637 final Completer _completer = new Completer<bool>(); |
| 638 |
| 639 Future<bool> get future => _completer.future; |
| 640 |
| 641 bool addEvents(Queue<Result> events) { |
| 642 if (events.isNotEmpty) { |
| 643 _completer.complete(true); |
| 644 return true; |
| 645 } |
| 646 return false; |
| 647 } |
| 648 |
| 649 void close(_) { |
| 650 _completer.complete(false); |
| 651 } |
| 652 } |
| 653 |
| 654 /// Request for a [StreamQueue.fork] call. |
| 655 class _ForkRequest<T> implements _EventRequest { |
| 656 /// Completer for the stream used by the queue by the `fork` call. |
| 657 StreamCompleter _completer; |
| 658 |
| 659 StreamQueue<T> queue; |
| 660 |
| 661 /// The [StreamQueue] object that has this request queued. |
| 662 final StreamQueue _streamQueue; |
| 663 |
| 664 _ForkRequest(this._streamQueue) { |
| 665 _completer = new StreamCompleter<T>(); |
| 666 queue = new StreamQueue<T>(_completer.stream); |
| 667 } |
| 668 |
| 669 bool addEvents(Queue<Result> events) { |
| 670 _completeStream(events); |
| 671 return true; |
| 672 } |
| 673 |
| 674 void close(Queue<Result> events) { |
| 675 _completeStream(events); |
| 676 } |
| 677 |
| 678 void _completeStream(Queue<Result> events) { |
| 679 if (events.isEmpty) { |
| 680 if (_streamQueue._isDone) { |
| 681 _completer.setEmpty(); |
| 682 } else { |
| 683 _completer.setSourceStream(_streamQueue._sourceStream.fork()); |
| 684 } |
| 685 } else { |
| 686 // There are prefetched events which need to be added before the |
| 687 // remaining stream. |
| 688 var controller = new StreamController<T>(); |
| 689 for (var event in events) { |
| 690 event.addTo(controller); |
| 691 } |
| 692 |
| 693 var fork = _streamQueue._sourceStream.fork(); |
| 694 controller.addStream(fork, cancelOnError: false) |
| 695 .whenComplete(controller.close); |
| 696 _completer.setSourceStream(controller.stream); |
| 697 } |
| 698 } |
| 699 } |
OLD | NEW |