Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 library _isolate_helper; | 5 library _isolate_helper; |
| 6 | 6 |
| 7 import 'dart:async'; | 7 import 'dart:async'; |
| 8 import 'dart:collection' show Queue, HashMap; | 8 import 'dart:collection' show Queue, HashMap; |
| 9 import 'dart:isolate'; | 9 import 'dart:isolate'; |
| 10 import 'dart:_js_helper' show | 10 import 'dart:_js_helper' show |
| (...skipping 367 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 378 } | 378 } |
| 379 | 379 |
| 380 const String _SPAWNED_SIGNAL = "spawned"; | 380 const String _SPAWNED_SIGNAL = "spawned"; |
| 381 | 381 |
| 382 var globalThis = Primitives.computeGlobalThis(); | 382 var globalThis = Primitives.computeGlobalThis(); |
| 383 var globalWindow = JS('', "#.window", globalThis); | 383 var globalWindow = JS('', "#.window", globalThis); |
| 384 var globalWorker = JS('', "#.Worker", globalThis); | 384 var globalWorker = JS('', "#.Worker", globalThis); |
| 385 bool globalPostMessageDefined = | 385 bool globalPostMessageDefined = |
| 386 JS('', "#.postMessage !== (void 0)", globalThis); | 386 JS('', "#.postMessage !== (void 0)", globalThis); |
| 387 | 387 |
| 388 typedef _MainFunction(); | |
| 389 typedef _MainFunctionArgs(args); | |
| 390 typedef _MainFunctionArgsMessage(args, message); | |
| 391 | |
| 388 class IsolateNatives { | 392 class IsolateNatives { |
| 389 | 393 |
| 390 static String thisScript = computeThisScript(); | 394 static String thisScript = computeThisScript(); |
| 391 | 395 |
| 392 /// Associates an ID with a native worker object. | 396 /// Associates an ID with a native worker object. |
| 393 static final Expando<int> workerIds = new Expando<int>(); | 397 static final Expando<int> workerIds = new Expando<int>(); |
| 394 | 398 |
| 395 /** | 399 /** |
| 396 * The src url for the script tag that loaded this code. Used to create | 400 * The src url for the script tag that loaded this code. Used to create |
| 397 * JavaScript workers. | 401 * JavaScript workers. |
| (...skipping 63 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 461 */ | 465 */ |
| 462 static void _processWorkerMessage(/* Worker */ sender, e) { | 466 static void _processWorkerMessage(/* Worker */ sender, e) { |
| 463 var msg = _deserializeMessage(_getEventData(e)); | 467 var msg = _deserializeMessage(_getEventData(e)); |
| 464 switch (msg['command']) { | 468 switch (msg['command']) { |
| 465 case 'start': | 469 case 'start': |
| 466 _globalState.currentManagerId = msg['id']; | 470 _globalState.currentManagerId = msg['id']; |
| 467 String functionName = msg['functionName']; | 471 String functionName = msg['functionName']; |
| 468 Function entryPoint = (functionName == null) | 472 Function entryPoint = (functionName == null) |
| 469 ? _globalState.entry | 473 ? _globalState.entry |
| 470 : _getJSFunctionFromName(functionName); | 474 : _getJSFunctionFromName(functionName); |
| 475 var args = msg['args']; | |
|
Lasse Reichstein Nielsen
2013/10/25 12:45:59
Are we sure we don't need to deserializer args too
floitsch
2013/10/25 13:52:56
We need to check, because dart2js will (or should)
| |
| 476 var message = _deserializeMessage(msg['msg']); | |
| 477 var isSpawnUri = msg['isSpawnUri']; | |
| 471 var replyTo = _deserializeMessage(msg['replyTo']); | 478 var replyTo = _deserializeMessage(msg['replyTo']); |
| 472 var context = new _IsolateContext(); | 479 var context = new _IsolateContext(); |
| 473 _globalState.topEventLoop.enqueue(context, () { | 480 _globalState.topEventLoop.enqueue(context, () { |
| 474 _startIsolate(entryPoint, replyTo); | 481 _startIsolate(entryPoint, args, message, isSpawnUri, replyTo); |
| 475 }, 'worker-start'); | 482 }, 'worker-start'); |
| 476 // Make sure we always have a current context in this worker. | 483 // Make sure we always have a current context in this worker. |
| 477 // TODO(7907): This is currently needed because we're using | 484 // TODO(7907): This is currently needed because we're using |
| 478 // Timers to implement Futures, and this isolate library | 485 // Timers to implement Futures, and this isolate library |
| 479 // implementation uses Futures. We should either stop using | 486 // implementation uses Futures. We should either stop using |
| 480 // Futures in this library, or re-adapt if Futures get a | 487 // Futures in this library, or re-adapt if Futures get a |
| 481 // different implementation. | 488 // different implementation. |
| 482 _globalState.currentContext = context; | 489 _globalState.currentContext = context; |
| 483 _globalState.topEventLoop.run(); | 490 _globalState.topEventLoop.run(); |
| 484 break; | 491 break; |
| 485 case 'spawn-worker': | 492 case 'spawn-worker': |
| 486 _spawnWorker(msg['functionName'], msg['uri'], msg['replyPort']); | 493 _spawnWorker(msg['functionName'], msg['uri'], |
| 494 msg['args'], msg['msg'], | |
| 495 msg['isSpawnUri'], msg['replyPort']); | |
| 487 break; | 496 break; |
| 488 case 'message': | 497 case 'message': |
| 489 SendPort port = msg['port']; | 498 SendPort port = msg['port']; |
| 490 // If the port has been closed, we ignore the message. | 499 // If the port has been closed, we ignore the message. |
| 491 if (port != null) { | 500 if (port != null) { |
| 492 msg['port'].send(msg['msg'], msg['replyTo']); | 501 msg['port'].send(msg['msg']); |
| 493 } | 502 } |
| 494 _globalState.topEventLoop.run(); | 503 _globalState.topEventLoop.run(); |
| 495 break; | 504 break; |
| 496 case 'close': | 505 case 'close': |
| 497 _globalState.managers.remove(workerIds[sender]); | 506 _globalState.managers.remove(workerIds[sender]); |
| 498 JS('void', '#.terminate()', sender); | 507 JS('void', '#.terminate()', sender); |
| 499 _globalState.topEventLoop.run(); | 508 _globalState.topEventLoop.run(); |
| 500 break; | 509 break; |
| 501 case 'log': | 510 case 'log': |
| 502 _log(msg['msg']); | 511 _log(msg['msg']); |
| (...skipping 40 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 543 */ | 552 */ |
| 544 static String _getJSFunctionName(Function f) { | 553 static String _getJSFunctionName(Function f) { |
| 545 return JS("String|Null", r"(#['$name'] || #)", f, null); | 554 return JS("String|Null", r"(#['$name'] || #)", f, null); |
| 546 } | 555 } |
| 547 | 556 |
| 548 /** Create a new JavaScript object instance given its constructor. */ | 557 /** Create a new JavaScript object instance given its constructor. */ |
| 549 static dynamic _allocate(var ctor) { | 558 static dynamic _allocate(var ctor) { |
| 550 return JS("", "new #()", ctor); | 559 return JS("", "new #()", ctor); |
| 551 } | 560 } |
| 552 | 561 |
| 553 static SendPort spawnFunction(void topLevelFunction()) { | 562 static SendPort spawnFunction(void topLevelFunction(message), message) { |
| 554 final name = _getJSFunctionName(topLevelFunction); | 563 final name = _getJSFunctionName(topLevelFunction); |
| 555 if (name == null) { | 564 if (name == null) { |
| 556 throw new UnsupportedError( | 565 throw new UnsupportedError( |
| 557 "only top-level functions can be spawned."); | 566 "only top-level functions can be spawned."); |
| 558 } | 567 } |
| 559 return spawn(name, null, false); | 568 return spawn(name, null, null, message, false, false); |
| 569 } | |
| 570 | |
| 571 static SendPort spawnUri(Uri uri, List<String> args, message) { | |
| 572 return spawn(null, uri.path, args, message, false, true); | |
| 560 } | 573 } |
| 561 | 574 |
| 562 // TODO(sigmund): clean up above, after we make the new API the default: | 575 // TODO(sigmund): clean up above, after we make the new API the default: |
| 563 | 576 |
| 564 /// If [uri] is `null` it is replaced with the current script. | 577 /// If [uri] is `null` it is replaced with the current script. |
| 565 static spawn(String functionName, String uri, bool isLight) { | 578 static SendPort spawn(String functionName, String uri, |
| 579 List<String> args, message, | |
| 580 bool isLight, bool isSpawnUri) { | |
| 566 // Assume that the compiled version of the Dart file lives just next to the | 581 // Assume that the compiled version of the Dart file lives just next to the |
| 567 // dart file. | 582 // dart file. |
| 568 // TODO(floitsch): support precompiled version of dart2js output. | 583 // TODO(floitsch): support precompiled version of dart2js output. |
| 569 if (uri != null && uri.endsWith(".dart")) uri += ".js"; | 584 if (uri != null && uri.endsWith(".dart")) uri += ".js"; |
| 570 | 585 |
| 571 Completer<SendPort> completer = new Completer<SendPort>.sync(); | 586 Completer<SendPort> completer = new Completer<SendPort>.sync(); |
| 572 ReceivePort port = new ReceivePort(); | 587 ReceivePort port = new ReceivePort(); |
| 573 port.receive((msg, SendPort replyPort) { | 588 port.listen((msg) { |
| 574 port.close(); | 589 port.close(); |
| 575 assert(msg == _SPAWNED_SIGNAL); | 590 assert(msg[0] == _SPAWNED_SIGNAL); |
| 576 completer.complete(replyPort); | 591 completer.complete(msg[1]); |
| 577 }); | 592 }); |
| 578 | 593 |
| 579 SendPort signalReply = port.toSendPort(); | 594 SendPort signalReply = port.sendPort; |
| 580 | 595 |
| 581 if (_globalState.useWorkers && !isLight) { | 596 if (_globalState.useWorkers && !isLight) { |
| 582 _startWorker(functionName, uri, signalReply); | 597 _startWorker(functionName, uri, args, message, isSpawnUri, signalReply); |
| 583 } else { | 598 } else { |
| 584 _startNonWorker(functionName, uri, signalReply); | 599 _startNonWorker( |
| 600 functionName, uri, args, message, isSpawnUri, signalReply); | |
| 585 } | 601 } |
| 586 return new _BufferingSendPort( | 602 return new _BufferingSendPort( |
| 587 _globalState.currentContext.id, completer.future); | 603 _globalState.currentContext.id, completer.future); |
| 588 } | 604 } |
| 589 | 605 |
| 590 static SendPort _startWorker( | 606 static void _startWorker( |
| 591 String functionName, String uri, SendPort replyPort) { | 607 String functionName, String uri, |
| 608 List<String> args, message, | |
| 609 bool isSpawnUri, | |
| 610 SendPort replyPort) { | |
| 592 if (_globalState.isWorker) { | 611 if (_globalState.isWorker) { |
| 593 _globalState.mainManager.postMessage(_serializeMessage({ | 612 _globalState.mainManager.postMessage(_serializeMessage({ |
| 594 'command': 'spawn-worker', | 613 'command': 'spawn-worker', |
| 595 'functionName': functionName, | 614 'functionName': functionName, |
| 615 'args': args, | |
| 616 'msg': message, | |
| 596 'uri': uri, | 617 'uri': uri, |
| 618 'isSpawnUri': isSpawnUri, | |
| 597 'replyPort': replyPort})); | 619 'replyPort': replyPort})); |
| 598 } else { | 620 } else { |
| 599 _spawnWorker(functionName, uri, replyPort); | 621 _spawnWorker(functionName, uri, args, message, isSpawnUri, replyPort); |
| 600 } | 622 } |
| 601 } | 623 } |
| 602 | 624 |
| 603 static SendPort _startNonWorker( | 625 static void _startNonWorker( |
| 604 String functionName, String uri, SendPort replyPort) { | 626 String functionName, String uri, |
| 627 List<String> args, message, | |
| 628 bool isSpawnUri, | |
| 629 SendPort replyPort) { | |
| 605 // TODO(eub): support IE9 using an iframe -- Dart issue 1702. | 630 // TODO(eub): support IE9 using an iframe -- Dart issue 1702. |
| 606 if (uri != null) throw new UnsupportedError( | 631 if (uri != null) throw new UnsupportedError( |
|
Lasse Reichstein Nielsen
2013/10/25 12:45:59
Multi-line if needs braces.
floitsch
2013/10/25 13:52:56
Done.
| |
| 607 "Currently spawnUri is not supported without web workers."); | 632 "Currently spawnUri is not supported without web workers."); |
| 608 _globalState.topEventLoop.enqueue(new _IsolateContext(), () { | 633 _globalState.topEventLoop.enqueue(new _IsolateContext(), () { |
| 609 final func = _getJSFunctionFromName(functionName); | 634 final func = _getJSFunctionFromName(functionName); |
| 610 _startIsolate(func, replyPort); | 635 _startIsolate(func, args, message, isSpawnUri, replyPort); |
| 611 }, 'nonworker start'); | 636 }, 'nonworker start'); |
| 612 } | 637 } |
| 613 | 638 |
| 614 static void _startIsolate(Function topLevel, SendPort replyTo) { | 639 static void _startIsolate(Function topLevel, |
| 640 List<String> args, message, | |
| 641 bool isSpawnUri, | |
| 642 SendPort replyTo) { | |
| 615 _IsolateContext context = JS_CURRENT_ISOLATE_CONTEXT(); | 643 _IsolateContext context = JS_CURRENT_ISOLATE_CONTEXT(); |
| 616 Primitives.initializeStatics(context.id); | 644 Primitives.initializeStatics(context.id); |
| 617 lazyPort = new ReceivePort(); | 645 lazyPort = new ReceivePort(); |
| 618 replyTo.send(_SPAWNED_SIGNAL, port.toSendPort()); | 646 replyTo.send([_SPAWNED_SIGNAL, lazyPort.sendPort]); |
| 619 topLevel(); | 647 if (!isSpawnUri) { |
| 648 topLevel(message); | |
| 649 } else if (topLevel is _MainFunctionArgsMessage) { | |
|
Lasse Reichstein Nielsen
2013/10/25 12:45:59
If message was null, should we have preferred to c
floitsch
2013/10/25 13:52:56
I prefer providing the argument. This is the expli
| |
| 650 topLevel(args, message); | |
| 651 } else if (topLevel is _MainFunctionArgs) { | |
| 652 topLevel(args); | |
| 653 } else { | |
| 654 topLevel(); | |
| 655 } | |
| 620 } | 656 } |
| 621 | 657 |
| 622 /** | 658 /** |
| 623 * Spawns an isolate in a worker. [factoryName] is the Javascript constructor | 659 * Spawns an isolate in a worker. [factoryName] is the Javascript constructor |
| 624 * name for the isolate entry point class. | 660 * name for the isolate entry point class. |
| 625 */ | 661 */ |
| 626 static void _spawnWorker(functionName, uri, replyPort) { | 662 static void _spawnWorker(functionName, String uri, |
| 663 List<String> args, message, | |
| 664 bool isSpawnUri, | |
| 665 SendPort replyPort) { | |
| 666 print("message: $message"); | |
|
Lasse Reichstein Nielsen
2013/10/25 12:45:59
Debug print.
Was that mine?
floitsch
2013/10/25 13:52:56
probably mine.
| |
| 627 if (uri == null) uri = thisScript; | 667 if (uri == null) uri = thisScript; |
| 628 final worker = JS('var', 'new Worker(#)', uri); | 668 final worker = JS('var', 'new Worker(#)', uri); |
| 629 | 669 |
| 630 var processWorkerMessageTrampoline = | 670 var processWorkerMessageTrampoline = |
| 631 JS('', 'function(e) { #(#, e); }', | 671 JS('', 'function(e) { #(#, e); }', |
| 632 DART_CLOSURE_TO_JS(_processWorkerMessage), | 672 DART_CLOSURE_TO_JS(_processWorkerMessage), |
| 633 worker); | 673 worker); |
| 634 JS('void', '#.onmessage = #', worker, processWorkerMessageTrampoline); | 674 JS('void', '#.onmessage = #', worker, processWorkerMessageTrampoline); |
| 635 var workerId = _globalState.nextManagerId++; | 675 var workerId = _globalState.nextManagerId++; |
| 636 // We also store the id on the worker itself so that we can unregister it. | 676 // We also store the id on the worker itself so that we can unregister it. |
| 637 workerIds[worker] = workerId; | 677 workerIds[worker] = workerId; |
| 638 _globalState.managers[workerId] = worker; | 678 _globalState.managers[workerId] = worker; |
| 639 JS('void', '#.postMessage(#)', worker, _serializeMessage({ | 679 JS('void', '#.postMessage(#)', worker, _serializeMessage({ |
| 640 'command': 'start', | 680 'command': 'start', |
| 641 'id': workerId, | 681 'id': workerId, |
| 642 // Note: we serialize replyPort twice because the child worker needs to | 682 // Note: we serialize replyPort twice because the child worker needs to |
| 643 // first deserialize the worker id, before it can correctly deserialize | 683 // first deserialize the worker id, before it can correctly deserialize |
| 644 // the port (port deserialization is sensitive to what is the current | 684 // the port (port deserialization is sensitive to what is the current |
| 645 // workerId). | 685 // workerId). |
| 646 'replyTo': _serializeMessage(replyPort), | 686 'replyTo': _serializeMessage(replyPort), |
| 687 'args': args, | |
| 688 'msg': _serializeMessage(message), | |
| 689 'isSpawnUri': isSpawnUri, | |
| 647 'functionName': functionName })); | 690 'functionName': functionName })); |
| 648 } | 691 } |
| 649 } | 692 } |
| 650 | 693 |
| 651 /******************************************************** | 694 /******************************************************** |
| 652 Inserted from lib/isolate/dart2js/ports.dart | 695 Inserted from lib/isolate/dart2js/ports.dart |
| 653 ********************************************************/ | 696 ********************************************************/ |
| 654 | 697 |
| 655 /** Common functionality to all send ports. */ | 698 /** Common functionality to all send ports. */ |
| 656 class _BaseSendPort implements SendPort { | 699 class _BaseSendPort implements SendPort { |
| 657 /** Id for the destination isolate. */ | 700 /** Id for the destination isolate. */ |
| 658 final int _isolateId; | 701 final int _isolateId; |
| 659 | 702 |
| 660 const _BaseSendPort(this._isolateId); | 703 const _BaseSendPort(this._isolateId); |
| 661 | 704 |
| 662 void _checkReplyTo(SendPort replyTo) { | 705 void _checkReplyTo(SendPort replyTo) { |
| 663 if (replyTo != null | 706 if (replyTo != null |
| 664 && replyTo is! _NativeJsSendPort | 707 && replyTo is! _NativeJsSendPort |
| 665 && replyTo is! _WorkerSendPort | 708 && replyTo is! _WorkerSendPort |
| 666 && replyTo is! _BufferingSendPort) { | 709 && replyTo is! _BufferingSendPort) { |
| 667 throw new Exception("SendPort.send: Illegal replyTo port type"); | 710 throw new Exception("SendPort.send: Illegal replyTo port type"); |
| 668 } | 711 } |
| 669 } | 712 } |
| 670 | 713 |
| 671 Future call(var message) { | 714 void send(var message); |
| 672 final completer = new Completer(); | |
| 673 final port = new ReceivePortImpl(); | |
| 674 send(message, port.toSendPort()); | |
| 675 port.receive((value, ignoreReplyTo) { | |
| 676 port.close(); | |
| 677 if (value is Exception) { | |
| 678 completer.completeError(value); | |
| 679 } else { | |
| 680 completer.complete(value); | |
| 681 } | |
| 682 }); | |
| 683 return completer.future; | |
| 684 } | |
| 685 | |
| 686 void send(var message, [SendPort replyTo]); | |
| 687 bool operator ==(var other); | 715 bool operator ==(var other); |
| 688 int get hashCode; | 716 int get hashCode; |
| 689 } | 717 } |
| 690 | 718 |
| 691 /** A send port that delivers messages in-memory via native JavaScript calls. */ | 719 /** A send port that delivers messages in-memory via native JavaScript calls. */ |
| 692 class _NativeJsSendPort extends _BaseSendPort implements SendPort { | 720 class _NativeJsSendPort extends _BaseSendPort implements SendPort { |
| 693 final ReceivePortImpl _receivePort; | 721 final ReceivePortImpl _receivePort; |
| 694 | 722 |
| 695 const _NativeJsSendPort(this._receivePort, int isolateId) : super(isolateId); | 723 const _NativeJsSendPort(this._receivePort, int isolateId) : super(isolateId); |
| 696 | 724 |
| 697 void send(var message, [SendPort replyTo = null]) { | 725 void send(var message, [SendPort replyTo]) { |
| 698 _waitForPendingPorts([message, replyTo], () { | 726 _waitForPendingPorts(message, () { |
| 699 _checkReplyTo(replyTo); | |
| 700 // Check that the isolate still runs and the port is still open | 727 // Check that the isolate still runs and the port is still open |
| 701 final isolate = _globalState.isolates[_isolateId]; | 728 final isolate = _globalState.isolates[_isolateId]; |
| 702 if (isolate == null) return; | 729 if (isolate == null) return; |
| 703 if (_receivePort._callback == null) return; | 730 if (_receivePort._controller.isClosed) return; |
| 704 | 731 |
| 705 // We force serialization/deserialization as a simple way to ensure | 732 // We force serialization/deserialization as a simple way to ensure |
| 706 // isolate communication restrictions are respected between isolates that | 733 // isolate communication restrictions are respected between isolates that |
| 707 // live in the same worker. [_NativeJsSendPort] delivers both messages | 734 // live in the same worker. [_NativeJsSendPort] delivers both messages |
| 708 // from the same worker and messages from other workers. In particular, | 735 // from the same worker and messages from other workers. In particular, |
| 709 // messages sent from a worker via a [_WorkerSendPort] are received at | 736 // messages sent from a worker via a [_WorkerSendPort] are received at |
| 710 // [_processWorkerMessage] and forwarded to a native port. In such cases, | 737 // [_processWorkerMessage] and forwarded to a native port. In such cases, |
| 711 // here we'll see [_globalState.currentContext == null]. | 738 // here we'll see [_globalState.currentContext == null]. |
| 712 final shouldSerialize = _globalState.currentContext != null | 739 final shouldSerialize = _globalState.currentContext != null |
| 713 && _globalState.currentContext.id != _isolateId; | 740 && _globalState.currentContext.id != _isolateId; |
| 714 var msg = message; | 741 var msg = message; |
| 715 var reply = replyTo; | |
| 716 if (shouldSerialize) { | 742 if (shouldSerialize) { |
| 717 msg = _serializeMessage(msg); | 743 msg = _serializeMessage(msg); |
| 718 reply = _serializeMessage(reply); | |
| 719 } | 744 } |
| 720 _globalState.topEventLoop.enqueue(isolate, () { | 745 _globalState.topEventLoop.enqueue(isolate, () { |
| 721 if (_receivePort._callback != null) { | 746 if (!_receivePort._controller.isClosed) { |
| 722 if (shouldSerialize) { | 747 if (shouldSerialize) { |
| 723 msg = _deserializeMessage(msg); | 748 msg = _deserializeMessage(msg); |
| 724 reply = _deserializeMessage(reply); | |
| 725 } | 749 } |
| 726 _receivePort._callback(msg, reply); | 750 _receivePort._controller.add(msg); |
| 727 } | 751 } |
| 728 }, 'receive $message'); | 752 }, 'receive $message'); |
| 729 }); | 753 }); |
| 730 } | 754 } |
| 731 | 755 |
| 732 bool operator ==(var other) => (other is _NativeJsSendPort) && | 756 bool operator ==(var other) => (other is _NativeJsSendPort) && |
| 733 (_receivePort == other._receivePort); | 757 (_receivePort == other._receivePort); |
| 734 | 758 |
| 735 int get hashCode => _receivePort._id; | 759 int get hashCode => _receivePort._id; |
| 736 } | 760 } |
| 737 | 761 |
| 738 /** A send port that delivers messages via worker.postMessage. */ | 762 /** A send port that delivers messages via worker.postMessage. */ |
| 739 // TODO(eub): abstract this for iframes. | 763 // TODO(eub): abstract this for iframes. |
| 740 class _WorkerSendPort extends _BaseSendPort implements SendPort { | 764 class _WorkerSendPort extends _BaseSendPort implements SendPort { |
| 741 final int _workerId; | 765 final int _workerId; |
| 742 final int _receivePortId; | 766 final int _receivePortId; |
| 743 | 767 |
| 744 const _WorkerSendPort(this._workerId, int isolateId, this._receivePortId) | 768 const _WorkerSendPort(this._workerId, int isolateId, this._receivePortId) |
| 745 : super(isolateId); | 769 : super(isolateId); |
| 746 | 770 |
| 747 void send(var message, [SendPort replyTo = null]) { | 771 void send(var message, [SendPort replyTo]) { |
| 748 _waitForPendingPorts([message, replyTo], () { | 772 _waitForPendingPorts(message, () { |
| 749 _checkReplyTo(replyTo); | |
| 750 final workerMessage = _serializeMessage({ | 773 final workerMessage = _serializeMessage({ |
| 751 'command': 'message', | 774 'command': 'message', |
| 752 'port': this, | 775 'port': this, |
| 753 'msg': message, | 776 'msg': message}); |
| 754 'replyTo': replyTo}); | |
| 755 | 777 |
| 756 if (_globalState.isWorker) { | 778 if (_globalState.isWorker) { |
| 757 // Communication from one worker to another go through the | 779 // Communication from one worker to another go through the |
| 758 // main worker. | 780 // main worker. |
| 759 _globalState.mainManager.postMessage(workerMessage); | 781 _globalState.mainManager.postMessage(workerMessage); |
| 760 } else { | 782 } else { |
| 761 // Deliver the message only if the worker is still alive. | 783 // Deliver the message only if the worker is still alive. |
| 762 /* Worker */ var manager = _globalState.managers[_workerId]; | 784 /* Worker */ var manager = _globalState.managers[_workerId]; |
| 763 if (manager != null) { | 785 if (manager != null) { |
| 764 JS('void', '#.postMessage(#)', manager, workerMessage); | 786 JS('void', '#.postMessage(#)', manager, workerMessage); |
| (...skipping 34 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 799 | 821 |
| 800 /** Pending messages (and reply ports). */ | 822 /** Pending messages (and reply ports). */ |
| 801 List pending; | 823 List pending; |
| 802 | 824 |
| 803 _BufferingSendPort(isolateId, this._futurePort) | 825 _BufferingSendPort(isolateId, this._futurePort) |
| 804 : super(isolateId), _id = _idCount, pending = [] { | 826 : super(isolateId), _id = _idCount, pending = [] { |
| 805 _idCount++; | 827 _idCount++; |
| 806 _futurePort.then((p) { | 828 _futurePort.then((p) { |
| 807 _port = p; | 829 _port = p; |
| 808 for (final item in pending) { | 830 for (final item in pending) { |
| 809 p.send(item['message'], item['replyTo']); | 831 p.send(item); |
| 810 } | 832 } |
| 811 pending = null; | 833 pending = null; |
| 812 }); | 834 }); |
| 813 } | 835 } |
| 814 | 836 |
| 815 _BufferingSendPort.fromPort(isolateId, this._port) | 837 _BufferingSendPort.fromPort(isolateId, this._port) |
| 816 : super(isolateId), _id = _idCount { | 838 : super(isolateId), _id = _idCount { |
| 817 _idCount++; | 839 _idCount++; |
| 818 } | 840 } |
| 819 | 841 |
| 820 void send(var message, [SendPort replyTo]) { | 842 void send(var message, [SendPort replyTo]) { |
| 821 if (_port != null) { | 843 if (_port != null) { |
| 822 _port.send(message, replyTo); | 844 _port.send(message, replyTo); |
| 823 } else { | 845 } else { |
| 824 pending.add({'message': message, 'replyTo': replyTo}); | 846 pending.add(message); |
| 825 } | 847 } |
| 826 } | 848 } |
| 827 | 849 |
| 828 bool operator ==(var other) => | 850 bool operator ==(var other) => |
| 829 other is _BufferingSendPort && _id == other._id; | 851 other is _BufferingSendPort && _id == other._id; |
| 830 int get hashCode => _id; | 852 int get hashCode => _id; |
| 831 } | 853 } |
| 832 | 854 |
| 833 /** Implementation of a multi-use [ReceivePort] on top of JavaScript. */ | 855 /** Implementation of a multi-use [ReceivePort] on top of JavaScript. */ |
| 834 class ReceivePortImpl implements ReceivePort { | 856 class ReceivePortImpl extends Stream implements ReceivePort { |
| 835 int _id; | |
| 836 Function _callback; | |
| 837 static int _nextFreeId = 1; | 857 static int _nextFreeId = 1; |
| 858 final int _id; | |
| 859 StreamController _controller; | |
| 838 | 860 |
| 839 ReceivePortImpl() | 861 ReceivePortImpl() |
| 840 : _id = _nextFreeId++ { | 862 : _id = _nextFreeId++ { |
| 863 _controller = new StreamController(onCancel: close, sync: true); | |
| 841 _globalState.currentContext.register(_id, this); | 864 _globalState.currentContext.register(_id, this); |
| 842 } | 865 } |
| 843 | 866 |
| 844 void receive(void onMessage(var message, SendPort replyTo)) { | 867 StreamSubscription listen(void onData(var event), |
| 845 _callback = onMessage; | 868 {Function onError, |
| 869 void onDone(), | |
| 870 bool cancelOnError}) { | |
| 871 return _controller.stream.listen(onData, onError: onError, onDone: onDone, | |
| 872 cancelOnError: cancelOnError); | |
| 846 } | 873 } |
| 847 | 874 |
| 848 void close() { | 875 void close() { |
| 849 _callback = null; | 876 if (_controller.isClosed) return; |
| 877 _controller.close(); | |
| 850 _globalState.currentContext.unregister(_id); | 878 _globalState.currentContext.unregister(_id); |
| 851 } | 879 } |
| 852 | 880 |
| 853 SendPort toSendPort() { | 881 SendPort get sendPort { |
| 854 return new _NativeJsSendPort(this, _globalState.currentContext.id); | 882 return new _NativeJsSendPort(this, _globalState.currentContext.id); |
| 855 } | 883 } |
| 856 } | 884 } |
| 857 | 885 |
| 858 /** Wait until all ports in a message are resolved. */ | 886 /** Wait until all ports in a message are resolved. */ |
| 859 _waitForPendingPorts(var message, void callback()) { | 887 _waitForPendingPorts(var message, void callback()) { |
| 860 final finder = new _PendingSendPortFinder(); | 888 final finder = new _PendingSendPortFinder(); |
| 861 finder.traverse(message); | 889 finder.traverse(message); |
| 862 Future.wait(finder.ports).then((_) => callback()); | 890 Future.wait(finder.ports).then((_) => callback()); |
| 863 } | 891 } |
| 864 | 892 |
| 865 | 893 |
| 866 /** Visitor that finds all unresolved [SendPort]s in a message. */ | 894 /** Visitor that finds all unresolved [SendPort]s in a message. */ |
| 867 class _PendingSendPortFinder extends _MessageTraverser { | 895 class _PendingSendPortFinder extends _MessageTraverser { |
| 868 List<Future<SendPort>> ports; | 896 List<Future<SendPort>> ports; |
| 869 _PendingSendPortFinder() : super(), ports = [] { | 897 _PendingSendPortFinder() : super(), ports = [] { |
| 870 _visited = new _JsVisitedMap(); | 898 _visited = new _JsVisitedMap(); |
| 871 } | 899 } |
| 872 | 900 |
| 873 visitPrimitive(x) {} | 901 visitPrimitive(x) {} |
| 874 | 902 |
| 875 visitList(List list) { | 903 visitList(List list) { |
| 876 final seen = _visited[list]; | 904 final seen = _visited[list]; |
| 877 if (seen != null) return; | 905 if (seen != null) return; |
| 878 _visited[list] = true; | 906 _visited[list] = true; |
| 879 // TODO(sigmund): replace with the following: (bug #1660) | 907 list.forEach(_dispatch); |
| 880 // list.forEach(_dispatch); | |
| 881 list.forEach((e) => _dispatch(e)); | |
| 882 } | 908 } |
| 883 | 909 |
| 884 visitMap(Map map) { | 910 visitMap(Map map) { |
| 885 final seen = _visited[map]; | 911 final seen = _visited[map]; |
| 886 if (seen != null) return; | 912 if (seen != null) return; |
| 887 | 913 |
| 888 _visited[map] = true; | 914 _visited[map] = true; |
| 889 // TODO(sigmund): replace with the following: (bug #1660) | 915 map.values.forEach(_dispatch); |
| 890 // map.values.forEach(_dispatch); | |
| 891 map.values.forEach((e) => _dispatch(e)); | |
| 892 } | 916 } |
| 893 | 917 |
| 894 visitSendPort(var port) { | 918 visitSendPort(var port) { |
| 895 if (port is _BufferingSendPort && port._port == null) { | 919 if (port is _BufferingSendPort && port._port == null) { |
| 896 ports.add(port._futurePort); | 920 ports.add(port._futurePort); |
| 897 } | 921 } |
| 898 } | 922 } |
| 899 } | 923 } |
| 900 | 924 |
| 901 /******************************************************** | 925 /******************************************************** |
| (...skipping 441 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 1343 _handle = null; | 1367 _handle = null; |
| 1344 } else { | 1368 } else { |
| 1345 throw new UnsupportedError("Canceling a timer."); | 1369 throw new UnsupportedError("Canceling a timer."); |
| 1346 } | 1370 } |
| 1347 } | 1371 } |
| 1348 | 1372 |
| 1349 bool get isActive => _handle != null; | 1373 bool get isActive => _handle != null; |
| 1350 } | 1374 } |
| 1351 | 1375 |
| 1352 bool hasTimer() => JS('', '#.setTimeout', globalThis) != null; | 1376 bool hasTimer() => JS('', '#.setTimeout', globalThis) != null; |
| OLD | NEW |