Chromium Code Reviews| Index: pkg/json_rpc_2/lib/src/stream_manager.dart |
| diff --git a/pkg/json_rpc_2/lib/src/stream_manager.dart b/pkg/json_rpc_2/lib/src/stream_manager.dart |
| new file mode 100644 |
| index 0000000000000000000000000000000000000000..ae481434246e9b6ac9c3437430ca6f826759344c |
| --- /dev/null |
| +++ b/pkg/json_rpc_2/lib/src/stream_manager.dart |
| @@ -0,0 +1,125 @@ |
| +// Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file |
| +// for details. All rights reserved. Use of this source code is governed by a |
| +// BSD-style license that can be found in the LICENSE file. |
| + |
| +library json_rpc_2.stream_manager; |
| + |
| +import 'dart:async'; |
| +import 'dart:convert'; |
| + |
| +import 'utils.dart'; |
| + |
| +/// A class for managing a streams of input messages and a sink for output |
|
Bob Nystrom
2014/06/26 21:40:27
"streams" -> "stream".
nweiz
2014/06/26 22:55:47
Done.
|
| +/// messages. |
| +/// |
| +/// This contains stream logic that's shared between [Server] and [Client]. |
| +class StreamManager { |
|
Bob Nystrom
2014/06/13 17:58:04
"manager" doesn't mean anything, and it's hard to
nweiz
2014/06/26 20:43:57
This class is really just "functionality shared be
Bob Nystrom
2014/06/26 21:40:27
How about "BidirectionalStream"? Although that's h
nweiz
2014/06/26 22:55:47
I think "Multiplexer" strongly implies combining a
|
| + /// The name of the component whose streams are being managed (e.g. "Server"). |
| + /// |
| + /// Used for error reporting. |
| + final String _name; |
| + |
| + /// The input stream. |
| + /// |
| + /// This is a stream of decoded JSON objects. |
| + final Stream _input; |
|
Bob Nystrom
2014/06/13 17:58:04
How about Stream<Object>?
nweiz
2014/06/26 20:43:58
This is basically a union type--not every type is
Bob Nystrom
2014/06/26 21:40:27
SGTM.
|
| + |
| + /// The subscription to [_input]. |
| + StreamSubscription _inputSubscription; |
| + |
| + /// The output sink. |
| + /// |
| + /// This takes decoded JSON objects. |
| + final StreamSink _output; |
|
Bob Nystrom
2014/06/13 17:58:04
Likewise StreamSink<Object>.
|
| + |
| + /// The completer for [listen]. |
| + /// |
| + /// This is non-`null` after [listen] has been called. |
| + Completer _listenCompleter; |
| + |
| + /// Creates a stream manager. |
| + /// |
|
Bob Nystrom
2014/06/13 17:58:04
Document input and output, in particular that the
nweiz
2014/06/26 20:43:57
Done.
|
| + /// [inputName] is used in error messages as the name of the input parameter. |
| + /// [outputName] is likewise used as the name of the output parameter. |
| + /// |
| + /// If [onInvalidInput] is passed, any errors parsing messages from [input] |
| + /// are passed to it. Otherwise, they're ignored and the input is discarded. |
| + factory StreamManager(String name, Stream<String> input, String inputName, |
| + StreamSink<String> output, String outputName, |
| + {void onInvalidInput(String message, FormatException error)}) { |
| + if (output == null) { |
| + if (input is! StreamSink) { |
| + throw new ArgumentError("Either `$inputName` must be a StreamSink or " |
| + "`$outputName` must be passed."); |
|
Bob Nystrom
2014/06/13 17:58:04
Instead of this conditional logic, how about addin
nweiz
2014/06/26 20:43:58
I think having a single method is cleaner and easi
|
| + } |
| + output = input as StreamSink; |
| + } |
| + |
| + var wrappedOutput = mapStreamSink(output, JSON.encode); |
| + return new StreamManager.withoutJson(name, input.expand((message) { |
| + var decodedMessage; |
| + try { |
| + decodedMessage = JSON.decode(message); |
| + } on FormatException catch (error) { |
| + if (onInvalidInput != null) onInvalidInput(message, error); |
| + return []; |
| + } |
| + |
| + return [decodedMessage]; |
| + }), inputName, wrappedOutput, outputName); |
| + } |
| + |
| + /// Creates a stream manager that reads decoded input and writes decoded |
| + /// responses. |
|
Bob Nystrom
2014/06/13 17:58:04
Document input and output.
nweiz
2014/06/26 20:43:58
Done.
|
| + /// |
| + /// [inputName] is used in error messages as the name of the input parameter. |
| + /// [outputName] is likewise used as the name of the output parameter. |
| + StreamManager.withoutJson(this._name, Stream input, String inputName, |
|
Bob Nystrom
2014/06/13 17:58:04
Stream<Object> and StreamSink<Object>?
|
| + StreamSink output, String outputName) |
| + : _input = input, |
| + _output = output == null && input is StreamSink ? input : output { |
| + if (_output == null) { |
| + throw new ArgumentError("Either `$inputName` must be a StreamSink or " |
| + "`$outputName` must be passed."); |
| + } |
| + } |
| + |
| + /// Starts listening to the input stream. |
|
Bob Nystrom
2014/06/13 17:58:04
Document that an error on the stream is piped to t
nweiz
2014/06/26 20:43:58
Done.
|
| + Future listen(void handleInput(input)) { |
| + if (_listenCompleter != null) { |
| + throw new StateError("Can only call $_name.listen once."); |
| + } |
| + |
| + _listenCompleter = new Completer(); |
| + _inputSubscription = _input.listen(handleInput, |
| + onError: (error, stackTrace) { |
| + if (_listenCompleter.isCompleted) return; |
| + _output.close(); |
| + _listenCompleter.completeError(error, stackTrace); |
| + }, onDone: () { |
| + if (_listenCompleter.isCompleted) return; |
| + _output.close(); |
| + _listenCompleter.complete(); |
| + }, cancelOnError: true); |
| + |
| + return _listenCompleter.future; |
| + } |
| + |
| + /// Emit [event] on the output stream. |
| + void add(event) => _output.add(event); |
| + |
| + /// Stops listening to the input stream and closes the output stream. |
| + Future close() { |
| + if (_listenCompleter == null) { |
| + throw new StateError("Can't call $_name.close before $_name.listen."); |
| + } |
| + |
| + if (!_listenCompleter.isCompleted) _listenCompleter.complete(); |
| + |
| + var inputFuture = _inputSubscription.cancel(); |
| + // TODO(nweiz): include the output future in the return value when issue |
| + // 19095 is fixed. |
| + _output.close(); |
| + return inputFuture == null ? new Future.value() : inputFuture; |
| + } |
| +} |