Index: packages/async/test/subscription_transformer_test.dart |
diff --git a/packages/async/test/subscription_transformer_test.dart b/packages/async/test/subscription_transformer_test.dart |
new file mode 100644 |
index 0000000000000000000000000000000000000000..dbbf59712e0ef2482420c464d9560df9f2f9db26 |
--- /dev/null |
+++ b/packages/async/test/subscription_transformer_test.dart |
@@ -0,0 +1,288 @@ |
+// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file |
+// for details. All rights reserved. Use of this source code is governed by a |
+// BSD-style license that can be found in the LICENSE file. |
+ |
+import 'dart:async'; |
+ |
+import 'package:async/async.dart'; |
+import 'package:test/test.dart'; |
+ |
+import 'utils.dart'; |
+ |
+void main() { |
+ group("with no callbacks", () { |
+ test("forwards cancellation", () async { |
+ var isCanceled = false; |
+ var cancelCompleter = new Completer(); |
+ var controller = new StreamController(onCancel: expectAsync0(() { |
+ isCanceled = true; |
+ return cancelCompleter.future; |
+ })); |
+ var subscription = controller.stream |
+ .transform(subscriptionTransformer()) |
+ .listen(expectAsync1((_) {}, count: 0)); |
+ |
+ var cancelFired = false; |
+ subscription.cancel().then(expectAsync1((_) { |
+ cancelFired = true; |
+ })); |
+ |
+ await flushMicrotasks(); |
+ expect(isCanceled, isTrue); |
+ expect(cancelFired, isFalse); |
+ |
+ cancelCompleter.complete(); |
+ await flushMicrotasks(); |
+ expect(cancelFired, isTrue); |
+ |
+ // This shouldn't call the onCancel callback again. |
+ expect(subscription.cancel(), completes); |
+ }); |
+ |
+ test("forwards pausing and resuming", () async { |
+ var controller = new StreamController(); |
+ var subscription = controller.stream |
+ .transform(subscriptionTransformer()) |
+ .listen(expectAsync1((_) {}, count: 0)); |
+ |
+ subscription.pause(); |
+ await flushMicrotasks(); |
+ expect(controller.isPaused, isTrue); |
+ |
+ subscription.pause(); |
+ await flushMicrotasks(); |
+ expect(controller.isPaused, isTrue); |
+ |
+ subscription.resume(); |
+ await flushMicrotasks(); |
+ expect(controller.isPaused, isTrue); |
+ |
+ subscription.resume(); |
+ await flushMicrotasks(); |
+ expect(controller.isPaused, isFalse); |
+ }); |
+ |
+ test("forwards pausing with a resume future", () async { |
+ var controller = new StreamController(); |
+ var subscription = controller.stream |
+ .transform(subscriptionTransformer()) |
+ .listen(expectAsync1((_) {}, count: 0)); |
+ |
+ var completer = new Completer(); |
+ subscription.pause(completer.future); |
+ await flushMicrotasks(); |
+ expect(controller.isPaused, isTrue); |
+ |
+ completer.complete(); |
+ await flushMicrotasks(); |
+ expect(controller.isPaused, isFalse); |
+ }); |
+ }); |
+ |
+ group("with a cancel callback", () { |
+ test("invokes the callback when the subscription is canceled", () async { |
+ var isCanceled = false; |
+ var callbackInvoked = false; |
+ var controller = new StreamController(onCancel: expectAsync0(() { |
+ isCanceled = true; |
+ })); |
+ var subscription = controller.stream.transform( |
+ subscriptionTransformer(handleCancel: expectAsync1((inner) { |
+ callbackInvoked = true; |
+ inner.cancel(); |
+ }))).listen(expectAsync1((_) {}, count: 0)); |
+ |
+ await flushMicrotasks(); |
+ expect(callbackInvoked, isFalse); |
+ expect(isCanceled, isFalse); |
+ |
+ subscription.cancel(); |
+ await flushMicrotasks(); |
+ expect(callbackInvoked, isTrue); |
+ expect(isCanceled, isTrue); |
+ }); |
+ |
+ test("invokes the callback once and caches its result", () async { |
+ var completer = new Completer(); |
+ var controller = new StreamController(); |
+ var subscription = controller.stream |
+ .transform(subscriptionTransformer( |
+ handleCancel: expectAsync1((inner) => completer.future))) |
+ .listen(expectAsync1((_) {}, count: 0)); |
+ |
+ var cancelFired1 = false; |
+ subscription.cancel().then(expectAsync1((_) { |
+ cancelFired1 = true; |
+ })); |
+ |
+ var cancelFired2 = false; |
+ subscription.cancel().then(expectAsync1((_) { |
+ cancelFired2 = true; |
+ })); |
+ |
+ await flushMicrotasks(); |
+ expect(cancelFired1, isFalse); |
+ expect(cancelFired2, isFalse); |
+ |
+ completer.complete(); |
+ await flushMicrotasks(); |
+ expect(cancelFired1, isTrue); |
+ expect(cancelFired2, isTrue); |
+ }); |
+ }); |
+ |
+ group("with a pause callback", () { |
+ test("invokes the callback when pause is called", () async { |
+ var pauseCount = 0; |
+ var controller = new StreamController(); |
+ var subscription = controller.stream |
+ .transform(subscriptionTransformer( |
+ handlePause: expectAsync1((inner) { |
+ pauseCount++; |
+ inner.pause(); |
+ }, count: 3))) |
+ .listen(expectAsync1((_) {}, count: 0)); |
+ |
+ await flushMicrotasks(); |
+ expect(pauseCount, equals(0)); |
+ |
+ subscription.pause(); |
+ await flushMicrotasks(); |
+ expect(pauseCount, equals(1)); |
+ |
+ subscription.pause(); |
+ await flushMicrotasks(); |
+ expect(pauseCount, equals(2)); |
+ |
+ subscription.resume(); |
+ subscription.resume(); |
+ await flushMicrotasks(); |
+ expect(pauseCount, equals(2)); |
+ |
+ subscription.pause(); |
+ await flushMicrotasks(); |
+ expect(pauseCount, equals(3)); |
+ }); |
+ |
+ test("doesn't invoke the callback when the subscription has been canceled", |
+ () async { |
+ var controller = new StreamController(); |
+ var subscription = controller.stream |
+ .transform(subscriptionTransformer( |
+ handlePause: expectAsync1((_) {}, count: 0))) |
+ .listen(expectAsync1((_) {}, count: 0)); |
+ |
+ subscription.cancel(); |
+ subscription.pause(); |
+ subscription.pause(); |
+ subscription.pause(); |
+ }); |
+ }); |
+ |
+ group("with a resume callback", () { |
+ test("invokes the callback when resume is called", () async { |
+ var resumeCount = 0; |
+ var controller = new StreamController(); |
+ var subscription = controller.stream |
+ .transform(subscriptionTransformer( |
+ handleResume: expectAsync1((inner) { |
+ resumeCount++; |
+ inner.resume(); |
+ }, count: 3))) |
+ .listen(expectAsync1((_) {}, count: 0)); |
+ |
+ await flushMicrotasks(); |
+ expect(resumeCount, equals(0)); |
+ |
+ subscription.resume(); |
+ await flushMicrotasks(); |
+ expect(resumeCount, equals(1)); |
+ |
+ subscription.pause(); |
+ subscription.pause(); |
+ await flushMicrotasks(); |
+ expect(resumeCount, equals(1)); |
+ |
+ subscription.resume(); |
+ await flushMicrotasks(); |
+ expect(resumeCount, equals(2)); |
+ |
+ subscription.resume(); |
+ await flushMicrotasks(); |
+ expect(resumeCount, equals(3)); |
+ }); |
+ |
+ test("invokes the callback when a resume future completes", () async { |
+ var resumed = false; |
+ var controller = new StreamController(); |
+ var subscription = controller.stream.transform( |
+ subscriptionTransformer(handleResume: expectAsync1((inner) { |
+ resumed = true; |
+ inner.resume(); |
+ }))).listen(expectAsync1((_) {}, count: 0)); |
+ |
+ var completer = new Completer(); |
+ subscription.pause(completer.future); |
+ await flushMicrotasks(); |
+ expect(resumed, isFalse); |
+ |
+ completer.complete(); |
+ await flushMicrotasks(); |
+ expect(resumed, isTrue); |
+ }); |
+ |
+ test("doesn't invoke the callback when the subscription has been canceled", |
+ () async { |
+ var controller = new StreamController(); |
+ var subscription = controller.stream |
+ .transform(subscriptionTransformer( |
+ handlePause: expectAsync1((_) {}, count: 0))) |
+ .listen(expectAsync1((_) {}, count: 0)); |
+ |
+ subscription.cancel(); |
+ subscription.resume(); |
+ subscription.resume(); |
+ subscription.resume(); |
+ }); |
+ }); |
+ |
+ group("when the outer subscription is canceled but the inner is not", () { |
+ StreamSubscription subscription; |
+ setUp(() { |
+ var controller = new StreamController(); |
+ subscription = controller.stream |
+ .transform(subscriptionTransformer(handleCancel: (_) {})) |
+ .listen(expectAsync1((_) {}, count: 0), |
+ onError: expectAsync2((_, __) {}, count: 0), |
+ onDone: expectAsync0(() {}, count: 0)); |
+ subscription.cancel(); |
+ controller.add(1); |
+ controller.addError("oh no!"); |
+ controller.close(); |
+ }); |
+ |
+ test("doesn't call a new onData", () async { |
+ subscription.onData(expectAsync1((_) {}, count: 0)); |
+ await flushMicrotasks(); |
+ }); |
+ |
+ test("doesn't call a new onError", () async { |
+ subscription.onError(expectAsync2((_, __) {}, count: 0)); |
+ await flushMicrotasks(); |
+ }); |
+ |
+ test("doesn't call a new onDone", () async { |
+ subscription.onDone(expectAsync0(() {}, count: 0)); |
+ await flushMicrotasks(); |
+ }); |
+ |
+ test("isPaused returns false", () { |
+ expect(subscription.isPaused, isFalse); |
+ }); |
+ |
+ test("asFuture never completes", () async { |
+ subscription.asFuture().then(expectAsync1((_) {}, count: 0)); |
+ await flushMicrotasks(); |
+ }); |
+ }); |
+} |