Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(394)

Side by Side Diff: sdk/lib/async/stream.dart

Issue 14196003: Change StreamController constructor. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Fix some bugs. Created 7 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.async; 5 part of dart.async;
6 6
7 // ------------------------------------------------------------------- 7 // -------------------------------------------------------------------
8 // Core Stream types 8 // Core Stream types
9 // ------------------------------------------------------------------- 9 // -------------------------------------------------------------------
10 10
(...skipping 97 matching lines...) Expand 10 before | Expand all | Expand 10 after
108 } 108 }
109 109
110 void startPeriodicTimer() { 110 void startPeriodicTimer() {
111 assert(timer == null); 111 assert(timer == null);
112 timer = new Timer.periodic(period, (Timer timer) { 112 timer = new Timer.periodic(period, (Timer timer) {
113 sendEvent(); 113 sendEvent();
114 }); 114 });
115 } 115 }
116 116
117 controller = new StreamController<T>( 117 controller = new StreamController<T>(
118 onPauseStateChange: () { 118 onListen: () {
119 if (controller.isPaused) { 119 watch.start();
120 timer.cancel(); 120 startPeriodicTimer();
121 },
122 onPause: () {
123 timer.cancel();
124 timer = null;
125 watch.stop();
126 },
127 onResume: () {
128 assert(timer == null);
129 Duration elapsed = watch.elapsed;
130 watch.start();
131 timer = new Timer(period - elapsed, () {
121 timer = null; 132 timer = null;
122 watch.stop(); 133 startPeriodicTimer();
123 } else { 134 sendEvent();
124 assert(timer == null); 135 });
125 Duration elapsed = watch.elapsed;
126 watch.start();
127 timer = new Timer(period - elapsed, () {
128 timer = null;
129 startPeriodicTimer();
130 sendEvent();
131 });
132 }
133 }, 136 },
134 onSubscriptionStateChange: () { 137 onCancel: () {
135 if (controller.hasListener) { 138 if (timer != null) timer.cancel();
136 watch.start(); 139 timer = null;
137 startPeriodicTimer();
138 } else {
139 if (timer != null) timer.cancel();
140 timer = null;
141 }
142 }); 140 });
143 return controller.stream; 141 return controller.stream;
144 } 142 }
145 143
146 /** 144 /**
147 * Reports whether this stream is a broadcast stream. 145 * Reports whether this stream is a broadcast stream.
148 */ 146 */
149 bool get isBroadcast => false; 147 bool get isBroadcast => false;
150 148
151 /** 149 /**
(...skipping 814 matching lines...) Expand 10 before | Expand all | Expand 10 after
966 abstract class StreamEventTransformer<S, T> implements StreamTransformer<S, T> { 964 abstract class StreamEventTransformer<S, T> implements StreamTransformer<S, T> {
967 const StreamEventTransformer(); 965 const StreamEventTransformer();
968 966
969 Stream<T> bind(Stream<S> source) { 967 Stream<T> bind(Stream<S> source) {
970 // Hackish way of buffering data that goes out of the event-transformer. 968 // Hackish way of buffering data that goes out of the event-transformer.
971 // TODO(floitsch): replace this with a correct solution. 969 // TODO(floitsch): replace this with a correct solution.
972 Stream transformingStream = new EventTransformStream<S, T>(source, this); 970 Stream transformingStream = new EventTransformStream<S, T>(source, this);
973 StreamController controller; 971 StreamController controller;
974 StreamSubscription subscription; 972 StreamSubscription subscription;
975 controller = new StreamController<T>( 973 controller = new StreamController<T>(
976 onPauseStateChange: () { 974 onListen: () {
977 if (controller.isPaused) { 975 subscription = transformingStream.listen(
978 subscription.pause(); 976 controller.add,
979 } else { 977 onError: controller.addError,
980 subscription.resume(); 978 onDone: controller.close);
981 }
982 }, 979 },
983 onSubscriptionStateChange: () { 980 onPause: () => subscription.pause(),
984 if (controller.hasListener) { 981 onResume: () => subscription.resume(),
985 subscription = transformingStream.listen( 982 onCancel: () => subscription.cancel());
986 controller.add,
987 onError: controller.addError,
988 onDone: controller.close);
989 } else {
990 subscription.cancel();
991 }
992 });
993 return controller.stream; 983 return controller.stream;
994 } 984 }
995 985
996 /** 986 /**
997 * Act on incoming data event. 987 * Act on incoming data event.
998 * 988 *
999 * The method may generate any number of events on the sink, but should 989 * The method may generate any number of events on the sink, but should
1000 * not throw. 990 * not throw.
1001 */ 991 */
1002 void handleData(S event, EventSink<T> sink) { 992 void handleData(S event, EventSink<T> sink) {
(...skipping 147 matching lines...) Expand 10 before | Expand all | Expand 10 after
1150 } 1140 }
1151 1141
1152 class _EventOutputSinkWrapper<T> extends EventSink<T> { 1142 class _EventOutputSinkWrapper<T> extends EventSink<T> {
1153 _EventOutputSink _sink; 1143 _EventOutputSink _sink;
1154 _EventOutputSinkWrapper(this._sink); 1144 _EventOutputSinkWrapper(this._sink);
1155 1145
1156 void add(T data) { _sink._sendData(data); } 1146 void add(T data) { _sink._sendData(data); }
1157 void addError(AsyncError error) { _sink._sendError(error); } 1147 void addError(AsyncError error) { _sink._sendError(error); }
1158 void close() { _sink._sendDone(); } 1148 void close() { _sink._sendDone(); }
1159 } 1149 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698