| Index: lib/src/isolate_channel/send_port_sink.dart
|
| diff --git a/lib/src/isolate_channel.dart b/lib/src/isolate_channel/send_port_sink.dart
|
| similarity index 65%
|
| copy from lib/src/isolate_channel.dart
|
| copy to lib/src/isolate_channel/send_port_sink.dart
|
| index c6645438d6b02daf99ff7d916df884cabc2d59e5..d98f1da65db04e2ee798622c6d32c1ed5ebef707 100644
|
| --- a/lib/src/isolate_channel.dart
|
| +++ b/lib/src/isolate_channel/send_port_sink.dart
|
| @@ -5,23 +5,11 @@
|
| import 'dart:async';
|
| import 'dart:isolate';
|
|
|
| -import '../stream_channel.dart';
|
| -
|
| -/// A [StreamChannel] that communicates over a [ReceivePort]/[SendPort] pair,
|
| -/// presumably with another isolate.
|
| -///
|
| -/// The remote endpoint doesn't necessarily need to be running an
|
| -/// [IsolateChannel]. This can be used with any two ports, although the
|
| -/// [StreamChannel] semantics mean that this class will treat them as being
|
| -/// paired (for example, closing the [sink] will cause the [stream] to stop
|
| -/// emitting events).
|
| +/// The sink for [IsolateChannel].
|
| ///
|
| -/// The underlying isolate ports have no notion of closing connections. This
|
| -/// means that [stream] won't close unless [sink] is closed, and that closing
|
| -/// [sink] won't cause the remote endpoint to close. Users should take care to
|
| -/// ensure that they always close the [sink] of every [IsolateChannel] they use
|
| -/// to avoid leaving dangling [ReceivePort]s.
|
| -class IsolateChannel<T> extends StreamChannelMixin<T> {
|
| +/// [SendPort] doesn't natively implement any sink API, so this adds that API as
|
| +/// a wrapper. Closing this just closes the [ReceivePort].
|
| +class SendPortSink<T> implements StreamSink<T> {
|
| /// The port that produces incoming messages.
|
| ///
|
| /// This is wrapped in a [StreamView] to produce [stream].
|
| @@ -30,29 +18,6 @@ class IsolateChannel<T> extends StreamChannelMixin<T> {
|
| /// The port that sends outgoing messages.
|
| final SendPort _sendPort;
|
|
|
| - Stream<T> get stream => _stream;
|
| - final Stream<T> _stream;
|
| -
|
| - StreamSink<T> get sink => _sink;
|
| - _SendPortSink<T> _sink;
|
| -
|
| - /// Creates a stream channel that receives messages from [receivePort] and
|
| - /// sends them over [sendPort].
|
| - IsolateChannel(ReceivePort receivePort, this._sendPort)
|
| - : _receivePort = receivePort,
|
| - _stream = new StreamView<T>(receivePort) {
|
| - _sink = new _SendPortSink<T>(this);
|
| - }
|
| -}
|
| -
|
| -/// The sink for [IsolateChannel].
|
| -///
|
| -/// [SendPort] doesn't natively implement any sink API, so this adds that API as
|
| -/// a wrapper. Closing this just closes the [ReceivePort].
|
| -class _SendPortSink<T> implements StreamSink<T> {
|
| - /// The channel that this sink is for.
|
| - final IsolateChannel _channel;
|
| -
|
| Future get done => _doneCompleter.future;
|
| final _doneCompleter = new Completer();
|
|
|
| @@ -68,7 +33,7 @@ class _SendPortSink<T> implements StreamSink<T> {
|
| /// Whether we're currently adding a stream with [addStream].
|
| bool _inAddStream = false;
|
|
|
| - _SendPortSink(this._channel);
|
| + SendPortSink(this._receivePort, this._sendPort);
|
|
|
| void add(T data) {
|
| if (_closed) throw new StateError("Cannot add event after closing.");
|
| @@ -84,7 +49,7 @@ class _SendPortSink<T> implements StreamSink<T> {
|
| ///
|
| /// This is called from [addStream], so it shouldn't check [_inAddStream].
|
| void _add(T data) {
|
| - _channel._sendPort.send(data);
|
| + _sendPort.send(data);
|
| }
|
|
|
| void addError(error, [StackTrace stackTrace]) {
|
| @@ -112,7 +77,7 @@ class _SendPortSink<T> implements StreamSink<T> {
|
| Future _close([error, StackTrace stackTrace]) {
|
| if (_isDone) return done;
|
|
|
| - _channel._receivePort.close();
|
| + _receivePort.close();
|
|
|
| if (error != null) {
|
| _doneCompleter.completeError(error, stackTrace);
|
| @@ -128,7 +93,7 @@ class _SendPortSink<T> implements StreamSink<T> {
|
| if (_inAddStream) {
|
| throw new StateError("Cannot add stream while adding stream.");
|
| }
|
| - if (_isDone) return;
|
| + if (_isDone) return new Future.value();
|
|
|
| _inAddStream = true;
|
| var completer = new Completer.sync();
|
|
|