OLD | NEW |
---|---|
1 // Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 import 'dart:async'; | 5 import 'dart:async'; |
6 import 'dart:isolate'; | 6 import 'dart:isolate'; |
7 | 7 |
8 import 'package:async/async.dart'; | |
9 import 'package:stack_trace/stack_trace.dart'; | |
10 | |
8 import '../stream_channel.dart'; | 11 import '../stream_channel.dart'; |
12 import 'isolate_channel/send_port_sink.dart'; | |
9 | 13 |
10 /// A [StreamChannel] that communicates over a [ReceivePort]/[SendPort] pair, | 14 /// A [StreamChannel] that communicates over a [ReceivePort]/[SendPort] pair, |
11 /// presumably with another isolate. | 15 /// presumably with another isolate. |
12 /// | 16 /// |
13 /// The remote endpoint doesn't necessarily need to be running an | 17 /// The remote endpoint doesn't necessarily need to be running an |
14 /// [IsolateChannel]. This can be used with any two ports, although the | 18 /// [IsolateChannel]. This can be used with any two ports, although the |
15 /// [StreamChannel] semantics mean that this class will treat them as being | 19 /// [StreamChannel] semantics mean that this class will treat them as being |
16 /// paired (for example, closing the [sink] will cause the [stream] to stop | 20 /// paired (for example, closing the [sink] will cause the [stream] to stop |
17 /// emitting events). | 21 /// emitting events). |
18 /// | 22 /// |
19 /// The underlying isolate ports have no notion of closing connections. This | 23 /// The underlying isolate ports have no notion of closing connections. This |
20 /// means that [stream] won't close unless [sink] is closed, and that closing | 24 /// means that [stream] won't close unless [sink] is closed, and that closing |
21 /// [sink] won't cause the remote endpoint to close. Users should take care to | 25 /// [sink] won't cause the remote endpoint to close. Users should take care to |
22 /// ensure that they always close the [sink] of every [IsolateChannel] they use | 26 /// ensure that they always close the [sink] of every [IsolateChannel] they use |
23 /// to avoid leaving dangling [ReceivePort]s. | 27 /// to avoid leaving dangling [ReceivePort]s. |
24 class IsolateChannel<T> extends StreamChannelMixin<T> { | 28 class IsolateChannel<T> extends StreamChannelMixin<T> { |
25 /// The port that produces incoming messages. | 29 final Stream<T> stream; |
30 final StreamSink<T> sink; | |
31 | |
32 /// Connects to a remote channel that was created with | |
33 /// [IsolateChannel.connectSend]. | |
26 /// | 34 /// |
27 /// This is wrapped in a [StreamView] to produce [stream]. | 35 /// These constructors establish a connection using only a single |
28 final ReceivePort _receivePort; | 36 /// [SendPort]/[ReceivePort] pair, as long as each side uses one of the |
37 /// connect constructors. | |
38 /// | |
39 /// The connection protocol is guaranteed to remain compatible across versions | |
40 /// at least until the next major version release. If the protocol is | |
41 /// violated, the resulting channel will emit a single value on its stream and | |
42 /// then close. | |
43 factory IsolateChannel.connectReceive(ReceivePort receivePort) { | |
44 // We can't use a [StreamChannelCompleter] here because we need the return | |
45 // value to be an [IsolateChannel]. | |
46 var streamCompleter = new StreamCompleter<T>(); | |
47 var sinkCompleter = new StreamSinkCompleter<T>(); | |
48 var channel = new IsolateChannel._( | |
49 streamCompleter.stream, sinkCompleter.sink); | |
29 | 50 |
30 /// The port that sends outgoing messages. | 51 // The first message across the ReceivePort should be a SendPort pointing to |
31 final SendPort _sendPort; | 52 // the remote end. If it's not, we'll make the stream emit an error |
53 // complaining. | |
54 var subscription; | |
55 subscription = receivePort.listen((message) { | |
56 if (message is SendPort) { | |
57 streamCompleter.setSourceStream( | |
58 new SubscriptionStream<T>(subscription)); | |
59 sinkCompleter.setDestinationSink( | |
60 new SendPortSink<T>(receivePort, message)); | |
61 return; | |
62 } | |
32 | 63 |
33 Stream<T> get stream => _stream; | 64 streamCompleter.setError( |
34 final Stream<T> _stream; | 65 new StateError('Unexpected Isolate response "$message".'), |
66 new Trace.current()); | |
67 sinkCompleter.setDestinationSink(new NullStreamSink<T>()); | |
68 subscription.cancel(); | |
69 }); | |
35 | 70 |
36 StreamSink<T> get sink => _sink; | 71 return channel; |
37 _SendPortSink<T> _sink; | 72 } |
73 | |
74 /// Connects to a remote channel that was created with | |
75 /// [IsolateChannel.connectReceive]. | |
76 /// | |
77 /// These constructors establish a connection using only a single | |
78 /// [SendPort]/[ReceivePort] pair, as long as each side uses one of the | |
79 /// connect constructors. | |
80 /// | |
81 /// The connection protocol is guaranteed to remain compatible across versions | |
82 /// at least until the next major version release. | |
83 factory IsolateChannel.connectSend(SendPort sendPort) { | |
84 var receivePort = new ReceivePort(); | |
Bob Nystrom
2016/01/27 20:39:47
Go home, Reitveld diff algorithm, you are drunk.
| |
85 sendPort.send(receivePort.sendPort); | |
86 return new IsolateChannel(receivePort, sendPort); | |
87 } | |
38 | 88 |
39 /// Creates a stream channel that receives messages from [receivePort] and | 89 /// Creates a stream channel that receives messages from [receivePort] and |
40 /// sends them over [sendPort]. | 90 /// sends them over [sendPort]. |
41 IsolateChannel(ReceivePort receivePort, this._sendPort) | 91 IsolateChannel(ReceivePort receivePort, SendPort sendPort) |
42 : _receivePort = receivePort, | 92 : stream = new StreamView<T>(receivePort), |
43 _stream = new StreamView<T>(receivePort) { | 93 sink = new SendPortSink<T>(receivePort, sendPort); |
44 _sink = new _SendPortSink<T>(this); | 94 |
45 } | 95 IsolateChannel._(this.stream, this.sink); |
46 } | 96 } |
47 | |
48 /// The sink for [IsolateChannel]. | |
49 /// | |
50 /// [SendPort] doesn't natively implement any sink API, so this adds that API as | |
51 /// a wrapper. Closing this just closes the [ReceivePort]. | |
52 class _SendPortSink<T> implements StreamSink<T> { | |
53 /// The channel that this sink is for. | |
54 final IsolateChannel _channel; | |
55 | |
56 Future get done => _doneCompleter.future; | |
57 final _doneCompleter = new Completer(); | |
58 | |
59 /// Whether [done] has been completed. | |
60 /// | |
61 /// This is distinct from [_closed] because [done] can complete with an error | |
62 /// without the user explicitly calling [close]. | |
63 bool get _isDone => _doneCompleter.isCompleted; | |
64 | |
65 /// Whether the user has called [close]. | |
66 bool _closed = false; | |
67 | |
68 /// Whether we're currently adding a stream with [addStream]. | |
69 bool _inAddStream = false; | |
70 | |
71 _SendPortSink(this._channel); | |
72 | |
73 void add(T data) { | |
74 if (_closed) throw new StateError("Cannot add event after closing."); | |
75 if (_inAddStream) { | |
76 throw new StateError("Cannot add event while adding stream."); | |
77 } | |
78 if (_isDone) return; | |
79 | |
80 _add(data); | |
81 } | |
82 | |
83 /// A helper for [add] that doesn't check for [StateError]s. | |
84 /// | |
85 /// This is called from [addStream], so it shouldn't check [_inAddStream]. | |
86 void _add(T data) { | |
87 _channel._sendPort.send(data); | |
88 } | |
89 | |
90 void addError(error, [StackTrace stackTrace]) { | |
91 if (_closed) throw new StateError("Cannot add event after closing."); | |
92 if (_inAddStream) { | |
93 throw new StateError("Cannot add event while adding stream."); | |
94 } | |
95 | |
96 _close(error, stackTrace); | |
97 } | |
98 | |
99 Future close() { | |
100 if (_inAddStream) { | |
101 throw new StateError("Cannot close sink while adding stream."); | |
102 } | |
103 | |
104 _closed = true; | |
105 return _close(); | |
106 } | |
107 | |
108 /// A helper for [close] that doesn't check for [StateError]s. | |
109 /// | |
110 /// This is called from [addStream], so it shouldn't check [_inAddStream]. It | |
111 /// also forwards [error] and [stackTrace] to [done] if they're passed. | |
112 Future _close([error, StackTrace stackTrace]) { | |
113 if (_isDone) return done; | |
114 | |
115 _channel._receivePort.close(); | |
116 | |
117 if (error != null) { | |
118 _doneCompleter.completeError(error, stackTrace); | |
119 } else { | |
120 _doneCompleter.complete(); | |
121 } | |
122 | |
123 return done; | |
124 } | |
125 | |
126 Future addStream(Stream<T> stream) { | |
127 if (_closed) throw new StateError("Cannot add stream after closing."); | |
128 if (_inAddStream) { | |
129 throw new StateError("Cannot add stream while adding stream."); | |
130 } | |
131 if (_isDone) return; | |
132 | |
133 _inAddStream = true; | |
134 var completer = new Completer.sync(); | |
135 stream.listen(_add, | |
136 onError: (error, stackTrace) { | |
137 _close(error, stackTrace); | |
138 completer.complete(); | |
139 }, | |
140 onDone: completer.complete, | |
141 cancelOnError: true); | |
142 return completer.future.then((_) { | |
143 _inAddStream = false; | |
144 }); | |
145 } | |
146 } | |
OLD | NEW |