OLD | NEW |
(Empty) | |
| 1 // Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. |
| 4 |
| 5 library json_rpc_2.two_way_stream; |
| 6 |
| 7 import 'dart:async'; |
| 8 import 'dart:convert'; |
| 9 |
| 10 import 'utils.dart'; |
| 11 |
| 12 /// A class for managing a stream of input messages and a sink for output |
| 13 /// messages. |
| 14 /// |
| 15 /// This contains stream logic that's shared between [Server] and [Client]. |
| 16 class TwoWayStream { |
| 17 /// The name of the component whose streams are being managed (e.g. "Server"). |
| 18 /// |
| 19 /// Used for error reporting. |
| 20 final String _name; |
| 21 |
| 22 /// The input stream. |
| 23 /// |
| 24 /// This is a stream of decoded JSON objects. |
| 25 final Stream _input; |
| 26 |
| 27 /// The subscription to [_input]. |
| 28 StreamSubscription _inputSubscription; |
| 29 |
| 30 /// The output sink. |
| 31 /// |
| 32 /// This takes decoded JSON objects. |
| 33 final StreamSink _output; |
| 34 |
| 35 /// The completer for [listen]. |
| 36 /// |
| 37 /// This is non-`null` after [listen] has been called. |
| 38 Completer _listenCompleter; |
| 39 |
| 40 /// Creates a two-way stream. |
| 41 /// |
| 42 /// [input] and [output] should emit and take (respectively) JSON-encoded |
| 43 /// strings. |
| 44 /// |
| 45 /// [inputName] is used in error messages as the name of the input parameter. |
| 46 /// [outputName] is likewise used as the name of the output parameter. |
| 47 /// |
| 48 /// If [onInvalidInput] is passed, any errors parsing messages from [input] |
| 49 /// are passed to it. Otherwise, they're ignored and the input is discarded. |
| 50 factory TwoWayStream(String name, Stream<String> input, String inputName, |
| 51 StreamSink<String> output, String outputName, |
| 52 {void onInvalidInput(String message, FormatException error)}) { |
| 53 if (output == null) { |
| 54 if (input is! StreamSink) { |
| 55 throw new ArgumentError("Either `$inputName` must be a StreamSink or " |
| 56 "`$outputName` must be passed."); |
| 57 } |
| 58 output = input as StreamSink; |
| 59 } |
| 60 |
| 61 var wrappedOutput = mapStreamSink(output, JSON.encode); |
| 62 return new TwoWayStream.withoutJson(name, input.expand((message) { |
| 63 var decodedMessage; |
| 64 try { |
| 65 decodedMessage = JSON.decode(message); |
| 66 } on FormatException catch (error) { |
| 67 if (onInvalidInput != null) onInvalidInput(message, error); |
| 68 return []; |
| 69 } |
| 70 |
| 71 return [decodedMessage]; |
| 72 }), inputName, wrappedOutput, outputName); |
| 73 } |
| 74 |
| 75 /// Creates a two-way stream that reads decoded input and writes decoded |
| 76 /// responses. |
| 77 /// |
| 78 /// [input] and [output] should emit and take (respectively) decoded JSON |
| 79 /// objects. |
| 80 /// |
| 81 /// [inputName] is used in error messages as the name of the input parameter. |
| 82 /// [outputName] is likewise used as the name of the output parameter. |
| 83 TwoWayStream.withoutJson(this._name, Stream input, String inputName, |
| 84 StreamSink output, String outputName) |
| 85 : _input = input, |
| 86 _output = output == null && input is StreamSink ? input : output { |
| 87 if (_output == null) { |
| 88 throw new ArgumentError("Either `$inputName` must be a StreamSink or " |
| 89 "`$outputName` must be passed."); |
| 90 } |
| 91 } |
| 92 |
| 93 /// Starts listening to the input stream. |
| 94 /// |
| 95 /// The returned Future will complete when the input stream is closed. If the |
| 96 /// input stream emits an error, that will be piped to the returned Future. |
| 97 Future listen(void handleInput(input)) { |
| 98 if (_listenCompleter != null) { |
| 99 throw new StateError("Can only call $_name.listen once."); |
| 100 } |
| 101 |
| 102 _listenCompleter = new Completer(); |
| 103 _inputSubscription = _input.listen(handleInput, |
| 104 onError: (error, stackTrace) { |
| 105 if (_listenCompleter.isCompleted) return; |
| 106 _output.close(); |
| 107 _listenCompleter.completeError(error, stackTrace); |
| 108 }, onDone: () { |
| 109 if (_listenCompleter.isCompleted) return; |
| 110 _output.close(); |
| 111 _listenCompleter.complete(); |
| 112 }, cancelOnError: true); |
| 113 |
| 114 return _listenCompleter.future; |
| 115 } |
| 116 |
| 117 /// Emit [event] on the output stream. |
| 118 void add(event) => _output.add(event); |
| 119 |
| 120 /// Stops listening to the input stream and closes the output stream. |
| 121 Future close() { |
| 122 if (_listenCompleter == null) { |
| 123 throw new StateError("Can't call $_name.close before $_name.listen."); |
| 124 } |
| 125 |
| 126 if (!_listenCompleter.isCompleted) _listenCompleter.complete(); |
| 127 |
| 128 var inputFuture = _inputSubscription.cancel(); |
| 129 // TODO(nweiz): include the output future in the return value when issue |
| 130 // 19095 is fixed. |
| 131 _output.close(); |
| 132 return inputFuture == null ? new Future.value() : inputFuture; |
| 133 } |
| 134 } |
OLD | NEW |