Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(253)

Side by Side Diff: lib/src/copy/io_sink.dart

Issue 1947683006: Bring in the latest version of the SDK's WebSocket impl. (Closed) Base URL: git@github.com:dart-lang/web_socket_channel.git@master
Patch Set: Merge again Created 4 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « lib/src/copy/bytes_builder.dart ('k') | lib/src/copy/web_socket.dart » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 // The following code is copied from sdk/lib/io/io_sink.dart. The "dart:io" 5 // The following code is copied from sdk/lib/io/io_sink.dart. The "dart:io"
6 // implementation isn't used directly to support non-"dart:io" applications. 6 // implementation isn't used directly to support non-"dart:io" applications.
7 // 7 //
8 // Because it's copied directly, only modifications necessary to support the 8 // Because it's copied directly, only modifications necessary to support the
9 // desired public API and to remove "dart:io" dependencies have been made. 9 // desired public API and to remove "dart:io" dependencies have been made.
10 // 10 //
11 // This is up-to-date as of sdk revision 11 // This is up-to-date as of sdk revision
12 // 86227840d75d974feb238f8b3c59c038b99c05cf. 12 // e41fb4cafd6052157dbc1490d437045240f4773f.
13
13 import 'dart:async'; 14 import 'dart:async';
14 15
15 class StreamSinkImpl<T> implements StreamSink<T> { 16 class StreamSinkImpl<T> implements StreamSink<T> {
16 final StreamConsumer<T> _target; 17 final StreamConsumer<T> _target;
17 Completer _doneCompleter = new Completer(); 18 final Completer _doneCompleter = new Completer();
18 Future _doneFuture;
19 StreamController<T> _controllerInstance; 19 StreamController<T> _controllerInstance;
20 Completer _controllerCompleter; 20 Completer _controllerCompleter;
21 bool _isClosed = false; 21 bool _isClosed = false;
22 bool _isBound = false; 22 bool _isBound = false;
23 bool _hasError = false; 23 bool _hasError = false;
24 24
25 StreamSinkImpl(this._target) { 25 StreamSinkImpl(this._target);
26 _doneFuture = _doneCompleter.future;
27 }
28 26
29 void add(T data) { 27 void add(T data) {
30 if (_isClosed) return; 28 if (_isClosed) return;
31 _controller.add(data); 29 _controller.add(data);
32 } 30 }
33 31
34 void addError(error, [StackTrace stackTrace]) { 32 void addError(error, [StackTrace stackTrace]) {
35 _controller.addError(error, stackTrace); 33 _controller.addError(error, stackTrace);
36 } 34 }
37 35
(...skipping 20 matching lines...) Expand all
58 if (_isBound) { 56 if (_isBound) {
59 throw new StateError("StreamSink is bound to a stream"); 57 throw new StateError("StreamSink is bound to a stream");
60 } 58 }
61 if (_controllerInstance == null) return new Future.value(this); 59 if (_controllerInstance == null) return new Future.value(this);
62 // Adding an empty stream-controller will return a future that will complete 60 // Adding an empty stream-controller will return a future that will complete
63 // when all data is done. 61 // when all data is done.
64 _isBound = true; 62 _isBound = true;
65 var future = _controllerCompleter.future; 63 var future = _controllerCompleter.future;
66 _controllerInstance.close(); 64 _controllerInstance.close();
67 return future.whenComplete(() { 65 return future.whenComplete(() {
68 _isBound = false; 66 _isBound = false;
69 }); 67 });
70 } 68 }
71 69
72 Future close() { 70 Future close() {
73 if (_isBound) { 71 if (_isBound) {
74 throw new StateError("StreamSink is bound to a stream"); 72 throw new StateError("StreamSink is bound to a stream");
75 } 73 }
76 if (!_isClosed) { 74 if (!_isClosed) {
77 _isClosed = true; 75 _isClosed = true;
78 if (_controllerInstance != null) { 76 if (_controllerInstance != null) {
79 _controllerInstance.close(); 77 _controllerInstance.close();
80 } else { 78 } else {
81 _closeTarget(); 79 _closeTarget();
82 } 80 }
83 } 81 }
84 return done; 82 return done;
85 } 83 }
86 84
87 void _closeTarget() { 85 void _closeTarget() {
88 _target.close().then(_completeDoneValue, onError: _completeDoneError); 86 _target.close().then(_completeDoneValue, onError: _completeDoneError);
89 } 87 }
90 88
91 Future get done => _doneFuture; 89 Future get done => _doneCompleter.future;
92 90
93 void _completeDoneValue(value) { 91 void _completeDoneValue(value) {
94 if (_doneCompleter == null) return; 92 if (!_doneCompleter.isCompleted) {
95 _doneCompleter.complete(value); 93 _doneCompleter.complete(value);
96 _doneCompleter = null; 94 }
97 } 95 }
98 96
99 void _completeDoneError(error, StackTrace stackTrace) { 97 void _completeDoneError(error, StackTrace stackTrace) {
100 if (_doneCompleter == null) return; 98 if (!_doneCompleter.isCompleted) {
101 _hasError = true; 99 _hasError = true;
102 _doneCompleter.completeError(error, stackTrace); 100 _doneCompleter.completeError(error, stackTrace);
103 _doneCompleter = null; 101 }
104 } 102 }
105 103
106 StreamController<T> get _controller { 104 StreamController<T> get _controller {
107 if (_isBound) { 105 if (_isBound) {
108 throw new StateError("StreamSink is bound to a stream"); 106 throw new StateError("StreamSink is bound to a stream");
109 } 107 }
110 if (_isClosed) { 108 if (_isClosed) {
111 throw new StateError("StreamSink is closed"); 109 throw new StateError("StreamSink is closed");
112 } 110 }
113 if (_controllerInstance == null) { 111 if (_controllerInstance == null) {
114 _controllerInstance = new StreamController<T>(sync: true); 112 _controllerInstance = new StreamController<T>(sync: true);
115 _controllerCompleter = new Completer(); 113 _controllerCompleter = new Completer();
116 _target.addStream(_controller.stream) 114 _target.addStream(_controller.stream).then((_) {
117 .then( 115 if (_isBound) {
118 (_) { 116 // A new stream takes over - forward values to that stream.
119 if (_isBound) { 117 _controllerCompleter.complete(this);
120 // A new stream takes over - forward values to that stream. 118 _controllerCompleter = null;
121 _controllerCompleter.complete(this); 119 _controllerInstance = null;
122 _controllerCompleter = null; 120 } else {
123 _controllerInstance = null; 121 // No new stream, .close was called. Close _target.
124 } else { 122 _closeTarget();
125 // No new stream, .close was called. Close _target. 123 }
126 _closeTarget(); 124 }, onError: (error, stackTrace) {
127 } 125 if (_isBound) {
128 }, 126 // A new stream takes over - forward errors to that stream.
129 onError: (error, stackTrace) { 127 _controllerCompleter.completeError(error, stackTrace);
130 if (_isBound) { 128 _controllerCompleter = null;
131 // A new stream takes over - forward errors to that stream. 129 _controllerInstance = null;
132 _controllerCompleter.completeError(error, stackTrace); 130 } else {
133 _controllerCompleter = null; 131 // No new stream. No need to close target, as it has already
134 _controllerInstance = null; 132 // failed.
135 } else { 133 _completeDoneError(error, stackTrace);
136 // No new stream. No need to close target, as it have already 134 }
137 // failed. 135 });
138 _completeDoneError(error, stackTrace); 136 }
139 }
140 });
141 }
142 return _controllerInstance; 137 return _controllerInstance;
143 } 138 }
144 } 139 }
145
OLDNEW
« no previous file with comments | « lib/src/copy/bytes_builder.dart ('k') | lib/src/copy/web_socket.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698