| OLD | NEW |
| 1 // Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 import 'dart:async'; | 5 import 'dart:async'; |
| 6 import 'dart:isolate'; | 6 import 'dart:isolate'; |
| 7 | 7 |
| 8 import 'package:async/async.dart'; | 8 import 'package:async/async.dart'; |
| 9 import 'package:stack_trace/stack_trace.dart'; | 9 import 'package:stack_trace/stack_trace.dart'; |
| 10 | 10 |
| (...skipping 26 matching lines...) Expand all Loading... |
| 37 /// | 37 /// |
| 38 /// The connection protocol is guaranteed to remain compatible across versions | 38 /// The connection protocol is guaranteed to remain compatible across versions |
| 39 /// at least until the next major version release. If the protocol is | 39 /// at least until the next major version release. If the protocol is |
| 40 /// violated, the resulting channel will emit a single value on its stream and | 40 /// violated, the resulting channel will emit a single value on its stream and |
| 41 /// then close. | 41 /// then close. |
| 42 factory IsolateChannel.connectReceive(ReceivePort receivePort) { | 42 factory IsolateChannel.connectReceive(ReceivePort receivePort) { |
| 43 // We can't use a [StreamChannelCompleter] here because we need the return | 43 // We can't use a [StreamChannelCompleter] here because we need the return |
| 44 // value to be an [IsolateChannel]. | 44 // value to be an [IsolateChannel]. |
| 45 var streamCompleter = new StreamCompleter<T>(); | 45 var streamCompleter = new StreamCompleter<T>(); |
| 46 var sinkCompleter = new StreamSinkCompleter<T>(); | 46 var sinkCompleter = new StreamSinkCompleter<T>(); |
| 47 var channel = new IsolateChannel._( | 47 var channel = new IsolateChannel<T>._( |
| 48 streamCompleter.stream, sinkCompleter.sink); | 48 streamCompleter.stream, sinkCompleter.sink); |
| 49 | 49 |
| 50 // The first message across the ReceivePort should be a SendPort pointing to | 50 // The first message across the ReceivePort should be a SendPort pointing to |
| 51 // the remote end. If it's not, we'll make the stream emit an error | 51 // the remote end. If it's not, we'll make the stream emit an error |
| 52 // complaining. | 52 // complaining. |
| 53 var subscription; | 53 var subscription; |
| 54 subscription = receivePort.listen((message) { | 54 subscription = receivePort.listen((message) { |
| 55 if (message is SendPort) { | 55 if (message is SendPort) { |
| 56 var controller = new StreamChannelController( | 56 var controller = new StreamChannelController<T>( |
| 57 allowForeignErrors: false, sync: true); | 57 allowForeignErrors: false, sync: true); |
| 58 new SubscriptionStream<T>(subscription).pipe(controller.local.sink); | 58 new SubscriptionStream(subscription).pipe(controller.local.sink); |
| 59 controller.local.stream.listen(message.send, | 59 controller.local.stream.listen( |
| 60 (data) => message.send(data), |
| 60 onDone: receivePort.close); | 61 onDone: receivePort.close); |
| 61 | 62 |
| 62 streamCompleter.setSourceStream(controller.foreign.stream); | 63 streamCompleter.setSourceStream(controller.foreign.stream); |
| 63 sinkCompleter.setDestinationSink(controller.foreign.sink); | 64 sinkCompleter.setDestinationSink(controller.foreign.sink); |
| 64 return; | 65 return; |
| 65 } | 66 } |
| 66 | 67 |
| 67 streamCompleter.setError( | 68 streamCompleter.setError( |
| 68 new StateError('Unexpected Isolate response "$message".'), | 69 new StateError('Unexpected Isolate response "$message".'), |
| 69 new Trace.current()); | 70 new Trace.current()); |
| (...skipping 15 matching lines...) Expand all Loading... |
| 85 /// at least until the next major version release. | 86 /// at least until the next major version release. |
| 86 factory IsolateChannel.connectSend(SendPort sendPort) { | 87 factory IsolateChannel.connectSend(SendPort sendPort) { |
| 87 var receivePort = new ReceivePort(); | 88 var receivePort = new ReceivePort(); |
| 88 sendPort.send(receivePort.sendPort); | 89 sendPort.send(receivePort.sendPort); |
| 89 return new IsolateChannel(receivePort, sendPort); | 90 return new IsolateChannel(receivePort, sendPort); |
| 90 } | 91 } |
| 91 | 92 |
| 92 /// Creates a stream channel that receives messages from [receivePort] and | 93 /// Creates a stream channel that receives messages from [receivePort] and |
| 93 /// sends them over [sendPort]. | 94 /// sends them over [sendPort]. |
| 94 factory IsolateChannel(ReceivePort receivePort, SendPort sendPort) { | 95 factory IsolateChannel(ReceivePort receivePort, SendPort sendPort) { |
| 95 var controller = new StreamChannelController( | 96 var controller = new StreamChannelController<T>( |
| 96 allowForeignErrors: false, sync: true); | 97 allowForeignErrors: false, sync: true); |
| 97 receivePort.pipe(controller.local.sink); | 98 receivePort.pipe(controller.local.sink); |
| 98 controller.local.stream.listen(sendPort.send, onDone: receivePort.close); | 99 controller.local.stream.listen( |
| 100 (data) => sendPort.send(data), |
| 101 onDone: receivePort.close); |
| 99 return new IsolateChannel._( | 102 return new IsolateChannel._( |
| 100 controller.foreign.stream, controller.foreign.sink); | 103 controller.foreign.stream, controller.foreign.sink); |
| 101 } | 104 } |
| 102 | 105 |
| 103 IsolateChannel._(this.stream, this.sink); | 106 IsolateChannel._(this.stream, this.sink); |
| 104 } | 107 } |
| OLD | NEW |