OLD | NEW |
(Empty) | |
| 1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. |
| 4 |
| 5 /** |
| 6 * Utility functions for setting up ports and sending data. |
| 7 * |
| 8 * This library contains a number of functions that handle the |
| 9 * boiler-plate of setting up a receive port and receiving a |
| 10 * single message on the port. |
| 11 * |
| 12 * There are different functions that offer different ways to |
| 13 * handle the incoming message. |
| 14 * |
| 15 * The simplest function, [singleCallbackPort], takes a callback |
| 16 * and returns a port, and then calls the callback for the first |
| 17 * message sent on the port. |
| 18 * |
| 19 * Other functions intercept the returned value and either |
| 20 * does something with it, or puts it into a [Future] or [Completer]. |
| 21 */ |
| 22 library dart.pkg.isolate.ports; |
| 23 |
| 24 import "dart:isolate"; |
| 25 import "dart:async"; |
| 26 import "src/lists.dart"; |
| 27 |
| 28 /** |
| 29 * Create a [SendPort] that accepts only one message. |
| 30 * |
| 31 * The [callback] function is called once, with the first message |
| 32 * received by the receive port. |
| 33 * All futher messages are ignored. |
| 34 * |
| 35 * If [timeout] is supplied, it is used as a limit on how |
| 36 * long it can take before the message is received. If a |
| 37 * message isn't received in time, the `callback` function |
| 38 * is called once with the [timeoutValue] instead. |
| 39 * |
| 40 * Returns the `SendPort` expecting the single message. |
| 41 * |
| 42 * Equivalent to: |
| 43 * |
| 44 * (new ReceivePort() |
| 45 * ..first.timeout(duration, () => timeoutValue).then(callback)) |
| 46 * .sendPort |
| 47 */ |
| 48 SendPort singleCallbackPort(void callback(response), |
| 49 {Duration timeout, |
| 50 var timeoutValue}) { |
| 51 RawReceivePort responsePort = new RawReceivePort(); |
| 52 Zone zone = Zone.current; |
| 53 callback = zone.registerUnaryCallback(callback); |
| 54 var timer; |
| 55 responsePort.handler = (response) { |
| 56 responsePort.close(); |
| 57 if (timer != null) timer.cancel(); |
| 58 zone.runUnary(callback, response); |
| 59 }; |
| 60 if (timeout != null) { |
| 61 timer = new Timer(timeout, () { |
| 62 responsePort.close(); |
| 63 callback(timeoutValue); |
| 64 }); |
| 65 } |
| 66 return responsePort.sendPort; |
| 67 } |
| 68 |
| 69 /** |
| 70 * Create a [SendPort] that accepts only one message. |
| 71 * |
| 72 * When the first message is received, the [callback] function is |
| 73 * called with the message as argument, |
| 74 * and the [completer] is completed with the result of that call. |
| 75 * All further messages are ignored. |
| 76 * |
| 77 * If `callback` is omitted, it defaults to an identity function. |
| 78 * The `callback` call may return a future, and the completer will |
| 79 * wait for that future to complete. |
| 80 * |
| 81 * If [timeout] is supplied, it is used as a limit on how |
| 82 * long it can take before the message is received. If a |
| 83 * message isn't received in time, the [onTimeout] is called, |
| 84 * and `completer` is completed with the result of that call |
| 85 * instead. |
| 86 * The [callback] function will not be interrupted by the time-out, |
| 87 * as long as the initial message is received in time. |
| 88 * If `onTimeout` is omitted, it defaults to completing the `completer` with |
| 89 * a [TimeoutException]. |
| 90 * |
| 91 * The [completer] may be a synchronous completer. It is only |
| 92 * completed in response to another event, either a port message or a timer. |
| 93 * |
| 94 * Returns the `SendPort` expecting the single message. |
| 95 */ |
| 96 SendPort singleCompletePort(Completer completer, |
| 97 {callback(message), |
| 98 Duration timeout, |
| 99 onTimeout()}) { |
| 100 if (callback == null && timeout == null) { |
| 101 return singleCallbackPort(completer.complete); |
| 102 } |
| 103 RawReceivePort responsePort = new RawReceivePort(); |
| 104 var timer; |
| 105 if (callback == null) { |
| 106 responsePort.handler = (response) { |
| 107 responsePort.close(); |
| 108 if (timer != null) timer.cancel(); |
| 109 completer.complete(response); |
| 110 }; |
| 111 } else { |
| 112 Zone zone = Zone.current; |
| 113 Function action = zone.registerUnaryCallback((response) { |
| 114 completer.complete(new Future.sync(() => callback(response))); |
| 115 }); |
| 116 responsePort.handler = (response) { |
| 117 responsePort.close(); |
| 118 if (timer != null) timer.cancel(); |
| 119 zone.runUnary(action, response); |
| 120 }; |
| 121 } |
| 122 if (timeout != null) { |
| 123 timer = new Timer(timeout, () { |
| 124 responsePort.close(); |
| 125 if (onTimeout != null) { |
| 126 completer.complete(new Future.sync(onTimeout)); |
| 127 } else { |
| 128 completer.completeError(new TimeoutException("Future not completed", |
| 129 timeout)); |
| 130 } |
| 131 }); |
| 132 } |
| 133 return responsePort.sendPort; |
| 134 } |
| 135 |
| 136 /** |
| 137 * Creates a [Future], and a [SendPort] that can be used to complete that |
| 138 * future. |
| 139 * |
| 140 * Calls [action] with the response `SendPort`, then waits for someone |
| 141 * to send a value on that port |
| 142 * The returned `Future` is completed with the value sent on the port. |
| 143 * |
| 144 * If [action] throws, which it shouldn't, |
| 145 * the returned future is completed with that error. |
| 146 * Any return value of `action` is ignored, and if it is asynchronous, |
| 147 * it should handle its own errors. |
| 148 * |
| 149 * If [timeout] is supplied, it is used as a limit on how |
| 150 * long it can take before the message is received. If a |
| 151 * message isn't received in time, the [timeoutValue] used |
| 152 * as the returned future's value instead. |
| 153 * |
| 154 * If you want a timeout on the returned future, it's recommended to |
| 155 * use the [timeout] parameter, and not [Future.timeout] on the result. |
| 156 * The `Future` method won't be able to close the underlying [ReceivePort]. |
| 157 */ |
| 158 Future singleResponseFuture(void action(SendPort responsePort), |
| 159 {Duration timeout, |
| 160 var timeoutValue}) { |
| 161 Completer completer = new Completer.sync(); |
| 162 RawReceivePort responsePort = new RawReceivePort(); |
| 163 Timer timer; |
| 164 Zone zone = Zone.current; |
| 165 responsePort.handler = (v) { |
| 166 responsePort.close(); |
| 167 if (timer != null) timer.cancel(); |
| 168 zone.run(() { |
| 169 completer.complete(v); |
| 170 }); |
| 171 }; |
| 172 if (timeout != null) { |
| 173 timer = new Timer(timeout, () { |
| 174 responsePort.close(); |
| 175 completer.complete(timeoutValue); |
| 176 }); |
| 177 } |
| 178 try { |
| 179 action(responsePort.sendPort); |
| 180 } catch (e, s) { |
| 181 responsePort.close(); |
| 182 if (timer != null) timer.cancel(); |
| 183 // Delay completion because completer is sync. |
| 184 scheduleMicrotask(() { completer.completeError(e, s); }); |
| 185 } |
| 186 return completer.future; |
| 187 } |
| 188 |
| 189 |
| 190 /** |
| 191 * Send the result of a future, either value or error, as a message. |
| 192 * |
| 193 * The result of [future] is sent on [resultPort] in a form expected by |
| 194 * either [receiveFutureResult], [completeFutureResult], or |
| 195 * by the port of [singleResultFuture]. |
| 196 */ |
| 197 void sendFutureResult(Future future, SendPort resultPort) { |
| 198 future.then((v) { resultPort.send(list1(v)); }, |
| 199 onError: (e, s) { resultPort.send(list2("$e", "$s")); }); |
| 200 } |
| 201 |
| 202 |
| 203 /** |
| 204 * Creates a [Future], and a [SendPort] that can be used to complete that |
| 205 * future. |
| 206 * |
| 207 * Calls [action] with the response `SendPort`, then waits for someone |
| 208 * to send a future result on that port using [sendFutureResult]. |
| 209 * The returned `Future` is completed with the future result sent on the port. |
| 210 * |
| 211 * If [action] throws, which it shouldn't, |
| 212 * the returned future is completed with that error, |
| 213 * unless someone manages to send a message on the port before `action` throws. |
| 214 * |
| 215 * If [timeout] is supplied, it is used as a limit on how |
| 216 * long it can take before the message is received. If a |
| 217 * message isn't received in time, the [onTimeout] is called, |
| 218 * and the future is completed with the result of that call |
| 219 * instead. |
| 220 * If `onTimeout` is omitted, it defaults to throwing |
| 221 * a [TimeoutException]. |
| 222 */ |
| 223 Future singleResultFuture(void action(SendPort responsePort), |
| 224 {Duration timeout, |
| 225 onTimeout()}) { |
| 226 Completer completer = new Completer.sync(); |
| 227 SendPort port = singleCompletePort(completer, |
| 228 callback: receiveFutureResult, |
| 229 timeout: timeout, |
| 230 onTimeout: onTimeout); |
| 231 try { |
| 232 action(port); |
| 233 } catch (e, s) { |
| 234 // This should not happen. |
| 235 sendFutureResult(new Future.error(e, s), port); |
| 236 } |
| 237 return completer.future; |
| 238 } |
| 239 |
| 240 /** |
| 241 * Completes a completer with a message created by [sendFutureResult] |
| 242 * |
| 243 * The [response] must be a message on the format sent by [sendFutureResult]. |
| 244 */ |
| 245 void completeFutureResult(var response, Completer completer) { |
| 246 if (response.length == 2) { |
| 247 var error = new RemoteError(response[0], response[1]); |
| 248 completer.completeError(error, error.stackTrace); |
| 249 } else { |
| 250 var result = response[0]; |
| 251 completer.complete(result); |
| 252 } |
| 253 } |
| 254 |
| 255 |
| 256 /** |
| 257 * Convertes a received message created by [sendFutureResult] to a future |
| 258 * result. |
| 259 * |
| 260 * The [response] must be a message on the format sent by [sendFutureResult]. |
| 261 */ |
| 262 Future receiveFutureResult(var response) { |
| 263 if (response.length == 2) { |
| 264 var error = new RemoteError(response[0], response[1]); |
| 265 return new Future.error(error, error.stackTrace); |
| 266 } |
| 267 var result = response[0]; |
| 268 return new Future.value(result); |
| 269 } |
OLD | NEW |