| OLD | NEW |
| (Empty) |
| 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | |
| 2 // for details. All rights reserved. Use of this source code is governed by a | |
| 3 // BSD-style license that can be found in the LICENSE file. | |
| 4 | |
| 5 part of dart.isolate; | |
| 6 | |
| 7 /** | |
| 8 * The initial [IsolateStream] available by default for this isolate. This | |
| 9 * [IsolateStream] is created automatically and it is commonly used to establish | |
| 10 * the first communication between isolates (see [streamSpawnFunction] and | |
| 11 * [streamSpawnUri]). | |
| 12 */ | |
| 13 final IsolateStream stream = new IsolateStream._fromOriginalReceivePort(port); | |
| 14 | |
| 15 /** | |
| 16 * A [MessageBox] creates an [IsolateStream], [stream], and an [IsolateSink], | |
| 17 * [sink]. | |
| 18 * | |
| 19 * Any message that is written into the [sink] (independent of the isolate) is | |
| 20 * sent to the [stream] where its subscribers can react to the messages. | |
| 21 */ | |
| 22 class MessageBox { | |
| 23 final IsolateStream stream; | |
| 24 final IsolateSink sink; | |
| 25 | |
| 26 external MessageBox.oneShot(); | |
| 27 external MessageBox(); | |
| 28 } | |
| 29 | |
| 30 external bool _isCloseToken(var object); | |
| 31 | |
| 32 /** | |
| 33 * [IsolateStream]s, together with [IsolateSink]s, are the only means of | |
| 34 * communication between isolates. Each IsolateStream has a corresponding | |
| 35 * [IsolateSink]. Any message written into that sink will be delivered to | |
| 36 * the stream and then dispatched to the stream's subscribers. | |
| 37 */ | |
| 38 class IsolateStream extends Stream<dynamic> { | |
| 39 bool _isClosed = false; | |
| 40 final ReceivePort _port; | |
| 41 StreamController _controller = new StreamController(sync: true); | |
| 42 | |
| 43 IsolateStream._fromOriginalReceivePort(this._port) { | |
| 44 _port.receive((message, replyTo) { | |
| 45 assert(replyTo == null); | |
| 46 _add(message); | |
| 47 }); | |
| 48 } | |
| 49 | |
| 50 IsolateStream._fromOriginalReceivePortOneShot(this._port) { | |
| 51 _port.receive((message, replyTo) { | |
| 52 assert(replyTo == null); | |
| 53 _add(message); | |
| 54 close(); | |
| 55 }); | |
| 56 } | |
| 57 | |
| 58 void _add(var message) { | |
| 59 if (_isCloseToken(message)) { | |
| 60 close(); | |
| 61 } else { | |
| 62 _controller.sink.add(message); | |
| 63 } | |
| 64 } | |
| 65 | |
| 66 /** | |
| 67 * Close the stream from the receiving end. | |
| 68 * | |
| 69 * Closing an already closed port has no effect. | |
| 70 */ | |
| 71 void close() { | |
| 72 if (!_isClosed) { | |
| 73 _isClosed = true; | |
| 74 _port.close(); | |
| 75 _controller.close(); | |
| 76 } | |
| 77 } | |
| 78 | |
| 79 StreamSubscription listen(void onData(event), | |
| 80 { void onError(error), | |
| 81 void onDone(), | |
| 82 bool cancelOnError}) { | |
| 83 return _controller.stream.listen(onData, | |
| 84 onError: onError, | |
| 85 onDone: onDone, | |
| 86 cancelOnError: cancelOnError); | |
| 87 } | |
| 88 } | |
| 89 | |
| 90 /** | |
| 91 * [IsolateSink]s represent the feed for [IsolateStream]s. Any message written | |
| 92 * to [this] is delivered to its respective [IsolateStream]. [IsolateSink]s are | |
| 93 * created by [MessageBox]es. | |
| 94 * | |
| 95 * [IsolateSink]s can be transmitted to other isolates. | |
| 96 */ | |
| 97 abstract class IsolateSink extends EventSink<dynamic> { | |
| 98 // TODO(floitsch): Actually it should be a StreamSink (being able to flow- | |
| 99 // control). | |
| 100 | |
| 101 /** | |
| 102 * Sends an asynchronous [message] to the linked [IsolateStream]. The message | |
| 103 * is copied to the receiving isolate. | |
| 104 * | |
| 105 * The content of [message] can be: primitive values (null, num, bool, double, | |
| 106 * String), instances of [IsolateSink]s, and lists and maps whose elements are | |
| 107 * any of these. List and maps are also allowed to be cyclic. | |
| 108 * | |
| 109 * In the special circumstances when two isolates share the same code and are | |
| 110 * running in the same process (e.g. isolates created via [spawnFunction]), it | |
| 111 * is also possible to send object instances (which would be copied in the | |
| 112 * process). This is currently only supported by the dartvm. For now, the | |
| 113 * dart2js compiler only supports the restricted messages described above. | |
| 114 */ | |
| 115 void add(dynamic message); | |
| 116 | |
| 117 void addError(errorEvent); | |
| 118 | |
| 119 /** Closing multiple times is allowed. */ | |
| 120 void close(); | |
| 121 | |
| 122 /** | |
| 123 * Tests whether [other] is an [IsolateSink] feeding into the same | |
| 124 * [IsolateStream] as this one. | |
| 125 */ | |
| 126 bool operator==(var other); | |
| 127 } | |
| 128 | |
| 129 | |
| 130 /** | |
| 131 * Creates and spawns an isolate that shares the same code as the current | |
| 132 * isolate, but that starts from [topLevelFunction]. The [topLevelFunction] | |
| 133 * argument must be a static top-level function or a static method that takes no | |
| 134 * arguments. | |
| 135 * | |
| 136 * When any isolate starts (even the main script of the application), a default | |
| 137 * [IsolateStream] is created for it. This sink is available from the top-level | |
| 138 * getter [stream] defined in this library. | |
| 139 * | |
| 140 * [spawnFunction] returns an [IsolateSink] feeding into the child isolate's | |
| 141 * default stream. | |
| 142 * | |
| 143 * The optional [unhandledExceptionCallback] argument is invoked whenever an | |
| 144 * exception inside the isolate is unhandled. It can be seen as a big | |
| 145 * `try/catch` around everything that is executed inside the isolate. The | |
| 146 * callback should return `true` when it was able to handled the exception. | |
| 147 */ | |
| 148 external IsolateSink streamSpawnFunction( | |
| 149 void topLevelFunction(), | |
| 150 [bool unhandledExceptionCallback(IsolateUnhandledException e)]); | |
| OLD | NEW |