Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(341)

Side by Side Diff: lib/src/disconnector.dart

Issue 1679193002: Add a Disconnector class. (Closed) Base URL: git@github.com:dart-lang/stream_channel.git@master
Patch Set: Created 4 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « CHANGELOG.md ('k') | lib/stream_channel.dart » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file.
4
5 import 'dart:async';
6
7 import '../stream_channel.dart';
8
9 /// Allows the caller to force a channel to disconnect.
10 ///
11 /// When [disconnect] is called, the channel (or channels) transformed by this
12 /// transformer will act as though the remote end had disconnected—the stream
13 /// will emit a done event, and the sink will ignore future inputs. The inner
14 /// sink will also be closed to notify the remote end of the disconnection.
15 ///
16 /// If a channel is transformed after the [disconnect] has been called, it will
17 /// be disconnected immediately.
18 class Disconnector<T> implements StreamChannelTransformer<T, T> {
19 /// Whether [disconnect] has been called.
20 bool get isDisconnected => _isDisconnected;
21 var _isDisconnected = false;
22
23 /// The sinks for transformed channels.
24 ///
25 /// Note that we assume that transformed channels provide the stream channel
26 /// guarantees. This allows us to only track sinks, because we know closing
27 /// the underlying sink will cause the stream to emit a done event.
28 final _sinks = <_DisconnectorSink<T>>[];
29
30 /// Disconnects all channels that have been transformed.
31 void disconnect() {
32 _isDisconnected = true;
33 for (var sink in _sinks) {
34 sink._disconnect();
35 }
36 _sinks.clear();
37 }
38
39 StreamChannel<T> bind(StreamChannel<T> channel) {
40 return channel.changeSink((innerSink) {
41 var sink = new _DisconnectorSink(innerSink);
42
43 if (_isDisconnected) {
44 sink._disconnect();
45 } else {
46 _sinks.add(sink);
47 }
48
49 return sink;
50 });
51 }
52 }
53
54 /// A sink wrapper that can force a disconnection.
55 class _DisconnectorSink<T> implements StreamSink<T> {
56 /// The inner sink.
57 final StreamSink<T> _inner;
58
59 Future get done => _inner.done;
60
61 /// Whether [Disconnector.disconnect] has been called.
62 var _isDisconnected = false;
tjblasi 2016/02/09 16:45:37 Out of curiosity, what is your rule for typing pro
nweiz 2016/02/09 19:10:53 In general, I don't type a field if its type would
63
64 /// Whether the user has called [close].
65 var _closed = false;
66
67 /// The subscription to the stream passed to [addStream], if a stream is
68 /// currently being added.
69 StreamSubscription<T> _addStreamSubscription;
70
71 /// The completer for the future returned by [addStream], if a stream is
72 /// currently being added.
73 Completer _addStreamCompleter;
74
75 /// Whether we're currently adding a stream with [addStream].
76 bool get _inAddStream => _addStreamSubscription != null;
77
78 _DisconnectorSink(this._inner);
79
80 void add(T data) {
81 if (_closed) throw new StateError("Cannot add event after closing.");
82 if (_inAddStream) {
83 throw new StateError("Cannot add event while adding stream.");
84 }
85 if (_isDisconnected) return;
86
87 _inner.add(data);
88 }
89
90 void addError(error, [StackTrace stackTrace]) {
91 if (_closed) throw new StateError("Cannot add event after closing.");
92 if (_inAddStream) {
93 throw new StateError("Cannot add event while adding stream.");
94 }
95 if (_isDisconnected) return;
96
97 _inner.addError(error, stackTrace);
98 }
99
100 Future addStream(Stream<T> stream) {
101 if (_closed) throw new StateError("Cannot add stream after closing.");
102 if (_inAddStream) {
103 throw new StateError("Cannot add stream while adding stream.");
104 }
105 if (_isDisconnected) return new Future.value();
106
107 _addStreamCompleter = new Completer.sync();
108 _addStreamSubscription = stream.listen(
109 _inner.add,
110 onError: _inner.addError,
111 onDone: _addStreamCompleter.complete);
112 return _addStreamCompleter.future.then((_) {
113 _addStreamCompleter = null;
114 _addStreamSubscription = null;
115 });
116 }
117
118 Future close() {
119 if (_inAddStream) {
120 throw new StateError("Cannot close sink while adding stream.");
121 }
122
123 _closed = true;
124 return _inner.close();
125 }
126
127 /// Disconnects this sink.
128 ///
129 /// This closes the underlying sink and stops forwarding events.
130 void _disconnect() {
131 _isDisconnected = true;
132 _inner.close();
133
134 if (!_inAddStream) return;
135 _addStreamCompleter.complete(_addStreamSubscription.cancel());
136 _addStreamCompleter = null;
137 _addStreamSubscription = null;
138 }
139 }
OLDNEW
« no previous file with comments | « CHANGELOG.md ('k') | lib/stream_channel.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698