OLD | NEW |
1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 // The following code is copied from sdk/lib/io/io_sink.dart. The "dart:io" | 5 // The following code is copied from sdk/lib/io/io_sink.dart. The "dart:io" |
6 // implementation isn't used directly to support non-"dart:io" applications. | 6 // implementation isn't used directly to support non-"dart:io" applications. |
7 // | 7 // |
8 // Because it's copied directly, only modifications necessary to support the | 8 // Because it's copied directly, only modifications necessary to support the |
9 // desired public API and to remove "dart:io" dependencies have been made. | 9 // desired public API and to remove "dart:io" dependencies have been made. |
10 // | 10 // |
11 // This is up-to-date as of sdk revision | 11 // This is up-to-date as of sdk revision |
12 // 86227840d75d974feb238f8b3c59c038b99c05cf. | 12 // e41fb4cafd6052157dbc1490d437045240f4773f. |
| 13 |
13 import 'dart:async'; | 14 import 'dart:async'; |
14 | 15 |
15 class StreamSinkImpl<T> implements StreamSink<T> { | 16 class StreamSinkImpl<T> implements StreamSink<T> { |
16 final StreamConsumer<T> _target; | 17 final StreamConsumer<T> _target; |
17 Completer _doneCompleter = new Completer(); | 18 final Completer _doneCompleter = new Completer(); |
18 Future _doneFuture; | |
19 StreamController<T> _controllerInstance; | 19 StreamController<T> _controllerInstance; |
20 Completer _controllerCompleter; | 20 Completer _controllerCompleter; |
21 bool _isClosed = false; | 21 bool _isClosed = false; |
22 bool _isBound = false; | 22 bool _isBound = false; |
23 bool _hasError = false; | 23 bool _hasError = false; |
24 | 24 |
25 StreamSinkImpl(this._target) { | 25 StreamSinkImpl(this._target); |
26 _doneFuture = _doneCompleter.future; | |
27 } | |
28 | 26 |
29 void add(T data) { | 27 void add(T data) { |
30 if (_isClosed) return; | 28 if (_isClosed) return; |
31 _controller.add(data); | 29 _controller.add(data); |
32 } | 30 } |
33 | 31 |
34 void addError(error, [StackTrace stackTrace]) { | 32 void addError(error, [StackTrace stackTrace]) { |
35 _controller.addError(error, stackTrace); | 33 _controller.addError(error, stackTrace); |
36 } | 34 } |
37 | 35 |
(...skipping 20 matching lines...) Expand all Loading... |
58 if (_isBound) { | 56 if (_isBound) { |
59 throw new StateError("StreamSink is bound to a stream"); | 57 throw new StateError("StreamSink is bound to a stream"); |
60 } | 58 } |
61 if (_controllerInstance == null) return new Future.value(this); | 59 if (_controllerInstance == null) return new Future.value(this); |
62 // Adding an empty stream-controller will return a future that will complete | 60 // Adding an empty stream-controller will return a future that will complete |
63 // when all data is done. | 61 // when all data is done. |
64 _isBound = true; | 62 _isBound = true; |
65 var future = _controllerCompleter.future; | 63 var future = _controllerCompleter.future; |
66 _controllerInstance.close(); | 64 _controllerInstance.close(); |
67 return future.whenComplete(() { | 65 return future.whenComplete(() { |
68 _isBound = false; | 66 _isBound = false; |
69 }); | 67 }); |
70 } | 68 } |
71 | 69 |
72 Future close() { | 70 Future close() { |
73 if (_isBound) { | 71 if (_isBound) { |
74 throw new StateError("StreamSink is bound to a stream"); | 72 throw new StateError("StreamSink is bound to a stream"); |
75 } | 73 } |
76 if (!_isClosed) { | 74 if (!_isClosed) { |
77 _isClosed = true; | 75 _isClosed = true; |
78 if (_controllerInstance != null) { | 76 if (_controllerInstance != null) { |
79 _controllerInstance.close(); | 77 _controllerInstance.close(); |
80 } else { | 78 } else { |
81 _closeTarget(); | 79 _closeTarget(); |
82 } | 80 } |
83 } | 81 } |
84 return done; | 82 return done; |
85 } | 83 } |
86 | 84 |
87 void _closeTarget() { | 85 void _closeTarget() { |
88 _target.close().then(_completeDoneValue, onError: _completeDoneError); | 86 _target.close().then(_completeDoneValue, onError: _completeDoneError); |
89 } | 87 } |
90 | 88 |
91 Future get done => _doneFuture; | 89 Future get done => _doneCompleter.future; |
92 | 90 |
93 void _completeDoneValue(value) { | 91 void _completeDoneValue(value) { |
94 if (_doneCompleter == null) return; | 92 if (!_doneCompleter.isCompleted) { |
95 _doneCompleter.complete(value); | 93 _doneCompleter.complete(value); |
96 _doneCompleter = null; | 94 } |
97 } | 95 } |
98 | 96 |
99 void _completeDoneError(error, StackTrace stackTrace) { | 97 void _completeDoneError(error, StackTrace stackTrace) { |
100 if (_doneCompleter == null) return; | 98 if (!_doneCompleter.isCompleted) { |
101 _hasError = true; | 99 _hasError = true; |
102 _doneCompleter.completeError(error, stackTrace); | 100 _doneCompleter.completeError(error, stackTrace); |
103 _doneCompleter = null; | 101 } |
104 } | 102 } |
105 | 103 |
106 StreamController<T> get _controller { | 104 StreamController<T> get _controller { |
107 if (_isBound) { | 105 if (_isBound) { |
108 throw new StateError("StreamSink is bound to a stream"); | 106 throw new StateError("StreamSink is bound to a stream"); |
109 } | 107 } |
110 if (_isClosed) { | 108 if (_isClosed) { |
111 throw new StateError("StreamSink is closed"); | 109 throw new StateError("StreamSink is closed"); |
112 } | 110 } |
113 if (_controllerInstance == null) { | 111 if (_controllerInstance == null) { |
114 _controllerInstance = new StreamController<T>(sync: true); | 112 _controllerInstance = new StreamController<T>(sync: true); |
115 _controllerCompleter = new Completer(); | 113 _controllerCompleter = new Completer(); |
116 _target.addStream(_controller.stream) | 114 _target.addStream(_controller.stream).then((_) { |
117 .then( | 115 if (_isBound) { |
118 (_) { | 116 // A new stream takes over - forward values to that stream. |
119 if (_isBound) { | 117 _controllerCompleter.complete(this); |
120 // A new stream takes over - forward values to that stream. | 118 _controllerCompleter = null; |
121 _controllerCompleter.complete(this); | 119 _controllerInstance = null; |
122 _controllerCompleter = null; | 120 } else { |
123 _controllerInstance = null; | 121 // No new stream, .close was called. Close _target. |
124 } else { | 122 _closeTarget(); |
125 // No new stream, .close was called. Close _target. | 123 } |
126 _closeTarget(); | 124 }, onError: (error, stackTrace) { |
127 } | 125 if (_isBound) { |
128 }, | 126 // A new stream takes over - forward errors to that stream. |
129 onError: (error, stackTrace) { | 127 _controllerCompleter.completeError(error, stackTrace); |
130 if (_isBound) { | 128 _controllerCompleter = null; |
131 // A new stream takes over - forward errors to that stream. | 129 _controllerInstance = null; |
132 _controllerCompleter.completeError(error, stackTrace); | 130 } else { |
133 _controllerCompleter = null; | 131 // No new stream. No need to close target, as it has already |
134 _controllerInstance = null; | 132 // failed. |
135 } else { | 133 _completeDoneError(error, stackTrace); |
136 // No new stream. No need to close target, as it have already | 134 } |
137 // failed. | 135 }); |
138 _completeDoneError(error, stackTrace); | 136 } |
139 } | |
140 }); | |
141 } | |
142 return _controllerInstance; | 137 return _controllerInstance; |
143 } | 138 } |
144 } | 139 } |
145 | |
OLD | NEW |