OLD | NEW |
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 part of dart.async; | 5 part of dart.async; |
6 | 6 |
7 // ------------------------------------------------------------------- | 7 // ------------------------------------------------------------------- |
8 // Core Stream types | 8 // Core Stream types |
9 // ------------------------------------------------------------------- | 9 // ------------------------------------------------------------------- |
10 | 10 |
(...skipping 1308 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1319 * | 1319 * |
1320 * * The synchronous methods of [EventSink] were called, resulting in an | 1320 * * The synchronous methods of [EventSink] were called, resulting in an |
1321 * error. If there is no active future (like from an addStream call), the | 1321 * error. If there is no active future (like from an addStream call), the |
1322 * [done] future will complete with that error | 1322 * [done] future will complete with that error |
1323 */ | 1323 */ |
1324 Future get done; | 1324 Future get done; |
1325 } | 1325 } |
1326 | 1326 |
1327 | 1327 |
1328 /** | 1328 /** |
1329 * A [StreamSink] adapter for a [StreamConsumer]. | |
1330 * | |
1331 * The `StreamSinkAdapter` buffers the input given by all [EventSink] methods | |
1332 * and will delay an [addStream] until all buffered data has been forwarded to | |
1333 * the stream consumer. | |
1334 * | |
1335 * While the `StreamSinkAdapter` is bound to a stream (through [addStream]) any | |
1336 * call to the `StreamSinkAdapter` will throw a [StateError], except [done]. | |
1337 * When the [addStream] completes, the `StreamSinkAdapter` will again be open | |
1338 * to all calls. | |
1339 * | |
1340 * If events are added to the `StreamSinkAdapter` after the adapter is closed, | |
1341 * the events will be ignored. | |
1342 * Use the [done] future to be notified when the `StreamSinkAdapter` is closed. | |
1343 */ | |
1344 class StreamSinkAdapter<S> implements StreamSink<S> { | |
1345 final StreamConsumer<S> _target; | |
1346 Completer _doneCompleter = new Completer(); | |
1347 Future _doneFuture; | |
1348 StreamController<S> _controllerInstance; | |
1349 Completer _controllerCompleter; | |
1350 bool _isClosed = false; | |
1351 bool _isBound = false; | |
1352 bool _hasError = false; | |
1353 | |
1354 /** | |
1355 * Construct a new StreamSinkAdapter, from a `target` [StreamConsumer]. | |
1356 */ | |
1357 StreamSinkAdapter(StreamConsumer<S> target) : _target = target { | |
1358 _doneFuture = _doneCompleter.future; | |
1359 } | |
1360 | |
1361 void add(S data) { | |
1362 if (_isClosed) return; | |
1363 _controller.add(data); | |
1364 } | |
1365 | |
1366 void addError(error, [StackTrace stackTrace]) => | |
1367 _controller.addError(error, stackTrace); | |
1368 | |
1369 Future addStream(Stream<S> stream) { | |
1370 _checkNotBound(); | |
1371 _isBound = true; | |
1372 if (_hasError) return _doneFuture; | |
1373 // Wait for any sync operations to complete. | |
1374 Future targetAddStream() { | |
1375 return _target.addStream(stream) | |
1376 .whenComplete(() { | |
1377 _isBound = false; | |
1378 }); | |
1379 } | |
1380 if (_controllerInstance == null) return targetAddStream(); | |
1381 var future = _controllerCompleter.future; | |
1382 _controllerInstance.close(); | |
1383 return future.then((_) => targetAddStream()); | |
1384 } | |
1385 | |
1386 /** | |
1387 * Returns a [Future] that completes once all buffered events has been | |
1388 * accepted by the underlying [StreamConsumer]. | |
1389 * | |
1390 * The [Future] will complete with the value `this`. | |
1391 * | |
1392 * This method must not be called while an [addStream] is in progress. | |
1393 * | |
1394 * NOTE: This method does not guarantee anything except that the stream | |
1395 * consumer has received all buffered data. It does not guarantee that the | |
1396 * consumer has acted on the data in any way, or that the data has reached | |
1397 * its final destination. | |
1398 */ | |
1399 Future flush() { | |
1400 _checkNotBound(); | |
1401 if (_controllerInstance == null) return new Future.value(this); | |
1402 // Adding an empty stream-controller will return a future that will complete | |
1403 // when all data is done. | |
1404 _isBound = true; | |
1405 var future = _controllerCompleter.future; | |
1406 _controllerInstance.close(); | |
1407 return future.whenComplete(() { | |
1408 _isBound = false; | |
1409 }); | |
1410 } | |
1411 | |
1412 Future close() { | |
1413 _checkNotBound(); | |
1414 if (!_isClosed) { | |
1415 _isClosed = true; | |
1416 if (_controllerInstance != null) { | |
1417 _controllerInstance.close(); | |
1418 } else { | |
1419 _closeTarget(); | |
1420 } | |
1421 } | |
1422 return _doneFuture; | |
1423 } | |
1424 | |
1425 Future get done => _doneFuture; | |
1426 | |
1427 // Private helper methods. | |
1428 | |
1429 void _closeTarget() { | |
1430 _target.close() .then(_completeDoneValue, onError: _completeDoneError); | |
1431 } | |
1432 | |
1433 void _completeDoneValue(value) { | |
1434 if (_doneCompleter == null) return; | |
1435 _doneCompleter.complete(value); | |
1436 _doneCompleter = null; | |
1437 } | |
1438 | |
1439 void _completeDoneError(error, stackTrace) { | |
1440 if (_doneCompleter == null) return; | |
1441 _hasError = true; | |
1442 _doneCompleter.completeError(error, stackTrace); | |
1443 _doneCompleter = null; | |
1444 } | |
1445 | |
1446 StreamController<S> get _controller { | |
1447 _checkNotBound(); | |
1448 if (_isClosed) { | |
1449 throw new StateError("StreamSink is closed"); | |
1450 } | |
1451 if (_controllerInstance == null) { | |
1452 _controllerInstance = new StreamController<S>(sync: true); | |
1453 _controllerCompleter = new Completer(); | |
1454 _target.addStream(_controller.stream) | |
1455 .then( | |
1456 (_) { | |
1457 if (_isBound) { | |
1458 // A new stream takes over - forward values to that stream. | |
1459 _controllerCompleter.complete(this); | |
1460 _controllerCompleter = null; | |
1461 _controllerInstance = null; | |
1462 } else { | |
1463 // No new stream, .close was called. Close _target. | |
1464 _closeTarget(); | |
1465 } | |
1466 }, | |
1467 onError: (error, stackTrace) { | |
1468 if (_isBound) { | |
1469 // A new stream takes over - forward errors to that stream. | |
1470 _controllerCompleter.completeError(error, stackTrace); | |
1471 _controllerCompleter = null; | |
1472 _controllerInstance = null; | |
1473 } else { | |
1474 // No new stream. No need to close target, as it have already | |
1475 // failed. | |
1476 _completeDoneError(error, stackTrace); | |
1477 } | |
1478 }); | |
1479 } | |
1480 return _controllerInstance; | |
1481 } | |
1482 | |
1483 void _checkNotBound() { | |
1484 if (_isBound) { | |
1485 throw new StateError("StreamSink is processing an addStream call"); | |
1486 } | |
1487 } | |
1488 } | |
1489 | |
1490 | |
1491 /** | |
1492 * The target of a [Stream.transform] call. | 1329 * The target of a [Stream.transform] call. |
1493 * | 1330 * |
1494 * The [Stream.transform] call will pass itself to this object and then return | 1331 * The [Stream.transform] call will pass itself to this object and then return |
1495 * the resulting stream. | 1332 * the resulting stream. |
1496 * | 1333 * |
1497 * It is good practice to write transformers that can be used multiple times. | 1334 * It is good practice to write transformers that can be used multiple times. |
1498 */ | 1335 */ |
1499 abstract class StreamTransformer<S, T> { | 1336 abstract class StreamTransformer<S, T> { |
1500 | 1337 |
1501 /** | 1338 /** |
(...skipping 143 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1645 class _ControllerEventSinkWrapper<T> implements EventSink<T> { | 1482 class _ControllerEventSinkWrapper<T> implements EventSink<T> { |
1646 EventSink _sink; | 1483 EventSink _sink; |
1647 _ControllerEventSinkWrapper(this._sink); | 1484 _ControllerEventSinkWrapper(this._sink); |
1648 | 1485 |
1649 void add(T data) { _sink.add(data); } | 1486 void add(T data) { _sink.add(data); } |
1650 void addError(error, [StackTrace stackTrace]) { | 1487 void addError(error, [StackTrace stackTrace]) { |
1651 _sink.addError(error, stackTrace); | 1488 _sink.addError(error, stackTrace); |
1652 } | 1489 } |
1653 void close() { _sink.close(); } | 1490 void close() { _sink.close(); } |
1654 } | 1491 } |
OLD | NEW |