Index: lib/src/isolate_channel.dart |
diff --git a/lib/src/isolate_channel.dart b/lib/src/isolate_channel.dart |
new file mode 100644 |
index 0000000000000000000000000000000000000000..c6645438d6b02daf99ff7d916df884cabc2d59e5 |
--- /dev/null |
+++ b/lib/src/isolate_channel.dart |
@@ -0,0 +1,146 @@ |
+// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file |
+// for details. All rights reserved. Use of this source code is governed by a |
+// BSD-style license that can be found in the LICENSE file. |
+ |
+import 'dart:async'; |
+import 'dart:isolate'; |
+ |
+import '../stream_channel.dart'; |
+ |
+/// A [StreamChannel] that communicates over a [ReceivePort]/[SendPort] pair, |
+/// presumably with another isolate. |
+/// |
+/// The remote endpoint doesn't necessarily need to be running an |
+/// [IsolateChannel]. This can be used with any two ports, although the |
+/// [StreamChannel] semantics mean that this class will treat them as being |
+/// paired (for example, closing the [sink] will cause the [stream] to stop |
+/// emitting events). |
+/// |
+/// The underlying isolate ports have no notion of closing connections. This |
+/// means that [stream] won't close unless [sink] is closed, and that closing |
+/// [sink] won't cause the remote endpoint to close. Users should take care to |
+/// ensure that they always close the [sink] of every [IsolateChannel] they use |
+/// to avoid leaving dangling [ReceivePort]s. |
+class IsolateChannel<T> extends StreamChannelMixin<T> { |
+ /// The port that produces incoming messages. |
+ /// |
+ /// This is wrapped in a [StreamView] to produce [stream]. |
+ final ReceivePort _receivePort; |
+ |
+ /// The port that sends outgoing messages. |
+ final SendPort _sendPort; |
+ |
+ Stream<T> get stream => _stream; |
+ final Stream<T> _stream; |
+ |
+ StreamSink<T> get sink => _sink; |
+ _SendPortSink<T> _sink; |
+ |
+ /// Creates a stream channel that receives messages from [receivePort] and |
+ /// sends them over [sendPort]. |
+ IsolateChannel(ReceivePort receivePort, this._sendPort) |
+ : _receivePort = receivePort, |
+ _stream = new StreamView<T>(receivePort) { |
+ _sink = new _SendPortSink<T>(this); |
+ } |
+} |
+ |
+/// The sink for [IsolateChannel]. |
+/// |
+/// [SendPort] doesn't natively implement any sink API, so this adds that API as |
+/// a wrapper. Closing this just closes the [ReceivePort]. |
+class _SendPortSink<T> implements StreamSink<T> { |
+ /// The channel that this sink is for. |
+ final IsolateChannel _channel; |
+ |
+ Future get done => _doneCompleter.future; |
+ final _doneCompleter = new Completer(); |
+ |
+ /// Whether [done] has been completed. |
+ /// |
+ /// This is distinct from [_closed] because [done] can complete with an error |
+ /// without the user explicitly calling [close]. |
+ bool get _isDone => _doneCompleter.isCompleted; |
+ |
+ /// Whether the user has called [close]. |
+ bool _closed = false; |
+ |
+ /// Whether we're currently adding a stream with [addStream]. |
+ bool _inAddStream = false; |
+ |
+ _SendPortSink(this._channel); |
+ |
+ void add(T data) { |
+ if (_closed) throw new StateError("Cannot add event after closing."); |
+ if (_inAddStream) { |
+ throw new StateError("Cannot add event while adding stream."); |
+ } |
+ if (_isDone) return; |
+ |
+ _add(data); |
+ } |
+ |
+ /// A helper for [add] that doesn't check for [StateError]s. |
+ /// |
+ /// This is called from [addStream], so it shouldn't check [_inAddStream]. |
+ void _add(T data) { |
+ _channel._sendPort.send(data); |
+ } |
+ |
+ void addError(error, [StackTrace stackTrace]) { |
+ if (_closed) throw new StateError("Cannot add event after closing."); |
+ if (_inAddStream) { |
+ throw new StateError("Cannot add event while adding stream."); |
+ } |
+ |
+ _close(error, stackTrace); |
+ } |
+ |
+ Future close() { |
+ if (_inAddStream) { |
+ throw new StateError("Cannot close sink while adding stream."); |
+ } |
+ |
+ _closed = true; |
+ return _close(); |
+ } |
+ |
+ /// A helper for [close] that doesn't check for [StateError]s. |
+ /// |
+ /// This is called from [addStream], so it shouldn't check [_inAddStream]. It |
+ /// also forwards [error] and [stackTrace] to [done] if they're passed. |
+ Future _close([error, StackTrace stackTrace]) { |
+ if (_isDone) return done; |
+ |
+ _channel._receivePort.close(); |
+ |
+ if (error != null) { |
+ _doneCompleter.completeError(error, stackTrace); |
+ } else { |
+ _doneCompleter.complete(); |
+ } |
+ |
+ return done; |
+ } |
+ |
+ Future addStream(Stream<T> stream) { |
+ if (_closed) throw new StateError("Cannot add stream after closing."); |
+ if (_inAddStream) { |
+ throw new StateError("Cannot add stream while adding stream."); |
+ } |
+ if (_isDone) return; |
+ |
+ _inAddStream = true; |
+ var completer = new Completer.sync(); |
+ stream.listen(_add, |
+ onError: (error, stackTrace) { |
+ _close(error, stackTrace); |
+ completer.complete(); |
+ }, |
+ onDone: completer.complete, |
+ cancelOnError: true); |
+ return completer.future.then((_) { |
+ _inAddStream = false; |
+ }); |
+ } |
+} |