Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(72)

Side by Side Diff: lib/src/guarantee_channel.dart

Issue 1662773003: Add StreamChannel.withGuarantees and StreamChannelController. (Closed) Base URL: git@github.com:dart-lang/stream_channel.git@master
Patch Set: Code review changes Created 4 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « CHANGELOG.md ('k') | lib/src/stream_channel_controller.dart » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file.
4
5 import 'dart:async';
6
7 import 'package:async/async.dart';
8
9 import '../stream_channel.dart';
10
11 /// A [StreamChannel] that enforces the stream channel guarantees.
12 ///
13 /// This is exposed via [new StreamChannel.withGuarantees].
14 class GuaranteeChannel<T> extends StreamChannelMixin<T> {
15 Stream<T> get stream => _streamController.stream;
16
17 StreamSink<T> get sink => _sink;
18 _GuaranteeSink<T> _sink;
19
20 /// The controller for [stream].
21 ///
22 /// This intermediate controller allows us to continue listening for a done
23 /// event even after the user has canceled their subscription, and to send our
24 /// own done event when the sink is closed.
25 StreamController<T> _streamController;
26
27 /// The subscription to the inner stream.
28 StreamSubscription<T> _subscription;
29
30 /// Whether the sink has closed, causing the underlying channel to disconnect.
31 bool _disconnected = false;
32
33 GuaranteeChannel(Stream<T> innerStream, StreamSink<T> innerSink) {
34 _sink = new _GuaranteeSink<T>(innerSink, this);
35
36 // Enforce the single-subscription guarantee by changing a broadcast stream
37 // to single-subscription.
38 if (innerStream.isBroadcast) {
39 innerStream = innerStream.transform(
40 const SingleSubscriptionTransformer());
41 }
42
43 _streamController = new StreamController<T>(onListen: () {
44 // If the sink has disconnected, we've already called
45 // [_streamController.close].
46 if (_disconnected) return;
47
48 _subscription = innerStream.listen(_streamController.add,
49 onError: _streamController.addError,
50 onDone: () {
51 _sink._onStreamDisconnected();
52 _streamController.close();
53 });
54 }, sync: true);
55 }
56
57 /// Called by [_GuaranteeSink] when the user closes it.
58 ///
59 /// The sink closing indicates that the connection is closed, so the stream
60 /// should stop emitting events.
61 void _onSinkDisconnected() {
62 _disconnected = true;
63 if (_subscription != null) _subscription.cancel();
64 _streamController.close();
65 }
66 }
67
68 /// The sink for [GuaranteeChannel].
69 ///
70 /// This wraps the inner sink to ignore events and cancel any in-progress
71 /// [addStream] calls when the underlying channel closes.
72 class _GuaranteeSink<T> implements StreamSink<T> {
73 /// The inner sink being wrapped.
74 final StreamSink<T> _inner;
75
76 /// The [GuaranteeChannel] this belongs to.
77 final GuaranteeChannel<T> _channel;
78
79 Future get done => _inner.done;
80
81 /// Whether the stream has emitted a done event, causing the underlying
82 /// channel to disconnect.
83 bool _disconnected = false;
84
85 /// Whether the user has called [close].
86 bool _closed = false;
87
88 /// The subscription to the stream passed to [addStream], if a stream is
89 /// currently being added.
90 StreamSubscription<T> _addStreamSubscription;
91
92 /// The completer for the future returned by [addStream], if a stream is
93 /// currently being added.
94 Completer _addStreamCompleter;
95
96 /// Whether we're currently adding a stream with [addStream].
97 bool get _inAddStream => _addStreamSubscription != null;
98
99 _GuaranteeSink(this._inner, this._channel);
100
101 void add(T data) {
102 if (_closed) throw new StateError("Cannot add event after closing.");
103 if (_inAddStream) {
104 throw new StateError("Cannot add event while adding stream.");
105 }
106 if (_disconnected) return;
107
108 _inner.add(data);
109 }
110
111 void addError(error, [StackTrace stackTrace]) {
112 if (_closed) throw new StateError("Cannot add event after closing.");
113 if (_inAddStream) {
114 throw new StateError("Cannot add event while adding stream.");
115 }
116 if (_disconnected) return;
117
118 _inner.addError(error, stackTrace);
119 }
120
121 Future addStream(Stream<T> stream) {
122 if (_closed) throw new StateError("Cannot add stream after closing.");
123 if (_inAddStream) {
124 throw new StateError("Cannot add stream while adding stream.");
125 }
126 if (_disconnected) return new Future.value();
127
128 _addStreamCompleter = new Completer.sync();
129 _addStreamSubscription = stream.listen(
130 _inner.add,
131 onError: _inner.addError,
132 onDone: _addStreamCompleter.complete);
133 return _addStreamCompleter.future.then((_) {
134 _addStreamCompleter = null;
135 _addStreamSubscription = null;
136 });
137 }
138
139 Future close() {
140 if (_inAddStream) {
141 throw new StateError("Cannot close sink while adding stream.");
142 }
143
144 _closed = true;
145 if (_disconnected) return new Future.value();
146
147 _channel._onSinkDisconnected();
148 return _inner.close();
149 }
150
151 /// Called by [GuaranteeChannel] when the stream emits a done event.
152 ///
153 /// The stream being done indicates that the connection is closed, so the
154 /// sink should stop forwarding events.
155 void _onStreamDisconnected() {
156 _disconnected = true;
157 if (!_inAddStream) return;
158
159 _addStreamCompleter.complete(_addStreamSubscription.cancel());
160 _addStreamCompleter = null;
161 _addStreamSubscription = null;
162 }
163 }
OLDNEW
« no previous file with comments | « CHANGELOG.md ('k') | lib/src/stream_channel_controller.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698