Chromium Code Reviews| OLD | NEW |
|---|---|
| (Empty) | |
| 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | |
| 2 // for details. All rights reserved. Use of this source code is governed by a | |
| 3 // BSD-style license that can be found in the LICENSE file. | |
| 4 | |
| 5 /** | |
| 6 * The initial [IsolateStream] available by default for this isolate. This | |
| 7 * [IsolateStream] is created automatically and it is commonly used to establish | |
| 8 * the first communication between isolates (see [streamSpawnFunction] and | |
| 9 * [streamSpawnUri]). | |
| 10 */ | |
| 11 final IsolateStream stream = new IsolateStream._fromOriginalReceivePort(port); | |
| 12 | |
| 13 /** | |
| 14 * A [MessageBox] creates an [IsolateStream], [stream], and an [IsolateSink], | |
| 15 * [sink]. | |
| 16 * | |
| 17 * Any message that is written into the [sink] (independent of the isolate) is | |
| 18 * sent to the [stream] where its subscribers can react to the messages. | |
| 19 */ | |
| 20 class MessageBox { | |
| 21 final IsolateStream stream; | |
| 22 final IsolateSink sink; | |
| 23 | |
| 24 MessageBox.oneShot() : this._oneShot(new ReceivePort()); | |
| 25 MessageBox._oneShot(ReceivePort receivePort) | |
| 26 : stream = new IsolateStream._fromOriginalReceivePortOneShot(receivePort), | |
| 27 sink = new IsolateSink._fromPort(receivePort.toSendPort()); | |
| 28 | |
| 29 MessageBox() : this._(new ReceivePort()); | |
| 30 MessageBox._(ReceivePort receivePort) | |
| 31 : stream = new IsolateStream._fromOriginalReceivePort(receivePort), | |
| 32 sink = new IsolateSink._fromPort(receivePort.toSendPort()); | |
| 33 } | |
| 34 | |
| 35 // Used for mangling. | |
| 36 const int _ISOLATE_STREAM_TOKEN = 132421119; | |
| 37 | |
| 38 /** | |
| 39 * [IsolateStream]s, together with [IsolateSink]s, are the only means of | |
| 40 * communication between isolates. Each IsolateStream has a corresponding | |
| 41 * [IsolateSink]. Any message written into that sink will be delivered to | |
| 42 * the stream and then dispatched to the stream's subscribers. | |
| 43 */ | |
| 44 class IsolateStream extends Stream<dynamic> { | |
|
Anton Muhin
2012/11/25 10:23:16
just for my education, what's the benefit of Strea
floitsch
2012/11/28 14:13:18
Semantically it is the same. We (I?) prefer to wri
| |
| 45 final ReceivePort _port; | |
| 46 StreamController _controller = new StreamController(); | |
| 47 | |
| 48 IsolateStream._fromOriginalReceivePort(this._port) { | |
| 49 _port.receive((message, replyTo) { | |
| 50 assert(replyTo == null); | |
| 51 _write(message); | |
| 52 }); | |
| 53 } | |
| 54 | |
| 55 IsolateStream._fromOriginalReceivePortOneShot(this._port) { | |
| 56 _port.receive((message, replyTo) { | |
| 57 assert(replyTo == null); | |
| 58 _write(message); | |
| 59 close(); | |
| 60 }); | |
| 61 } | |
| 62 | |
| 63 void _write(var message) { | |
| 64 message = _unmangleMessage(message); | |
| 65 _controller.sink.write(message); | |
| 66 } | |
| 67 | |
| 68 void close() { | |
| 69 _controller.close(); | |
| 70 _port.close(); | |
| 71 } | |
| 72 | |
| 73 StreamSubscription<T> subscribe({void onData(T event), | |
| 74 void onError(StreamError error), | |
| 75 void onDone(), | |
| 76 bool unsubscribeOnError}) { | |
| 77 return _controller.subscribe(onData: onData, | |
| 78 onError: onError, | |
| 79 onDone: onDone, | |
| 80 unsubscribeOnError: unsubscribeOnError); | |
| 81 } | |
| 82 | |
| 83 dynamic _unmangleMessage(var message) { | |
|
Anton Muhin
2012/11/25 10:23:16
why dynamic return type, not var or just dropped
floitsch
2012/11/28 14:13:18
ditto. To show that I thought about it.
| |
| 84 _IsolateDecoder decoder = new _IsolateDecoder( | |
| 85 _ISOLATE_STREAM_TOKEN, | |
| 86 (data) { | |
| 87 if (data is! List) return data; | |
| 88 if (data.length != 2) return data; | |
| 89 if (data[0] != "Sink" || data[1] is! SendPort) return data; | |
| 90 return new IsolateSink._fromPort(data[1]); | |
| 91 }); | |
| 92 return decoder.decode(message); | |
| 93 } | |
| 94 } | |
| 95 | |
| 96 /** | |
| 97 * [IsolateSink]s represent the feed for [IsolateStream]s. Any message written | |
| 98 * to [this] is delivered to its respective [IsolateStream]. [IsolateSink]s are | |
| 99 * created by [MessageBox]es. | |
| 100 * | |
| 101 * [IsolateSink]s can be transmitted to other isolates. | |
| 102 */ | |
| 103 class IsolateSink extends StreamSink<dynamic> { | |
| 104 final SendPort _port; | |
| 105 IsolateSink._fromPort(this._port); | |
| 106 | |
| 107 /** | |
| 108 * Sends an asynchronous [message] to the linked [IsolateStream]. The message | |
| 109 * is copied to the receiving isolate. | |
| 110 * | |
| 111 * The content of [message] can be: primitive values (null, num, bool, double, | |
| 112 * String), instances of [IsolateSink]s, and lists and maps whose elements are | |
| 113 * any of these. List and maps are also allowed to be cyclic. | |
| 114 * | |
| 115 * In the special circumstances when two isolates share the same code and are | |
| 116 * running in the same process (e.g. isolates created via [spawnFunction]), it | |
| 117 * is also possible to send object instances (which would be copied in the | |
| 118 * process). This is currently only supported by the dartvm. For now, the | |
| 119 * dart2js compiler only supports the restricted messages described above. | |
| 120 */ | |
| 121 void write(dynamic message) { | |
| 122 var mangled = _mangleMessage(message); | |
| 123 _port.send(mangled); | |
| 124 } | |
| 125 | |
| 126 void signalError(StreamError errorEvent) { | |
| 127 throw new UnimplementedError("signalError on isolate streams"); | |
| 128 } | |
| 129 | |
| 130 dynamic _mangleMessage(var message) { | |
| 131 _IsolateEncoder encoder = new _IsolateEncoder( | |
| 132 _ISOLATE_STREAM_TOKEN, | |
| 133 (data) => (data is IsolateSink) ? ["Sink", data._port] : data); | |
|
Anton Muhin
2012/11/25 10:23:16
I am sure you took care of it, but what happens if
floitsch
2012/11/28 14:13:18
The Encoder/Decoder takes care of this. If the man
| |
| 134 return encoder.encode(message); | |
| 135 } | |
| 136 | |
| 137 void close() { | |
| 138 // TODO(floitsch): what should happen when an IsolateSink is closed? | |
| 139 } | |
| 140 | |
| 141 /** | |
| 142 * Tests whether [other] is an [IsolateSink] feeding into the same | |
| 143 * [IsolateStream] as this one. | |
| 144 */ | |
| 145 bool operator==(var other) { | |
| 146 return other is IsolateSink && _port == other._port; | |
| 147 } | |
| 148 | |
| 149 int get hashCode => _port.hashCode + 499; | |
| 150 } | |
| 151 | |
| 152 | |
| 153 /** | |
| 154 * Creates and spawns an isolate that shares the same code as the current | |
| 155 * isolate, but that starts from [topLevelFunction]. The [topLevelFunction] | |
| 156 * argument must be a static top-level function or a static method that takes no | |
| 157 * arguments. | |
| 158 * | |
| 159 * When any isolate starts (even the main script of the application), a default | |
| 160 * [IsolateStream] is created for it. This sink is available from the top-level | |
| 161 * getter [stream] defined in this library. | |
| 162 * | |
| 163 * [spawnFunction] returns an [IsolateSink] feeding into the child isolate's | |
| 164 * default stream. | |
| 165 * | |
| 166 * See comments at the top of this library for more details. | |
| 167 */ | |
| 168 IsolateSink streamSpawnFunction(void topLevelFunction()) { | |
| 169 SendPort sendPort = spawnFunction(topLevelFunction); | |
| 170 return new IsolateSink._fromPort(sendPort); | |
| 171 } | |
| 172 | |
| 173 /** | |
| 174 * Creates and spawns an isolate whose code is available at [uri]. Like with | |
| 175 * [streamSpawnFunction], the child isolate will have a default [IsolateStream], | |
| 176 * and a this function returns an [IsolateSink] feeding into it. | |
| 177 * | |
| 178 * See comments at the top of this library for more details. | |
| 179 */ | |
| 180 IsolateSink streamSpawnUri(String uri) { | |
| 181 SendPort sendPort = spawnUri(uri); | |
| 182 return new IsolateSink._fromPort(sendPort); | |
| 183 } | |
| OLD | NEW |