Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(80)

Side by Side Diff: sdk/lib/isolate/isolate_stream.dart

Issue 11308154: Stream isolates. (Closed) Base URL: https://dart.googlecode.com/svn/experimental/lib_v2/dart
Patch Set: Removed hack and improved doc. Created 8 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
(Empty)
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file.
4
5 /**
6 * The initial [IsolateStream] available by default for this isolate. This
7 * [IsolateStream] is created automatically and it is commonly used to establish
8 * the first communication between isolates (see [streamSpawnFunction] and
9 * [streamSpawnUri]).
10 */
11 final IsolateStream stream = new IsolateStream._fromOriginalReceivePort(port);
Lasse Reichstein Nielsen 2012/11/22 10:23:15 Is 'port' defined in base.dart?
floitsch 2012/11/22 11:58:04 yes.
12
13 /**
14 * A [MessageBox] creates an [IsolateStream] [stream] and an [IsolateSink]
Lasse Reichstein Nielsen 2012/11/22 10:23:15 Commas around [stream] and [sink].
floitsch 2012/11/22 11:58:04 Done.
15 * [sink].
16 *
17 * Any message that is written into the [sink] (independent of the isolate) is
18 * sent to the [stream] where its subscribers can react to the messages.
19 */
20 class MessageBox {
21 final IsolateStream stream;
22 final IsolateSink sink;
23
24 MessageBox.oneShot() : this._oneShot(new ReceivePort());
25 MessageBox._oneShot(ReceivePort receivePort)
26 : stream = new IsolateStream._fromOriginalReceivePortOneShot(receivePort),
27 sink = new IsolateSink._fromPort(receivePort.toSendPort());
28
29 MessageBox() : this._(new ReceivePort());
30 MessageBox._(ReceivePort receivePort)
31 : stream = new IsolateStream._fromOriginalReceivePort(receivePort),
32 sink = new IsolateSink._fromPort(receivePort.toSendPort());
33 }
34
35 // Used for mangling.
36 const int _ISOLATE_STREAM_TOKEN = 132421112;
Lasse Reichstein Nielsen 2012/11/22 10:23:15 Pah, doesn't look random enough :P
floitsch 2012/11/22 11:58:04 Changed to 132421119. ;)
37
38 /**
39 * [IsolateStream]s, together with [IsolateSink]s, are the only means of
40 * communication between isolates. Each IsolateStream has a corresponding
41 * [IsolateSink]. Any message written into that sink will be delivered to
42 * the stream and then dispatched to the stream's subscribers.
43 */
44 class IsolateStream extends Stream<dynamic> {
45 final ReceivePort _port;
46 StreamController _controller = new StreamController();
Lasse Reichstein Nielsen 2012/11/22 10:23:15 The base StreamController doesn't handle 'pause' a
floitsch 2012/11/22 11:58:04 It is (I think), but iirc when I started this CL t
47
48 IsolateStream._fromOriginalReceivePort(this._port) {
49 _port.receive((message, replyTo) {
50 assert(replyTo == null);
51 _write(message);
52 });
53 }
54
55 IsolateStream._fromOriginalReceivePortOneShot(this._port) {
56 _port.receive((message, replyTo) {
57 assert(replyTo == null);
58 _write(message);
59 close();
60 });
61 }
62
63 void _write(var message) {
64 message = _unmangleMessage(message);
65 _controller.sink.write(message);
66 }
67
68 void close() {
69 _controller.close()
70 _port.close();
71 }
72
73 StreamSubscription<T> subscribe({void onData(T event),
74 void onError(StreamError error),
75 void onDone(),
76 bool unsubscribeOnError}) {
77 return _controller.subscribe(onData: onData,
78 onError: onError,
79 onDone: onDone,
80 unsubscribeOnError: unsubscribeOnError);
81 }
82
83 dynamic _unmangleMessage(var message) {
84 _IsolateDecoder decoder = new _IsolateDecoder(
85 _ISOLATE_STREAM_TOKEN,
86 (data) {
87 if (data is! List) return data;
88 if (data.length != 2) return data;
89 if (data[0] != "Sink" || data[1] is! SendPort) return data;
90 return new IsolateSink._fromPort(data[1]);
91 });
92 return decoder.decode(message);
93 }
94 }
95
96 /**
97 * [IsolateSink]s represent the feed for [IsolateStream]s. Any message written
98 * to [this] is delivered to its respective [IsolateStream]. [IsolateSink]s are
99 * created by [MessageBox]es.
100 *
101 * [IsolateSink]s can be transmitted to other isolates.
102 */
103 class IsolateSink extends StreamSink<dynamic> {
104 final SendPort _port;
105 IsolateSink._fromPort(this._port);
106
107 /**
108 * Sends an asynchronous [message] to the linked [IsolateStream]. The message
109 * is copied to the receiving isolate.
110 *
111 * The content of [message] can be: primitive values (null, num, bool, double,
112 * String), instances of [IsolateSink]s, and lists and maps whose elements are
113 * any of these. List and maps are also allowed to be cyclic.
114 *
115 * In the special circumstances when two isolates share the same code and are
116 * running in the same process (e.g. isolates created via [spawnFunction]), it
117 * is also possible to send object instances (which would be copied in the
118 * process). This is currently only supported by the dartvm. For now, the
119 * dart2js compiler only supports the restricted messages described above.
120 */
121 void write(dynamic message) {
122 _port.send(_mangleMessage(message));
123 }
124
125 void signalError(StreamError errorEvent) {
126 throw new UnimplementedError("signalError on isolate streams");
127 }
128
129 dynamic _mangleMessage(var message) {
130 _IsolateEncoder encoder = new _IsolateEncoder(
131 _ISOLATE_STREAM_TOKEN,
132 (data) => (data is IsolateSink) ? ["Sink", data._port] : data);
133 return encoder.encode(message);
134 }
135
136 void close() {
137 // TODO(floitsch): what should happen when an IsolateSink is closed?
Lasse Reichstein Nielsen 2012/11/22 10:23:15 You are sending the 'done' event to the other end,
floitsch 2012/11/22 11:58:04 We will discuss this in person.
138 }
139
140 /**
141 * Tests whether [other] is an [IsolateSink] feeding into the same
142 * [IsolateStream] as this one.
143 */
144 bool operator==(var other) {
145 return other is IsolateSink && _port == other._port;
146 }
147
148 int get hashCode => _port.hashCode + 499;
149 }
150
151
152 /**
153 * Creates and spawns an isolate that shares the same code as the current
154 * isolate, but that starts from [topLevelFunction]. The [topLevelFunction]
155 * argument must be a static top-level function or a static method that takes no
156 * arguments. It is illegal to pass a function closure.
Lasse Reichstein Nielsen 2012/11/22 10:23:15 "closure" isn't defined. The previous two cases de
floitsch 2012/11/22 11:58:04 Done.
157 *
158 * When any isolate starts (even the main script of the application), a default
159 * [IsolateStream] is created for it. This sink is available from the top-level
160 * getter [stream] defined in this library.
161 *
162 * [spawnFunction] returns an [IsolateSink] feeding into the child isolate's
163 * default stream.
164 *
165 * See comments at the top of this library for more details.
166 */
167 IsolateSink streamSpawnFunction(void topLevelFunction()) {
168 SendPort sendPort = spawnFunction(topLevelFunction);
169 return new IsolateSink(sendPort);
170 }
171
172 /**
173 * Creates and spawns an isolate whose code is available at [uri]. Like with
174 * [streamSpawnFunction], the child isolate will have a default [IsolateStream],
175 * and a this function returns an [IsolateSink] feeding into it.
176 *
177 * See comments at the top of this library for more details.
178 */
179 IsolateSink streamSpawnUri(String uri) {
180 SendPort sendPort = spawnUri(uri);
181 return new IsolateSink(sendPort);
182 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698