Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(237)

Side by Side Diff: sdk/lib/async/stream.dart

Issue 212753003: Revert accidental parts of "Speed up toLowerCase, by manually inlining the upper-case part." (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Created 6 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « no previous file | sdk/lib/io/io_sink.dart » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.async; 5 part of dart.async;
6 6
7 // ------------------------------------------------------------------- 7 // -------------------------------------------------------------------
8 // Core Stream types 8 // Core Stream types
9 // ------------------------------------------------------------------- 9 // -------------------------------------------------------------------
10 10
(...skipping 1308 matching lines...) Expand 10 before | Expand all | Expand 10 after
1319 * 1319 *
1320 * * The synchronous methods of [EventSink] were called, resulting in an 1320 * * The synchronous methods of [EventSink] were called, resulting in an
1321 * error. If there is no active future (like from an addStream call), the 1321 * error. If there is no active future (like from an addStream call), the
1322 * [done] future will complete with that error 1322 * [done] future will complete with that error
1323 */ 1323 */
1324 Future get done; 1324 Future get done;
1325 } 1325 }
1326 1326
1327 1327
1328 /** 1328 /**
1329 * A [StreamSink] adapter for a [StreamConsumer].
1330 *
1331 * The `StreamSinkAdapter` buffers the input given by all [EventSink] methods
1332 * and will delay an [addStream] until all buffered data has been forwarded to
1333 * the stream consumer.
1334 *
1335 * While the `StreamSinkAdapter` is bound to a stream (through [addStream]) any
1336 * call to the `StreamSinkAdapter` will throw a [StateError], except [done].
1337 * When the [addStream] completes, the `StreamSinkAdapter` will again be open
1338 * to all calls.
1339 *
1340 * If events are added to the `StreamSinkAdapter` after the adapter is closed,
1341 * the events will be ignored.
1342 * Use the [done] future to be notified when the `StreamSinkAdapter` is closed.
1343 */
1344 class StreamSinkAdapter<S> implements StreamSink<S> {
1345 final StreamConsumer<S> _target;
1346 Completer _doneCompleter = new Completer();
1347 Future _doneFuture;
1348 StreamController<S> _controllerInstance;
1349 Completer _controllerCompleter;
1350 bool _isClosed = false;
1351 bool _isBound = false;
1352 bool _hasError = false;
1353
1354 /**
1355 * Construct a new StreamSinkAdapter, from a `target` [StreamConsumer].
1356 */
1357 StreamSinkAdapter(StreamConsumer<S> target) : _target = target {
1358 _doneFuture = _doneCompleter.future;
1359 }
1360
1361 void add(S data) {
1362 if (_isClosed) return;
1363 _controller.add(data);
1364 }
1365
1366 void addError(error, [StackTrace stackTrace]) =>
1367 _controller.addError(error, stackTrace);
1368
1369 Future addStream(Stream<S> stream) {
1370 _checkNotBound();
1371 _isBound = true;
1372 if (_hasError) return _doneFuture;
1373 // Wait for any sync operations to complete.
1374 Future targetAddStream() {
1375 return _target.addStream(stream)
1376 .whenComplete(() {
1377 _isBound = false;
1378 });
1379 }
1380 if (_controllerInstance == null) return targetAddStream();
1381 var future = _controllerCompleter.future;
1382 _controllerInstance.close();
1383 return future.then((_) => targetAddStream());
1384 }
1385
1386 /**
1387 * Returns a [Future] that completes once all buffered events has been
1388 * accepted by the underlying [StreamConsumer].
1389 *
1390 * The [Future] will complete with the value `this`.
1391 *
1392 * This method must not be called while an [addStream] is in progress.
1393 *
1394 * NOTE: This method does not guarantee anything except that the stream
1395 * consumer has received all buffered data. It does not guarantee that the
1396 * consumer has acted on the data in any way, or that the data has reached
1397 * its final destination.
1398 */
1399 Future flush() {
1400 _checkNotBound();
1401 if (_controllerInstance == null) return new Future.value(this);
1402 // Adding an empty stream-controller will return a future that will complete
1403 // when all data is done.
1404 _isBound = true;
1405 var future = _controllerCompleter.future;
1406 _controllerInstance.close();
1407 return future.whenComplete(() {
1408 _isBound = false;
1409 });
1410 }
1411
1412 Future close() {
1413 _checkNotBound();
1414 if (!_isClosed) {
1415 _isClosed = true;
1416 if (_controllerInstance != null) {
1417 _controllerInstance.close();
1418 } else {
1419 _closeTarget();
1420 }
1421 }
1422 return _doneFuture;
1423 }
1424
1425 Future get done => _doneFuture;
1426
1427 // Private helper methods.
1428
1429 void _closeTarget() {
1430 _target.close() .then(_completeDoneValue, onError: _completeDoneError);
1431 }
1432
1433 void _completeDoneValue(value) {
1434 if (_doneCompleter == null) return;
1435 _doneCompleter.complete(value);
1436 _doneCompleter = null;
1437 }
1438
1439 void _completeDoneError(error, stackTrace) {
1440 if (_doneCompleter == null) return;
1441 _hasError = true;
1442 _doneCompleter.completeError(error, stackTrace);
1443 _doneCompleter = null;
1444 }
1445
1446 StreamController<S> get _controller {
1447 _checkNotBound();
1448 if (_isClosed) {
1449 throw new StateError("StreamSink is closed");
1450 }
1451 if (_controllerInstance == null) {
1452 _controllerInstance = new StreamController<S>(sync: true);
1453 _controllerCompleter = new Completer();
1454 _target.addStream(_controller.stream)
1455 .then(
1456 (_) {
1457 if (_isBound) {
1458 // A new stream takes over - forward values to that stream.
1459 _controllerCompleter.complete(this);
1460 _controllerCompleter = null;
1461 _controllerInstance = null;
1462 } else {
1463 // No new stream, .close was called. Close _target.
1464 _closeTarget();
1465 }
1466 },
1467 onError: (error, stackTrace) {
1468 if (_isBound) {
1469 // A new stream takes over - forward errors to that stream.
1470 _controllerCompleter.completeError(error, stackTrace);
1471 _controllerCompleter = null;
1472 _controllerInstance = null;
1473 } else {
1474 // No new stream. No need to close target, as it have already
1475 // failed.
1476 _completeDoneError(error, stackTrace);
1477 }
1478 });
1479 }
1480 return _controllerInstance;
1481 }
1482
1483 void _checkNotBound() {
1484 if (_isBound) {
1485 throw new StateError("StreamSink is processing an addStream call");
1486 }
1487 }
1488 }
1489
1490
1491 /**
1492 * The target of a [Stream.transform] call. 1329 * The target of a [Stream.transform] call.
1493 * 1330 *
1494 * The [Stream.transform] call will pass itself to this object and then return 1331 * The [Stream.transform] call will pass itself to this object and then return
1495 * the resulting stream. 1332 * the resulting stream.
1496 * 1333 *
1497 * It is good practice to write transformers that can be used multiple times. 1334 * It is good practice to write transformers that can be used multiple times.
1498 */ 1335 */
1499 abstract class StreamTransformer<S, T> { 1336 abstract class StreamTransformer<S, T> {
1500 1337
1501 /** 1338 /**
(...skipping 143 matching lines...) Expand 10 before | Expand all | Expand 10 after
1645 class _ControllerEventSinkWrapper<T> implements EventSink<T> { 1482 class _ControllerEventSinkWrapper<T> implements EventSink<T> {
1646 EventSink _sink; 1483 EventSink _sink;
1647 _ControllerEventSinkWrapper(this._sink); 1484 _ControllerEventSinkWrapper(this._sink);
1648 1485
1649 void add(T data) { _sink.add(data); } 1486 void add(T data) { _sink.add(data); }
1650 void addError(error, [StackTrace stackTrace]) { 1487 void addError(error, [StackTrace stackTrace]) {
1651 _sink.addError(error, stackTrace); 1488 _sink.addError(error, stackTrace);
1652 } 1489 }
1653 void close() { _sink.close(); } 1490 void close() { _sink.close(); }
1654 } 1491 }
OLDNEW
« no previous file with comments | « no previous file | sdk/lib/io/io_sink.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698