OLD | NEW |
---|---|
1 // Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 import 'dart:async'; | 5 import 'dart:async'; |
6 | 6 |
7 import 'package:async/async.dart'; | 7 import 'package:async/async.dart'; |
8 | 8 |
9 import '../stream_channel.dart'; | 9 import '../stream_channel.dart'; |
10 | 10 |
(...skipping 12 matching lines...) Expand all Loading... | |
23 /// event even after the user has canceled their subscription, and to send our | 23 /// event even after the user has canceled their subscription, and to send our |
24 /// own done event when the sink is closed. | 24 /// own done event when the sink is closed. |
25 StreamController<T> _streamController; | 25 StreamController<T> _streamController; |
26 | 26 |
27 /// The subscription to the inner stream. | 27 /// The subscription to the inner stream. |
28 StreamSubscription<T> _subscription; | 28 StreamSubscription<T> _subscription; |
29 | 29 |
30 /// Whether the sink has closed, causing the underlying channel to disconnect. | 30 /// Whether the sink has closed, causing the underlying channel to disconnect. |
31 bool _disconnected = false; | 31 bool _disconnected = false; |
32 | 32 |
33 GuaranteeChannel(Stream<T> innerStream, StreamSink<T> innerSink) { | 33 GuaranteeChannel(Stream<T> innerStream, StreamSink<T> innerSink, |
34 _sink = new _GuaranteeSink<T>(innerSink, this); | 34 {bool allowSinkErrors: true}) { |
35 _sink = new _GuaranteeSink<T>(innerSink, this, | |
36 allowErrors: allowSinkErrors); | |
35 | 37 |
36 // Enforce the single-subscription guarantee by changing a broadcast stream | 38 // Enforce the single-subscription guarantee by changing a broadcast stream |
37 // to single-subscription. | 39 // to single-subscription. |
38 if (innerStream.isBroadcast) { | 40 if (innerStream.isBroadcast) { |
39 innerStream = innerStream.transform( | 41 innerStream = innerStream.transform( |
40 const SingleSubscriptionTransformer()); | 42 const SingleSubscriptionTransformer()); |
41 } | 43 } |
42 | 44 |
43 _streamController = new StreamController<T>(onListen: () { | 45 _streamController = new StreamController<T>(onListen: () { |
44 // If the sink has disconnected, we've already called | 46 // If the sink has disconnected, we've already called |
(...skipping 24 matching lines...) Expand all Loading... | |
69 /// | 71 /// |
70 /// This wraps the inner sink to ignore events and cancel any in-progress | 72 /// This wraps the inner sink to ignore events and cancel any in-progress |
71 /// [addStream] calls when the underlying channel closes. | 73 /// [addStream] calls when the underlying channel closes. |
72 class _GuaranteeSink<T> implements StreamSink<T> { | 74 class _GuaranteeSink<T> implements StreamSink<T> { |
73 /// The inner sink being wrapped. | 75 /// The inner sink being wrapped. |
74 final StreamSink<T> _inner; | 76 final StreamSink<T> _inner; |
75 | 77 |
76 /// The [GuaranteeChannel] this belongs to. | 78 /// The [GuaranteeChannel] this belongs to. |
77 final GuaranteeChannel<T> _channel; | 79 final GuaranteeChannel<T> _channel; |
78 | 80 |
79 Future get done => _inner.done; | 81 Future get done => _doneCompleter.future; |
82 final _doneCompleter = new Completer(); | |
80 | 83 |
81 /// Whether the stream has emitted a done event, causing the underlying | 84 /// Whether connection is disconnected. |
82 /// channel to disconnect. | 85 /// |
86 /// This can happen because the stream has emitted a done event, or because | |
87 /// the user added an error when [_allowErrors] is `false`. | |
83 bool _disconnected = false; | 88 bool _disconnected = false; |
84 | 89 |
85 /// Whether the user has called [close]. | 90 /// Whether the user has called [close]. |
86 bool _closed = false; | 91 bool _closed = false; |
87 | 92 |
88 /// The subscription to the stream passed to [addStream], if a stream is | 93 /// The subscription to the stream passed to [addStream], if a stream is |
89 /// currently being added. | 94 /// currently being added. |
90 StreamSubscription<T> _addStreamSubscription; | 95 StreamSubscription<T> _addStreamSubscription; |
91 | 96 |
92 /// The completer for the future returned by [addStream], if a stream is | 97 /// The completer for the future returned by [addStream], if a stream is |
93 /// currently being added. | 98 /// currently being added. |
94 Completer _addStreamCompleter; | 99 Completer _addStreamCompleter; |
95 | 100 |
96 /// Whether we're currently adding a stream with [addStream]. | 101 /// Whether we're currently adding a stream with [addStream]. |
97 bool get _inAddStream => _addStreamSubscription != null; | 102 bool get _inAddStream => _addStreamSubscription != null; |
98 | 103 |
99 _GuaranteeSink(this._inner, this._channel); | 104 /// Whether errors are passed on to the underlying sink. |
105 /// | |
106 /// If this is `false`, any error passed to the sink is piped to [done] and | |
107 /// the underlying sink is closed. | |
108 final bool _allowErrors; | |
109 | |
110 _GuaranteeSink(this._inner, this._channel, {bool allowErrors: true}) | |
111 : _allowErrors = allowErrors; | |
100 | 112 |
101 void add(T data) { | 113 void add(T data) { |
102 if (_closed) throw new StateError("Cannot add event after closing."); | 114 if (_closed) throw new StateError("Cannot add event after closing."); |
103 if (_inAddStream) { | 115 if (_inAddStream) { |
104 throw new StateError("Cannot add event while adding stream."); | 116 throw new StateError("Cannot add event while adding stream."); |
105 } | 117 } |
106 if (_disconnected) return; | 118 if (_disconnected) return; |
107 | 119 |
108 _inner.add(data); | 120 _inner.add(data); |
109 } | 121 } |
110 | 122 |
111 void addError(error, [StackTrace stackTrace]) { | 123 void addError(error, [StackTrace stackTrace]) { |
112 if (_closed) throw new StateError("Cannot add event after closing."); | 124 if (_closed) throw new StateError("Cannot add event after closing."); |
113 if (_inAddStream) { | 125 if (_inAddStream) { |
114 throw new StateError("Cannot add event while adding stream."); | 126 throw new StateError("Cannot add event while adding stream."); |
115 } | 127 } |
116 if (_disconnected) return; | 128 if (_disconnected) return; |
117 | 129 |
118 _inner.addError(error, stackTrace); | 130 if (_allowErrors) { |
131 _inner.addError(error, stackTrace); | |
132 return; | |
133 } | |
134 | |
135 _doneCompleter.completeError(error, stackTrace); | |
136 | |
137 // Treat an error like both the stream and sink disconnecting. | |
138 _onStreamDisconnected(); | |
139 _channel._onSinkDisconnected(); | |
140 | |
141 // Ignore errors from the inner sink. We're already surfacing one error, and | |
142 // if the user handles it we don't want them to have another top-level. | |
143 _inner.close().catchError((_) {}); | |
119 } | 144 } |
120 | 145 |
121 Future addStream(Stream<T> stream) { | 146 Future addStream(Stream<T> stream) { |
122 if (_closed) throw new StateError("Cannot add stream after closing."); | 147 if (_closed) throw new StateError("Cannot add stream after closing."); |
123 if (_inAddStream) { | 148 if (_inAddStream) { |
124 throw new StateError("Cannot add stream while adding stream."); | 149 throw new StateError("Cannot add stream while adding stream."); |
125 } | 150 } |
126 if (_disconnected) return new Future.value(); | 151 if (_disconnected) return new Future.value(); |
127 | 152 |
128 _addStreamCompleter = new Completer.sync(); | 153 _addStreamCompleter = new Completer.sync(); |
129 _addStreamSubscription = stream.listen( | 154 _addStreamSubscription = stream.listen( |
130 _inner.add, | 155 _inner.add, |
131 onError: _inner.addError, | 156 onError: _inner.addError, |
132 onDone: _addStreamCompleter.complete); | 157 onDone: _addStreamCompleter.complete); |
133 return _addStreamCompleter.future.then((_) { | 158 return _addStreamCompleter.future.then((_) { |
134 _addStreamCompleter = null; | 159 _addStreamCompleter = null; |
135 _addStreamSubscription = null; | 160 _addStreamSubscription = null; |
136 }); | 161 }); |
137 } | 162 } |
138 | 163 |
139 Future close() { | 164 Future close() { |
140 if (_inAddStream) { | 165 if (_inAddStream) { |
141 throw new StateError("Cannot close sink while adding stream."); | 166 throw new StateError("Cannot close sink while adding stream."); |
142 } | 167 } |
143 | 168 |
144 _closed = true; | 169 _closed = true; |
145 if (_disconnected) return new Future.value(); | 170 if (_disconnected) return done; |
tjblasi
2016/02/04 23:16:01
mega-nit: Since here & line 174 return the same va
| |
146 | 171 |
147 _channel._onSinkDisconnected(); | 172 _channel._onSinkDisconnected(); |
148 return _inner.close(); | 173 _doneCompleter.complete(_inner.close()); |
174 return done; | |
149 } | 175 } |
150 | 176 |
151 /// Called by [GuaranteeChannel] when the stream emits a done event. | 177 /// Called by [GuaranteeChannel] when the stream emits a done event. |
152 /// | 178 /// |
153 /// The stream being done indicates that the connection is closed, so the | 179 /// The stream being done indicates that the connection is closed, so the |
154 /// sink should stop forwarding events. | 180 /// sink should stop forwarding events. |
155 void _onStreamDisconnected() { | 181 void _onStreamDisconnected() { |
156 _disconnected = true; | 182 _disconnected = true; |
183 if (!_doneCompleter.isCompleted) _doneCompleter.complete(); | |
184 | |
157 if (!_inAddStream) return; | 185 if (!_inAddStream) return; |
158 | |
159 _addStreamCompleter.complete(_addStreamSubscription.cancel()); | 186 _addStreamCompleter.complete(_addStreamSubscription.cancel()); |
160 _addStreamCompleter = null; | 187 _addStreamCompleter = null; |
161 _addStreamSubscription = null; | 188 _addStreamSubscription = null; |
162 } | 189 } |
163 } | 190 } |
OLD | NEW |