Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(156)

Side by Side Diff: packages/async/lib/src/stream_queue.dart

Issue 2989763002: Update charted to 0.4.8 and roll (Closed)
Patch Set: Removed Cutch from list of reviewers Created 3 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 library async.stream_events;
6
7 import 'dart:async'; 5 import 'dart:async';
8 import 'dart:collection'; 6 import 'dart:collection';
9 7
8 import 'package:collection/collection.dart';
9
10 import "cancelable_operation.dart";
11 import "result.dart";
10 import "subscription_stream.dart"; 12 import "subscription_stream.dart";
11 import "stream_completer.dart"; 13 import "stream_completer.dart";
12 import "../result.dart"; 14 import "stream_splitter.dart";
13 15
14 /// An asynchronous pull-based interface for accessing stream events. 16 /// An asynchronous pull-based interface for accessing stream events.
15 /// 17 ///
16 /// Wraps a stream and makes individual events available on request. 18 /// Wraps a stream and makes individual events available on request.
17 /// 19 ///
18 /// You can request (and reserve) one or more events from the stream, 20 /// You can request (and reserve) one or more events from the stream,
19 /// and after all previous requests have been fulfilled, stream events 21 /// and after all previous requests have been fulfilled, stream events
20 /// go towards fulfilling your request. 22 /// go towards fulfilling your request.
21 /// 23 ///
22 /// For example, if you ask for [next] two times, the returned futures 24 /// For example, if you ask for [next] two times, the returned futures
(...skipping 57 matching lines...) Expand 10 before | Expand all | Expand 10 after
80 // by the content of the fifth event. 82 // by the content of the fifth event.
81 83
82 /// Whether the event source is done. 84 /// Whether the event source is done.
83 bool _isDone = false; 85 bool _isDone = false;
84 86
85 /// Whether a closing operation has been performed on the stream queue. 87 /// Whether a closing operation has been performed on the stream queue.
86 /// 88 ///
87 /// Closing operations are [cancel] and [rest]. 89 /// Closing operations are [cancel] and [rest].
88 bool _isClosed = false; 90 bool _isClosed = false;
89 91
92 /// The number of events dispatched by this queue.
93 ///
94 /// This counts error events. It doesn't count done events, or events
95 /// dispatched to a stream returned by [rest].
96 int get eventsDispatched => _eventsReceived - _eventQueue.length;
97
98 /// The number of events received by this queue.
99 var _eventsReceived = 0;
100
90 /// Queue of events not used by a request yet. 101 /// Queue of events not used by a request yet.
91 final Queue<Result> _eventQueue = new Queue(); 102 final QueueList<Result> _eventQueue = new QueueList();
92 103
93 /// Queue of pending requests. 104 /// Queue of pending requests.
94 /// 105 ///
95 /// Access through methods below to ensure consistency. 106 /// Access through methods below to ensure consistency.
96 final Queue<_EventRequest> _requestQueue = new Queue(); 107 final Queue<_EventRequest> _requestQueue = new Queue();
97 108
98 /// Create a `StreamQueue` of the events of [source]. 109 /// Create a `StreamQueue` of the events of [source].
99 factory StreamQueue(Stream source) = _StreamQueue<T>; 110 factory StreamQueue(Stream<T> source) = _StreamQueue<T>;
100 111
101 StreamQueue._(); 112 StreamQueue._();
102 113
103 /// Asks if the stream has any more events. 114 /// Asks if the stream has any more events.
104 /// 115 ///
105 /// Returns a future that completes with `true` if the stream has any 116 /// Returns a future that completes with `true` if the stream has any
106 /// more events, whether data or error. 117 /// more events, whether data or error.
107 /// If the stream closes without producing any more events, the returned 118 /// If the stream closes without producing any more events, the returned
108 /// future completes with `false`. 119 /// future completes with `false`.
109 /// 120 ///
110 /// Can be used before using [next] to avoid getting an error in the 121 /// Can be used before using [next] to avoid getting an error in the
111 /// future returned by `next` in the case where there are no more events. 122 /// future returned by `next` in the case where there are no more events.
112 /// Another alternative is to use `take(1)` which returns either zero or 123 /// Another alternative is to use `take(1)` which returns either zero or
113 /// one events. 124 /// one events.
114 Future<bool> get hasNext { 125 Future<bool> get hasNext {
115 if (!_isClosed) { 126 if (!_isClosed) {
116 var hasNextRequest = new _HasNextRequest(); 127 var hasNextRequest = new _HasNextRequest();
117 _addRequest(hasNextRequest); 128 _addRequest(hasNextRequest);
118 return hasNextRequest.future; 129 return hasNextRequest.future;
119 } 130 }
120 throw _failClosed(); 131 throw _failClosed();
121 } 132 }
122 133
134 /// Look at the next [count] data events without consuming them.
135 ///
136 /// Works like [take] except that the events are left in the queue.
137 /// If one of the next [count] events is an error, the returned future
138 /// completes with this error, and the error is still left in the queue.
139 Future<List<T>> lookAhead(int count) {
140 if (count < 0) throw new RangeError.range(count, 0, null, "count");
141 if (!_isClosed) {
142 var request = new _LookAheadRequest<T>(count);
143 _addRequest(request);
144 return request.future;
145 }
146 throw _failClosed();
147 }
148
123 /// Requests the next (yet unrequested) event from the stream. 149 /// Requests the next (yet unrequested) event from the stream.
124 /// 150 ///
125 /// When the requested event arrives, the returned future is completed with 151 /// When the requested event arrives, the returned future is completed with
126 /// the event. 152 /// the event.
127 /// If the event is a data event, the returned future completes 153 /// If the event is a data event, the returned future completes
128 /// with its value. 154 /// with its value.
129 /// If the event is an error event, the returned future completes with 155 /// If the event is an error event, the returned future completes with
130 /// its error and stack trace. 156 /// its error and stack trace.
131 /// If the stream closes before an event arrives, the returned future 157 /// If the stream closes before an event arrives, the returned future
132 /// completes with a [StateError]. 158 /// completes with a [StateError].
133 /// 159 ///
134 /// It's possible to have several pending [next] calls (or other requests), 160 /// It's possible to have several pending [next] calls (or other requests),
135 /// and they will be completed in the order they were requested, by the 161 /// and they will be completed in the order they were requested, by the
136 /// first events that were not consumed by previous requeusts. 162 /// first events that were not consumed by previous requeusts.
137 Future<T> get next { 163 Future<T> get next {
138 if (!_isClosed) { 164 if (!_isClosed) {
139 var nextRequest = new _NextRequest<T>(); 165 var nextRequest = new _NextRequest<T>();
140 _addRequest(nextRequest); 166 _addRequest(nextRequest);
141 return nextRequest.future; 167 return nextRequest.future;
142 } 168 }
143 throw _failClosed(); 169 throw _failClosed();
144 } 170 }
145 171
172 /// Looks at the next (yet unrequested) event from the stream.
173 ///
174 /// Like [next] except that the event is not consumed.
175 /// If the next event is an error event, it stays in the queue.
176 Future<T> get peek {
177 if (!_isClosed) {
178 var nextRequest = new _PeekRequest<T>();
179 _addRequest(nextRequest);
180 return nextRequest.future;
181 }
182 throw _failClosed();
183 }
184
146 /// Returns a stream of all the remaning events of the source stream. 185 /// Returns a stream of all the remaning events of the source stream.
147 /// 186 ///
148 /// All requested [next], [skip] or [take] operations are completed 187 /// All requested [next], [skip] or [take] operations are completed
149 /// first, and then any remaining events are provided as events of 188 /// first, and then any remaining events are provided as events of
150 /// the returned stream. 189 /// the returned stream.
151 /// 190 ///
152 /// Using `rest` closes this stream queue. After getting the 191 /// Using `rest` closes this stream queue. After getting the
153 /// `rest` the caller may no longer request other events, like 192 /// `rest` the caller may no longer request other events, like
154 /// after calling [cancel]. 193 /// after calling [cancel].
155 Stream<T> get rest { 194 Stream<T> get rest {
(...skipping 49 matching lines...) Expand 10 before | Expand all | Expand 10 after
205 Future<List<T>> take(int count) { 244 Future<List<T>> take(int count) {
206 if (count < 0) throw new RangeError.range(count, 0, null, "count"); 245 if (count < 0) throw new RangeError.range(count, 0, null, "count");
207 if (!_isClosed) { 246 if (!_isClosed) {
208 var request = new _TakeRequest<T>(count); 247 var request = new _TakeRequest<T>(count);
209 _addRequest(request); 248 _addRequest(request);
210 return request.future; 249 return request.future;
211 } 250 }
212 throw _failClosed(); 251 throw _failClosed();
213 } 252 }
214 253
254 /// Requests a transaction that can conditionally consume events.
255 ///
256 /// The transaction can create copies of this queue at the current position
257 /// using [StreamQueueTransaction.newQueue]. Each of these queues is
258 /// independent of one another and of the parent queue. The transaction
259 /// finishes when one of two methods is called:
260 ///
261 /// * [StreamQueueTransaction.commit] updates the parent queue's position to
262 /// match that of one of the copies.
263 ///
264 /// * [StreamQueueTransaction.reject] causes the parent queue to continue as
265 /// though [startTransaction] hadn't been called.
266 ///
267 /// Until the transaction finishes, this queue won't emit any events.
268 ///
269 /// See also [withTransaction] and [cancelable].
270 ///
271 /// ```dart
272 /// /// Consumes all empty lines from the beginning of [lines].
273 /// Future consumeEmptyLines(StreamQueue<String> lines) async {
274 /// while (await lines.hasNext) {
275 /// var transaction = lines.startTransaction();
276 /// var queue = transaction.newQueue();
277 /// if ((await queue.next).isNotEmpty) {
278 /// transaction.reject();
279 /// return;
280 /// } else {
281 /// transaction.commit(queue);
282 /// }
283 /// }
284 /// }
285 /// ```
286 StreamQueueTransaction<T> startTransaction() {
287 if (_isClosed) throw _failClosed();
288
289 var request = new _TransactionRequest(this);
290 _addRequest(request);
291 return request.transaction;
292 }
293
294 /// Passes a copy of this queue to [callback], and updates this queue to match
295 /// the copy's position if [callback] returns `true`.
296 ///
297 /// This queue won't emit any events until [callback] returns. If it returns
298 /// `false`, this queue continues as though [withTransaction] hadn't been
299 /// called. If it throws an error, this updates this queue to match the copy's
300 /// position and throws the error from the returned `Future`.
301 ///
302 /// Returns the same value as [callback].
303 ///
304 /// See also [startTransaction] and [cancelable].
305 ///
306 /// ```dart
307 /// /// Consumes all empty lines from the beginning of [lines].
308 /// Future consumeEmptyLines(StreamQueue<String> lines) async {
309 /// while (await lines.hasNext) {
310 /// // Consume a line if it's empty, otherwise return.
311 /// if (!await lines.withTransaction(
312 /// (queue) async => (await queue.next).isEmpty)) {
313 /// return;
314 /// }
315 /// }
316 /// }
317 /// ```
318 Future<bool> withTransaction(Future<bool> callback(StreamQueue<T> queue)) {
319 var transaction = startTransaction();
320
321 /// Avoid async/await to ensure that [startTransaction] is called
322 /// synchronously and so ends up in the right place in the request queue.
323 var queue = transaction.newQueue();
324 return callback(queue).then((result) {
325 if (result) {
326 transaction.commit(queue);
327 } else {
328 transaction.reject();
329 }
330 return result;
331 }, onError: (error) {
332 transaction.commit(queue);
333 throw error;
334 });
335 }
336
337 /// Passes a copy of this queue to [callback], and updates this queue to match
338 /// the copy's position once [callback] completes.
339 ///
340 /// If the returned [CancelableOperation] is canceled, this queue instead
341 /// continues as though [cancelable] hadn't been called. Otherwise, it emits
342 /// the same value or error as [callback].
343 ///
344 /// See also [startTransaction] and [withTransaction].
345 ///
346 /// ```dart
347 /// final _stdinQueue = new StreamQueue(stdin);
348 ///
349 /// /// Returns an operation that completes when the user sends a line to
350 /// /// standard input.
351 /// ///
352 /// /// If the operation is canceled, stops waiting for user input.
353 /// CancelableOperation<String> nextStdinLine() =>
354 /// _stdinQueue.cancelable((queue) => queue.next);
355 /// ```
356 CancelableOperation<S> cancelable<S>(
357 Future<S> callback(StreamQueue<T> queue)) {
358 var transaction = startTransaction();
359 var completer = new CancelableCompleter<S>(onCancel: () {
360 transaction.reject();
361 });
362
363 var queue = transaction.newQueue();
364 completer.complete(callback(queue).whenComplete(() {
365 if (!completer.isCanceled) transaction.commit(queue);
366 }));
367
368 return completer.operation;
369 }
370
215 /// Cancels the underlying event source. 371 /// Cancels the underlying event source.
216 /// 372 ///
217 /// If [immediate] is `false` (the default), the cancel operation waits until 373 /// If [immediate] is `false` (the default), the cancel operation waits until
218 /// all previously requested events have been processed, then it cancels the 374 /// all previously requested events have been processed, then it cancels the
219 /// subscription providing the events. 375 /// subscription providing the events.
220 /// 376 ///
221 /// If [immediate] is `true`, the source is instead canceled 377 /// If [immediate] is `true`, the source is instead canceled
222 /// immediately. Any pending events are completed as though the underlying 378 /// immediately. Any pending events are completed as though the underlying
223 /// stream had closed. 379 /// stream had closed.
224 /// 380 ///
225 /// The returned future completes with the result of calling 381 /// The returned future completes with the result of calling
226 /// `cancel`. 382 /// `cancel`.
227 /// 383 ///
228 /// After calling `cancel`, no further events can be requested. 384 /// After calling `cancel`, no further events can be requested.
229 /// None of [next], [rest], [skip], [take] or [cancel] may be 385 /// None of [lookAhead], [next], [peek], [rest], [skip], [take] or [cancel]
230 /// called again. 386 /// may be called again.
231 Future cancel({bool immediate: false}) { 387 Future cancel({bool immediate: false}) {
232 if (_isClosed) throw _failClosed(); 388 if (_isClosed) throw _failClosed();
233 _isClosed = true; 389 _isClosed = true;
234 390
235 if (!immediate) { 391 if (!immediate) {
236 var request = new _CancelRequest(this); 392 var request = new _CancelRequest(this);
237 _addRequest(request); 393 _addRequest(request);
238 return request.future; 394 return request.future;
239 } 395 }
240 396
(...skipping 28 matching lines...) Expand all
269 _pause(); 425 _pause();
270 } 426 }
271 } 427 }
272 428
273 /// Extracts a stream from the event source and makes this stream queue 429 /// Extracts a stream from the event source and makes this stream queue
274 /// unusable. 430 /// unusable.
275 /// 431 ///
276 /// Can only be used by the very last request (the stream queue must 432 /// Can only be used by the very last request (the stream queue must
277 /// be closed by that request). 433 /// be closed by that request).
278 /// Only used by [rest]. 434 /// Only used by [rest].
279 Stream _extractStream(); 435 Stream<T> _extractStream();
280 436
281 /// Requests that the event source pauses events. 437 /// Requests that the event source pauses events.
282 /// 438 ///
283 /// This is called automatically when the request queue is empty. 439 /// This is called automatically when the request queue is empty.
284 /// 440 ///
285 /// The event source is restarted by the next call to [_ensureListening]. 441 /// The event source is restarted by the next call to [_ensureListening].
286 void _pause(); 442 void _pause();
287 443
288 /// Ensures that we are listening on events from the event source. 444 /// Ensures that we are listening on events from the event source.
289 /// 445 ///
290 /// Starts listening for the first time or resumes after a [_pause]. 446 /// Starts listening for the first time or resumes after a [_pause].
291 /// 447 ///
292 /// Is called automatically if a request requires more events. 448 /// Is called automatically if a request requires more events.
293 void _ensureListening(); 449 void _ensureListening();
294 450
295 /// Cancels the underlying event source. 451 /// Cancels the underlying event source.
296 Future _cancel(); 452 Future _cancel();
297 453
298 // ------------------------------------------------------------------ 454 // ------------------------------------------------------------------
299 // Methods called by the event source to add events or say that it's 455 // Methods called by the event source to add events or say that it's
300 // done. 456 // done.
301 457
302 /// Called when the event source adds a new data or error event. 458 /// Called when the event source adds a new data or error event.
303 /// Always calls [_updateRequests] after adding. 459 /// Always calls [_updateRequests] after adding.
304 void _addResult(Result result) { 460 void _addResult(Result result) {
461 _eventsReceived++;
305 _eventQueue.add(result); 462 _eventQueue.add(result);
306 _updateRequests(); 463 _updateRequests();
307 } 464 }
308 465
309 /// Called when the event source is done. 466 /// Called when the event source is done.
310 /// Always calls [_updateRequests] after adding. 467 /// Always calls [_updateRequests] after adding.
311 void _close() { 468 void _close() {
312 _isDone = true; 469 _isDone = true;
313 _updateRequests(); 470 _updateRequests();
314 } 471 }
(...skipping 15 matching lines...) Expand all
330 /// immediately, it skips the queue. 487 /// immediately, it skips the queue.
331 void _addRequest(_EventRequest request) { 488 void _addRequest(_EventRequest request) {
332 if (_requestQueue.isEmpty) { 489 if (_requestQueue.isEmpty) {
333 if (request.update(_eventQueue, _isDone)) return; 490 if (request.update(_eventQueue, _isDone)) return;
334 _ensureListening(); 491 _ensureListening();
335 } 492 }
336 _requestQueue.add(request); 493 _requestQueue.add(request);
337 } 494 }
338 } 495 }
339 496
340
341 /// The default implementation of [StreamQueue]. 497 /// The default implementation of [StreamQueue].
342 /// 498 ///
343 /// This queue gets its events from a stream which is listened 499 /// This queue gets its events from a stream which is listened
344 /// to when a request needs events. 500 /// to when a request needs events.
345 class _StreamQueue<T> extends StreamQueue<T> { 501 class _StreamQueue<T> extends StreamQueue<T> {
346 /// Source of events. 502 /// Source of events.
347 final Stream _sourceStream; 503 final Stream<T> _sourceStream;
348 504
349 /// Subscription on [_sourceStream] while listening for events. 505 /// Subscription on [_sourceStream] while listening for events.
350 /// 506 ///
351 /// Set to subscription when listening, and set to `null` when the 507 /// Set to subscription when listening, and set to `null` when the
352 /// subscription is done (and [_isDone] is set to true). 508 /// subscription is done (and [_isDone] is set to true).
353 StreamSubscription _subscription; 509 StreamSubscription<T> _subscription;
354 510
355 _StreamQueue(this._sourceStream) : super._(); 511 _StreamQueue(this._sourceStream) : super._();
356 512
357 Future _cancel() { 513 Future _cancel() {
358 if (_isDone) return null; 514 if (_isDone) return null;
359 if (_subscription == null) _subscription = _sourceStream.listen(null); 515 if (_subscription == null) _subscription = _sourceStream.listen(null);
360 var future = _subscription.cancel(); 516 var future = _subscription.cancel();
361 _close(); 517 _close();
362 return future; 518 return future;
363 } 519 }
364 520
365 void _ensureListening() { 521 void _ensureListening() {
366 assert(!_isDone); 522 if (_isDone) return;
367 if (_subscription == null) { 523 if (_subscription == null) {
368 _subscription = 524 _subscription = _sourceStream.listen((data) {
369 _sourceStream.listen( 525 _addResult(new Result.value(data));
370 (data) { 526 }, onError: (error, StackTrace stackTrace) {
371 _addResult(new Result.value(data)); 527 _addResult(new Result.error(error, stackTrace));
372 }, 528 }, onDone: () {
373 onError: (error, StackTrace stackTrace) { 529 _subscription = null;
374 _addResult(new Result.error(error, stackTrace)); 530 this._close();
375 }, 531 });
376 onDone: () {
377 _subscription = null;
378 this._close();
379 });
380 } else { 532 } else {
381 _subscription.resume(); 533 _subscription.resume();
382 } 534 }
383 } 535 }
384 536
385 void _pause() { 537 void _pause() {
386 _subscription.pause(); 538 _subscription.pause();
387 } 539 }
388 540
389 Stream<T> _extractStream() { 541 Stream<T> _extractStream() {
390 assert(_isClosed); 542 assert(_isClosed);
391 if (_isDone) { 543 if (_isDone) {
392 return new Stream<T>.empty(); 544 return new Stream<T>.empty();
393 } 545 }
546 _isDone = true;
394 547
395 if (_subscription == null) { 548 if (_subscription == null) {
396 return _sourceStream; 549 return _sourceStream;
397 } 550 }
398 551
399 var subscription = _subscription; 552 var subscription = _subscription;
400 _subscription = null; 553 _subscription = null;
401 _isDone = true;
402 554
403 var wasPaused = subscription.isPaused; 555 var wasPaused = subscription.isPaused;
404 var result = new SubscriptionStream<T>(subscription); 556 var result = new SubscriptionStream<T>(subscription);
405 // Resume after creating stream because that pauses the subscription too. 557 // Resume after creating stream because that pauses the subscription too.
406 // This way there won't be a short resumption in the middle. 558 // This way there won't be a short resumption in the middle.
407 if (wasPaused) subscription.resume(); 559 if (wasPaused) subscription.resume();
408 return result; 560 return result;
409 } 561 }
410 } 562 }
411 563
564 /// A transaction on a [StreamQueue], created by [StreamQueue.startTransaction].
565 ///
566 /// Copies of the parent queue may be created using [newQueue]. Calling [commit]
567 /// moves the parent queue to a copy's position, and calling [reject] causes it
568 /// to continue as though [StreamQueue.startTransaction] was never called.
569 class StreamQueueTransaction<T> {
570 /// The parent queue on which this transaction is active.
571 final StreamQueue<T> _parent;
572
573 /// The splitter that produces copies of the parent queue's stream.
574 final StreamSplitter<T> _splitter;
575
576 /// Queues created using [newQueue].
577 final _queues = new Set<StreamQueue>();
578
579 /// Whether [commit] has been called.
580 var _committed = false;
581
582 /// Whether [reject] has been called.
583 var _rejected = false;
584
585 StreamQueueTransaction._(this._parent, Stream<T> source)
586 : _splitter = new StreamSplitter(source);
587
588 /// Creates a new copy of the parent queue.
589 ///
590 /// This copy starts at the parent queue's position when
591 /// [StreamQueue.startTransaction] was called. Its position can be committed
592 /// to the parent queue using [commit].
593 StreamQueue<T> newQueue() {
594 var queue = new StreamQueue(_splitter.split());
595 _queues.add(queue);
596 return queue;
597 }
598
599 /// Commits a queue created using [newQueue].
600 ///
601 /// The parent queue's position is updated to be the same as [queue]'s.
602 /// Further requests on all queues created by this transaction, including
603 /// [queue], will complete as though [cancel] were called with `immediate:
604 /// true`.
605 ///
606 /// Throws a [StateError] if [commit] or [reject] have already been called, or
607 /// if there are pending requests on [queue].
608 void commit(StreamQueue<T> queue) {
609 _assertActive();
610 if (!_queues.contains(queue)) {
611 throw new ArgumentError("Queue doesn't belong to this transaction.");
612 } else if (queue._requestQueue.isNotEmpty) {
613 throw new StateError("A queue with pending requests can't be committed.");
614 }
615 _committed = true;
616
617 // Remove all events from the parent queue that were consumed by the
618 // child queue.
619 for (var j = 0; j < queue.eventsDispatched; j++) {
620 _parent._eventQueue.removeFirst();
621 }
622
623 _done();
624 }
625
626 /// Rejects this transaction without updating the parent queue.
627 ///
628 /// The parent will continue as though [StreamQueue.startTransaction] hadn't
629 /// been called. Further requests on all queues created by this transaction
630 /// will complete as though [cancel] were called with `immediate: true`.
631 ///
632 /// Throws a [StateError] if [commit] or [reject] have already been called.
633 void reject() {
634 _assertActive();
635 _rejected = true;
636 _done();
637 }
638
639 // Cancels all [_queues], removes the [_TransactionRequest] from [_parent]'s
640 // request queue, and runs the next request.
641 void _done() {
642 _splitter.close();
643 for (var queue in _queues) {
644 queue._cancel();
645 }
646
647 assert((_parent._requestQueue.first as _TransactionRequest).transaction ==
648 this);
649 _parent._requestQueue.removeFirst();
650 _parent._updateRequests();
651 }
652
653 /// Throws a [StateError] if [accept] or [reject] has already been called.
654 void _assertActive() {
655 if (_committed) {
656 throw new StateError("This transaction has already been accepted.");
657 } else if (_rejected) {
658 throw new StateError("This transaction has already been rejected.");
659 }
660 }
661 }
412 662
413 /// Request object that receives events when they arrive, until fulfilled. 663 /// Request object that receives events when they arrive, until fulfilled.
414 /// 664 ///
415 /// Each request that cannot be fulfilled immediately is represented by 665 /// Each request that cannot be fulfilled immediately is represented by
416 /// an `_EventRequest` object in the request queue. 666 /// an `_EventRequest` object in the request queue.
417 /// 667 ///
418 /// Events from the source stream are sent to the first request in the 668 /// Events from the source stream are sent to the first request in the
419 /// queue until it reports itself as [isComplete]. 669 /// queue until it reports itself as [isComplete].
420 /// 670 ///
421 /// When the first request in the queue `isComplete`, either when becoming 671 /// When the first request in the queue `isComplete`, either when becoming
422 /// the first request or after receiving an event, its [close] methods is 672 /// the first request or after receiving an event, its [close] methods is
423 /// called. 673 /// called.
424 /// 674 ///
425 /// The [close] method is also called immediately when the source stream 675 /// The [close] method is also called immediately when the source stream
426 /// is done. 676 /// is done.
427 abstract class _EventRequest { 677 abstract class _EventRequest<T> {
428 /// Handle available events. 678 /// Handle available events.
429 /// 679 ///
430 /// The available events are provided as a queue. The `update` function 680 /// The available events are provided as a queue. The `update` function
431 /// should only remove events from the front of the event queue, e.g., 681 /// should only remove events from the front of the event queue, e.g.,
432 /// using [removeFirst]. 682 /// using [removeFirst].
433 /// 683 ///
434 /// Returns `true` if the request is completed, or `false` if it needs 684 /// Returns `true` if the request is completed, or `false` if it needs
435 /// more events. 685 /// more events.
436 /// The call may keep events in the queue until the requeust is complete, 686 /// The call may keep events in the queue until the requeust is complete,
437 /// or it may remove them immediately. 687 /// or it may remove them immediately.
438 /// 688 ///
439 /// If the method returns true, the request is considered fulfilled, and 689 /// If the method returns true, the request is considered fulfilled, and
440 /// will never be called again. 690 /// will never be called again.
441 /// 691 ///
442 /// This method is called when a request reaches the front of the request 692 /// This method is called when a request reaches the front of the request
443 /// queue, and if it returns `false`, it's called again every time a new event 693 /// queue, and if it returns `false`, it's called again every time a new event
444 /// becomes available, or when the stream closes. 694 /// becomes available, or when the stream closes.
445 /// If the function returns `false` when the stream has already closed 695 /// If the function returns `false` when the stream has already closed
446 /// ([isDone] is true), then the request must call 696 /// ([isDone] is true), then the request must call
447 /// [StreamQueue._updateRequests] itself when it's ready to continue. 697 /// [StreamQueue._updateRequests] itself when it's ready to continue.
448 bool update(Queue<Result> events, bool isDone); 698 bool update(QueueList<Result<T>> events, bool isDone);
449 } 699 }
450 700
451 /// Request for a [StreamQueue.next] call. 701 /// Request for a [StreamQueue.next] call.
452 /// 702 ///
453 /// Completes the returned future when receiving the first event, 703 /// Completes the returned future when receiving the first event,
454 /// and is then complete. 704 /// and is then complete.
455 class _NextRequest<T> implements _EventRequest { 705 class _NextRequest<T> implements _EventRequest<T> {
456 /// Completer for the future returned by [StreamQueue.next]. 706 /// Completer for the future returned by [StreamQueue.next].
457 final Completer _completer; 707 final _completer = new Completer<T>();
458 708
459 _NextRequest() : _completer = new Completer<T>(); 709 _NextRequest();
460 710
461 Future<T> get future => _completer.future; 711 Future<T> get future => _completer.future;
462 712
463 bool update(Queue<Result> events, bool isDone) { 713 bool update(QueueList<Result<T>> events, bool isDone) {
464 if (events.isNotEmpty) { 714 if (events.isNotEmpty) {
465 events.removeFirst().complete(_completer); 715 events.removeFirst().complete(_completer);
466 return true; 716 return true;
467 } 717 }
468 if (isDone) { 718 if (isDone) {
469 var errorFuture = 719 _completer.completeError(
470 new Future.sync(() => throw new StateError("No elements")); 720 new StateError("No elements"), StackTrace.current);
471 _completer.complete(errorFuture);
472 return true; 721 return true;
473 } 722 }
474 return false; 723 return false;
724 }
725 }
726
727 /// Request for a [StreamQueue.peek] call.
728 ///
729 /// Completes the returned future when receiving the first event,
730 /// and is then complete, but doesn't consume the event.
731 class _PeekRequest<T> implements _EventRequest<T> {
732 /// Completer for the future returned by [StreamQueue.next].
733 final _completer = new Completer<T>();
734
735 _PeekRequest();
736
737 Future<T> get future => _completer.future;
738
739 bool update(QueueList<Result<T>> events, bool isDone) {
740 if (events.isNotEmpty) {
741 events.first.complete(_completer);
742 return true;
743 }
744 if (isDone) {
745 _completer.completeError(
746 new StateError("No elements"), StackTrace.current);
747 return true;
748 }
749 return false;
475 } 750 }
476 } 751 }
477 752
478 /// Request for a [StreamQueue.skip] call. 753 /// Request for a [StreamQueue.skip] call.
479 class _SkipRequest implements _EventRequest { 754 class _SkipRequest<T> implements _EventRequest<T> {
480 /// Completer for the future returned by the skip call. 755 /// Completer for the future returned by the skip call.
481 final Completer _completer = new Completer<int>(); 756 final _completer = new Completer<int>();
482 757
483 /// Number of remaining events to skip. 758 /// Number of remaining events to skip.
484 /// 759 ///
485 /// The request [isComplete] when the values reaches zero. 760 /// The request [isComplete] when the values reaches zero.
486 /// 761 ///
487 /// Decremented when an event is seen. 762 /// Decremented when an event is seen.
488 /// Set to zero when an error is seen since errors abort the skip request. 763 /// Set to zero when an error is seen since errors abort the skip request.
489 int _eventsToSkip; 764 int _eventsToSkip;
490 765
491 _SkipRequest(this._eventsToSkip); 766 _SkipRequest(this._eventsToSkip);
492 767
493 /// The future completed when the correct number of events have been skipped. 768 /// The future completed when the correct number of events have been skipped.
494 Future get future => _completer.future; 769 Future<int> get future => _completer.future;
495 770
496 bool update(Queue<Result> events, bool isDone) { 771 bool update(QueueList<Result<T>> events, bool isDone) {
497 while (_eventsToSkip > 0) { 772 while (_eventsToSkip > 0) {
498 if (events.isEmpty) { 773 if (events.isEmpty) {
499 if (isDone) break; 774 if (isDone) break;
500 return false; 775 return false;
501 } 776 }
502 _eventsToSkip--; 777 _eventsToSkip--;
503 778
504 var event = events.removeFirst(); 779 var event = events.removeFirst();
505 if (event.isError) { 780 if (event.isError) {
506 event.complete(_completer); 781 _completer.completeError(event.asError.error, event.asError.stackTrace);
507 return true; 782 return true;
508 } 783 }
509 } 784 }
510 _completer.complete(_eventsToSkip); 785 _completer.complete(_eventsToSkip);
511 return true; 786 return true;
512 } 787 }
513 } 788 }
514 789
515 /// Request for a [StreamQueue.take] call. 790 /// Common superclass for [_TakeRequest] and [_LookAheadRequest].
516 class _TakeRequest<T> implements _EventRequest { 791 abstract class _ListRequest<T> implements _EventRequest<T> {
517 /// Completer for the future returned by the take call. 792 /// Completer for the future returned by the take call.
518 final Completer _completer; 793 final _completer = new Completer<List<T>>();
519 794
520 /// List collecting events until enough have been seen. 795 /// List collecting events until enough have been seen.
521 final List _list = <T>[]; 796 final _list = <T>[];
522 797
523 /// Number of events to capture. 798 /// Number of events to capture.
524 /// 799 ///
525 /// The request [isComplete] when the length of [_list] reaches 800 /// The request [isComplete] when the length of [_list] reaches
526 /// this value. 801 /// this value.
527 final int _eventsToTake; 802 final int _eventsToTake;
528 803
529 _TakeRequest(this._eventsToTake) : _completer = new Completer<List<T>>(); 804 _ListRequest(this._eventsToTake);
530 805
531 /// The future completed when the correct number of events have been captured. 806 /// The future completed when the correct number of events have been captured.
532 Future get future => _completer.future; 807 Future<List<T>> get future => _completer.future;
808 }
533 809
534 bool update(Queue<Result> events, bool isDone) { 810 /// Request for a [StreamQueue.take] call.
811 class _TakeRequest<T> extends _ListRequest<T> {
812 _TakeRequest(int eventsToTake) : super(eventsToTake);
813
814 bool update(QueueList<Result<T>> events, bool isDone) {
535 while (_list.length < _eventsToTake) { 815 while (_list.length < _eventsToTake) {
536 if (events.isEmpty) { 816 if (events.isEmpty) {
537 if (isDone) break; 817 if (isDone) break;
538 return false; 818 return false;
539 } 819 }
540 820
541 var result = events.removeFirst(); 821 var event = events.removeFirst();
542 if (result.isError) { 822 if (event.isError) {
543 result.complete(_completer); 823 event.asError.complete(_completer);
544 return true; 824 return true;
545 } 825 }
546 _list.add(result.asValue.value); 826 _list.add(event.asValue.value);
547 } 827 }
548 _completer.complete(_list); 828 _completer.complete(_list);
549 return true; 829 return true;
830 }
831 }
832
833 /// Request for a [StreamQueue.lookAhead] call.
834 class _LookAheadRequest<T> extends _ListRequest<T> {
835 _LookAheadRequest(int eventsToTake) : super(eventsToTake);
836
837 bool update(QueueList<Result<T>> events, bool isDone) {
838 while (_list.length < _eventsToTake) {
839 if (events.length == _list.length) {
840 if (isDone) break;
841 return false;
842 }
843 var event = events.elementAt(_list.length);
844 if (event.isError) {
845 event.asError.complete(_completer);
846 return true;
847 }
848 _list.add(event.asValue.value);
849 }
850 _completer.complete(_list);
851 return true;
550 } 852 }
551 } 853 }
552 854
553 /// Request for a [StreamQueue.cancel] call. 855 /// Request for a [StreamQueue.cancel] call.
554 /// 856 ///
555 /// The request needs no events, it just waits in the request queue 857 /// The request needs no events, it just waits in the request queue
556 /// until all previous events are fulfilled, then it cancels the stream queue 858 /// until all previous events are fulfilled, then it cancels the stream queue
557 /// source subscription. 859 /// source subscription.
558 class _CancelRequest implements _EventRequest { 860 class _CancelRequest<T> implements _EventRequest<T> {
559 /// Completer for the future returned by the `cancel` call. 861 /// Completer for the future returned by the `cancel` call.
560 final Completer _completer = new Completer(); 862 final _completer = new Completer();
561 863
562 /// The [StreamQueue] object that has this request queued.
563 /// 864 ///
564 /// When the event is completed, it needs to cancel the active subscription 865 /// When the event is completed, it needs to cancel the active subscription
565 /// of the `StreamQueue` object, if any. 866 /// of the `StreamQueue` object, if any.
566 final StreamQueue _streamQueue; 867 final StreamQueue _streamQueue;
567 868
568 _CancelRequest(this._streamQueue); 869 _CancelRequest(this._streamQueue);
569 870
570 /// The future completed when the cancel request is completed. 871 /// The future completed when the cancel request is completed.
571 Future get future => _completer.future; 872 Future get future => _completer.future;
572 873
573 bool update(Queue<Result> events, bool isDone) { 874 bool update(QueueList<Result<T>> events, bool isDone) {
574 if (_streamQueue._isDone) { 875 if (_streamQueue._isDone) {
575 _completer.complete(); 876 _completer.complete();
576 } else { 877 } else {
577 _streamQueue._ensureListening(); 878 _streamQueue._ensureListening();
578 _completer.complete(_streamQueue._extractStream().listen(null).cancel()); 879 _completer.complete(_streamQueue._extractStream().listen(null).cancel());
579 } 880 }
580 return true; 881 return true;
581 } 882 }
582 } 883 }
583 884
584 /// Request for a [StreamQueue.rest] call. 885 /// Request for a [StreamQueue.rest] call.
585 /// 886 ///
586 /// The request is always complete, it just waits in the request queue 887 /// The request is always complete, it just waits in the request queue
587 /// until all previous events are fulfilled, then it takes over the 888 /// until all previous events are fulfilled, then it takes over the
588 /// stream events subscription and creates a stream from it. 889 /// stream events subscription and creates a stream from it.
589 class _RestRequest<T> implements _EventRequest { 890 class _RestRequest<T> implements _EventRequest<T> {
590 /// Completer for the stream returned by the `rest` call. 891 /// Completer for the stream returned by the `rest` call.
591 final StreamCompleter _completer = new StreamCompleter<T>(); 892 final _completer = new StreamCompleter<T>();
592 893
593 /// The [StreamQueue] object that has this request queued. 894 /// The [StreamQueue] object that has this request queued.
594 /// 895 ///
595 /// When the event is completed, it needs to cancel the active subscription 896 /// When the event is completed, it needs to cancel the active subscription
596 /// of the `StreamQueue` object, if any. 897 /// of the `StreamQueue` object, if any.
597 final StreamQueue _streamQueue; 898 final StreamQueue<T> _streamQueue;
598 899
599 _RestRequest(this._streamQueue); 900 _RestRequest(this._streamQueue);
600 901
601 /// The stream which will contain the remaining events of [_streamQueue]. 902 /// The stream which will contain the remaining events of [_streamQueue].
602 Stream<T> get stream => _completer.stream; 903 Stream<T> get stream => _completer.stream;
603 904
604 bool update(Queue<Result> events, bool isDone) { 905 bool update(QueueList<Result<T>> events, bool isDone) {
605 if (events.isEmpty) { 906 if (events.isEmpty) {
606 if (_streamQueue._isDone) { 907 if (_streamQueue._isDone) {
607 _completer.setEmpty(); 908 _completer.setEmpty();
608 } else { 909 } else {
609 _completer.setSourceStream(_streamQueue._extractStream()); 910 _completer.setSourceStream(_streamQueue._extractStream());
610 } 911 }
611 } else { 912 } else {
612 // There are prefetched events which needs to be added before the 913 // There are prefetched events which needs to be added before the
613 // remaining stream. 914 // remaining stream.
614 var controller = new StreamController<T>(); 915 var controller = new StreamController<T>();
615 for (var event in events) { 916 for (var event in events) {
616 event.addTo(controller); 917 event.addTo(controller);
617 } 918 }
618 controller.addStream(_streamQueue._extractStream(), cancelOnError: false) 919 controller
619 .whenComplete(controller.close); 920 .addStream(_streamQueue._extractStream(), cancelOnError: false)
921 .whenComplete(controller.close);
620 _completer.setSourceStream(controller.stream); 922 _completer.setSourceStream(controller.stream);
621 } 923 }
622 return true; 924 return true;
623 } 925 }
624 } 926 }
625 927
626 /// Request for a [StreamQueue.hasNext] call. 928 /// Request for a [StreamQueue.hasNext] call.
627 /// 929 ///
628 /// Completes the [future] with `true` if it sees any event, 930 /// Completes the [future] with `true` if it sees any event,
629 /// but doesn't consume the event. 931 /// but doesn't consume the event.
630 /// If the request is closed without seeing an event, then 932 /// If the request is closed without seeing an event, then
631 /// the [future] is completed with `false`. 933 /// the [future] is completed with `false`.
632 class _HasNextRequest<T> implements _EventRequest { 934 class _HasNextRequest<T> implements _EventRequest<T> {
633 final Completer _completer = new Completer<bool>(); 935 final _completer = new Completer<bool>();
634 936
635 Future<bool> get future => _completer.future; 937 Future<bool> get future => _completer.future;
636 938
637 bool update(Queue<Result> events, bool isDone) { 939 bool update(QueueList<Result<T>> events, bool isDone) {
638 if (events.isNotEmpty) { 940 if (events.isNotEmpty) {
639 _completer.complete(true); 941 _completer.complete(true);
640 return true; 942 return true;
641 } 943 }
642 if (isDone) { 944 if (isDone) {
643 _completer.complete(false); 945 _completer.complete(false);
644 return true; 946 return true;
645 } 947 }
646 return false; 948 return false;
647 } 949 }
648 } 950 }
951
952 /// Request for a [StreamQueue.startTransaction] call.
953 ///
954 /// This request isn't complete until the user calls
955 /// [StreamQueueTransaction.commit] or [StreamQueueTransaction.reject], at which
956 /// point it manually removes itself from the request queue and calls
957 /// [StreamQueue._updateRequests].
958 class _TransactionRequest<T> implements _EventRequest<T> {
959 /// The transaction created by this request.
960 StreamQueueTransaction<T> get transaction => _transaction;
961 StreamQueueTransaction<T> _transaction;
962
963 /// The controller that passes events to [transaction].
964 final _controller = new StreamController<T>(sync: true);
965
966 /// The number of events passed to [_controller] so far.
967 var _eventsSent = 0;
968
969 _TransactionRequest(StreamQueue<T> parent) {
970 _transaction = new StreamQueueTransaction._(parent, _controller.stream);
971 }
972
973 bool update(QueueList<Result<T>> events, bool isDone) {
974 while (_eventsSent < events.length) {
975 events[_eventsSent++].addTo(_controller);
976 }
977 if (isDone && !_controller.isClosed) _controller.close();
978 return false;
979 }
980 }
OLDNEW
« no previous file with comments | « packages/async/lib/src/stream_group.dart ('k') | packages/async/lib/src/stream_sink_completer.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698