OLD | NEW |
---|---|
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 part of dart.async; | 5 part of dart.async; |
6 | 6 |
7 // ------------------------------------------------------------------- | 7 // ------------------------------------------------------------------- |
8 // Core Stream types | 8 // Core Stream types |
9 // ------------------------------------------------------------------- | 9 // ------------------------------------------------------------------- |
10 | 10 |
(...skipping 1308 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
1319 * | 1319 * |
1320 * * The synchronous methods of [EventSink] were called, resulting in an | 1320 * * The synchronous methods of [EventSink] were called, resulting in an |
1321 * error. If there is no active future (like from an addStream call), the | 1321 * error. If there is no active future (like from an addStream call), the |
1322 * [done] future will complete with that error | 1322 * [done] future will complete with that error |
1323 */ | 1323 */ |
1324 Future get done; | 1324 Future get done; |
1325 } | 1325 } |
1326 | 1326 |
1327 | 1327 |
1328 /** | 1328 /** |
1329 * A `StreamSinkAdapter` is an adapter implementation that can be used on a | |
Lasse Reichstein Nielsen
2014/03/18 15:34:45
Too long first line. How about:
An adapter that
Anders Johnsen
2014/03/19 11:48:55
Done.
| |
1330 * [StreamConsumer], to facilitate the functionality of a [StreamSink], | |
1331 * | |
1332 * The `StreamSinkAdapter` buffers the input given by all [EventSink] methods | |
1333 * and will delay an [addStream] until the buffer is flushed. | |
Lasse Reichstein Nielsen
2014/03/18 15:34:45
the buffer -> that buffer
This sounds like nothin
Anders Johnsen
2014/03/19 11:48:55
Done.
| |
1334 * | |
1335 * When the `StreamSinkAdapter` is bound to a stream (through [addStream]) any | |
Lasse Reichstein Nielsen
2014/03/18 15:34:45
When -> While
Anders Johnsen
2014/03/19 11:48:55
Done.
| |
1336 * call to the `StreamSinkAdapter` will throw a [StateError]. | |
Lasse Reichstein Nielsen
2014/03/18 15:34:45
Is this all calls? What about "done" (which is a g
Anders Johnsen
2014/03/19 11:48:55
Yes, only done... :)
| |
1337 * When the [addStream] completes, the `StreamSinkAdapter` will again be open | |
1338 * to all calls. | |
1339 * | |
1340 * If events are added to the `StreamSinkAdapter` after the adapter is closed, | |
1341 * the events will be ignored. | |
1342 * Use the [done] future to be notified when the `StreamSinkAdapter` is closed. | |
1343 */ | |
1344 class StreamSinkAdapter<S> implements StreamSink<S> { | |
1345 final StreamConsumer<S> _target; | |
1346 Completer _doneCompleter = new Completer(); | |
1347 Future _doneFuture; | |
1348 StreamController<S> _controllerInstance; | |
1349 Completer _controllerCompleter; | |
1350 bool _isClosed = false; | |
1351 bool _isBound = false; | |
1352 bool _hasError = false; | |
1353 | |
1354 /** | |
1355 * Construct a new StreamSinkAdapter, from a `target` [StreamConsumer]. | |
1356 */ | |
1357 StreamSinkAdapter(StreamConsumer<S> target) : _target = target { | |
1358 _doneFuture = _doneCompleter.future; | |
1359 } | |
1360 | |
1361 void add(S data) { | |
Lasse Reichstein Nielsen
2014/03/18 15:34:45
No _isBound check?
Anders Johnsen
2014/03/19 11:48:55
Is in _controller getter.
| |
1362 if (_isClosed) return; | |
1363 _controller.add(data); | |
1364 } | |
1365 | |
1366 void addError(error, [StackTrace stackTrace]) => | |
1367 _controller.addError(error, stackTrace); | |
Lasse Reichstein Nielsen
2014/03/18 15:34:45
No _isBound check?
Anders Johnsen
2014/03/19 11:48:55
Is in _controller getter.
| |
1368 | |
1369 Future addStream(Stream<S> stream) { | |
1370 if (_isBound) { | |
1371 throw new StateError("StreamSink is already bound to a stream"); | |
1372 } | |
1373 _isBound = true; | |
1374 if (_hasError) return done; | |
1375 // Wait for any sync operations to complete. | |
1376 Future targetAddStream() { | |
1377 return _target.addStream(stream) | |
1378 .whenComplete(() { | |
1379 _isBound = false; | |
1380 }); | |
1381 } | |
1382 if (_controllerInstance == null) return targetAddStream(); | |
1383 var future = _controllerCompleter.future; | |
1384 _controllerInstance.close(); | |
1385 return future.then((_) => targetAddStream()); | |
1386 } | |
1387 | |
1388 /** | |
1389 * Returns a [Future] that completes once all buffered events is accepted by | |
1390 * the underlying [StreamConsumer]. | |
1391 * | |
1392 * It's an error to call this method, while an [addStream] is incomplete. | |
1393 * | |
1394 * NOTE: This is not necessarily the same as the data being flushed by the | |
Lasse Reichstein Nielsen
2014/03/18 15:34:45
This note is confusion. It says what this call isn
Anders Johnsen
2014/03/19 11:48:55
Done.
| |
1395 * operating system. | |
1396 */ | |
1397 Future flush() { | |
1398 if (_isBound) { | |
Lasse Reichstein Nielsen
2014/03/18 15:34:45
Consider having a helper function for this check,
Anders Johnsen
2014/03/19 11:48:55
Done.
| |
1399 throw new StateError("StreamSink is bound to a stream"); | |
1400 } | |
1401 if (_controllerInstance == null) return new Future.value(this); | |
1402 // Adding an empty stream-controller will return a future that will complete | |
1403 // when all data is done. | |
1404 _isBound = true; | |
1405 var future = _controllerCompleter.future; | |
1406 _controllerInstance.close(); | |
1407 return future.whenComplete(() { | |
1408 _isBound = false; | |
1409 }); | |
1410 } | |
1411 | |
1412 Future close() { | |
1413 if (_isBound) { | |
1414 throw new StateError("StreamSink is bound to a stream"); | |
1415 } | |
1416 if (!_isClosed) { | |
1417 _isClosed = true; | |
1418 if (_controllerInstance != null) { | |
1419 _controllerInstance.close(); | |
1420 } else { | |
1421 _closeTarget(); | |
1422 } | |
1423 } | |
1424 return done; | |
1425 } | |
1426 | |
1427 Future get done => _doneFuture; | |
1428 | |
1429 // Private helper methods. | |
1430 | |
1431 void _closeTarget() { | |
1432 _target.close() | |
1433 .then((value) => _completeDone(value: value), | |
1434 onError: (error) => _completeDone(error: error)); | |
1435 } | |
1436 | |
1437 void _completeDone({value, error}) { | |
1438 if (_doneCompleter == null) return; | |
1439 if (error == null) { | |
1440 _doneCompleter.complete(value); | |
1441 } else { | |
1442 _hasError = true; | |
1443 _doneCompleter.completeError(error); | |
1444 } | |
1445 _doneCompleter = null; | |
1446 } | |
1447 | |
1448 StreamController<S> get _controller { | |
1449 if (_isBound) { | |
1450 throw new StateError("StreamSink is bound to a stream"); | |
1451 } | |
1452 if (_isClosed) { | |
1453 throw new StateError("StreamSink is closed"); | |
1454 } | |
1455 if (_controllerInstance == null) { | |
1456 _controllerInstance = new StreamController<S>(sync: true); | |
1457 _controllerCompleter = new Completer(); | |
1458 _target.addStream(_controller.stream) | |
1459 .then( | |
1460 (_) { | |
1461 if (_isBound) { | |
1462 // A new stream takes over - forward values to that stream. | |
1463 _controllerCompleter.complete(this); | |
1464 _controllerCompleter = null; | |
1465 _controllerInstance = null; | |
1466 } else { | |
1467 // No new stream, .close was called. Close _target. | |
1468 _closeTarget(); | |
1469 } | |
1470 }, | |
1471 onError: (error) { | |
1472 if (_isBound) { | |
1473 // A new stream takes over - forward errors to that stream. | |
1474 _controllerCompleter.completeError(error); | |
1475 _controllerCompleter = null; | |
1476 _controllerInstance = null; | |
1477 } else { | |
1478 // No new stream. No need to close target, as it have already | |
1479 // failed. | |
1480 _completeDone(error: error); | |
1481 } | |
1482 }); | |
1483 } | |
1484 return _controllerInstance; | |
1485 } | |
1486 } | |
1487 | |
1488 | |
1489 /** | |
1329 * The target of a [Stream.transform] call. | 1490 * The target of a [Stream.transform] call. |
1330 * | 1491 * |
1331 * The [Stream.transform] call will pass itself to this object and then return | 1492 * The [Stream.transform] call will pass itself to this object and then return |
1332 * the resulting stream. | 1493 * the resulting stream. |
1333 * | 1494 * |
1334 * It is good practice to write transformers that can be used multiple times. | 1495 * It is good practice to write transformers that can be used multiple times. |
1335 */ | 1496 */ |
1336 abstract class StreamTransformer<S, T> { | 1497 abstract class StreamTransformer<S, T> { |
1337 | 1498 |
1338 /** | 1499 /** |
(...skipping 143 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
1482 class _ControllerEventSinkWrapper<T> implements EventSink<T> { | 1643 class _ControllerEventSinkWrapper<T> implements EventSink<T> { |
1483 EventSink _sink; | 1644 EventSink _sink; |
1484 _ControllerEventSinkWrapper(this._sink); | 1645 _ControllerEventSinkWrapper(this._sink); |
1485 | 1646 |
1486 void add(T data) { _sink.add(data); } | 1647 void add(T data) { _sink.add(data); } |
1487 void addError(error, [StackTrace stackTrace]) { | 1648 void addError(error, [StackTrace stackTrace]) { |
1488 _sink.addError(error, stackTrace); | 1649 _sink.addError(error, stackTrace); |
1489 } | 1650 } |
1490 void close() { _sink.close(); } | 1651 void close() { _sink.close(); } |
1491 } | 1652 } |
OLD | NEW |