Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(180)

Side by Side Diff: sdk/lib/_internal/lib/isolate_helper.dart

Issue 43663003: dart2js. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Created 7 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 library _isolate_helper; 5 library _isolate_helper;
6 6
7 import 'dart:async'; 7 import 'dart:async';
8 import 'dart:collection' show Queue, HashMap; 8 import 'dart:collection' show Queue, HashMap;
9 import 'dart:isolate'; 9 import 'dart:isolate';
10 import 'dart:_js_helper' show 10 import 'dart:_js_helper' show
(...skipping 367 matching lines...) Expand 10 before | Expand all | Expand 10 after
378 } 378 }
379 379
380 const String _SPAWNED_SIGNAL = "spawned"; 380 const String _SPAWNED_SIGNAL = "spawned";
381 381
382 var globalThis = Primitives.computeGlobalThis(); 382 var globalThis = Primitives.computeGlobalThis();
383 var globalWindow = JS('', "#.window", globalThis); 383 var globalWindow = JS('', "#.window", globalThis);
384 var globalWorker = JS('', "#.Worker", globalThis); 384 var globalWorker = JS('', "#.Worker", globalThis);
385 bool globalPostMessageDefined = 385 bool globalPostMessageDefined =
386 JS('', "#.postMessage !== (void 0)", globalThis); 386 JS('', "#.postMessage !== (void 0)", globalThis);
387 387
388 typedef _MainFunction();
389 typedef _MainFunctionArgs(args);
390 typedef _MainFunctionArgsMessage(args, message);
391
388 class IsolateNatives { 392 class IsolateNatives {
389 393
390 static String thisScript = computeThisScript(); 394 static String thisScript = computeThisScript();
391 395
392 /// Associates an ID with a native worker object. 396 /// Associates an ID with a native worker object.
393 static final Expando<int> workerIds = new Expando<int>(); 397 static final Expando<int> workerIds = new Expando<int>();
394 398
395 /** 399 /**
396 * The src url for the script tag that loaded this code. Used to create 400 * The src url for the script tag that loaded this code. Used to create
397 * JavaScript workers. 401 * JavaScript workers.
(...skipping 63 matching lines...) Expand 10 before | Expand all | Expand 10 after
461 */ 465 */
462 static void _processWorkerMessage(/* Worker */ sender, e) { 466 static void _processWorkerMessage(/* Worker */ sender, e) {
463 var msg = _deserializeMessage(_getEventData(e)); 467 var msg = _deserializeMessage(_getEventData(e));
464 switch (msg['command']) { 468 switch (msg['command']) {
465 case 'start': 469 case 'start':
466 _globalState.currentManagerId = msg['id']; 470 _globalState.currentManagerId = msg['id'];
467 String functionName = msg['functionName']; 471 String functionName = msg['functionName'];
468 Function entryPoint = (functionName == null) 472 Function entryPoint = (functionName == null)
469 ? _globalState.entry 473 ? _globalState.entry
470 : _getJSFunctionFromName(functionName); 474 : _getJSFunctionFromName(functionName);
475 var args = msg['args'];
Lasse Reichstein Nielsen 2013/10/25 12:45:59 Are we sure we don't need to deserializer args too
floitsch 2013/10/25 13:52:56 We need to check, because dart2js will (or should)
476 var message = _deserializeMessage(msg['msg']);
477 var isSpawnUri = msg['isSpawnUri'];
471 var replyTo = _deserializeMessage(msg['replyTo']); 478 var replyTo = _deserializeMessage(msg['replyTo']);
472 var context = new _IsolateContext(); 479 var context = new _IsolateContext();
473 _globalState.topEventLoop.enqueue(context, () { 480 _globalState.topEventLoop.enqueue(context, () {
474 _startIsolate(entryPoint, replyTo); 481 _startIsolate(entryPoint, args, message, isSpawnUri, replyTo);
475 }, 'worker-start'); 482 }, 'worker-start');
476 // Make sure we always have a current context in this worker. 483 // Make sure we always have a current context in this worker.
477 // TODO(7907): This is currently needed because we're using 484 // TODO(7907): This is currently needed because we're using
478 // Timers to implement Futures, and this isolate library 485 // Timers to implement Futures, and this isolate library
479 // implementation uses Futures. We should either stop using 486 // implementation uses Futures. We should either stop using
480 // Futures in this library, or re-adapt if Futures get a 487 // Futures in this library, or re-adapt if Futures get a
481 // different implementation. 488 // different implementation.
482 _globalState.currentContext = context; 489 _globalState.currentContext = context;
483 _globalState.topEventLoop.run(); 490 _globalState.topEventLoop.run();
484 break; 491 break;
485 case 'spawn-worker': 492 case 'spawn-worker':
486 _spawnWorker(msg['functionName'], msg['uri'], msg['replyPort']); 493 _spawnWorker(msg['functionName'], msg['uri'],
494 msg['args'], msg['msg'],
495 msg['isSpawnUri'], msg['replyPort']);
487 break; 496 break;
488 case 'message': 497 case 'message':
489 SendPort port = msg['port']; 498 SendPort port = msg['port'];
490 // If the port has been closed, we ignore the message. 499 // If the port has been closed, we ignore the message.
491 if (port != null) { 500 if (port != null) {
492 msg['port'].send(msg['msg'], msg['replyTo']); 501 msg['port'].send(msg['msg']);
493 } 502 }
494 _globalState.topEventLoop.run(); 503 _globalState.topEventLoop.run();
495 break; 504 break;
496 case 'close': 505 case 'close':
497 _globalState.managers.remove(workerIds[sender]); 506 _globalState.managers.remove(workerIds[sender]);
498 JS('void', '#.terminate()', sender); 507 JS('void', '#.terminate()', sender);
499 _globalState.topEventLoop.run(); 508 _globalState.topEventLoop.run();
500 break; 509 break;
501 case 'log': 510 case 'log':
502 _log(msg['msg']); 511 _log(msg['msg']);
(...skipping 40 matching lines...) Expand 10 before | Expand all | Expand 10 after
543 */ 552 */
544 static String _getJSFunctionName(Function f) { 553 static String _getJSFunctionName(Function f) {
545 return JS("String|Null", r"(#['$name'] || #)", f, null); 554 return JS("String|Null", r"(#['$name'] || #)", f, null);
546 } 555 }
547 556
548 /** Create a new JavaScript object instance given its constructor. */ 557 /** Create a new JavaScript object instance given its constructor. */
549 static dynamic _allocate(var ctor) { 558 static dynamic _allocate(var ctor) {
550 return JS("", "new #()", ctor); 559 return JS("", "new #()", ctor);
551 } 560 }
552 561
553 static SendPort spawnFunction(void topLevelFunction()) { 562 static SendPort spawnFunction(void topLevelFunction(message), message) {
554 final name = _getJSFunctionName(topLevelFunction); 563 final name = _getJSFunctionName(topLevelFunction);
555 if (name == null) { 564 if (name == null) {
556 throw new UnsupportedError( 565 throw new UnsupportedError(
557 "only top-level functions can be spawned."); 566 "only top-level functions can be spawned.");
558 } 567 }
559 return spawn(name, null, false); 568 return spawn(name, null, null, message, false, false);
569 }
570
571 static SendPort spawnUri(Uri uri, List<String> args, message) {
572 return spawn(null, uri.path, args, message, false, true);
560 } 573 }
561 574
562 // TODO(sigmund): clean up above, after we make the new API the default: 575 // TODO(sigmund): clean up above, after we make the new API the default:
563 576
564 /// If [uri] is `null` it is replaced with the current script. 577 /// If [uri] is `null` it is replaced with the current script.
565 static spawn(String functionName, String uri, bool isLight) { 578 static SendPort spawn(String functionName, String uri,
579 List<String> args, message,
580 bool isLight, bool isSpawnUri) {
566 // Assume that the compiled version of the Dart file lives just next to the 581 // Assume that the compiled version of the Dart file lives just next to the
567 // dart file. 582 // dart file.
568 // TODO(floitsch): support precompiled version of dart2js output. 583 // TODO(floitsch): support precompiled version of dart2js output.
569 if (uri != null && uri.endsWith(".dart")) uri += ".js"; 584 if (uri != null && uri.endsWith(".dart")) uri += ".js";
570 585
571 Completer<SendPort> completer = new Completer<SendPort>.sync(); 586 Completer<SendPort> completer = new Completer<SendPort>.sync();
572 ReceivePort port = new ReceivePort(); 587 ReceivePort port = new ReceivePort();
573 port.receive((msg, SendPort replyPort) { 588 port.listen((msg) {
574 port.close(); 589 port.close();
575 assert(msg == _SPAWNED_SIGNAL); 590 assert(msg[0] == _SPAWNED_SIGNAL);
576 completer.complete(replyPort); 591 completer.complete(msg[1]);
577 }); 592 });
578 593
579 SendPort signalReply = port.toSendPort(); 594 SendPort signalReply = port.sendPort;
580 595
581 if (_globalState.useWorkers && !isLight) { 596 if (_globalState.useWorkers && !isLight) {
582 _startWorker(functionName, uri, signalReply); 597 _startWorker(functionName, uri, args, message, isSpawnUri, signalReply);
583 } else { 598 } else {
584 _startNonWorker(functionName, uri, signalReply); 599 _startNonWorker(
600 functionName, uri, args, message, isSpawnUri, signalReply);
585 } 601 }
586 return new _BufferingSendPort( 602 return new _BufferingSendPort(
587 _globalState.currentContext.id, completer.future); 603 _globalState.currentContext.id, completer.future);
588 } 604 }
589 605
590 static SendPort _startWorker( 606 static void _startWorker(
591 String functionName, String uri, SendPort replyPort) { 607 String functionName, String uri,
608 List<String> args, message,
609 bool isSpawnUri,
610 SendPort replyPort) {
592 if (_globalState.isWorker) { 611 if (_globalState.isWorker) {
593 _globalState.mainManager.postMessage(_serializeMessage({ 612 _globalState.mainManager.postMessage(_serializeMessage({
594 'command': 'spawn-worker', 613 'command': 'spawn-worker',
595 'functionName': functionName, 614 'functionName': functionName,
615 'args': args,
616 'msg': message,
596 'uri': uri, 617 'uri': uri,
618 'isSpawnUri': isSpawnUri,
597 'replyPort': replyPort})); 619 'replyPort': replyPort}));
598 } else { 620 } else {
599 _spawnWorker(functionName, uri, replyPort); 621 _spawnWorker(functionName, uri, args, message, isSpawnUri, replyPort);
600 } 622 }
601 } 623 }
602 624
603 static SendPort _startNonWorker( 625 static void _startNonWorker(
604 String functionName, String uri, SendPort replyPort) { 626 String functionName, String uri,
627 List<String> args, message,
628 bool isSpawnUri,
629 SendPort replyPort) {
605 // TODO(eub): support IE9 using an iframe -- Dart issue 1702. 630 // TODO(eub): support IE9 using an iframe -- Dart issue 1702.
606 if (uri != null) throw new UnsupportedError( 631 if (uri != null) throw new UnsupportedError(
Lasse Reichstein Nielsen 2013/10/25 12:45:59 Multi-line if needs braces.
floitsch 2013/10/25 13:52:56 Done.
607 "Currently spawnUri is not supported without web workers."); 632 "Currently spawnUri is not supported without web workers.");
608 _globalState.topEventLoop.enqueue(new _IsolateContext(), () { 633 _globalState.topEventLoop.enqueue(new _IsolateContext(), () {
609 final func = _getJSFunctionFromName(functionName); 634 final func = _getJSFunctionFromName(functionName);
610 _startIsolate(func, replyPort); 635 _startIsolate(func, args, message, isSpawnUri, replyPort);
611 }, 'nonworker start'); 636 }, 'nonworker start');
612 } 637 }
613 638
614 static void _startIsolate(Function topLevel, SendPort replyTo) { 639 static void _startIsolate(Function topLevel,
640 List<String> args, message,
641 bool isSpawnUri,
642 SendPort replyTo) {
615 _IsolateContext context = JS_CURRENT_ISOLATE_CONTEXT(); 643 _IsolateContext context = JS_CURRENT_ISOLATE_CONTEXT();
616 Primitives.initializeStatics(context.id); 644 Primitives.initializeStatics(context.id);
617 lazyPort = new ReceivePort(); 645 lazyPort = new ReceivePort();
618 replyTo.send(_SPAWNED_SIGNAL, port.toSendPort()); 646 replyTo.send([_SPAWNED_SIGNAL, lazyPort.sendPort]);
619 topLevel(); 647 if (!isSpawnUri) {
648 topLevel(message);
649 } else if (topLevel is _MainFunctionArgsMessage) {
Lasse Reichstein Nielsen 2013/10/25 12:45:59 If message was null, should we have preferred to c
floitsch 2013/10/25 13:52:56 I prefer providing the argument. This is the expli
650 topLevel(args, message);
651 } else if (topLevel is _MainFunctionArgs) {
652 topLevel(args);
653 } else {
654 topLevel();
655 }
620 } 656 }
621 657
622 /** 658 /**
623 * Spawns an isolate in a worker. [factoryName] is the Javascript constructor 659 * Spawns an isolate in a worker. [factoryName] is the Javascript constructor
624 * name for the isolate entry point class. 660 * name for the isolate entry point class.
625 */ 661 */
626 static void _spawnWorker(functionName, uri, replyPort) { 662 static void _spawnWorker(functionName, String uri,
663 List<String> args, message,
664 bool isSpawnUri,
665 SendPort replyPort) {
666 print("message: $message");
Lasse Reichstein Nielsen 2013/10/25 12:45:59 Debug print. Was that mine?
floitsch 2013/10/25 13:52:56 probably mine.
627 if (uri == null) uri = thisScript; 667 if (uri == null) uri = thisScript;
628 final worker = JS('var', 'new Worker(#)', uri); 668 final worker = JS('var', 'new Worker(#)', uri);
629 669
630 var processWorkerMessageTrampoline = 670 var processWorkerMessageTrampoline =
631 JS('', 'function(e) { #(#, e); }', 671 JS('', 'function(e) { #(#, e); }',
632 DART_CLOSURE_TO_JS(_processWorkerMessage), 672 DART_CLOSURE_TO_JS(_processWorkerMessage),
633 worker); 673 worker);
634 JS('void', '#.onmessage = #', worker, processWorkerMessageTrampoline); 674 JS('void', '#.onmessage = #', worker, processWorkerMessageTrampoline);
635 var workerId = _globalState.nextManagerId++; 675 var workerId = _globalState.nextManagerId++;
636 // We also store the id on the worker itself so that we can unregister it. 676 // We also store the id on the worker itself so that we can unregister it.
637 workerIds[worker] = workerId; 677 workerIds[worker] = workerId;
638 _globalState.managers[workerId] = worker; 678 _globalState.managers[workerId] = worker;
639 JS('void', '#.postMessage(#)', worker, _serializeMessage({ 679 JS('void', '#.postMessage(#)', worker, _serializeMessage({
640 'command': 'start', 680 'command': 'start',
641 'id': workerId, 681 'id': workerId,
642 // Note: we serialize replyPort twice because the child worker needs to 682 // Note: we serialize replyPort twice because the child worker needs to
643 // first deserialize the worker id, before it can correctly deserialize 683 // first deserialize the worker id, before it can correctly deserialize
644 // the port (port deserialization is sensitive to what is the current 684 // the port (port deserialization is sensitive to what is the current
645 // workerId). 685 // workerId).
646 'replyTo': _serializeMessage(replyPort), 686 'replyTo': _serializeMessage(replyPort),
687 'args': args,
688 'msg': _serializeMessage(message),
689 'isSpawnUri': isSpawnUri,
647 'functionName': functionName })); 690 'functionName': functionName }));
648 } 691 }
649 } 692 }
650 693
651 /******************************************************** 694 /********************************************************
652 Inserted from lib/isolate/dart2js/ports.dart 695 Inserted from lib/isolate/dart2js/ports.dart
653 ********************************************************/ 696 ********************************************************/
654 697
655 /** Common functionality to all send ports. */ 698 /** Common functionality to all send ports. */
656 class _BaseSendPort implements SendPort { 699 class _BaseSendPort implements SendPort {
657 /** Id for the destination isolate. */ 700 /** Id for the destination isolate. */
658 final int _isolateId; 701 final int _isolateId;
659 702
660 const _BaseSendPort(this._isolateId); 703 const _BaseSendPort(this._isolateId);
661 704
662 void _checkReplyTo(SendPort replyTo) { 705 void _checkReplyTo(SendPort replyTo) {
663 if (replyTo != null 706 if (replyTo != null
664 && replyTo is! _NativeJsSendPort 707 && replyTo is! _NativeJsSendPort
665 && replyTo is! _WorkerSendPort 708 && replyTo is! _WorkerSendPort
666 && replyTo is! _BufferingSendPort) { 709 && replyTo is! _BufferingSendPort) {
667 throw new Exception("SendPort.send: Illegal replyTo port type"); 710 throw new Exception("SendPort.send: Illegal replyTo port type");
668 } 711 }
669 } 712 }
670 713
671 Future call(var message) { 714 void send(var message);
672 final completer = new Completer();
673 final port = new ReceivePortImpl();
674 send(message, port.toSendPort());
675 port.receive((value, ignoreReplyTo) {
676 port.close();
677 if (value is Exception) {
678 completer.completeError(value);
679 } else {
680 completer.complete(value);
681 }
682 });
683 return completer.future;
684 }
685
686 void send(var message, [SendPort replyTo]);
687 bool operator ==(var other); 715 bool operator ==(var other);
688 int get hashCode; 716 int get hashCode;
689 } 717 }
690 718
691 /** A send port that delivers messages in-memory via native JavaScript calls. */ 719 /** A send port that delivers messages in-memory via native JavaScript calls. */
692 class _NativeJsSendPort extends _BaseSendPort implements SendPort { 720 class _NativeJsSendPort extends _BaseSendPort implements SendPort {
693 final ReceivePortImpl _receivePort; 721 final ReceivePortImpl _receivePort;
694 722
695 const _NativeJsSendPort(this._receivePort, int isolateId) : super(isolateId); 723 const _NativeJsSendPort(this._receivePort, int isolateId) : super(isolateId);
696 724
697 void send(var message, [SendPort replyTo = null]) { 725 void send(var message, [SendPort replyTo]) {
698 _waitForPendingPorts([message, replyTo], () { 726 _waitForPendingPorts(message, () {
699 _checkReplyTo(replyTo);
700 // Check that the isolate still runs and the port is still open 727 // Check that the isolate still runs and the port is still open
701 final isolate = _globalState.isolates[_isolateId]; 728 final isolate = _globalState.isolates[_isolateId];
702 if (isolate == null) return; 729 if (isolate == null) return;
703 if (_receivePort._callback == null) return; 730 if (_receivePort._controller.isClosed) return;
704 731
705 // We force serialization/deserialization as a simple way to ensure 732 // We force serialization/deserialization as a simple way to ensure
706 // isolate communication restrictions are respected between isolates that 733 // isolate communication restrictions are respected between isolates that
707 // live in the same worker. [_NativeJsSendPort] delivers both messages 734 // live in the same worker. [_NativeJsSendPort] delivers both messages
708 // from the same worker and messages from other workers. In particular, 735 // from the same worker and messages from other workers. In particular,
709 // messages sent from a worker via a [_WorkerSendPort] are received at 736 // messages sent from a worker via a [_WorkerSendPort] are received at
710 // [_processWorkerMessage] and forwarded to a native port. In such cases, 737 // [_processWorkerMessage] and forwarded to a native port. In such cases,
711 // here we'll see [_globalState.currentContext == null]. 738 // here we'll see [_globalState.currentContext == null].
712 final shouldSerialize = _globalState.currentContext != null 739 final shouldSerialize = _globalState.currentContext != null
713 && _globalState.currentContext.id != _isolateId; 740 && _globalState.currentContext.id != _isolateId;
714 var msg = message; 741 var msg = message;
715 var reply = replyTo;
716 if (shouldSerialize) { 742 if (shouldSerialize) {
717 msg = _serializeMessage(msg); 743 msg = _serializeMessage(msg);
718 reply = _serializeMessage(reply);
719 } 744 }
720 _globalState.topEventLoop.enqueue(isolate, () { 745 _globalState.topEventLoop.enqueue(isolate, () {
721 if (_receivePort._callback != null) { 746 if (!_receivePort._controller.isClosed) {
722 if (shouldSerialize) { 747 if (shouldSerialize) {
723 msg = _deserializeMessage(msg); 748 msg = _deserializeMessage(msg);
724 reply = _deserializeMessage(reply);
725 } 749 }
726 _receivePort._callback(msg, reply); 750 _receivePort._controller.add(msg);
727 } 751 }
728 }, 'receive $message'); 752 }, 'receive $message');
729 }); 753 });
730 } 754 }
731 755
732 bool operator ==(var other) => (other is _NativeJsSendPort) && 756 bool operator ==(var other) => (other is _NativeJsSendPort) &&
733 (_receivePort == other._receivePort); 757 (_receivePort == other._receivePort);
734 758
735 int get hashCode => _receivePort._id; 759 int get hashCode => _receivePort._id;
736 } 760 }
737 761
738 /** A send port that delivers messages via worker.postMessage. */ 762 /** A send port that delivers messages via worker.postMessage. */
739 // TODO(eub): abstract this for iframes. 763 // TODO(eub): abstract this for iframes.
740 class _WorkerSendPort extends _BaseSendPort implements SendPort { 764 class _WorkerSendPort extends _BaseSendPort implements SendPort {
741 final int _workerId; 765 final int _workerId;
742 final int _receivePortId; 766 final int _receivePortId;
743 767
744 const _WorkerSendPort(this._workerId, int isolateId, this._receivePortId) 768 const _WorkerSendPort(this._workerId, int isolateId, this._receivePortId)
745 : super(isolateId); 769 : super(isolateId);
746 770
747 void send(var message, [SendPort replyTo = null]) { 771 void send(var message, [SendPort replyTo]) {
748 _waitForPendingPorts([message, replyTo], () { 772 _waitForPendingPorts(message, () {
749 _checkReplyTo(replyTo);
750 final workerMessage = _serializeMessage({ 773 final workerMessage = _serializeMessage({
751 'command': 'message', 774 'command': 'message',
752 'port': this, 775 'port': this,
753 'msg': message, 776 'msg': message});
754 'replyTo': replyTo});
755 777
756 if (_globalState.isWorker) { 778 if (_globalState.isWorker) {
757 // Communication from one worker to another go through the 779 // Communication from one worker to another go through the
758 // main worker. 780 // main worker.
759 _globalState.mainManager.postMessage(workerMessage); 781 _globalState.mainManager.postMessage(workerMessage);
760 } else { 782 } else {
761 // Deliver the message only if the worker is still alive. 783 // Deliver the message only if the worker is still alive.
762 /* Worker */ var manager = _globalState.managers[_workerId]; 784 /* Worker */ var manager = _globalState.managers[_workerId];
763 if (manager != null) { 785 if (manager != null) {
764 JS('void', '#.postMessage(#)', manager, workerMessage); 786 JS('void', '#.postMessage(#)', manager, workerMessage);
(...skipping 34 matching lines...) Expand 10 before | Expand all | Expand 10 after
799 821
800 /** Pending messages (and reply ports). */ 822 /** Pending messages (and reply ports). */
801 List pending; 823 List pending;
802 824
803 _BufferingSendPort(isolateId, this._futurePort) 825 _BufferingSendPort(isolateId, this._futurePort)
804 : super(isolateId), _id = _idCount, pending = [] { 826 : super(isolateId), _id = _idCount, pending = [] {
805 _idCount++; 827 _idCount++;
806 _futurePort.then((p) { 828 _futurePort.then((p) {
807 _port = p; 829 _port = p;
808 for (final item in pending) { 830 for (final item in pending) {
809 p.send(item['message'], item['replyTo']); 831 p.send(item);
810 } 832 }
811 pending = null; 833 pending = null;
812 }); 834 });
813 } 835 }
814 836
815 _BufferingSendPort.fromPort(isolateId, this._port) 837 _BufferingSendPort.fromPort(isolateId, this._port)
816 : super(isolateId), _id = _idCount { 838 : super(isolateId), _id = _idCount {
817 _idCount++; 839 _idCount++;
818 } 840 }
819 841
820 void send(var message, [SendPort replyTo]) { 842 void send(var message, [SendPort replyTo]) {
821 if (_port != null) { 843 if (_port != null) {
822 _port.send(message, replyTo); 844 _port.send(message, replyTo);
823 } else { 845 } else {
824 pending.add({'message': message, 'replyTo': replyTo}); 846 pending.add(message);
825 } 847 }
826 } 848 }
827 849
828 bool operator ==(var other) => 850 bool operator ==(var other) =>
829 other is _BufferingSendPort && _id == other._id; 851 other is _BufferingSendPort && _id == other._id;
830 int get hashCode => _id; 852 int get hashCode => _id;
831 } 853 }
832 854
833 /** Implementation of a multi-use [ReceivePort] on top of JavaScript. */ 855 /** Implementation of a multi-use [ReceivePort] on top of JavaScript. */
834 class ReceivePortImpl implements ReceivePort { 856 class ReceivePortImpl extends Stream implements ReceivePort {
835 int _id;
836 Function _callback;
837 static int _nextFreeId = 1; 857 static int _nextFreeId = 1;
858 final int _id;
859 StreamController _controller;
838 860
839 ReceivePortImpl() 861 ReceivePortImpl()
840 : _id = _nextFreeId++ { 862 : _id = _nextFreeId++ {
863 _controller = new StreamController(onCancel: close, sync: true);
841 _globalState.currentContext.register(_id, this); 864 _globalState.currentContext.register(_id, this);
842 } 865 }
843 866
844 void receive(void onMessage(var message, SendPort replyTo)) { 867 StreamSubscription listen(void onData(var event),
845 _callback = onMessage; 868 {Function onError,
869 void onDone(),
870 bool cancelOnError}) {
871 return _controller.stream.listen(onData, onError: onError, onDone: onDone,
872 cancelOnError: cancelOnError);
846 } 873 }
847 874
848 void close() { 875 void close() {
849 _callback = null; 876 if (_controller.isClosed) return;
877 _controller.close();
850 _globalState.currentContext.unregister(_id); 878 _globalState.currentContext.unregister(_id);
851 } 879 }
852 880
853 SendPort toSendPort() { 881 SendPort get sendPort {
854 return new _NativeJsSendPort(this, _globalState.currentContext.id); 882 return new _NativeJsSendPort(this, _globalState.currentContext.id);
855 } 883 }
856 } 884 }
857 885
858 /** Wait until all ports in a message are resolved. */ 886 /** Wait until all ports in a message are resolved. */
859 _waitForPendingPorts(var message, void callback()) { 887 _waitForPendingPorts(var message, void callback()) {
860 final finder = new _PendingSendPortFinder(); 888 final finder = new _PendingSendPortFinder();
861 finder.traverse(message); 889 finder.traverse(message);
862 Future.wait(finder.ports).then((_) => callback()); 890 Future.wait(finder.ports).then((_) => callback());
863 } 891 }
864 892
865 893
866 /** Visitor that finds all unresolved [SendPort]s in a message. */ 894 /** Visitor that finds all unresolved [SendPort]s in a message. */
867 class _PendingSendPortFinder extends _MessageTraverser { 895 class _PendingSendPortFinder extends _MessageTraverser {
868 List<Future<SendPort>> ports; 896 List<Future<SendPort>> ports;
869 _PendingSendPortFinder() : super(), ports = [] { 897 _PendingSendPortFinder() : super(), ports = [] {
870 _visited = new _JsVisitedMap(); 898 _visited = new _JsVisitedMap();
871 } 899 }
872 900
873 visitPrimitive(x) {} 901 visitPrimitive(x) {}
874 902
875 visitList(List list) { 903 visitList(List list) {
876 final seen = _visited[list]; 904 final seen = _visited[list];
877 if (seen != null) return; 905 if (seen != null) return;
878 _visited[list] = true; 906 _visited[list] = true;
879 // TODO(sigmund): replace with the following: (bug #1660) 907 list.forEach(_dispatch);
880 // list.forEach(_dispatch);
881 list.forEach((e) => _dispatch(e));
882 } 908 }
883 909
884 visitMap(Map map) { 910 visitMap(Map map) {
885 final seen = _visited[map]; 911 final seen = _visited[map];
886 if (seen != null) return; 912 if (seen != null) return;
887 913
888 _visited[map] = true; 914 _visited[map] = true;
889 // TODO(sigmund): replace with the following: (bug #1660) 915 map.values.forEach(_dispatch);
890 // map.values.forEach(_dispatch);
891 map.values.forEach((e) => _dispatch(e));
892 } 916 }
893 917
894 visitSendPort(var port) { 918 visitSendPort(var port) {
895 if (port is _BufferingSendPort && port._port == null) { 919 if (port is _BufferingSendPort && port._port == null) {
896 ports.add(port._futurePort); 920 ports.add(port._futurePort);
897 } 921 }
898 } 922 }
899 } 923 }
900 924
901 /******************************************************** 925 /********************************************************
(...skipping 441 matching lines...) Expand 10 before | Expand all | Expand 10 after
1343 _handle = null; 1367 _handle = null;
1344 } else { 1368 } else {
1345 throw new UnsupportedError("Canceling a timer."); 1369 throw new UnsupportedError("Canceling a timer.");
1346 } 1370 }
1347 } 1371 }
1348 1372
1349 bool get isActive => _handle != null; 1373 bool get isActive => _handle != null;
1350 } 1374 }
1351 1375
1352 bool hasTimer() => JS('', '#.setTimeout', globalThis) != null; 1376 bool hasTimer() => JS('', '#.setTimeout', globalThis) != null;
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698