OLD | NEW |
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 part of dart.async; | 5 part of dart.async; |
6 | 6 |
7 // ------------------------------------------------------------------- | 7 // ------------------------------------------------------------------- |
8 // Core Stream types | 8 // Core Stream types |
9 // ------------------------------------------------------------------- | 9 // ------------------------------------------------------------------- |
10 | 10 |
(...skipping 377 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
388 * | 388 * |
389 * The returned stream is a broadcast stream if this stream is. | 389 * The returned stream is a broadcast stream if this stream is. |
390 * The [convert] function is called once per data event per listener. | 390 * The [convert] function is called once per data event per listener. |
391 * If a broadcast stream is listened to more than once, each subscription | 391 * If a broadcast stream is listened to more than once, each subscription |
392 * will individually call [convert] on each data event. | 392 * will individually call [convert] on each data event. |
393 */ | 393 */ |
394 Stream<S> map<S>(S convert(T event)) { | 394 Stream<S> map<S>(S convert(T event)) { |
395 return new _MapStream<T, S>(this, convert); | 395 return new _MapStream<T, S>(this, convert); |
396 } | 396 } |
397 | 397 |
398 /// Groups events by a computed key. | |
399 /// | |
400 /// A key is extracted from incoming events. | |
401 /// The first time a key is seen, a stream is created for it, and emitted | |
402 /// on the returned stream, along with the key, as a [GroupedEvents] object. | |
403 /// Then the event is emitted on the stream ([GroupedEvents.values]) | |
404 /// corresponding to the key. | |
405 /// | |
406 /// An error on the source stream, or when calling the `key` functions, | |
407 /// will emit the error on the returned stream. | |
408 /// | |
409 /// Canceling the subscription on the returned stream will stop processing | |
410 /// and close the streams for all groups. | |
411 /// | |
412 /// Pausing the subscription on the returned stream will pause processing | |
413 /// and no further events are added to streams for the individual groups. | |
414 /// | |
415 /// Pausing or canceling an individual group stream has no effect other than | |
416 /// on that stream. Events will be queued while the group stream | |
417 /// is paused and until it is first listened to. | |
418 /// If the [GroupedEvents.values] stream is never listened to, | |
419 /// it will enqueue all the events unnecessarily. | |
420 Stream<GroupedEvents<K, T>> groupBy<K>(K key(T event)) { | |
421 var controller; | |
422 controller = new StreamController<GroupedEvents<K, T>>( | |
423 sync: true, | |
424 onListen: () { | |
425 var groupControllers = new HashMap<K, StreamController<T>>(); | |
426 | |
427 void closeAll() { | |
428 for (var groupController in groupControllers.values) { | |
429 groupController.close(); | |
430 } | |
431 } | |
432 | |
433 var subscription = this.listen( | |
434 (data) { | |
435 K theKey; | |
436 try { | |
437 theKey = key(data); | |
438 } catch (error, stackTrace) { | |
439 controller.addError(error, stackTrace); | |
440 return; | |
441 } | |
442 var groupController = groupControllers[theKey]; | |
443 if (groupController == null) { | |
444 groupController = | |
445 new StreamController<T>.broadcast(sync: true); | |
446 groupControllers[theKey] = groupController; | |
447 controller.add( | |
448 new GroupedEvents<K, T>(theKey, groupController.stream)); | |
449 } | |
450 groupController.add(data); | |
451 }, | |
452 onError: controller.addError, | |
453 onDone: () { | |
454 controller.close(); | |
455 closeAll(); | |
456 }); | |
457 controller.onPause = subscription.pause; | |
458 controller.onResume = subscription.resume; | |
459 controller.onCancel = () { | |
460 subscription.cancel(); | |
461 // Don't fire sync events in response to a callback. | |
462 scheduleMicrotask(closeAll); | |
463 }; | |
464 }); | |
465 return controller.stream; | |
466 } | |
467 | |
468 /** | 398 /** |
469 * Creates a new stream with each data event of this stream asynchronously | 399 * Creates a new stream with each data event of this stream asynchronously |
470 * mapped to a new event. | 400 * mapped to a new event. |
471 * | 401 * |
472 * This acts like [map], except that [convert] may return a [Future], | 402 * This acts like [map], except that [convert] may return a [Future], |
473 * and in that case, the stream waits for that future to complete before | 403 * and in that case, the stream waits for that future to complete before |
474 * continuing with its result. | 404 * continuing with its result. |
475 * | 405 * |
476 * The returned stream is a broadcast stream if this stream is. | 406 * The returned stream is a broadcast stream if this stream is. |
477 */ | 407 */ |
(...skipping 1582 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
2060 } | 1990 } |
2061 | 1991 |
2062 void addError(error, [StackTrace stackTrace]) { | 1992 void addError(error, [StackTrace stackTrace]) { |
2063 _sink.addError(error, stackTrace); | 1993 _sink.addError(error, stackTrace); |
2064 } | 1994 } |
2065 | 1995 |
2066 void close() { | 1996 void close() { |
2067 _sink.close(); | 1997 _sink.close(); |
2068 } | 1998 } |
2069 } | 1999 } |
2070 | |
2071 /// A group created by [Stream.groupBy]. | |
2072 /// | |
2073 /// The stream created by `groupBy` emits a `GroupedEvents` | |
2074 /// for each distinct key it encounters. | |
2075 /// This group contains the [key] itself, along with a stream of the [values] | |
2076 /// associated with that key. | |
2077 class GroupedEvents<K, V> { | |
2078 /// The key that identifiers the values emitted by [values]. | |
2079 final K key; | |
2080 | |
2081 /// The [values] that [GroupBy] have grouped by the common [key]. | |
2082 final Stream<V> values; | |
2083 | |
2084 factory GroupedEvents(K key, Stream<V> values) = GroupedEvents<K, V>._; | |
2085 | |
2086 // Don't expose a generative constructor. | |
2087 // This class is not intended for subclassing, so we don't want to promise | |
2088 // it. We can change that in the future. | |
2089 GroupedEvents._(this.key, this.values); | |
2090 | |
2091 /// Tells [values] to discard values instead of retaining them. | |
2092 /// | |
2093 /// Must only be used instead of listening to the [values] stream. | |
2094 /// If the stream has been listened to, this call fails. | |
2095 /// After calling this method, listening on the [values] stream fails. | |
2096 Future cancel() { | |
2097 // If values has been listened to, | |
2098 // this throws a StateError saying that stream has already been listened to, | |
2099 // which is a correct error message for this call too. | |
2100 return values.listen(null).cancel(); | |
2101 } | |
2102 } | |
OLD | NEW |