Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(241)

Side by Side Diff: sdk/lib/async/stream.dart

Issue 196423021: Move _StreamSinkImpl from dart:io to dart:async as StreamSinkAdapter. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Fix doc and add test.dart Created 6 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « no previous file | sdk/lib/io/io_sink.dart » ('j') | tests/lib/async/stream_sink_adapter_test.dart » ('J')
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.async; 5 part of dart.async;
6 6
7 // ------------------------------------------------------------------- 7 // -------------------------------------------------------------------
8 // Core Stream types 8 // Core Stream types
9 // ------------------------------------------------------------------- 9 // -------------------------------------------------------------------
10 10
(...skipping 1308 matching lines...) Expand 10 before | Expand all | Expand 10 after
1319 * 1319 *
1320 * * The synchronous methods of [EventSink] were called, resulting in an 1320 * * The synchronous methods of [EventSink] were called, resulting in an
1321 * error. If there is no active future (like from an addStream call), the 1321 * error. If there is no active future (like from an addStream call), the
1322 * [done] future will complete with that error 1322 * [done] future will complete with that error
1323 */ 1323 */
1324 Future get done; 1324 Future get done;
1325 } 1325 }
1326 1326
1327 1327
1328 /** 1328 /**
1329 * A [StreamSink] adapter for a [StreamConsumer].
1330 *
1331 * The `StreamSinkAdapter` buffers the input given by all [EventSink] methods
1332 * and will delay an [addStream] until all buffered data has been forwarded to
1333 * the stream consumer.
1334 *
1335 * While the `StreamSinkAdapter` is bound to a stream (through [addStream]) any
1336 * call to the `StreamSinkAdapter` will throw a [StateError], except [done].
1337 * When the [addStream] completes, the `StreamSinkAdapter` will again be open
1338 * to all calls.
1339 *
1340 * If events are added to the `StreamSinkAdapter` after the adapter is closed,
1341 * the events will be ignored.
1342 * Use the [done] future to be notified when the `StreamSinkAdapter` is closed.
1343 */
1344 class StreamSinkAdapter<S> implements StreamSink<S> {
1345 final StreamConsumer<S> _target;
1346 Completer _doneCompleter = new Completer();
1347 Future _doneFuture;
1348 StreamController<S> _controllerInstance;
1349 Completer _controllerCompleter;
1350 bool _isClosed = false;
1351 bool _isBound = false;
1352 bool _hasError = false;
1353
1354 /**
1355 * Construct a new StreamSinkAdapter, from a `target` [StreamConsumer].
1356 */
1357 StreamSinkAdapter(StreamConsumer<S> target) : _target = target {
1358 _doneFuture = _doneCompleter.future;
1359 }
1360
1361 void add(S data) {
1362 if (_isClosed) return;
1363 _controller.add(data);
1364 }
1365
1366 void addError(error, [StackTrace stackTrace]) =>
1367 _controller.addError(error, stackTrace);
1368
1369 Future addStream(Stream<S> stream) {
1370 _checkIsBound();
Lasse Reichstein Nielsen 2014/03/19 13:27:20 Hmm, would probably sounds better as "_checkNotBou
Anders Johnsen 2014/03/19 13:56:39 Done.
1371 _isBound = true;
1372 if (_hasError) return done;
Lasse Reichstein Nielsen 2014/03/19 13:27:20 return _doneFuture; (I generally try to avoid hav
Anders Johnsen 2014/03/19 13:56:39 Done.
1373 // Wait for any sync operations to complete.
1374 Future targetAddStream() {
1375 return _target.addStream(stream)
1376 .whenComplete(() {
1377 _isBound = false;
1378 });
1379 }
1380 if (_controllerInstance == null) return targetAddStream();
1381 var future = _controllerCompleter.future;
1382 _controllerInstance.close();
1383 return future.then((_) => targetAddStream());
1384 }
1385
1386 /**
1387 * Returns a [Future] that completes once all buffered events is accepted by
Lasse Reichstein Nielsen 2014/03/19 13:27:20 is -> has been.
Anders Johnsen 2014/03/19 13:56:39 Done.
1388 * the underlying [StreamConsumer].
1389 *
1390 * It's an error to call this method, while an [addStream] is incomplete.
Lasse Reichstein Nielsen 2014/03/19 13:27:20 -> This method must not be called while an [addStr
Anders Johnsen 2014/03/19 13:56:39 Done.
1391 *
1392 * NOTE: This is not necessarily the same as the data being flushed by the
1393 * operating system.
Lasse Reichstein Nielsen 2014/03/19 13:27:20 Remote this NOTE, keep the next.
Anders Johnsen 2014/03/19 13:56:39 Done.
1394 * NOTE: This method does not guarantee anything except that the stream
1395 * consumer has received all buffered data. It does not guarantee that the
1396 * consumer has acted on the data in any way, or that the data has reached
1397 * its final destination.
1398 */
1399 Future flush() {
1400 _checkIsBound();
1401 if (_controllerInstance == null) return new Future.value(this);
Lasse Reichstein Nielsen 2014/03/19 13:27:20 Why is "this" the value of the future? Is this doc
Anders Johnsen 2014/03/19 13:56:39 Done.
1402 // Adding an empty stream-controller will return a future that will complete
1403 // when all data is done.
1404 _isBound = true;
1405 var future = _controllerCompleter.future;
1406 _controllerInstance.close();
1407 return future.whenComplete(() {
1408 _isBound = false;
1409 });
Lasse Reichstein Nielsen 2014/03/19 13:27:20 Indentation: Unindent the previous two lines by fo
Anders Johnsen 2014/03/19 13:56:39 Done.
1410 }
1411
1412 Future close() {
1413 _checkIsBound();
1414 if (!_isClosed) {
1415 _isClosed = true;
1416 if (_controllerInstance != null) {
1417 _controllerInstance.close();
1418 } else {
1419 _closeTarget();
1420 }
1421 }
1422 return done;
1423 }
1424
1425 Future get done => _doneFuture;
1426
1427 // Private helper methods.
1428
1429 void _closeTarget() {
1430 _target.close()
1431 .then((value) => _completeDone(value: value),
1432 onError: (error) => _completeDone(error: error));
1433 }
1434
1435 void _completeDone({value, error}) {
Lasse Reichstein Nielsen 2014/03/19 13:27:20 This function expects one of two arguments. Make i
Anders Johnsen 2014/03/19 13:56:39 Done.
1436 if (_doneCompleter == null) return;
1437 if (error == null) {
1438 _doneCompleter.complete(value);
1439 } else {
1440 _hasError = true;
1441 _doneCompleter.completeError(error);
1442 }
1443 _doneCompleter = null;
1444 }
1445
1446 StreamController<S> get _controller {
1447 _checkIsBound();
1448 if (_isClosed) {
1449 throw new StateError("StreamSink is closed");
1450 }
1451 if (_controllerInstance == null) {
1452 _controllerInstance = new StreamController<S>(sync: true);
1453 _controllerCompleter = new Completer();
1454 _target.addStream(_controller.stream)
1455 .then(
1456 (_) {
1457 if (_isBound) {
1458 // A new stream takes over - forward values to that stream.
1459 _controllerCompleter.complete(this);
1460 _controllerCompleter = null;
1461 _controllerInstance = null;
1462 } else {
1463 // No new stream, .close was called. Close _target.
1464 _closeTarget();
1465 }
1466 },
1467 onError: (error) {
1468 if (_isBound) {
1469 // A new stream takes over - forward errors to that stream.
1470 _controllerCompleter.completeError(error);
1471 _controllerCompleter = null;
1472 _controllerInstance = null;
1473 } else {
1474 // No new stream. No need to close target, as it have already
1475 // failed.
1476 _completeDone(error: error);
1477 }
1478 });
1479 }
1480 return _controllerInstance;
1481 }
1482
1483 void _checkIsBound() {
1484 if (_isBound) {
1485 throw new StateError("StreamSink is bound to a stream");
Lasse Reichstein Nielsen 2014/03/19 13:27:20 ... is bound to a stream -> ... is processing an a
Anders Johnsen 2014/03/19 13:56:39 Done.
1486 }
1487 }
1488 }
1489
1490
1491 /**
1329 * The target of a [Stream.transform] call. 1492 * The target of a [Stream.transform] call.
1330 * 1493 *
1331 * The [Stream.transform] call will pass itself to this object and then return 1494 * The [Stream.transform] call will pass itself to this object and then return
1332 * the resulting stream. 1495 * the resulting stream.
1333 * 1496 *
1334 * It is good practice to write transformers that can be used multiple times. 1497 * It is good practice to write transformers that can be used multiple times.
1335 */ 1498 */
1336 abstract class StreamTransformer<S, T> { 1499 abstract class StreamTransformer<S, T> {
1337 1500
1338 /** 1501 /**
(...skipping 143 matching lines...) Expand 10 before | Expand all | Expand 10 after
1482 class _ControllerEventSinkWrapper<T> implements EventSink<T> { 1645 class _ControllerEventSinkWrapper<T> implements EventSink<T> {
1483 EventSink _sink; 1646 EventSink _sink;
1484 _ControllerEventSinkWrapper(this._sink); 1647 _ControllerEventSinkWrapper(this._sink);
1485 1648
1486 void add(T data) { _sink.add(data); } 1649 void add(T data) { _sink.add(data); }
1487 void addError(error, [StackTrace stackTrace]) { 1650 void addError(error, [StackTrace stackTrace]) {
1488 _sink.addError(error, stackTrace); 1651 _sink.addError(error, stackTrace);
1489 } 1652 }
1490 void close() { _sink.close(); } 1653 void close() { _sink.close(); }
1491 } 1654 }
OLDNEW
« no previous file with comments | « no previous file | sdk/lib/io/io_sink.dart » ('j') | tests/lib/async/stream_sink_adapter_test.dart » ('J')

Powered by Google App Engine
This is Rietveld 408576698