| OLD | NEW |
| 1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 library async.stream_events; | 5 library async.stream_events; |
| 6 | 6 |
| 7 import 'dart:async'; | 7 import 'dart:async'; |
| 8 import 'dart:collection'; | 8 import 'dart:collection'; |
| 9 | 9 |
| 10 import "subscription_stream.dart"; | 10 import "subscription_stream.dart"; |
| (...skipping 268 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 279 if (request.addEvents(_eventQueue)) return; | 279 if (request.addEvents(_eventQueue)) return; |
| 280 _ensureListening(); | 280 _ensureListening(); |
| 281 } | 281 } |
| 282 _requestQueue.add(request); | 282 _requestQueue.add(request); |
| 283 | 283 |
| 284 } | 284 } |
| 285 | 285 |
| 286 /// Ensures that we are listening on events from [_sourceStream]. | 286 /// Ensures that we are listening on events from [_sourceStream]. |
| 287 /// | 287 /// |
| 288 /// Resumes subscription on [_sourceStream], or creates it if necessary. | 288 /// Resumes subscription on [_sourceStream], or creates it if necessary. |
| 289 StreamSubscription _ensureListening() { | 289 void _ensureListening() { |
| 290 assert(!_isDone); | 290 assert(!_isDone); |
| 291 if (_subscription == null) { | 291 if (_subscription == null) { |
| 292 _subscription = | 292 _subscription = |
| 293 _sourceStream.listen(_onData, onError: _onError, onDone: _onDone); | 293 _sourceStream.listen(_onData, onError: _onError, onDone: _onDone); |
| 294 } else { | 294 } else { |
| 295 _subscription.resume(); | 295 _subscription.resume(); |
| 296 } | 296 } |
| 297 } | 297 } |
| 298 | 298 |
| 299 /// Removes all requests and closes them. | 299 /// Removes all requests and closes them. |
| (...skipping 47 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 347 /// | 347 /// |
| 348 /// Events from the source stream are sent to the first request in the | 348 /// Events from the source stream are sent to the first request in the |
| 349 /// queue until it reports itself as [isComplete]. | 349 /// queue until it reports itself as [isComplete]. |
| 350 /// | 350 /// |
| 351 /// When the first request in the queue `isComplete`, either when becoming | 351 /// When the first request in the queue `isComplete`, either when becoming |
| 352 /// the first request or after receiving an event, its [close] methods is | 352 /// the first request or after receiving an event, its [close] methods is |
| 353 /// called. | 353 /// called. |
| 354 /// | 354 /// |
| 355 /// The [close] method is also called immediately when the source stream | 355 /// The [close] method is also called immediately when the source stream |
| 356 /// is done. | 356 /// is done. |
| 357 abstract class _EventRequest implements EventSink { | 357 abstract class _EventRequest { |
| 358 /// Handle available events. | 358 /// Handle available events. |
| 359 /// | 359 /// |
| 360 /// The available events are provided as a queue. The `addEvents` function | 360 /// The available events are provided as a queue. The `addEvents` function |
| 361 /// should only remove events from the front of the event queue, e.g., | 361 /// should only remove events from the front of the event queue, e.g., |
| 362 /// using [removeFirst]. | 362 /// using [removeFirst]. |
| 363 /// | 363 /// |
| 364 /// Returns `true` if the request is completed, or `false` if it needs | 364 /// Returns `true` if the request is completed, or `false` if it needs |
| 365 /// more events. | 365 /// more events. |
| 366 /// The call may keep events in the queue until the requeust is complete, | 366 /// The call may keep events in the queue until the requeust is complete, |
| 367 /// or it may remove them immediately. | 367 /// or it may remove them immediately. |
| (...skipping 96 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 464 /// | 464 /// |
| 465 /// The request [isComplete] when the length of [_list] reaches | 465 /// The request [isComplete] when the length of [_list] reaches |
| 466 /// this value. | 466 /// this value. |
| 467 final int _eventsToTake; | 467 final int _eventsToTake; |
| 468 | 468 |
| 469 _TakeRequest(this._eventsToTake) : _completer = new Completer<List<T>>(); | 469 _TakeRequest(this._eventsToTake) : _completer = new Completer<List<T>>(); |
| 470 | 470 |
| 471 /// The future completed when the correct number of events have been captured. | 471 /// The future completed when the correct number of events have been captured. |
| 472 Future get future => _completer.future; | 472 Future get future => _completer.future; |
| 473 | 473 |
| 474 bool addEvents(Queue<Events> events) { | 474 bool addEvents(Queue<Result> events) { |
| 475 while (_list.length < _eventsToTake) { | 475 while (_list.length < _eventsToTake) { |
| 476 if (events.isEmpty) return false; | 476 if (events.isEmpty) return false; |
| 477 var result = events.removeFirst(); | 477 var result = events.removeFirst(); |
| 478 if (result.isError) { | 478 if (result.isError) { |
| 479 result.complete(_completer); | 479 result.complete(_completer); |
| 480 return true; | 480 return true; |
| 481 } | 481 } |
| 482 _list.add(result.asValue.value); | 482 _list.add(result.asValue.value); |
| 483 } | 483 } |
| 484 _completer.complete(_list); | 484 _completer.complete(_list); |
| 485 return true; | 485 return true; |
| 486 } | 486 } |
| 487 | 487 |
| 488 void close(Queue<Events> events) { | 488 void close(Queue<Result> events) { |
| 489 _completer.complete(_list); | 489 _completer.complete(_list); |
| 490 } | 490 } |
| 491 } | 491 } |
| 492 | 492 |
| 493 /// Request for a [StreamQueue.cancel] call. | 493 /// Request for a [StreamQueue.cancel] call. |
| 494 /// | 494 /// |
| 495 /// The request needs no events, it just waits in the request queue | 495 /// The request needs no events, it just waits in the request queue |
| 496 /// until all previous events are fulfilled, then it cancels the stream queue | 496 /// until all previous events are fulfilled, then it cancels the stream queue |
| 497 /// source subscription. | 497 /// source subscription. |
| 498 class _CancelRequest implements _EventRequest { | 498 class _CancelRequest implements _EventRequest { |
| (...skipping 111 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 610 _completer.complete(true); | 610 _completer.complete(true); |
| 611 return true; | 611 return true; |
| 612 } | 612 } |
| 613 return false; | 613 return false; |
| 614 } | 614 } |
| 615 | 615 |
| 616 void close(_) { | 616 void close(_) { |
| 617 _completer.complete(false); | 617 _completer.complete(false); |
| 618 } | 618 } |
| 619 } | 619 } |
| OLD | NEW |