Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(6)

Side by Side Diff: runtime/lib/isolate_patch.dart

Issue 27215002: Very simple version of Isolates. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: More comments. Created 7 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 class _CloseToken {
6 /// This token is sent from [IsolateSink]s to [IsolateStream]s to ask them to
7 /// close themselves.
8 const _CloseToken();
9 }
10
11 patch bool _isCloseToken(var object) {
12 // TODO(floitsch): can we compare against const _CloseToken()?
13 return object is _CloseToken;
14 }
15
16 patch class MessageBox {
17 /* patch */ MessageBox.oneShot() : this._oneShot(new ReceivePort());
18 MessageBox._oneShot(ReceivePort receivePort)
19 : stream = new IsolateStream._fromOriginalReceivePortOneShot(receivePort),
20 sink = new _IsolateSink._fromPort(receivePort.toSendPort());
21
22 /* patch */ MessageBox() : this._(new ReceivePort());
23 MessageBox._(ReceivePort receivePort)
24 : stream = new IsolateStream._fromOriginalReceivePort(receivePort),
25 sink = new _IsolateSink._fromPort(receivePort.toSendPort());
26 }
27
28 class _IsolateSink implements IsolateSink {
29 bool _isClosed = false;
30 final SendPort _port;
31 _IsolateSink._fromPort(this._port);
32
33 void add(dynamic message) {
34 _port.send(message);
35 }
36
37 void addError(Object errorEvent) {
38 throw new UnimplementedError("addError on isolate streams");
39 }
40
41 void close() {
42 if (_isClosed) return;
43 add(const _CloseToken());
44 _isClosed = true;
45 }
46
47 bool operator==(var other) {
48 return other is IsolateSink && _port == other._port;
49 }
50
51 int get hashCode => _port.hashCode + 499;
52 }
53
54 patch IsolateSink streamSpawnFunction(
55 void topLevelFunction(),
56 [bool unhandledExceptionCallback(IsolateUnhandledException e)]) {
57 SendPort sendPort = spawnFunction(topLevelFunction,
58 unhandledExceptionCallback);
59 return new _IsolateSink._fromPort(sendPort);
60 }
61
62 patch class ReceivePort { 5 patch class ReceivePort {
63 /* patch */ factory ReceivePort() { 6 /* patch */ factory ReceivePort() {
64 return new _ReceivePortImpl(); 7 _ReceivePortImpl result = new _ReceivePortImpl();
8 // TODO(floitsch): remove the hack to close receive-ports on cancel.
9 result._controller = new StreamController(onCancel: result._close);
10 return result;
65 } 11 }
66 } 12 }
67 13
68 class _ReceivePortImpl implements ReceivePort { 14 class _ReceivePortImpl extends Stream implements ReceivePort {
69 factory _ReceivePortImpl() native "ReceivePortImpl_factory"; 15 factory _ReceivePortImpl() native "ReceivePortImpl_factory";
70 16
17 // Deprecated.
71 receive(void onMessage(var message, SendPort replyTo)) { 18 receive(void onMessage(var message, SendPort replyTo)) {
72 _onMessage = onMessage; 19 _onMessage = onMessage;
73 } 20 }
74 21
22 // Deprecated.
75 close() { 23 close() {
24 _close();
25 }
26
27 _close() {
76 _portMap.remove(_id); 28 _portMap.remove(_id);
77 _closeInternal(_id); 29 _closeInternal(_id);
78 } 30 }
79 31
80 SendPort toSendPort() { 32 SendPort get sendPort {
81 return new _SendPortImpl(_id); 33 return new _SendPortImpl(_id);
82 } 34 }
83 35
36 StreamSubscription listen(void onData(var message),
37 { Function onError,
38 void onDone(),
39 bool cancelOnError }) {
40 return _controller.stream.listen(onData);
41 }
42
84 /**** Internal implementation details ****/ 43 /**** Internal implementation details ****/
85 // Called from the VM to create a new ReceivePort instance. 44 // Called from the VM to create a new ReceivePort instance.
86 static _ReceivePortImpl _get_or_create(int id) { 45 static _ReceivePortImpl _get_or_create(int id) {
87 if (_portMap != null) { 46 if (_portMap != null) {
88 _ReceivePortImpl port = _portMap[id]; 47 _ReceivePortImpl port = _portMap[id];
89 if (port != null) { 48 if (port != null) {
90 return port; 49 return port;
91 } 50 }
92 } 51 }
93 return new _ReceivePortImpl._internal(id); 52 return new _ReceivePortImpl._internal(id);
94 } 53 }
95 54
96 _ReceivePortImpl._internal(int id) : _id = id { 55 _ReceivePortImpl._internal(int id) : _id = id {
97 if (_portMap == null) { 56 if (_portMap == null) {
98 _portMap = new Map(); 57 _portMap = new Map();
99 } 58 }
100 _portMap[id] = this; 59 _portMap[id] = this;
101 } 60 }
102 61
103 // Called from the VM to retrieve the ReceivePort for a message. 62 // Called from the VM to retrieve the ReceivePort for a message.
104 static _ReceivePortImpl _lookupReceivePort(int id) { 63 static _ReceivePortImpl _lookupReceivePort(int id) {
105 assert(_portMap != null); 64 assert(_portMap != null);
106 return _portMap[id]; 65 return _portMap[id];
107 } 66 }
108 67
109 // Called from the VM to dispatch to the handler. 68 // Called from the VM to dispatch to the handler.
110 static void _handleMessage(_ReceivePortImpl port, int replyId, var message) { 69 static void _handleMessage(_ReceivePortImpl port, int replyId, var message) {
111 assert(port != null); 70 assert(port != null);
112 SendPort replyTo = (replyId == 0) ? null : new _SendPortImpl(replyId); 71 SendPort replyTo = (replyId == 0) ? null : new _SendPortImpl(replyId);
113 (port._onMessage)(message, replyTo); 72 if (port._onMessage != null) {
73 (port._onMessage)(message, replyTo);
74 } else {
75 port._controller.add(message);
76 }
114 } 77 }
115 78
116 // Call into the VM to close the VM maintained mappings. 79 // Call into the VM to close the VM maintained mappings.
117 static _closeInternal(int id) native "ReceivePortImpl_closeInternal"; 80 static _closeInternal(int id) native "ReceivePortImpl_closeInternal";
118 81
119 final int _id; 82 final int _id;
83 StreamController _controller;
120 var _onMessage; 84 var _onMessage;
121 85
122 // id to ReceivePort mapping. 86 // id to ReceivePort mapping.
123 static Map _portMap; 87 static Map _portMap;
124 } 88 }
125 89
126 90
127 class _SendPortImpl implements SendPort { 91 class _SendPortImpl implements SendPort {
128 /*--- public interface ---*/ 92 /*--- public interface ---*/
129 void send(var message, [SendPort replyTo = null]) { 93 void send(var message, [SendPort replyTo = null]) {
130 this._sendNow(message, replyTo); 94 this._sendNow(message, replyTo);
131 } 95 }
132 96
133 void _sendNow(var message, SendPort replyTo) { 97 void _sendNow(var message, SendPort replyTo) {
134 int replyId = (replyTo == null) ? 0 : replyTo._id; 98 int replyId = (replyTo == null) ? 0 : replyTo._id;
135 _sendInternal(_id, replyId, message); 99 _sendInternal(_id, replyId, message);
136 } 100 }
137 101
102 /// Deprecated.
138 Future call(var message) { 103 Future call(var message) {
139 final completer = new Completer.sync(); 104 final completer = new Completer.sync();
140 final port = new _ReceivePortImpl(); 105 final port = new _ReceivePortImpl();
141 send(message, port.toSendPort()); 106 send(message, port.toSendPort());
142 port.receive((value, ignoreReplyTo) { 107 port.receive((value, ignoreReplyTo) {
143 port.close(); 108 port.close();
144 if (value is Exception) { 109 if (value is Exception) {
145 completer.completeError(value); 110 completer.completeError(value);
146 } else { 111 } else {
147 completer.complete(value); 112 completer.complete(value);
(...skipping 24 matching lines...) Expand all
172 static _sendInternal(int sendId, int replyId, var message) 137 static _sendInternal(int sendId, int replyId, var message)
173 native "SendPortImpl_sendInternal_"; 138 native "SendPortImpl_sendInternal_";
174 139
175 final int _id; 140 final int _id;
176 } 141 }
177 142
178 _getPortInternal() native "isolate_getPortInternal"; 143 _getPortInternal() native "isolate_getPortInternal";
179 144
180 ReceivePort _portInternal; 145 ReceivePort _portInternal;
181 146
147 typedef _ZeroArgFunction();
148
149 /**
150 * Takes the real entry point as argument and invokes it with the initial
151 * message.
152 *
153 * The initial message is (currently) received through the global port variable.
154 */
155 void _startIsolate(Function entryPoint) {
156 bool first = true;
157 _Isolate.port.receive((message, replyTo) {
158 assert(first);
159 first = false;
160 var initialMessage = message[0];
161 var reply = message[1];
162 reply.send("started");
163 if (entryPoint is _ZeroArgFunction) {
164 entryPoint();
165 } else {
166 entryPoint(initialMessage);
167 }
168 });
169 }
170
171 patch class Isolate {
172 /* patch */ static Future<Isolate> spawn(
173 void entryPoint(message), var message, { bool startPaused: false }) {
174 if (startPaused) throw new UnimplementedError("spawn paused isolate");
175 return new Future<Isolate>.sync(() {
176 // The VM will invoke [_startIsolate] with entryPoint as argument.
177 SendPort controlPort = _Isolate._spawnFunction(entryPoint);
178 ReceivePort readyPort = new ReceivePort();
179 controlPort.send([message, readyPort.sendPort]);
180 Completer completer = new Completer<Isolate>();
181 readyPort.receive((ignored1, ignored2) {
182 readyPort.close();
183 completer.complete(new Isolate._fromControlPort(controlPort));
184 });
185 return completer.future;
186 });
187 }
188
189 /* patch */ static Future<Isolate> spawnUri(Uri uri, var message) {
190 throw new UnimplementedError("Isolate.spawnUri");
191 }
192 }
193
182 patch class _Isolate { 194 patch class _Isolate {
183 /* patch */ static ReceivePort get port { 195 /* patch */ static ReceivePort get port {
184 if (_portInternal == null) { 196 if (_portInternal == null) {
185 _portInternal = _getPortInternal(); 197 _portInternal = _getPortInternal();
186 } 198 }
187 return _portInternal; 199 return _portInternal;
188 } 200 }
189 201
190 /* patch */ static SendPort spawnFunction(void topLevelFunction(), 202 static SendPort _spawnFunction(Function topLevelFunction)
191 [bool unhandledExceptionCallback(IsolateUnhandledException e)])
192 native "isolate_spawnFunction"; 203 native "isolate_spawnFunction";
193 204
194 /* patch */ static SendPort spawnUri(String uri) native "isolate_spawnUri"; 205 /* patch */ static SendPort spawnUri(String uri) native "isolate_spawnUri";
195 } 206 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698