OLD | NEW |
---|---|
(Empty) | |
1 // Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file | |
2 // for details. All rights reserved. Use of this source code is governed by a | |
3 // BSD-style license that can be found in the LICENSE file. | |
4 | |
5 import 'dart:async'; | |
6 | |
7 import 'package:async/async.dart'; | |
8 | |
9 import '../stream_channel.dart'; | |
10 | |
11 /// A [StreamChannel] that enforces the stream channel guarantees. | |
tjblasi
2016/02/04 00:33:47
Consider adding a pointer to docs explaining what
nweiz
2016/02/04 01:35:04
The guarantees are explained in the StreamChannel
| |
12 /// | |
13 /// This is exposed via [new StreamChannel.withGuarantees]. | |
14 class GuaranteeChannel<T> extends StreamChannelMixin<T> { | |
15 Stream<T> get stream => _streamController.stream; | |
16 | |
17 StreamSink<T> get sink => _sink; | |
18 _GuaranteeSink<T> _sink; | |
tjblasi
2016/02/04 00:33:47
`_sink`, `_streamController`, and `_subscription`
nweiz
2016/02/04 01:35:04
Unfortunately not :(. _sink's constructor needs to
| |
19 | |
20 /// The controller for [stream]. | |
21 /// | |
22 /// This intermediate controller allows us to continue listening for a done | |
23 /// event even after the user has canceled their subscription, and to send our | |
24 /// own done event when the sink is closed. | |
25 StreamController<T> _streamController; | |
26 | |
27 /// The subscription to the inner stream. | |
28 StreamSubscription<T> _subscription; | |
29 | |
30 /// Whether the sink has closed, causing the underlying channel to disconnect. | |
31 bool _disconnected = false; | |
32 | |
33 GuaranteeChannel(Stream<T> innerStream, StreamSink<T> innerSink) { | |
34 _sink = new _GuaranteeSink<T>(innerSink, this); | |
35 | |
36 // Enforce the single-subscription guarantee by changing a broadcast stream | |
37 // to single-subscription. | |
38 if (innerStream.isBroadcast) { | |
39 innerStream = innerStream.transform( | |
40 const SingleSubscriptionTransformer()); | |
41 } | |
42 | |
43 _streamController = new StreamController<T>(onListen: () { | |
44 // If the sink has disconnected, we've already called | |
45 // [_streamController.close]. | |
46 if (_disconnected) return; | |
47 | |
48 _subscription = innerStream.listen(_streamController.add, | |
49 onError: _streamController.addError, | |
50 onDone: () { | |
51 _sink._onStreamDisconnected(); | |
52 _streamController.close(); | |
53 }); | |
54 }, sync: true); | |
55 } | |
56 | |
57 /// Called by [_GuaranteeSink] when the user closes it. | |
58 /// | |
59 /// The sink closing indicates that the connection is closed, so the stream | |
60 /// should stop emitting events. | |
61 void _onSinkDisconnected() { | |
62 _disconnected = true; | |
63 if (_subscription != null) _subscription.cancel(); | |
64 _streamController.close(); | |
65 } | |
66 } | |
67 | |
68 /// The sink for [GuaranteeChannel]. | |
69 /// | |
70 /// This wraps the inner sink to ignore events and cancel any in-progress | |
71 /// [addStream] calls when the underlying channel closes. | |
72 class _GuaranteeSink<T> implements StreamSink<T> { | |
73 /// The inner sink being wrapped. | |
74 final StreamSink<T> _inner; | |
75 | |
76 /// The [GuaranteeChannel] this belongs to. | |
77 final GuaranteeChannel<T> _channel; | |
78 | |
79 Future get done => _inner.done; | |
80 | |
81 /// Whether the stream has emitted a done event, causing the underlying | |
82 /// channel to disconnect. | |
83 bool _disconnected = false; | |
84 | |
85 /// Whether the user has called [close]. | |
86 bool _closed = false; | |
87 | |
88 /// The subscription to the stream passed to [addStream], if a stream is | |
89 /// currently being added. | |
90 StreamSubscription<T> _addStreamSubscription; | |
91 | |
92 /// The completer for the future returned by [addStream], if a stream is | |
93 /// currently being added. | |
94 Completer _addStreamCompleter; | |
95 | |
96 /// Whether we're currently adding a stream with [addStream]. | |
97 bool get _inAddStream => _addStreamSubscription != null; | |
98 | |
99 _GuaranteeSink(this._inner, this._channel); | |
100 | |
101 void add(T data) { | |
102 if (_closed) throw new StateError("Cannot add event after closing."); | |
103 if (_inAddStream) { | |
104 throw new StateError("Cannot add event while adding stream."); | |
105 } | |
106 if (_disconnected) return; | |
107 | |
108 _inner.add(data); | |
109 } | |
110 | |
111 void addError(error, [StackTrace stackTrace]) { | |
112 if (_closed) throw new StateError("Cannot add event after closing."); | |
113 if (_inAddStream) { | |
114 throw new StateError("Cannot add event while adding stream."); | |
115 } | |
116 if (_disconnected) return; | |
117 | |
118 _inner.addError(error, stackTrace); | |
119 } | |
120 | |
121 Future addStream(Stream<T> stream) { | |
122 if (_closed) throw new StateError("Cannot add stream after closing."); | |
123 if (_inAddStream) { | |
124 throw new StateError("Cannot add stream while adding stream."); | |
125 } | |
126 if (_disconnected) return new Future.value(); | |
127 | |
128 _addStreamCompleter = new Completer.sync(); | |
129 _addStreamSubscription = stream.listen( | |
130 _inner.add, | |
131 onError: _inner.addError, | |
132 onDone: _addStreamCompleter.complete); | |
133 return _addStreamCompleter.future.then((_) { | |
134 _addStreamCompleter = null; | |
135 _addStreamSubscription = null; | |
136 }); | |
137 } | |
138 | |
139 Future close() { | |
140 if (_inAddStream) { | |
141 throw new StateError("Cannot close sink while adding stream."); | |
142 } | |
143 | |
144 _closed = true; | |
145 if (_disconnected) return new Future.value(); | |
146 | |
147 _channel._onSinkDisconnected(); | |
148 return _inner.close(); | |
149 } | |
150 | |
151 /// Called by [GuaranteeChannel] when the stream emits a done event. | |
152 /// | |
153 /// The stream being done indicates that the connection is closed, so the | |
154 /// sink should stop forwarding events. | |
155 void _onStreamDisconnected() { | |
156 _disconnected = true; | |
157 if (!_inAddStream) return; | |
158 | |
159 _addStreamCompleter.complete(_addStreamSubscription.cancel()); | |
160 _addStreamCompleter = null; | |
161 _addStreamSubscription = null; | |
162 } | |
163 } | |
OLD | NEW |