Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 part of dart.async; | 5 part of dart.async; |
| 6 | 6 |
| 7 // ------------------------------------------------------------------- | 7 // ------------------------------------------------------------------- |
| 8 // Core Stream types | 8 // Core Stream types |
| 9 // ------------------------------------------------------------------- | 9 // ------------------------------------------------------------------- |
| 10 | 10 |
| (...skipping 1308 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 1319 * | 1319 * |
| 1320 * * The synchronous methods of [EventSink] were called, resulting in an | 1320 * * The synchronous methods of [EventSink] were called, resulting in an |
| 1321 * error. If there is no active future (like from an addStream call), the | 1321 * error. If there is no active future (like from an addStream call), the |
| 1322 * [done] future will complete with that error | 1322 * [done] future will complete with that error |
| 1323 */ | 1323 */ |
| 1324 Future get done; | 1324 Future get done; |
| 1325 } | 1325 } |
| 1326 | 1326 |
| 1327 | 1327 |
| 1328 /** | 1328 /** |
| 1329 * A [StreamSink] adapter for a [StreamConsumer]. | |
| 1330 * | |
| 1331 * The `StreamSinkAdapter` buffers the input given by all [EventSink] methods | |
| 1332 * and will delay an [addStream] until all buffered data has been forwarded to | |
| 1333 * the stream consumer. | |
| 1334 * | |
| 1335 * While the `StreamSinkAdapter` is bound to a stream (through [addStream]) any | |
| 1336 * call to the `StreamSinkAdapter` will throw a [StateError], except [done]. | |
| 1337 * When the [addStream] completes, the `StreamSinkAdapter` will again be open | |
| 1338 * to all calls. | |
| 1339 * | |
| 1340 * If events are added to the `StreamSinkAdapter` after the adapter is closed, | |
| 1341 * the events will be ignored. | |
| 1342 * Use the [done] future to be notified when the `StreamSinkAdapter` is closed. | |
| 1343 */ | |
| 1344 class StreamSinkAdapter<S> implements StreamSink<S> { | |
| 1345 final StreamConsumer<S> _target; | |
| 1346 Completer _doneCompleter = new Completer(); | |
| 1347 Future _doneFuture; | |
| 1348 StreamController<S> _controllerInstance; | |
| 1349 Completer _controllerCompleter; | |
| 1350 bool _isClosed = false; | |
| 1351 bool _isBound = false; | |
| 1352 bool _hasError = false; | |
| 1353 | |
| 1354 /** | |
| 1355 * Construct a new StreamSinkAdapter, from a `target` [StreamConsumer]. | |
| 1356 */ | |
| 1357 StreamSinkAdapter(StreamConsumer<S> target) : _target = target { | |
| 1358 _doneFuture = _doneCompleter.future; | |
| 1359 } | |
| 1360 | |
| 1361 void add(S data) { | |
| 1362 if (_isClosed) return; | |
| 1363 _controller.add(data); | |
| 1364 } | |
| 1365 | |
| 1366 void addError(error, [StackTrace stackTrace]) => | |
| 1367 _controller.addError(error, stackTrace); | |
| 1368 | |
| 1369 Future addStream(Stream<S> stream) { | |
| 1370 _checkIsBound(); | |
|
Lasse Reichstein Nielsen
2014/03/19 13:27:20
Hmm, would probably sounds better as "_checkNotBou
Anders Johnsen
2014/03/19 13:56:39
Done.
| |
| 1371 _isBound = true; | |
| 1372 if (_hasError) return done; | |
|
Lasse Reichstein Nielsen
2014/03/19 13:27:20
return _doneFuture;
(I generally try to avoid hav
Anders Johnsen
2014/03/19 13:56:39
Done.
| |
| 1373 // Wait for any sync operations to complete. | |
| 1374 Future targetAddStream() { | |
| 1375 return _target.addStream(stream) | |
| 1376 .whenComplete(() { | |
| 1377 _isBound = false; | |
| 1378 }); | |
| 1379 } | |
| 1380 if (_controllerInstance == null) return targetAddStream(); | |
| 1381 var future = _controllerCompleter.future; | |
| 1382 _controllerInstance.close(); | |
| 1383 return future.then((_) => targetAddStream()); | |
| 1384 } | |
| 1385 | |
| 1386 /** | |
| 1387 * Returns a [Future] that completes once all buffered events is accepted by | |
|
Lasse Reichstein Nielsen
2014/03/19 13:27:20
is -> has been.
Anders Johnsen
2014/03/19 13:56:39
Done.
| |
| 1388 * the underlying [StreamConsumer]. | |
| 1389 * | |
| 1390 * It's an error to call this method, while an [addStream] is incomplete. | |
|
Lasse Reichstein Nielsen
2014/03/19 13:27:20
-> This method must not be called while an [addStr
Anders Johnsen
2014/03/19 13:56:39
Done.
| |
| 1391 * | |
| 1392 * NOTE: This is not necessarily the same as the data being flushed by the | |
| 1393 * operating system. | |
|
Lasse Reichstein Nielsen
2014/03/19 13:27:20
Remote this NOTE, keep the next.
Anders Johnsen
2014/03/19 13:56:39
Done.
| |
| 1394 * NOTE: This method does not guarantee anything except that the stream | |
| 1395 * consumer has received all buffered data. It does not guarantee that the | |
| 1396 * consumer has acted on the data in any way, or that the data has reached | |
| 1397 * its final destination. | |
| 1398 */ | |
| 1399 Future flush() { | |
| 1400 _checkIsBound(); | |
| 1401 if (_controllerInstance == null) return new Future.value(this); | |
|
Lasse Reichstein Nielsen
2014/03/19 13:27:20
Why is "this" the value of the future? Is this doc
Anders Johnsen
2014/03/19 13:56:39
Done.
| |
| 1402 // Adding an empty stream-controller will return a future that will complete | |
| 1403 // when all data is done. | |
| 1404 _isBound = true; | |
| 1405 var future = _controllerCompleter.future; | |
| 1406 _controllerInstance.close(); | |
| 1407 return future.whenComplete(() { | |
| 1408 _isBound = false; | |
| 1409 }); | |
|
Lasse Reichstein Nielsen
2014/03/19 13:27:20
Indentation: Unindent the previous two lines by fo
Anders Johnsen
2014/03/19 13:56:39
Done.
| |
| 1410 } | |
| 1411 | |
| 1412 Future close() { | |
| 1413 _checkIsBound(); | |
| 1414 if (!_isClosed) { | |
| 1415 _isClosed = true; | |
| 1416 if (_controllerInstance != null) { | |
| 1417 _controllerInstance.close(); | |
| 1418 } else { | |
| 1419 _closeTarget(); | |
| 1420 } | |
| 1421 } | |
| 1422 return done; | |
| 1423 } | |
| 1424 | |
| 1425 Future get done => _doneFuture; | |
| 1426 | |
| 1427 // Private helper methods. | |
| 1428 | |
| 1429 void _closeTarget() { | |
| 1430 _target.close() | |
| 1431 .then((value) => _completeDone(value: value), | |
| 1432 onError: (error) => _completeDone(error: error)); | |
| 1433 } | |
| 1434 | |
| 1435 void _completeDone({value, error}) { | |
|
Lasse Reichstein Nielsen
2014/03/19 13:27:20
This function expects one of two arguments.
Make i
Anders Johnsen
2014/03/19 13:56:39
Done.
| |
| 1436 if (_doneCompleter == null) return; | |
| 1437 if (error == null) { | |
| 1438 _doneCompleter.complete(value); | |
| 1439 } else { | |
| 1440 _hasError = true; | |
| 1441 _doneCompleter.completeError(error); | |
| 1442 } | |
| 1443 _doneCompleter = null; | |
| 1444 } | |
| 1445 | |
| 1446 StreamController<S> get _controller { | |
| 1447 _checkIsBound(); | |
| 1448 if (_isClosed) { | |
| 1449 throw new StateError("StreamSink is closed"); | |
| 1450 } | |
| 1451 if (_controllerInstance == null) { | |
| 1452 _controllerInstance = new StreamController<S>(sync: true); | |
| 1453 _controllerCompleter = new Completer(); | |
| 1454 _target.addStream(_controller.stream) | |
| 1455 .then( | |
| 1456 (_) { | |
| 1457 if (_isBound) { | |
| 1458 // A new stream takes over - forward values to that stream. | |
| 1459 _controllerCompleter.complete(this); | |
| 1460 _controllerCompleter = null; | |
| 1461 _controllerInstance = null; | |
| 1462 } else { | |
| 1463 // No new stream, .close was called. Close _target. | |
| 1464 _closeTarget(); | |
| 1465 } | |
| 1466 }, | |
| 1467 onError: (error) { | |
| 1468 if (_isBound) { | |
| 1469 // A new stream takes over - forward errors to that stream. | |
| 1470 _controllerCompleter.completeError(error); | |
| 1471 _controllerCompleter = null; | |
| 1472 _controllerInstance = null; | |
| 1473 } else { | |
| 1474 // No new stream. No need to close target, as it have already | |
| 1475 // failed. | |
| 1476 _completeDone(error: error); | |
| 1477 } | |
| 1478 }); | |
| 1479 } | |
| 1480 return _controllerInstance; | |
| 1481 } | |
| 1482 | |
| 1483 void _checkIsBound() { | |
| 1484 if (_isBound) { | |
| 1485 throw new StateError("StreamSink is bound to a stream"); | |
|
Lasse Reichstein Nielsen
2014/03/19 13:27:20
... is bound to a stream -> ... is processing an a
Anders Johnsen
2014/03/19 13:56:39
Done.
| |
| 1486 } | |
| 1487 } | |
| 1488 } | |
| 1489 | |
| 1490 | |
| 1491 /** | |
| 1329 * The target of a [Stream.transform] call. | 1492 * The target of a [Stream.transform] call. |
| 1330 * | 1493 * |
| 1331 * The [Stream.transform] call will pass itself to this object and then return | 1494 * The [Stream.transform] call will pass itself to this object and then return |
| 1332 * the resulting stream. | 1495 * the resulting stream. |
| 1333 * | 1496 * |
| 1334 * It is good practice to write transformers that can be used multiple times. | 1497 * It is good practice to write transformers that can be used multiple times. |
| 1335 */ | 1498 */ |
| 1336 abstract class StreamTransformer<S, T> { | 1499 abstract class StreamTransformer<S, T> { |
| 1337 | 1500 |
| 1338 /** | 1501 /** |
| (...skipping 143 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 1482 class _ControllerEventSinkWrapper<T> implements EventSink<T> { | 1645 class _ControllerEventSinkWrapper<T> implements EventSink<T> { |
| 1483 EventSink _sink; | 1646 EventSink _sink; |
| 1484 _ControllerEventSinkWrapper(this._sink); | 1647 _ControllerEventSinkWrapper(this._sink); |
| 1485 | 1648 |
| 1486 void add(T data) { _sink.add(data); } | 1649 void add(T data) { _sink.add(data); } |
| 1487 void addError(error, [StackTrace stackTrace]) { | 1650 void addError(error, [StackTrace stackTrace]) { |
| 1488 _sink.addError(error, stackTrace); | 1651 _sink.addError(error, stackTrace); |
| 1489 } | 1652 } |
| 1490 void close() { _sink.close(); } | 1653 void close() { _sink.close(); } |
| 1491 } | 1654 } |
| OLD | NEW |