OLD | NEW |
---|---|
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 part of dart.async; | 5 part of dart.async; |
6 | 6 |
7 // ------------------------------------------------------------------- | 7 // ------------------------------------------------------------------- |
8 // Core Stream types | 8 // Core Stream types |
9 // ------------------------------------------------------------------- | 9 // ------------------------------------------------------------------- |
10 | 10 |
(...skipping 1308 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
1319 * | 1319 * |
1320 * * The synchronous methods of [EventSink] were called, resulting in an | 1320 * * The synchronous methods of [EventSink] were called, resulting in an |
1321 * error. If there is no active future (like from an addStream call), the | 1321 * error. If there is no active future (like from an addStream call), the |
1322 * [done] future will complete with that error | 1322 * [done] future will complete with that error |
1323 */ | 1323 */ |
1324 Future get done; | 1324 Future get done; |
1325 } | 1325 } |
1326 | 1326 |
1327 | 1327 |
1328 /** | 1328 /** |
1329 * A [StreamSink] adapter for a [StreamConsumer]. | |
1330 * | |
1331 * The `StreamSinkAdapter` buffers the input given by all [EventSink] methods | |
1332 * and will delay an [addStream] until all buffered data has been forwarded to | |
1333 * the stream consumer. | |
1334 * | |
1335 * While the `StreamSinkAdapter` is bound to a stream (through [addStream]) any | |
1336 * call to the `StreamSinkAdapter` will throw a [StateError], except [done]. | |
1337 * When the [addStream] completes, the `StreamSinkAdapter` will again be open | |
1338 * to all calls. | |
1339 * | |
1340 * If events are added to the `StreamSinkAdapter` after the adapter is closed, | |
1341 * the events will be ignored. | |
1342 * Use the [done] future to be notified when the `StreamSinkAdapter` is closed. | |
1343 */ | |
1344 class StreamSinkAdapter<S> implements StreamSink<S> { | |
1345 final StreamConsumer<S> _target; | |
1346 Completer _doneCompleter = new Completer(); | |
1347 Future _doneFuture; | |
1348 StreamController<S> _controllerInstance; | |
1349 Completer _controllerCompleter; | |
1350 bool _isClosed = false; | |
1351 bool _isBound = false; | |
1352 bool _hasError = false; | |
1353 | |
1354 /** | |
1355 * Construct a new StreamSinkAdapter, from a `target` [StreamConsumer]. | |
1356 */ | |
1357 StreamSinkAdapter(StreamConsumer<S> target) : _target = target { | |
1358 _doneFuture = _doneCompleter.future; | |
1359 } | |
1360 | |
1361 void add(S data) { | |
1362 if (_isClosed) return; | |
1363 _controller.add(data); | |
1364 } | |
1365 | |
1366 void addError(error, [StackTrace stackTrace]) => | |
1367 _controller.addError(error, stackTrace); | |
1368 | |
1369 Future addStream(Stream<S> stream) { | |
1370 _checkIsBound(); | |
Lasse Reichstein Nielsen
2014/03/19 13:27:20
Hmm, would probably sounds better as "_checkNotBou
Anders Johnsen
2014/03/19 13:56:39
Done.
| |
1371 _isBound = true; | |
1372 if (_hasError) return done; | |
Lasse Reichstein Nielsen
2014/03/19 13:27:20
return _doneFuture;
(I generally try to avoid hav
Anders Johnsen
2014/03/19 13:56:39
Done.
| |
1373 // Wait for any sync operations to complete. | |
1374 Future targetAddStream() { | |
1375 return _target.addStream(stream) | |
1376 .whenComplete(() { | |
1377 _isBound = false; | |
1378 }); | |
1379 } | |
1380 if (_controllerInstance == null) return targetAddStream(); | |
1381 var future = _controllerCompleter.future; | |
1382 _controllerInstance.close(); | |
1383 return future.then((_) => targetAddStream()); | |
1384 } | |
1385 | |
1386 /** | |
1387 * Returns a [Future] that completes once all buffered events is accepted by | |
Lasse Reichstein Nielsen
2014/03/19 13:27:20
is -> has been.
Anders Johnsen
2014/03/19 13:56:39
Done.
| |
1388 * the underlying [StreamConsumer]. | |
1389 * | |
1390 * It's an error to call this method, while an [addStream] is incomplete. | |
Lasse Reichstein Nielsen
2014/03/19 13:27:20
-> This method must not be called while an [addStr
Anders Johnsen
2014/03/19 13:56:39
Done.
| |
1391 * | |
1392 * NOTE: This is not necessarily the same as the data being flushed by the | |
1393 * operating system. | |
Lasse Reichstein Nielsen
2014/03/19 13:27:20
Remote this NOTE, keep the next.
Anders Johnsen
2014/03/19 13:56:39
Done.
| |
1394 * NOTE: This method does not guarantee anything except that the stream | |
1395 * consumer has received all buffered data. It does not guarantee that the | |
1396 * consumer has acted on the data in any way, or that the data has reached | |
1397 * its final destination. | |
1398 */ | |
1399 Future flush() { | |
1400 _checkIsBound(); | |
1401 if (_controllerInstance == null) return new Future.value(this); | |
Lasse Reichstein Nielsen
2014/03/19 13:27:20
Why is "this" the value of the future? Is this doc
Anders Johnsen
2014/03/19 13:56:39
Done.
| |
1402 // Adding an empty stream-controller will return a future that will complete | |
1403 // when all data is done. | |
1404 _isBound = true; | |
1405 var future = _controllerCompleter.future; | |
1406 _controllerInstance.close(); | |
1407 return future.whenComplete(() { | |
1408 _isBound = false; | |
1409 }); | |
Lasse Reichstein Nielsen
2014/03/19 13:27:20
Indentation: Unindent the previous two lines by fo
Anders Johnsen
2014/03/19 13:56:39
Done.
| |
1410 } | |
1411 | |
1412 Future close() { | |
1413 _checkIsBound(); | |
1414 if (!_isClosed) { | |
1415 _isClosed = true; | |
1416 if (_controllerInstance != null) { | |
1417 _controllerInstance.close(); | |
1418 } else { | |
1419 _closeTarget(); | |
1420 } | |
1421 } | |
1422 return done; | |
1423 } | |
1424 | |
1425 Future get done => _doneFuture; | |
1426 | |
1427 // Private helper methods. | |
1428 | |
1429 void _closeTarget() { | |
1430 _target.close() | |
1431 .then((value) => _completeDone(value: value), | |
1432 onError: (error) => _completeDone(error: error)); | |
1433 } | |
1434 | |
1435 void _completeDone({value, error}) { | |
Lasse Reichstein Nielsen
2014/03/19 13:27:20
This function expects one of two arguments.
Make i
Anders Johnsen
2014/03/19 13:56:39
Done.
| |
1436 if (_doneCompleter == null) return; | |
1437 if (error == null) { | |
1438 _doneCompleter.complete(value); | |
1439 } else { | |
1440 _hasError = true; | |
1441 _doneCompleter.completeError(error); | |
1442 } | |
1443 _doneCompleter = null; | |
1444 } | |
1445 | |
1446 StreamController<S> get _controller { | |
1447 _checkIsBound(); | |
1448 if (_isClosed) { | |
1449 throw new StateError("StreamSink is closed"); | |
1450 } | |
1451 if (_controllerInstance == null) { | |
1452 _controllerInstance = new StreamController<S>(sync: true); | |
1453 _controllerCompleter = new Completer(); | |
1454 _target.addStream(_controller.stream) | |
1455 .then( | |
1456 (_) { | |
1457 if (_isBound) { | |
1458 // A new stream takes over - forward values to that stream. | |
1459 _controllerCompleter.complete(this); | |
1460 _controllerCompleter = null; | |
1461 _controllerInstance = null; | |
1462 } else { | |
1463 // No new stream, .close was called. Close _target. | |
1464 _closeTarget(); | |
1465 } | |
1466 }, | |
1467 onError: (error) { | |
1468 if (_isBound) { | |
1469 // A new stream takes over - forward errors to that stream. | |
1470 _controllerCompleter.completeError(error); | |
1471 _controllerCompleter = null; | |
1472 _controllerInstance = null; | |
1473 } else { | |
1474 // No new stream. No need to close target, as it have already | |
1475 // failed. | |
1476 _completeDone(error: error); | |
1477 } | |
1478 }); | |
1479 } | |
1480 return _controllerInstance; | |
1481 } | |
1482 | |
1483 void _checkIsBound() { | |
1484 if (_isBound) { | |
1485 throw new StateError("StreamSink is bound to a stream"); | |
Lasse Reichstein Nielsen
2014/03/19 13:27:20
... is bound to a stream -> ... is processing an a
Anders Johnsen
2014/03/19 13:56:39
Done.
| |
1486 } | |
1487 } | |
1488 } | |
1489 | |
1490 | |
1491 /** | |
1329 * The target of a [Stream.transform] call. | 1492 * The target of a [Stream.transform] call. |
1330 * | 1493 * |
1331 * The [Stream.transform] call will pass itself to this object and then return | 1494 * The [Stream.transform] call will pass itself to this object and then return |
1332 * the resulting stream. | 1495 * the resulting stream. |
1333 * | 1496 * |
1334 * It is good practice to write transformers that can be used multiple times. | 1497 * It is good practice to write transformers that can be used multiple times. |
1335 */ | 1498 */ |
1336 abstract class StreamTransformer<S, T> { | 1499 abstract class StreamTransformer<S, T> { |
1337 | 1500 |
1338 /** | 1501 /** |
(...skipping 143 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
1482 class _ControllerEventSinkWrapper<T> implements EventSink<T> { | 1645 class _ControllerEventSinkWrapper<T> implements EventSink<T> { |
1483 EventSink _sink; | 1646 EventSink _sink; |
1484 _ControllerEventSinkWrapper(this._sink); | 1647 _ControllerEventSinkWrapper(this._sink); |
1485 | 1648 |
1486 void add(T data) { _sink.add(data); } | 1649 void add(T data) { _sink.add(data); } |
1487 void addError(error, [StackTrace stackTrace]) { | 1650 void addError(error, [StackTrace stackTrace]) { |
1488 _sink.addError(error, stackTrace); | 1651 _sink.addError(error, stackTrace); |
1489 } | 1652 } |
1490 void close() { _sink.close(); } | 1653 void close() { _sink.close(); } |
1491 } | 1654 } |
OLD | NEW |