| OLD | NEW | 
|---|
| 1 // Copyright (c) 2013, the Dart project authors.  Please see the AUTHORS file | 1 // Copyright (c) 2013, the Dart project authors.  Please see the AUTHORS file | 
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a | 
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. | 
| 4 | 4 | 
| 5 part of dart.async; | 5 part of dart.async; | 
| 6 | 6 | 
| 7 // ------------------------------------------------------------------- | 7 // ------------------------------------------------------------------- | 
| 8 // Core Stream types | 8 // Core Stream types | 
| 9 // ------------------------------------------------------------------- | 9 // ------------------------------------------------------------------- | 
| 10 | 10 | 
| (...skipping 1308 matching lines...) Expand 10 before | Expand all | Expand 10 after  Loading... | 
| 1319    * | 1319    * | 
| 1320    * * The synchronous methods of [EventSink] were called, resulting in an | 1320    * * The synchronous methods of [EventSink] were called, resulting in an | 
| 1321    *   error. If there is no active future (like from an addStream call), the | 1321    *   error. If there is no active future (like from an addStream call), the | 
| 1322    *   [done] future will complete with that error | 1322    *   [done] future will complete with that error | 
| 1323    */ | 1323    */ | 
| 1324   Future get done; | 1324   Future get done; | 
| 1325 } | 1325 } | 
| 1326 | 1326 | 
| 1327 | 1327 | 
| 1328 /** | 1328 /** | 
| 1329  * A [StreamSink] adapter for a [StreamConsumer]. |  | 
| 1330  * |  | 
| 1331  * The `StreamSinkAdapter` buffers the input given by all [EventSink] methods |  | 
| 1332  * and will delay an [addStream] until all buffered data has been forwarded to |  | 
| 1333  * the stream consumer. |  | 
| 1334  * |  | 
| 1335  * While the `StreamSinkAdapter` is bound to a stream (through [addStream]) any |  | 
| 1336  * call to the `StreamSinkAdapter` will throw a [StateError], except [done]. |  | 
| 1337  * When the [addStream] completes, the `StreamSinkAdapter` will again be open |  | 
| 1338  * to all calls. |  | 
| 1339  * |  | 
| 1340  * If events are added to the `StreamSinkAdapter` after the adapter is closed, |  | 
| 1341  * the events will be ignored. |  | 
| 1342  * Use the [done] future to be notified when the `StreamSinkAdapter` is closed. |  | 
| 1343  */ |  | 
| 1344 class StreamSinkAdapter<S> implements StreamSink<S> { |  | 
| 1345   final StreamConsumer<S> _target; |  | 
| 1346   Completer _doneCompleter = new Completer(); |  | 
| 1347   Future _doneFuture; |  | 
| 1348   StreamController<S> _controllerInstance; |  | 
| 1349   Completer _controllerCompleter; |  | 
| 1350   bool _isClosed = false; |  | 
| 1351   bool _isBound = false; |  | 
| 1352   bool _hasError = false; |  | 
| 1353 |  | 
| 1354   /** |  | 
| 1355    * Construct a new StreamSinkAdapter, from a `target` [StreamConsumer]. |  | 
| 1356    */ |  | 
| 1357   StreamSinkAdapter(StreamConsumer<S> target) : _target = target { |  | 
| 1358     _doneFuture = _doneCompleter.future; |  | 
| 1359   } |  | 
| 1360 |  | 
| 1361   void add(S data) { |  | 
| 1362     if (_isClosed) return; |  | 
| 1363     _controller.add(data); |  | 
| 1364   } |  | 
| 1365 |  | 
| 1366   void addError(error, [StackTrace stackTrace]) => |  | 
| 1367       _controller.addError(error, stackTrace); |  | 
| 1368 |  | 
| 1369   Future addStream(Stream<S> stream) { |  | 
| 1370     _checkNotBound(); |  | 
| 1371     _isBound = true; |  | 
| 1372     if (_hasError) return _doneFuture; |  | 
| 1373     // Wait for any sync operations to complete. |  | 
| 1374     Future targetAddStream() { |  | 
| 1375       return _target.addStream(stream) |  | 
| 1376           .whenComplete(() { |  | 
| 1377             _isBound = false; |  | 
| 1378           }); |  | 
| 1379     } |  | 
| 1380     if (_controllerInstance == null) return targetAddStream(); |  | 
| 1381     var future = _controllerCompleter.future; |  | 
| 1382     _controllerInstance.close(); |  | 
| 1383     return future.then((_) => targetAddStream()); |  | 
| 1384   } |  | 
| 1385 |  | 
| 1386   /** |  | 
| 1387    * Returns a [Future] that completes once all buffered events has been |  | 
| 1388    * accepted by the underlying [StreamConsumer]. |  | 
| 1389    * |  | 
| 1390    * The [Future] will complete with the value `this`. |  | 
| 1391    * |  | 
| 1392    * This method must not be called while an [addStream] is in progress. |  | 
| 1393    * |  | 
| 1394    * NOTE: This method does not guarantee anything except that the stream |  | 
| 1395    * consumer has received all buffered data. It does not guarantee that the |  | 
| 1396    * consumer has acted on the data in any way, or that the data has reached |  | 
| 1397    * its final destination. |  | 
| 1398    */ |  | 
| 1399   Future flush() { |  | 
| 1400     _checkNotBound(); |  | 
| 1401     if (_controllerInstance == null) return new Future.value(this); |  | 
| 1402     // Adding an empty stream-controller will return a future that will complete |  | 
| 1403     // when all data is done. |  | 
| 1404     _isBound = true; |  | 
| 1405     var future = _controllerCompleter.future; |  | 
| 1406     _controllerInstance.close(); |  | 
| 1407     return future.whenComplete(() { |  | 
| 1408       _isBound = false; |  | 
| 1409     }); |  | 
| 1410   } |  | 
| 1411 |  | 
| 1412   Future close() { |  | 
| 1413     _checkNotBound(); |  | 
| 1414     if (!_isClosed) { |  | 
| 1415       _isClosed = true; |  | 
| 1416       if (_controllerInstance != null) { |  | 
| 1417         _controllerInstance.close(); |  | 
| 1418       } else { |  | 
| 1419         _closeTarget(); |  | 
| 1420       } |  | 
| 1421     } |  | 
| 1422     return _doneFuture; |  | 
| 1423   } |  | 
| 1424 |  | 
| 1425   Future get done => _doneFuture; |  | 
| 1426 |  | 
| 1427   // Private helper methods. |  | 
| 1428 |  | 
| 1429   void _closeTarget() { |  | 
| 1430     _target.close() .then(_completeDoneValue, onError: _completeDoneError); |  | 
| 1431   } |  | 
| 1432 |  | 
| 1433   void _completeDoneValue(value) { |  | 
| 1434     if (_doneCompleter == null) return; |  | 
| 1435     _doneCompleter.complete(value); |  | 
| 1436     _doneCompleter = null; |  | 
| 1437   } |  | 
| 1438 |  | 
| 1439   void _completeDoneError(error, stackTrace) { |  | 
| 1440     if (_doneCompleter == null) return; |  | 
| 1441     _hasError = true; |  | 
| 1442     _doneCompleter.completeError(error, stackTrace); |  | 
| 1443     _doneCompleter = null; |  | 
| 1444   } |  | 
| 1445 |  | 
| 1446   StreamController<S> get _controller { |  | 
| 1447     _checkNotBound(); |  | 
| 1448     if (_isClosed) { |  | 
| 1449       throw new StateError("StreamSink is closed"); |  | 
| 1450     } |  | 
| 1451     if (_controllerInstance == null) { |  | 
| 1452       _controllerInstance = new StreamController<S>(sync: true); |  | 
| 1453       _controllerCompleter = new Completer(); |  | 
| 1454       _target.addStream(_controller.stream) |  | 
| 1455           .then( |  | 
| 1456               (_) { |  | 
| 1457                 if (_isBound) { |  | 
| 1458                   // A new stream takes over - forward values to that stream. |  | 
| 1459                   _controllerCompleter.complete(this); |  | 
| 1460                   _controllerCompleter = null; |  | 
| 1461                   _controllerInstance = null; |  | 
| 1462                 } else { |  | 
| 1463                   // No new stream, .close was called. Close _target. |  | 
| 1464                   _closeTarget(); |  | 
| 1465                 } |  | 
| 1466               }, |  | 
| 1467               onError: (error, stackTrace) { |  | 
| 1468                 if (_isBound) { |  | 
| 1469                   // A new stream takes over - forward errors to that stream. |  | 
| 1470                   _controllerCompleter.completeError(error, stackTrace); |  | 
| 1471                   _controllerCompleter = null; |  | 
| 1472                   _controllerInstance = null; |  | 
| 1473                 } else { |  | 
| 1474                   // No new stream. No need to close target, as it have already |  | 
| 1475                   // failed. |  | 
| 1476                   _completeDoneError(error, stackTrace); |  | 
| 1477                 } |  | 
| 1478               }); |  | 
| 1479     } |  | 
| 1480     return _controllerInstance; |  | 
| 1481   } |  | 
| 1482 |  | 
| 1483   void _checkNotBound() { |  | 
| 1484     if (_isBound) { |  | 
| 1485       throw new StateError("StreamSink is processing an addStream call"); |  | 
| 1486     } |  | 
| 1487   } |  | 
| 1488 } |  | 
| 1489 |  | 
| 1490 |  | 
| 1491 /** |  | 
| 1492  * The target of a [Stream.transform] call. | 1329  * The target of a [Stream.transform] call. | 
| 1493  * | 1330  * | 
| 1494  * The [Stream.transform] call will pass itself to this object and then return | 1331  * The [Stream.transform] call will pass itself to this object and then return | 
| 1495  * the resulting stream. | 1332  * the resulting stream. | 
| 1496  * | 1333  * | 
| 1497  * It is good practice to write transformers that can be used multiple times. | 1334  * It is good practice to write transformers that can be used multiple times. | 
| 1498  */ | 1335  */ | 
| 1499 abstract class StreamTransformer<S, T> { | 1336 abstract class StreamTransformer<S, T> { | 
| 1500 | 1337 | 
| 1501   /** | 1338   /** | 
| (...skipping 143 matching lines...) Expand 10 before | Expand all | Expand 10 after  Loading... | 
| 1645 class _ControllerEventSinkWrapper<T> implements EventSink<T> { | 1482 class _ControllerEventSinkWrapper<T> implements EventSink<T> { | 
| 1646   EventSink _sink; | 1483   EventSink _sink; | 
| 1647   _ControllerEventSinkWrapper(this._sink); | 1484   _ControllerEventSinkWrapper(this._sink); | 
| 1648 | 1485 | 
| 1649   void add(T data) { _sink.add(data); } | 1486   void add(T data) { _sink.add(data); } | 
| 1650   void addError(error, [StackTrace stackTrace]) { | 1487   void addError(error, [StackTrace stackTrace]) { | 
| 1651     _sink.addError(error, stackTrace); | 1488     _sink.addError(error, stackTrace); | 
| 1652   } | 1489   } | 
| 1653   void close() { _sink.close(); } | 1490   void close() { _sink.close(); } | 
| 1654 } | 1491 } | 
| OLD | NEW | 
|---|