Index: pkg/json_rpc_2/lib/src/stream_manager.dart |
diff --git a/pkg/json_rpc_2/lib/src/stream_manager.dart b/pkg/json_rpc_2/lib/src/stream_manager.dart |
new file mode 100644 |
index 0000000000000000000000000000000000000000..ae481434246e9b6ac9c3437430ca6f826759344c |
--- /dev/null |
+++ b/pkg/json_rpc_2/lib/src/stream_manager.dart |
@@ -0,0 +1,125 @@ |
+// Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file |
+// for details. All rights reserved. Use of this source code is governed by a |
+// BSD-style license that can be found in the LICENSE file. |
+ |
+library json_rpc_2.stream_manager; |
+ |
+import 'dart:async'; |
+import 'dart:convert'; |
+ |
+import 'utils.dart'; |
+ |
+/// A class for managing a streams of input messages and a sink for output |
Bob Nystrom
2014/06/26 21:40:27
"streams" -> "stream".
nweiz
2014/06/26 22:55:47
Done.
|
+/// messages. |
+/// |
+/// This contains stream logic that's shared between [Server] and [Client]. |
+class StreamManager { |
Bob Nystrom
2014/06/13 17:58:04
"manager" doesn't mean anything, and it's hard to
nweiz
2014/06/26 20:43:57
This class is really just "functionality shared be
Bob Nystrom
2014/06/26 21:40:27
How about "BidirectionalStream"? Although that's h
nweiz
2014/06/26 22:55:47
I think "Multiplexer" strongly implies combining a
|
+ /// The name of the component whose streams are being managed (e.g. "Server"). |
+ /// |
+ /// Used for error reporting. |
+ final String _name; |
+ |
+ /// The input stream. |
+ /// |
+ /// This is a stream of decoded JSON objects. |
+ final Stream _input; |
Bob Nystrom
2014/06/13 17:58:04
How about Stream<Object>?
nweiz
2014/06/26 20:43:58
This is basically a union type--not every type is
Bob Nystrom
2014/06/26 21:40:27
SGTM.
|
+ |
+ /// The subscription to [_input]. |
+ StreamSubscription _inputSubscription; |
+ |
+ /// The output sink. |
+ /// |
+ /// This takes decoded JSON objects. |
+ final StreamSink _output; |
Bob Nystrom
2014/06/13 17:58:04
Likewise StreamSink<Object>.
|
+ |
+ /// The completer for [listen]. |
+ /// |
+ /// This is non-`null` after [listen] has been called. |
+ Completer _listenCompleter; |
+ |
+ /// Creates a stream manager. |
+ /// |
Bob Nystrom
2014/06/13 17:58:04
Document input and output, in particular that the
nweiz
2014/06/26 20:43:57
Done.
|
+ /// [inputName] is used in error messages as the name of the input parameter. |
+ /// [outputName] is likewise used as the name of the output parameter. |
+ /// |
+ /// If [onInvalidInput] is passed, any errors parsing messages from [input] |
+ /// are passed to it. Otherwise, they're ignored and the input is discarded. |
+ factory StreamManager(String name, Stream<String> input, String inputName, |
+ StreamSink<String> output, String outputName, |
+ {void onInvalidInput(String message, FormatException error)}) { |
+ if (output == null) { |
+ if (input is! StreamSink) { |
+ throw new ArgumentError("Either `$inputName` must be a StreamSink or " |
+ "`$outputName` must be passed."); |
Bob Nystrom
2014/06/13 17:58:04
Instead of this conditional logic, how about addin
nweiz
2014/06/26 20:43:58
I think having a single method is cleaner and easi
|
+ } |
+ output = input as StreamSink; |
+ } |
+ |
+ var wrappedOutput = mapStreamSink(output, JSON.encode); |
+ return new StreamManager.withoutJson(name, input.expand((message) { |
+ var decodedMessage; |
+ try { |
+ decodedMessage = JSON.decode(message); |
+ } on FormatException catch (error) { |
+ if (onInvalidInput != null) onInvalidInput(message, error); |
+ return []; |
+ } |
+ |
+ return [decodedMessage]; |
+ }), inputName, wrappedOutput, outputName); |
+ } |
+ |
+ /// Creates a stream manager that reads decoded input and writes decoded |
+ /// responses. |
Bob Nystrom
2014/06/13 17:58:04
Document input and output.
nweiz
2014/06/26 20:43:58
Done.
|
+ /// |
+ /// [inputName] is used in error messages as the name of the input parameter. |
+ /// [outputName] is likewise used as the name of the output parameter. |
+ StreamManager.withoutJson(this._name, Stream input, String inputName, |
Bob Nystrom
2014/06/13 17:58:04
Stream<Object> and StreamSink<Object>?
|
+ StreamSink output, String outputName) |
+ : _input = input, |
+ _output = output == null && input is StreamSink ? input : output { |
+ if (_output == null) { |
+ throw new ArgumentError("Either `$inputName` must be a StreamSink or " |
+ "`$outputName` must be passed."); |
+ } |
+ } |
+ |
+ /// Starts listening to the input stream. |
Bob Nystrom
2014/06/13 17:58:04
Document that an error on the stream is piped to t
nweiz
2014/06/26 20:43:58
Done.
|
+ Future listen(void handleInput(input)) { |
+ if (_listenCompleter != null) { |
+ throw new StateError("Can only call $_name.listen once."); |
+ } |
+ |
+ _listenCompleter = new Completer(); |
+ _inputSubscription = _input.listen(handleInput, |
+ onError: (error, stackTrace) { |
+ if (_listenCompleter.isCompleted) return; |
+ _output.close(); |
+ _listenCompleter.completeError(error, stackTrace); |
+ }, onDone: () { |
+ if (_listenCompleter.isCompleted) return; |
+ _output.close(); |
+ _listenCompleter.complete(); |
+ }, cancelOnError: true); |
+ |
+ return _listenCompleter.future; |
+ } |
+ |
+ /// Emit [event] on the output stream. |
+ void add(event) => _output.add(event); |
+ |
+ /// Stops listening to the input stream and closes the output stream. |
+ Future close() { |
+ if (_listenCompleter == null) { |
+ throw new StateError("Can't call $_name.close before $_name.listen."); |
+ } |
+ |
+ if (!_listenCompleter.isCompleted) _listenCompleter.complete(); |
+ |
+ var inputFuture = _inputSubscription.cancel(); |
+ // TODO(nweiz): include the output future in the return value when issue |
+ // 19095 is fixed. |
+ _output.close(); |
+ return inputFuture == null ? new Future.value() : inputFuture; |
+ } |
+} |