OLD | NEW |
1 // Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 library json_rpc_2.two_way_stream; | 5 library json_rpc_2.two_way_stream; |
6 | 6 |
7 import 'dart:async'; | 7 import 'dart:async'; |
8 import 'dart:convert'; | 8 import 'dart:convert'; |
9 | 9 |
10 import 'utils.dart'; | 10 import 'utils.dart'; |
(...skipping 20 matching lines...) Expand all Loading... |
31 /// | 31 /// |
32 /// This takes decoded JSON objects. | 32 /// This takes decoded JSON objects. |
33 final StreamSink _output; | 33 final StreamSink _output; |
34 | 34 |
35 /// Returns a [Future] that completes when the connection is closed. | 35 /// Returns a [Future] that completes when the connection is closed. |
36 /// | 36 /// |
37 /// This is the same future that's returned by [listen]. | 37 /// This is the same future that's returned by [listen]. |
38 Future get done => _doneCompleter.future; | 38 Future get done => _doneCompleter.future; |
39 final _doneCompleter = new Completer(); | 39 final _doneCompleter = new Completer(); |
40 | 40 |
| 41 /// Whether the stream has been closed. |
| 42 bool get isClosed => _doneCompleter.isCompleted; |
| 43 |
41 /// Creates a two-way stream. | 44 /// Creates a two-way stream. |
42 /// | 45 /// |
43 /// [input] and [output] should emit and take (respectively) JSON-encoded | 46 /// [input] and [output] should emit and take (respectively) JSON-encoded |
44 /// strings. | 47 /// strings. |
45 /// | 48 /// |
46 /// [inputName] is used in error messages as the name of the input parameter. | 49 /// [inputName] is used in error messages as the name of the input parameter. |
47 /// [outputName] is likewise used as the name of the output parameter. | 50 /// [outputName] is likewise used as the name of the output parameter. |
48 /// | 51 /// |
49 /// If [onInvalidInput] is passed, any errors parsing messages from [input] | 52 /// If [onInvalidInput] is passed, any errors parsing messages from [input] |
50 /// are passed to it. Otherwise, they're ignored and the input is discarded. | 53 /// are passed to it. Otherwise, they're ignored and the input is discarded. |
(...skipping 45 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
96 /// The returned Future will complete when the input stream is closed. If the | 99 /// The returned Future will complete when the input stream is closed. If the |
97 /// input stream emits an error, that will be piped to the returned Future. | 100 /// input stream emits an error, that will be piped to the returned Future. |
98 Future listen(void handleInput(input)) { | 101 Future listen(void handleInput(input)) { |
99 if (_inputSubscription != null) { | 102 if (_inputSubscription != null) { |
100 throw new StateError("Can only call $_name.listen once."); | 103 throw new StateError("Can only call $_name.listen once."); |
101 } | 104 } |
102 | 105 |
103 _inputSubscription = _input.listen(handleInput, | 106 _inputSubscription = _input.listen(handleInput, |
104 onError: (error, stackTrace) { | 107 onError: (error, stackTrace) { |
105 if (_doneCompleter.isCompleted) return; | 108 if (_doneCompleter.isCompleted) return; |
| 109 _doneCompleter.completeError(error, stackTrace); |
106 _output.close(); | 110 _output.close(); |
107 _doneCompleter.completeError(error, stackTrace); | |
108 }, onDone: () { | 111 }, onDone: () { |
109 if (_doneCompleter.isCompleted) return; | 112 if (_doneCompleter.isCompleted) return; |
| 113 _doneCompleter.complete(); |
110 _output.close(); | 114 _output.close(); |
111 _doneCompleter.complete(); | |
112 }, cancelOnError: true); | 115 }, cancelOnError: true); |
113 | 116 |
114 return _doneCompleter.future; | 117 return _doneCompleter.future; |
115 } | 118 } |
116 | 119 |
117 /// Emit [event] on the output stream. | 120 /// Emit [event] on the output stream. |
118 void add(event) => _output.add(event); | 121 void add(event) => _output.add(event); |
119 | 122 |
120 /// Stops listening to the input stream and closes the output stream. | 123 /// Stops listening to the input stream and closes the output stream. |
121 Future close() { | 124 Future close() { |
122 if (_inputSubscription == null) { | 125 if (_inputSubscription == null) { |
123 throw new StateError("Can't call $_name.close before $_name.listen."); | 126 throw new StateError("Can't call $_name.close before $_name.listen."); |
124 } | 127 } |
125 | 128 |
| 129 _isClosed = true; |
126 if (!_doneCompleter.isCompleted) _doneCompleter.complete(); | 130 if (!_doneCompleter.isCompleted) _doneCompleter.complete(); |
127 | 131 |
128 var inputFuture = _inputSubscription.cancel(); | 132 var inputFuture = _inputSubscription.cancel(); |
129 // TODO(nweiz): include the output future in the return value when issue | 133 // TODO(nweiz): include the output future in the return value when issue |
130 // 19095 is fixed. | 134 // 19095 is fixed. |
131 _output.close(); | 135 _output.close(); |
132 return inputFuture == null ? new Future.value() : inputFuture; | 136 return inputFuture == null ? new Future.value() : inputFuture; |
133 } | 137 } |
134 } | 138 } |
OLD | NEW |