Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(949)

Side by Side Diff: sdk/lib/async/stream.dart

Issue 14196003: Change StreamController constructor. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Address comments and rebase. Created 7 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « runtime/bin/socket_patch.dart ('k') | sdk/lib/async/stream_controller.dart » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.async; 5 part of dart.async;
6 6
7 // ------------------------------------------------------------------- 7 // -------------------------------------------------------------------
8 // Core Stream types 8 // Core Stream types
9 // ------------------------------------------------------------------- 9 // -------------------------------------------------------------------
10 10
(...skipping 97 matching lines...) Expand 10 before | Expand all | Expand 10 after
108 } 108 }
109 109
110 void startPeriodicTimer() { 110 void startPeriodicTimer() {
111 assert(timer == null); 111 assert(timer == null);
112 timer = new Timer.periodic(period, (Timer timer) { 112 timer = new Timer.periodic(period, (Timer timer) {
113 sendEvent(); 113 sendEvent();
114 }); 114 });
115 } 115 }
116 116
117 controller = new StreamController<T>( 117 controller = new StreamController<T>(
118 onPauseStateChange: () { 118 onListen: () {
119 if (controller.isPaused) { 119 watch.start();
120 timer.cancel(); 120 startPeriodicTimer();
121 },
122 onPause: () {
123 timer.cancel();
124 timer = null;
125 watch.stop();
126 },
127 onResume: () {
128 assert(timer == null);
129 Duration elapsed = watch.elapsed;
130 watch.start();
131 timer = new Timer(period - elapsed, () {
121 timer = null; 132 timer = null;
122 watch.stop(); 133 startPeriodicTimer();
123 } else { 134 sendEvent();
124 assert(timer == null); 135 });
125 Duration elapsed = watch.elapsed;
126 watch.start();
127 timer = new Timer(period - elapsed, () {
128 timer = null;
129 startPeriodicTimer();
130 sendEvent();
131 });
132 }
133 }, 136 },
134 onSubscriptionStateChange: () { 137 onCancel: () {
135 if (controller.hasListener) { 138 if (timer != null) timer.cancel();
136 watch.start(); 139 timer = null;
137 startPeriodicTimer();
138 } else {
139 if (timer != null) timer.cancel();
140 timer = null;
141 }
142 }); 140 });
143 return controller.stream; 141 return controller.stream;
144 } 142 }
145 143
146 /** 144 /**
147 * Reports whether this stream is a broadcast stream. 145 * Reports whether this stream is a broadcast stream.
148 */ 146 */
149 bool get isBroadcast => false; 147 bool get isBroadcast => false;
150 148
151 /** 149 /**
(...skipping 855 matching lines...) Expand 10 before | Expand all | Expand 10 after
1007 abstract class StreamEventTransformer<S, T> implements StreamTransformer<S, T> { 1005 abstract class StreamEventTransformer<S, T> implements StreamTransformer<S, T> {
1008 const StreamEventTransformer(); 1006 const StreamEventTransformer();
1009 1007
1010 Stream<T> bind(Stream<S> source) { 1008 Stream<T> bind(Stream<S> source) {
1011 // Hackish way of buffering data that goes out of the event-transformer. 1009 // Hackish way of buffering data that goes out of the event-transformer.
1012 // TODO(floitsch): replace this with a correct solution. 1010 // TODO(floitsch): replace this with a correct solution.
1013 Stream transformingStream = new EventTransformStream<S, T>(source, this); 1011 Stream transformingStream = new EventTransformStream<S, T>(source, this);
1014 StreamController controller; 1012 StreamController controller;
1015 StreamSubscription subscription; 1013 StreamSubscription subscription;
1016 controller = new StreamController<T>( 1014 controller = new StreamController<T>(
1017 onPauseStateChange: () { 1015 onListen: () {
1018 if (controller.isPaused) { 1016 subscription = transformingStream.listen(
1019 subscription.pause(); 1017 controller.add,
1020 } else { 1018 onError: controller.addError,
1021 subscription.resume(); 1019 onDone: controller.close);
1022 }
1023 }, 1020 },
1024 onSubscriptionStateChange: () { 1021 onPause: () => subscription.pause(),
1025 if (controller.hasListener) { 1022 onResume: () => subscription.resume(),
1026 subscription = transformingStream.listen( 1023 onCancel: () => subscription.cancel());
1027 controller.add,
1028 onError: controller.addError,
1029 onDone: controller.close);
1030 } else {
1031 subscription.cancel();
1032 }
1033 });
1034 return controller.stream; 1024 return controller.stream;
1035 } 1025 }
1036 1026
1037 /** 1027 /**
1038 * Act on incoming data event. 1028 * Act on incoming data event.
1039 * 1029 *
1040 * The method may generate any number of events on the sink, but should 1030 * The method may generate any number of events on the sink, but should
1041 * not throw. 1031 * not throw.
1042 */ 1032 */
1043 void handleData(S event, EventSink<T> sink) { 1033 void handleData(S event, EventSink<T> sink) {
(...skipping 147 matching lines...) Expand 10 before | Expand all | Expand 10 after
1191 } 1181 }
1192 1182
1193 class _EventOutputSinkWrapper<T> extends EventSink<T> { 1183 class _EventOutputSinkWrapper<T> extends EventSink<T> {
1194 _EventOutputSink _sink; 1184 _EventOutputSink _sink;
1195 _EventOutputSinkWrapper(this._sink); 1185 _EventOutputSinkWrapper(this._sink);
1196 1186
1197 void add(T data) { _sink._sendData(data); } 1187 void add(T data) { _sink._sendData(data); }
1198 void addError(AsyncError error) { _sink._sendError(error); } 1188 void addError(AsyncError error) { _sink._sendError(error); }
1199 void close() { _sink._sendDone(); } 1189 void close() { _sink._sendDone(); }
1200 } 1190 }
OLDNEW
« no previous file with comments | « runtime/bin/socket_patch.dart ('k') | sdk/lib/async/stream_controller.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698