Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(22)

Side by Side Diff: pkg/json_rpc_2/lib/src/stream_manager.dart

Issue 333683003: Extract out a StreamManager class from json_rpc.Server. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: fix tests Created 6 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « pkg/json_rpc_2/lib/src/server.dart ('k') | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file.
4
5 library json_rpc_2.stream_manager;
6
7 import 'dart:async';
8 import 'dart:convert';
9
10 import 'utils.dart';
11
12 /// A class for managing a streams of input messages and a sink for output
Bob Nystrom 2014/06/26 21:40:27 "streams" -> "stream".
nweiz 2014/06/26 22:55:47 Done.
13 /// messages.
14 ///
15 /// This contains stream logic that's shared between [Server] and [Client].
16 class StreamManager {
Bob Nystrom 2014/06/13 17:58:04 "manager" doesn't mean anything, and it's hard to
nweiz 2014/06/26 20:43:57 This class is really just "functionality shared be
Bob Nystrom 2014/06/26 21:40:27 How about "BidirectionalStream"? Although that's h
nweiz 2014/06/26 22:55:47 I think "Multiplexer" strongly implies combining a
17 /// The name of the component whose streams are being managed (e.g. "Server").
18 ///
19 /// Used for error reporting.
20 final String _name;
21
22 /// The input stream.
23 ///
24 /// This is a stream of decoded JSON objects.
25 final Stream _input;
Bob Nystrom 2014/06/13 17:58:04 How about Stream<Object>?
nweiz 2014/06/26 20:43:58 This is basically a union type--not every type is
Bob Nystrom 2014/06/26 21:40:27 SGTM.
26
27 /// The subscription to [_input].
28 StreamSubscription _inputSubscription;
29
30 /// The output sink.
31 ///
32 /// This takes decoded JSON objects.
33 final StreamSink _output;
Bob Nystrom 2014/06/13 17:58:04 Likewise StreamSink<Object>.
34
35 /// The completer for [listen].
36 ///
37 /// This is non-`null` after [listen] has been called.
38 Completer _listenCompleter;
39
40 /// Creates a stream manager.
41 ///
Bob Nystrom 2014/06/13 17:58:04 Document input and output, in particular that the
nweiz 2014/06/26 20:43:57 Done.
42 /// [inputName] is used in error messages as the name of the input parameter.
43 /// [outputName] is likewise used as the name of the output parameter.
44 ///
45 /// If [onInvalidInput] is passed, any errors parsing messages from [input]
46 /// are passed to it. Otherwise, they're ignored and the input is discarded.
47 factory StreamManager(String name, Stream<String> input, String inputName,
48 StreamSink<String> output, String outputName,
49 {void onInvalidInput(String message, FormatException error)}) {
50 if (output == null) {
51 if (input is! StreamSink) {
52 throw new ArgumentError("Either `$inputName` must be a StreamSink or "
53 "`$outputName` must be passed.");
Bob Nystrom 2014/06/13 17:58:04 Instead of this conditional logic, how about addin
nweiz 2014/06/26 20:43:58 I think having a single method is cleaner and easi
54 }
55 output = input as StreamSink;
56 }
57
58 var wrappedOutput = mapStreamSink(output, JSON.encode);
59 return new StreamManager.withoutJson(name, input.expand((message) {
60 var decodedMessage;
61 try {
62 decodedMessage = JSON.decode(message);
63 } on FormatException catch (error) {
64 if (onInvalidInput != null) onInvalidInput(message, error);
65 return [];
66 }
67
68 return [decodedMessage];
69 }), inputName, wrappedOutput, outputName);
70 }
71
72 /// Creates a stream manager that reads decoded input and writes decoded
73 /// responses.
Bob Nystrom 2014/06/13 17:58:04 Document input and output.
nweiz 2014/06/26 20:43:58 Done.
74 ///
75 /// [inputName] is used in error messages as the name of the input parameter.
76 /// [outputName] is likewise used as the name of the output parameter.
77 StreamManager.withoutJson(this._name, Stream input, String inputName,
Bob Nystrom 2014/06/13 17:58:04 Stream<Object> and StreamSink<Object>?
78 StreamSink output, String outputName)
79 : _input = input,
80 _output = output == null && input is StreamSink ? input : output {
81 if (_output == null) {
82 throw new ArgumentError("Either `$inputName` must be a StreamSink or "
83 "`$outputName` must be passed.");
84 }
85 }
86
87 /// Starts listening to the input stream.
Bob Nystrom 2014/06/13 17:58:04 Document that an error on the stream is piped to t
nweiz 2014/06/26 20:43:58 Done.
88 Future listen(void handleInput(input)) {
89 if (_listenCompleter != null) {
90 throw new StateError("Can only call $_name.listen once.");
91 }
92
93 _listenCompleter = new Completer();
94 _inputSubscription = _input.listen(handleInput,
95 onError: (error, stackTrace) {
96 if (_listenCompleter.isCompleted) return;
97 _output.close();
98 _listenCompleter.completeError(error, stackTrace);
99 }, onDone: () {
100 if (_listenCompleter.isCompleted) return;
101 _output.close();
102 _listenCompleter.complete();
103 }, cancelOnError: true);
104
105 return _listenCompleter.future;
106 }
107
108 /// Emit [event] on the output stream.
109 void add(event) => _output.add(event);
110
111 /// Stops listening to the input stream and closes the output stream.
112 Future close() {
113 if (_listenCompleter == null) {
114 throw new StateError("Can't call $_name.close before $_name.listen.");
115 }
116
117 if (!_listenCompleter.isCompleted) _listenCompleter.complete();
118
119 var inputFuture = _inputSubscription.cancel();
120 // TODO(nweiz): include the output future in the return value when issue
121 // 19095 is fixed.
122 _output.close();
123 return inputFuture == null ? new Future.value() : inputFuture;
124 }
125 }
OLDNEW
« no previous file with comments | « pkg/json_rpc_2/lib/src/server.dart ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698