OLD | NEW |
| (Empty) |
1 // Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file | |
2 // for details. All rights reserved. Use of this source code is governed by a | |
3 // BSD-style license that can be found in the LICENSE file. | |
4 | |
5 import 'dart:async'; | |
6 import 'dart:convert'; | |
7 | |
8 import 'utils.dart'; | |
9 | |
10 /// A class for managing a stream of input messages and a sink for output | |
11 /// messages. | |
12 /// | |
13 /// This contains stream logic that's shared between [Server] and [Client]. | |
14 class TwoWayStream { | |
15 /// The name of the component whose streams are being managed (e.g. "Server"). | |
16 /// | |
17 /// Used for error reporting. | |
18 final String _name; | |
19 | |
20 /// The input stream. | |
21 /// | |
22 /// This is a stream of decoded JSON objects. | |
23 final Stream _input; | |
24 | |
25 /// The subscription to [_input]. | |
26 StreamSubscription _inputSubscription; | |
27 | |
28 /// The output sink. | |
29 /// | |
30 /// This takes decoded JSON objects. | |
31 final StreamSink _output; | |
32 | |
33 /// Returns a [Future] that completes when the connection is closed. | |
34 /// | |
35 /// This is the same future that's returned by [listen]. | |
36 Future get done => _doneCompleter.future; | |
37 final _doneCompleter = new Completer(); | |
38 | |
39 /// Whether the stream has been closed. | |
40 bool get isClosed => _doneCompleter.isCompleted; | |
41 | |
42 /// Creates a two-way stream. | |
43 /// | |
44 /// [input] and [output] should emit and take (respectively) JSON-encoded | |
45 /// strings. | |
46 /// | |
47 /// [inputName] is used in error messages as the name of the input parameter. | |
48 /// [outputName] is likewise used as the name of the output parameter. | |
49 /// | |
50 /// If [onInvalidInput] is passed, any errors parsing messages from [input] | |
51 /// are passed to it. Otherwise, they're ignored and the input is discarded. | |
52 factory TwoWayStream(String name, Stream<String> input, String inputName, | |
53 StreamSink<String> output, String outputName, | |
54 {void onInvalidInput(String message, FormatException error)}) { | |
55 if (output == null) { | |
56 if (input is! StreamSink) { | |
57 throw new ArgumentError("Either `$inputName` must be a StreamSink or " | |
58 "`$outputName` must be passed."); | |
59 } | |
60 output = input as StreamSink; | |
61 } | |
62 | |
63 var wrappedOutput = mapStreamSink(output, JSON.encode); | |
64 return new TwoWayStream.withoutJson(name, input.expand((message) { | |
65 var decodedMessage; | |
66 try { | |
67 decodedMessage = JSON.decode(message); | |
68 } on FormatException catch (error) { | |
69 if (onInvalidInput != null) onInvalidInput(message, error); | |
70 return []; | |
71 } | |
72 | |
73 return [decodedMessage]; | |
74 }), inputName, wrappedOutput, outputName); | |
75 } | |
76 | |
77 /// Creates a two-way stream that reads decoded input and writes decoded | |
78 /// responses. | |
79 /// | |
80 /// [input] and [output] should emit and take (respectively) decoded JSON | |
81 /// objects. | |
82 /// | |
83 /// [inputName] is used in error messages as the name of the input parameter. | |
84 /// [outputName] is likewise used as the name of the output parameter. | |
85 TwoWayStream.withoutJson(this._name, Stream input, String inputName, | |
86 StreamSink output, String outputName) | |
87 : _input = input, | |
88 _output = output == null && input is StreamSink ? input : output { | |
89 if (_output == null) { | |
90 throw new ArgumentError("Either `$inputName` must be a StreamSink or " | |
91 "`$outputName` must be passed."); | |
92 } | |
93 } | |
94 | |
95 /// Starts listening to the input stream. | |
96 /// | |
97 /// The returned Future will complete when the input stream is closed. If the | |
98 /// input stream emits an error, that will be piped to the returned Future. | |
99 Future listen(void handleInput(input)) { | |
100 if (_inputSubscription != null) { | |
101 throw new StateError("Can only call $_name.listen once."); | |
102 } | |
103 | |
104 _inputSubscription = _input.listen(handleInput, | |
105 onError: (error, stackTrace) { | |
106 if (_doneCompleter.isCompleted) return; | |
107 _doneCompleter.completeError(error, stackTrace); | |
108 _output.close(); | |
109 }, onDone: () { | |
110 if (_doneCompleter.isCompleted) return; | |
111 _doneCompleter.complete(); | |
112 _output.close(); | |
113 }, cancelOnError: true); | |
114 | |
115 return _doneCompleter.future; | |
116 } | |
117 | |
118 /// Emit [event] on the output stream. | |
119 void add(event) => _output.add(event); | |
120 | |
121 /// Stops listening to the input stream and closes the output stream. | |
122 Future close() { | |
123 if (_inputSubscription == null) { | |
124 throw new StateError("Can't call $_name.close before $_name.listen."); | |
125 } | |
126 | |
127 if (!_doneCompleter.isCompleted) _doneCompleter.complete(); | |
128 | |
129 var inputFuture = _inputSubscription.cancel(); | |
130 // TODO(nweiz): include the output future in the return value when issue | |
131 // 19095 is fixed. | |
132 _output.close(); | |
133 return inputFuture == null ? new Future.value() : inputFuture; | |
134 } | |
135 } | |
OLD | NEW |