Index: lib/src/two_way_stream.dart |
diff --git a/lib/src/two_way_stream.dart b/lib/src/two_way_stream.dart |
deleted file mode 100644 |
index 4f20686b5ff80d08625146f4767395f675eacd4c..0000000000000000000000000000000000000000 |
--- a/lib/src/two_way_stream.dart |
+++ /dev/null |
@@ -1,135 +0,0 @@ |
-// Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file |
-// for details. All rights reserved. Use of this source code is governed by a |
-// BSD-style license that can be found in the LICENSE file. |
- |
-import 'dart:async'; |
-import 'dart:convert'; |
- |
-import 'utils.dart'; |
- |
-/// A class for managing a stream of input messages and a sink for output |
-/// messages. |
-/// |
-/// This contains stream logic that's shared between [Server] and [Client]. |
-class TwoWayStream { |
- /// The name of the component whose streams are being managed (e.g. "Server"). |
- /// |
- /// Used for error reporting. |
- final String _name; |
- |
- /// The input stream. |
- /// |
- /// This is a stream of decoded JSON objects. |
- final Stream _input; |
- |
- /// The subscription to [_input]. |
- StreamSubscription _inputSubscription; |
- |
- /// The output sink. |
- /// |
- /// This takes decoded JSON objects. |
- final StreamSink _output; |
- |
- /// Returns a [Future] that completes when the connection is closed. |
- /// |
- /// This is the same future that's returned by [listen]. |
- Future get done => _doneCompleter.future; |
- final _doneCompleter = new Completer(); |
- |
- /// Whether the stream has been closed. |
- bool get isClosed => _doneCompleter.isCompleted; |
- |
- /// Creates a two-way stream. |
- /// |
- /// [input] and [output] should emit and take (respectively) JSON-encoded |
- /// strings. |
- /// |
- /// [inputName] is used in error messages as the name of the input parameter. |
- /// [outputName] is likewise used as the name of the output parameter. |
- /// |
- /// If [onInvalidInput] is passed, any errors parsing messages from [input] |
- /// are passed to it. Otherwise, they're ignored and the input is discarded. |
- factory TwoWayStream(String name, Stream<String> input, String inputName, |
- StreamSink<String> output, String outputName, |
- {void onInvalidInput(String message, FormatException error)}) { |
- if (output == null) { |
- if (input is! StreamSink) { |
- throw new ArgumentError("Either `$inputName` must be a StreamSink or " |
- "`$outputName` must be passed."); |
- } |
- output = input as StreamSink; |
- } |
- |
- var wrappedOutput = mapStreamSink(output, JSON.encode); |
- return new TwoWayStream.withoutJson(name, input.expand((message) { |
- var decodedMessage; |
- try { |
- decodedMessage = JSON.decode(message); |
- } on FormatException catch (error) { |
- if (onInvalidInput != null) onInvalidInput(message, error); |
- return []; |
- } |
- |
- return [decodedMessage]; |
- }), inputName, wrappedOutput, outputName); |
- } |
- |
- /// Creates a two-way stream that reads decoded input and writes decoded |
- /// responses. |
- /// |
- /// [input] and [output] should emit and take (respectively) decoded JSON |
- /// objects. |
- /// |
- /// [inputName] is used in error messages as the name of the input parameter. |
- /// [outputName] is likewise used as the name of the output parameter. |
- TwoWayStream.withoutJson(this._name, Stream input, String inputName, |
- StreamSink output, String outputName) |
- : _input = input, |
- _output = output == null && input is StreamSink ? input : output { |
- if (_output == null) { |
- throw new ArgumentError("Either `$inputName` must be a StreamSink or " |
- "`$outputName` must be passed."); |
- } |
- } |
- |
- /// Starts listening to the input stream. |
- /// |
- /// The returned Future will complete when the input stream is closed. If the |
- /// input stream emits an error, that will be piped to the returned Future. |
- Future listen(void handleInput(input)) { |
- if (_inputSubscription != null) { |
- throw new StateError("Can only call $_name.listen once."); |
- } |
- |
- _inputSubscription = _input.listen(handleInput, |
- onError: (error, stackTrace) { |
- if (_doneCompleter.isCompleted) return; |
- _doneCompleter.completeError(error, stackTrace); |
- _output.close(); |
- }, onDone: () { |
- if (_doneCompleter.isCompleted) return; |
- _doneCompleter.complete(); |
- _output.close(); |
- }, cancelOnError: true); |
- |
- return _doneCompleter.future; |
- } |
- |
- /// Emit [event] on the output stream. |
- void add(event) => _output.add(event); |
- |
- /// Stops listening to the input stream and closes the output stream. |
- Future close() { |
- if (_inputSubscription == null) { |
- throw new StateError("Can't call $_name.close before $_name.listen."); |
- } |
- |
- if (!_doneCompleter.isCompleted) _doneCompleter.complete(); |
- |
- var inputFuture = _inputSubscription.cancel(); |
- // TODO(nweiz): include the output future in the return value when issue |
- // 19095 is fixed. |
- _output.close(); |
- return inputFuture == null ? new Future.value() : inputFuture; |
- } |
-} |