OLD | NEW |
---|---|
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 library _isolate_helper; | 5 library _isolate_helper; |
6 | 6 |
7 import 'dart:async'; | 7 import 'dart:async'; |
8 import 'dart:collection' show Queue, HashMap; | 8 import 'dart:collection' show Queue, HashMap; |
9 import 'dart:isolate'; | 9 import 'dart:isolate'; |
10 import 'dart:_js_helper' show | 10 import 'dart:_js_helper' show |
(...skipping 367 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
378 } | 378 } |
379 | 379 |
380 const String _SPAWNED_SIGNAL = "spawned"; | 380 const String _SPAWNED_SIGNAL = "spawned"; |
381 | 381 |
382 var globalThis = Primitives.computeGlobalThis(); | 382 var globalThis = Primitives.computeGlobalThis(); |
383 var globalWindow = JS('', "#.window", globalThis); | 383 var globalWindow = JS('', "#.window", globalThis); |
384 var globalWorker = JS('', "#.Worker", globalThis); | 384 var globalWorker = JS('', "#.Worker", globalThis); |
385 bool globalPostMessageDefined = | 385 bool globalPostMessageDefined = |
386 JS('', "#.postMessage !== (void 0)", globalThis); | 386 JS('', "#.postMessage !== (void 0)", globalThis); |
387 | 387 |
388 typedef _MainFunction(); | |
389 typedef _MainFunctionArgs(args); | |
390 typedef _MainFunctionArgsMessage(args, message); | |
391 | |
388 class IsolateNatives { | 392 class IsolateNatives { |
389 | 393 |
390 static String thisScript = computeThisScript(); | 394 static String thisScript = computeThisScript(); |
391 | 395 |
392 /// Associates an ID with a native worker object. | 396 /// Associates an ID with a native worker object. |
393 static final Expando<int> workerIds = new Expando<int>(); | 397 static final Expando<int> workerIds = new Expando<int>(); |
394 | 398 |
395 /** | 399 /** |
396 * The src url for the script tag that loaded this code. Used to create | 400 * The src url for the script tag that loaded this code. Used to create |
397 * JavaScript workers. | 401 * JavaScript workers. |
(...skipping 63 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
461 */ | 465 */ |
462 static void _processWorkerMessage(/* Worker */ sender, e) { | 466 static void _processWorkerMessage(/* Worker */ sender, e) { |
463 var msg = _deserializeMessage(_getEventData(e)); | 467 var msg = _deserializeMessage(_getEventData(e)); |
464 switch (msg['command']) { | 468 switch (msg['command']) { |
465 case 'start': | 469 case 'start': |
466 _globalState.currentManagerId = msg['id']; | 470 _globalState.currentManagerId = msg['id']; |
467 String functionName = msg['functionName']; | 471 String functionName = msg['functionName']; |
468 Function entryPoint = (functionName == null) | 472 Function entryPoint = (functionName == null) |
469 ? _globalState.entry | 473 ? _globalState.entry |
470 : _getJSFunctionFromName(functionName); | 474 : _getJSFunctionFromName(functionName); |
475 var args = msg['args']; | |
Lasse Reichstein Nielsen
2013/10/25 12:45:59
Are we sure we don't need to deserializer args too
floitsch
2013/10/25 13:52:56
We need to check, because dart2js will (or should)
| |
476 var message = _deserializeMessage(msg['msg']); | |
477 var isSpawnUri = msg['isSpawnUri']; | |
471 var replyTo = _deserializeMessage(msg['replyTo']); | 478 var replyTo = _deserializeMessage(msg['replyTo']); |
472 var context = new _IsolateContext(); | 479 var context = new _IsolateContext(); |
473 _globalState.topEventLoop.enqueue(context, () { | 480 _globalState.topEventLoop.enqueue(context, () { |
474 _startIsolate(entryPoint, replyTo); | 481 _startIsolate(entryPoint, args, message, isSpawnUri, replyTo); |
475 }, 'worker-start'); | 482 }, 'worker-start'); |
476 // Make sure we always have a current context in this worker. | 483 // Make sure we always have a current context in this worker. |
477 // TODO(7907): This is currently needed because we're using | 484 // TODO(7907): This is currently needed because we're using |
478 // Timers to implement Futures, and this isolate library | 485 // Timers to implement Futures, and this isolate library |
479 // implementation uses Futures. We should either stop using | 486 // implementation uses Futures. We should either stop using |
480 // Futures in this library, or re-adapt if Futures get a | 487 // Futures in this library, or re-adapt if Futures get a |
481 // different implementation. | 488 // different implementation. |
482 _globalState.currentContext = context; | 489 _globalState.currentContext = context; |
483 _globalState.topEventLoop.run(); | 490 _globalState.topEventLoop.run(); |
484 break; | 491 break; |
485 case 'spawn-worker': | 492 case 'spawn-worker': |
486 _spawnWorker(msg['functionName'], msg['uri'], msg['replyPort']); | 493 _spawnWorker(msg['functionName'], msg['uri'], |
494 msg['args'], msg['msg'], | |
495 msg['isSpawnUri'], msg['replyPort']); | |
487 break; | 496 break; |
488 case 'message': | 497 case 'message': |
489 SendPort port = msg['port']; | 498 SendPort port = msg['port']; |
490 // If the port has been closed, we ignore the message. | 499 // If the port has been closed, we ignore the message. |
491 if (port != null) { | 500 if (port != null) { |
492 msg['port'].send(msg['msg'], msg['replyTo']); | 501 msg['port'].send(msg['msg']); |
493 } | 502 } |
494 _globalState.topEventLoop.run(); | 503 _globalState.topEventLoop.run(); |
495 break; | 504 break; |
496 case 'close': | 505 case 'close': |
497 _globalState.managers.remove(workerIds[sender]); | 506 _globalState.managers.remove(workerIds[sender]); |
498 JS('void', '#.terminate()', sender); | 507 JS('void', '#.terminate()', sender); |
499 _globalState.topEventLoop.run(); | 508 _globalState.topEventLoop.run(); |
500 break; | 509 break; |
501 case 'log': | 510 case 'log': |
502 _log(msg['msg']); | 511 _log(msg['msg']); |
(...skipping 40 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
543 */ | 552 */ |
544 static String _getJSFunctionName(Function f) { | 553 static String _getJSFunctionName(Function f) { |
545 return JS("String|Null", r"(#['$name'] || #)", f, null); | 554 return JS("String|Null", r"(#['$name'] || #)", f, null); |
546 } | 555 } |
547 | 556 |
548 /** Create a new JavaScript object instance given its constructor. */ | 557 /** Create a new JavaScript object instance given its constructor. */ |
549 static dynamic _allocate(var ctor) { | 558 static dynamic _allocate(var ctor) { |
550 return JS("", "new #()", ctor); | 559 return JS("", "new #()", ctor); |
551 } | 560 } |
552 | 561 |
553 static SendPort spawnFunction(void topLevelFunction()) { | 562 static SendPort spawnFunction(void topLevelFunction(message), message) { |
554 final name = _getJSFunctionName(topLevelFunction); | 563 final name = _getJSFunctionName(topLevelFunction); |
555 if (name == null) { | 564 if (name == null) { |
556 throw new UnsupportedError( | 565 throw new UnsupportedError( |
557 "only top-level functions can be spawned."); | 566 "only top-level functions can be spawned."); |
558 } | 567 } |
559 return spawn(name, null, false); | 568 return spawn(name, null, null, message, false, false); |
569 } | |
570 | |
571 static SendPort spawnUri(Uri uri, List<String> args, message) { | |
572 return spawn(null, uri.path, args, message, false, true); | |
560 } | 573 } |
561 | 574 |
562 // TODO(sigmund): clean up above, after we make the new API the default: | 575 // TODO(sigmund): clean up above, after we make the new API the default: |
563 | 576 |
564 /// If [uri] is `null` it is replaced with the current script. | 577 /// If [uri] is `null` it is replaced with the current script. |
565 static spawn(String functionName, String uri, bool isLight) { | 578 static SendPort spawn(String functionName, String uri, |
579 List<String> args, message, | |
580 bool isLight, bool isSpawnUri) { | |
566 // Assume that the compiled version of the Dart file lives just next to the | 581 // Assume that the compiled version of the Dart file lives just next to the |
567 // dart file. | 582 // dart file. |
568 // TODO(floitsch): support precompiled version of dart2js output. | 583 // TODO(floitsch): support precompiled version of dart2js output. |
569 if (uri != null && uri.endsWith(".dart")) uri += ".js"; | 584 if (uri != null && uri.endsWith(".dart")) uri += ".js"; |
570 | 585 |
571 Completer<SendPort> completer = new Completer<SendPort>.sync(); | 586 Completer<SendPort> completer = new Completer<SendPort>.sync(); |
572 ReceivePort port = new ReceivePort(); | 587 ReceivePort port = new ReceivePort(); |
573 port.receive((msg, SendPort replyPort) { | 588 port.listen((msg) { |
574 port.close(); | 589 port.close(); |
575 assert(msg == _SPAWNED_SIGNAL); | 590 assert(msg[0] == _SPAWNED_SIGNAL); |
576 completer.complete(replyPort); | 591 completer.complete(msg[1]); |
577 }); | 592 }); |
578 | 593 |
579 SendPort signalReply = port.toSendPort(); | 594 SendPort signalReply = port.sendPort; |
580 | 595 |
581 if (_globalState.useWorkers && !isLight) { | 596 if (_globalState.useWorkers && !isLight) { |
582 _startWorker(functionName, uri, signalReply); | 597 _startWorker(functionName, uri, args, message, isSpawnUri, signalReply); |
583 } else { | 598 } else { |
584 _startNonWorker(functionName, uri, signalReply); | 599 _startNonWorker( |
600 functionName, uri, args, message, isSpawnUri, signalReply); | |
585 } | 601 } |
586 return new _BufferingSendPort( | 602 return new _BufferingSendPort( |
587 _globalState.currentContext.id, completer.future); | 603 _globalState.currentContext.id, completer.future); |
588 } | 604 } |
589 | 605 |
590 static SendPort _startWorker( | 606 static void _startWorker( |
591 String functionName, String uri, SendPort replyPort) { | 607 String functionName, String uri, |
608 List<String> args, message, | |
609 bool isSpawnUri, | |
610 SendPort replyPort) { | |
592 if (_globalState.isWorker) { | 611 if (_globalState.isWorker) { |
593 _globalState.mainManager.postMessage(_serializeMessage({ | 612 _globalState.mainManager.postMessage(_serializeMessage({ |
594 'command': 'spawn-worker', | 613 'command': 'spawn-worker', |
595 'functionName': functionName, | 614 'functionName': functionName, |
615 'args': args, | |
616 'msg': message, | |
596 'uri': uri, | 617 'uri': uri, |
618 'isSpawnUri': isSpawnUri, | |
597 'replyPort': replyPort})); | 619 'replyPort': replyPort})); |
598 } else { | 620 } else { |
599 _spawnWorker(functionName, uri, replyPort); | 621 _spawnWorker(functionName, uri, args, message, isSpawnUri, replyPort); |
600 } | 622 } |
601 } | 623 } |
602 | 624 |
603 static SendPort _startNonWorker( | 625 static void _startNonWorker( |
604 String functionName, String uri, SendPort replyPort) { | 626 String functionName, String uri, |
627 List<String> args, message, | |
628 bool isSpawnUri, | |
629 SendPort replyPort) { | |
605 // TODO(eub): support IE9 using an iframe -- Dart issue 1702. | 630 // TODO(eub): support IE9 using an iframe -- Dart issue 1702. |
606 if (uri != null) throw new UnsupportedError( | 631 if (uri != null) throw new UnsupportedError( |
Lasse Reichstein Nielsen
2013/10/25 12:45:59
Multi-line if needs braces.
floitsch
2013/10/25 13:52:56
Done.
| |
607 "Currently spawnUri is not supported without web workers."); | 632 "Currently spawnUri is not supported without web workers."); |
608 _globalState.topEventLoop.enqueue(new _IsolateContext(), () { | 633 _globalState.topEventLoop.enqueue(new _IsolateContext(), () { |
609 final func = _getJSFunctionFromName(functionName); | 634 final func = _getJSFunctionFromName(functionName); |
610 _startIsolate(func, replyPort); | 635 _startIsolate(func, args, message, isSpawnUri, replyPort); |
611 }, 'nonworker start'); | 636 }, 'nonworker start'); |
612 } | 637 } |
613 | 638 |
614 static void _startIsolate(Function topLevel, SendPort replyTo) { | 639 static void _startIsolate(Function topLevel, |
640 List<String> args, message, | |
641 bool isSpawnUri, | |
642 SendPort replyTo) { | |
615 _IsolateContext context = JS_CURRENT_ISOLATE_CONTEXT(); | 643 _IsolateContext context = JS_CURRENT_ISOLATE_CONTEXT(); |
616 Primitives.initializeStatics(context.id); | 644 Primitives.initializeStatics(context.id); |
617 lazyPort = new ReceivePort(); | 645 lazyPort = new ReceivePort(); |
618 replyTo.send(_SPAWNED_SIGNAL, port.toSendPort()); | 646 replyTo.send([_SPAWNED_SIGNAL, lazyPort.sendPort]); |
619 topLevel(); | 647 if (!isSpawnUri) { |
648 topLevel(message); | |
649 } else if (topLevel is _MainFunctionArgsMessage) { | |
Lasse Reichstein Nielsen
2013/10/25 12:45:59
If message was null, should we have preferred to c
floitsch
2013/10/25 13:52:56
I prefer providing the argument. This is the expli
| |
650 topLevel(args, message); | |
651 } else if (topLevel is _MainFunctionArgs) { | |
652 topLevel(args); | |
653 } else { | |
654 topLevel(); | |
655 } | |
620 } | 656 } |
621 | 657 |
622 /** | 658 /** |
623 * Spawns an isolate in a worker. [factoryName] is the Javascript constructor | 659 * Spawns an isolate in a worker. [factoryName] is the Javascript constructor |
624 * name for the isolate entry point class. | 660 * name for the isolate entry point class. |
625 */ | 661 */ |
626 static void _spawnWorker(functionName, uri, replyPort) { | 662 static void _spawnWorker(functionName, String uri, |
663 List<String> args, message, | |
664 bool isSpawnUri, | |
665 SendPort replyPort) { | |
666 print("message: $message"); | |
Lasse Reichstein Nielsen
2013/10/25 12:45:59
Debug print.
Was that mine?
floitsch
2013/10/25 13:52:56
probably mine.
| |
627 if (uri == null) uri = thisScript; | 667 if (uri == null) uri = thisScript; |
628 final worker = JS('var', 'new Worker(#)', uri); | 668 final worker = JS('var', 'new Worker(#)', uri); |
629 | 669 |
630 var processWorkerMessageTrampoline = | 670 var processWorkerMessageTrampoline = |
631 JS('', 'function(e) { #(#, e); }', | 671 JS('', 'function(e) { #(#, e); }', |
632 DART_CLOSURE_TO_JS(_processWorkerMessage), | 672 DART_CLOSURE_TO_JS(_processWorkerMessage), |
633 worker); | 673 worker); |
634 JS('void', '#.onmessage = #', worker, processWorkerMessageTrampoline); | 674 JS('void', '#.onmessage = #', worker, processWorkerMessageTrampoline); |
635 var workerId = _globalState.nextManagerId++; | 675 var workerId = _globalState.nextManagerId++; |
636 // We also store the id on the worker itself so that we can unregister it. | 676 // We also store the id on the worker itself so that we can unregister it. |
637 workerIds[worker] = workerId; | 677 workerIds[worker] = workerId; |
638 _globalState.managers[workerId] = worker; | 678 _globalState.managers[workerId] = worker; |
639 JS('void', '#.postMessage(#)', worker, _serializeMessage({ | 679 JS('void', '#.postMessage(#)', worker, _serializeMessage({ |
640 'command': 'start', | 680 'command': 'start', |
641 'id': workerId, | 681 'id': workerId, |
642 // Note: we serialize replyPort twice because the child worker needs to | 682 // Note: we serialize replyPort twice because the child worker needs to |
643 // first deserialize the worker id, before it can correctly deserialize | 683 // first deserialize the worker id, before it can correctly deserialize |
644 // the port (port deserialization is sensitive to what is the current | 684 // the port (port deserialization is sensitive to what is the current |
645 // workerId). | 685 // workerId). |
646 'replyTo': _serializeMessage(replyPort), | 686 'replyTo': _serializeMessage(replyPort), |
687 'args': args, | |
688 'msg': _serializeMessage(message), | |
689 'isSpawnUri': isSpawnUri, | |
647 'functionName': functionName })); | 690 'functionName': functionName })); |
648 } | 691 } |
649 } | 692 } |
650 | 693 |
651 /******************************************************** | 694 /******************************************************** |
652 Inserted from lib/isolate/dart2js/ports.dart | 695 Inserted from lib/isolate/dart2js/ports.dart |
653 ********************************************************/ | 696 ********************************************************/ |
654 | 697 |
655 /** Common functionality to all send ports. */ | 698 /** Common functionality to all send ports. */ |
656 class _BaseSendPort implements SendPort { | 699 class _BaseSendPort implements SendPort { |
657 /** Id for the destination isolate. */ | 700 /** Id for the destination isolate. */ |
658 final int _isolateId; | 701 final int _isolateId; |
659 | 702 |
660 const _BaseSendPort(this._isolateId); | 703 const _BaseSendPort(this._isolateId); |
661 | 704 |
662 void _checkReplyTo(SendPort replyTo) { | 705 void _checkReplyTo(SendPort replyTo) { |
663 if (replyTo != null | 706 if (replyTo != null |
664 && replyTo is! _NativeJsSendPort | 707 && replyTo is! _NativeJsSendPort |
665 && replyTo is! _WorkerSendPort | 708 && replyTo is! _WorkerSendPort |
666 && replyTo is! _BufferingSendPort) { | 709 && replyTo is! _BufferingSendPort) { |
667 throw new Exception("SendPort.send: Illegal replyTo port type"); | 710 throw new Exception("SendPort.send: Illegal replyTo port type"); |
668 } | 711 } |
669 } | 712 } |
670 | 713 |
671 Future call(var message) { | 714 void send(var message); |
672 final completer = new Completer(); | |
673 final port = new ReceivePortImpl(); | |
674 send(message, port.toSendPort()); | |
675 port.receive((value, ignoreReplyTo) { | |
676 port.close(); | |
677 if (value is Exception) { | |
678 completer.completeError(value); | |
679 } else { | |
680 completer.complete(value); | |
681 } | |
682 }); | |
683 return completer.future; | |
684 } | |
685 | |
686 void send(var message, [SendPort replyTo]); | |
687 bool operator ==(var other); | 715 bool operator ==(var other); |
688 int get hashCode; | 716 int get hashCode; |
689 } | 717 } |
690 | 718 |
691 /** A send port that delivers messages in-memory via native JavaScript calls. */ | 719 /** A send port that delivers messages in-memory via native JavaScript calls. */ |
692 class _NativeJsSendPort extends _BaseSendPort implements SendPort { | 720 class _NativeJsSendPort extends _BaseSendPort implements SendPort { |
693 final ReceivePortImpl _receivePort; | 721 final ReceivePortImpl _receivePort; |
694 | 722 |
695 const _NativeJsSendPort(this._receivePort, int isolateId) : super(isolateId); | 723 const _NativeJsSendPort(this._receivePort, int isolateId) : super(isolateId); |
696 | 724 |
697 void send(var message, [SendPort replyTo = null]) { | 725 void send(var message, [SendPort replyTo]) { |
698 _waitForPendingPorts([message, replyTo], () { | 726 _waitForPendingPorts(message, () { |
699 _checkReplyTo(replyTo); | |
700 // Check that the isolate still runs and the port is still open | 727 // Check that the isolate still runs and the port is still open |
701 final isolate = _globalState.isolates[_isolateId]; | 728 final isolate = _globalState.isolates[_isolateId]; |
702 if (isolate == null) return; | 729 if (isolate == null) return; |
703 if (_receivePort._callback == null) return; | 730 if (_receivePort._controller.isClosed) return; |
704 | 731 |
705 // We force serialization/deserialization as a simple way to ensure | 732 // We force serialization/deserialization as a simple way to ensure |
706 // isolate communication restrictions are respected between isolates that | 733 // isolate communication restrictions are respected between isolates that |
707 // live in the same worker. [_NativeJsSendPort] delivers both messages | 734 // live in the same worker. [_NativeJsSendPort] delivers both messages |
708 // from the same worker and messages from other workers. In particular, | 735 // from the same worker and messages from other workers. In particular, |
709 // messages sent from a worker via a [_WorkerSendPort] are received at | 736 // messages sent from a worker via a [_WorkerSendPort] are received at |
710 // [_processWorkerMessage] and forwarded to a native port. In such cases, | 737 // [_processWorkerMessage] and forwarded to a native port. In such cases, |
711 // here we'll see [_globalState.currentContext == null]. | 738 // here we'll see [_globalState.currentContext == null]. |
712 final shouldSerialize = _globalState.currentContext != null | 739 final shouldSerialize = _globalState.currentContext != null |
713 && _globalState.currentContext.id != _isolateId; | 740 && _globalState.currentContext.id != _isolateId; |
714 var msg = message; | 741 var msg = message; |
715 var reply = replyTo; | |
716 if (shouldSerialize) { | 742 if (shouldSerialize) { |
717 msg = _serializeMessage(msg); | 743 msg = _serializeMessage(msg); |
718 reply = _serializeMessage(reply); | |
719 } | 744 } |
720 _globalState.topEventLoop.enqueue(isolate, () { | 745 _globalState.topEventLoop.enqueue(isolate, () { |
721 if (_receivePort._callback != null) { | 746 if (!_receivePort._controller.isClosed) { |
722 if (shouldSerialize) { | 747 if (shouldSerialize) { |
723 msg = _deserializeMessage(msg); | 748 msg = _deserializeMessage(msg); |
724 reply = _deserializeMessage(reply); | |
725 } | 749 } |
726 _receivePort._callback(msg, reply); | 750 _receivePort._controller.add(msg); |
727 } | 751 } |
728 }, 'receive $message'); | 752 }, 'receive $message'); |
729 }); | 753 }); |
730 } | 754 } |
731 | 755 |
732 bool operator ==(var other) => (other is _NativeJsSendPort) && | 756 bool operator ==(var other) => (other is _NativeJsSendPort) && |
733 (_receivePort == other._receivePort); | 757 (_receivePort == other._receivePort); |
734 | 758 |
735 int get hashCode => _receivePort._id; | 759 int get hashCode => _receivePort._id; |
736 } | 760 } |
737 | 761 |
738 /** A send port that delivers messages via worker.postMessage. */ | 762 /** A send port that delivers messages via worker.postMessage. */ |
739 // TODO(eub): abstract this for iframes. | 763 // TODO(eub): abstract this for iframes. |
740 class _WorkerSendPort extends _BaseSendPort implements SendPort { | 764 class _WorkerSendPort extends _BaseSendPort implements SendPort { |
741 final int _workerId; | 765 final int _workerId; |
742 final int _receivePortId; | 766 final int _receivePortId; |
743 | 767 |
744 const _WorkerSendPort(this._workerId, int isolateId, this._receivePortId) | 768 const _WorkerSendPort(this._workerId, int isolateId, this._receivePortId) |
745 : super(isolateId); | 769 : super(isolateId); |
746 | 770 |
747 void send(var message, [SendPort replyTo = null]) { | 771 void send(var message, [SendPort replyTo]) { |
748 _waitForPendingPorts([message, replyTo], () { | 772 _waitForPendingPorts(message, () { |
749 _checkReplyTo(replyTo); | |
750 final workerMessage = _serializeMessage({ | 773 final workerMessage = _serializeMessage({ |
751 'command': 'message', | 774 'command': 'message', |
752 'port': this, | 775 'port': this, |
753 'msg': message, | 776 'msg': message}); |
754 'replyTo': replyTo}); | |
755 | 777 |
756 if (_globalState.isWorker) { | 778 if (_globalState.isWorker) { |
757 // Communication from one worker to another go through the | 779 // Communication from one worker to another go through the |
758 // main worker. | 780 // main worker. |
759 _globalState.mainManager.postMessage(workerMessage); | 781 _globalState.mainManager.postMessage(workerMessage); |
760 } else { | 782 } else { |
761 // Deliver the message only if the worker is still alive. | 783 // Deliver the message only if the worker is still alive. |
762 /* Worker */ var manager = _globalState.managers[_workerId]; | 784 /* Worker */ var manager = _globalState.managers[_workerId]; |
763 if (manager != null) { | 785 if (manager != null) { |
764 JS('void', '#.postMessage(#)', manager, workerMessage); | 786 JS('void', '#.postMessage(#)', manager, workerMessage); |
(...skipping 34 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
799 | 821 |
800 /** Pending messages (and reply ports). */ | 822 /** Pending messages (and reply ports). */ |
801 List pending; | 823 List pending; |
802 | 824 |
803 _BufferingSendPort(isolateId, this._futurePort) | 825 _BufferingSendPort(isolateId, this._futurePort) |
804 : super(isolateId), _id = _idCount, pending = [] { | 826 : super(isolateId), _id = _idCount, pending = [] { |
805 _idCount++; | 827 _idCount++; |
806 _futurePort.then((p) { | 828 _futurePort.then((p) { |
807 _port = p; | 829 _port = p; |
808 for (final item in pending) { | 830 for (final item in pending) { |
809 p.send(item['message'], item['replyTo']); | 831 p.send(item); |
810 } | 832 } |
811 pending = null; | 833 pending = null; |
812 }); | 834 }); |
813 } | 835 } |
814 | 836 |
815 _BufferingSendPort.fromPort(isolateId, this._port) | 837 _BufferingSendPort.fromPort(isolateId, this._port) |
816 : super(isolateId), _id = _idCount { | 838 : super(isolateId), _id = _idCount { |
817 _idCount++; | 839 _idCount++; |
818 } | 840 } |
819 | 841 |
820 void send(var message, [SendPort replyTo]) { | 842 void send(var message, [SendPort replyTo]) { |
821 if (_port != null) { | 843 if (_port != null) { |
822 _port.send(message, replyTo); | 844 _port.send(message, replyTo); |
823 } else { | 845 } else { |
824 pending.add({'message': message, 'replyTo': replyTo}); | 846 pending.add(message); |
825 } | 847 } |
826 } | 848 } |
827 | 849 |
828 bool operator ==(var other) => | 850 bool operator ==(var other) => |
829 other is _BufferingSendPort && _id == other._id; | 851 other is _BufferingSendPort && _id == other._id; |
830 int get hashCode => _id; | 852 int get hashCode => _id; |
831 } | 853 } |
832 | 854 |
833 /** Implementation of a multi-use [ReceivePort] on top of JavaScript. */ | 855 /** Implementation of a multi-use [ReceivePort] on top of JavaScript. */ |
834 class ReceivePortImpl implements ReceivePort { | 856 class ReceivePortImpl extends Stream implements ReceivePort { |
835 int _id; | |
836 Function _callback; | |
837 static int _nextFreeId = 1; | 857 static int _nextFreeId = 1; |
858 final int _id; | |
859 StreamController _controller; | |
838 | 860 |
839 ReceivePortImpl() | 861 ReceivePortImpl() |
840 : _id = _nextFreeId++ { | 862 : _id = _nextFreeId++ { |
863 _controller = new StreamController(onCancel: close, sync: true); | |
841 _globalState.currentContext.register(_id, this); | 864 _globalState.currentContext.register(_id, this); |
842 } | 865 } |
843 | 866 |
844 void receive(void onMessage(var message, SendPort replyTo)) { | 867 StreamSubscription listen(void onData(var event), |
845 _callback = onMessage; | 868 {Function onError, |
869 void onDone(), | |
870 bool cancelOnError}) { | |
871 return _controller.stream.listen(onData, onError: onError, onDone: onDone, | |
872 cancelOnError: cancelOnError); | |
846 } | 873 } |
847 | 874 |
848 void close() { | 875 void close() { |
849 _callback = null; | 876 if (_controller.isClosed) return; |
877 _controller.close(); | |
850 _globalState.currentContext.unregister(_id); | 878 _globalState.currentContext.unregister(_id); |
851 } | 879 } |
852 | 880 |
853 SendPort toSendPort() { | 881 SendPort get sendPort { |
854 return new _NativeJsSendPort(this, _globalState.currentContext.id); | 882 return new _NativeJsSendPort(this, _globalState.currentContext.id); |
855 } | 883 } |
856 } | 884 } |
857 | 885 |
858 /** Wait until all ports in a message are resolved. */ | 886 /** Wait until all ports in a message are resolved. */ |
859 _waitForPendingPorts(var message, void callback()) { | 887 _waitForPendingPorts(var message, void callback()) { |
860 final finder = new _PendingSendPortFinder(); | 888 final finder = new _PendingSendPortFinder(); |
861 finder.traverse(message); | 889 finder.traverse(message); |
862 Future.wait(finder.ports).then((_) => callback()); | 890 Future.wait(finder.ports).then((_) => callback()); |
863 } | 891 } |
864 | 892 |
865 | 893 |
866 /** Visitor that finds all unresolved [SendPort]s in a message. */ | 894 /** Visitor that finds all unresolved [SendPort]s in a message. */ |
867 class _PendingSendPortFinder extends _MessageTraverser { | 895 class _PendingSendPortFinder extends _MessageTraverser { |
868 List<Future<SendPort>> ports; | 896 List<Future<SendPort>> ports; |
869 _PendingSendPortFinder() : super(), ports = [] { | 897 _PendingSendPortFinder() : super(), ports = [] { |
870 _visited = new _JsVisitedMap(); | 898 _visited = new _JsVisitedMap(); |
871 } | 899 } |
872 | 900 |
873 visitPrimitive(x) {} | 901 visitPrimitive(x) {} |
874 | 902 |
875 visitList(List list) { | 903 visitList(List list) { |
876 final seen = _visited[list]; | 904 final seen = _visited[list]; |
877 if (seen != null) return; | 905 if (seen != null) return; |
878 _visited[list] = true; | 906 _visited[list] = true; |
879 // TODO(sigmund): replace with the following: (bug #1660) | 907 list.forEach(_dispatch); |
880 // list.forEach(_dispatch); | |
881 list.forEach((e) => _dispatch(e)); | |
882 } | 908 } |
883 | 909 |
884 visitMap(Map map) { | 910 visitMap(Map map) { |
885 final seen = _visited[map]; | 911 final seen = _visited[map]; |
886 if (seen != null) return; | 912 if (seen != null) return; |
887 | 913 |
888 _visited[map] = true; | 914 _visited[map] = true; |
889 // TODO(sigmund): replace with the following: (bug #1660) | 915 map.values.forEach(_dispatch); |
890 // map.values.forEach(_dispatch); | |
891 map.values.forEach((e) => _dispatch(e)); | |
892 } | 916 } |
893 | 917 |
894 visitSendPort(var port) { | 918 visitSendPort(var port) { |
895 if (port is _BufferingSendPort && port._port == null) { | 919 if (port is _BufferingSendPort && port._port == null) { |
896 ports.add(port._futurePort); | 920 ports.add(port._futurePort); |
897 } | 921 } |
898 } | 922 } |
899 } | 923 } |
900 | 924 |
901 /******************************************************** | 925 /******************************************************** |
(...skipping 441 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
1343 _handle = null; | 1367 _handle = null; |
1344 } else { | 1368 } else { |
1345 throw new UnsupportedError("Canceling a timer."); | 1369 throw new UnsupportedError("Canceling a timer."); |
1346 } | 1370 } |
1347 } | 1371 } |
1348 | 1372 |
1349 bool get isActive => _handle != null; | 1373 bool get isActive => _handle != null; |
1350 } | 1374 } |
1351 | 1375 |
1352 bool hasTimer() => JS('', '#.setTimeout', globalThis) != null; | 1376 bool hasTimer() => JS('', '#.setTimeout', globalThis) != null; |
OLD | NEW |