OLD | NEW |
| (Empty) |
1 // Copyright (c) 2015, the Dartino project authors. Please see the AUTHORS file | |
2 // for details. All rights reserved. Use of this source code is governed by a | |
3 // BSD-style license that can be found in the LICENSE.md file. | |
4 | |
5 library fletchc.hub_main; | |
6 | |
7 import 'dart:collection' show | |
8 Queue; | |
9 | |
10 import 'dart:io' hide | |
11 exitCode, | |
12 stderr, | |
13 stdin, | |
14 stdout; | |
15 | |
16 import 'dart:io' as io; | |
17 | |
18 import 'dart:async' show | |
19 Completer, | |
20 Stream, | |
21 StreamController, | |
22 StreamSubscription, | |
23 StreamTransformer; | |
24 | |
25 import 'dart:typed_data' show | |
26 ByteData, | |
27 Endianness, | |
28 TypedData, | |
29 Uint8List; | |
30 | |
31 import 'dart:convert' show | |
32 UTF8; | |
33 | |
34 import 'dart:isolate' show | |
35 Isolate, | |
36 ReceivePort, | |
37 SendPort; | |
38 | |
39 import '../zone_helper.dart' show | |
40 acknowledgeControlMessages, | |
41 runGuarded; | |
42 | |
43 import 'exit_codes.dart' show | |
44 COMPILER_EXITCODE_CRASH, | |
45 DART_VM_EXITCODE_COMPILE_TIME_ERROR; | |
46 | |
47 import 'client_commands.dart' show | |
48 ClientCommandCode, | |
49 handleSocketErrors; | |
50 | |
51 import '../worker/worker_main.dart' show | |
52 workerMain; | |
53 | |
54 import '../verbs/infrastructure.dart'; | |
55 | |
56 import 'sentence_parser.dart' show | |
57 Sentence, | |
58 parseSentence; | |
59 | |
60 import '../diagnostic.dart' show | |
61 InputError, | |
62 throwInternalError; | |
63 | |
64 import '../shared_command_infrastructure.dart' show | |
65 CommandBuffer, | |
66 CommandTransformerBuilder, | |
67 commandEndianness, | |
68 headerSize, | |
69 toUint8ListView; | |
70 | |
71 import '../worker/developer.dart' show | |
72 allocateWorker, | |
73 combineTasks, | |
74 configFileUri; | |
75 | |
76 import 'session_manager.dart' show | |
77 lookupSession; | |
78 | |
79 import '../verbs/create_verb.dart' show | |
80 CreateSessionTask; | |
81 | |
82 import '../please_report_crash.dart' show | |
83 crashReportRequested, | |
84 requestBugReportOnOtherCrashMessage; | |
85 | |
86 import '../verbs/options.dart' show | |
87 Options; | |
88 | |
89 import '../console_print.dart' show | |
90 printToConsole; | |
91 | |
92 import '../please_report_crash.dart' show | |
93 stringifyError; | |
94 | |
95 Function gracefulShutdown; | |
96 | |
97 final List<String> mainArguments = <String>[]; | |
98 | |
99 class ClientCommandTransformerBuilder | |
100 extends CommandTransformerBuilder<ClientCommand> { | |
101 ClientCommand makeCommand(int commandCode, ByteData payload) { | |
102 ClientCommandCode code = ClientCommandCode.values[commandCode]; | |
103 switch (code) { | |
104 case ClientCommandCode.Arguments: | |
105 return new ClientCommand(code, decodeArgumentsCommand(payload)); | |
106 | |
107 case ClientCommandCode.Stdin: | |
108 int length = payload.getUint32(0, commandEndianness); | |
109 return new ClientCommand(code, toUint8ListView(payload, 4, length)); | |
110 | |
111 case ClientCommandCode.Signal: | |
112 int signal = payload.getUint32(0, commandEndianness); | |
113 return new ClientCommand(code, signal); | |
114 | |
115 default: | |
116 return null; | |
117 } | |
118 } | |
119 | |
120 List<String> decodeArgumentsCommand(ByteData view) { | |
121 int offset = 0; | |
122 int argc = view.getUint32(offset, commandEndianness); | |
123 offset += 4; | |
124 List<String> argv = <String>[]; | |
125 for (int i = 0; i < argc; i++) { | |
126 int length = view.getUint32(offset, commandEndianness); | |
127 offset += 4; | |
128 argv.add(UTF8.decode(toUint8ListView(view, offset, length))); | |
129 offset += length; | |
130 } | |
131 return argv; | |
132 } | |
133 } | |
134 | |
135 // Class for sending client commands from the hub (main isolate) to the | |
136 // fletch c++ client. | |
137 class ClientCommandSender extends CommandSender { | |
138 final Sink<List<int>> sink; | |
139 | |
140 ClientCommandSender(this.sink); | |
141 | |
142 void sendExitCode(int exitCode) { | |
143 new CommandBuffer<ClientCommandCode>() | |
144 ..addUint32(exitCode) | |
145 ..sendOn(sink, ClientCommandCode.ExitCode); | |
146 } | |
147 | |
148 void sendDataCommand(ClientCommandCode code, List<int> data) { | |
149 new CommandBuffer<ClientCommandCode>() | |
150 ..addUint32(data.length) | |
151 ..addUint8List(data) | |
152 ..sendOn(sink, code); | |
153 } | |
154 | |
155 void sendClose() { | |
156 throwInternalError("Client (C++) doesn't support ClientCommandCode.Close."); | |
157 } | |
158 | |
159 void sendEventLoopStarted() { | |
160 throwInternalError( | |
161 "Client (C++) doesn't support ClientCommandCode.EventLoopStarted."); | |
162 } | |
163 } | |
164 | |
165 Future main(List<String> arguments) async { | |
166 // When running this program, -Dfletch.version must be provided on the Dart | |
167 // VM command line. | |
168 assert(const String.fromEnvironment('fletch.version') != null); | |
169 | |
170 mainArguments.addAll(arguments); | |
171 configFileUri = Uri.base.resolve(arguments.first); | |
172 File configFile = new File.fromUri(configFileUri); | |
173 Directory tmpdir = Directory.systemTemp.createTempSync("fletch_client"); | |
174 | |
175 File socketFile = new File("${tmpdir.path}/socket"); | |
176 try { | |
177 socketFile.deleteSync(); | |
178 } on FileSystemException catch (e) { | |
179 // Ignored. There's no way to check if a socket file exists. | |
180 } | |
181 | |
182 ServerSocket server; | |
183 | |
184 Completer shutdown = new Completer(); | |
185 | |
186 gracefulShutdown = () { | |
187 try { | |
188 socketFile.deleteSync(); | |
189 } catch (e) { | |
190 print("Unable to delete ${socketFile.path}: $e"); | |
191 } | |
192 | |
193 try { | |
194 tmpdir.deleteSync(recursive: true); | |
195 } catch (e) { | |
196 print("Unable to delete ${tmpdir.path}: $e"); | |
197 } | |
198 | |
199 if (server != null) { | |
200 server.close(); | |
201 } | |
202 if (!shutdown.isCompleted) { | |
203 shutdown.complete(); | |
204 } | |
205 }; | |
206 | |
207 void handleSignal(StreamSubscription<ProcessSignal> subscription) { | |
208 subscription.onData((ProcessSignal signal) { | |
209 // Cancel the subscription to restore default signal handler. | |
210 subscription.cancel(); | |
211 print("Received signal $signal"); | |
212 gracefulShutdown(); | |
213 // 0 means kill the current process group (including this process, which | |
214 // will now die as we restored the default signal handler above). In | |
215 // addition, killing this process ensures that any processes waiting for | |
216 // it will observe that it was killed due to a signal. There's no way to | |
217 // fake that status using exit. | |
218 Process.killPid(0, signal); | |
219 }); | |
220 shutdown.future.then((_) { | |
221 subscription.cancel(); | |
222 }); | |
223 } | |
224 | |
225 // When receiving SIGTERM or gracefully shut down. | |
226 handleSignal(ProcessSignal.SIGTERM.watch().listen(null)); | |
227 handleSignal(ProcessSignal.SIGINT.watch().listen(null)); | |
228 | |
229 server = await ServerSocket.bind(InternetAddress.LOOPBACK_IP_V4, 0); | |
230 | |
231 // Write the TCP port to a config file. This lets multiple command line | |
232 // programs share this persistent driver process, which in turn eliminates | |
233 // start up overhead. | |
234 configFile.writeAsStringSync("${server.port}", flush: true); | |
235 | |
236 // Print the temporary directory so the launching process knows where to | |
237 // connect, and that the socket is ready. | |
238 print(server.port); | |
239 | |
240 IsolatePool pool = new IsolatePool(workerMain); | |
241 try { | |
242 await server.listen((Socket controlSocket) { | |
243 handleClient(pool, handleSocketErrors(controlSocket, "controlSocket")); | |
244 }).asFuture(); | |
245 } finally { | |
246 gracefulShutdown(); | |
247 } | |
248 } | |
249 | |
250 Future<Null> handleClient(IsolatePool pool, Socket controlSocket) async { | |
251 ClientLogger log = ClientLogger.allocate(); | |
252 | |
253 ClientConnection clientConnection = | |
254 new ClientConnection(controlSocket, log)..start(); | |
255 List<String> arguments = await clientConnection.arguments; | |
256 log.gotArguments(arguments); | |
257 | |
258 await handleVerb(arguments, clientConnection, pool); | |
259 } | |
260 | |
261 Future<Null> handleVerb( | |
262 List<String> arguments, | |
263 ClientConnection clientConnection, | |
264 IsolatePool pool) async { | |
265 crashReportRequested = false; | |
266 | |
267 Future<int> performVerb() async { | |
268 clientConnection.parseArguments(arguments); | |
269 String sessionName = clientConnection.sentence.sessionName; | |
270 UserSession session; | |
271 SharedTask initializer; | |
272 if (sessionName != null) { | |
273 session = lookupSession(sessionName); | |
274 if (session == null) { | |
275 session = await createSession(sessionName, () => allocateWorker(pool)); | |
276 initializer = new CreateSessionTask( | |
277 sessionName, null, clientConnection.sentence.base, configFileUri); | |
278 } | |
279 } | |
280 ClientVerbContext context = new ClientVerbContext( | |
281 clientConnection, pool, session, initializer: initializer); | |
282 return await clientConnection.sentence.performVerb(context); | |
283 } | |
284 | |
285 int exitCode = await runGuarded( | |
286 performVerb, | |
287 printLineOnStdout: clientConnection.printLineOnStdout, | |
288 handleLateError: clientConnection.log.error) | |
289 .catchError( | |
290 clientConnection.reportErrorToClient, test: (e) => e is InputError) | |
291 .catchError((error, StackTrace stackTrace) { | |
292 if (!crashReportRequested) { | |
293 clientConnection.printLineOnStderr( | |
294 requestBugReportOnOtherCrashMessage); | |
295 crashReportRequested = true; | |
296 } | |
297 clientConnection.printLineOnStderr('$error'); | |
298 if (stackTrace != null) { | |
299 clientConnection.printLineOnStderr('$stackTrace'); | |
300 } | |
301 return COMPILER_EXITCODE_CRASH; | |
302 }); | |
303 clientConnection.exit(exitCode); | |
304 } | |
305 | |
306 class ClientVerbContext extends VerbContext { | |
307 SharedTask initializer; | |
308 | |
309 ClientVerbContext( | |
310 ClientConnection clientConnection, | |
311 IsolatePool pool, | |
312 UserSession session, | |
313 {this.initializer}) | |
314 : super(clientConnection, pool, session); | |
315 | |
316 ClientVerbContext copyWithSession(UserSession session) { | |
317 return new ClientVerbContext(clientConnection, pool, session); | |
318 } | |
319 | |
320 Future<int> performTaskInWorker(SharedTask task) async { | |
321 if (session.worker.isolate.wasKilled) { | |
322 throwInternalError( | |
323 "session ${session.name}: worker isolate terminated unexpectedly"); | |
324 } | |
325 if (session.hasActiveWorkerTask) { | |
326 throwFatalError(DiagnosticKind.busySession, sessionName: session.name); | |
327 } | |
328 session.hasActiveWorkerTask = true; | |
329 return session.worker.performTask( | |
330 combineTasks(initializer, task), clientConnection, userSession: session) | |
331 .whenComplete(() { | |
332 session.hasActiveWorkerTask = false; | |
333 }); | |
334 } | |
335 } | |
336 | |
337 /// Handles communication with the Fletch C++ client. | |
338 class ClientConnection { | |
339 /// Socket used for receiving and sending commands from/to the Fletch C++ | |
340 /// client. | |
341 final Socket socket; | |
342 | |
343 /// Controller used to send commands to the from the ClientConnection to | |
344 /// anyone listening on ClientConnection.commands (see [commands] below). The | |
345 /// only listener as of now is the WorkerConnection which typically forwards | |
346 /// the commands to the worker isolate. | |
347 final StreamController<ClientCommand> controller = | |
348 new StreamController<ClientCommand>(); | |
349 | |
350 final ClientLogger log; | |
351 | |
352 /// The commandSender is used to send commands back to the Fletch C++ client. | |
353 ClientCommandSender commandSender; | |
354 | |
355 StreamSubscription<ClientCommand> subscription; | |
356 Completer<Null> completer; | |
357 | |
358 Completer<List<String>> argumentsCompleter = new Completer<List<String>>(); | |
359 | |
360 /// The analysed version of the request from the client. | |
361 /// Updated by [parseArguments]. | |
362 AnalyzedSentence sentence; | |
363 | |
364 /// Path to the fletch VM. Updated by [parseArguments]. | |
365 String fletchVm; | |
366 | |
367 ClientConnection(this.socket, this.log); | |
368 | |
369 /// Stream of commands from the Fletch C++ client to the hub (main isolate). | |
370 /// The commands are typically forwarded to a worker isolate, see | |
371 /// handleClientCommand. | |
372 Stream<ClientCommand> get commands => controller.stream; | |
373 | |
374 /// Completes when [endSession] is called. | |
375 Future<Null> get done => completer.future; | |
376 | |
377 /// Completes with the command-line arguments from the client. | |
378 Future<List<String>> get arguments => argumentsCompleter.future; | |
379 | |
380 /// Start processing commands from the client. | |
381 void start() { | |
382 // Setup a command sender used to send responses from the hub (main isolate) | |
383 // back to the Fletch C++ client. | |
384 commandSender = new ClientCommandSender(socket); | |
385 | |
386 // Setup a listener for handling commands coming from the Fletch C++ | |
387 // client. | |
388 StreamTransformer<List<int>, ClientCommand> transformer = | |
389 new ClientCommandTransformerBuilder().build(); | |
390 subscription = socket.transform(transformer).listen(null); | |
391 subscription | |
392 ..onData(handleClientCommand) | |
393 ..onError(handleClientCommandError) | |
394 ..onDone(handleClientCommandsDone); | |
395 completer = new Completer<Null>(); | |
396 } | |
397 | |
398 void handleClientCommand(ClientCommand command) { | |
399 if (command.code == ClientCommandCode.Arguments) { | |
400 // This intentionally throws if arguments are sent more than once. | |
401 argumentsCompleter.complete(command.data); | |
402 } else { | |
403 sendCommandToWorker(command); | |
404 } | |
405 } | |
406 | |
407 void sendCommandToWorker(ClientCommand command) { | |
408 // TODO(ahe): It is a bit weird that this method is on the client proxy. | |
409 // Ideally, this would be a method on WorkerConnection. However the client | |
410 // is created before the WorkerConnection which is not created until/if | |
411 // needed. The WorkerConnection will start listening to the client's | |
412 // commands when attaching, see WorkerConnection.attachClient. | |
413 controller.add(command); | |
414 } | |
415 | |
416 void handleClientCommandError(error, StackTrace trace) { | |
417 print(stringifyError(error, trace)); | |
418 completer.completeError(error, trace); | |
419 // Cancel the subscription if an error occurred, this prevents | |
420 // [handleCommandsDone] from being called and attempt to complete | |
421 // [completer]. | |
422 subscription.cancel(); | |
423 } | |
424 | |
425 void handleClientCommandsDone() { | |
426 completer.complete(); | |
427 } | |
428 | |
429 // Send a command back to the Fletch C++ client. | |
430 void sendCommandToClient(ClientCommand command) { | |
431 switch (command.code) { | |
432 case ClientCommandCode.Stdout: | |
433 commandSender.sendStdoutBytes(command.data); | |
434 break; | |
435 | |
436 case ClientCommandCode.Stderr: | |
437 commandSender.sendStderrBytes(command.data); | |
438 break; | |
439 | |
440 case ClientCommandCode.ExitCode: | |
441 commandSender.sendExitCode(command.data); | |
442 break; | |
443 | |
444 default: | |
445 throwInternalError("Unexpected command: $command"); | |
446 } | |
447 } | |
448 | |
449 void endSession() { | |
450 socket.flush().then((_) { | |
451 socket.close(); | |
452 }); | |
453 } | |
454 | |
455 void printLineOnStderr(String line) { | |
456 commandSender.sendStderrBytes(UTF8.encode("$line\n")); | |
457 } | |
458 | |
459 void printLineOnStdout(String line) { | |
460 commandSender.sendStdoutBytes(UTF8.encode('$line\n')); | |
461 } | |
462 | |
463 void exit(int exitCode) { | |
464 if (exitCode == null) { | |
465 exitCode = COMPILER_EXITCODE_CRASH; | |
466 try { | |
467 throwInternalError("Internal error: exitCode is null"); | |
468 } on InputError catch (error, stackTrace) { | |
469 // We can't afford to throw an error here as it will take down the | |
470 // entire process. | |
471 exitCode = reportErrorToClient(error, stackTrace); | |
472 } | |
473 } | |
474 commandSender.sendExitCode(exitCode); | |
475 endSession(); | |
476 } | |
477 | |
478 AnalyzedSentence parseArguments(List<String> arguments) { | |
479 Options options = Options.parse(arguments); | |
480 Sentence sentence = | |
481 parseSentence(options.nonOptionArguments, includesProgramName: true); | |
482 // [programName] is the canonicalized absolute path to the fletch | |
483 // executable (the C++ program). | |
484 String programName = sentence.programName; | |
485 String fletchVm = "$programName-vm"; | |
486 this.sentence = analyzeSentence(sentence, options); | |
487 this.fletchVm = fletchVm; | |
488 return this.sentence; | |
489 } | |
490 | |
491 int reportErrorToClient(InputError error, StackTrace stackTrace) { | |
492 bool isInternalError = error.kind == DiagnosticKind.internalError; | |
493 if (isInternalError && !crashReportRequested) { | |
494 printLineOnStderr(requestBugReportOnOtherCrashMessage); | |
495 crashReportRequested = true; | |
496 } | |
497 printLineOnStderr(error.asDiagnostic().formatMessage()); | |
498 if (isInternalError) { | |
499 printLineOnStderr('$stackTrace'); | |
500 return COMPILER_EXITCODE_CRASH; | |
501 } else { | |
502 return DART_VM_EXITCODE_COMPILE_TIME_ERROR; | |
503 } | |
504 } | |
505 } | |
506 | |
507 /// The WorkerConnection represents a worker isolate in the hub (main isolate). | |
508 /// Ie. it is the hub's object for communicating with a worker isolate. | |
509 class WorkerConnection { | |
510 /// The worker isolate. | |
511 final ManagedIsolate isolate; | |
512 | |
513 /// A port used to send commands to the worker isolate. | |
514 SendPort sendPort; | |
515 | |
516 /// A port used to read commands from the worker isolate. | |
517 ReceivePort receivePort; | |
518 | |
519 /// workerCommands is an iterator over all the commands coming from the | |
520 /// worker isolate. These are typically the outbound messages destined for | |
521 /// the Fletch C++ client. | |
522 /// It iterates over the data coming on the receivePort. | |
523 StreamIterator<ClientCommand> workerCommands; | |
524 | |
525 /// When true, the worker can be shutdown by sending it a | |
526 /// ClientCommandCode.Signal command. Otherwise, it must be killed. | |
527 bool eventLoopStarted = false; | |
528 | |
529 /// Subscription for errors from [isolate]. | |
530 StreamSubscription errorSubscription; | |
531 | |
532 bool crashReportRequested = false; | |
533 | |
534 WorkerConnection(this.isolate); | |
535 | |
536 /// Begin a session with the worker isolate. | |
537 Future<Null> beginSession() async { | |
538 errorSubscription = isolate.errors.listen(null); | |
539 errorSubscription.pause(); | |
540 receivePort = isolate.beginSession(); | |
541 // Setup the workerCommands iterator using a stream converting the | |
542 // incoming data to [ClientCommand]s. | |
543 Stream<ClientCommand> workerCommandStream = receivePort.map( | |
544 (message) => new ClientCommand( | |
545 ClientCommandCode.values[message[0]], message[1])); | |
546 workerCommands = new StreamIterator<ClientCommand>(workerCommandStream); | |
547 if (!await workerCommands.moveNext()) { | |
548 // The worker must have been killed, or died in some other way. | |
549 // TODO(ahe): Add this assertion: assert(isolate.wasKilled); | |
550 endSession(); | |
551 return; | |
552 } | |
553 ClientCommand command = workerCommands.current; | |
554 assert(command.code == ClientCommandCode.SendPort); | |
555 assert(command.data != null); | |
556 sendPort = command.data; | |
557 } | |
558 | |
559 /// Attach to a fletch C++ client and forward commands to the worker isolate, | |
560 /// and vice versa. The returned future normally completes when the worker | |
561 /// isolate sends ClientCommandCode.ClosePort, or if the isolate is killed due | |
562 /// to ClientCommandCode.Signal arriving through client.commands. | |
563 Future<int> attachClient( | |
564 ClientConnection clientConnection, | |
565 UserSession userSession) async { | |
566 | |
567 // Method for handling commands coming from the client. The commands are | |
568 // typically forwarded to the worker isolate. | |
569 handleCommandsFromClient(ClientCommand command) { | |
570 if (command.code == ClientCommandCode.Signal && !eventLoopStarted) { | |
571 if (userSession != null) { | |
572 userSession.kill(clientConnection.printLineOnStderr); | |
573 } else { | |
574 isolate.kill(); | |
575 } | |
576 receivePort.close(); | |
577 } else { | |
578 sendPort.send([command.code.index, command.data]); | |
579 } | |
580 } | |
581 | |
582 // Method for handling commands coming back from the worker isolate. | |
583 // It typically forwards them to the Fletch C++ client via the | |
584 // clientConnection. | |
585 Future<int> handleCommandsFromWorker( | |
586 ClientConnection clientConnection) async { | |
587 int exitCode = COMPILER_EXITCODE_CRASH; | |
588 while (await workerCommands.moveNext()) { | |
589 ClientCommand command = workerCommands.current; | |
590 switch (command.code) { | |
591 case ClientCommandCode.ClosePort: | |
592 receivePort.close(); | |
593 break; | |
594 | |
595 case ClientCommandCode.EventLoopStarted: | |
596 eventLoopStarted = true; | |
597 break; | |
598 | |
599 case ClientCommandCode.ExitCode: | |
600 exitCode = command.data; | |
601 break; | |
602 | |
603 default: | |
604 clientConnection.sendCommandToClient(command); | |
605 break; | |
606 } | |
607 } | |
608 return exitCode; | |
609 } | |
610 | |
611 eventLoopStarted = false; | |
612 crashReportRequested = false; | |
613 errorSubscription.onData((errorList) { | |
614 String error = errorList[0]; | |
615 String stackTrace = errorList[1]; | |
616 if (!crashReportRequested) { | |
617 clientConnection.printLineOnStderr(requestBugReportOnOtherCrashMessage); | |
618 crashReportRequested = true; | |
619 } | |
620 clientConnection.printLineOnStderr(error); | |
621 if (stackTrace != null) { | |
622 clientConnection.printLineOnStderr(stackTrace); | |
623 } | |
624 if (userSession != null) { | |
625 userSession.kill(clientConnection.printLineOnStderr); | |
626 } else { | |
627 isolate.kill(); | |
628 } | |
629 receivePort.close(); | |
630 }); | |
631 errorSubscription.resume(); | |
632 | |
633 // Start listening for commands coming from the Fletch C++ client (via | |
634 // clientConnection). | |
635 // TODO(ahe): Add onDone event handler to detach the client. | |
636 clientConnection.commands.listen(handleCommandsFromClient); | |
637 | |
638 // Start processing commands coming from the worker. | |
639 int exitCode = await handleCommandsFromWorker(clientConnection); | |
640 | |
641 errorSubscription.pause(); | |
642 return exitCode; | |
643 } | |
644 | |
645 void endSession() { | |
646 receivePort.close(); | |
647 isolate.endSession(); | |
648 } | |
649 | |
650 Future<Null> detachClient() async { | |
651 if (isolate.wasKilled) { | |
652 // Setting these to null will ensure that [attachClient] causes a crash | |
653 // if called after isolate was killed. | |
654 errorSubscription = null; | |
655 receivePort = null; | |
656 workerCommands = null; | |
657 sendPort = null; | |
658 | |
659 // TODO(ahe): The session is dead. Tell the user about this. | |
660 return null; | |
661 } | |
662 // TODO(ahe): Perform the reverse of attachClient here. | |
663 await beginSession(); | |
664 } | |
665 | |
666 Future<int> performTask( | |
667 SharedTask task, | |
668 ClientConnection clientConnection, | |
669 { | |
670 UserSession userSession, | |
671 /// End this session and return this isolate to the pool. | |
672 bool endSession: false}) async { | |
673 ClientLogger log = clientConnection.log; | |
674 | |
675 clientConnection.done.catchError((error, StackTrace stackTrace) { | |
676 log.error(error, stackTrace); | |
677 }).then((_) { | |
678 log.done(); | |
679 }); | |
680 | |
681 // Indirectly send the task to be performed to the worker isolate via the | |
682 // clientConnection. | |
683 clientConnection.sendCommandToWorker( | |
684 new ClientCommand(ClientCommandCode.PerformTask, task)); | |
685 | |
686 // Forward commands between the C++ fletch client [clientConnection], and th
e | |
687 // worker isolate `this`. Also, Intercept the signal command and | |
688 // potentially kill the isolate (the isolate needs to tell if it is | |
689 // interuptible or needs to be killed, an example of the latter is, if | |
690 // compiler is running). | |
691 int exitCode = await attachClient(clientConnection, userSession); | |
692 // The verb (which was performed in the worker) is done. | |
693 log.note("After attachClient (exitCode = $exitCode)"); | |
694 | |
695 if (endSession) { | |
696 // Return the isolate to the pool *before* shutting down the client. This | |
697 // ensures that the next client will be able to reuse the isolate instead | |
698 // of spawning a new. | |
699 this.endSession(); | |
700 } else { | |
701 await detachClient(); | |
702 } | |
703 | |
704 return exitCode; | |
705 } | |
706 } | |
707 | |
708 class ManagedIsolate { | |
709 final IsolatePool pool; | |
710 final Isolate isolate; | |
711 final SendPort port; | |
712 final Stream errors; | |
713 final ReceivePort exitPort; | |
714 final ReceivePort errorPort; | |
715 bool wasKilled = false; | |
716 | |
717 ManagedIsolate( | |
718 this.pool, this.isolate, this.port, this.errors, | |
719 this.exitPort, this.errorPort); | |
720 | |
721 ReceivePort beginSession() { | |
722 ReceivePort receivePort = new ReceivePort(); | |
723 port.send(receivePort.sendPort); | |
724 return receivePort; | |
725 } | |
726 | |
727 void endSession() { | |
728 if (!wasKilled) { | |
729 pool.idleIsolates.addLast(this); | |
730 } | |
731 } | |
732 | |
733 void kill() { | |
734 wasKilled = true; | |
735 isolate.kill(priority: Isolate.IMMEDIATE); | |
736 isolate.removeOnExitListener(exitPort.sendPort); | |
737 isolate.removeErrorListener(errorPort.sendPort); | |
738 exitPort.close(); | |
739 errorPort.close(); | |
740 } | |
741 } | |
742 | |
743 class IsolatePool { | |
744 // Queue of idle isolates. When an isolate becomes idle, it is added at the | |
745 // end. | |
746 final Queue<ManagedIsolate> idleIsolates = new Queue<ManagedIsolate>(); | |
747 final Function isolateEntryPoint; | |
748 | |
749 IsolatePool(this.isolateEntryPoint); | |
750 | |
751 Future<ManagedIsolate> getIsolate({bool exitOnError: true}) async { | |
752 if (idleIsolates.isEmpty) { | |
753 return await spawnIsolate(exitOnError: exitOnError); | |
754 } else { | |
755 return idleIsolates.removeFirst(); | |
756 } | |
757 } | |
758 | |
759 Future<ManagedIsolate> spawnIsolate({bool exitOnError: true}) async { | |
760 StreamController errorController = new StreamController.broadcast(); | |
761 ReceivePort receivePort = new ReceivePort(); | |
762 Isolate isolate = await Isolate.spawn( | |
763 isolateEntryPoint, receivePort.sendPort, paused: true); | |
764 isolate.setErrorsFatal(true); | |
765 ReceivePort errorPort = new ReceivePort(); | |
766 ManagedIsolate managedIsolate; | |
767 isolate.addErrorListener(errorPort.sendPort); | |
768 errorPort.listen((errorList) { | |
769 if (exitOnError) { | |
770 String error = errorList[0]; | |
771 String stackTrace = errorList[1]; | |
772 io.stderr.writeln(error); | |
773 if (stackTrace != null) { | |
774 io.stderr.writeln(stackTrace); | |
775 } | |
776 exit(COMPILER_EXITCODE_CRASH); | |
777 } else { | |
778 managedIsolate.wasKilled = true; | |
779 errorController.add(errorList); | |
780 } | |
781 }); | |
782 ReceivePort exitPort = new ReceivePort(); | |
783 isolate.addOnExitListener(exitPort.sendPort); | |
784 exitPort.listen((_) { | |
785 isolate.removeErrorListener(errorPort.sendPort); | |
786 isolate.removeOnExitListener(exitPort.sendPort); | |
787 errorPort.close(); | |
788 exitPort.close(); | |
789 idleIsolates.remove(managedIsolate); | |
790 }); | |
791 await acknowledgeControlMessages(isolate, resume: isolate.pauseCapability); | |
792 StreamIterator iterator = new StreamIterator(receivePort); | |
793 bool hasElement = await iterator.moveNext(); | |
794 if (!hasElement) { | |
795 throwInternalError("No port received from isolate"); | |
796 } | |
797 SendPort port = iterator.current; | |
798 receivePort.close(); | |
799 managedIsolate = | |
800 new ManagedIsolate( | |
801 this, isolate, port, errorController.stream, exitPort, errorPort); | |
802 | |
803 return managedIsolate; | |
804 } | |
805 | |
806 void shutdown() { | |
807 while (idleIsolates.isNotEmpty) { | |
808 idleIsolates.removeFirst().kill(); | |
809 } | |
810 } | |
811 } | |
812 | |
813 class ClientLogger { | |
814 static int clientLoggersAllocated = 0; | |
815 | |
816 static Set<ClientLogger> pendingClients = new Set<ClientLogger>(); | |
817 | |
818 static Set<ClientLogger> erroneousClients = new Set<ClientLogger>(); | |
819 | |
820 static ClientLogger allocate() { | |
821 ClientLogger clientLogger = new ClientLogger(clientLoggersAllocated++); | |
822 pendingClients.add(clientLogger); | |
823 return clientLogger; | |
824 } | |
825 | |
826 final int id; | |
827 | |
828 final List<String> notes = <String>[]; | |
829 | |
830 List<String> arguments = <String>[]; | |
831 | |
832 ClientLogger(this.id); | |
833 | |
834 void note(object) { | |
835 String note = "$object"; | |
836 notes.add(note); | |
837 printToConsole("$id: $note"); | |
838 } | |
839 | |
840 void gotArguments(List<String> arguments) { | |
841 this.arguments = arguments; | |
842 note("Got arguments: ${arguments.join(' ')}."); | |
843 } | |
844 | |
845 void done() { | |
846 pendingClients.remove(this); | |
847 note("Client done ($pendingClients)."); | |
848 } | |
849 | |
850 void error(error, StackTrace stackTrace) { | |
851 // TODO(ahe): Modify quit verb to report these errors. | |
852 erroneousClients.add(this); | |
853 note("Crash (${arguments.join(' ')}).\n" | |
854 "${stringifyError(error, stackTrace)}"); | |
855 } | |
856 | |
857 String toString() => "$id"; | |
858 } | |
OLD | NEW |