Index: lib/src/isolate_channel.dart |
diff --git a/lib/src/isolate_channel.dart b/lib/src/isolate_channel.dart |
index a466d87f3eb698d7b7af69ad031907ff043e5417..3466d19dd10b98f9b8beea4ec9d9f6280d21a5fa 100644 |
--- a/lib/src/isolate_channel.dart |
+++ b/lib/src/isolate_channel.dart |
@@ -9,7 +9,6 @@ import 'package:async/async.dart'; |
import 'package:stack_trace/stack_trace.dart'; |
import '../stream_channel.dart'; |
-import 'isolate_channel/send_port_sink.dart'; |
/// A [StreamChannel] that communicates over a [ReceivePort]/[SendPort] pair, |
/// presumably with another isolate. |
@@ -54,10 +53,13 @@ class IsolateChannel<T> extends StreamChannelMixin<T> { |
var subscription; |
subscription = receivePort.listen((message) { |
if (message is SendPort) { |
- streamCompleter.setSourceStream( |
- new SubscriptionStream<T>(subscription)); |
- sinkCompleter.setDestinationSink( |
- new SendPortSink<T>(receivePort, message)); |
+ var controller = new StreamChannelController(allowForeignErrors: false); |
+ new SubscriptionStream<T>(subscription).pipe(controller.local.sink); |
+ controller.local.stream.listen(message.send, |
+ onDone: receivePort.close); |
+ |
+ streamCompleter.setSourceStream(controller.foreign.stream); |
+ sinkCompleter.setDestinationSink(controller.foreign.sink); |
return; |
} |
@@ -88,9 +90,13 @@ class IsolateChannel<T> extends StreamChannelMixin<T> { |
/// Creates a stream channel that receives messages from [receivePort] and |
/// sends them over [sendPort]. |
- IsolateChannel(ReceivePort receivePort, SendPort sendPort) |
- : stream = new StreamView<T>(receivePort), |
- sink = new SendPortSink<T>(receivePort, sendPort); |
+ factory IsolateChannel(ReceivePort receivePort, SendPort sendPort) { |
+ var controller = new StreamChannelController(allowForeignErrors: false); |
+ receivePort.pipe(controller.local.sink); |
+ controller.local.stream.listen(sendPort.send, onDone: receivePort.close); |
+ return new IsolateChannel._( |
+ controller.foreign.stream, controller.foreign.sink); |
+ } |
IsolateChannel._(this.stream, this.sink); |
} |