| OLD | NEW | 
|     1 // Copyright (c) 2013, the Dart project authors.  Please see the AUTHORS file |     1 // Copyright (c) 2013, the Dart project authors.  Please see the AUTHORS file | 
|     2 // for details. All rights reserved. Use of this source code is governed by a |     2 // for details. All rights reserved. Use of this source code is governed by a | 
|     3 // BSD-style license that can be found in the LICENSE file. |     3 // BSD-style license that can be found in the LICENSE file. | 
|     4  |     4  | 
|     5 part of dart.async; |     5 part of dart.async; | 
|     6  |     6  | 
|     7 // ------------------------------------------------------------------- |     7 // ------------------------------------------------------------------- | 
|     8 // Core Stream types |     8 // Core Stream types | 
|     9 // ------------------------------------------------------------------- |     9 // ------------------------------------------------------------------- | 
|    10  |    10  | 
| (...skipping 372 matching lines...) Expand 10 before | Expand all | Expand 10 after  Loading... | 
|   383    * will individually call [convert] on each data event. |   383    * will individually call [convert] on each data event. | 
|   384    */ |   384    */ | 
|   385   Stream<S> map<S>(S convert(T event)) { |   385   Stream<S> map<S>(S convert(T event)) { | 
|   386     return new _MapStream<T, S>(this, convert); |   386     return new _MapStream<T, S>(this, convert); | 
|   387   } |   387   } | 
|   388  |   388  | 
|   389   /// Groups events by a computed key. |   389   /// Groups events by a computed key. | 
|   390   /// |   390   /// | 
|   391   /// A key is extracted from incoming events. |   391   /// A key is extracted from incoming events. | 
|   392   /// The first time a key is seen, a stream is created for it, and emitted |   392   /// The first time a key is seen, a stream is created for it, and emitted | 
|   393   /// on the returned stream, along with the key, as a [StreamGroup] object. |   393   /// on the returned stream, along with the key, as a [GroupedEvents] object. | 
|   394   /// Then the event is emitted on the stream ([StreamGroup.values]) |   394   /// Then the event is emitted on the stream ([GroupedEvents.values]) | 
|   395   /// corresponding to the key. |   395   /// corresponding to the key. | 
|   396   /// |   396   /// | 
|   397   /// An error on the source stream, or when calling the `key` functions, |   397   /// An error on the source stream, or when calling the `key` functions, | 
|   398   /// will emit the error on the returned stream. |   398   /// will emit the error on the returned stream. | 
|   399   /// |   399   /// | 
|   400   /// Canceling the subscription on the returned stream will stop processing |   400   /// Canceling the subscription on the returned stream will stop processing | 
|   401   /// and close the streams for all groups. |   401   /// and close the streams for all groups. | 
|   402   /// |   402   /// | 
|   403   /// Pausing the subscription on the returned stream will pause processing |   403   /// Pausing the subscription on the returned stream will pause processing | 
|   404   /// and no further events are added to streams for the individual groups. |   404   /// and no further events are added to streams for the individual groups. | 
|   405   /// |   405   /// | 
|   406   /// Pausing or canceling an individual group stream has no effect other than |   406   /// Pausing or canceling an individual group stream has no effect other than | 
|   407   /// on that stream. Events will be queued while the group stream |   407   /// on that stream. Events will be queued while the group stream | 
|   408   /// is paused and until it is first listened to. |   408   /// is paused and until it is first listened to. | 
|   409   /// If the [StreamGroup.values] stream is never listened to, |   409   /// If the [GroupedEvents.values] stream is never listened to, | 
|   410   /// it will enqueue all the events unnecessarily. |   410   /// it will enqueue all the events unnecessarily. | 
|   411   Stream<StreamGroup<K, T>> groupBy<K>(K key(T event)) { |   411   Stream<GroupedEvents<K, T>> groupBy<K>(K key(T event)) { | 
|   412     var controller; |   412     var controller; | 
|   413     controller = new StreamController<StreamGroup<K, T>>( |   413     controller = new StreamController<GroupedEvents<K, T>>( | 
|   414         sync: true, |   414         sync: true, | 
|   415         onListen: () { |   415         onListen: () { | 
|   416           var groupControllers = new HashMap<K, StreamController<T>>(); |   416           var groupControllers = new HashMap<K, StreamController<T>>(); | 
|   417  |   417  | 
|   418           void closeAll() { |   418           void closeAll() { | 
|   419             for (var groupController in groupControllers.values) { |   419             for (var groupController in groupControllers.values) { | 
|   420               groupController.close(); |   420               groupController.close(); | 
|   421             } |   421             } | 
|   422           } |   422           } | 
|   423  |   423  | 
|   424           var subscription = this.listen( |   424           var subscription = this.listen( | 
|   425               (data) { |   425               (data) { | 
|   426                 K theKey; |   426                 K theKey; | 
|   427                 try { |   427                 try { | 
|   428                   theKey = key(data); |   428                   theKey = key(data); | 
|   429                 } catch (error, stackTrace) { |   429                 } catch (error, stackTrace) { | 
|   430                   controller.addError(error, stackTrace); |   430                   controller.addError(error, stackTrace); | 
|   431                   return; |   431                   return; | 
|   432                 } |   432                 } | 
|   433                 var groupController = groupControllers[theKey]; |   433                 var groupController = groupControllers[theKey]; | 
|   434                 if (groupController == null) { |   434                 if (groupController == null) { | 
|   435                   groupController = |   435                   groupController = | 
|   436                       new StreamController<T>.broadcast(sync: true); |   436                       new StreamController<T>.broadcast(sync: true); | 
|   437                   groupControllers[theKey] = groupController; |   437                   groupControllers[theKey] = groupController; | 
|   438                   controller.add( |   438                   controller.add( | 
|   439                       new StreamGroup<K, T>(theKey, groupController.stream)); |   439                       new GroupedEvents<K, T>(theKey, groupController.stream)); | 
|   440                 } |   440                 } | 
|   441                 groupController.add(data); |   441                 groupController.add(data); | 
|   442               }, |   442               }, | 
|   443               onError: controller.addError, |   443               onError: controller.addError, | 
|   444               onDone: () { |   444               onDone: () { | 
|   445                 controller.close(); |   445                 controller.close(); | 
|   446                 closeAll(); |   446                 closeAll(); | 
|   447               }); |   447               }); | 
|   448           controller.onPause = subscription.pause; |   448           controller.onPause = subscription.pause; | 
|   449           controller.onResume = subscription.resume; |   449           controller.onResume = subscription.resume; | 
| (...skipping 1452 matching lines...) Expand 10 before | Expand all | Expand 10 after  Loading... | 
|  1902     _sink.addError(error, stackTrace); |  1902     _sink.addError(error, stackTrace); | 
|  1903   } |  1903   } | 
|  1904  |  1904  | 
|  1905   void close() { |  1905   void close() { | 
|  1906     _sink.close(); |  1906     _sink.close(); | 
|  1907   } |  1907   } | 
|  1908 } |  1908 } | 
|  1909  |  1909  | 
|  1910 /// A group created by [Stream.groupBy] or [Stream.groupByMapped]. |  1910 /// A group created by [Stream.groupBy] or [Stream.groupByMapped]. | 
|  1911 /// |  1911 /// | 
|  1912 /// The stream created by `groupBy` emits a `StreamGroup` for each distinct key |  1912 /// The stream created by `groupBy` emits a `GroupedEvents` for each distinct ke
      y | 
|  1913 /// it encounters. |  1913 /// it encounters. | 
|  1914 /// This group contains the [key] itself, along with a stream of the [values] |  1914 /// This group contains the [key] itself, along with a stream of the [values] | 
|  1915 /// associated with that key. |  1915 /// associated with that key. | 
|  1916 class StreamGroup<K, V> { |  1916 class GroupedEvents<K, V> { | 
|  1917   /// The key that identifiers the values emitted by [values]. |  1917   /// The key that identifiers the values emitted by [values]. | 
|  1918   final K key; |  1918   final K key; | 
|  1919  |  1919  | 
|  1920   /// The [values] that [GroupBy] have grouped by the common [key]. |  1920   /// The [values] that [GroupBy] have grouped by the common [key]. | 
|  1921   final Stream<V> values; |  1921   final Stream<V> values; | 
|  1922  |  1922  | 
|  1923   factory StreamGroup(K key, Stream<V> values) = StreamGroup<K, V>._; |  1923   factory GroupedEvents(K key, Stream<V> values) = GroupedEvents<K, V>._; | 
|  1924  |  1924  | 
|  1925   // Don't expose a generative constructor. |  1925   // Don't expose a generative constructor. | 
|  1926   // This class is not intended for subclassing, so we don't want to promise |  1926   // This class is not intended for subclassing, so we don't want to promise | 
|  1927   // it. We can change that in the future. |  1927   // it. We can change that in the future. | 
|  1928   StreamGroup._(this.key, this.values); |  1928   GroupedEvents._(this.key, this.values); | 
|  1929  |  1929  | 
|  1930   /// Tells [values] to discard values instead of retaining them. |  1930   /// Tells [values] to discard values instead of retaining them. | 
|  1931   /// |  1931   /// | 
|  1932   /// Must only be used instead of listening to the [values] stream. |  1932   /// Must only be used instead of listening to the [values] stream. | 
|  1933   /// If the stream has been listened to, this call fails. |  1933   /// If the stream has been listened to, this call fails. | 
|  1934   /// After calling this method, listening on the [values] stream fails. |  1934   /// After calling this method, listening on the [values] stream fails. | 
|  1935   Future cancel() { |  1935   Future cancel() { | 
|  1936     // If values has been listened to, |  1936     // If values has been listened to, | 
|  1937     // this throws a StateError saying that stream has already been listened to, |  1937     // this throws a StateError saying that stream has already been listened to, | 
|  1938     // which is a correct error message for this call too. |  1938     // which is a correct error message for this call too. | 
|  1939     return values.listen(null).cancel(); |  1939     return values.listen(null).cancel(); | 
|  1940   } |  1940   } | 
|  1941 } |  1941 } | 
| OLD | NEW |