Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(362)

Side by Side Diff: sdk/lib/isolate/isolate_stream.dart

Issue 20703003: Proposal for new Isolate library. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Using capabilities. Created 7 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« sdk/lib/isolate/isolate.dart ('K') | « sdk/lib/isolate/isolate.dart ('k') | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file.
4
5 part of dart.isolate;
6
7 /**
8 * The initial IsolateStream available by default for this isolate.
9 *
10 * This IsolateStream is created automatically and is commonly used
11 * to establish the first communication between isolates.
12 * (See [streamSpawnFunction].)
13 */
14 final IsolateStream stream = new IsolateStream._fromOriginalReceivePort(port);
15
16 /**
17 * The creator of the [IsolateStream] and [IsolateSink]
18 * that allow an isolate to exchange messages with other isolates.
19 *
20 * Any message that is written into the [sink] (independent of the isolate) is
21 * sent to the [stream] where its subscribers can react to the messages.
22 */
23 class MessageBox {
24 final IsolateStream stream;
25 final IsolateSink sink;
26
27 external MessageBox.oneShot();
28 external MessageBox();
29 }
30
31 external bool _isCloseToken(var object);
32
33 /**
34 * Together with [IsolateSink], the only means of
35 * communication between isolates.
36 *
37 * Each IsolateStream has a corresponding
38 * [IsolateSink]. Any message written into that sink will be delivered to
39 * the stream and then dispatched to the stream's subscribers.
40 */
41 class IsolateStream extends Stream<dynamic> {
42 bool _isClosed = false;
43 final ReceivePort _port;
44 StreamController _controller = new StreamController(sync: true);
45
46 IsolateStream._fromOriginalReceivePort(this._port) {
47 _port.receive((message, replyTo) {
48 assert(replyTo == null);
49 _add(message);
50 });
51 }
52
53 IsolateStream._fromOriginalReceivePortOneShot(this._port) {
54 _port.receive((message, replyTo) {
55 assert(replyTo == null);
56 _add(message);
57 close();
58 });
59 }
60
61 void _add(var message) {
62 if (_isCloseToken(message)) {
63 close();
64 } else {
65 _controller.sink.add(message);
66 }
67 }
68
69 /**
70 * Closes the stream from the receiving end.
71 *
72 * Closing an already closed port has no effect.
73 */
74 void close() {
75 if (!_isClosed) {
76 _isClosed = true;
77 _port.close();
78 _controller.close();
79 }
80 }
81
82 StreamSubscription listen(void onData(event),
83 { void onError(error),
84 void onDone(),
85 bool cancelOnError}) {
86 return _controller.stream.listen(onData,
87 onError: onError,
88 onDone: onDone,
89 cancelOnError: cancelOnError);
90 }
91 }
92
93 /**
94 * The feed for an [IsolateStream].
95 *
96 * Any message written to [this] is delivered
97 * to its respective [IsolateStream].
98 * [IsolateSink]s are created by [MessageBox]es.
99 *
100 * [IsolateSink]s can be transmitted to other isolates.
101 */
102 abstract class IsolateSink extends EventSink<dynamic> {
103 // TODO(floitsch): Actually it should be a StreamSink (being able to flow-
104 // control).
105
106 /**
107 * Sends an asynchronous [message] to the linked [IsolateStream];
108 * the message is copied to the receiving isolate.
109 *
110 * The content of [message] can be: primitive values (null, num, bool, double,
111 * String), instances of [IsolateSink]s, and lists and maps whose elements are
112 * any of these. List and maps are also allowed to be cyclic.
113 *
114 * In the special circumstances when two isolates share the same code and are
115 * running in the same process (e.g. isolates created via [spawnFunction]), it
116 * is also possible to send object instances (which would be copied in the
117 * process). This is currently only supported by the dartvm. For now, the
118 * dart2js compiler only supports the restricted messages described above.
119 */
120 void add(dynamic message);
121
122 void addError(errorEvent);
123
124 /** Closing multiple times is allowed. */
125 void close();
126
127 /**
128 * Tests whether [other] is an [IsolateSink] feeding into the same
129 * [IsolateStream] as this one.
130 */
131 bool operator==(var other);
132 }
133
134
135 /**
136 * Creates and spawns an isolate that shares the same code as the current
137 * isolate, but that starts from the specified function.
138 *
139 * The [topLevelFunction] argument must be
140 * a static top-level function or a static method that takes no arguments.
141 *
142 * When any isolate starts (even the main script of the application), a default
143 * [IsolateStream] is created for it. This sink is available from the top-level
144 * getter [stream] defined in this library.
145 *
146 * [spawnFunction] returns an [IsolateSink] feeding into the child isolate's
147 * default stream.
148 *
149 * The optional [unhandledExceptionCallback] argument is invoked whenever an
150 * exception inside the isolate is unhandled. It can be seen as a big
151 * `try/catch` around everything that is executed inside the isolate. The
152 * callback should return `true` if it was able to handle the exception.
153 */
154 external IsolateSink streamSpawnFunction(
155 void topLevelFunction(),
156 [bool unhandledExceptionCallback(IsolateUnhandledException e)]);
OLDNEW
« sdk/lib/isolate/isolate.dart ('K') | « sdk/lib/isolate/isolate.dart ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698