| OLD | NEW |
| (Empty) | |
| 1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. |
| 4 |
| 5 /// Utility functions for setting up ports and sending data. |
| 6 /// |
| 7 /// This library contains a number of functions that handle the |
| 8 /// boiler-plate of setting up a receive port and receiving a |
| 9 /// single message on the port. |
| 10 /// |
| 11 /// There are different functions that offer different ways to |
| 12 /// handle the incoming message. |
| 13 /// |
| 14 /// The simplest function, [singleCallbackPort], takes a callback |
| 15 /// and returns a port, and then calls the callback for the first |
| 16 /// message sent on the port. |
| 17 /// |
| 18 /// Other functions intercept the returned value and either |
| 19 /// does something with it, or puts it into a [Future] or [Completer]. |
| 20 library isolate.ports; |
| 21 |
| 22 import "dart:async"; |
| 23 import "dart:isolate"; |
| 24 |
| 25 import "src/lists.dart"; |
| 26 |
| 27 /// Create a [SendPort] that accepts only one message. |
| 28 /// |
| 29 /// The [callback] function is called once, with the first message |
| 30 /// received by the receive port. |
| 31 /// All further messages are ignored. |
| 32 /// |
| 33 /// If [timeout] is supplied, it is used as a limit on how |
| 34 /// long it can take before the message is received. If a |
| 35 /// message isn't received in time, the `callback` function |
| 36 /// is called once with the [timeoutValue] instead. |
| 37 /// |
| 38 /// Returns the `SendPort` expecting the single message. |
| 39 /// |
| 40 /// Equivalent to: |
| 41 /// |
| 42 /// (new ReceivePort() |
| 43 /// ..first.timeout(duration, () => timeoutValue).then(callback)) |
| 44 /// .sendPort |
| 45 SendPort singleCallbackPort(void callback(response), |
| 46 {Duration timeout, |
| 47 var timeoutValue}) { |
| 48 RawReceivePort responsePort = new RawReceivePort(); |
| 49 Zone zone = Zone.current; |
| 50 callback = zone.registerUnaryCallback(callback); |
| 51 var timer; |
| 52 responsePort.handler = (response) { |
| 53 responsePort.close(); |
| 54 if (timer != null) timer.cancel(); |
| 55 zone.runUnary(callback, response); |
| 56 }; |
| 57 if (timeout != null) { |
| 58 timer = new Timer(timeout, () { |
| 59 responsePort.close(); |
| 60 callback(timeoutValue); |
| 61 }); |
| 62 } |
| 63 return responsePort.sendPort; |
| 64 } |
| 65 |
| 66 /// Create a [SendPort] that accepts only one message. |
| 67 /// |
| 68 /// When the first message is received, the [callback] function is |
| 69 /// called with the message as argument, |
| 70 /// and the [completer] is completed with the result of that call. |
| 71 /// All further messages are ignored. |
| 72 /// |
| 73 /// If `callback` is omitted, it defaults to an identity function. |
| 74 /// The `callback` call may return a future, and the completer will |
| 75 /// wait for that future to complete. |
| 76 /// |
| 77 /// If [timeout] is supplied, it is used as a limit on how |
| 78 /// long it can take before the message is received. If a |
| 79 /// message isn't received in time, the [onTimeout] is called, |
| 80 /// and `completer` is completed with the result of that call |
| 81 /// instead. |
| 82 /// The [callback] function will not be interrupted by the time-out, |
| 83 /// as long as the initial message is received in time. |
| 84 /// If `onTimeout` is omitted, it defaults to completing the `completer` with |
| 85 /// a [TimeoutException]. |
| 86 /// |
| 87 /// The [completer] may be a synchronous completer. It is only |
| 88 /// completed in response to another event, either a port message or a timer. |
| 89 /// |
| 90 /// Returns the `SendPort` expecting the single message. |
| 91 SendPort singleCompletePort(Completer completer, |
| 92 {callback(message), |
| 93 Duration timeout, |
| 94 onTimeout()}) { |
| 95 if (callback == null && timeout == null) { |
| 96 return singleCallbackPort(completer.complete); |
| 97 } |
| 98 RawReceivePort responsePort = new RawReceivePort(); |
| 99 var timer; |
| 100 if (callback == null) { |
| 101 responsePort.handler = (response) { |
| 102 responsePort.close(); |
| 103 if (timer != null) timer.cancel(); |
| 104 completer.complete(response); |
| 105 }; |
| 106 } else { |
| 107 Zone zone = Zone.current; |
| 108 Function action = zone.registerUnaryCallback((response) { |
| 109 completer.complete(new Future.sync(() => callback(response))); |
| 110 }); |
| 111 responsePort.handler = (response) { |
| 112 responsePort.close(); |
| 113 if (timer != null) timer.cancel(); |
| 114 zone.runUnary(action, response); |
| 115 }; |
| 116 } |
| 117 if (timeout != null) { |
| 118 timer = new Timer(timeout, () { |
| 119 responsePort.close(); |
| 120 if (onTimeout != null) { |
| 121 completer.complete(new Future.sync(onTimeout)); |
| 122 } else { |
| 123 completer.completeError( |
| 124 new TimeoutException("Future not completed", timeout)); |
| 125 } |
| 126 }); |
| 127 } |
| 128 return responsePort.sendPort; |
| 129 } |
| 130 |
| 131 /// Creates a [Future], and a [SendPort] that can be used to complete that |
| 132 /// future. |
| 133 /// |
| 134 /// Calls [action] with the response `SendPort`, then waits for someone |
| 135 /// to send a value on that port |
| 136 /// The returned `Future` is completed with the value sent on the port. |
| 137 /// |
| 138 /// If [action] throws, which it shouldn't, |
| 139 /// the returned future is completed with that error. |
| 140 /// Any return value of `action` is ignored, and if it is asynchronous, |
| 141 /// it should handle its own errors. |
| 142 /// |
| 143 /// If [timeout] is supplied, it is used as a limit on how |
| 144 /// long it can take before the message is received. If a |
| 145 /// message isn't received in time, the [timeoutValue] used |
| 146 /// as the returned future's value instead. |
| 147 /// |
| 148 /// If you want a timeout on the returned future, it's recommended to |
| 149 /// use the [timeout] parameter, and not [Future.timeout] on the result. |
| 150 /// The `Future` method won't be able to close the underlying [ReceivePort]. |
| 151 Future singleResponseFuture(void action(SendPort responsePort), |
| 152 {Duration timeout, |
| 153 var timeoutValue}) { |
| 154 Completer completer = new Completer.sync(); |
| 155 RawReceivePort responsePort = new RawReceivePort(); |
| 156 Timer timer; |
| 157 Zone zone = Zone.current; |
| 158 responsePort.handler = (v) { |
| 159 responsePort.close(); |
| 160 if (timer != null) timer.cancel(); |
| 161 zone.run(() { |
| 162 completer.complete(v); |
| 163 }); |
| 164 }; |
| 165 if (timeout != null) { |
| 166 timer = new Timer(timeout, () { |
| 167 responsePort.close(); |
| 168 completer.complete(timeoutValue); |
| 169 }); |
| 170 } |
| 171 try { |
| 172 action(responsePort.sendPort); |
| 173 } catch (e, s) { |
| 174 responsePort.close(); |
| 175 if (timer != null) timer.cancel(); |
| 176 // Delay completion because completer is sync. |
| 177 scheduleMicrotask(() { |
| 178 completer.completeError(e, s); |
| 179 }); |
| 180 } |
| 181 return completer.future; |
| 182 } |
| 183 |
| 184 |
| 185 /// Send the result of a future, either value or error, as a message. |
| 186 /// |
| 187 /// The result of [future] is sent on [resultPort] in a form expected by |
| 188 /// either [receiveFutureResult], [completeFutureResult], or |
| 189 /// by the port of [singleResultFuture]. |
| 190 void sendFutureResult(Future future, SendPort resultPort) { |
| 191 future.then( |
| 192 (v) { resultPort.send(list1(v)); |
| 193 }, onError: (e, s) { |
| 194 resultPort.send(list2("$e", "$s")); |
| 195 }); |
| 196 } |
| 197 |
| 198 |
| 199 /// Creates a [Future], and a [SendPort] that can be used to complete that |
| 200 /// future. |
| 201 /// |
| 202 /// Calls [action] with the response `SendPort`, then waits for someone |
| 203 /// to send a future result on that port using [sendFutureResult]. |
| 204 /// The returned `Future` is completed with the future result sent on the port. |
| 205 /// |
| 206 /// If [action] throws, which it shouldn't, |
| 207 /// the returned future is completed with that error, |
| 208 /// unless someone manages to send a message on the port before `action` throws. |
| 209 /// |
| 210 /// If [timeout] is supplied, it is used as a limit on how |
| 211 /// long it can take before the message is received. If a |
| 212 /// message isn't received in time, the [onTimeout] is called, |
| 213 /// and the future is completed with the result of that call |
| 214 /// instead. |
| 215 /// If `onTimeout` is omitted, it defaults to throwing |
| 216 /// a [TimeoutException]. |
| 217 Future singleResultFuture(void action(SendPort responsePort), |
| 218 {Duration timeout, |
| 219 onTimeout()}) { |
| 220 Completer completer = new Completer.sync(); |
| 221 SendPort port = singleCompletePort(completer, |
| 222 callback: receiveFutureResult, |
| 223 timeout: timeout, |
| 224 onTimeout: onTimeout); |
| 225 try { |
| 226 action(port); |
| 227 } catch (e, s) { |
| 228 // This should not happen. |
| 229 sendFutureResult(new Future.error(e, s), port); |
| 230 } |
| 231 return completer.future; |
| 232 } |
| 233 |
| 234 /// Completes a completer with a message created by [sendFutureResult] |
| 235 /// |
| 236 /// The [response] must be a message on the format sent by [sendFutureResult]. |
| 237 void completeFutureResult(var response, Completer completer) { |
| 238 if (response.length == 2) { |
| 239 var error = new RemoteError(response[0], response[1]); |
| 240 completer.completeError(error, error.stackTrace); |
| 241 } else { |
| 242 var result = response[0]; |
| 243 completer.complete(result); |
| 244 } |
| 245 } |
| 246 |
| 247 |
| 248 /// Converts a received message created by [sendFutureResult] to a future |
| 249 /// result. |
| 250 /// |
| 251 /// The [response] must be a message on the format sent by [sendFutureResult]. |
| 252 Future receiveFutureResult(var response) { |
| 253 if (response.length == 2) { |
| 254 var error = new RemoteError(response[0], response[1]); |
| 255 return new Future.error(error, error.stackTrace); |
| 256 } |
| 257 var result = response[0]; |
| 258 return new Future.value(result); |
| 259 } |
| 260 |
| 261 /// A [Future] and a [SendPort] that can be used to complete the future. |
| 262 /// |
| 263 /// The first value sent to [port] is used to complete the [result]. |
| 264 /// All following values sent to `port` are ignored. |
| 265 class SingleResponseChannel { |
| 266 Zone _zone; |
| 267 final RawReceivePort _receivePort; |
| 268 final Completer _completer; |
| 269 final Function _callback; |
| 270 Timer _timer; |
| 271 |
| 272 /// Creates a response channel. |
| 273 /// |
| 274 /// The [result] is completed with the first value sent to [port]. |
| 275 /// |
| 276 /// If [callback] is provided, the value sent to [port] is first passed |
| 277 /// to `callback`, and the result of that is used to complete `result`. |
| 278 /// |
| 279 /// If [timeout] is provided, the future is completed after that |
| 280 /// duration if it hasn't received a value from the port earlier. |
| 281 /// If [throwOnTimeout] is true, the the future is completed with a |
| 282 /// [TimeoutException] as an error if it times out. |
| 283 /// Otherwise, if [onTimeout] is provided, |
| 284 /// the future is completed with the result of running `onTimeout()`. |
| 285 /// If `onTimeout` is not provided either, |
| 286 /// the future is completed with `timeoutValue`, which defaults to `null`. |
| 287 SingleResponseChannel({callback(value), |
| 288 Duration timeout, |
| 289 bool throwOnTimeout: false, |
| 290 onTimeout(), |
| 291 var timeoutValue}) |
| 292 : _receivePort = new RawReceivePort(), |
| 293 _completer = new Completer.sync(), |
| 294 _callback = callback, |
| 295 _zone = Zone.current { |
| 296 _receivePort.handler = _handleResponse; |
| 297 if (timeout != null) { |
| 298 _timer = new Timer(timeout, () { |
| 299 // Executed as a timer event. |
| 300 _receivePort.close(); |
| 301 if (!_completer.isCompleted) { |
| 302 if (throwOnTimeout) { |
| 303 _completer.completeError( |
| 304 new TimeoutException('Timeout waiting for response', timeout)); |
| 305 } else if (onTimeout != null) { |
| 306 _completer.complete(new Future.sync(onTimeout)); |
| 307 } else { |
| 308 _completer.complete(timeoutValue); |
| 309 } |
| 310 } |
| 311 }); |
| 312 } |
| 313 } |
| 314 |
| 315 /// The port expecting a value that will complete [result]. |
| 316 SendPort get port => _receivePort.sendPort; |
| 317 |
| 318 /// Future completed by the first value sent to [port]. |
| 319 Future get result => _completer.future; |
| 320 |
| 321 /// If the channel hasn't completed yet, interrupt it and complete the result. |
| 322 /// |
| 323 /// If the channel hasn't received a value yet, or timed out, it is stopped |
| 324 /// (like by a timeout) and the [SingleResponseChannel.result] |
| 325 /// is completed with [result]. |
| 326 void interrupt([result]) { |
| 327 _receivePort.close(); |
| 328 _cancelTimer(); |
| 329 if (!_completer.isCompleted) { |
| 330 // Not in event tail position, so complete the sync completer later. |
| 331 _completer.complete(new Future.microtask(() => result)); |
| 332 } |
| 333 } |
| 334 |
| 335 void _cancelTimer() { |
| 336 if (_timer != null) { |
| 337 _timer.cancel(); |
| 338 _timer = null; |
| 339 } |
| 340 } |
| 341 |
| 342 void _handleResponse(v) { |
| 343 // Executed as a port event. |
| 344 _receivePort.close(); |
| 345 _cancelTimer(); |
| 346 if (_callback == null) { |
| 347 _completer.complete(v); |
| 348 } else { |
| 349 // The _handleResponse function is the handler of a RawReceivePort. |
| 350 // As such, it runs in the root zone. |
| 351 // The _callback should be run in the original zone, both because it's |
| 352 // what the user expects, and because it may create an error that needs |
| 353 // to be propagated to the original completer. If that completer was |
| 354 // created in a different error zone, an error from the root zone |
| 355 // would become uncaught. |
| 356 _zone.run(() { |
| 357 _completer.complete(new Future.sync(() => _callback(v))); |
| 358 }); |
| 359 } |
| 360 } |
| 361 } |
| OLD | NEW |