OLD | NEW |
(Empty) | |
| 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. |
| 4 |
| 5 part of dart.isolate; |
| 6 |
| 7 /** |
| 8 * The initial [IsolateStream] available by default for this isolate. This |
| 9 * [IsolateStream] is created automatically and it is commonly used to establish |
| 10 * the first communication between isolates (see [streamSpawnFunction] and |
| 11 * [streamSpawnUri]). |
| 12 */ |
| 13 final IsolateStream stream = new IsolateStream._fromOriginalReceivePort(port); |
| 14 |
| 15 /** |
| 16 * A [MessageBox] creates an [IsolateStream], [stream], and an [IsolateSink], |
| 17 * [sink]. |
| 18 * |
| 19 * Any message that is written into the [sink] (independent of the isolate) is |
| 20 * sent to the [stream] where its subscribers can react to the messages. |
| 21 */ |
| 22 class MessageBox { |
| 23 final IsolateStream stream; |
| 24 final IsolateSink sink; |
| 25 |
| 26 MessageBox.oneShot() : this._oneShot(new ReceivePort()); |
| 27 MessageBox._oneShot(ReceivePort receivePort) |
| 28 : stream = new IsolateStream._fromOriginalReceivePortOneShot(receivePort), |
| 29 sink = new IsolateSink._fromPort(receivePort.toSendPort()); |
| 30 |
| 31 MessageBox() : this._(new ReceivePort()); |
| 32 MessageBox._(ReceivePort receivePort) |
| 33 : stream = new IsolateStream._fromOriginalReceivePort(receivePort), |
| 34 sink = new IsolateSink._fromPort(receivePort.toSendPort()); |
| 35 } |
| 36 |
| 37 // Used for mangling. |
| 38 const int _ISOLATE_STREAM_TOKEN = 132421119; |
| 39 |
| 40 class _CloseToken { |
| 41 /// This token is sent from [IsolateSink]s to [IsolateStream]s to ask them to |
| 42 /// close themselves. |
| 43 const _CloseToken(); |
| 44 } |
| 45 |
| 46 /** |
| 47 * [IsolateStream]s, together with [IsolateSink]s, are the only means of |
| 48 * communication between isolates. Each IsolateStream has a corresponding |
| 49 * [IsolateSink]. Any message written into that sink will be delivered to |
| 50 * the stream and then dispatched to the stream's subscribers. |
| 51 */ |
| 52 class IsolateStream extends Stream<dynamic> { |
| 53 bool _isClosed = false; |
| 54 final ReceivePort _port; |
| 55 StreamController _controller = new StreamController(); |
| 56 |
| 57 IsolateStream._fromOriginalReceivePort(this._port) { |
| 58 _port.receive((message, replyTo) { |
| 59 assert(replyTo == null); |
| 60 _add(message); |
| 61 }); |
| 62 } |
| 63 |
| 64 IsolateStream._fromOriginalReceivePortOneShot(this._port) { |
| 65 _port.receive((message, replyTo) { |
| 66 assert(replyTo == null); |
| 67 _add(message); |
| 68 close(); |
| 69 }); |
| 70 } |
| 71 |
| 72 void _add(var message) { |
| 73 message = _unmangleMessage(message); |
| 74 if (identical(message, const _CloseToken())) { |
| 75 close(); |
| 76 } else { |
| 77 _controller.sink.add(message); |
| 78 } |
| 79 } |
| 80 |
| 81 /** |
| 82 * Close the stream from the receiving end. |
| 83 * |
| 84 * Closing an already closed port has no effect. |
| 85 */ |
| 86 void close() { |
| 87 if (!_isClosed) { |
| 88 _isClosed = true; |
| 89 _port.close(); |
| 90 _controller.close(); |
| 91 } |
| 92 } |
| 93 |
| 94 StreamSubscription<T> listen(void onData(T event), |
| 95 { void onError(AsyncError error), |
| 96 void onDone(), |
| 97 bool unsubscribeOnError}) { |
| 98 return _controller.listen(onData, |
| 99 onError: onError, |
| 100 onDone: onDone, |
| 101 unsubscribeOnError: unsubscribeOnError); |
| 102 } |
| 103 |
| 104 dynamic _unmangleMessage(var message) { |
| 105 _IsolateDecoder decoder = new _IsolateDecoder( |
| 106 _ISOLATE_STREAM_TOKEN, |
| 107 (data) { |
| 108 if (data is! List) return data; |
| 109 if (data.length == 2 && data[0] == "Sink" && data[1] is SendPort) { |
| 110 return new IsolateSink._fromPort(data[1]); |
| 111 } |
| 112 if (data.length == 1 && data[0] == "Close") { |
| 113 return const _CloseToken(); |
| 114 } |
| 115 return data; |
| 116 }); |
| 117 return decoder.decode(message); |
| 118 } |
| 119 } |
| 120 |
| 121 /** |
| 122 * [IsolateSink]s represent the feed for [IsolateStream]s. Any message written |
| 123 * to [this] is delivered to its respective [IsolateStream]. [IsolateSink]s are |
| 124 * created by [MessageBox]es. |
| 125 * |
| 126 * [IsolateSink]s can be transmitted to other isolates. |
| 127 */ |
| 128 class IsolateSink extends StreamSink<dynamic> { |
| 129 bool _isClosed = false; |
| 130 final SendPort _port; |
| 131 IsolateSink._fromPort(this._port); |
| 132 |
| 133 /** |
| 134 * Sends an asynchronous [message] to the linked [IsolateStream]. The message |
| 135 * is copied to the receiving isolate. |
| 136 * |
| 137 * The content of [message] can be: primitive values (null, num, bool, double, |
| 138 * String), instances of [IsolateSink]s, and lists and maps whose elements are |
| 139 * any of these. List and maps are also allowed to be cyclic. |
| 140 * |
| 141 * In the special circumstances when two isolates share the same code and are |
| 142 * running in the same process (e.g. isolates created via [spawnFunction]), it |
| 143 * is also possible to send object instances (which would be copied in the |
| 144 * process). This is currently only supported by the dartvm. For now, the |
| 145 * dart2js compiler only supports the restricted messages described above. |
| 146 */ |
| 147 void add(dynamic message) { |
| 148 var mangled = _mangleMessage(message); |
| 149 _port.send(mangled); |
| 150 } |
| 151 |
| 152 void signalError(AsyncError errorEvent) { |
| 153 throw new UnimplementedError("signalError on isolate streams"); |
| 154 } |
| 155 |
| 156 dynamic _mangleMessage(var message) { |
| 157 _IsolateEncoder encoder = new _IsolateEncoder( |
| 158 _ISOLATE_STREAM_TOKEN, |
| 159 (data) { |
| 160 if (data is IsolateSink) return ["Sink", data._port]; |
| 161 if (identical(data, const _CloseToken())) return ["Close"]; |
| 162 return data; |
| 163 }); |
| 164 return encoder.encode(message); |
| 165 } |
| 166 |
| 167 void close() { |
| 168 if (_isClosed) throw new StateError("Sending on closed stream"); |
| 169 add(const _CloseToken()); |
| 170 _isClosed = true; |
| 171 } |
| 172 |
| 173 /** |
| 174 * Tests whether [other] is an [IsolateSink] feeding into the same |
| 175 * [IsolateStream] as this one. |
| 176 */ |
| 177 bool operator==(var other) { |
| 178 return other is IsolateSink && _port == other._port; |
| 179 } |
| 180 |
| 181 int get hashCode => _port.hashCode + 499; |
| 182 } |
| 183 |
| 184 |
| 185 /** |
| 186 * Creates and spawns an isolate that shares the same code as the current |
| 187 * isolate, but that starts from [topLevelFunction]. The [topLevelFunction] |
| 188 * argument must be a static top-level function or a static method that takes no |
| 189 * arguments. |
| 190 * |
| 191 * When any isolate starts (even the main script of the application), a default |
| 192 * [IsolateStream] is created for it. This sink is available from the top-level |
| 193 * getter [stream] defined in this library. |
| 194 * |
| 195 * [spawnFunction] returns an [IsolateSink] feeding into the child isolate's |
| 196 * default stream. |
| 197 * |
| 198 * See comments at the top of this library for more details. |
| 199 */ |
| 200 IsolateSink streamSpawnFunction(void topLevelFunction()) { |
| 201 SendPort sendPort = spawnFunction(topLevelFunction); |
| 202 return new IsolateSink._fromPort(sendPort); |
| 203 } |
| 204 |
| 205 /** |
| 206 * Creates and spawns an isolate whose code is available at [uri]. Like with |
| 207 * [streamSpawnFunction], the child isolate will have a default [IsolateStream], |
| 208 * and a this function returns an [IsolateSink] feeding into it. |
| 209 * |
| 210 * See comments at the top of this library for more details. |
| 211 */ |
| 212 IsolateSink streamSpawnUri(String uri) { |
| 213 SendPort sendPort = spawnUri(uri); |
| 214 return new IsolateSink._fromPort(sendPort); |
| 215 } |
OLD | NEW |