Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(322)

Side by Side Diff: pkg/json_rpc_2/lib/src/two_way_stream.dart

Issue 333683003: Extract out a StreamManager class from json_rpc.Server. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: code review Created 6 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « pkg/json_rpc_2/lib/src/server.dart ('k') | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file.
4
5 library json_rpc_2.two_way_stream;
6
7 import 'dart:async';
8 import 'dart:convert';
9
10 import 'utils.dart';
11
12 /// A class for managing a stream of input messages and a sink for output
13 /// messages.
14 ///
15 /// This contains stream logic that's shared between [Server] and [Client].
16 class TwoWayStream {
17 /// The name of the component whose streams are being managed (e.g. "Server").
18 ///
19 /// Used for error reporting.
20 final String _name;
21
22 /// The input stream.
23 ///
24 /// This is a stream of decoded JSON objects.
25 final Stream _input;
26
27 /// The subscription to [_input].
28 StreamSubscription _inputSubscription;
29
30 /// The output sink.
31 ///
32 /// This takes decoded JSON objects.
33 final StreamSink _output;
34
35 /// The completer for [listen].
36 ///
37 /// This is non-`null` after [listen] has been called.
38 Completer _listenCompleter;
39
40 /// Creates a two-way stream.
41 ///
42 /// [input] and [output] should emit and take (respectively) JSON-encoded
43 /// strings.
44 ///
45 /// [inputName] is used in error messages as the name of the input parameter.
46 /// [outputName] is likewise used as the name of the output parameter.
47 ///
48 /// If [onInvalidInput] is passed, any errors parsing messages from [input]
49 /// are passed to it. Otherwise, they're ignored and the input is discarded.
50 factory TwoWayStream(String name, Stream<String> input, String inputName,
51 StreamSink<String> output, String outputName,
52 {void onInvalidInput(String message, FormatException error)}) {
53 if (output == null) {
54 if (input is! StreamSink) {
55 throw new ArgumentError("Either `$inputName` must be a StreamSink or "
56 "`$outputName` must be passed.");
57 }
58 output = input as StreamSink;
59 }
60
61 var wrappedOutput = mapStreamSink(output, JSON.encode);
62 return new TwoWayStream.withoutJson(name, input.expand((message) {
63 var decodedMessage;
64 try {
65 decodedMessage = JSON.decode(message);
66 } on FormatException catch (error) {
67 if (onInvalidInput != null) onInvalidInput(message, error);
68 return [];
69 }
70
71 return [decodedMessage];
72 }), inputName, wrappedOutput, outputName);
73 }
74
75 /// Creates a two-way stream that reads decoded input and writes decoded
76 /// responses.
77 ///
78 /// [input] and [output] should emit and take (respectively) decoded JSON
79 /// objects.
80 ///
81 /// [inputName] is used in error messages as the name of the input parameter.
82 /// [outputName] is likewise used as the name of the output parameter.
83 TwoWayStream.withoutJson(this._name, Stream input, String inputName,
84 StreamSink output, String outputName)
85 : _input = input,
86 _output = output == null && input is StreamSink ? input : output {
87 if (_output == null) {
88 throw new ArgumentError("Either `$inputName` must be a StreamSink or "
89 "`$outputName` must be passed.");
90 }
91 }
92
93 /// Starts listening to the input stream.
94 ///
95 /// The returned Future will complete when the input stream is closed. If the
96 /// input stream emits an error, that will be piped to the returned Future.
97 Future listen(void handleInput(input)) {
98 if (_listenCompleter != null) {
99 throw new StateError("Can only call $_name.listen once.");
100 }
101
102 _listenCompleter = new Completer();
103 _inputSubscription = _input.listen(handleInput,
104 onError: (error, stackTrace) {
105 if (_listenCompleter.isCompleted) return;
106 _output.close();
107 _listenCompleter.completeError(error, stackTrace);
108 }, onDone: () {
109 if (_listenCompleter.isCompleted) return;
110 _output.close();
111 _listenCompleter.complete();
112 }, cancelOnError: true);
113
114 return _listenCompleter.future;
115 }
116
117 /// Emit [event] on the output stream.
118 void add(event) => _output.add(event);
119
120 /// Stops listening to the input stream and closes the output stream.
121 Future close() {
122 if (_listenCompleter == null) {
123 throw new StateError("Can't call $_name.close before $_name.listen.");
124 }
125
126 if (!_listenCompleter.isCompleted) _listenCompleter.complete();
127
128 var inputFuture = _inputSubscription.cancel();
129 // TODO(nweiz): include the output future in the return value when issue
130 // 19095 is fixed.
131 _output.close();
132 return inputFuture == null ? new Future.value() : inputFuture;
133 }
134 }
OLDNEW
« no previous file with comments | « pkg/json_rpc_2/lib/src/server.dart ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698