Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1329)

Unified Diff: lib/src/isolate_channel.dart

Issue 1635873002: Add an IsolateChannel class. (Closed) Base URL: git@github.com:dart-lang/stream_channel.git@master
Patch Set: Created 4 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « lib/src/delegating_stream_channel.dart ('k') | lib/stream_channel.dart » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: lib/src/isolate_channel.dart
diff --git a/lib/src/isolate_channel.dart b/lib/src/isolate_channel.dart
new file mode 100644
index 0000000000000000000000000000000000000000..c6645438d6b02daf99ff7d916df884cabc2d59e5
--- /dev/null
+++ b/lib/src/isolate_channel.dart
@@ -0,0 +1,146 @@
+// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+import 'dart:isolate';
+
+import '../stream_channel.dart';
+
+/// A [StreamChannel] that communicates over a [ReceivePort]/[SendPort] pair,
+/// presumably with another isolate.
+///
+/// The remote endpoint doesn't necessarily need to be running an
+/// [IsolateChannel]. This can be used with any two ports, although the
+/// [StreamChannel] semantics mean that this class will treat them as being
+/// paired (for example, closing the [sink] will cause the [stream] to stop
+/// emitting events).
+///
+/// The underlying isolate ports have no notion of closing connections. This
+/// means that [stream] won't close unless [sink] is closed, and that closing
+/// [sink] won't cause the remote endpoint to close. Users should take care to
+/// ensure that they always close the [sink] of every [IsolateChannel] they use
+/// to avoid leaving dangling [ReceivePort]s.
+class IsolateChannel<T> extends StreamChannelMixin<T> {
+ /// The port that produces incoming messages.
+ ///
+ /// This is wrapped in a [StreamView] to produce [stream].
+ final ReceivePort _receivePort;
+
+ /// The port that sends outgoing messages.
+ final SendPort _sendPort;
+
+ Stream<T> get stream => _stream;
+ final Stream<T> _stream;
+
+ StreamSink<T> get sink => _sink;
+ _SendPortSink<T> _sink;
+
+ /// Creates a stream channel that receives messages from [receivePort] and
+ /// sends them over [sendPort].
+ IsolateChannel(ReceivePort receivePort, this._sendPort)
+ : _receivePort = receivePort,
+ _stream = new StreamView<T>(receivePort) {
+ _sink = new _SendPortSink<T>(this);
+ }
+}
+
+/// The sink for [IsolateChannel].
+///
+/// [SendPort] doesn't natively implement any sink API, so this adds that API as
+/// a wrapper. Closing this just closes the [ReceivePort].
+class _SendPortSink<T> implements StreamSink<T> {
+ /// The channel that this sink is for.
+ final IsolateChannel _channel;
+
+ Future get done => _doneCompleter.future;
+ final _doneCompleter = new Completer();
+
+ /// Whether [done] has been completed.
+ ///
+ /// This is distinct from [_closed] because [done] can complete with an error
+ /// without the user explicitly calling [close].
+ bool get _isDone => _doneCompleter.isCompleted;
+
+ /// Whether the user has called [close].
+ bool _closed = false;
+
+ /// Whether we're currently adding a stream with [addStream].
+ bool _inAddStream = false;
+
+ _SendPortSink(this._channel);
+
+ void add(T data) {
+ if (_closed) throw new StateError("Cannot add event after closing.");
+ if (_inAddStream) {
+ throw new StateError("Cannot add event while adding stream.");
+ }
+ if (_isDone) return;
+
+ _add(data);
+ }
+
+ /// A helper for [add] that doesn't check for [StateError]s.
+ ///
+ /// This is called from [addStream], so it shouldn't check [_inAddStream].
+ void _add(T data) {
+ _channel._sendPort.send(data);
+ }
+
+ void addError(error, [StackTrace stackTrace]) {
+ if (_closed) throw new StateError("Cannot add event after closing.");
+ if (_inAddStream) {
+ throw new StateError("Cannot add event while adding stream.");
+ }
+
+ _close(error, stackTrace);
+ }
+
+ Future close() {
+ if (_inAddStream) {
+ throw new StateError("Cannot close sink while adding stream.");
+ }
+
+ _closed = true;
+ return _close();
+ }
+
+ /// A helper for [close] that doesn't check for [StateError]s.
+ ///
+ /// This is called from [addStream], so it shouldn't check [_inAddStream]. It
+ /// also forwards [error] and [stackTrace] to [done] if they're passed.
+ Future _close([error, StackTrace stackTrace]) {
+ if (_isDone) return done;
+
+ _channel._receivePort.close();
+
+ if (error != null) {
+ _doneCompleter.completeError(error, stackTrace);
+ } else {
+ _doneCompleter.complete();
+ }
+
+ return done;
+ }
+
+ Future addStream(Stream<T> stream) {
+ if (_closed) throw new StateError("Cannot add stream after closing.");
+ if (_inAddStream) {
+ throw new StateError("Cannot add stream while adding stream.");
+ }
+ if (_isDone) return;
+
+ _inAddStream = true;
+ var completer = new Completer.sync();
+ stream.listen(_add,
+ onError: (error, stackTrace) {
+ _close(error, stackTrace);
+ completer.complete();
+ },
+ onDone: completer.complete,
+ cancelOnError: true);
+ return completer.future.then((_) {
+ _inAddStream = false;
+ });
+ }
+}
« no previous file with comments | « lib/src/delegating_stream_channel.dart ('k') | lib/stream_channel.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698