Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(393)

Unified Diff: lib/src/two_way_stream.dart

Issue 1652413002: Use StreamChannel. (Closed) Base URL: git@github.com:dart-lang/json_rpc_2.git@master
Patch Set: Code review changes Created 4 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « lib/src/server.dart ('k') | lib/src/utils.dart » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: lib/src/two_way_stream.dart
diff --git a/lib/src/two_way_stream.dart b/lib/src/two_way_stream.dart
deleted file mode 100644
index 4f20686b5ff80d08625146f4767395f675eacd4c..0000000000000000000000000000000000000000
--- a/lib/src/two_way_stream.dart
+++ /dev/null
@@ -1,135 +0,0 @@
-// Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file
-// for details. All rights reserved. Use of this source code is governed by a
-// BSD-style license that can be found in the LICENSE file.
-
-import 'dart:async';
-import 'dart:convert';
-
-import 'utils.dart';
-
-/// A class for managing a stream of input messages and a sink for output
-/// messages.
-///
-/// This contains stream logic that's shared between [Server] and [Client].
-class TwoWayStream {
- /// The name of the component whose streams are being managed (e.g. "Server").
- ///
- /// Used for error reporting.
- final String _name;
-
- /// The input stream.
- ///
- /// This is a stream of decoded JSON objects.
- final Stream _input;
-
- /// The subscription to [_input].
- StreamSubscription _inputSubscription;
-
- /// The output sink.
- ///
- /// This takes decoded JSON objects.
- final StreamSink _output;
-
- /// Returns a [Future] that completes when the connection is closed.
- ///
- /// This is the same future that's returned by [listen].
- Future get done => _doneCompleter.future;
- final _doneCompleter = new Completer();
-
- /// Whether the stream has been closed.
- bool get isClosed => _doneCompleter.isCompleted;
-
- /// Creates a two-way stream.
- ///
- /// [input] and [output] should emit and take (respectively) JSON-encoded
- /// strings.
- ///
- /// [inputName] is used in error messages as the name of the input parameter.
- /// [outputName] is likewise used as the name of the output parameter.
- ///
- /// If [onInvalidInput] is passed, any errors parsing messages from [input]
- /// are passed to it. Otherwise, they're ignored and the input is discarded.
- factory TwoWayStream(String name, Stream<String> input, String inputName,
- StreamSink<String> output, String outputName,
- {void onInvalidInput(String message, FormatException error)}) {
- if (output == null) {
- if (input is! StreamSink) {
- throw new ArgumentError("Either `$inputName` must be a StreamSink or "
- "`$outputName` must be passed.");
- }
- output = input as StreamSink;
- }
-
- var wrappedOutput = mapStreamSink(output, JSON.encode);
- return new TwoWayStream.withoutJson(name, input.expand((message) {
- var decodedMessage;
- try {
- decodedMessage = JSON.decode(message);
- } on FormatException catch (error) {
- if (onInvalidInput != null) onInvalidInput(message, error);
- return [];
- }
-
- return [decodedMessage];
- }), inputName, wrappedOutput, outputName);
- }
-
- /// Creates a two-way stream that reads decoded input and writes decoded
- /// responses.
- ///
- /// [input] and [output] should emit and take (respectively) decoded JSON
- /// objects.
- ///
- /// [inputName] is used in error messages as the name of the input parameter.
- /// [outputName] is likewise used as the name of the output parameter.
- TwoWayStream.withoutJson(this._name, Stream input, String inputName,
- StreamSink output, String outputName)
- : _input = input,
- _output = output == null && input is StreamSink ? input : output {
- if (_output == null) {
- throw new ArgumentError("Either `$inputName` must be a StreamSink or "
- "`$outputName` must be passed.");
- }
- }
-
- /// Starts listening to the input stream.
- ///
- /// The returned Future will complete when the input stream is closed. If the
- /// input stream emits an error, that will be piped to the returned Future.
- Future listen(void handleInput(input)) {
- if (_inputSubscription != null) {
- throw new StateError("Can only call $_name.listen once.");
- }
-
- _inputSubscription = _input.listen(handleInput,
- onError: (error, stackTrace) {
- if (_doneCompleter.isCompleted) return;
- _doneCompleter.completeError(error, stackTrace);
- _output.close();
- }, onDone: () {
- if (_doneCompleter.isCompleted) return;
- _doneCompleter.complete();
- _output.close();
- }, cancelOnError: true);
-
- return _doneCompleter.future;
- }
-
- /// Emit [event] on the output stream.
- void add(event) => _output.add(event);
-
- /// Stops listening to the input stream and closes the output stream.
- Future close() {
- if (_inputSubscription == null) {
- throw new StateError("Can't call $_name.close before $_name.listen.");
- }
-
- if (!_doneCompleter.isCompleted) _doneCompleter.complete();
-
- var inputFuture = _inputSubscription.cancel();
- // TODO(nweiz): include the output future in the return value when issue
- // 19095 is fixed.
- _output.close();
- return inputFuture == null ? new Future.value() : inputFuture;
- }
-}
« no previous file with comments | « lib/src/server.dart ('k') | lib/src/utils.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698