| Index: lib/src/isolate_channel.dart
|
| diff --git a/lib/src/isolate_channel.dart b/lib/src/isolate_channel.dart
|
| index a466d87f3eb698d7b7af69ad031907ff043e5417..3466d19dd10b98f9b8beea4ec9d9f6280d21a5fa 100644
|
| --- a/lib/src/isolate_channel.dart
|
| +++ b/lib/src/isolate_channel.dart
|
| @@ -9,7 +9,6 @@ import 'package:async/async.dart';
|
| import 'package:stack_trace/stack_trace.dart';
|
|
|
| import '../stream_channel.dart';
|
| -import 'isolate_channel/send_port_sink.dart';
|
|
|
| /// A [StreamChannel] that communicates over a [ReceivePort]/[SendPort] pair,
|
| /// presumably with another isolate.
|
| @@ -54,10 +53,13 @@ class IsolateChannel<T> extends StreamChannelMixin<T> {
|
| var subscription;
|
| subscription = receivePort.listen((message) {
|
| if (message is SendPort) {
|
| - streamCompleter.setSourceStream(
|
| - new SubscriptionStream<T>(subscription));
|
| - sinkCompleter.setDestinationSink(
|
| - new SendPortSink<T>(receivePort, message));
|
| + var controller = new StreamChannelController(allowForeignErrors: false);
|
| + new SubscriptionStream<T>(subscription).pipe(controller.local.sink);
|
| + controller.local.stream.listen(message.send,
|
| + onDone: receivePort.close);
|
| +
|
| + streamCompleter.setSourceStream(controller.foreign.stream);
|
| + sinkCompleter.setDestinationSink(controller.foreign.sink);
|
| return;
|
| }
|
|
|
| @@ -88,9 +90,13 @@ class IsolateChannel<T> extends StreamChannelMixin<T> {
|
|
|
| /// Creates a stream channel that receives messages from [receivePort] and
|
| /// sends them over [sendPort].
|
| - IsolateChannel(ReceivePort receivePort, SendPort sendPort)
|
| - : stream = new StreamView<T>(receivePort),
|
| - sink = new SendPortSink<T>(receivePort, sendPort);
|
| + factory IsolateChannel(ReceivePort receivePort, SendPort sendPort) {
|
| + var controller = new StreamChannelController(allowForeignErrors: false);
|
| + receivePort.pipe(controller.local.sink);
|
| + controller.local.stream.listen(sendPort.send, onDone: receivePort.close);
|
| + return new IsolateChannel._(
|
| + controller.foreign.stream, controller.foreign.sink);
|
| + }
|
|
|
| IsolateChannel._(this.stream, this.sink);
|
| }
|
|
|