Chromium Code Reviews| OLD | NEW |
|---|---|
| (Empty) | |
| 1 // Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file | |
| 2 // for details. All rights reserved. Use of this source code is governed by a | |
| 3 // BSD-style license that can be found in the LICENSE file. | |
| 4 | |
| 5 library json_rpc_2.stream_manager; | |
| 6 | |
| 7 import 'dart:async'; | |
| 8 import 'dart:convert'; | |
| 9 | |
| 10 import 'utils.dart'; | |
| 11 | |
| 12 /// A class for managing a streams of input messages and a sink for output | |
|
Bob Nystrom
2014/06/26 21:40:27
"streams" -> "stream".
nweiz
2014/06/26 22:55:47
Done.
| |
| 13 /// messages. | |
| 14 /// | |
| 15 /// This contains stream logic that's shared between [Server] and [Client]. | |
| 16 class StreamManager { | |
|
Bob Nystrom
2014/06/13 17:58:04
"manager" doesn't mean anything, and it's hard to
nweiz
2014/06/26 20:43:57
This class is really just "functionality shared be
Bob Nystrom
2014/06/26 21:40:27
How about "BidirectionalStream"? Although that's h
nweiz
2014/06/26 22:55:47
I think "Multiplexer" strongly implies combining a
| |
| 17 /// The name of the component whose streams are being managed (e.g. "Server"). | |
| 18 /// | |
| 19 /// Used for error reporting. | |
| 20 final String _name; | |
| 21 | |
| 22 /// The input stream. | |
| 23 /// | |
| 24 /// This is a stream of decoded JSON objects. | |
| 25 final Stream _input; | |
|
Bob Nystrom
2014/06/13 17:58:04
How about Stream<Object>?
nweiz
2014/06/26 20:43:58
This is basically a union type--not every type is
Bob Nystrom
2014/06/26 21:40:27
SGTM.
| |
| 26 | |
| 27 /// The subscription to [_input]. | |
| 28 StreamSubscription _inputSubscription; | |
| 29 | |
| 30 /// The output sink. | |
| 31 /// | |
| 32 /// This takes decoded JSON objects. | |
| 33 final StreamSink _output; | |
|
Bob Nystrom
2014/06/13 17:58:04
Likewise StreamSink<Object>.
| |
| 34 | |
| 35 /// The completer for [listen]. | |
| 36 /// | |
| 37 /// This is non-`null` after [listen] has been called. | |
| 38 Completer _listenCompleter; | |
| 39 | |
| 40 /// Creates a stream manager. | |
| 41 /// | |
|
Bob Nystrom
2014/06/13 17:58:04
Document input and output, in particular that the
nweiz
2014/06/26 20:43:57
Done.
| |
| 42 /// [inputName] is used in error messages as the name of the input parameter. | |
| 43 /// [outputName] is likewise used as the name of the output parameter. | |
| 44 /// | |
| 45 /// If [onInvalidInput] is passed, any errors parsing messages from [input] | |
| 46 /// are passed to it. Otherwise, they're ignored and the input is discarded. | |
| 47 factory StreamManager(String name, Stream<String> input, String inputName, | |
| 48 StreamSink<String> output, String outputName, | |
| 49 {void onInvalidInput(String message, FormatException error)}) { | |
| 50 if (output == null) { | |
| 51 if (input is! StreamSink) { | |
| 52 throw new ArgumentError("Either `$inputName` must be a StreamSink or " | |
| 53 "`$outputName` must be passed."); | |
|
Bob Nystrom
2014/06/13 17:58:04
Instead of this conditional logic, how about addin
nweiz
2014/06/26 20:43:58
I think having a single method is cleaner and easi
| |
| 54 } | |
| 55 output = input as StreamSink; | |
| 56 } | |
| 57 | |
| 58 var wrappedOutput = mapStreamSink(output, JSON.encode); | |
| 59 return new StreamManager.withoutJson(name, input.expand((message) { | |
| 60 var decodedMessage; | |
| 61 try { | |
| 62 decodedMessage = JSON.decode(message); | |
| 63 } on FormatException catch (error) { | |
| 64 if (onInvalidInput != null) onInvalidInput(message, error); | |
| 65 return []; | |
| 66 } | |
| 67 | |
| 68 return [decodedMessage]; | |
| 69 }), inputName, wrappedOutput, outputName); | |
| 70 } | |
| 71 | |
| 72 /// Creates a stream manager that reads decoded input and writes decoded | |
| 73 /// responses. | |
|
Bob Nystrom
2014/06/13 17:58:04
Document input and output.
nweiz
2014/06/26 20:43:58
Done.
| |
| 74 /// | |
| 75 /// [inputName] is used in error messages as the name of the input parameter. | |
| 76 /// [outputName] is likewise used as the name of the output parameter. | |
| 77 StreamManager.withoutJson(this._name, Stream input, String inputName, | |
|
Bob Nystrom
2014/06/13 17:58:04
Stream<Object> and StreamSink<Object>?
| |
| 78 StreamSink output, String outputName) | |
| 79 : _input = input, | |
| 80 _output = output == null && input is StreamSink ? input : output { | |
| 81 if (_output == null) { | |
| 82 throw new ArgumentError("Either `$inputName` must be a StreamSink or " | |
| 83 "`$outputName` must be passed."); | |
| 84 } | |
| 85 } | |
| 86 | |
| 87 /// Starts listening to the input stream. | |
|
Bob Nystrom
2014/06/13 17:58:04
Document that an error on the stream is piped to t
nweiz
2014/06/26 20:43:58
Done.
| |
| 88 Future listen(void handleInput(input)) { | |
| 89 if (_listenCompleter != null) { | |
| 90 throw new StateError("Can only call $_name.listen once."); | |
| 91 } | |
| 92 | |
| 93 _listenCompleter = new Completer(); | |
| 94 _inputSubscription = _input.listen(handleInput, | |
| 95 onError: (error, stackTrace) { | |
| 96 if (_listenCompleter.isCompleted) return; | |
| 97 _output.close(); | |
| 98 _listenCompleter.completeError(error, stackTrace); | |
| 99 }, onDone: () { | |
| 100 if (_listenCompleter.isCompleted) return; | |
| 101 _output.close(); | |
| 102 _listenCompleter.complete(); | |
| 103 }, cancelOnError: true); | |
| 104 | |
| 105 return _listenCompleter.future; | |
| 106 } | |
| 107 | |
| 108 /// Emit [event] on the output stream. | |
| 109 void add(event) => _output.add(event); | |
| 110 | |
| 111 /// Stops listening to the input stream and closes the output stream. | |
| 112 Future close() { | |
| 113 if (_listenCompleter == null) { | |
| 114 throw new StateError("Can't call $_name.close before $_name.listen."); | |
| 115 } | |
| 116 | |
| 117 if (!_listenCompleter.isCompleted) _listenCompleter.complete(); | |
| 118 | |
| 119 var inputFuture = _inputSubscription.cancel(); | |
| 120 // TODO(nweiz): include the output future in the return value when issue | |
| 121 // 19095 is fixed. | |
| 122 _output.close(); | |
| 123 return inputFuture == null ? new Future.value() : inputFuture; | |
| 124 } | |
| 125 } | |
| OLD | NEW |