| Index: packages/async/lib/src/stream_queue.dart
|
| diff --git a/packages/async/lib/src/stream_queue.dart b/packages/async/lib/src/stream_queue.dart
|
| index 09b3a75b2360a9adf2a18ad2715fe2fef08cc656..b9023ab91a9ff8279fd269e5b319fa2b0e20e6a4 100644
|
| --- a/packages/async/lib/src/stream_queue.dart
|
| +++ b/packages/async/lib/src/stream_queue.dart
|
| @@ -2,14 +2,16 @@
|
| // for details. All rights reserved. Use of this source code is governed by a
|
| // BSD-style license that can be found in the LICENSE file.
|
|
|
| -library async.stream_events;
|
| -
|
| import 'dart:async';
|
| import 'dart:collection';
|
|
|
| +import 'package:collection/collection.dart';
|
| +
|
| +import "cancelable_operation.dart";
|
| +import "result.dart";
|
| import "subscription_stream.dart";
|
| import "stream_completer.dart";
|
| -import "../result.dart";
|
| +import "stream_splitter.dart";
|
|
|
| /// An asynchronous pull-based interface for accessing stream events.
|
| ///
|
| @@ -87,8 +89,17 @@ abstract class StreamQueue<T> {
|
| /// Closing operations are [cancel] and [rest].
|
| bool _isClosed = false;
|
|
|
| + /// The number of events dispatched by this queue.
|
| + ///
|
| + /// This counts error events. It doesn't count done events, or events
|
| + /// dispatched to a stream returned by [rest].
|
| + int get eventsDispatched => _eventsReceived - _eventQueue.length;
|
| +
|
| + /// The number of events received by this queue.
|
| + var _eventsReceived = 0;
|
| +
|
| /// Queue of events not used by a request yet.
|
| - final Queue<Result> _eventQueue = new Queue();
|
| + final QueueList<Result> _eventQueue = new QueueList();
|
|
|
| /// Queue of pending requests.
|
| ///
|
| @@ -96,7 +107,7 @@ abstract class StreamQueue<T> {
|
| final Queue<_EventRequest> _requestQueue = new Queue();
|
|
|
| /// Create a `StreamQueue` of the events of [source].
|
| - factory StreamQueue(Stream source) = _StreamQueue<T>;
|
| + factory StreamQueue(Stream<T> source) = _StreamQueue<T>;
|
|
|
| StreamQueue._();
|
|
|
| @@ -120,6 +131,21 @@ abstract class StreamQueue<T> {
|
| throw _failClosed();
|
| }
|
|
|
| + /// Look at the next [count] data events without consuming them.
|
| + ///
|
| + /// Works like [take] except that the events are left in the queue.
|
| + /// If one of the next [count] events is an error, the returned future
|
| + /// completes with this error, and the error is still left in the queue.
|
| + Future<List<T>> lookAhead(int count) {
|
| + if (count < 0) throw new RangeError.range(count, 0, null, "count");
|
| + if (!_isClosed) {
|
| + var request = new _LookAheadRequest<T>(count);
|
| + _addRequest(request);
|
| + return request.future;
|
| + }
|
| + throw _failClosed();
|
| + }
|
| +
|
| /// Requests the next (yet unrequested) event from the stream.
|
| ///
|
| /// When the requested event arrives, the returned future is completed with
|
| @@ -143,6 +169,19 @@ abstract class StreamQueue<T> {
|
| throw _failClosed();
|
| }
|
|
|
| + /// Looks at the next (yet unrequested) event from the stream.
|
| + ///
|
| + /// Like [next] except that the event is not consumed.
|
| + /// If the next event is an error event, it stays in the queue.
|
| + Future<T> get peek {
|
| + if (!_isClosed) {
|
| + var nextRequest = new _PeekRequest<T>();
|
| + _addRequest(nextRequest);
|
| + return nextRequest.future;
|
| + }
|
| + throw _failClosed();
|
| + }
|
| +
|
| /// Returns a stream of all the remaning events of the source stream.
|
| ///
|
| /// All requested [next], [skip] or [take] operations are completed
|
| @@ -212,6 +251,123 @@ abstract class StreamQueue<T> {
|
| throw _failClosed();
|
| }
|
|
|
| + /// Requests a transaction that can conditionally consume events.
|
| + ///
|
| + /// The transaction can create copies of this queue at the current position
|
| + /// using [StreamQueueTransaction.newQueue]. Each of these queues is
|
| + /// independent of one another and of the parent queue. The transaction
|
| + /// finishes when one of two methods is called:
|
| + ///
|
| + /// * [StreamQueueTransaction.commit] updates the parent queue's position to
|
| + /// match that of one of the copies.
|
| + ///
|
| + /// * [StreamQueueTransaction.reject] causes the parent queue to continue as
|
| + /// though [startTransaction] hadn't been called.
|
| + ///
|
| + /// Until the transaction finishes, this queue won't emit any events.
|
| + ///
|
| + /// See also [withTransaction] and [cancelable].
|
| + ///
|
| + /// ```dart
|
| + /// /// Consumes all empty lines from the beginning of [lines].
|
| + /// Future consumeEmptyLines(StreamQueue<String> lines) async {
|
| + /// while (await lines.hasNext) {
|
| + /// var transaction = lines.startTransaction();
|
| + /// var queue = transaction.newQueue();
|
| + /// if ((await queue.next).isNotEmpty) {
|
| + /// transaction.reject();
|
| + /// return;
|
| + /// } else {
|
| + /// transaction.commit(queue);
|
| + /// }
|
| + /// }
|
| + /// }
|
| + /// ```
|
| + StreamQueueTransaction<T> startTransaction() {
|
| + if (_isClosed) throw _failClosed();
|
| +
|
| + var request = new _TransactionRequest(this);
|
| + _addRequest(request);
|
| + return request.transaction;
|
| + }
|
| +
|
| + /// Passes a copy of this queue to [callback], and updates this queue to match
|
| + /// the copy's position if [callback] returns `true`.
|
| + ///
|
| + /// This queue won't emit any events until [callback] returns. If it returns
|
| + /// `false`, this queue continues as though [withTransaction] hadn't been
|
| + /// called. If it throws an error, this updates this queue to match the copy's
|
| + /// position and throws the error from the returned `Future`.
|
| + ///
|
| + /// Returns the same value as [callback].
|
| + ///
|
| + /// See also [startTransaction] and [cancelable].
|
| + ///
|
| + /// ```dart
|
| + /// /// Consumes all empty lines from the beginning of [lines].
|
| + /// Future consumeEmptyLines(StreamQueue<String> lines) async {
|
| + /// while (await lines.hasNext) {
|
| + /// // Consume a line if it's empty, otherwise return.
|
| + /// if (!await lines.withTransaction(
|
| + /// (queue) async => (await queue.next).isEmpty)) {
|
| + /// return;
|
| + /// }
|
| + /// }
|
| + /// }
|
| + /// ```
|
| + Future<bool> withTransaction(Future<bool> callback(StreamQueue<T> queue)) {
|
| + var transaction = startTransaction();
|
| +
|
| + /// Avoid async/await to ensure that [startTransaction] is called
|
| + /// synchronously and so ends up in the right place in the request queue.
|
| + var queue = transaction.newQueue();
|
| + return callback(queue).then((result) {
|
| + if (result) {
|
| + transaction.commit(queue);
|
| + } else {
|
| + transaction.reject();
|
| + }
|
| + return result;
|
| + }, onError: (error) {
|
| + transaction.commit(queue);
|
| + throw error;
|
| + });
|
| + }
|
| +
|
| + /// Passes a copy of this queue to [callback], and updates this queue to match
|
| + /// the copy's position once [callback] completes.
|
| + ///
|
| + /// If the returned [CancelableOperation] is canceled, this queue instead
|
| + /// continues as though [cancelable] hadn't been called. Otherwise, it emits
|
| + /// the same value or error as [callback].
|
| + ///
|
| + /// See also [startTransaction] and [withTransaction].
|
| + ///
|
| + /// ```dart
|
| + /// final _stdinQueue = new StreamQueue(stdin);
|
| + ///
|
| + /// /// Returns an operation that completes when the user sends a line to
|
| + /// /// standard input.
|
| + /// ///
|
| + /// /// If the operation is canceled, stops waiting for user input.
|
| + /// CancelableOperation<String> nextStdinLine() =>
|
| + /// _stdinQueue.cancelable((queue) => queue.next);
|
| + /// ```
|
| + CancelableOperation<S> cancelable<S>(
|
| + Future<S> callback(StreamQueue<T> queue)) {
|
| + var transaction = startTransaction();
|
| + var completer = new CancelableCompleter<S>(onCancel: () {
|
| + transaction.reject();
|
| + });
|
| +
|
| + var queue = transaction.newQueue();
|
| + completer.complete(callback(queue).whenComplete(() {
|
| + if (!completer.isCanceled) transaction.commit(queue);
|
| + }));
|
| +
|
| + return completer.operation;
|
| + }
|
| +
|
| /// Cancels the underlying event source.
|
| ///
|
| /// If [immediate] is `false` (the default), the cancel operation waits until
|
| @@ -226,8 +382,8 @@ abstract class StreamQueue<T> {
|
| /// `cancel`.
|
| ///
|
| /// After calling `cancel`, no further events can be requested.
|
| - /// None of [next], [rest], [skip], [take] or [cancel] may be
|
| - /// called again.
|
| + /// None of [lookAhead], [next], [peek], [rest], [skip], [take] or [cancel]
|
| + /// may be called again.
|
| Future cancel({bool immediate: false}) {
|
| if (_isClosed) throw _failClosed();
|
| _isClosed = true;
|
| @@ -276,7 +432,7 @@ abstract class StreamQueue<T> {
|
| /// Can only be used by the very last request (the stream queue must
|
| /// be closed by that request).
|
| /// Only used by [rest].
|
| - Stream _extractStream();
|
| + Stream<T> _extractStream();
|
|
|
| /// Requests that the event source pauses events.
|
| ///
|
| @@ -302,6 +458,7 @@ abstract class StreamQueue<T> {
|
| /// Called when the event source adds a new data or error event.
|
| /// Always calls [_updateRequests] after adding.
|
| void _addResult(Result result) {
|
| + _eventsReceived++;
|
| _eventQueue.add(result);
|
| _updateRequests();
|
| }
|
| @@ -337,20 +494,19 @@ abstract class StreamQueue<T> {
|
| }
|
| }
|
|
|
| -
|
| /// The default implementation of [StreamQueue].
|
| ///
|
| /// This queue gets its events from a stream which is listened
|
| /// to when a request needs events.
|
| class _StreamQueue<T> extends StreamQueue<T> {
|
| /// Source of events.
|
| - final Stream _sourceStream;
|
| + final Stream<T> _sourceStream;
|
|
|
| /// Subscription on [_sourceStream] while listening for events.
|
| ///
|
| /// Set to subscription when listening, and set to `null` when the
|
| /// subscription is done (and [_isDone] is set to true).
|
| - StreamSubscription _subscription;
|
| + StreamSubscription<T> _subscription;
|
|
|
| _StreamQueue(this._sourceStream) : super._();
|
|
|
| @@ -363,20 +519,16 @@ class _StreamQueue<T> extends StreamQueue<T> {
|
| }
|
|
|
| void _ensureListening() {
|
| - assert(!_isDone);
|
| + if (_isDone) return;
|
| if (_subscription == null) {
|
| - _subscription =
|
| - _sourceStream.listen(
|
| - (data) {
|
| - _addResult(new Result.value(data));
|
| - },
|
| - onError: (error, StackTrace stackTrace) {
|
| - _addResult(new Result.error(error, stackTrace));
|
| - },
|
| - onDone: () {
|
| - _subscription = null;
|
| - this._close();
|
| - });
|
| + _subscription = _sourceStream.listen((data) {
|
| + _addResult(new Result.value(data));
|
| + }, onError: (error, StackTrace stackTrace) {
|
| + _addResult(new Result.error(error, stackTrace));
|
| + }, onDone: () {
|
| + _subscription = null;
|
| + this._close();
|
| + });
|
| } else {
|
| _subscription.resume();
|
| }
|
| @@ -391,6 +543,7 @@ class _StreamQueue<T> extends StreamQueue<T> {
|
| if (_isDone) {
|
| return new Stream<T>.empty();
|
| }
|
| + _isDone = true;
|
|
|
| if (_subscription == null) {
|
| return _sourceStream;
|
| @@ -398,7 +551,6 @@ class _StreamQueue<T> extends StreamQueue<T> {
|
|
|
| var subscription = _subscription;
|
| _subscription = null;
|
| - _isDone = true;
|
|
|
| var wasPaused = subscription.isPaused;
|
| var result = new SubscriptionStream<T>(subscription);
|
| @@ -409,6 +561,104 @@ class _StreamQueue<T> extends StreamQueue<T> {
|
| }
|
| }
|
|
|
| +/// A transaction on a [StreamQueue], created by [StreamQueue.startTransaction].
|
| +///
|
| +/// Copies of the parent queue may be created using [newQueue]. Calling [commit]
|
| +/// moves the parent queue to a copy's position, and calling [reject] causes it
|
| +/// to continue as though [StreamQueue.startTransaction] was never called.
|
| +class StreamQueueTransaction<T> {
|
| + /// The parent queue on which this transaction is active.
|
| + final StreamQueue<T> _parent;
|
| +
|
| + /// The splitter that produces copies of the parent queue's stream.
|
| + final StreamSplitter<T> _splitter;
|
| +
|
| + /// Queues created using [newQueue].
|
| + final _queues = new Set<StreamQueue>();
|
| +
|
| + /// Whether [commit] has been called.
|
| + var _committed = false;
|
| +
|
| + /// Whether [reject] has been called.
|
| + var _rejected = false;
|
| +
|
| + StreamQueueTransaction._(this._parent, Stream<T> source)
|
| + : _splitter = new StreamSplitter(source);
|
| +
|
| + /// Creates a new copy of the parent queue.
|
| + ///
|
| + /// This copy starts at the parent queue's position when
|
| + /// [StreamQueue.startTransaction] was called. Its position can be committed
|
| + /// to the parent queue using [commit].
|
| + StreamQueue<T> newQueue() {
|
| + var queue = new StreamQueue(_splitter.split());
|
| + _queues.add(queue);
|
| + return queue;
|
| + }
|
| +
|
| + /// Commits a queue created using [newQueue].
|
| + ///
|
| + /// The parent queue's position is updated to be the same as [queue]'s.
|
| + /// Further requests on all queues created by this transaction, including
|
| + /// [queue], will complete as though [cancel] were called with `immediate:
|
| + /// true`.
|
| + ///
|
| + /// Throws a [StateError] if [commit] or [reject] have already been called, or
|
| + /// if there are pending requests on [queue].
|
| + void commit(StreamQueue<T> queue) {
|
| + _assertActive();
|
| + if (!_queues.contains(queue)) {
|
| + throw new ArgumentError("Queue doesn't belong to this transaction.");
|
| + } else if (queue._requestQueue.isNotEmpty) {
|
| + throw new StateError("A queue with pending requests can't be committed.");
|
| + }
|
| + _committed = true;
|
| +
|
| + // Remove all events from the parent queue that were consumed by the
|
| + // child queue.
|
| + for (var j = 0; j < queue.eventsDispatched; j++) {
|
| + _parent._eventQueue.removeFirst();
|
| + }
|
| +
|
| + _done();
|
| + }
|
| +
|
| + /// Rejects this transaction without updating the parent queue.
|
| + ///
|
| + /// The parent will continue as though [StreamQueue.startTransaction] hadn't
|
| + /// been called. Further requests on all queues created by this transaction
|
| + /// will complete as though [cancel] were called with `immediate: true`.
|
| + ///
|
| + /// Throws a [StateError] if [commit] or [reject] have already been called.
|
| + void reject() {
|
| + _assertActive();
|
| + _rejected = true;
|
| + _done();
|
| + }
|
| +
|
| + // Cancels all [_queues], removes the [_TransactionRequest] from [_parent]'s
|
| + // request queue, and runs the next request.
|
| + void _done() {
|
| + _splitter.close();
|
| + for (var queue in _queues) {
|
| + queue._cancel();
|
| + }
|
| +
|
| + assert((_parent._requestQueue.first as _TransactionRequest).transaction ==
|
| + this);
|
| + _parent._requestQueue.removeFirst();
|
| + _parent._updateRequests();
|
| + }
|
| +
|
| + /// Throws a [StateError] if [accept] or [reject] has already been called.
|
| + void _assertActive() {
|
| + if (_committed) {
|
| + throw new StateError("This transaction has already been accepted.");
|
| + } else if (_rejected) {
|
| + throw new StateError("This transaction has already been rejected.");
|
| + }
|
| + }
|
| +}
|
|
|
| /// Request object that receives events when they arrive, until fulfilled.
|
| ///
|
| @@ -424,7 +674,7 @@ class _StreamQueue<T> extends StreamQueue<T> {
|
| ///
|
| /// The [close] method is also called immediately when the source stream
|
| /// is done.
|
| -abstract class _EventRequest {
|
| +abstract class _EventRequest<T> {
|
| /// Handle available events.
|
| ///
|
| /// The available events are provided as a queue. The `update` function
|
| @@ -445,30 +695,55 @@ abstract class _EventRequest {
|
| /// If the function returns `false` when the stream has already closed
|
| /// ([isDone] is true), then the request must call
|
| /// [StreamQueue._updateRequests] itself when it's ready to continue.
|
| - bool update(Queue<Result> events, bool isDone);
|
| + bool update(QueueList<Result<T>> events, bool isDone);
|
| }
|
|
|
| /// Request for a [StreamQueue.next] call.
|
| ///
|
| /// Completes the returned future when receiving the first event,
|
| /// and is then complete.
|
| -class _NextRequest<T> implements _EventRequest {
|
| +class _NextRequest<T> implements _EventRequest<T> {
|
| /// Completer for the future returned by [StreamQueue.next].
|
| - final Completer _completer;
|
| + final _completer = new Completer<T>();
|
|
|
| - _NextRequest() : _completer = new Completer<T>();
|
| + _NextRequest();
|
|
|
| Future<T> get future => _completer.future;
|
|
|
| - bool update(Queue<Result> events, bool isDone) {
|
| + bool update(QueueList<Result<T>> events, bool isDone) {
|
| if (events.isNotEmpty) {
|
| events.removeFirst().complete(_completer);
|
| return true;
|
| }
|
| if (isDone) {
|
| - var errorFuture =
|
| - new Future.sync(() => throw new StateError("No elements"));
|
| - _completer.complete(errorFuture);
|
| + _completer.completeError(
|
| + new StateError("No elements"), StackTrace.current);
|
| + return true;
|
| + }
|
| + return false;
|
| + }
|
| +}
|
| +
|
| +/// Request for a [StreamQueue.peek] call.
|
| +///
|
| +/// Completes the returned future when receiving the first event,
|
| +/// and is then complete, but doesn't consume the event.
|
| +class _PeekRequest<T> implements _EventRequest<T> {
|
| + /// Completer for the future returned by [StreamQueue.next].
|
| + final _completer = new Completer<T>();
|
| +
|
| + _PeekRequest();
|
| +
|
| + Future<T> get future => _completer.future;
|
| +
|
| + bool update(QueueList<Result<T>> events, bool isDone) {
|
| + if (events.isNotEmpty) {
|
| + events.first.complete(_completer);
|
| + return true;
|
| + }
|
| + if (isDone) {
|
| + _completer.completeError(
|
| + new StateError("No elements"), StackTrace.current);
|
| return true;
|
| }
|
| return false;
|
| @@ -476,9 +751,9 @@ class _NextRequest<T> implements _EventRequest {
|
| }
|
|
|
| /// Request for a [StreamQueue.skip] call.
|
| -class _SkipRequest implements _EventRequest {
|
| +class _SkipRequest<T> implements _EventRequest<T> {
|
| /// Completer for the future returned by the skip call.
|
| - final Completer _completer = new Completer<int>();
|
| + final _completer = new Completer<int>();
|
|
|
| /// Number of remaining events to skip.
|
| ///
|
| @@ -491,9 +766,9 @@ class _SkipRequest implements _EventRequest {
|
| _SkipRequest(this._eventsToSkip);
|
|
|
| /// The future completed when the correct number of events have been skipped.
|
| - Future get future => _completer.future;
|
| + Future<int> get future => _completer.future;
|
|
|
| - bool update(Queue<Result> events, bool isDone) {
|
| + bool update(QueueList<Result<T>> events, bool isDone) {
|
| while (_eventsToSkip > 0) {
|
| if (events.isEmpty) {
|
| if (isDone) break;
|
| @@ -503,7 +778,7 @@ class _SkipRequest implements _EventRequest {
|
|
|
| var event = events.removeFirst();
|
| if (event.isError) {
|
| - event.complete(_completer);
|
| + _completer.completeError(event.asError.error, event.asError.stackTrace);
|
| return true;
|
| }
|
| }
|
| @@ -512,13 +787,13 @@ class _SkipRequest implements _EventRequest {
|
| }
|
| }
|
|
|
| -/// Request for a [StreamQueue.take] call.
|
| -class _TakeRequest<T> implements _EventRequest {
|
| +/// Common superclass for [_TakeRequest] and [_LookAheadRequest].
|
| +abstract class _ListRequest<T> implements _EventRequest<T> {
|
| /// Completer for the future returned by the take call.
|
| - final Completer _completer;
|
| + final _completer = new Completer<List<T>>();
|
|
|
| /// List collecting events until enough have been seen.
|
| - final List _list = <T>[];
|
| + final _list = <T>[];
|
|
|
| /// Number of events to capture.
|
| ///
|
| @@ -526,24 +801,51 @@ class _TakeRequest<T> implements _EventRequest {
|
| /// this value.
|
| final int _eventsToTake;
|
|
|
| - _TakeRequest(this._eventsToTake) : _completer = new Completer<List<T>>();
|
| + _ListRequest(this._eventsToTake);
|
|
|
| /// The future completed when the correct number of events have been captured.
|
| - Future get future => _completer.future;
|
| + Future<List<T>> get future => _completer.future;
|
| +}
|
|
|
| - bool update(Queue<Result> events, bool isDone) {
|
| +/// Request for a [StreamQueue.take] call.
|
| +class _TakeRequest<T> extends _ListRequest<T> {
|
| + _TakeRequest(int eventsToTake) : super(eventsToTake);
|
| +
|
| + bool update(QueueList<Result<T>> events, bool isDone) {
|
| while (_list.length < _eventsToTake) {
|
| if (events.isEmpty) {
|
| if (isDone) break;
|
| return false;
|
| }
|
|
|
| - var result = events.removeFirst();
|
| - if (result.isError) {
|
| - result.complete(_completer);
|
| + var event = events.removeFirst();
|
| + if (event.isError) {
|
| + event.asError.complete(_completer);
|
| + return true;
|
| + }
|
| + _list.add(event.asValue.value);
|
| + }
|
| + _completer.complete(_list);
|
| + return true;
|
| + }
|
| +}
|
| +
|
| +/// Request for a [StreamQueue.lookAhead] call.
|
| +class _LookAheadRequest<T> extends _ListRequest<T> {
|
| + _LookAheadRequest(int eventsToTake) : super(eventsToTake);
|
| +
|
| + bool update(QueueList<Result<T>> events, bool isDone) {
|
| + while (_list.length < _eventsToTake) {
|
| + if (events.length == _list.length) {
|
| + if (isDone) break;
|
| + return false;
|
| + }
|
| + var event = events.elementAt(_list.length);
|
| + if (event.isError) {
|
| + event.asError.complete(_completer);
|
| return true;
|
| }
|
| - _list.add(result.asValue.value);
|
| + _list.add(event.asValue.value);
|
| }
|
| _completer.complete(_list);
|
| return true;
|
| @@ -555,11 +857,10 @@ class _TakeRequest<T> implements _EventRequest {
|
| /// The request needs no events, it just waits in the request queue
|
| /// until all previous events are fulfilled, then it cancels the stream queue
|
| /// source subscription.
|
| -class _CancelRequest implements _EventRequest {
|
| +class _CancelRequest<T> implements _EventRequest<T> {
|
| /// Completer for the future returned by the `cancel` call.
|
| - final Completer _completer = new Completer();
|
| + final _completer = new Completer();
|
|
|
| - /// The [StreamQueue] object that has this request queued.
|
| ///
|
| /// When the event is completed, it needs to cancel the active subscription
|
| /// of the `StreamQueue` object, if any.
|
| @@ -570,7 +871,7 @@ class _CancelRequest implements _EventRequest {
|
| /// The future completed when the cancel request is completed.
|
| Future get future => _completer.future;
|
|
|
| - bool update(Queue<Result> events, bool isDone) {
|
| + bool update(QueueList<Result<T>> events, bool isDone) {
|
| if (_streamQueue._isDone) {
|
| _completer.complete();
|
| } else {
|
| @@ -586,22 +887,22 @@ class _CancelRequest implements _EventRequest {
|
| /// The request is always complete, it just waits in the request queue
|
| /// until all previous events are fulfilled, then it takes over the
|
| /// stream events subscription and creates a stream from it.
|
| -class _RestRequest<T> implements _EventRequest {
|
| +class _RestRequest<T> implements _EventRequest<T> {
|
| /// Completer for the stream returned by the `rest` call.
|
| - final StreamCompleter _completer = new StreamCompleter<T>();
|
| + final _completer = new StreamCompleter<T>();
|
|
|
| /// The [StreamQueue] object that has this request queued.
|
| ///
|
| /// When the event is completed, it needs to cancel the active subscription
|
| /// of the `StreamQueue` object, if any.
|
| - final StreamQueue _streamQueue;
|
| + final StreamQueue<T> _streamQueue;
|
|
|
| _RestRequest(this._streamQueue);
|
|
|
| /// The stream which will contain the remaining events of [_streamQueue].
|
| Stream<T> get stream => _completer.stream;
|
|
|
| - bool update(Queue<Result> events, bool isDone) {
|
| + bool update(QueueList<Result<T>> events, bool isDone) {
|
| if (events.isEmpty) {
|
| if (_streamQueue._isDone) {
|
| _completer.setEmpty();
|
| @@ -615,8 +916,9 @@ class _RestRequest<T> implements _EventRequest {
|
| for (var event in events) {
|
| event.addTo(controller);
|
| }
|
| - controller.addStream(_streamQueue._extractStream(), cancelOnError: false)
|
| - .whenComplete(controller.close);
|
| + controller
|
| + .addStream(_streamQueue._extractStream(), cancelOnError: false)
|
| + .whenComplete(controller.close);
|
| _completer.setSourceStream(controller.stream);
|
| }
|
| return true;
|
| @@ -629,12 +931,12 @@ class _RestRequest<T> implements _EventRequest {
|
| /// but doesn't consume the event.
|
| /// If the request is closed without seeing an event, then
|
| /// the [future] is completed with `false`.
|
| -class _HasNextRequest<T> implements _EventRequest {
|
| - final Completer _completer = new Completer<bool>();
|
| +class _HasNextRequest<T> implements _EventRequest<T> {
|
| + final _completer = new Completer<bool>();
|
|
|
| Future<bool> get future => _completer.future;
|
|
|
| - bool update(Queue<Result> events, bool isDone) {
|
| + bool update(QueueList<Result<T>> events, bool isDone) {
|
| if (events.isNotEmpty) {
|
| _completer.complete(true);
|
| return true;
|
| @@ -646,3 +948,33 @@ class _HasNextRequest<T> implements _EventRequest {
|
| return false;
|
| }
|
| }
|
| +
|
| +/// Request for a [StreamQueue.startTransaction] call.
|
| +///
|
| +/// This request isn't complete until the user calls
|
| +/// [StreamQueueTransaction.commit] or [StreamQueueTransaction.reject], at which
|
| +/// point it manually removes itself from the request queue and calls
|
| +/// [StreamQueue._updateRequests].
|
| +class _TransactionRequest<T> implements _EventRequest<T> {
|
| + /// The transaction created by this request.
|
| + StreamQueueTransaction<T> get transaction => _transaction;
|
| + StreamQueueTransaction<T> _transaction;
|
| +
|
| + /// The controller that passes events to [transaction].
|
| + final _controller = new StreamController<T>(sync: true);
|
| +
|
| + /// The number of events passed to [_controller] so far.
|
| + var _eventsSent = 0;
|
| +
|
| + _TransactionRequest(StreamQueue<T> parent) {
|
| + _transaction = new StreamQueueTransaction._(parent, _controller.stream);
|
| + }
|
| +
|
| + bool update(QueueList<Result<T>> events, bool isDone) {
|
| + while (_eventsSent < events.length) {
|
| + events[_eventsSent++].addTo(_controller);
|
| + }
|
| + if (isDone && !_controller.isClosed) _controller.close();
|
| + return false;
|
| + }
|
| +}
|
|
|