Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(686)

Side by Side Diff: sdk/lib/async/stream.dart

Issue 196423021: Move _StreamSinkImpl from dart:io to dart:async as StreamSinkAdapter. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Created 6 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « no previous file | sdk/lib/io/io_sink.dart » ('j') | sdk/lib/io/io_sink.dart » ('J')
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.async; 5 part of dart.async;
6 6
7 // ------------------------------------------------------------------- 7 // -------------------------------------------------------------------
8 // Core Stream types 8 // Core Stream types
9 // ------------------------------------------------------------------- 9 // -------------------------------------------------------------------
10 10
(...skipping 1308 matching lines...) Expand 10 before | Expand all | Expand 10 after
1319 * 1319 *
1320 * * The synchronous methods of [EventSink] were called, resulting in an 1320 * * The synchronous methods of [EventSink] were called, resulting in an
1321 * error. If there is no active future (like from an addStream call), the 1321 * error. If there is no active future (like from an addStream call), the
1322 * [done] future will complete with that error 1322 * [done] future will complete with that error
1323 */ 1323 */
1324 Future get done; 1324 Future get done;
1325 } 1325 }
1326 1326
1327 1327
1328 /** 1328 /**
1329 * A `StreamSinkAdapter` is an adapter implementation that can be used on a
Lasse Reichstein Nielsen 2014/03/18 15:34:45 Too long first line. How about: An adapter that
Anders Johnsen 2014/03/19 11:48:55 Done.
1330 * [StreamConsumer], to facilitate the functionality of a [StreamSink],
1331 *
1332 * The `StreamSinkAdapter` buffers the input given by all [EventSink] methods
1333 * and will delay an [addStream] until the buffer is flushed.
Lasse Reichstein Nielsen 2014/03/18 15:34:45 the buffer -> that buffer This sounds like nothin
Anders Johnsen 2014/03/19 11:48:55 Done.
1334 *
1335 * When the `StreamSinkAdapter` is bound to a stream (through [addStream]) any
Lasse Reichstein Nielsen 2014/03/18 15:34:45 When -> While
Anders Johnsen 2014/03/19 11:48:55 Done.
1336 * call to the `StreamSinkAdapter` will throw a [StateError].
Lasse Reichstein Nielsen 2014/03/18 15:34:45 Is this all calls? What about "done" (which is a g
Anders Johnsen 2014/03/19 11:48:55 Yes, only done... :)
1337 * When the [addStream] completes, the `StreamSinkAdapter` will again be open
1338 * to all calls.
1339 *
1340 * If events are added to the `StreamSinkAdapter` after the adapter is closed,
1341 * the events will be ignored.
1342 * Use the [done] future to be notified when the `StreamSinkAdapter` is closed.
1343 */
1344 class StreamSinkAdapter<S> implements StreamSink<S> {
1345 final StreamConsumer<S> _target;
1346 Completer _doneCompleter = new Completer();
1347 Future _doneFuture;
1348 StreamController<S> _controllerInstance;
1349 Completer _controllerCompleter;
1350 bool _isClosed = false;
1351 bool _isBound = false;
1352 bool _hasError = false;
1353
1354 /**
1355 * Construct a new StreamSinkAdapter, from a `target` [StreamConsumer].
1356 */
1357 StreamSinkAdapter(StreamConsumer<S> target) : _target = target {
1358 _doneFuture = _doneCompleter.future;
1359 }
1360
1361 void add(S data) {
Lasse Reichstein Nielsen 2014/03/18 15:34:45 No _isBound check?
Anders Johnsen 2014/03/19 11:48:55 Is in _controller getter.
1362 if (_isClosed) return;
1363 _controller.add(data);
1364 }
1365
1366 void addError(error, [StackTrace stackTrace]) =>
1367 _controller.addError(error, stackTrace);
Lasse Reichstein Nielsen 2014/03/18 15:34:45 No _isBound check?
Anders Johnsen 2014/03/19 11:48:55 Is in _controller getter.
1368
1369 Future addStream(Stream<S> stream) {
1370 if (_isBound) {
1371 throw new StateError("StreamSink is already bound to a stream");
1372 }
1373 _isBound = true;
1374 if (_hasError) return done;
1375 // Wait for any sync operations to complete.
1376 Future targetAddStream() {
1377 return _target.addStream(stream)
1378 .whenComplete(() {
1379 _isBound = false;
1380 });
1381 }
1382 if (_controllerInstance == null) return targetAddStream();
1383 var future = _controllerCompleter.future;
1384 _controllerInstance.close();
1385 return future.then((_) => targetAddStream());
1386 }
1387
1388 /**
1389 * Returns a [Future] that completes once all buffered events is accepted by
1390 * the underlying [StreamConsumer].
1391 *
1392 * It's an error to call this method, while an [addStream] is incomplete.
1393 *
1394 * NOTE: This is not necessarily the same as the data being flushed by the
Lasse Reichstein Nielsen 2014/03/18 15:34:45 This note is confusion. It says what this call isn
Anders Johnsen 2014/03/19 11:48:55 Done.
1395 * operating system.
1396 */
1397 Future flush() {
1398 if (_isBound) {
Lasse Reichstein Nielsen 2014/03/18 15:34:45 Consider having a helper function for this check,
Anders Johnsen 2014/03/19 11:48:55 Done.
1399 throw new StateError("StreamSink is bound to a stream");
1400 }
1401 if (_controllerInstance == null) return new Future.value(this);
1402 // Adding an empty stream-controller will return a future that will complete
1403 // when all data is done.
1404 _isBound = true;
1405 var future = _controllerCompleter.future;
1406 _controllerInstance.close();
1407 return future.whenComplete(() {
1408 _isBound = false;
1409 });
1410 }
1411
1412 Future close() {
1413 if (_isBound) {
1414 throw new StateError("StreamSink is bound to a stream");
1415 }
1416 if (!_isClosed) {
1417 _isClosed = true;
1418 if (_controllerInstance != null) {
1419 _controllerInstance.close();
1420 } else {
1421 _closeTarget();
1422 }
1423 }
1424 return done;
1425 }
1426
1427 Future get done => _doneFuture;
1428
1429 // Private helper methods.
1430
1431 void _closeTarget() {
1432 _target.close()
1433 .then((value) => _completeDone(value: value),
1434 onError: (error) => _completeDone(error: error));
1435 }
1436
1437 void _completeDone({value, error}) {
1438 if (_doneCompleter == null) return;
1439 if (error == null) {
1440 _doneCompleter.complete(value);
1441 } else {
1442 _hasError = true;
1443 _doneCompleter.completeError(error);
1444 }
1445 _doneCompleter = null;
1446 }
1447
1448 StreamController<S> get _controller {
1449 if (_isBound) {
1450 throw new StateError("StreamSink is bound to a stream");
1451 }
1452 if (_isClosed) {
1453 throw new StateError("StreamSink is closed");
1454 }
1455 if (_controllerInstance == null) {
1456 _controllerInstance = new StreamController<S>(sync: true);
1457 _controllerCompleter = new Completer();
1458 _target.addStream(_controller.stream)
1459 .then(
1460 (_) {
1461 if (_isBound) {
1462 // A new stream takes over - forward values to that stream.
1463 _controllerCompleter.complete(this);
1464 _controllerCompleter = null;
1465 _controllerInstance = null;
1466 } else {
1467 // No new stream, .close was called. Close _target.
1468 _closeTarget();
1469 }
1470 },
1471 onError: (error) {
1472 if (_isBound) {
1473 // A new stream takes over - forward errors to that stream.
1474 _controllerCompleter.completeError(error);
1475 _controllerCompleter = null;
1476 _controllerInstance = null;
1477 } else {
1478 // No new stream. No need to close target, as it have already
1479 // failed.
1480 _completeDone(error: error);
1481 }
1482 });
1483 }
1484 return _controllerInstance;
1485 }
1486 }
1487
1488
1489 /**
1329 * The target of a [Stream.transform] call. 1490 * The target of a [Stream.transform] call.
1330 * 1491 *
1331 * The [Stream.transform] call will pass itself to this object and then return 1492 * The [Stream.transform] call will pass itself to this object and then return
1332 * the resulting stream. 1493 * the resulting stream.
1333 * 1494 *
1334 * It is good practice to write transformers that can be used multiple times. 1495 * It is good practice to write transformers that can be used multiple times.
1335 */ 1496 */
1336 abstract class StreamTransformer<S, T> { 1497 abstract class StreamTransformer<S, T> {
1337 1498
1338 /** 1499 /**
(...skipping 143 matching lines...) Expand 10 before | Expand all | Expand 10 after
1482 class _ControllerEventSinkWrapper<T> implements EventSink<T> { 1643 class _ControllerEventSinkWrapper<T> implements EventSink<T> {
1483 EventSink _sink; 1644 EventSink _sink;
1484 _ControllerEventSinkWrapper(this._sink); 1645 _ControllerEventSinkWrapper(this._sink);
1485 1646
1486 void add(T data) { _sink.add(data); } 1647 void add(T data) { _sink.add(data); }
1487 void addError(error, [StackTrace stackTrace]) { 1648 void addError(error, [StackTrace stackTrace]) {
1488 _sink.addError(error, stackTrace); 1649 _sink.addError(error, stackTrace);
1489 } 1650 }
1490 void close() { _sink.close(); } 1651 void close() { _sink.close(); }
1491 } 1652 }
OLDNEW
« no previous file with comments | « no previous file | sdk/lib/io/io_sink.dart » ('j') | sdk/lib/io/io_sink.dart » ('J')

Powered by Google App Engine
This is Rietveld 408576698