OLD | NEW |
---|---|
(Empty) | |
1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file | |
Bob Nystrom
2016/02/02 17:54:15
2016! :)
| |
2 // for details. All rights reserved. Use of this source code is governed by a | |
3 // BSD-style license that can be found in the LICENSE file. | |
4 | |
5 import 'dart:async'; | |
6 | |
7 import 'package:stream_channel/stream_channel.dart'; | |
8 | |
9 /// Wraps a [StreamChannel] and handles logic that's shared between [Server], | |
10 /// [Client], and [Peer]. | |
11 /// | |
12 /// These classes don't provide the user direct access to a | |
13 /// [StreamSubscription]. Instead, they use the future returned by [listen] to | |
14 /// notify the user of the remote endpoint closing or producing an error. | |
15 class ChannelManager { | |
16 /// The name of the component whose channel is wrapped (e.g. "Server"). | |
17 /// | |
18 /// Used for error reporting. | |
19 final String _name; | |
20 | |
21 /// The underlying channel. | |
22 final StreamChannel _channel; | |
23 | |
24 /// Returns a [Future] that completes when the connection is closed. | |
25 /// | |
26 /// This is the same future that's returned by [listen]. | |
27 Future get done => _doneCompleter.future; | |
28 final _doneCompleter = new Completer(); | |
29 | |
30 /// Whether the underlying communication channel is closed. | |
31 bool get isClosed => _doneCompleter.isCompleted; | |
32 | |
33 /// Whether [listen] has been called. | |
34 bool _listenCalled = false; | |
35 | |
36 /// Whether [close] has been called. | |
37 /// | |
38 /// Note that [isClosed] tracks whether the underlying connection is closed, | |
39 /// whereas this tracks only whether it was explicitly closed from this end. | |
40 bool _closeCalled = false; | |
41 | |
42 ChannelManager(this._name, this._channel); | |
43 | |
44 /// Starts listening to the channel. | |
45 /// | |
46 /// The returned Future will complete when the input stream is closed. If the | |
47 /// input stream emits an error, that will be piped to the returned Future. | |
48 Future listen(void handleInput(input)) { | |
49 if (_listenCalled) { | |
50 throw new StateError("Can only call $_name.listen() once."); | |
51 } | |
52 _listenCalled = true; | |
53 | |
54 _channel.stream.listen(handleInput, | |
55 onError: (error, stackTrace) { | |
56 _doneCompleter.completeError(error, stackTrace); | |
57 _channel.sink.close(); | |
58 }, | |
59 onDone: _doneCompleter.complete, | |
60 cancelOnError: true); | |
61 | |
62 return done; | |
63 } | |
64 | |
65 /// Emit [event]. | |
66 void add(event) { | |
67 if (isClosed && !_closeCalled) return; | |
68 _channel.sink.add(event); | |
69 } | |
70 | |
71 /// Closes the channel. | |
72 Future close() { | |
73 _closeCalled = true; | |
74 if (!_doneCompleter.isCompleted) { | |
75 _doneCompleter.complete(_channel.sink.close()); | |
76 } | |
77 return done; | |
78 } | |
79 } | |
OLD | NEW |