OLD | NEW |
1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 library async.stream_events; | 5 library async.stream_events; |
6 | 6 |
7 import 'dart:async'; | 7 import 'dart:async'; |
8 import 'dart:collection'; | 8 import 'dart:collection'; |
9 | 9 |
10 import "subscription_stream.dart"; | 10 import "subscription_stream.dart"; |
(...skipping 268 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
279 if (request.addEvents(_eventQueue)) return; | 279 if (request.addEvents(_eventQueue)) return; |
280 _ensureListening(); | 280 _ensureListening(); |
281 } | 281 } |
282 _requestQueue.add(request); | 282 _requestQueue.add(request); |
283 | 283 |
284 } | 284 } |
285 | 285 |
286 /// Ensures that we are listening on events from [_sourceStream]. | 286 /// Ensures that we are listening on events from [_sourceStream]. |
287 /// | 287 /// |
288 /// Resumes subscription on [_sourceStream], or creates it if necessary. | 288 /// Resumes subscription on [_sourceStream], or creates it if necessary. |
289 StreamSubscription _ensureListening() { | 289 void _ensureListening() { |
290 assert(!_isDone); | 290 assert(!_isDone); |
291 if (_subscription == null) { | 291 if (_subscription == null) { |
292 _subscription = | 292 _subscription = |
293 _sourceStream.listen(_onData, onError: _onError, onDone: _onDone); | 293 _sourceStream.listen(_onData, onError: _onError, onDone: _onDone); |
294 } else { | 294 } else { |
295 _subscription.resume(); | 295 _subscription.resume(); |
296 } | 296 } |
297 } | 297 } |
298 | 298 |
299 /// Removes all requests and closes them. | 299 /// Removes all requests and closes them. |
(...skipping 47 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
347 /// | 347 /// |
348 /// Events from the source stream are sent to the first request in the | 348 /// Events from the source stream are sent to the first request in the |
349 /// queue until it reports itself as [isComplete]. | 349 /// queue until it reports itself as [isComplete]. |
350 /// | 350 /// |
351 /// When the first request in the queue `isComplete`, either when becoming | 351 /// When the first request in the queue `isComplete`, either when becoming |
352 /// the first request or after receiving an event, its [close] methods is | 352 /// the first request or after receiving an event, its [close] methods is |
353 /// called. | 353 /// called. |
354 /// | 354 /// |
355 /// The [close] method is also called immediately when the source stream | 355 /// The [close] method is also called immediately when the source stream |
356 /// is done. | 356 /// is done. |
357 abstract class _EventRequest implements EventSink { | 357 abstract class _EventRequest { |
358 /// Handle available events. | 358 /// Handle available events. |
359 /// | 359 /// |
360 /// The available events are provided as a queue. The `addEvents` function | 360 /// The available events are provided as a queue. The `addEvents` function |
361 /// should only remove events from the front of the event queue, e.g., | 361 /// should only remove events from the front of the event queue, e.g., |
362 /// using [removeFirst]. | 362 /// using [removeFirst]. |
363 /// | 363 /// |
364 /// Returns `true` if the request is completed, or `false` if it needs | 364 /// Returns `true` if the request is completed, or `false` if it needs |
365 /// more events. | 365 /// more events. |
366 /// The call may keep events in the queue until the requeust is complete, | 366 /// The call may keep events in the queue until the requeust is complete, |
367 /// or it may remove them immediately. | 367 /// or it may remove them immediately. |
(...skipping 96 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
464 /// | 464 /// |
465 /// The request [isComplete] when the length of [_list] reaches | 465 /// The request [isComplete] when the length of [_list] reaches |
466 /// this value. | 466 /// this value. |
467 final int _eventsToTake; | 467 final int _eventsToTake; |
468 | 468 |
469 _TakeRequest(this._eventsToTake) : _completer = new Completer<List<T>>(); | 469 _TakeRequest(this._eventsToTake) : _completer = new Completer<List<T>>(); |
470 | 470 |
471 /// The future completed when the correct number of events have been captured. | 471 /// The future completed when the correct number of events have been captured. |
472 Future get future => _completer.future; | 472 Future get future => _completer.future; |
473 | 473 |
474 bool addEvents(Queue<Events> events) { | 474 bool addEvents(Queue<Result> events) { |
475 while (_list.length < _eventsToTake) { | 475 while (_list.length < _eventsToTake) { |
476 if (events.isEmpty) return false; | 476 if (events.isEmpty) return false; |
477 var result = events.removeFirst(); | 477 var result = events.removeFirst(); |
478 if (result.isError) { | 478 if (result.isError) { |
479 result.complete(_completer); | 479 result.complete(_completer); |
480 return true; | 480 return true; |
481 } | 481 } |
482 _list.add(result.asValue.value); | 482 _list.add(result.asValue.value); |
483 } | 483 } |
484 _completer.complete(_list); | 484 _completer.complete(_list); |
485 return true; | 485 return true; |
486 } | 486 } |
487 | 487 |
488 void close(Queue<Events> events) { | 488 void close(Queue<Result> events) { |
489 _completer.complete(_list); | 489 _completer.complete(_list); |
490 } | 490 } |
491 } | 491 } |
492 | 492 |
493 /// Request for a [StreamQueue.cancel] call. | 493 /// Request for a [StreamQueue.cancel] call. |
494 /// | 494 /// |
495 /// The request needs no events, it just waits in the request queue | 495 /// The request needs no events, it just waits in the request queue |
496 /// until all previous events are fulfilled, then it cancels the stream queue | 496 /// until all previous events are fulfilled, then it cancels the stream queue |
497 /// source subscription. | 497 /// source subscription. |
498 class _CancelRequest implements _EventRequest { | 498 class _CancelRequest implements _EventRequest { |
(...skipping 111 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
610 _completer.complete(true); | 610 _completer.complete(true); |
611 return true; | 611 return true; |
612 } | 612 } |
613 return false; | 613 return false; |
614 } | 614 } |
615 | 615 |
616 void close(_) { | 616 void close(_) { |
617 _completer.complete(false); | 617 _completer.complete(false); |
618 } | 618 } |
619 } | 619 } |
OLD | NEW |