Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(418)

Side by Side Diff: sdk/lib/isolate/isolate_stream.dart

Issue 11308154: Stream isolates. (Closed) Base URL: https://dart.googlecode.com/svn/experimental/lib_v2/dart
Patch Set: Fixes and test. Created 8 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
(Empty)
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file.
4
5 /**
6 * The initial [IsolateStream] available by default for this isolate. This
7 * [IsolateStream] is created automatically and it is commonly used to establish
8 * the first communication between isolates (see [streamSpawnFunction] and
9 * [streamSpawnUri]).
10 */
11 final IsolateStream stream = new IsolateStream._fromOriginalReceivePort(port);
12
13 /**
14 * A [MessageBox] creates an [IsolateStream], [stream], and an [IsolateSink],
15 * [sink].
16 *
17 * Any message that is written into the [sink] (independent of the isolate) is
18 * sent to the [stream] where its subscribers can react to the messages.
19 */
20 class MessageBox {
21 final IsolateStream stream;
22 final IsolateSink sink;
23
24 MessageBox.oneShot() : this._oneShot(new ReceivePort());
25 MessageBox._oneShot(ReceivePort receivePort)
26 : stream = new IsolateStream._fromOriginalReceivePortOneShot(receivePort),
27 sink = new IsolateSink._fromPort(receivePort.toSendPort());
28
29 MessageBox() : this._(new ReceivePort());
30 MessageBox._(ReceivePort receivePort)
31 : stream = new IsolateStream._fromOriginalReceivePort(receivePort),
32 sink = new IsolateSink._fromPort(receivePort.toSendPort());
33 }
34
35 // Used for mangling.
36 const int _ISOLATE_STREAM_TOKEN = 132421119;
37
38 /**
39 * [IsolateStream]s, together with [IsolateSink]s, are the only means of
40 * communication between isolates. Each IsolateStream has a corresponding
41 * [IsolateSink]. Any message written into that sink will be delivered to
42 * the stream and then dispatched to the stream's subscribers.
43 */
44 class IsolateStream extends Stream<dynamic> {
Anton Muhin 2012/11/25 10:23:16 just for my education, what's the benefit of Strea
floitsch 2012/11/28 14:13:18 Semantically it is the same. We (I?) prefer to wri
45 final ReceivePort _port;
46 StreamController _controller = new StreamController();
47
48 IsolateStream._fromOriginalReceivePort(this._port) {
49 _port.receive((message, replyTo) {
50 assert(replyTo == null);
51 _write(message);
52 });
53 }
54
55 IsolateStream._fromOriginalReceivePortOneShot(this._port) {
56 _port.receive((message, replyTo) {
57 assert(replyTo == null);
58 _write(message);
59 close();
60 });
61 }
62
63 void _write(var message) {
64 message = _unmangleMessage(message);
65 _controller.sink.write(message);
66 }
67
68 void close() {
69 _controller.close();
70 _port.close();
71 }
72
73 StreamSubscription<T> subscribe({void onData(T event),
74 void onError(StreamError error),
75 void onDone(),
76 bool unsubscribeOnError}) {
77 return _controller.subscribe(onData: onData,
78 onError: onError,
79 onDone: onDone,
80 unsubscribeOnError: unsubscribeOnError);
81 }
82
83 dynamic _unmangleMessage(var message) {
Anton Muhin 2012/11/25 10:23:16 why dynamic return type, not var or just dropped
floitsch 2012/11/28 14:13:18 ditto. To show that I thought about it.
84 _IsolateDecoder decoder = new _IsolateDecoder(
85 _ISOLATE_STREAM_TOKEN,
86 (data) {
87 if (data is! List) return data;
88 if (data.length != 2) return data;
89 if (data[0] != "Sink" || data[1] is! SendPort) return data;
90 return new IsolateSink._fromPort(data[1]);
91 });
92 return decoder.decode(message);
93 }
94 }
95
96 /**
97 * [IsolateSink]s represent the feed for [IsolateStream]s. Any message written
98 * to [this] is delivered to its respective [IsolateStream]. [IsolateSink]s are
99 * created by [MessageBox]es.
100 *
101 * [IsolateSink]s can be transmitted to other isolates.
102 */
103 class IsolateSink extends StreamSink<dynamic> {
104 final SendPort _port;
105 IsolateSink._fromPort(this._port);
106
107 /**
108 * Sends an asynchronous [message] to the linked [IsolateStream]. The message
109 * is copied to the receiving isolate.
110 *
111 * The content of [message] can be: primitive values (null, num, bool, double,
112 * String), instances of [IsolateSink]s, and lists and maps whose elements are
113 * any of these. List and maps are also allowed to be cyclic.
114 *
115 * In the special circumstances when two isolates share the same code and are
116 * running in the same process (e.g. isolates created via [spawnFunction]), it
117 * is also possible to send object instances (which would be copied in the
118 * process). This is currently only supported by the dartvm. For now, the
119 * dart2js compiler only supports the restricted messages described above.
120 */
121 void write(dynamic message) {
122 var mangled = _mangleMessage(message);
123 _port.send(mangled);
124 }
125
126 void signalError(StreamError errorEvent) {
127 throw new UnimplementedError("signalError on isolate streams");
128 }
129
130 dynamic _mangleMessage(var message) {
131 _IsolateEncoder encoder = new _IsolateEncoder(
132 _ISOLATE_STREAM_TOKEN,
133 (data) => (data is IsolateSink) ? ["Sink", data._port] : data);
Anton Muhin 2012/11/25 10:23:16 I am sure you took care of it, but what happens if
floitsch 2012/11/28 14:13:18 The Encoder/Decoder takes care of this. If the man
134 return encoder.encode(message);
135 }
136
137 void close() {
138 // TODO(floitsch): what should happen when an IsolateSink is closed?
139 }
140
141 /**
142 * Tests whether [other] is an [IsolateSink] feeding into the same
143 * [IsolateStream] as this one.
144 */
145 bool operator==(var other) {
146 return other is IsolateSink && _port == other._port;
147 }
148
149 int get hashCode => _port.hashCode + 499;
150 }
151
152
153 /**
154 * Creates and spawns an isolate that shares the same code as the current
155 * isolate, but that starts from [topLevelFunction]. The [topLevelFunction]
156 * argument must be a static top-level function or a static method that takes no
157 * arguments.
158 *
159 * When any isolate starts (even the main script of the application), a default
160 * [IsolateStream] is created for it. This sink is available from the top-level
161 * getter [stream] defined in this library.
162 *
163 * [spawnFunction] returns an [IsolateSink] feeding into the child isolate's
164 * default stream.
165 *
166 * See comments at the top of this library for more details.
167 */
168 IsolateSink streamSpawnFunction(void topLevelFunction()) {
169 SendPort sendPort = spawnFunction(topLevelFunction);
170 return new IsolateSink._fromPort(sendPort);
171 }
172
173 /**
174 * Creates and spawns an isolate whose code is available at [uri]. Like with
175 * [streamSpawnFunction], the child isolate will have a default [IsolateStream],
176 * and a this function returns an [IsolateSink] feeding into it.
177 *
178 * See comments at the top of this library for more details.
179 */
180 IsolateSink streamSpawnUri(String uri) {
181 SendPort sendPort = spawnUri(uri);
182 return new IsolateSink._fromPort(sendPort);
183 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698