Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(55)

Unified Diff: lib/src/channel_manager.dart

Issue 1652413002: Use StreamChannel. (Closed) Base URL: git@github.com:dart-lang/json_rpc_2.git@master
Patch Set: Created 4 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: lib/src/channel_manager.dart
diff --git a/lib/src/channel_manager.dart b/lib/src/channel_manager.dart
new file mode 100644
index 0000000000000000000000000000000000000000..40a3c58a9c0f9b7b7d0550c5f814cba003bd4423
--- /dev/null
+++ b/lib/src/channel_manager.dart
@@ -0,0 +1,79 @@
+// Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
Bob Nystrom 2016/02/02 17:54:15 2016! :)
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+
+import 'package:stream_channel/stream_channel.dart';
+
+/// Wraps a [StreamChannel] and handles logic that's shared between [Server],
+/// [Client], and [Peer].
+///
+/// These classes don't provide the user direct access to a
+/// [StreamSubscription]. Instead, they use the future returned by [listen] to
+/// notify the user of the remote endpoint closing or producing an error.
+class ChannelManager {
+ /// The name of the component whose channel is wrapped (e.g. "Server").
+ ///
+ /// Used for error reporting.
+ final String _name;
+
+ /// The underlying channel.
+ final StreamChannel _channel;
+
+ /// Returns a [Future] that completes when the connection is closed.
+ ///
+ /// This is the same future that's returned by [listen].
+ Future get done => _doneCompleter.future;
+ final _doneCompleter = new Completer();
+
+ /// Whether the underlying communication channel is closed.
+ bool get isClosed => _doneCompleter.isCompleted;
+
+ /// Whether [listen] has been called.
+ bool _listenCalled = false;
+
+ /// Whether [close] has been called.
+ ///
+ /// Note that [isClosed] tracks whether the underlying connection is closed,
+ /// whereas this tracks only whether it was explicitly closed from this end.
+ bool _closeCalled = false;
+
+ ChannelManager(this._name, this._channel);
+
+ /// Starts listening to the channel.
+ ///
+ /// The returned Future will complete when the input stream is closed. If the
+ /// input stream emits an error, that will be piped to the returned Future.
+ Future listen(void handleInput(input)) {
+ if (_listenCalled) {
+ throw new StateError("Can only call $_name.listen() once.");
+ }
+ _listenCalled = true;
+
+ _channel.stream.listen(handleInput,
+ onError: (error, stackTrace) {
+ _doneCompleter.completeError(error, stackTrace);
+ _channel.sink.close();
+ },
+ onDone: _doneCompleter.complete,
+ cancelOnError: true);
+
+ return done;
+ }
+
+ /// Emit [event].
+ void add(event) {
+ if (isClosed && !_closeCalled) return;
+ _channel.sink.add(event);
+ }
+
+ /// Closes the channel.
+ Future close() {
+ _closeCalled = true;
+ if (!_doneCompleter.isCompleted) {
+ _doneCompleter.complete(_channel.sink.close());
+ }
+ return done;
+ }
+}

Powered by Google App Engine
This is Rietveld 408576698