OLD | NEW |
---|---|
(Empty) | |
1 // Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file | |
2 // for details. All rights reserved. Use of this source code is governed by a | |
3 // BSD-style license that can be found in the LICENSE file. | |
4 | |
5 library json_rpc_2.stream_manager; | |
6 | |
7 import 'dart:async'; | |
8 import 'dart:convert'; | |
9 | |
10 import 'utils.dart'; | |
11 | |
12 /// A class for managing a streams of input messages and a sink for output | |
Bob Nystrom
2014/06/26 21:40:27
"streams" -> "stream".
nweiz
2014/06/26 22:55:47
Done.
| |
13 /// messages. | |
14 /// | |
15 /// This contains stream logic that's shared between [Server] and [Client]. | |
16 class StreamManager { | |
Bob Nystrom
2014/06/13 17:58:04
"manager" doesn't mean anything, and it's hard to
nweiz
2014/06/26 20:43:57
This class is really just "functionality shared be
Bob Nystrom
2014/06/26 21:40:27
How about "BidirectionalStream"? Although that's h
nweiz
2014/06/26 22:55:47
I think "Multiplexer" strongly implies combining a
| |
17 /// The name of the component whose streams are being managed (e.g. "Server"). | |
18 /// | |
19 /// Used for error reporting. | |
20 final String _name; | |
21 | |
22 /// The input stream. | |
23 /// | |
24 /// This is a stream of decoded JSON objects. | |
25 final Stream _input; | |
Bob Nystrom
2014/06/13 17:58:04
How about Stream<Object>?
nweiz
2014/06/26 20:43:58
This is basically a union type--not every type is
Bob Nystrom
2014/06/26 21:40:27
SGTM.
| |
26 | |
27 /// The subscription to [_input]. | |
28 StreamSubscription _inputSubscription; | |
29 | |
30 /// The output sink. | |
31 /// | |
32 /// This takes decoded JSON objects. | |
33 final StreamSink _output; | |
Bob Nystrom
2014/06/13 17:58:04
Likewise StreamSink<Object>.
| |
34 | |
35 /// The completer for [listen]. | |
36 /// | |
37 /// This is non-`null` after [listen] has been called. | |
38 Completer _listenCompleter; | |
39 | |
40 /// Creates a stream manager. | |
41 /// | |
Bob Nystrom
2014/06/13 17:58:04
Document input and output, in particular that the
nweiz
2014/06/26 20:43:57
Done.
| |
42 /// [inputName] is used in error messages as the name of the input parameter. | |
43 /// [outputName] is likewise used as the name of the output parameter. | |
44 /// | |
45 /// If [onInvalidInput] is passed, any errors parsing messages from [input] | |
46 /// are passed to it. Otherwise, they're ignored and the input is discarded. | |
47 factory StreamManager(String name, Stream<String> input, String inputName, | |
48 StreamSink<String> output, String outputName, | |
49 {void onInvalidInput(String message, FormatException error)}) { | |
50 if (output == null) { | |
51 if (input is! StreamSink) { | |
52 throw new ArgumentError("Either `$inputName` must be a StreamSink or " | |
53 "`$outputName` must be passed."); | |
Bob Nystrom
2014/06/13 17:58:04
Instead of this conditional logic, how about addin
nweiz
2014/06/26 20:43:58
I think having a single method is cleaner and easi
| |
54 } | |
55 output = input as StreamSink; | |
56 } | |
57 | |
58 var wrappedOutput = mapStreamSink(output, JSON.encode); | |
59 return new StreamManager.withoutJson(name, input.expand((message) { | |
60 var decodedMessage; | |
61 try { | |
62 decodedMessage = JSON.decode(message); | |
63 } on FormatException catch (error) { | |
64 if (onInvalidInput != null) onInvalidInput(message, error); | |
65 return []; | |
66 } | |
67 | |
68 return [decodedMessage]; | |
69 }), inputName, wrappedOutput, outputName); | |
70 } | |
71 | |
72 /// Creates a stream manager that reads decoded input and writes decoded | |
73 /// responses. | |
Bob Nystrom
2014/06/13 17:58:04
Document input and output.
nweiz
2014/06/26 20:43:58
Done.
| |
74 /// | |
75 /// [inputName] is used in error messages as the name of the input parameter. | |
76 /// [outputName] is likewise used as the name of the output parameter. | |
77 StreamManager.withoutJson(this._name, Stream input, String inputName, | |
Bob Nystrom
2014/06/13 17:58:04
Stream<Object> and StreamSink<Object>?
| |
78 StreamSink output, String outputName) | |
79 : _input = input, | |
80 _output = output == null && input is StreamSink ? input : output { | |
81 if (_output == null) { | |
82 throw new ArgumentError("Either `$inputName` must be a StreamSink or " | |
83 "`$outputName` must be passed."); | |
84 } | |
85 } | |
86 | |
87 /// Starts listening to the input stream. | |
Bob Nystrom
2014/06/13 17:58:04
Document that an error on the stream is piped to t
nweiz
2014/06/26 20:43:58
Done.
| |
88 Future listen(void handleInput(input)) { | |
89 if (_listenCompleter != null) { | |
90 throw new StateError("Can only call $_name.listen once."); | |
91 } | |
92 | |
93 _listenCompleter = new Completer(); | |
94 _inputSubscription = _input.listen(handleInput, | |
95 onError: (error, stackTrace) { | |
96 if (_listenCompleter.isCompleted) return; | |
97 _output.close(); | |
98 _listenCompleter.completeError(error, stackTrace); | |
99 }, onDone: () { | |
100 if (_listenCompleter.isCompleted) return; | |
101 _output.close(); | |
102 _listenCompleter.complete(); | |
103 }, cancelOnError: true); | |
104 | |
105 return _listenCompleter.future; | |
106 } | |
107 | |
108 /// Emit [event] on the output stream. | |
109 void add(event) => _output.add(event); | |
110 | |
111 /// Stops listening to the input stream and closes the output stream. | |
112 Future close() { | |
113 if (_listenCompleter == null) { | |
114 throw new StateError("Can't call $_name.close before $_name.listen."); | |
115 } | |
116 | |
117 if (!_listenCompleter.isCompleted) _listenCompleter.complete(); | |
118 | |
119 var inputFuture = _inputSubscription.cancel(); | |
120 // TODO(nweiz): include the output future in the return value when issue | |
121 // 19095 is fixed. | |
122 _output.close(); | |
123 return inputFuture == null ? new Future.value() : inputFuture; | |
124 } | |
125 } | |
OLD | NEW |