OLD | NEW |
1 // Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 import 'dart:async'; | 5 import 'dart:async'; |
6 import 'dart:isolate'; | 6 import 'dart:isolate'; |
7 | 7 |
8 import 'package:async/async.dart'; | 8 import 'package:async/async.dart'; |
9 import 'package:stack_trace/stack_trace.dart'; | 9 import 'package:stack_trace/stack_trace.dart'; |
10 | 10 |
(...skipping 26 matching lines...) Expand all Loading... |
37 /// | 37 /// |
38 /// The connection protocol is guaranteed to remain compatible across versions | 38 /// The connection protocol is guaranteed to remain compatible across versions |
39 /// at least until the next major version release. If the protocol is | 39 /// at least until the next major version release. If the protocol is |
40 /// violated, the resulting channel will emit a single value on its stream and | 40 /// violated, the resulting channel will emit a single value on its stream and |
41 /// then close. | 41 /// then close. |
42 factory IsolateChannel.connectReceive(ReceivePort receivePort) { | 42 factory IsolateChannel.connectReceive(ReceivePort receivePort) { |
43 // We can't use a [StreamChannelCompleter] here because we need the return | 43 // We can't use a [StreamChannelCompleter] here because we need the return |
44 // value to be an [IsolateChannel]. | 44 // value to be an [IsolateChannel]. |
45 var streamCompleter = new StreamCompleter<T>(); | 45 var streamCompleter = new StreamCompleter<T>(); |
46 var sinkCompleter = new StreamSinkCompleter<T>(); | 46 var sinkCompleter = new StreamSinkCompleter<T>(); |
47 var channel = new IsolateChannel._( | 47 var channel = new IsolateChannel<T>._( |
48 streamCompleter.stream, sinkCompleter.sink); | 48 streamCompleter.stream, sinkCompleter.sink); |
49 | 49 |
50 // The first message across the ReceivePort should be a SendPort pointing to | 50 // The first message across the ReceivePort should be a SendPort pointing to |
51 // the remote end. If it's not, we'll make the stream emit an error | 51 // the remote end. If it's not, we'll make the stream emit an error |
52 // complaining. | 52 // complaining. |
53 var subscription; | 53 var subscription; |
54 subscription = receivePort.listen((message) { | 54 subscription = receivePort.listen((message) { |
55 if (message is SendPort) { | 55 if (message is SendPort) { |
56 var controller = new StreamChannelController( | 56 var controller = new StreamChannelController<T>( |
57 allowForeignErrors: false, sync: true); | 57 allowForeignErrors: false, sync: true); |
58 new SubscriptionStream<T>(subscription).pipe(controller.local.sink); | 58 new SubscriptionStream(subscription).pipe(controller.local.sink); |
59 controller.local.stream.listen(message.send, | 59 controller.local.stream.listen( |
| 60 (data) => message.send(data), |
60 onDone: receivePort.close); | 61 onDone: receivePort.close); |
61 | 62 |
62 streamCompleter.setSourceStream(controller.foreign.stream); | 63 streamCompleter.setSourceStream(controller.foreign.stream); |
63 sinkCompleter.setDestinationSink(controller.foreign.sink); | 64 sinkCompleter.setDestinationSink(controller.foreign.sink); |
64 return; | 65 return; |
65 } | 66 } |
66 | 67 |
67 streamCompleter.setError( | 68 streamCompleter.setError( |
68 new StateError('Unexpected Isolate response "$message".'), | 69 new StateError('Unexpected Isolate response "$message".'), |
69 new Trace.current()); | 70 new Trace.current()); |
(...skipping 15 matching lines...) Expand all Loading... |
85 /// at least until the next major version release. | 86 /// at least until the next major version release. |
86 factory IsolateChannel.connectSend(SendPort sendPort) { | 87 factory IsolateChannel.connectSend(SendPort sendPort) { |
87 var receivePort = new ReceivePort(); | 88 var receivePort = new ReceivePort(); |
88 sendPort.send(receivePort.sendPort); | 89 sendPort.send(receivePort.sendPort); |
89 return new IsolateChannel(receivePort, sendPort); | 90 return new IsolateChannel(receivePort, sendPort); |
90 } | 91 } |
91 | 92 |
92 /// Creates a stream channel that receives messages from [receivePort] and | 93 /// Creates a stream channel that receives messages from [receivePort] and |
93 /// sends them over [sendPort]. | 94 /// sends them over [sendPort]. |
94 factory IsolateChannel(ReceivePort receivePort, SendPort sendPort) { | 95 factory IsolateChannel(ReceivePort receivePort, SendPort sendPort) { |
95 var controller = new StreamChannelController( | 96 var controller = new StreamChannelController<T>( |
96 allowForeignErrors: false, sync: true); | 97 allowForeignErrors: false, sync: true); |
97 receivePort.pipe(controller.local.sink); | 98 receivePort.pipe(controller.local.sink); |
98 controller.local.stream.listen(sendPort.send, onDone: receivePort.close); | 99 controller.local.stream.listen( |
| 100 (data) => sendPort.send(data), |
| 101 onDone: receivePort.close); |
99 return new IsolateChannel._( | 102 return new IsolateChannel._( |
100 controller.foreign.stream, controller.foreign.sink); | 103 controller.foreign.stream, controller.foreign.sink); |
101 } | 104 } |
102 | 105 |
103 IsolateChannel._(this.stream, this.sink); | 106 IsolateChannel._(this.stream, this.sink); |
104 } | 107 } |
OLD | NEW |