Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1361)

Side by Side Diff: pkg/fletchc/lib/src/hub/hub_main.dart

Issue 1659163007: Rename fletch -> dartino (Closed) Base URL: https://github.com/dartino/sdk.git@master
Patch Set: address comments Created 4 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « pkg/fletchc/lib/src/hub/exit_codes.dart ('k') | pkg/fletchc/lib/src/hub/sentence_parser.dart » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright (c) 2015, the Dartino project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE.md file.
4
5 library fletchc.hub_main;
6
7 import 'dart:collection' show
8 Queue;
9
10 import 'dart:io' hide
11 exitCode,
12 stderr,
13 stdin,
14 stdout;
15
16 import 'dart:io' as io;
17
18 import 'dart:async' show
19 Completer,
20 Stream,
21 StreamController,
22 StreamSubscription,
23 StreamTransformer;
24
25 import 'dart:typed_data' show
26 ByteData,
27 Endianness,
28 TypedData,
29 Uint8List;
30
31 import 'dart:convert' show
32 UTF8;
33
34 import 'dart:isolate' show
35 Isolate,
36 ReceivePort,
37 SendPort;
38
39 import '../zone_helper.dart' show
40 acknowledgeControlMessages,
41 runGuarded;
42
43 import 'exit_codes.dart' show
44 COMPILER_EXITCODE_CRASH,
45 DART_VM_EXITCODE_COMPILE_TIME_ERROR;
46
47 import 'client_commands.dart' show
48 ClientCommandCode,
49 handleSocketErrors;
50
51 import '../worker/worker_main.dart' show
52 workerMain;
53
54 import '../verbs/infrastructure.dart';
55
56 import 'sentence_parser.dart' show
57 Sentence,
58 parseSentence;
59
60 import '../diagnostic.dart' show
61 InputError,
62 throwInternalError;
63
64 import '../shared_command_infrastructure.dart' show
65 CommandBuffer,
66 CommandTransformerBuilder,
67 commandEndianness,
68 headerSize,
69 toUint8ListView;
70
71 import '../worker/developer.dart' show
72 allocateWorker,
73 combineTasks,
74 configFileUri;
75
76 import 'session_manager.dart' show
77 lookupSession;
78
79 import '../verbs/create_verb.dart' show
80 CreateSessionTask;
81
82 import '../please_report_crash.dart' show
83 crashReportRequested,
84 requestBugReportOnOtherCrashMessage;
85
86 import '../verbs/options.dart' show
87 Options;
88
89 import '../console_print.dart' show
90 printToConsole;
91
92 import '../please_report_crash.dart' show
93 stringifyError;
94
95 Function gracefulShutdown;
96
97 final List<String> mainArguments = <String>[];
98
99 class ClientCommandTransformerBuilder
100 extends CommandTransformerBuilder<ClientCommand> {
101 ClientCommand makeCommand(int commandCode, ByteData payload) {
102 ClientCommandCode code = ClientCommandCode.values[commandCode];
103 switch (code) {
104 case ClientCommandCode.Arguments:
105 return new ClientCommand(code, decodeArgumentsCommand(payload));
106
107 case ClientCommandCode.Stdin:
108 int length = payload.getUint32(0, commandEndianness);
109 return new ClientCommand(code, toUint8ListView(payload, 4, length));
110
111 case ClientCommandCode.Signal:
112 int signal = payload.getUint32(0, commandEndianness);
113 return new ClientCommand(code, signal);
114
115 default:
116 return null;
117 }
118 }
119
120 List<String> decodeArgumentsCommand(ByteData view) {
121 int offset = 0;
122 int argc = view.getUint32(offset, commandEndianness);
123 offset += 4;
124 List<String> argv = <String>[];
125 for (int i = 0; i < argc; i++) {
126 int length = view.getUint32(offset, commandEndianness);
127 offset += 4;
128 argv.add(UTF8.decode(toUint8ListView(view, offset, length)));
129 offset += length;
130 }
131 return argv;
132 }
133 }
134
135 // Class for sending client commands from the hub (main isolate) to the
136 // fletch c++ client.
137 class ClientCommandSender extends CommandSender {
138 final Sink<List<int>> sink;
139
140 ClientCommandSender(this.sink);
141
142 void sendExitCode(int exitCode) {
143 new CommandBuffer<ClientCommandCode>()
144 ..addUint32(exitCode)
145 ..sendOn(sink, ClientCommandCode.ExitCode);
146 }
147
148 void sendDataCommand(ClientCommandCode code, List<int> data) {
149 new CommandBuffer<ClientCommandCode>()
150 ..addUint32(data.length)
151 ..addUint8List(data)
152 ..sendOn(sink, code);
153 }
154
155 void sendClose() {
156 throwInternalError("Client (C++) doesn't support ClientCommandCode.Close.");
157 }
158
159 void sendEventLoopStarted() {
160 throwInternalError(
161 "Client (C++) doesn't support ClientCommandCode.EventLoopStarted.");
162 }
163 }
164
165 Future main(List<String> arguments) async {
166 // When running this program, -Dfletch.version must be provided on the Dart
167 // VM command line.
168 assert(const String.fromEnvironment('fletch.version') != null);
169
170 mainArguments.addAll(arguments);
171 configFileUri = Uri.base.resolve(arguments.first);
172 File configFile = new File.fromUri(configFileUri);
173 Directory tmpdir = Directory.systemTemp.createTempSync("fletch_client");
174
175 File socketFile = new File("${tmpdir.path}/socket");
176 try {
177 socketFile.deleteSync();
178 } on FileSystemException catch (e) {
179 // Ignored. There's no way to check if a socket file exists.
180 }
181
182 ServerSocket server;
183
184 Completer shutdown = new Completer();
185
186 gracefulShutdown = () {
187 try {
188 socketFile.deleteSync();
189 } catch (e) {
190 print("Unable to delete ${socketFile.path}: $e");
191 }
192
193 try {
194 tmpdir.deleteSync(recursive: true);
195 } catch (e) {
196 print("Unable to delete ${tmpdir.path}: $e");
197 }
198
199 if (server != null) {
200 server.close();
201 }
202 if (!shutdown.isCompleted) {
203 shutdown.complete();
204 }
205 };
206
207 void handleSignal(StreamSubscription<ProcessSignal> subscription) {
208 subscription.onData((ProcessSignal signal) {
209 // Cancel the subscription to restore default signal handler.
210 subscription.cancel();
211 print("Received signal $signal");
212 gracefulShutdown();
213 // 0 means kill the current process group (including this process, which
214 // will now die as we restored the default signal handler above). In
215 // addition, killing this process ensures that any processes waiting for
216 // it will observe that it was killed due to a signal. There's no way to
217 // fake that status using exit.
218 Process.killPid(0, signal);
219 });
220 shutdown.future.then((_) {
221 subscription.cancel();
222 });
223 }
224
225 // When receiving SIGTERM or gracefully shut down.
226 handleSignal(ProcessSignal.SIGTERM.watch().listen(null));
227 handleSignal(ProcessSignal.SIGINT.watch().listen(null));
228
229 server = await ServerSocket.bind(InternetAddress.LOOPBACK_IP_V4, 0);
230
231 // Write the TCP port to a config file. This lets multiple command line
232 // programs share this persistent driver process, which in turn eliminates
233 // start up overhead.
234 configFile.writeAsStringSync("${server.port}", flush: true);
235
236 // Print the temporary directory so the launching process knows where to
237 // connect, and that the socket is ready.
238 print(server.port);
239
240 IsolatePool pool = new IsolatePool(workerMain);
241 try {
242 await server.listen((Socket controlSocket) {
243 handleClient(pool, handleSocketErrors(controlSocket, "controlSocket"));
244 }).asFuture();
245 } finally {
246 gracefulShutdown();
247 }
248 }
249
250 Future<Null> handleClient(IsolatePool pool, Socket controlSocket) async {
251 ClientLogger log = ClientLogger.allocate();
252
253 ClientConnection clientConnection =
254 new ClientConnection(controlSocket, log)..start();
255 List<String> arguments = await clientConnection.arguments;
256 log.gotArguments(arguments);
257
258 await handleVerb(arguments, clientConnection, pool);
259 }
260
261 Future<Null> handleVerb(
262 List<String> arguments,
263 ClientConnection clientConnection,
264 IsolatePool pool) async {
265 crashReportRequested = false;
266
267 Future<int> performVerb() async {
268 clientConnection.parseArguments(arguments);
269 String sessionName = clientConnection.sentence.sessionName;
270 UserSession session;
271 SharedTask initializer;
272 if (sessionName != null) {
273 session = lookupSession(sessionName);
274 if (session == null) {
275 session = await createSession(sessionName, () => allocateWorker(pool));
276 initializer = new CreateSessionTask(
277 sessionName, null, clientConnection.sentence.base, configFileUri);
278 }
279 }
280 ClientVerbContext context = new ClientVerbContext(
281 clientConnection, pool, session, initializer: initializer);
282 return await clientConnection.sentence.performVerb(context);
283 }
284
285 int exitCode = await runGuarded(
286 performVerb,
287 printLineOnStdout: clientConnection.printLineOnStdout,
288 handleLateError: clientConnection.log.error)
289 .catchError(
290 clientConnection.reportErrorToClient, test: (e) => e is InputError)
291 .catchError((error, StackTrace stackTrace) {
292 if (!crashReportRequested) {
293 clientConnection.printLineOnStderr(
294 requestBugReportOnOtherCrashMessage);
295 crashReportRequested = true;
296 }
297 clientConnection.printLineOnStderr('$error');
298 if (stackTrace != null) {
299 clientConnection.printLineOnStderr('$stackTrace');
300 }
301 return COMPILER_EXITCODE_CRASH;
302 });
303 clientConnection.exit(exitCode);
304 }
305
306 class ClientVerbContext extends VerbContext {
307 SharedTask initializer;
308
309 ClientVerbContext(
310 ClientConnection clientConnection,
311 IsolatePool pool,
312 UserSession session,
313 {this.initializer})
314 : super(clientConnection, pool, session);
315
316 ClientVerbContext copyWithSession(UserSession session) {
317 return new ClientVerbContext(clientConnection, pool, session);
318 }
319
320 Future<int> performTaskInWorker(SharedTask task) async {
321 if (session.worker.isolate.wasKilled) {
322 throwInternalError(
323 "session ${session.name}: worker isolate terminated unexpectedly");
324 }
325 if (session.hasActiveWorkerTask) {
326 throwFatalError(DiagnosticKind.busySession, sessionName: session.name);
327 }
328 session.hasActiveWorkerTask = true;
329 return session.worker.performTask(
330 combineTasks(initializer, task), clientConnection, userSession: session)
331 .whenComplete(() {
332 session.hasActiveWorkerTask = false;
333 });
334 }
335 }
336
337 /// Handles communication with the Fletch C++ client.
338 class ClientConnection {
339 /// Socket used for receiving and sending commands from/to the Fletch C++
340 /// client.
341 final Socket socket;
342
343 /// Controller used to send commands to the from the ClientConnection to
344 /// anyone listening on ClientConnection.commands (see [commands] below). The
345 /// only listener as of now is the WorkerConnection which typically forwards
346 /// the commands to the worker isolate.
347 final StreamController<ClientCommand> controller =
348 new StreamController<ClientCommand>();
349
350 final ClientLogger log;
351
352 /// The commandSender is used to send commands back to the Fletch C++ client.
353 ClientCommandSender commandSender;
354
355 StreamSubscription<ClientCommand> subscription;
356 Completer<Null> completer;
357
358 Completer<List<String>> argumentsCompleter = new Completer<List<String>>();
359
360 /// The analysed version of the request from the client.
361 /// Updated by [parseArguments].
362 AnalyzedSentence sentence;
363
364 /// Path to the fletch VM. Updated by [parseArguments].
365 String fletchVm;
366
367 ClientConnection(this.socket, this.log);
368
369 /// Stream of commands from the Fletch C++ client to the hub (main isolate).
370 /// The commands are typically forwarded to a worker isolate, see
371 /// handleClientCommand.
372 Stream<ClientCommand> get commands => controller.stream;
373
374 /// Completes when [endSession] is called.
375 Future<Null> get done => completer.future;
376
377 /// Completes with the command-line arguments from the client.
378 Future<List<String>> get arguments => argumentsCompleter.future;
379
380 /// Start processing commands from the client.
381 void start() {
382 // Setup a command sender used to send responses from the hub (main isolate)
383 // back to the Fletch C++ client.
384 commandSender = new ClientCommandSender(socket);
385
386 // Setup a listener for handling commands coming from the Fletch C++
387 // client.
388 StreamTransformer<List<int>, ClientCommand> transformer =
389 new ClientCommandTransformerBuilder().build();
390 subscription = socket.transform(transformer).listen(null);
391 subscription
392 ..onData(handleClientCommand)
393 ..onError(handleClientCommandError)
394 ..onDone(handleClientCommandsDone);
395 completer = new Completer<Null>();
396 }
397
398 void handleClientCommand(ClientCommand command) {
399 if (command.code == ClientCommandCode.Arguments) {
400 // This intentionally throws if arguments are sent more than once.
401 argumentsCompleter.complete(command.data);
402 } else {
403 sendCommandToWorker(command);
404 }
405 }
406
407 void sendCommandToWorker(ClientCommand command) {
408 // TODO(ahe): It is a bit weird that this method is on the client proxy.
409 // Ideally, this would be a method on WorkerConnection. However the client
410 // is created before the WorkerConnection which is not created until/if
411 // needed. The WorkerConnection will start listening to the client's
412 // commands when attaching, see WorkerConnection.attachClient.
413 controller.add(command);
414 }
415
416 void handleClientCommandError(error, StackTrace trace) {
417 print(stringifyError(error, trace));
418 completer.completeError(error, trace);
419 // Cancel the subscription if an error occurred, this prevents
420 // [handleCommandsDone] from being called and attempt to complete
421 // [completer].
422 subscription.cancel();
423 }
424
425 void handleClientCommandsDone() {
426 completer.complete();
427 }
428
429 // Send a command back to the Fletch C++ client.
430 void sendCommandToClient(ClientCommand command) {
431 switch (command.code) {
432 case ClientCommandCode.Stdout:
433 commandSender.sendStdoutBytes(command.data);
434 break;
435
436 case ClientCommandCode.Stderr:
437 commandSender.sendStderrBytes(command.data);
438 break;
439
440 case ClientCommandCode.ExitCode:
441 commandSender.sendExitCode(command.data);
442 break;
443
444 default:
445 throwInternalError("Unexpected command: $command");
446 }
447 }
448
449 void endSession() {
450 socket.flush().then((_) {
451 socket.close();
452 });
453 }
454
455 void printLineOnStderr(String line) {
456 commandSender.sendStderrBytes(UTF8.encode("$line\n"));
457 }
458
459 void printLineOnStdout(String line) {
460 commandSender.sendStdoutBytes(UTF8.encode('$line\n'));
461 }
462
463 void exit(int exitCode) {
464 if (exitCode == null) {
465 exitCode = COMPILER_EXITCODE_CRASH;
466 try {
467 throwInternalError("Internal error: exitCode is null");
468 } on InputError catch (error, stackTrace) {
469 // We can't afford to throw an error here as it will take down the
470 // entire process.
471 exitCode = reportErrorToClient(error, stackTrace);
472 }
473 }
474 commandSender.sendExitCode(exitCode);
475 endSession();
476 }
477
478 AnalyzedSentence parseArguments(List<String> arguments) {
479 Options options = Options.parse(arguments);
480 Sentence sentence =
481 parseSentence(options.nonOptionArguments, includesProgramName: true);
482 // [programName] is the canonicalized absolute path to the fletch
483 // executable (the C++ program).
484 String programName = sentence.programName;
485 String fletchVm = "$programName-vm";
486 this.sentence = analyzeSentence(sentence, options);
487 this.fletchVm = fletchVm;
488 return this.sentence;
489 }
490
491 int reportErrorToClient(InputError error, StackTrace stackTrace) {
492 bool isInternalError = error.kind == DiagnosticKind.internalError;
493 if (isInternalError && !crashReportRequested) {
494 printLineOnStderr(requestBugReportOnOtherCrashMessage);
495 crashReportRequested = true;
496 }
497 printLineOnStderr(error.asDiagnostic().formatMessage());
498 if (isInternalError) {
499 printLineOnStderr('$stackTrace');
500 return COMPILER_EXITCODE_CRASH;
501 } else {
502 return DART_VM_EXITCODE_COMPILE_TIME_ERROR;
503 }
504 }
505 }
506
507 /// The WorkerConnection represents a worker isolate in the hub (main isolate).
508 /// Ie. it is the hub's object for communicating with a worker isolate.
509 class WorkerConnection {
510 /// The worker isolate.
511 final ManagedIsolate isolate;
512
513 /// A port used to send commands to the worker isolate.
514 SendPort sendPort;
515
516 /// A port used to read commands from the worker isolate.
517 ReceivePort receivePort;
518
519 /// workerCommands is an iterator over all the commands coming from the
520 /// worker isolate. These are typically the outbound messages destined for
521 /// the Fletch C++ client.
522 /// It iterates over the data coming on the receivePort.
523 StreamIterator<ClientCommand> workerCommands;
524
525 /// When true, the worker can be shutdown by sending it a
526 /// ClientCommandCode.Signal command. Otherwise, it must be killed.
527 bool eventLoopStarted = false;
528
529 /// Subscription for errors from [isolate].
530 StreamSubscription errorSubscription;
531
532 bool crashReportRequested = false;
533
534 WorkerConnection(this.isolate);
535
536 /// Begin a session with the worker isolate.
537 Future<Null> beginSession() async {
538 errorSubscription = isolate.errors.listen(null);
539 errorSubscription.pause();
540 receivePort = isolate.beginSession();
541 // Setup the workerCommands iterator using a stream converting the
542 // incoming data to [ClientCommand]s.
543 Stream<ClientCommand> workerCommandStream = receivePort.map(
544 (message) => new ClientCommand(
545 ClientCommandCode.values[message[0]], message[1]));
546 workerCommands = new StreamIterator<ClientCommand>(workerCommandStream);
547 if (!await workerCommands.moveNext()) {
548 // The worker must have been killed, or died in some other way.
549 // TODO(ahe): Add this assertion: assert(isolate.wasKilled);
550 endSession();
551 return;
552 }
553 ClientCommand command = workerCommands.current;
554 assert(command.code == ClientCommandCode.SendPort);
555 assert(command.data != null);
556 sendPort = command.data;
557 }
558
559 /// Attach to a fletch C++ client and forward commands to the worker isolate,
560 /// and vice versa. The returned future normally completes when the worker
561 /// isolate sends ClientCommandCode.ClosePort, or if the isolate is killed due
562 /// to ClientCommandCode.Signal arriving through client.commands.
563 Future<int> attachClient(
564 ClientConnection clientConnection,
565 UserSession userSession) async {
566
567 // Method for handling commands coming from the client. The commands are
568 // typically forwarded to the worker isolate.
569 handleCommandsFromClient(ClientCommand command) {
570 if (command.code == ClientCommandCode.Signal && !eventLoopStarted) {
571 if (userSession != null) {
572 userSession.kill(clientConnection.printLineOnStderr);
573 } else {
574 isolate.kill();
575 }
576 receivePort.close();
577 } else {
578 sendPort.send([command.code.index, command.data]);
579 }
580 }
581
582 // Method for handling commands coming back from the worker isolate.
583 // It typically forwards them to the Fletch C++ client via the
584 // clientConnection.
585 Future<int> handleCommandsFromWorker(
586 ClientConnection clientConnection) async {
587 int exitCode = COMPILER_EXITCODE_CRASH;
588 while (await workerCommands.moveNext()) {
589 ClientCommand command = workerCommands.current;
590 switch (command.code) {
591 case ClientCommandCode.ClosePort:
592 receivePort.close();
593 break;
594
595 case ClientCommandCode.EventLoopStarted:
596 eventLoopStarted = true;
597 break;
598
599 case ClientCommandCode.ExitCode:
600 exitCode = command.data;
601 break;
602
603 default:
604 clientConnection.sendCommandToClient(command);
605 break;
606 }
607 }
608 return exitCode;
609 }
610
611 eventLoopStarted = false;
612 crashReportRequested = false;
613 errorSubscription.onData((errorList) {
614 String error = errorList[0];
615 String stackTrace = errorList[1];
616 if (!crashReportRequested) {
617 clientConnection.printLineOnStderr(requestBugReportOnOtherCrashMessage);
618 crashReportRequested = true;
619 }
620 clientConnection.printLineOnStderr(error);
621 if (stackTrace != null) {
622 clientConnection.printLineOnStderr(stackTrace);
623 }
624 if (userSession != null) {
625 userSession.kill(clientConnection.printLineOnStderr);
626 } else {
627 isolate.kill();
628 }
629 receivePort.close();
630 });
631 errorSubscription.resume();
632
633 // Start listening for commands coming from the Fletch C++ client (via
634 // clientConnection).
635 // TODO(ahe): Add onDone event handler to detach the client.
636 clientConnection.commands.listen(handleCommandsFromClient);
637
638 // Start processing commands coming from the worker.
639 int exitCode = await handleCommandsFromWorker(clientConnection);
640
641 errorSubscription.pause();
642 return exitCode;
643 }
644
645 void endSession() {
646 receivePort.close();
647 isolate.endSession();
648 }
649
650 Future<Null> detachClient() async {
651 if (isolate.wasKilled) {
652 // Setting these to null will ensure that [attachClient] causes a crash
653 // if called after isolate was killed.
654 errorSubscription = null;
655 receivePort = null;
656 workerCommands = null;
657 sendPort = null;
658
659 // TODO(ahe): The session is dead. Tell the user about this.
660 return null;
661 }
662 // TODO(ahe): Perform the reverse of attachClient here.
663 await beginSession();
664 }
665
666 Future<int> performTask(
667 SharedTask task,
668 ClientConnection clientConnection,
669 {
670 UserSession userSession,
671 /// End this session and return this isolate to the pool.
672 bool endSession: false}) async {
673 ClientLogger log = clientConnection.log;
674
675 clientConnection.done.catchError((error, StackTrace stackTrace) {
676 log.error(error, stackTrace);
677 }).then((_) {
678 log.done();
679 });
680
681 // Indirectly send the task to be performed to the worker isolate via the
682 // clientConnection.
683 clientConnection.sendCommandToWorker(
684 new ClientCommand(ClientCommandCode.PerformTask, task));
685
686 // Forward commands between the C++ fletch client [clientConnection], and th e
687 // worker isolate `this`. Also, Intercept the signal command and
688 // potentially kill the isolate (the isolate needs to tell if it is
689 // interuptible or needs to be killed, an example of the latter is, if
690 // compiler is running).
691 int exitCode = await attachClient(clientConnection, userSession);
692 // The verb (which was performed in the worker) is done.
693 log.note("After attachClient (exitCode = $exitCode)");
694
695 if (endSession) {
696 // Return the isolate to the pool *before* shutting down the client. This
697 // ensures that the next client will be able to reuse the isolate instead
698 // of spawning a new.
699 this.endSession();
700 } else {
701 await detachClient();
702 }
703
704 return exitCode;
705 }
706 }
707
708 class ManagedIsolate {
709 final IsolatePool pool;
710 final Isolate isolate;
711 final SendPort port;
712 final Stream errors;
713 final ReceivePort exitPort;
714 final ReceivePort errorPort;
715 bool wasKilled = false;
716
717 ManagedIsolate(
718 this.pool, this.isolate, this.port, this.errors,
719 this.exitPort, this.errorPort);
720
721 ReceivePort beginSession() {
722 ReceivePort receivePort = new ReceivePort();
723 port.send(receivePort.sendPort);
724 return receivePort;
725 }
726
727 void endSession() {
728 if (!wasKilled) {
729 pool.idleIsolates.addLast(this);
730 }
731 }
732
733 void kill() {
734 wasKilled = true;
735 isolate.kill(priority: Isolate.IMMEDIATE);
736 isolate.removeOnExitListener(exitPort.sendPort);
737 isolate.removeErrorListener(errorPort.sendPort);
738 exitPort.close();
739 errorPort.close();
740 }
741 }
742
743 class IsolatePool {
744 // Queue of idle isolates. When an isolate becomes idle, it is added at the
745 // end.
746 final Queue<ManagedIsolate> idleIsolates = new Queue<ManagedIsolate>();
747 final Function isolateEntryPoint;
748
749 IsolatePool(this.isolateEntryPoint);
750
751 Future<ManagedIsolate> getIsolate({bool exitOnError: true}) async {
752 if (idleIsolates.isEmpty) {
753 return await spawnIsolate(exitOnError: exitOnError);
754 } else {
755 return idleIsolates.removeFirst();
756 }
757 }
758
759 Future<ManagedIsolate> spawnIsolate({bool exitOnError: true}) async {
760 StreamController errorController = new StreamController.broadcast();
761 ReceivePort receivePort = new ReceivePort();
762 Isolate isolate = await Isolate.spawn(
763 isolateEntryPoint, receivePort.sendPort, paused: true);
764 isolate.setErrorsFatal(true);
765 ReceivePort errorPort = new ReceivePort();
766 ManagedIsolate managedIsolate;
767 isolate.addErrorListener(errorPort.sendPort);
768 errorPort.listen((errorList) {
769 if (exitOnError) {
770 String error = errorList[0];
771 String stackTrace = errorList[1];
772 io.stderr.writeln(error);
773 if (stackTrace != null) {
774 io.stderr.writeln(stackTrace);
775 }
776 exit(COMPILER_EXITCODE_CRASH);
777 } else {
778 managedIsolate.wasKilled = true;
779 errorController.add(errorList);
780 }
781 });
782 ReceivePort exitPort = new ReceivePort();
783 isolate.addOnExitListener(exitPort.sendPort);
784 exitPort.listen((_) {
785 isolate.removeErrorListener(errorPort.sendPort);
786 isolate.removeOnExitListener(exitPort.sendPort);
787 errorPort.close();
788 exitPort.close();
789 idleIsolates.remove(managedIsolate);
790 });
791 await acknowledgeControlMessages(isolate, resume: isolate.pauseCapability);
792 StreamIterator iterator = new StreamIterator(receivePort);
793 bool hasElement = await iterator.moveNext();
794 if (!hasElement) {
795 throwInternalError("No port received from isolate");
796 }
797 SendPort port = iterator.current;
798 receivePort.close();
799 managedIsolate =
800 new ManagedIsolate(
801 this, isolate, port, errorController.stream, exitPort, errorPort);
802
803 return managedIsolate;
804 }
805
806 void shutdown() {
807 while (idleIsolates.isNotEmpty) {
808 idleIsolates.removeFirst().kill();
809 }
810 }
811 }
812
813 class ClientLogger {
814 static int clientLoggersAllocated = 0;
815
816 static Set<ClientLogger> pendingClients = new Set<ClientLogger>();
817
818 static Set<ClientLogger> erroneousClients = new Set<ClientLogger>();
819
820 static ClientLogger allocate() {
821 ClientLogger clientLogger = new ClientLogger(clientLoggersAllocated++);
822 pendingClients.add(clientLogger);
823 return clientLogger;
824 }
825
826 final int id;
827
828 final List<String> notes = <String>[];
829
830 List<String> arguments = <String>[];
831
832 ClientLogger(this.id);
833
834 void note(object) {
835 String note = "$object";
836 notes.add(note);
837 printToConsole("$id: $note");
838 }
839
840 void gotArguments(List<String> arguments) {
841 this.arguments = arguments;
842 note("Got arguments: ${arguments.join(' ')}.");
843 }
844
845 void done() {
846 pendingClients.remove(this);
847 note("Client done ($pendingClients).");
848 }
849
850 void error(error, StackTrace stackTrace) {
851 // TODO(ahe): Modify quit verb to report these errors.
852 erroneousClients.add(this);
853 note("Crash (${arguments.join(' ')}).\n"
854 "${stringifyError(error, stackTrace)}");
855 }
856
857 String toString() => "$id";
858 }
OLDNEW
« no previous file with comments | « pkg/fletchc/lib/src/hub/exit_codes.dart ('k') | pkg/fletchc/lib/src/hub/sentence_parser.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698