Index: lib/src/isolate_channel/send_port_sink.dart |
diff --git a/lib/src/isolate_channel.dart b/lib/src/isolate_channel/send_port_sink.dart |
similarity index 65% |
copy from lib/src/isolate_channel.dart |
copy to lib/src/isolate_channel/send_port_sink.dart |
index c6645438d6b02daf99ff7d916df884cabc2d59e5..d98f1da65db04e2ee798622c6d32c1ed5ebef707 100644 |
--- a/lib/src/isolate_channel.dart |
+++ b/lib/src/isolate_channel/send_port_sink.dart |
@@ -5,23 +5,11 @@ |
import 'dart:async'; |
import 'dart:isolate'; |
-import '../stream_channel.dart'; |
- |
-/// A [StreamChannel] that communicates over a [ReceivePort]/[SendPort] pair, |
-/// presumably with another isolate. |
-/// |
-/// The remote endpoint doesn't necessarily need to be running an |
-/// [IsolateChannel]. This can be used with any two ports, although the |
-/// [StreamChannel] semantics mean that this class will treat them as being |
-/// paired (for example, closing the [sink] will cause the [stream] to stop |
-/// emitting events). |
+/// The sink for [IsolateChannel]. |
/// |
-/// The underlying isolate ports have no notion of closing connections. This |
-/// means that [stream] won't close unless [sink] is closed, and that closing |
-/// [sink] won't cause the remote endpoint to close. Users should take care to |
-/// ensure that they always close the [sink] of every [IsolateChannel] they use |
-/// to avoid leaving dangling [ReceivePort]s. |
-class IsolateChannel<T> extends StreamChannelMixin<T> { |
+/// [SendPort] doesn't natively implement any sink API, so this adds that API as |
+/// a wrapper. Closing this just closes the [ReceivePort]. |
+class SendPortSink<T> implements StreamSink<T> { |
/// The port that produces incoming messages. |
/// |
/// This is wrapped in a [StreamView] to produce [stream]. |
@@ -30,29 +18,6 @@ class IsolateChannel<T> extends StreamChannelMixin<T> { |
/// The port that sends outgoing messages. |
final SendPort _sendPort; |
- Stream<T> get stream => _stream; |
- final Stream<T> _stream; |
- |
- StreamSink<T> get sink => _sink; |
- _SendPortSink<T> _sink; |
- |
- /// Creates a stream channel that receives messages from [receivePort] and |
- /// sends them over [sendPort]. |
- IsolateChannel(ReceivePort receivePort, this._sendPort) |
- : _receivePort = receivePort, |
- _stream = new StreamView<T>(receivePort) { |
- _sink = new _SendPortSink<T>(this); |
- } |
-} |
- |
-/// The sink for [IsolateChannel]. |
-/// |
-/// [SendPort] doesn't natively implement any sink API, so this adds that API as |
-/// a wrapper. Closing this just closes the [ReceivePort]. |
-class _SendPortSink<T> implements StreamSink<T> { |
- /// The channel that this sink is for. |
- final IsolateChannel _channel; |
- |
Future get done => _doneCompleter.future; |
final _doneCompleter = new Completer(); |
@@ -68,7 +33,7 @@ class _SendPortSink<T> implements StreamSink<T> { |
/// Whether we're currently adding a stream with [addStream]. |
bool _inAddStream = false; |
- _SendPortSink(this._channel); |
+ SendPortSink(this._receivePort, this._sendPort); |
void add(T data) { |
if (_closed) throw new StateError("Cannot add event after closing."); |
@@ -84,7 +49,7 @@ class _SendPortSink<T> implements StreamSink<T> { |
/// |
/// This is called from [addStream], so it shouldn't check [_inAddStream]. |
void _add(T data) { |
- _channel._sendPort.send(data); |
+ _sendPort.send(data); |
} |
void addError(error, [StackTrace stackTrace]) { |
@@ -112,7 +77,7 @@ class _SendPortSink<T> implements StreamSink<T> { |
Future _close([error, StackTrace stackTrace]) { |
if (_isDone) return done; |
- _channel._receivePort.close(); |
+ _receivePort.close(); |
if (error != null) { |
_doneCompleter.completeError(error, stackTrace); |
@@ -128,7 +93,7 @@ class _SendPortSink<T> implements StreamSink<T> { |
if (_inAddStream) { |
throw new StateError("Cannot add stream while adding stream."); |
} |
- if (_isDone) return; |
+ if (_isDone) return new Future.value(); |
_inAddStream = true; |
var completer = new Completer.sync(); |