Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(146)

Side by Side Diff: lib/src/two_way_stream.dart

Issue 810333007: Add a done getter to Client, Server, and Peer. (Closed) Base URL: git@github.com:dart-lang/json_rpc_2@master
Patch Set: Created 5 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « lib/src/server.dart ('k') | pubspec.yaml » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 library json_rpc_2.two_way_stream; 5 library json_rpc_2.two_way_stream;
6 6
7 import 'dart:async'; 7 import 'dart:async';
8 import 'dart:convert'; 8 import 'dart:convert';
9 9
10 import 'utils.dart'; 10 import 'utils.dart';
(...skipping 14 matching lines...) Expand all
25 final Stream _input; 25 final Stream _input;
26 26
27 /// The subscription to [_input]. 27 /// The subscription to [_input].
28 StreamSubscription _inputSubscription; 28 StreamSubscription _inputSubscription;
29 29
30 /// The output sink. 30 /// The output sink.
31 /// 31 ///
32 /// This takes decoded JSON objects. 32 /// This takes decoded JSON objects.
33 final StreamSink _output; 33 final StreamSink _output;
34 34
35 /// The completer for [listen]. 35 /// Returns a [Future] that completes when the connection is closed.
36 /// 36 ///
37 /// This is non-`null` after [listen] has been called. 37 /// This is the same future that's returned by [listen].
38 Completer _listenCompleter; 38 Future get done => _doneCompleter.future;
39 final _doneCompleter = new Completer();
39 40
40 /// Creates a two-way stream. 41 /// Creates a two-way stream.
41 /// 42 ///
42 /// [input] and [output] should emit and take (respectively) JSON-encoded 43 /// [input] and [output] should emit and take (respectively) JSON-encoded
43 /// strings. 44 /// strings.
44 /// 45 ///
45 /// [inputName] is used in error messages as the name of the input parameter. 46 /// [inputName] is used in error messages as the name of the input parameter.
46 /// [outputName] is likewise used as the name of the output parameter. 47 /// [outputName] is likewise used as the name of the output parameter.
47 /// 48 ///
48 /// If [onInvalidInput] is passed, any errors parsing messages from [input] 49 /// If [onInvalidInput] is passed, any errors parsing messages from [input]
(...skipping 39 matching lines...) Expand 10 before | Expand all | Expand 10 after
88 throw new ArgumentError("Either `$inputName` must be a StreamSink or " 89 throw new ArgumentError("Either `$inputName` must be a StreamSink or "
89 "`$outputName` must be passed."); 90 "`$outputName` must be passed.");
90 } 91 }
91 } 92 }
92 93
93 /// Starts listening to the input stream. 94 /// Starts listening to the input stream.
94 /// 95 ///
95 /// The returned Future will complete when the input stream is closed. If the 96 /// The returned Future will complete when the input stream is closed. If the
96 /// input stream emits an error, that will be piped to the returned Future. 97 /// input stream emits an error, that will be piped to the returned Future.
97 Future listen(void handleInput(input)) { 98 Future listen(void handleInput(input)) {
98 if (_listenCompleter != null) { 99 if (_inputSubscription != null) {
99 throw new StateError("Can only call $_name.listen once."); 100 throw new StateError("Can only call $_name.listen once.");
100 } 101 }
101 102
102 _listenCompleter = new Completer();
103 _inputSubscription = _input.listen(handleInput, 103 _inputSubscription = _input.listen(handleInput,
104 onError: (error, stackTrace) { 104 onError: (error, stackTrace) {
105 if (_listenCompleter.isCompleted) return; 105 if (_doneCompleter.isCompleted) return;
106 _output.close(); 106 _output.close();
107 _listenCompleter.completeError(error, stackTrace); 107 _doneCompleter.completeError(error, stackTrace);
108 }, onDone: () { 108 }, onDone: () {
109 if (_listenCompleter.isCompleted) return; 109 if (_doneCompleter.isCompleted) return;
110 _output.close(); 110 _output.close();
111 _listenCompleter.complete(); 111 _doneCompleter.complete();
112 }, cancelOnError: true); 112 }, cancelOnError: true);
113 113
114 return _listenCompleter.future; 114 return _doneCompleter.future;
115 } 115 }
116 116
117 /// Emit [event] on the output stream. 117 /// Emit [event] on the output stream.
118 void add(event) => _output.add(event); 118 void add(event) => _output.add(event);
119 119
120 /// Stops listening to the input stream and closes the output stream. 120 /// Stops listening to the input stream and closes the output stream.
121 Future close() { 121 Future close() {
122 if (_listenCompleter == null) { 122 if (_inputSubscription == null) {
123 throw new StateError("Can't call $_name.close before $_name.listen."); 123 throw new StateError("Can't call $_name.close before $_name.listen.");
124 } 124 }
125 125
126 if (!_listenCompleter.isCompleted) _listenCompleter.complete(); 126 if (!_doneCompleter.isCompleted) _doneCompleter.complete();
127 127
128 var inputFuture = _inputSubscription.cancel(); 128 var inputFuture = _inputSubscription.cancel();
129 // TODO(nweiz): include the output future in the return value when issue 129 // TODO(nweiz): include the output future in the return value when issue
130 // 19095 is fixed. 130 // 19095 is fixed.
131 _output.close(); 131 _output.close();
132 return inputFuture == null ? new Future.value() : inputFuture; 132 return inputFuture == null ? new Future.value() : inputFuture;
133 } 133 }
134 } 134 }
OLDNEW
« no previous file with comments | « lib/src/server.dart ('k') | pubspec.yaml » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698