| OLD | NEW |
| (Empty) |
| 1 // Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file | |
| 2 // for details. All rights reserved. Use of this source code is governed by a | |
| 3 // BSD-style license that can be found in the LICENSE file. | |
| 4 | |
| 5 import 'dart:async'; | |
| 6 import 'dart:convert'; | |
| 7 | |
| 8 import 'utils.dart'; | |
| 9 | |
| 10 /// A class for managing a stream of input messages and a sink for output | |
| 11 /// messages. | |
| 12 /// | |
| 13 /// This contains stream logic that's shared between [Server] and [Client]. | |
| 14 class TwoWayStream { | |
| 15 /// The name of the component whose streams are being managed (e.g. "Server"). | |
| 16 /// | |
| 17 /// Used for error reporting. | |
| 18 final String _name; | |
| 19 | |
| 20 /// The input stream. | |
| 21 /// | |
| 22 /// This is a stream of decoded JSON objects. | |
| 23 final Stream _input; | |
| 24 | |
| 25 /// The subscription to [_input]. | |
| 26 StreamSubscription _inputSubscription; | |
| 27 | |
| 28 /// The output sink. | |
| 29 /// | |
| 30 /// This takes decoded JSON objects. | |
| 31 final StreamSink _output; | |
| 32 | |
| 33 /// Returns a [Future] that completes when the connection is closed. | |
| 34 /// | |
| 35 /// This is the same future that's returned by [listen]. | |
| 36 Future get done => _doneCompleter.future; | |
| 37 final _doneCompleter = new Completer(); | |
| 38 | |
| 39 /// Whether the stream has been closed. | |
| 40 bool get isClosed => _doneCompleter.isCompleted; | |
| 41 | |
| 42 /// Creates a two-way stream. | |
| 43 /// | |
| 44 /// [input] and [output] should emit and take (respectively) JSON-encoded | |
| 45 /// strings. | |
| 46 /// | |
| 47 /// [inputName] is used in error messages as the name of the input parameter. | |
| 48 /// [outputName] is likewise used as the name of the output parameter. | |
| 49 /// | |
| 50 /// If [onInvalidInput] is passed, any errors parsing messages from [input] | |
| 51 /// are passed to it. Otherwise, they're ignored and the input is discarded. | |
| 52 factory TwoWayStream(String name, Stream<String> input, String inputName, | |
| 53 StreamSink<String> output, String outputName, | |
| 54 {void onInvalidInput(String message, FormatException error)}) { | |
| 55 if (output == null) { | |
| 56 if (input is! StreamSink) { | |
| 57 throw new ArgumentError("Either `$inputName` must be a StreamSink or " | |
| 58 "`$outputName` must be passed."); | |
| 59 } | |
| 60 output = input as StreamSink; | |
| 61 } | |
| 62 | |
| 63 var wrappedOutput = mapStreamSink(output, JSON.encode); | |
| 64 return new TwoWayStream.withoutJson(name, input.expand((message) { | |
| 65 var decodedMessage; | |
| 66 try { | |
| 67 decodedMessage = JSON.decode(message); | |
| 68 } on FormatException catch (error) { | |
| 69 if (onInvalidInput != null) onInvalidInput(message, error); | |
| 70 return []; | |
| 71 } | |
| 72 | |
| 73 return [decodedMessage]; | |
| 74 }), inputName, wrappedOutput, outputName); | |
| 75 } | |
| 76 | |
| 77 /// Creates a two-way stream that reads decoded input and writes decoded | |
| 78 /// responses. | |
| 79 /// | |
| 80 /// [input] and [output] should emit and take (respectively) decoded JSON | |
| 81 /// objects. | |
| 82 /// | |
| 83 /// [inputName] is used in error messages as the name of the input parameter. | |
| 84 /// [outputName] is likewise used as the name of the output parameter. | |
| 85 TwoWayStream.withoutJson(this._name, Stream input, String inputName, | |
| 86 StreamSink output, String outputName) | |
| 87 : _input = input, | |
| 88 _output = output == null && input is StreamSink ? input : output { | |
| 89 if (_output == null) { | |
| 90 throw new ArgumentError("Either `$inputName` must be a StreamSink or " | |
| 91 "`$outputName` must be passed."); | |
| 92 } | |
| 93 } | |
| 94 | |
| 95 /// Starts listening to the input stream. | |
| 96 /// | |
| 97 /// The returned Future will complete when the input stream is closed. If the | |
| 98 /// input stream emits an error, that will be piped to the returned Future. | |
| 99 Future listen(void handleInput(input)) { | |
| 100 if (_inputSubscription != null) { | |
| 101 throw new StateError("Can only call $_name.listen once."); | |
| 102 } | |
| 103 | |
| 104 _inputSubscription = _input.listen(handleInput, | |
| 105 onError: (error, stackTrace) { | |
| 106 if (_doneCompleter.isCompleted) return; | |
| 107 _doneCompleter.completeError(error, stackTrace); | |
| 108 _output.close(); | |
| 109 }, onDone: () { | |
| 110 if (_doneCompleter.isCompleted) return; | |
| 111 _doneCompleter.complete(); | |
| 112 _output.close(); | |
| 113 }, cancelOnError: true); | |
| 114 | |
| 115 return _doneCompleter.future; | |
| 116 } | |
| 117 | |
| 118 /// Emit [event] on the output stream. | |
| 119 void add(event) => _output.add(event); | |
| 120 | |
| 121 /// Stops listening to the input stream and closes the output stream. | |
| 122 Future close() { | |
| 123 if (_inputSubscription == null) { | |
| 124 throw new StateError("Can't call $_name.close before $_name.listen."); | |
| 125 } | |
| 126 | |
| 127 if (!_doneCompleter.isCompleted) _doneCompleter.complete(); | |
| 128 | |
| 129 var inputFuture = _inputSubscription.cancel(); | |
| 130 // TODO(nweiz): include the output future in the return value when issue | |
| 131 // 19095 is fixed. | |
| 132 _output.close(); | |
| 133 return inputFuture == null ? new Future.value() : inputFuture; | |
| 134 } | |
| 135 } | |
| OLD | NEW |