Index: lib/src/copy/io_sink.dart |
diff --git a/lib/src/copy/io_sink.dart b/lib/src/copy/io_sink.dart |
index 0578bdb03babbde31fdec30f9b96f6c8d4caea6e..34abcad2a2e5c9491427d177ebff1c4e1ae7397e 100644 |
--- a/lib/src/copy/io_sink.dart |
+++ b/lib/src/copy/io_sink.dart |
@@ -9,22 +9,20 @@ |
// desired public API and to remove "dart:io" dependencies have been made. |
// |
// This is up-to-date as of sdk revision |
-// 86227840d75d974feb238f8b3c59c038b99c05cf. |
+// e41fb4cafd6052157dbc1490d437045240f4773f. |
+ |
import 'dart:async'; |
class StreamSinkImpl<T> implements StreamSink<T> { |
final StreamConsumer<T> _target; |
- Completer _doneCompleter = new Completer(); |
- Future _doneFuture; |
+ final Completer _doneCompleter = new Completer(); |
StreamController<T> _controllerInstance; |
Completer _controllerCompleter; |
bool _isClosed = false; |
bool _isBound = false; |
bool _hasError = false; |
- StreamSinkImpl(this._target) { |
- _doneFuture = _doneCompleter.future; |
- } |
+ StreamSinkImpl(this._target); |
void add(T data) { |
if (_isClosed) return; |
@@ -65,8 +63,8 @@ class StreamSinkImpl<T> implements StreamSink<T> { |
var future = _controllerCompleter.future; |
_controllerInstance.close(); |
return future.whenComplete(() { |
- _isBound = false; |
- }); |
+ _isBound = false; |
+ }); |
} |
Future close() { |
@@ -88,19 +86,19 @@ class StreamSinkImpl<T> implements StreamSink<T> { |
_target.close().then(_completeDoneValue, onError: _completeDoneError); |
} |
- Future get done => _doneFuture; |
+ Future get done => _doneCompleter.future; |
void _completeDoneValue(value) { |
- if (_doneCompleter == null) return; |
- _doneCompleter.complete(value); |
- _doneCompleter = null; |
+ if (!_doneCompleter.isCompleted) { |
+ _doneCompleter.complete(value); |
+ } |
} |
void _completeDoneError(error, StackTrace stackTrace) { |
- if (_doneCompleter == null) return; |
- _hasError = true; |
- _doneCompleter.completeError(error, stackTrace); |
- _doneCompleter = null; |
+ if (!_doneCompleter.isCompleted) { |
+ _hasError = true; |
+ _doneCompleter.completeError(error, stackTrace); |
+ } |
} |
StreamController<T> get _controller { |
@@ -113,33 +111,29 @@ class StreamSinkImpl<T> implements StreamSink<T> { |
if (_controllerInstance == null) { |
_controllerInstance = new StreamController<T>(sync: true); |
_controllerCompleter = new Completer(); |
- _target.addStream(_controller.stream) |
- .then( |
- (_) { |
- if (_isBound) { |
- // A new stream takes over - forward values to that stream. |
- _controllerCompleter.complete(this); |
- _controllerCompleter = null; |
- _controllerInstance = null; |
- } else { |
- // No new stream, .close was called. Close _target. |
- _closeTarget(); |
- } |
- }, |
- onError: (error, stackTrace) { |
- if (_isBound) { |
- // A new stream takes over - forward errors to that stream. |
- _controllerCompleter.completeError(error, stackTrace); |
- _controllerCompleter = null; |
- _controllerInstance = null; |
- } else { |
- // No new stream. No need to close target, as it have already |
- // failed. |
- _completeDoneError(error, stackTrace); |
- } |
- }); |
- } |
+ _target.addStream(_controller.stream).then((_) { |
+ if (_isBound) { |
+ // A new stream takes over - forward values to that stream. |
+ _controllerCompleter.complete(this); |
+ _controllerCompleter = null; |
+ _controllerInstance = null; |
+ } else { |
+ // No new stream, .close was called. Close _target. |
+ _closeTarget(); |
+ } |
+ }, onError: (error, stackTrace) { |
+ if (_isBound) { |
+ // A new stream takes over - forward errors to that stream. |
+ _controllerCompleter.completeError(error, stackTrace); |
+ _controllerCompleter = null; |
+ _controllerInstance = null; |
+ } else { |
+ // No new stream. No need to close target, as it has already |
+ // failed. |
+ _completeDoneError(error, stackTrace); |
+ } |
+ }); |
+ } |
return _controllerInstance; |
} |
} |
- |