Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(289)

Unified Diff: lib/src/copy/io_sink.dart

Issue 1947683006: Bring in the latest version of the SDK's WebSocket impl. (Closed) Base URL: git@github.com:dart-lang/web_socket_channel.git@master
Patch Set: Merge again Created 4 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « lib/src/copy/bytes_builder.dart ('k') | lib/src/copy/web_socket.dart » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: lib/src/copy/io_sink.dart
diff --git a/lib/src/copy/io_sink.dart b/lib/src/copy/io_sink.dart
index 0578bdb03babbde31fdec30f9b96f6c8d4caea6e..34abcad2a2e5c9491427d177ebff1c4e1ae7397e 100644
--- a/lib/src/copy/io_sink.dart
+++ b/lib/src/copy/io_sink.dart
@@ -9,22 +9,20 @@
// desired public API and to remove "dart:io" dependencies have been made.
//
// This is up-to-date as of sdk revision
-// 86227840d75d974feb238f8b3c59c038b99c05cf.
+// e41fb4cafd6052157dbc1490d437045240f4773f.
+
import 'dart:async';
class StreamSinkImpl<T> implements StreamSink<T> {
final StreamConsumer<T> _target;
- Completer _doneCompleter = new Completer();
- Future _doneFuture;
+ final Completer _doneCompleter = new Completer();
StreamController<T> _controllerInstance;
Completer _controllerCompleter;
bool _isClosed = false;
bool _isBound = false;
bool _hasError = false;
- StreamSinkImpl(this._target) {
- _doneFuture = _doneCompleter.future;
- }
+ StreamSinkImpl(this._target);
void add(T data) {
if (_isClosed) return;
@@ -65,8 +63,8 @@ class StreamSinkImpl<T> implements StreamSink<T> {
var future = _controllerCompleter.future;
_controllerInstance.close();
return future.whenComplete(() {
- _isBound = false;
- });
+ _isBound = false;
+ });
}
Future close() {
@@ -88,19 +86,19 @@ class StreamSinkImpl<T> implements StreamSink<T> {
_target.close().then(_completeDoneValue, onError: _completeDoneError);
}
- Future get done => _doneFuture;
+ Future get done => _doneCompleter.future;
void _completeDoneValue(value) {
- if (_doneCompleter == null) return;
- _doneCompleter.complete(value);
- _doneCompleter = null;
+ if (!_doneCompleter.isCompleted) {
+ _doneCompleter.complete(value);
+ }
}
void _completeDoneError(error, StackTrace stackTrace) {
- if (_doneCompleter == null) return;
- _hasError = true;
- _doneCompleter.completeError(error, stackTrace);
- _doneCompleter = null;
+ if (!_doneCompleter.isCompleted) {
+ _hasError = true;
+ _doneCompleter.completeError(error, stackTrace);
+ }
}
StreamController<T> get _controller {
@@ -113,33 +111,29 @@ class StreamSinkImpl<T> implements StreamSink<T> {
if (_controllerInstance == null) {
_controllerInstance = new StreamController<T>(sync: true);
_controllerCompleter = new Completer();
- _target.addStream(_controller.stream)
- .then(
- (_) {
- if (_isBound) {
- // A new stream takes over - forward values to that stream.
- _controllerCompleter.complete(this);
- _controllerCompleter = null;
- _controllerInstance = null;
- } else {
- // No new stream, .close was called. Close _target.
- _closeTarget();
- }
- },
- onError: (error, stackTrace) {
- if (_isBound) {
- // A new stream takes over - forward errors to that stream.
- _controllerCompleter.completeError(error, stackTrace);
- _controllerCompleter = null;
- _controllerInstance = null;
- } else {
- // No new stream. No need to close target, as it have already
- // failed.
- _completeDoneError(error, stackTrace);
- }
- });
- }
+ _target.addStream(_controller.stream).then((_) {
+ if (_isBound) {
+ // A new stream takes over - forward values to that stream.
+ _controllerCompleter.complete(this);
+ _controllerCompleter = null;
+ _controllerInstance = null;
+ } else {
+ // No new stream, .close was called. Close _target.
+ _closeTarget();
+ }
+ }, onError: (error, stackTrace) {
+ if (_isBound) {
+ // A new stream takes over - forward errors to that stream.
+ _controllerCompleter.completeError(error, stackTrace);
+ _controllerCompleter = null;
+ _controllerInstance = null;
+ } else {
+ // No new stream. No need to close target, as it has already
+ // failed.
+ _completeDoneError(error, stackTrace);
+ }
+ });
+ }
return _controllerInstance;
}
}
-
« no previous file with comments | « lib/src/copy/bytes_builder.dart ('k') | lib/src/copy/web_socket.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698