| OLD | NEW |
| 1 // Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 library json_rpc_2.two_way_stream; | 5 library json_rpc_2.two_way_stream; |
| 6 | 6 |
| 7 import 'dart:async'; | 7 import 'dart:async'; |
| 8 import 'dart:convert'; | 8 import 'dart:convert'; |
| 9 | 9 |
| 10 import 'utils.dart'; | 10 import 'utils.dart'; |
| (...skipping 14 matching lines...) Expand all Loading... |
| 25 final Stream _input; | 25 final Stream _input; |
| 26 | 26 |
| 27 /// The subscription to [_input]. | 27 /// The subscription to [_input]. |
| 28 StreamSubscription _inputSubscription; | 28 StreamSubscription _inputSubscription; |
| 29 | 29 |
| 30 /// The output sink. | 30 /// The output sink. |
| 31 /// | 31 /// |
| 32 /// This takes decoded JSON objects. | 32 /// This takes decoded JSON objects. |
| 33 final StreamSink _output; | 33 final StreamSink _output; |
| 34 | 34 |
| 35 /// The completer for [listen]. | 35 /// Returns a [Future] that completes when the connection is closed. |
| 36 /// | 36 /// |
| 37 /// This is non-`null` after [listen] has been called. | 37 /// This is the same future that's returned by [listen]. |
| 38 Completer _listenCompleter; | 38 Future get done => _doneCompleter.future; |
| 39 final _doneCompleter = new Completer(); |
| 39 | 40 |
| 40 /// Creates a two-way stream. | 41 /// Creates a two-way stream. |
| 41 /// | 42 /// |
| 42 /// [input] and [output] should emit and take (respectively) JSON-encoded | 43 /// [input] and [output] should emit and take (respectively) JSON-encoded |
| 43 /// strings. | 44 /// strings. |
| 44 /// | 45 /// |
| 45 /// [inputName] is used in error messages as the name of the input parameter. | 46 /// [inputName] is used in error messages as the name of the input parameter. |
| 46 /// [outputName] is likewise used as the name of the output parameter. | 47 /// [outputName] is likewise used as the name of the output parameter. |
| 47 /// | 48 /// |
| 48 /// If [onInvalidInput] is passed, any errors parsing messages from [input] | 49 /// If [onInvalidInput] is passed, any errors parsing messages from [input] |
| (...skipping 39 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 88 throw new ArgumentError("Either `$inputName` must be a StreamSink or " | 89 throw new ArgumentError("Either `$inputName` must be a StreamSink or " |
| 89 "`$outputName` must be passed."); | 90 "`$outputName` must be passed."); |
| 90 } | 91 } |
| 91 } | 92 } |
| 92 | 93 |
| 93 /// Starts listening to the input stream. | 94 /// Starts listening to the input stream. |
| 94 /// | 95 /// |
| 95 /// The returned Future will complete when the input stream is closed. If the | 96 /// The returned Future will complete when the input stream is closed. If the |
| 96 /// input stream emits an error, that will be piped to the returned Future. | 97 /// input stream emits an error, that will be piped to the returned Future. |
| 97 Future listen(void handleInput(input)) { | 98 Future listen(void handleInput(input)) { |
| 98 if (_listenCompleter != null) { | 99 if (_inputSubscription != null) { |
| 99 throw new StateError("Can only call $_name.listen once."); | 100 throw new StateError("Can only call $_name.listen once."); |
| 100 } | 101 } |
| 101 | 102 |
| 102 _listenCompleter = new Completer(); | |
| 103 _inputSubscription = _input.listen(handleInput, | 103 _inputSubscription = _input.listen(handleInput, |
| 104 onError: (error, stackTrace) { | 104 onError: (error, stackTrace) { |
| 105 if (_listenCompleter.isCompleted) return; | 105 if (_doneCompleter.isCompleted) return; |
| 106 _output.close(); | 106 _output.close(); |
| 107 _listenCompleter.completeError(error, stackTrace); | 107 _doneCompleter.completeError(error, stackTrace); |
| 108 }, onDone: () { | 108 }, onDone: () { |
| 109 if (_listenCompleter.isCompleted) return; | 109 if (_doneCompleter.isCompleted) return; |
| 110 _output.close(); | 110 _output.close(); |
| 111 _listenCompleter.complete(); | 111 _doneCompleter.complete(); |
| 112 }, cancelOnError: true); | 112 }, cancelOnError: true); |
| 113 | 113 |
| 114 return _listenCompleter.future; | 114 return _doneCompleter.future; |
| 115 } | 115 } |
| 116 | 116 |
| 117 /// Emit [event] on the output stream. | 117 /// Emit [event] on the output stream. |
| 118 void add(event) => _output.add(event); | 118 void add(event) => _output.add(event); |
| 119 | 119 |
| 120 /// Stops listening to the input stream and closes the output stream. | 120 /// Stops listening to the input stream and closes the output stream. |
| 121 Future close() { | 121 Future close() { |
| 122 if (_listenCompleter == null) { | 122 if (_inputSubscription == null) { |
| 123 throw new StateError("Can't call $_name.close before $_name.listen."); | 123 throw new StateError("Can't call $_name.close before $_name.listen."); |
| 124 } | 124 } |
| 125 | 125 |
| 126 if (!_listenCompleter.isCompleted) _listenCompleter.complete(); | 126 if (!_doneCompleter.isCompleted) _doneCompleter.complete(); |
| 127 | 127 |
| 128 var inputFuture = _inputSubscription.cancel(); | 128 var inputFuture = _inputSubscription.cancel(); |
| 129 // TODO(nweiz): include the output future in the return value when issue | 129 // TODO(nweiz): include the output future in the return value when issue |
| 130 // 19095 is fixed. | 130 // 19095 is fixed. |
| 131 _output.close(); | 131 _output.close(); |
| 132 return inputFuture == null ? new Future.value() : inputFuture; | 132 return inputFuture == null ? new Future.value() : inputFuture; |
| 133 } | 133 } |
| 134 } | 134 } |
| OLD | NEW |