Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(294)

Side by Side Diff: sdk/lib/async/stream.dart

Issue 2872263003: Rename StreamGroup to GroupedEvents. (Closed)
Patch Set: Created 3 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « no previous file | tests/lib/async/stream_group_by_test.dart » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.async; 5 part of dart.async;
6 6
7 // ------------------------------------------------------------------- 7 // -------------------------------------------------------------------
8 // Core Stream types 8 // Core Stream types
9 // ------------------------------------------------------------------- 9 // -------------------------------------------------------------------
10 10
(...skipping 372 matching lines...) Expand 10 before | Expand all | Expand 10 after
383 * will individually call [convert] on each data event. 383 * will individually call [convert] on each data event.
384 */ 384 */
385 Stream<S> map<S>(S convert(T event)) { 385 Stream<S> map<S>(S convert(T event)) {
386 return new _MapStream<T, S>(this, convert); 386 return new _MapStream<T, S>(this, convert);
387 } 387 }
388 388
389 /// Groups events by a computed key. 389 /// Groups events by a computed key.
390 /// 390 ///
391 /// A key is extracted from incoming events. 391 /// A key is extracted from incoming events.
392 /// The first time a key is seen, a stream is created for it, and emitted 392 /// The first time a key is seen, a stream is created for it, and emitted
393 /// on the returned stream, along with the key, as a [StreamGroup] object. 393 /// on the returned stream, along with the key, as a [GroupedEvents] object.
394 /// Then the event is emitted on the stream ([StreamGroup.values]) 394 /// Then the event is emitted on the stream ([GroupedEvents.values])
395 /// corresponding to the key. 395 /// corresponding to the key.
396 /// 396 ///
397 /// An error on the source stream, or when calling the `key` functions, 397 /// An error on the source stream, or when calling the `key` functions,
398 /// will emit the error on the returned stream. 398 /// will emit the error on the returned stream.
399 /// 399 ///
400 /// Canceling the subscription on the returned stream will stop processing 400 /// Canceling the subscription on the returned stream will stop processing
401 /// and close the streams for all groups. 401 /// and close the streams for all groups.
402 /// 402 ///
403 /// Pausing the subscription on the returned stream will pause processing 403 /// Pausing the subscription on the returned stream will pause processing
404 /// and no further events are added to streams for the individual groups. 404 /// and no further events are added to streams for the individual groups.
405 /// 405 ///
406 /// Pausing or canceling an individual group stream has no effect other than 406 /// Pausing or canceling an individual group stream has no effect other than
407 /// on that stream. Events will be queued while the group stream 407 /// on that stream. Events will be queued while the group stream
408 /// is paused and until it is first listened to. 408 /// is paused and until it is first listened to.
409 /// If the [StreamGroup.values] stream is never listened to, 409 /// If the [GroupedEvents.values] stream is never listened to,
410 /// it will enqueue all the events unnecessarily. 410 /// it will enqueue all the events unnecessarily.
411 Stream<StreamGroup<K, T>> groupBy<K>(K key(T event)) { 411 Stream<GroupedEvents<K, T>> groupBy<K>(K key(T event)) {
412 var controller; 412 var controller;
413 controller = new StreamController<StreamGroup<K, T>>( 413 controller = new StreamController<GroupedEvents<K, T>>(
414 sync: true, 414 sync: true,
415 onListen: () { 415 onListen: () {
416 var groupControllers = new HashMap<K, StreamController<T>>(); 416 var groupControllers = new HashMap<K, StreamController<T>>();
417 417
418 void closeAll() { 418 void closeAll() {
419 for (var groupController in groupControllers.values) { 419 for (var groupController in groupControllers.values) {
420 groupController.close(); 420 groupController.close();
421 } 421 }
422 } 422 }
423 423
424 var subscription = this.listen( 424 var subscription = this.listen(
425 (data) { 425 (data) {
426 K theKey; 426 K theKey;
427 try { 427 try {
428 theKey = key(data); 428 theKey = key(data);
429 } catch (error, stackTrace) { 429 } catch (error, stackTrace) {
430 controller.addError(error, stackTrace); 430 controller.addError(error, stackTrace);
431 return; 431 return;
432 } 432 }
433 var groupController = groupControllers[theKey]; 433 var groupController = groupControllers[theKey];
434 if (groupController == null) { 434 if (groupController == null) {
435 groupController = 435 groupController =
436 new StreamController<T>.broadcast(sync: true); 436 new StreamController<T>.broadcast(sync: true);
437 groupControllers[theKey] = groupController; 437 groupControllers[theKey] = groupController;
438 controller.add( 438 controller.add(
439 new StreamGroup<K, T>(theKey, groupController.stream)); 439 new GroupedEvents<K, T>(theKey, groupController.stream));
440 } 440 }
441 groupController.add(data); 441 groupController.add(data);
442 }, 442 },
443 onError: controller.addError, 443 onError: controller.addError,
444 onDone: () { 444 onDone: () {
445 controller.close(); 445 controller.close();
446 closeAll(); 446 closeAll();
447 }); 447 });
448 controller.onPause = subscription.pause; 448 controller.onPause = subscription.pause;
449 controller.onResume = subscription.resume; 449 controller.onResume = subscription.resume;
(...skipping 1452 matching lines...) Expand 10 before | Expand all | Expand 10 after
1902 _sink.addError(error, stackTrace); 1902 _sink.addError(error, stackTrace);
1903 } 1903 }
1904 1904
1905 void close() { 1905 void close() {
1906 _sink.close(); 1906 _sink.close();
1907 } 1907 }
1908 } 1908 }
1909 1909
1910 /// A group created by [Stream.groupBy] or [Stream.groupByMapped]. 1910 /// A group created by [Stream.groupBy] or [Stream.groupByMapped].
1911 /// 1911 ///
1912 /// The stream created by `groupBy` emits a `StreamGroup` for each distinct key 1912 /// The stream created by `groupBy` emits a `GroupedEvents` for each distinct ke y
1913 /// it encounters. 1913 /// it encounters.
1914 /// This group contains the [key] itself, along with a stream of the [values] 1914 /// This group contains the [key] itself, along with a stream of the [values]
1915 /// associated with that key. 1915 /// associated with that key.
1916 class StreamGroup<K, V> { 1916 class GroupedEvents<K, V> {
1917 /// The key that identifiers the values emitted by [values]. 1917 /// The key that identifiers the values emitted by [values].
1918 final K key; 1918 final K key;
1919 1919
1920 /// The [values] that [GroupBy] have grouped by the common [key]. 1920 /// The [values] that [GroupBy] have grouped by the common [key].
1921 final Stream<V> values; 1921 final Stream<V> values;
1922 1922
1923 factory StreamGroup(K key, Stream<V> values) = StreamGroup<K, V>._; 1923 factory GroupedEvents(K key, Stream<V> values) = GroupedEvents<K, V>._;
1924 1924
1925 // Don't expose a generative constructor. 1925 // Don't expose a generative constructor.
1926 // This class is not intended for subclassing, so we don't want to promise 1926 // This class is not intended for subclassing, so we don't want to promise
1927 // it. We can change that in the future. 1927 // it. We can change that in the future.
1928 StreamGroup._(this.key, this.values); 1928 GroupedEvents._(this.key, this.values);
1929 1929
1930 /// Tells [values] to discard values instead of retaining them. 1930 /// Tells [values] to discard values instead of retaining them.
1931 /// 1931 ///
1932 /// Must only be used instead of listening to the [values] stream. 1932 /// Must only be used instead of listening to the [values] stream.
1933 /// If the stream has been listened to, this call fails. 1933 /// If the stream has been listened to, this call fails.
1934 /// After calling this method, listening on the [values] stream fails. 1934 /// After calling this method, listening on the [values] stream fails.
1935 Future cancel() { 1935 Future cancel() {
1936 // If values has been listened to, 1936 // If values has been listened to,
1937 // this throws a StateError saying that stream has already been listened to, 1937 // this throws a StateError saying that stream has already been listened to,
1938 // which is a correct error message for this call too. 1938 // which is a correct error message for this call too.
1939 return values.listen(null).cancel(); 1939 return values.listen(null).cancel();
1940 } 1940 }
1941 } 1941 }
OLDNEW
« no previous file with comments | « no previous file | tests/lib/async/stream_group_by_test.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698