| OLD | NEW |
| 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 part of dart.async; | 5 part of dart.async; |
| 6 | 6 |
| 7 // ------------------------------------------------------------------- | 7 // ------------------------------------------------------------------- |
| 8 // Core Stream types | 8 // Core Stream types |
| 9 // ------------------------------------------------------------------- | 9 // ------------------------------------------------------------------- |
| 10 | 10 |
| (...skipping 1308 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 1319 * | 1319 * |
| 1320 * * The synchronous methods of [EventSink] were called, resulting in an | 1320 * * The synchronous methods of [EventSink] were called, resulting in an |
| 1321 * error. If there is no active future (like from an addStream call), the | 1321 * error. If there is no active future (like from an addStream call), the |
| 1322 * [done] future will complete with that error | 1322 * [done] future will complete with that error |
| 1323 */ | 1323 */ |
| 1324 Future get done; | 1324 Future get done; |
| 1325 } | 1325 } |
| 1326 | 1326 |
| 1327 | 1327 |
| 1328 /** | 1328 /** |
| 1329 * A [StreamSink] adapter for a [StreamConsumer]. | |
| 1330 * | |
| 1331 * The `StreamSinkAdapter` buffers the input given by all [EventSink] methods | |
| 1332 * and will delay an [addStream] until all buffered data has been forwarded to | |
| 1333 * the stream consumer. | |
| 1334 * | |
| 1335 * While the `StreamSinkAdapter` is bound to a stream (through [addStream]) any | |
| 1336 * call to the `StreamSinkAdapter` will throw a [StateError], except [done]. | |
| 1337 * When the [addStream] completes, the `StreamSinkAdapter` will again be open | |
| 1338 * to all calls. | |
| 1339 * | |
| 1340 * If events are added to the `StreamSinkAdapter` after the adapter is closed, | |
| 1341 * the events will be ignored. | |
| 1342 * Use the [done] future to be notified when the `StreamSinkAdapter` is closed. | |
| 1343 */ | |
| 1344 class StreamSinkAdapter<S> implements StreamSink<S> { | |
| 1345 final StreamConsumer<S> _target; | |
| 1346 Completer _doneCompleter = new Completer(); | |
| 1347 Future _doneFuture; | |
| 1348 StreamController<S> _controllerInstance; | |
| 1349 Completer _controllerCompleter; | |
| 1350 bool _isClosed = false; | |
| 1351 bool _isBound = false; | |
| 1352 bool _hasError = false; | |
| 1353 | |
| 1354 /** | |
| 1355 * Construct a new StreamSinkAdapter, from a `target` [StreamConsumer]. | |
| 1356 */ | |
| 1357 StreamSinkAdapter(StreamConsumer<S> target) : _target = target { | |
| 1358 _doneFuture = _doneCompleter.future; | |
| 1359 } | |
| 1360 | |
| 1361 void add(S data) { | |
| 1362 if (_isClosed) return; | |
| 1363 _controller.add(data); | |
| 1364 } | |
| 1365 | |
| 1366 void addError(error, [StackTrace stackTrace]) => | |
| 1367 _controller.addError(error, stackTrace); | |
| 1368 | |
| 1369 Future addStream(Stream<S> stream) { | |
| 1370 _checkNotBound(); | |
| 1371 _isBound = true; | |
| 1372 if (_hasError) return _doneFuture; | |
| 1373 // Wait for any sync operations to complete. | |
| 1374 Future targetAddStream() { | |
| 1375 return _target.addStream(stream) | |
| 1376 .whenComplete(() { | |
| 1377 _isBound = false; | |
| 1378 }); | |
| 1379 } | |
| 1380 if (_controllerInstance == null) return targetAddStream(); | |
| 1381 var future = _controllerCompleter.future; | |
| 1382 _controllerInstance.close(); | |
| 1383 return future.then((_) => targetAddStream()); | |
| 1384 } | |
| 1385 | |
| 1386 /** | |
| 1387 * Returns a [Future] that completes once all buffered events has been | |
| 1388 * accepted by the underlying [StreamConsumer]. | |
| 1389 * | |
| 1390 * The [Future] will complete with the value `this`. | |
| 1391 * | |
| 1392 * This method must not be called while an [addStream] is in progress. | |
| 1393 * | |
| 1394 * NOTE: This method does not guarantee anything except that the stream | |
| 1395 * consumer has received all buffered data. It does not guarantee that the | |
| 1396 * consumer has acted on the data in any way, or that the data has reached | |
| 1397 * its final destination. | |
| 1398 */ | |
| 1399 Future flush() { | |
| 1400 _checkNotBound(); | |
| 1401 if (_controllerInstance == null) return new Future.value(this); | |
| 1402 // Adding an empty stream-controller will return a future that will complete | |
| 1403 // when all data is done. | |
| 1404 _isBound = true; | |
| 1405 var future = _controllerCompleter.future; | |
| 1406 _controllerInstance.close(); | |
| 1407 return future.whenComplete(() { | |
| 1408 _isBound = false; | |
| 1409 }); | |
| 1410 } | |
| 1411 | |
| 1412 Future close() { | |
| 1413 _checkNotBound(); | |
| 1414 if (!_isClosed) { | |
| 1415 _isClosed = true; | |
| 1416 if (_controllerInstance != null) { | |
| 1417 _controllerInstance.close(); | |
| 1418 } else { | |
| 1419 _closeTarget(); | |
| 1420 } | |
| 1421 } | |
| 1422 return _doneFuture; | |
| 1423 } | |
| 1424 | |
| 1425 Future get done => _doneFuture; | |
| 1426 | |
| 1427 // Private helper methods. | |
| 1428 | |
| 1429 void _closeTarget() { | |
| 1430 _target.close() .then(_completeDoneValue, onError: _completeDoneError); | |
| 1431 } | |
| 1432 | |
| 1433 void _completeDoneValue(value) { | |
| 1434 if (_doneCompleter == null) return; | |
| 1435 _doneCompleter.complete(value); | |
| 1436 _doneCompleter = null; | |
| 1437 } | |
| 1438 | |
| 1439 void _completeDoneError(error, stackTrace) { | |
| 1440 if (_doneCompleter == null) return; | |
| 1441 _hasError = true; | |
| 1442 _doneCompleter.completeError(error, stackTrace); | |
| 1443 _doneCompleter = null; | |
| 1444 } | |
| 1445 | |
| 1446 StreamController<S> get _controller { | |
| 1447 _checkNotBound(); | |
| 1448 if (_isClosed) { | |
| 1449 throw new StateError("StreamSink is closed"); | |
| 1450 } | |
| 1451 if (_controllerInstance == null) { | |
| 1452 _controllerInstance = new StreamController<S>(sync: true); | |
| 1453 _controllerCompleter = new Completer(); | |
| 1454 _target.addStream(_controller.stream) | |
| 1455 .then( | |
| 1456 (_) { | |
| 1457 if (_isBound) { | |
| 1458 // A new stream takes over - forward values to that stream. | |
| 1459 _controllerCompleter.complete(this); | |
| 1460 _controllerCompleter = null; | |
| 1461 _controllerInstance = null; | |
| 1462 } else { | |
| 1463 // No new stream, .close was called. Close _target. | |
| 1464 _closeTarget(); | |
| 1465 } | |
| 1466 }, | |
| 1467 onError: (error, stackTrace) { | |
| 1468 if (_isBound) { | |
| 1469 // A new stream takes over - forward errors to that stream. | |
| 1470 _controllerCompleter.completeError(error, stackTrace); | |
| 1471 _controllerCompleter = null; | |
| 1472 _controllerInstance = null; | |
| 1473 } else { | |
| 1474 // No new stream. No need to close target, as it have already | |
| 1475 // failed. | |
| 1476 _completeDoneError(error, stackTrace); | |
| 1477 } | |
| 1478 }); | |
| 1479 } | |
| 1480 return _controllerInstance; | |
| 1481 } | |
| 1482 | |
| 1483 void _checkNotBound() { | |
| 1484 if (_isBound) { | |
| 1485 throw new StateError("StreamSink is processing an addStream call"); | |
| 1486 } | |
| 1487 } | |
| 1488 } | |
| 1489 | |
| 1490 | |
| 1491 /** | |
| 1492 * The target of a [Stream.transform] call. | 1329 * The target of a [Stream.transform] call. |
| 1493 * | 1330 * |
| 1494 * The [Stream.transform] call will pass itself to this object and then return | 1331 * The [Stream.transform] call will pass itself to this object and then return |
| 1495 * the resulting stream. | 1332 * the resulting stream. |
| 1496 * | 1333 * |
| 1497 * It is good practice to write transformers that can be used multiple times. | 1334 * It is good practice to write transformers that can be used multiple times. |
| 1498 */ | 1335 */ |
| 1499 abstract class StreamTransformer<S, T> { | 1336 abstract class StreamTransformer<S, T> { |
| 1500 | 1337 |
| 1501 /** | 1338 /** |
| (...skipping 143 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 1645 class _ControllerEventSinkWrapper<T> implements EventSink<T> { | 1482 class _ControllerEventSinkWrapper<T> implements EventSink<T> { |
| 1646 EventSink _sink; | 1483 EventSink _sink; |
| 1647 _ControllerEventSinkWrapper(this._sink); | 1484 _ControllerEventSinkWrapper(this._sink); |
| 1648 | 1485 |
| 1649 void add(T data) { _sink.add(data); } | 1486 void add(T data) { _sink.add(data); } |
| 1650 void addError(error, [StackTrace stackTrace]) { | 1487 void addError(error, [StackTrace stackTrace]) { |
| 1651 _sink.addError(error, stackTrace); | 1488 _sink.addError(error, stackTrace); |
| 1652 } | 1489 } |
| 1653 void close() { _sink.close(); } | 1490 void close() { _sink.close(); } |
| 1654 } | 1491 } |
| OLD | NEW |