Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(520)

Side by Side Diff: lib/src/stream_queue.dart

Issue 1215873004: Fix type errors caught by analyzer. (Closed) Base URL: https://github.com/dart-lang/async@master
Patch Set: Created 5 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « no previous file | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 library async.stream_events; 5 library async.stream_events;
6 6
7 import 'dart:async'; 7 import 'dart:async';
8 import 'dart:collection'; 8 import 'dart:collection';
9 9
10 import "subscription_stream.dart"; 10 import "subscription_stream.dart";
(...skipping 268 matching lines...) Expand 10 before | Expand all | Expand 10 after
279 if (request.addEvents(_eventQueue)) return; 279 if (request.addEvents(_eventQueue)) return;
280 _ensureListening(); 280 _ensureListening();
281 } 281 }
282 _requestQueue.add(request); 282 _requestQueue.add(request);
283 283
284 } 284 }
285 285
286 /// Ensures that we are listening on events from [_sourceStream]. 286 /// Ensures that we are listening on events from [_sourceStream].
287 /// 287 ///
288 /// Resumes subscription on [_sourceStream], or creates it if necessary. 288 /// Resumes subscription on [_sourceStream], or creates it if necessary.
289 StreamSubscription _ensureListening() { 289 void _ensureListening() {
290 assert(!_isDone); 290 assert(!_isDone);
291 if (_subscription == null) { 291 if (_subscription == null) {
292 _subscription = 292 _subscription =
293 _sourceStream.listen(_onData, onError: _onError, onDone: _onDone); 293 _sourceStream.listen(_onData, onError: _onError, onDone: _onDone);
294 } else { 294 } else {
295 _subscription.resume(); 295 _subscription.resume();
296 } 296 }
297 } 297 }
298 298
299 /// Removes all requests and closes them. 299 /// Removes all requests and closes them.
(...skipping 47 matching lines...) Expand 10 before | Expand all | Expand 10 after
347 /// 347 ///
348 /// Events from the source stream are sent to the first request in the 348 /// Events from the source stream are sent to the first request in the
349 /// queue until it reports itself as [isComplete]. 349 /// queue until it reports itself as [isComplete].
350 /// 350 ///
351 /// When the first request in the queue `isComplete`, either when becoming 351 /// When the first request in the queue `isComplete`, either when becoming
352 /// the first request or after receiving an event, its [close] methods is 352 /// the first request or after receiving an event, its [close] methods is
353 /// called. 353 /// called.
354 /// 354 ///
355 /// The [close] method is also called immediately when the source stream 355 /// The [close] method is also called immediately when the source stream
356 /// is done. 356 /// is done.
357 abstract class _EventRequest implements EventSink { 357 abstract class _EventRequest {
358 /// Handle available events. 358 /// Handle available events.
359 /// 359 ///
360 /// The available events are provided as a queue. The `addEvents` function 360 /// The available events are provided as a queue. The `addEvents` function
361 /// should only remove events from the front of the event queue, e.g., 361 /// should only remove events from the front of the event queue, e.g.,
362 /// using [removeFirst]. 362 /// using [removeFirst].
363 /// 363 ///
364 /// Returns `true` if the request is completed, or `false` if it needs 364 /// Returns `true` if the request is completed, or `false` if it needs
365 /// more events. 365 /// more events.
366 /// The call may keep events in the queue until the requeust is complete, 366 /// The call may keep events in the queue until the requeust is complete,
367 /// or it may remove them immediately. 367 /// or it may remove them immediately.
(...skipping 96 matching lines...) Expand 10 before | Expand all | Expand 10 after
464 /// 464 ///
465 /// The request [isComplete] when the length of [_list] reaches 465 /// The request [isComplete] when the length of [_list] reaches
466 /// this value. 466 /// this value.
467 final int _eventsToTake; 467 final int _eventsToTake;
468 468
469 _TakeRequest(this._eventsToTake) : _completer = new Completer<List<T>>(); 469 _TakeRequest(this._eventsToTake) : _completer = new Completer<List<T>>();
470 470
471 /// The future completed when the correct number of events have been captured. 471 /// The future completed when the correct number of events have been captured.
472 Future get future => _completer.future; 472 Future get future => _completer.future;
473 473
474 bool addEvents(Queue<Events> events) { 474 bool addEvents(Queue<Result> events) {
475 while (_list.length < _eventsToTake) { 475 while (_list.length < _eventsToTake) {
476 if (events.isEmpty) return false; 476 if (events.isEmpty) return false;
477 var result = events.removeFirst(); 477 var result = events.removeFirst();
478 if (result.isError) { 478 if (result.isError) {
479 result.complete(_completer); 479 result.complete(_completer);
480 return true; 480 return true;
481 } 481 }
482 _list.add(result.asValue.value); 482 _list.add(result.asValue.value);
483 } 483 }
484 _completer.complete(_list); 484 _completer.complete(_list);
485 return true; 485 return true;
486 } 486 }
487 487
488 void close(Queue<Events> events) { 488 void close(Queue<Result> events) {
489 _completer.complete(_list); 489 _completer.complete(_list);
490 } 490 }
491 } 491 }
492 492
493 /// Request for a [StreamQueue.cancel] call. 493 /// Request for a [StreamQueue.cancel] call.
494 /// 494 ///
495 /// The request needs no events, it just waits in the request queue 495 /// The request needs no events, it just waits in the request queue
496 /// until all previous events are fulfilled, then it cancels the stream queue 496 /// until all previous events are fulfilled, then it cancels the stream queue
497 /// source subscription. 497 /// source subscription.
498 class _CancelRequest implements _EventRequest { 498 class _CancelRequest implements _EventRequest {
(...skipping 111 matching lines...) Expand 10 before | Expand all | Expand 10 after
610 _completer.complete(true); 610 _completer.complete(true);
611 return true; 611 return true;
612 } 612 }
613 return false; 613 return false;
614 } 614 }
615 615
616 void close(_) { 616 void close(_) {
617 _completer.complete(false); 617 _completer.complete(false);
618 } 618 }
619 } 619 }
OLDNEW
« no previous file with comments | « no previous file | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698