Index: pkg/json_rpc_2/lib/src/two_way_stream.dart |
diff --git a/pkg/json_rpc_2/lib/src/two_way_stream.dart b/pkg/json_rpc_2/lib/src/two_way_stream.dart |
new file mode 100644 |
index 0000000000000000000000000000000000000000..f470c2fcec6f024d34c15a1f015b2848050ae089 |
--- /dev/null |
+++ b/pkg/json_rpc_2/lib/src/two_way_stream.dart |
@@ -0,0 +1,134 @@ |
+// Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file |
+// for details. All rights reserved. Use of this source code is governed by a |
+// BSD-style license that can be found in the LICENSE file. |
+ |
+library json_rpc_2.two_way_stream; |
+ |
+import 'dart:async'; |
+import 'dart:convert'; |
+ |
+import 'utils.dart'; |
+ |
+/// A class for managing a stream of input messages and a sink for output |
+/// messages. |
+/// |
+/// This contains stream logic that's shared between [Server] and [Client]. |
+class TwoWayStream { |
+ /// The name of the component whose streams are being managed (e.g. "Server"). |
+ /// |
+ /// Used for error reporting. |
+ final String _name; |
+ |
+ /// The input stream. |
+ /// |
+ /// This is a stream of decoded JSON objects. |
+ final Stream _input; |
+ |
+ /// The subscription to [_input]. |
+ StreamSubscription _inputSubscription; |
+ |
+ /// The output sink. |
+ /// |
+ /// This takes decoded JSON objects. |
+ final StreamSink _output; |
+ |
+ /// The completer for [listen]. |
+ /// |
+ /// This is non-`null` after [listen] has been called. |
+ Completer _listenCompleter; |
+ |
+ /// Creates a two-way stream. |
+ /// |
+ /// [input] and [output] should emit and take (respectively) JSON-encoded |
+ /// strings. |
+ /// |
+ /// [inputName] is used in error messages as the name of the input parameter. |
+ /// [outputName] is likewise used as the name of the output parameter. |
+ /// |
+ /// If [onInvalidInput] is passed, any errors parsing messages from [input] |
+ /// are passed to it. Otherwise, they're ignored and the input is discarded. |
+ factory TwoWayStream(String name, Stream<String> input, String inputName, |
+ StreamSink<String> output, String outputName, |
+ {void onInvalidInput(String message, FormatException error)}) { |
+ if (output == null) { |
+ if (input is! StreamSink) { |
+ throw new ArgumentError("Either `$inputName` must be a StreamSink or " |
+ "`$outputName` must be passed."); |
+ } |
+ output = input as StreamSink; |
+ } |
+ |
+ var wrappedOutput = mapStreamSink(output, JSON.encode); |
+ return new TwoWayStream.withoutJson(name, input.expand((message) { |
+ var decodedMessage; |
+ try { |
+ decodedMessage = JSON.decode(message); |
+ } on FormatException catch (error) { |
+ if (onInvalidInput != null) onInvalidInput(message, error); |
+ return []; |
+ } |
+ |
+ return [decodedMessage]; |
+ }), inputName, wrappedOutput, outputName); |
+ } |
+ |
+ /// Creates a two-way stream that reads decoded input and writes decoded |
+ /// responses. |
+ /// |
+ /// [input] and [output] should emit and take (respectively) decoded JSON |
+ /// objects. |
+ /// |
+ /// [inputName] is used in error messages as the name of the input parameter. |
+ /// [outputName] is likewise used as the name of the output parameter. |
+ TwoWayStream.withoutJson(this._name, Stream input, String inputName, |
+ StreamSink output, String outputName) |
+ : _input = input, |
+ _output = output == null && input is StreamSink ? input : output { |
+ if (_output == null) { |
+ throw new ArgumentError("Either `$inputName` must be a StreamSink or " |
+ "`$outputName` must be passed."); |
+ } |
+ } |
+ |
+ /// Starts listening to the input stream. |
+ /// |
+ /// The returned Future will complete when the input stream is closed. If the |
+ /// input stream emits an error, that will be piped to the returned Future. |
+ Future listen(void handleInput(input)) { |
+ if (_listenCompleter != null) { |
+ throw new StateError("Can only call $_name.listen once."); |
+ } |
+ |
+ _listenCompleter = new Completer(); |
+ _inputSubscription = _input.listen(handleInput, |
+ onError: (error, stackTrace) { |
+ if (_listenCompleter.isCompleted) return; |
+ _output.close(); |
+ _listenCompleter.completeError(error, stackTrace); |
+ }, onDone: () { |
+ if (_listenCompleter.isCompleted) return; |
+ _output.close(); |
+ _listenCompleter.complete(); |
+ }, cancelOnError: true); |
+ |
+ return _listenCompleter.future; |
+ } |
+ |
+ /// Emit [event] on the output stream. |
+ void add(event) => _output.add(event); |
+ |
+ /// Stops listening to the input stream and closes the output stream. |
+ Future close() { |
+ if (_listenCompleter == null) { |
+ throw new StateError("Can't call $_name.close before $_name.listen."); |
+ } |
+ |
+ if (!_listenCompleter.isCompleted) _listenCompleter.complete(); |
+ |
+ var inputFuture = _inputSubscription.cancel(); |
+ // TODO(nweiz): include the output future in the return value when issue |
+ // 19095 is fixed. |
+ _output.close(); |
+ return inputFuture == null ? new Future.value() : inputFuture; |
+ } |
+} |