Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1605)

Unified Diff: lib/src/stream_queue.dart

Issue 1841223002: Fix most strong mode warnings. (Closed) Base URL: git@github.com:dart-lang/async.git@master
Patch Set: Code review changes Created 4 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « lib/src/stream_group.dart ('k') | lib/src/stream_sink_completer.dart » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: lib/src/stream_queue.dart
diff --git a/lib/src/stream_queue.dart b/lib/src/stream_queue.dart
index 8482e0c65afb5b1dc2993689da335fc5a5d3c8d7..4ce9e1301e45c76cbcbdadb4fd40be3d7564a205 100644
--- a/lib/src/stream_queue.dart
+++ b/lib/src/stream_queue.dart
@@ -94,7 +94,7 @@ abstract class StreamQueue<T> {
final Queue<_EventRequest> _requestQueue = new Queue();
/// Create a `StreamQueue` of the events of [source].
- factory StreamQueue(Stream source) = _StreamQueue<T>;
+ factory StreamQueue(Stream<T> source) = _StreamQueue<T>;
StreamQueue._();
@@ -274,7 +274,7 @@ abstract class StreamQueue<T> {
/// Can only be used by the very last request (the stream queue must
/// be closed by that request).
/// Only used by [rest].
- Stream _extractStream();
+ Stream<T> _extractStream();
/// Requests that the event source pauses events.
///
@@ -342,13 +342,13 @@ abstract class StreamQueue<T> {
/// to when a request needs events.
class _StreamQueue<T> extends StreamQueue<T> {
/// Source of events.
- final Stream _sourceStream;
+ final Stream<T> _sourceStream;
/// Subscription on [_sourceStream] while listening for events.
///
/// Set to subscription when listening, and set to `null` when the
/// subscription is done (and [_isDone] is set to true).
- StreamSubscription _subscription;
+ StreamSubscription<T> _subscription;
_StreamQueue(this._sourceStream) : super._();
@@ -422,7 +422,7 @@ class _StreamQueue<T> extends StreamQueue<T> {
///
/// The [close] method is also called immediately when the source stream
/// is done.
-abstract class _EventRequest {
+abstract class _EventRequest<T> {
/// Handle available events.
///
/// The available events are provided as a queue. The `update` function
@@ -443,22 +443,22 @@ abstract class _EventRequest {
/// If the function returns `false` when the stream has already closed
/// ([isDone] is true), then the request must call
/// [StreamQueue._updateRequests] itself when it's ready to continue.
- bool update(Queue<Result> events, bool isDone);
+ bool update(Queue<Result<T>> events, bool isDone);
}
/// Request for a [StreamQueue.next] call.
///
/// Completes the returned future when receiving the first event,
/// and is then complete.
-class _NextRequest<T> implements _EventRequest {
+class _NextRequest<T> implements _EventRequest<T> {
/// Completer for the future returned by [StreamQueue.next].
- final Completer _completer;
+ final _completer = new Completer<T>();
- _NextRequest() : _completer = new Completer<T>();
+ _NextRequest();
Future<T> get future => _completer.future;
- bool update(Queue<Result> events, bool isDone) {
+ bool update(Queue<Result<T>> events, bool isDone) {
if (events.isNotEmpty) {
events.removeFirst().complete(_completer);
return true;
@@ -474,9 +474,9 @@ class _NextRequest<T> implements _EventRequest {
}
/// Request for a [StreamQueue.skip] call.
-class _SkipRequest implements _EventRequest {
+class _SkipRequest<T> implements _EventRequest<T> {
/// Completer for the future returned by the skip call.
- final Completer _completer = new Completer<int>();
+ final _completer = new Completer<int>();
/// Number of remaining events to skip.
///
@@ -489,9 +489,9 @@ class _SkipRequest implements _EventRequest {
_SkipRequest(this._eventsToSkip);
/// The future completed when the correct number of events have been skipped.
- Future get future => _completer.future;
+ Future<int> get future => _completer.future;
- bool update(Queue<Result> events, bool isDone) {
+ bool update(Queue<Result<T>> events, bool isDone) {
while (_eventsToSkip > 0) {
if (events.isEmpty) {
if (isDone) break;
@@ -501,7 +501,7 @@ class _SkipRequest implements _EventRequest {
var event = events.removeFirst();
if (event.isError) {
- event.complete(_completer);
+ _completer.completeError(event.asError.error, event.asError.stackTrace);
return true;
}
}
@@ -511,12 +511,12 @@ class _SkipRequest implements _EventRequest {
}
/// Request for a [StreamQueue.take] call.
-class _TakeRequest<T> implements _EventRequest {
+class _TakeRequest<T> implements _EventRequest<T> {
/// Completer for the future returned by the take call.
- final Completer _completer;
+ final _completer = new Completer<List<T>>();
/// List collecting events until enough have been seen.
- final List _list = <T>[];
+ final _list = <T>[];
/// Number of events to capture.
///
@@ -524,24 +524,24 @@ class _TakeRequest<T> implements _EventRequest {
/// this value.
final int _eventsToTake;
- _TakeRequest(this._eventsToTake) : _completer = new Completer<List<T>>();
+ _TakeRequest(this._eventsToTake);
/// The future completed when the correct number of events have been captured.
- Future get future => _completer.future;
+ Future<List<T>> get future => _completer.future;
- bool update(Queue<Result> events, bool isDone) {
+ bool update(Queue<Result<T>> events, bool isDone) {
while (_list.length < _eventsToTake) {
if (events.isEmpty) {
if (isDone) break;
return false;
}
- var result = events.removeFirst();
- if (result.isError) {
- result.complete(_completer);
+ var event = events.removeFirst();
+ if (event.isError) {
+ _completer.completeError(event.asError.error, event.asError.stackTrace);
return true;
}
- _list.add(result.asValue.value);
+ _list.add(event.asValue.value);
}
_completer.complete(_list);
return true;
@@ -553,9 +553,9 @@ class _TakeRequest<T> implements _EventRequest {
/// The request needs no events, it just waits in the request queue
/// until all previous events are fulfilled, then it cancels the stream queue
/// source subscription.
-class _CancelRequest implements _EventRequest {
+class _CancelRequest<T> implements _EventRequest<T> {
/// Completer for the future returned by the `cancel` call.
- final Completer _completer = new Completer();
+ final _completer = new Completer();
/// The [StreamQueue] object that has this request queued.
///
@@ -568,7 +568,7 @@ class _CancelRequest implements _EventRequest {
/// The future completed when the cancel request is completed.
Future get future => _completer.future;
- bool update(Queue<Result> events, bool isDone) {
+ bool update(Queue<Result<T>> events, bool isDone) {
if (_streamQueue._isDone) {
_completer.complete();
} else {
@@ -584,22 +584,22 @@ class _CancelRequest implements _EventRequest {
/// The request is always complete, it just waits in the request queue
/// until all previous events are fulfilled, then it takes over the
/// stream events subscription and creates a stream from it.
-class _RestRequest<T> implements _EventRequest {
+class _RestRequest<T> implements _EventRequest<T> {
/// Completer for the stream returned by the `rest` call.
- final StreamCompleter _completer = new StreamCompleter<T>();
+ final _completer = new StreamCompleter<T>();
/// The [StreamQueue] object that has this request queued.
///
/// When the event is completed, it needs to cancel the active subscription
/// of the `StreamQueue` object, if any.
- final StreamQueue _streamQueue;
+ final StreamQueue<T> _streamQueue;
_RestRequest(this._streamQueue);
/// The stream which will contain the remaining events of [_streamQueue].
Stream<T> get stream => _completer.stream;
- bool update(Queue<Result> events, bool isDone) {
+ bool update(Queue<Result<T>> events, bool isDone) {
if (events.isEmpty) {
if (_streamQueue._isDone) {
_completer.setEmpty();
@@ -627,12 +627,12 @@ class _RestRequest<T> implements _EventRequest {
/// but doesn't consume the event.
/// If the request is closed without seeing an event, then
/// the [future] is completed with `false`.
-class _HasNextRequest<T> implements _EventRequest {
- final Completer _completer = new Completer<bool>();
+class _HasNextRequest<T> implements _EventRequest<T> {
+ final _completer = new Completer<bool>();
Future<bool> get future => _completer.future;
- bool update(Queue<Result> events, bool isDone) {
+ bool update(Queue<Result<T>> events, bool isDone) {
if (events.isNotEmpty) {
_completer.complete(true);
return true;
« no previous file with comments | « lib/src/stream_group.dart ('k') | lib/src/stream_sink_completer.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698