| OLD | NEW |
| 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 /** A library to illustrate pipelining. */ | 5 /** A library to illustrate pipelining. */ |
| 6 #library("promise"); | 6 #library("promise"); |
| 7 #import("dart:isolate"); | 7 #import("dart:isolate"); |
| 8 | 8 |
| 9 /** A promise to value of type [T] that may be computed asynchronously. */ | 9 /** A promise to value of type [T] that may be computed asynchronously. */ |
| 10 // TODO(sigmund,benl): remove Promise<T> use Future<T> instead. | 10 // TODO(sigmund,benl): remove Promise<T> use Future<T> instead. |
| (...skipping 153 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 164 | 164 |
| 165 } | 165 } |
| 166 | 166 |
| 167 // When a promise is sent across a port, it is converted to a | 167 // When a promise is sent across a port, it is converted to a |
| 168 // Promise<SendPort> down which we must send a port to receive the | 168 // Promise<SendPort> down which we must send a port to receive the |
| 169 // completion value. Hand the Promise<SendPort> to this class to deal | 169 // completion value. Hand the Promise<SendPort> to this class to deal |
| 170 // with it. | 170 // with it. |
| 171 | 171 |
| 172 class PromiseProxy<T> extends PromiseImpl<T> { | 172 class PromiseProxy<T> extends PromiseImpl<T> { |
| 173 PromiseProxy(Promise<SendPort> sendCompleter) { | 173 PromiseProxy(Promise<SendPort> sendCompleter) { |
| 174 ReceivePort completer = new ReceivePort.singleShot(); | 174 ReceivePort completer = new ReceivePort(); |
| 175 completer.receive((var msg, SendPort _) { | 175 completer.then((var msg) { |
| 176 completer.close(); |
| 176 complete(msg[0]); | 177 complete(msg[0]); |
| 177 }); | 178 }); |
| 178 sendCompleter.addCompleteHandler((SendPort port) { | 179 sendCompleter.addCompleteHandler((SendPort port) { |
| 179 port.send([completer.toSendPort()], null); | 180 port.send([completer.toSendPort()], null); |
| 180 }); | 181 }); |
| 181 } | 182 } |
| 182 } | 183 } |
| 183 | 184 |
| 184 class PromiseImpl<T> implements Promise<T> { | 185 class PromiseImpl<T> implements Promise<T> { |
| 185 | 186 |
| (...skipping 334 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 520 } | 521 } |
| 521 // Obviously this will be true if [entry] was a Proxy. | 522 // Obviously this will be true if [entry] was a Proxy. |
| 522 if (entry is Promise) { | 523 if (entry is Promise) { |
| 523 // Note that we could optimise this by just sending the value | 524 // Note that we could optimise this by just sending the value |
| 524 // if the promise is already complete. Let's get this working | 525 // if the promise is already complete. Let's get this working |
| 525 // first! | 526 // first! |
| 526 | 527 |
| 527 // This port will receive a SendPort that can be used to | 528 // This port will receive a SendPort that can be used to |
| 528 // signal completion of this promise to the corresponding | 529 // signal completion of this promise to the corresponding |
| 529 // promise that the other end has created. | 530 // promise that the other end has created. |
| 530 ReceivePort receiveCompleter = new ReceivePort.singleShot(); | 531 ReceivePort receiveCompleter = new ReceivePort(); |
| 531 marshalled[i] = receiveCompleter.toSendPort(); | 532 marshalled[i] = receiveCompleter.toSendPort(); |
| 532 Promise<SendPort> completer = new Promise<SendPort>(); | 533 Promise<SendPort> completer = new Promise<SendPort>(); |
| 533 receiveCompleter.receive((var msg, SendPort replyPort) { | 534 receiveCompleter.receive((var msg, SendPort replyPort) { |
| 535 port.close(); |
| 534 completer.complete(msg[0]); | 536 completer.complete(msg[0]); |
| 535 }); | 537 }); |
| 536 entry.addCompleteHandler((value) { | 538 entry.addCompleteHandler((value) { |
| 537 completer.addCompleteHandler((SendPort completePort) { | 539 completer.addCompleteHandler((SendPort completePort) { |
| 538 _marshal([value], (List completeMessage) => completePort.send(comp
leteMessage, null)); | 540 _marshal([value], (List completeMessage) => completePort.send(comp
leteMessage, null)); |
| 539 }); | 541 }); |
| 540 }); | 542 }); |
| 541 } else { | 543 } else { |
| 542 // FIXME(kasperl, benl): this should probably be a copy? | 544 // FIXME(kasperl, benl): this should probably be a copy? |
| 543 marshalled[i] = entry; | 545 marshalled[i] = entry; |
| 544 } | 546 } |
| 545 if (marshalled[i] is ReceivePort) { | 547 if (marshalled[i] is ReceivePort) { |
| 546 throw new Exception("Despite the documentation, you cannot send a Rece
ivePort"); | 548 throw new Exception("Despite the documentation, you cannot send a Rece
ivePort"); |
| 547 } | 549 } |
| 548 } | 550 } |
| 549 return process(marshalled); | 551 return process(marshalled); |
| 550 }).flatten(); | 552 }).flatten(); |
| 551 } | 553 } |
| 552 | 554 |
| 553 Promise<SendPort> _promise; | 555 Promise<SendPort> _promise; |
| 554 static Map<SendPort, Dispatcher> _dispatchers; | 556 static Map<SendPort, Dispatcher> _dispatchers; |
| 555 | 557 |
| 556 } | 558 } |
| OLD | NEW |