Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1316)

Side by Side Diff: lib/src/two_way_stream.dart

Issue 1652413002: Use StreamChannel. (Closed) Base URL: git@github.com:dart-lang/json_rpc_2.git@master
Patch Set: Code review changes Created 4 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « lib/src/server.dart ('k') | lib/src/utils.dart » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file.
4
5 import 'dart:async';
6 import 'dart:convert';
7
8 import 'utils.dart';
9
10 /// A class for managing a stream of input messages and a sink for output
11 /// messages.
12 ///
13 /// This contains stream logic that's shared between [Server] and [Client].
14 class TwoWayStream {
15 /// The name of the component whose streams are being managed (e.g. "Server").
16 ///
17 /// Used for error reporting.
18 final String _name;
19
20 /// The input stream.
21 ///
22 /// This is a stream of decoded JSON objects.
23 final Stream _input;
24
25 /// The subscription to [_input].
26 StreamSubscription _inputSubscription;
27
28 /// The output sink.
29 ///
30 /// This takes decoded JSON objects.
31 final StreamSink _output;
32
33 /// Returns a [Future] that completes when the connection is closed.
34 ///
35 /// This is the same future that's returned by [listen].
36 Future get done => _doneCompleter.future;
37 final _doneCompleter = new Completer();
38
39 /// Whether the stream has been closed.
40 bool get isClosed => _doneCompleter.isCompleted;
41
42 /// Creates a two-way stream.
43 ///
44 /// [input] and [output] should emit and take (respectively) JSON-encoded
45 /// strings.
46 ///
47 /// [inputName] is used in error messages as the name of the input parameter.
48 /// [outputName] is likewise used as the name of the output parameter.
49 ///
50 /// If [onInvalidInput] is passed, any errors parsing messages from [input]
51 /// are passed to it. Otherwise, they're ignored and the input is discarded.
52 factory TwoWayStream(String name, Stream<String> input, String inputName,
53 StreamSink<String> output, String outputName,
54 {void onInvalidInput(String message, FormatException error)}) {
55 if (output == null) {
56 if (input is! StreamSink) {
57 throw new ArgumentError("Either `$inputName` must be a StreamSink or "
58 "`$outputName` must be passed.");
59 }
60 output = input as StreamSink;
61 }
62
63 var wrappedOutput = mapStreamSink(output, JSON.encode);
64 return new TwoWayStream.withoutJson(name, input.expand((message) {
65 var decodedMessage;
66 try {
67 decodedMessage = JSON.decode(message);
68 } on FormatException catch (error) {
69 if (onInvalidInput != null) onInvalidInput(message, error);
70 return [];
71 }
72
73 return [decodedMessage];
74 }), inputName, wrappedOutput, outputName);
75 }
76
77 /// Creates a two-way stream that reads decoded input and writes decoded
78 /// responses.
79 ///
80 /// [input] and [output] should emit and take (respectively) decoded JSON
81 /// objects.
82 ///
83 /// [inputName] is used in error messages as the name of the input parameter.
84 /// [outputName] is likewise used as the name of the output parameter.
85 TwoWayStream.withoutJson(this._name, Stream input, String inputName,
86 StreamSink output, String outputName)
87 : _input = input,
88 _output = output == null && input is StreamSink ? input : output {
89 if (_output == null) {
90 throw new ArgumentError("Either `$inputName` must be a StreamSink or "
91 "`$outputName` must be passed.");
92 }
93 }
94
95 /// Starts listening to the input stream.
96 ///
97 /// The returned Future will complete when the input stream is closed. If the
98 /// input stream emits an error, that will be piped to the returned Future.
99 Future listen(void handleInput(input)) {
100 if (_inputSubscription != null) {
101 throw new StateError("Can only call $_name.listen once.");
102 }
103
104 _inputSubscription = _input.listen(handleInput,
105 onError: (error, stackTrace) {
106 if (_doneCompleter.isCompleted) return;
107 _doneCompleter.completeError(error, stackTrace);
108 _output.close();
109 }, onDone: () {
110 if (_doneCompleter.isCompleted) return;
111 _doneCompleter.complete();
112 _output.close();
113 }, cancelOnError: true);
114
115 return _doneCompleter.future;
116 }
117
118 /// Emit [event] on the output stream.
119 void add(event) => _output.add(event);
120
121 /// Stops listening to the input stream and closes the output stream.
122 Future close() {
123 if (_inputSubscription == null) {
124 throw new StateError("Can't call $_name.close before $_name.listen.");
125 }
126
127 if (!_doneCompleter.isCompleted) _doneCompleter.complete();
128
129 var inputFuture = _inputSubscription.cancel();
130 // TODO(nweiz): include the output future in the return value when issue
131 // 19095 is fixed.
132 _output.close();
133 return inputFuture == null ? new Future.value() : inputFuture;
134 }
135 }
OLDNEW
« no previous file with comments | « lib/src/server.dart ('k') | lib/src/utils.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698