Index: packages/async/lib/src/stream_queue.dart |
diff --git a/packages/async/lib/src/stream_queue.dart b/packages/async/lib/src/stream_queue.dart |
index 09b3a75b2360a9adf2a18ad2715fe2fef08cc656..b9023ab91a9ff8279fd269e5b319fa2b0e20e6a4 100644 |
--- a/packages/async/lib/src/stream_queue.dart |
+++ b/packages/async/lib/src/stream_queue.dart |
@@ -2,14 +2,16 @@ |
// for details. All rights reserved. Use of this source code is governed by a |
// BSD-style license that can be found in the LICENSE file. |
-library async.stream_events; |
- |
import 'dart:async'; |
import 'dart:collection'; |
+import 'package:collection/collection.dart'; |
+ |
+import "cancelable_operation.dart"; |
+import "result.dart"; |
import "subscription_stream.dart"; |
import "stream_completer.dart"; |
-import "../result.dart"; |
+import "stream_splitter.dart"; |
/// An asynchronous pull-based interface for accessing stream events. |
/// |
@@ -87,8 +89,17 @@ abstract class StreamQueue<T> { |
/// Closing operations are [cancel] and [rest]. |
bool _isClosed = false; |
+ /// The number of events dispatched by this queue. |
+ /// |
+ /// This counts error events. It doesn't count done events, or events |
+ /// dispatched to a stream returned by [rest]. |
+ int get eventsDispatched => _eventsReceived - _eventQueue.length; |
+ |
+ /// The number of events received by this queue. |
+ var _eventsReceived = 0; |
+ |
/// Queue of events not used by a request yet. |
- final Queue<Result> _eventQueue = new Queue(); |
+ final QueueList<Result> _eventQueue = new QueueList(); |
/// Queue of pending requests. |
/// |
@@ -96,7 +107,7 @@ abstract class StreamQueue<T> { |
final Queue<_EventRequest> _requestQueue = new Queue(); |
/// Create a `StreamQueue` of the events of [source]. |
- factory StreamQueue(Stream source) = _StreamQueue<T>; |
+ factory StreamQueue(Stream<T> source) = _StreamQueue<T>; |
StreamQueue._(); |
@@ -120,6 +131,21 @@ abstract class StreamQueue<T> { |
throw _failClosed(); |
} |
+ /// Look at the next [count] data events without consuming them. |
+ /// |
+ /// Works like [take] except that the events are left in the queue. |
+ /// If one of the next [count] events is an error, the returned future |
+ /// completes with this error, and the error is still left in the queue. |
+ Future<List<T>> lookAhead(int count) { |
+ if (count < 0) throw new RangeError.range(count, 0, null, "count"); |
+ if (!_isClosed) { |
+ var request = new _LookAheadRequest<T>(count); |
+ _addRequest(request); |
+ return request.future; |
+ } |
+ throw _failClosed(); |
+ } |
+ |
/// Requests the next (yet unrequested) event from the stream. |
/// |
/// When the requested event arrives, the returned future is completed with |
@@ -143,6 +169,19 @@ abstract class StreamQueue<T> { |
throw _failClosed(); |
} |
+ /// Looks at the next (yet unrequested) event from the stream. |
+ /// |
+ /// Like [next] except that the event is not consumed. |
+ /// If the next event is an error event, it stays in the queue. |
+ Future<T> get peek { |
+ if (!_isClosed) { |
+ var nextRequest = new _PeekRequest<T>(); |
+ _addRequest(nextRequest); |
+ return nextRequest.future; |
+ } |
+ throw _failClosed(); |
+ } |
+ |
/// Returns a stream of all the remaning events of the source stream. |
/// |
/// All requested [next], [skip] or [take] operations are completed |
@@ -212,6 +251,123 @@ abstract class StreamQueue<T> { |
throw _failClosed(); |
} |
+ /// Requests a transaction that can conditionally consume events. |
+ /// |
+ /// The transaction can create copies of this queue at the current position |
+ /// using [StreamQueueTransaction.newQueue]. Each of these queues is |
+ /// independent of one another and of the parent queue. The transaction |
+ /// finishes when one of two methods is called: |
+ /// |
+ /// * [StreamQueueTransaction.commit] updates the parent queue's position to |
+ /// match that of one of the copies. |
+ /// |
+ /// * [StreamQueueTransaction.reject] causes the parent queue to continue as |
+ /// though [startTransaction] hadn't been called. |
+ /// |
+ /// Until the transaction finishes, this queue won't emit any events. |
+ /// |
+ /// See also [withTransaction] and [cancelable]. |
+ /// |
+ /// ```dart |
+ /// /// Consumes all empty lines from the beginning of [lines]. |
+ /// Future consumeEmptyLines(StreamQueue<String> lines) async { |
+ /// while (await lines.hasNext) { |
+ /// var transaction = lines.startTransaction(); |
+ /// var queue = transaction.newQueue(); |
+ /// if ((await queue.next).isNotEmpty) { |
+ /// transaction.reject(); |
+ /// return; |
+ /// } else { |
+ /// transaction.commit(queue); |
+ /// } |
+ /// } |
+ /// } |
+ /// ``` |
+ StreamQueueTransaction<T> startTransaction() { |
+ if (_isClosed) throw _failClosed(); |
+ |
+ var request = new _TransactionRequest(this); |
+ _addRequest(request); |
+ return request.transaction; |
+ } |
+ |
+ /// Passes a copy of this queue to [callback], and updates this queue to match |
+ /// the copy's position if [callback] returns `true`. |
+ /// |
+ /// This queue won't emit any events until [callback] returns. If it returns |
+ /// `false`, this queue continues as though [withTransaction] hadn't been |
+ /// called. If it throws an error, this updates this queue to match the copy's |
+ /// position and throws the error from the returned `Future`. |
+ /// |
+ /// Returns the same value as [callback]. |
+ /// |
+ /// See also [startTransaction] and [cancelable]. |
+ /// |
+ /// ```dart |
+ /// /// Consumes all empty lines from the beginning of [lines]. |
+ /// Future consumeEmptyLines(StreamQueue<String> lines) async { |
+ /// while (await lines.hasNext) { |
+ /// // Consume a line if it's empty, otherwise return. |
+ /// if (!await lines.withTransaction( |
+ /// (queue) async => (await queue.next).isEmpty)) { |
+ /// return; |
+ /// } |
+ /// } |
+ /// } |
+ /// ``` |
+ Future<bool> withTransaction(Future<bool> callback(StreamQueue<T> queue)) { |
+ var transaction = startTransaction(); |
+ |
+ /// Avoid async/await to ensure that [startTransaction] is called |
+ /// synchronously and so ends up in the right place in the request queue. |
+ var queue = transaction.newQueue(); |
+ return callback(queue).then((result) { |
+ if (result) { |
+ transaction.commit(queue); |
+ } else { |
+ transaction.reject(); |
+ } |
+ return result; |
+ }, onError: (error) { |
+ transaction.commit(queue); |
+ throw error; |
+ }); |
+ } |
+ |
+ /// Passes a copy of this queue to [callback], and updates this queue to match |
+ /// the copy's position once [callback] completes. |
+ /// |
+ /// If the returned [CancelableOperation] is canceled, this queue instead |
+ /// continues as though [cancelable] hadn't been called. Otherwise, it emits |
+ /// the same value or error as [callback]. |
+ /// |
+ /// See also [startTransaction] and [withTransaction]. |
+ /// |
+ /// ```dart |
+ /// final _stdinQueue = new StreamQueue(stdin); |
+ /// |
+ /// /// Returns an operation that completes when the user sends a line to |
+ /// /// standard input. |
+ /// /// |
+ /// /// If the operation is canceled, stops waiting for user input. |
+ /// CancelableOperation<String> nextStdinLine() => |
+ /// _stdinQueue.cancelable((queue) => queue.next); |
+ /// ``` |
+ CancelableOperation<S> cancelable<S>( |
+ Future<S> callback(StreamQueue<T> queue)) { |
+ var transaction = startTransaction(); |
+ var completer = new CancelableCompleter<S>(onCancel: () { |
+ transaction.reject(); |
+ }); |
+ |
+ var queue = transaction.newQueue(); |
+ completer.complete(callback(queue).whenComplete(() { |
+ if (!completer.isCanceled) transaction.commit(queue); |
+ })); |
+ |
+ return completer.operation; |
+ } |
+ |
/// Cancels the underlying event source. |
/// |
/// If [immediate] is `false` (the default), the cancel operation waits until |
@@ -226,8 +382,8 @@ abstract class StreamQueue<T> { |
/// `cancel`. |
/// |
/// After calling `cancel`, no further events can be requested. |
- /// None of [next], [rest], [skip], [take] or [cancel] may be |
- /// called again. |
+ /// None of [lookAhead], [next], [peek], [rest], [skip], [take] or [cancel] |
+ /// may be called again. |
Future cancel({bool immediate: false}) { |
if (_isClosed) throw _failClosed(); |
_isClosed = true; |
@@ -276,7 +432,7 @@ abstract class StreamQueue<T> { |
/// Can only be used by the very last request (the stream queue must |
/// be closed by that request). |
/// Only used by [rest]. |
- Stream _extractStream(); |
+ Stream<T> _extractStream(); |
/// Requests that the event source pauses events. |
/// |
@@ -302,6 +458,7 @@ abstract class StreamQueue<T> { |
/// Called when the event source adds a new data or error event. |
/// Always calls [_updateRequests] after adding. |
void _addResult(Result result) { |
+ _eventsReceived++; |
_eventQueue.add(result); |
_updateRequests(); |
} |
@@ -337,20 +494,19 @@ abstract class StreamQueue<T> { |
} |
} |
- |
/// The default implementation of [StreamQueue]. |
/// |
/// This queue gets its events from a stream which is listened |
/// to when a request needs events. |
class _StreamQueue<T> extends StreamQueue<T> { |
/// Source of events. |
- final Stream _sourceStream; |
+ final Stream<T> _sourceStream; |
/// Subscription on [_sourceStream] while listening for events. |
/// |
/// Set to subscription when listening, and set to `null` when the |
/// subscription is done (and [_isDone] is set to true). |
- StreamSubscription _subscription; |
+ StreamSubscription<T> _subscription; |
_StreamQueue(this._sourceStream) : super._(); |
@@ -363,20 +519,16 @@ class _StreamQueue<T> extends StreamQueue<T> { |
} |
void _ensureListening() { |
- assert(!_isDone); |
+ if (_isDone) return; |
if (_subscription == null) { |
- _subscription = |
- _sourceStream.listen( |
- (data) { |
- _addResult(new Result.value(data)); |
- }, |
- onError: (error, StackTrace stackTrace) { |
- _addResult(new Result.error(error, stackTrace)); |
- }, |
- onDone: () { |
- _subscription = null; |
- this._close(); |
- }); |
+ _subscription = _sourceStream.listen((data) { |
+ _addResult(new Result.value(data)); |
+ }, onError: (error, StackTrace stackTrace) { |
+ _addResult(new Result.error(error, stackTrace)); |
+ }, onDone: () { |
+ _subscription = null; |
+ this._close(); |
+ }); |
} else { |
_subscription.resume(); |
} |
@@ -391,6 +543,7 @@ class _StreamQueue<T> extends StreamQueue<T> { |
if (_isDone) { |
return new Stream<T>.empty(); |
} |
+ _isDone = true; |
if (_subscription == null) { |
return _sourceStream; |
@@ -398,7 +551,6 @@ class _StreamQueue<T> extends StreamQueue<T> { |
var subscription = _subscription; |
_subscription = null; |
- _isDone = true; |
var wasPaused = subscription.isPaused; |
var result = new SubscriptionStream<T>(subscription); |
@@ -409,6 +561,104 @@ class _StreamQueue<T> extends StreamQueue<T> { |
} |
} |
+/// A transaction on a [StreamQueue], created by [StreamQueue.startTransaction]. |
+/// |
+/// Copies of the parent queue may be created using [newQueue]. Calling [commit] |
+/// moves the parent queue to a copy's position, and calling [reject] causes it |
+/// to continue as though [StreamQueue.startTransaction] was never called. |
+class StreamQueueTransaction<T> { |
+ /// The parent queue on which this transaction is active. |
+ final StreamQueue<T> _parent; |
+ |
+ /// The splitter that produces copies of the parent queue's stream. |
+ final StreamSplitter<T> _splitter; |
+ |
+ /// Queues created using [newQueue]. |
+ final _queues = new Set<StreamQueue>(); |
+ |
+ /// Whether [commit] has been called. |
+ var _committed = false; |
+ |
+ /// Whether [reject] has been called. |
+ var _rejected = false; |
+ |
+ StreamQueueTransaction._(this._parent, Stream<T> source) |
+ : _splitter = new StreamSplitter(source); |
+ |
+ /// Creates a new copy of the parent queue. |
+ /// |
+ /// This copy starts at the parent queue's position when |
+ /// [StreamQueue.startTransaction] was called. Its position can be committed |
+ /// to the parent queue using [commit]. |
+ StreamQueue<T> newQueue() { |
+ var queue = new StreamQueue(_splitter.split()); |
+ _queues.add(queue); |
+ return queue; |
+ } |
+ |
+ /// Commits a queue created using [newQueue]. |
+ /// |
+ /// The parent queue's position is updated to be the same as [queue]'s. |
+ /// Further requests on all queues created by this transaction, including |
+ /// [queue], will complete as though [cancel] were called with `immediate: |
+ /// true`. |
+ /// |
+ /// Throws a [StateError] if [commit] or [reject] have already been called, or |
+ /// if there are pending requests on [queue]. |
+ void commit(StreamQueue<T> queue) { |
+ _assertActive(); |
+ if (!_queues.contains(queue)) { |
+ throw new ArgumentError("Queue doesn't belong to this transaction."); |
+ } else if (queue._requestQueue.isNotEmpty) { |
+ throw new StateError("A queue with pending requests can't be committed."); |
+ } |
+ _committed = true; |
+ |
+ // Remove all events from the parent queue that were consumed by the |
+ // child queue. |
+ for (var j = 0; j < queue.eventsDispatched; j++) { |
+ _parent._eventQueue.removeFirst(); |
+ } |
+ |
+ _done(); |
+ } |
+ |
+ /// Rejects this transaction without updating the parent queue. |
+ /// |
+ /// The parent will continue as though [StreamQueue.startTransaction] hadn't |
+ /// been called. Further requests on all queues created by this transaction |
+ /// will complete as though [cancel] were called with `immediate: true`. |
+ /// |
+ /// Throws a [StateError] if [commit] or [reject] have already been called. |
+ void reject() { |
+ _assertActive(); |
+ _rejected = true; |
+ _done(); |
+ } |
+ |
+ // Cancels all [_queues], removes the [_TransactionRequest] from [_parent]'s |
+ // request queue, and runs the next request. |
+ void _done() { |
+ _splitter.close(); |
+ for (var queue in _queues) { |
+ queue._cancel(); |
+ } |
+ |
+ assert((_parent._requestQueue.first as _TransactionRequest).transaction == |
+ this); |
+ _parent._requestQueue.removeFirst(); |
+ _parent._updateRequests(); |
+ } |
+ |
+ /// Throws a [StateError] if [accept] or [reject] has already been called. |
+ void _assertActive() { |
+ if (_committed) { |
+ throw new StateError("This transaction has already been accepted."); |
+ } else if (_rejected) { |
+ throw new StateError("This transaction has already been rejected."); |
+ } |
+ } |
+} |
/// Request object that receives events when they arrive, until fulfilled. |
/// |
@@ -424,7 +674,7 @@ class _StreamQueue<T> extends StreamQueue<T> { |
/// |
/// The [close] method is also called immediately when the source stream |
/// is done. |
-abstract class _EventRequest { |
+abstract class _EventRequest<T> { |
/// Handle available events. |
/// |
/// The available events are provided as a queue. The `update` function |
@@ -445,30 +695,55 @@ abstract class _EventRequest { |
/// If the function returns `false` when the stream has already closed |
/// ([isDone] is true), then the request must call |
/// [StreamQueue._updateRequests] itself when it's ready to continue. |
- bool update(Queue<Result> events, bool isDone); |
+ bool update(QueueList<Result<T>> events, bool isDone); |
} |
/// Request for a [StreamQueue.next] call. |
/// |
/// Completes the returned future when receiving the first event, |
/// and is then complete. |
-class _NextRequest<T> implements _EventRequest { |
+class _NextRequest<T> implements _EventRequest<T> { |
/// Completer for the future returned by [StreamQueue.next]. |
- final Completer _completer; |
+ final _completer = new Completer<T>(); |
- _NextRequest() : _completer = new Completer<T>(); |
+ _NextRequest(); |
Future<T> get future => _completer.future; |
- bool update(Queue<Result> events, bool isDone) { |
+ bool update(QueueList<Result<T>> events, bool isDone) { |
if (events.isNotEmpty) { |
events.removeFirst().complete(_completer); |
return true; |
} |
if (isDone) { |
- var errorFuture = |
- new Future.sync(() => throw new StateError("No elements")); |
- _completer.complete(errorFuture); |
+ _completer.completeError( |
+ new StateError("No elements"), StackTrace.current); |
+ return true; |
+ } |
+ return false; |
+ } |
+} |
+ |
+/// Request for a [StreamQueue.peek] call. |
+/// |
+/// Completes the returned future when receiving the first event, |
+/// and is then complete, but doesn't consume the event. |
+class _PeekRequest<T> implements _EventRequest<T> { |
+ /// Completer for the future returned by [StreamQueue.next]. |
+ final _completer = new Completer<T>(); |
+ |
+ _PeekRequest(); |
+ |
+ Future<T> get future => _completer.future; |
+ |
+ bool update(QueueList<Result<T>> events, bool isDone) { |
+ if (events.isNotEmpty) { |
+ events.first.complete(_completer); |
+ return true; |
+ } |
+ if (isDone) { |
+ _completer.completeError( |
+ new StateError("No elements"), StackTrace.current); |
return true; |
} |
return false; |
@@ -476,9 +751,9 @@ class _NextRequest<T> implements _EventRequest { |
} |
/// Request for a [StreamQueue.skip] call. |
-class _SkipRequest implements _EventRequest { |
+class _SkipRequest<T> implements _EventRequest<T> { |
/// Completer for the future returned by the skip call. |
- final Completer _completer = new Completer<int>(); |
+ final _completer = new Completer<int>(); |
/// Number of remaining events to skip. |
/// |
@@ -491,9 +766,9 @@ class _SkipRequest implements _EventRequest { |
_SkipRequest(this._eventsToSkip); |
/// The future completed when the correct number of events have been skipped. |
- Future get future => _completer.future; |
+ Future<int> get future => _completer.future; |
- bool update(Queue<Result> events, bool isDone) { |
+ bool update(QueueList<Result<T>> events, bool isDone) { |
while (_eventsToSkip > 0) { |
if (events.isEmpty) { |
if (isDone) break; |
@@ -503,7 +778,7 @@ class _SkipRequest implements _EventRequest { |
var event = events.removeFirst(); |
if (event.isError) { |
- event.complete(_completer); |
+ _completer.completeError(event.asError.error, event.asError.stackTrace); |
return true; |
} |
} |
@@ -512,13 +787,13 @@ class _SkipRequest implements _EventRequest { |
} |
} |
-/// Request for a [StreamQueue.take] call. |
-class _TakeRequest<T> implements _EventRequest { |
+/// Common superclass for [_TakeRequest] and [_LookAheadRequest]. |
+abstract class _ListRequest<T> implements _EventRequest<T> { |
/// Completer for the future returned by the take call. |
- final Completer _completer; |
+ final _completer = new Completer<List<T>>(); |
/// List collecting events until enough have been seen. |
- final List _list = <T>[]; |
+ final _list = <T>[]; |
/// Number of events to capture. |
/// |
@@ -526,24 +801,51 @@ class _TakeRequest<T> implements _EventRequest { |
/// this value. |
final int _eventsToTake; |
- _TakeRequest(this._eventsToTake) : _completer = new Completer<List<T>>(); |
+ _ListRequest(this._eventsToTake); |
/// The future completed when the correct number of events have been captured. |
- Future get future => _completer.future; |
+ Future<List<T>> get future => _completer.future; |
+} |
- bool update(Queue<Result> events, bool isDone) { |
+/// Request for a [StreamQueue.take] call. |
+class _TakeRequest<T> extends _ListRequest<T> { |
+ _TakeRequest(int eventsToTake) : super(eventsToTake); |
+ |
+ bool update(QueueList<Result<T>> events, bool isDone) { |
while (_list.length < _eventsToTake) { |
if (events.isEmpty) { |
if (isDone) break; |
return false; |
} |
- var result = events.removeFirst(); |
- if (result.isError) { |
- result.complete(_completer); |
+ var event = events.removeFirst(); |
+ if (event.isError) { |
+ event.asError.complete(_completer); |
+ return true; |
+ } |
+ _list.add(event.asValue.value); |
+ } |
+ _completer.complete(_list); |
+ return true; |
+ } |
+} |
+ |
+/// Request for a [StreamQueue.lookAhead] call. |
+class _LookAheadRequest<T> extends _ListRequest<T> { |
+ _LookAheadRequest(int eventsToTake) : super(eventsToTake); |
+ |
+ bool update(QueueList<Result<T>> events, bool isDone) { |
+ while (_list.length < _eventsToTake) { |
+ if (events.length == _list.length) { |
+ if (isDone) break; |
+ return false; |
+ } |
+ var event = events.elementAt(_list.length); |
+ if (event.isError) { |
+ event.asError.complete(_completer); |
return true; |
} |
- _list.add(result.asValue.value); |
+ _list.add(event.asValue.value); |
} |
_completer.complete(_list); |
return true; |
@@ -555,11 +857,10 @@ class _TakeRequest<T> implements _EventRequest { |
/// The request needs no events, it just waits in the request queue |
/// until all previous events are fulfilled, then it cancels the stream queue |
/// source subscription. |
-class _CancelRequest implements _EventRequest { |
+class _CancelRequest<T> implements _EventRequest<T> { |
/// Completer for the future returned by the `cancel` call. |
- final Completer _completer = new Completer(); |
+ final _completer = new Completer(); |
- /// The [StreamQueue] object that has this request queued. |
/// |
/// When the event is completed, it needs to cancel the active subscription |
/// of the `StreamQueue` object, if any. |
@@ -570,7 +871,7 @@ class _CancelRequest implements _EventRequest { |
/// The future completed when the cancel request is completed. |
Future get future => _completer.future; |
- bool update(Queue<Result> events, bool isDone) { |
+ bool update(QueueList<Result<T>> events, bool isDone) { |
if (_streamQueue._isDone) { |
_completer.complete(); |
} else { |
@@ -586,22 +887,22 @@ class _CancelRequest implements _EventRequest { |
/// The request is always complete, it just waits in the request queue |
/// until all previous events are fulfilled, then it takes over the |
/// stream events subscription and creates a stream from it. |
-class _RestRequest<T> implements _EventRequest { |
+class _RestRequest<T> implements _EventRequest<T> { |
/// Completer for the stream returned by the `rest` call. |
- final StreamCompleter _completer = new StreamCompleter<T>(); |
+ final _completer = new StreamCompleter<T>(); |
/// The [StreamQueue] object that has this request queued. |
/// |
/// When the event is completed, it needs to cancel the active subscription |
/// of the `StreamQueue` object, if any. |
- final StreamQueue _streamQueue; |
+ final StreamQueue<T> _streamQueue; |
_RestRequest(this._streamQueue); |
/// The stream which will contain the remaining events of [_streamQueue]. |
Stream<T> get stream => _completer.stream; |
- bool update(Queue<Result> events, bool isDone) { |
+ bool update(QueueList<Result<T>> events, bool isDone) { |
if (events.isEmpty) { |
if (_streamQueue._isDone) { |
_completer.setEmpty(); |
@@ -615,8 +916,9 @@ class _RestRequest<T> implements _EventRequest { |
for (var event in events) { |
event.addTo(controller); |
} |
- controller.addStream(_streamQueue._extractStream(), cancelOnError: false) |
- .whenComplete(controller.close); |
+ controller |
+ .addStream(_streamQueue._extractStream(), cancelOnError: false) |
+ .whenComplete(controller.close); |
_completer.setSourceStream(controller.stream); |
} |
return true; |
@@ -629,12 +931,12 @@ class _RestRequest<T> implements _EventRequest { |
/// but doesn't consume the event. |
/// If the request is closed without seeing an event, then |
/// the [future] is completed with `false`. |
-class _HasNextRequest<T> implements _EventRequest { |
- final Completer _completer = new Completer<bool>(); |
+class _HasNextRequest<T> implements _EventRequest<T> { |
+ final _completer = new Completer<bool>(); |
Future<bool> get future => _completer.future; |
- bool update(Queue<Result> events, bool isDone) { |
+ bool update(QueueList<Result<T>> events, bool isDone) { |
if (events.isNotEmpty) { |
_completer.complete(true); |
return true; |
@@ -646,3 +948,33 @@ class _HasNextRequest<T> implements _EventRequest { |
return false; |
} |
} |
+ |
+/// Request for a [StreamQueue.startTransaction] call. |
+/// |
+/// This request isn't complete until the user calls |
+/// [StreamQueueTransaction.commit] or [StreamQueueTransaction.reject], at which |
+/// point it manually removes itself from the request queue and calls |
+/// [StreamQueue._updateRequests]. |
+class _TransactionRequest<T> implements _EventRequest<T> { |
+ /// The transaction created by this request. |
+ StreamQueueTransaction<T> get transaction => _transaction; |
+ StreamQueueTransaction<T> _transaction; |
+ |
+ /// The controller that passes events to [transaction]. |
+ final _controller = new StreamController<T>(sync: true); |
+ |
+ /// The number of events passed to [_controller] so far. |
+ var _eventsSent = 0; |
+ |
+ _TransactionRequest(StreamQueue<T> parent) { |
+ _transaction = new StreamQueueTransaction._(parent, _controller.stream); |
+ } |
+ |
+ bool update(QueueList<Result<T>> events, bool isDone) { |
+ while (_eventsSent < events.length) { |
+ events[_eventsSent++].addTo(_controller); |
+ } |
+ if (isDone && !_controller.isClosed) _controller.close(); |
+ return false; |
+ } |
+} |