| OLD | NEW | 
|---|
| 1 // Copyright (c) 2013, the Dart project authors.  Please see the AUTHORS file | 1 // Copyright (c) 2013, the Dart project authors.  Please see the AUTHORS file | 
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a | 
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. | 
| 4 | 4 | 
| 5 part of dart.async; | 5 part of dart.async; | 
| 6 | 6 | 
| 7 // ------------------------------------------------------------------- | 7 // ------------------------------------------------------------------- | 
| 8 // Core Stream types | 8 // Core Stream types | 
| 9 // ------------------------------------------------------------------- | 9 // ------------------------------------------------------------------- | 
| 10 | 10 | 
| (...skipping 368 matching lines...) Expand 10 before | Expand all | Expand 10 after  Loading... | 
| 379    * | 379    * | 
| 380    * The returned stream is a broadcast stream if this stream is. | 380    * The returned stream is a broadcast stream if this stream is. | 
| 381    * The [convert] function is called once per data event per listener. | 381    * The [convert] function is called once per data event per listener. | 
| 382    * If a broadcast stream is listened to more than once, each subscription | 382    * If a broadcast stream is listened to more than once, each subscription | 
| 383    * will individually call [convert] on each data event. | 383    * will individually call [convert] on each data event. | 
| 384    */ | 384    */ | 
| 385   Stream<S> map<S>(S convert(T event)) { | 385   Stream<S> map<S>(S convert(T event)) { | 
| 386     return new _MapStream<T, S>(this, convert); | 386     return new _MapStream<T, S>(this, convert); | 
| 387   } | 387   } | 
| 388 | 388 | 
|  | 389   /// Groups events by a computed key. | 
|  | 390   /// | 
|  | 391   /// A key is extracted from incoming events. | 
|  | 392   /// The first time a key is seen, a stream is created for it, and emitted | 
|  | 393   /// on the returned stream, along with the key, as a [StreamGroup] object. | 
|  | 394   /// Then the event is emitted on the stream ([StreamGroup.values]) | 
|  | 395   /// corresponding to the key. | 
|  | 396   /// | 
|  | 397   /// An error on the source stream, or when calling the `key` functions, | 
|  | 398   /// will emit the error on the returned stream. | 
|  | 399   /// | 
|  | 400   /// Canceling the subscription on the returned stream will stop processing | 
|  | 401   /// and close the streams for all groups. | 
|  | 402   /// | 
|  | 403   /// Pausing the subscription on the returned stream will pause processing | 
|  | 404   /// and no further events are added to streams for the individual groups. | 
|  | 405   /// | 
|  | 406   /// Pausing or canceling an individual group stream has no effect other than | 
|  | 407   /// on that stream. Events will be queued while the group stream | 
|  | 408   /// is paused and until it is first listened to. | 
|  | 409   /// If the [StreamGroup.values] stream is never listened to, | 
|  | 410   /// it will enqueue all the events unnecessarily. | 
|  | 411   Stream<StreamGroup<K, T>> groupBy<K>(K key(T event)) { | 
|  | 412     var controller; | 
|  | 413     controller = new StreamController<StreamGroup<K, T>>( | 
|  | 414         sync: true, | 
|  | 415         onListen: () { | 
|  | 416           var groupControllers = new HashMap<K, StreamController<T>>(); | 
|  | 417 | 
|  | 418           void closeAll() { | 
|  | 419             for (var groupController in groupControllers.values) { | 
|  | 420               groupController.close(); | 
|  | 421             } | 
|  | 422           } | 
|  | 423 | 
|  | 424           var subscription = this.listen( | 
|  | 425               (data) { | 
|  | 426                 K theKey; | 
|  | 427                 try { | 
|  | 428                   theKey = key(data); | 
|  | 429                 } catch (error, stackTrace) { | 
|  | 430                   controller.addError(error, stackTrace); | 
|  | 431                   return; | 
|  | 432                 } | 
|  | 433                 var groupController = groupControllers[theKey]; | 
|  | 434                 if (groupController == null) { | 
|  | 435                   groupController = | 
|  | 436                       new StreamController<T>.broadcast(sync: true); | 
|  | 437                   groupControllers[theKey] = groupController; | 
|  | 438                   controller.add( | 
|  | 439                       new StreamGroup<K, T>(theKey, groupController.stream)); | 
|  | 440                 } | 
|  | 441                 groupController.add(data); | 
|  | 442               }, | 
|  | 443               onError: controller.addError, | 
|  | 444               onDone: () { | 
|  | 445                 controller.close(); | 
|  | 446                 closeAll(); | 
|  | 447               }); | 
|  | 448           controller.onPause = subscription.pause; | 
|  | 449           controller.onResume = subscription.resume; | 
|  | 450           controller.onCancel = () { | 
|  | 451             subscription.cancel(); | 
|  | 452             // Don't fire sync events in response to a callback. | 
|  | 453             scheduleMicrotask(closeAll); | 
|  | 454           }; | 
|  | 455         }); | 
|  | 456     return controller.stream; | 
|  | 457   } | 
|  | 458 | 
| 389   /** | 459   /** | 
| 390    * Creates a new stream with each data event of this stream asynchronously | 460    * Creates a new stream with each data event of this stream asynchronously | 
| 391    * mapped to a new event. | 461    * mapped to a new event. | 
| 392    * | 462    * | 
| 393    * This acts like [map], except that [convert] may return a [Future], | 463    * This acts like [map], except that [convert] may return a [Future], | 
| 394    * and in that case, the stream waits for that future to complete before | 464    * and in that case, the stream waits for that future to complete before | 
| 395    * continuing with its result. | 465    * continuing with its result. | 
| 396    * | 466    * | 
| 397    * The returned stream is a broadcast stream if this stream is. | 467    * The returned stream is a broadcast stream if this stream is. | 
| 398    */ | 468    */ | 
| (...skipping 1390 matching lines...) Expand 10 before | Expand all | Expand 10 after  Loading... | 
| 1789   } | 1859   } | 
| 1790 | 1860 | 
| 1791   void addError(error, [StackTrace stackTrace]) { | 1861   void addError(error, [StackTrace stackTrace]) { | 
| 1792     _sink.addError(error, stackTrace); | 1862     _sink.addError(error, stackTrace); | 
| 1793   } | 1863   } | 
| 1794 | 1864 | 
| 1795   void close() { | 1865   void close() { | 
| 1796     _sink.close(); | 1866     _sink.close(); | 
| 1797   } | 1867   } | 
| 1798 } | 1868 } | 
|  | 1869 | 
|  | 1870 /// A group created by [Stream.groupBy] or [Stream.groupByMapped]. | 
|  | 1871 /// | 
|  | 1872 /// The stream created by `groupBy` emits a `StreamGroup` for each distinct key | 
|  | 1873 /// it encounters. | 
|  | 1874 /// This group contains the [key] itself, along with a stream of the [values] | 
|  | 1875 /// associated with that key. | 
|  | 1876 class StreamGroup<K, V> { | 
|  | 1877   /// The key that identifiers the values emitted by [values]. | 
|  | 1878   final K key; | 
|  | 1879 | 
|  | 1880   /// The [values] that [GroupBy] have grouped by the common [key]. | 
|  | 1881   final Stream<V> values; | 
|  | 1882 | 
|  | 1883   factory StreamGroup(K key, Stream<V> values) = StreamGroup<K, V>._; | 
|  | 1884 | 
|  | 1885   // Don't expose a generative constructor. | 
|  | 1886   // This class is not intended for subclassing, so we don't want to promise | 
|  | 1887   // it. We can change that in the future. | 
|  | 1888   StreamGroup._(this.key, this.values); | 
|  | 1889 | 
|  | 1890   /// Tells [values] to discard values instead of retaining them. | 
|  | 1891   /// | 
|  | 1892   /// Must only be used instead of listening to the [values] stream. | 
|  | 1893   /// If the stream has been listened to, this call fails. | 
|  | 1894   /// After calling this method, listening on the [values] stream fails. | 
|  | 1895   Future cancel() { | 
|  | 1896     // If values has been listened to, | 
|  | 1897     // this throws a StateError saying that stream has already been listened to, | 
|  | 1898     // which is a correct error message for this call too. | 
|  | 1899     return values.listen(null).cancel(); | 
|  | 1900   } | 
|  | 1901 } | 
| OLD | NEW | 
|---|