Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(398)

Unified Diff: pkg/json_rpc_2/lib/src/two_way_stream.dart

Issue 333683003: Extract out a StreamManager class from json_rpc.Server. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: code review Created 6 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « pkg/json_rpc_2/lib/src/server.dart ('k') | no next file » | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: pkg/json_rpc_2/lib/src/two_way_stream.dart
diff --git a/pkg/json_rpc_2/lib/src/two_way_stream.dart b/pkg/json_rpc_2/lib/src/two_way_stream.dart
new file mode 100644
index 0000000000000000000000000000000000000000..f470c2fcec6f024d34c15a1f015b2848050ae089
--- /dev/null
+++ b/pkg/json_rpc_2/lib/src/two_way_stream.dart
@@ -0,0 +1,134 @@
+// Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+library json_rpc_2.two_way_stream;
+
+import 'dart:async';
+import 'dart:convert';
+
+import 'utils.dart';
+
+/// A class for managing a stream of input messages and a sink for output
+/// messages.
+///
+/// This contains stream logic that's shared between [Server] and [Client].
+class TwoWayStream {
+ /// The name of the component whose streams are being managed (e.g. "Server").
+ ///
+ /// Used for error reporting.
+ final String _name;
+
+ /// The input stream.
+ ///
+ /// This is a stream of decoded JSON objects.
+ final Stream _input;
+
+ /// The subscription to [_input].
+ StreamSubscription _inputSubscription;
+
+ /// The output sink.
+ ///
+ /// This takes decoded JSON objects.
+ final StreamSink _output;
+
+ /// The completer for [listen].
+ ///
+ /// This is non-`null` after [listen] has been called.
+ Completer _listenCompleter;
+
+ /// Creates a two-way stream.
+ ///
+ /// [input] and [output] should emit and take (respectively) JSON-encoded
+ /// strings.
+ ///
+ /// [inputName] is used in error messages as the name of the input parameter.
+ /// [outputName] is likewise used as the name of the output parameter.
+ ///
+ /// If [onInvalidInput] is passed, any errors parsing messages from [input]
+ /// are passed to it. Otherwise, they're ignored and the input is discarded.
+ factory TwoWayStream(String name, Stream<String> input, String inputName,
+ StreamSink<String> output, String outputName,
+ {void onInvalidInput(String message, FormatException error)}) {
+ if (output == null) {
+ if (input is! StreamSink) {
+ throw new ArgumentError("Either `$inputName` must be a StreamSink or "
+ "`$outputName` must be passed.");
+ }
+ output = input as StreamSink;
+ }
+
+ var wrappedOutput = mapStreamSink(output, JSON.encode);
+ return new TwoWayStream.withoutJson(name, input.expand((message) {
+ var decodedMessage;
+ try {
+ decodedMessage = JSON.decode(message);
+ } on FormatException catch (error) {
+ if (onInvalidInput != null) onInvalidInput(message, error);
+ return [];
+ }
+
+ return [decodedMessage];
+ }), inputName, wrappedOutput, outputName);
+ }
+
+ /// Creates a two-way stream that reads decoded input and writes decoded
+ /// responses.
+ ///
+ /// [input] and [output] should emit and take (respectively) decoded JSON
+ /// objects.
+ ///
+ /// [inputName] is used in error messages as the name of the input parameter.
+ /// [outputName] is likewise used as the name of the output parameter.
+ TwoWayStream.withoutJson(this._name, Stream input, String inputName,
+ StreamSink output, String outputName)
+ : _input = input,
+ _output = output == null && input is StreamSink ? input : output {
+ if (_output == null) {
+ throw new ArgumentError("Either `$inputName` must be a StreamSink or "
+ "`$outputName` must be passed.");
+ }
+ }
+
+ /// Starts listening to the input stream.
+ ///
+ /// The returned Future will complete when the input stream is closed. If the
+ /// input stream emits an error, that will be piped to the returned Future.
+ Future listen(void handleInput(input)) {
+ if (_listenCompleter != null) {
+ throw new StateError("Can only call $_name.listen once.");
+ }
+
+ _listenCompleter = new Completer();
+ _inputSubscription = _input.listen(handleInput,
+ onError: (error, stackTrace) {
+ if (_listenCompleter.isCompleted) return;
+ _output.close();
+ _listenCompleter.completeError(error, stackTrace);
+ }, onDone: () {
+ if (_listenCompleter.isCompleted) return;
+ _output.close();
+ _listenCompleter.complete();
+ }, cancelOnError: true);
+
+ return _listenCompleter.future;
+ }
+
+ /// Emit [event] on the output stream.
+ void add(event) => _output.add(event);
+
+ /// Stops listening to the input stream and closes the output stream.
+ Future close() {
+ if (_listenCompleter == null) {
+ throw new StateError("Can't call $_name.close before $_name.listen.");
+ }
+
+ if (!_listenCompleter.isCompleted) _listenCompleter.complete();
+
+ var inputFuture = _inputSubscription.cancel();
+ // TODO(nweiz): include the output future in the return value when issue
+ // 19095 is fixed.
+ _output.close();
+ return inputFuture == null ? new Future.value() : inputFuture;
+ }
+}
« no previous file with comments | « pkg/json_rpc_2/lib/src/server.dart ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698