| Index: pkg/fletchc/lib/src/hub/hub_main.dart
|
| diff --git a/pkg/fletchc/lib/src/hub/hub_main.dart b/pkg/fletchc/lib/src/hub/hub_main.dart
|
| deleted file mode 100644
|
| index 00f0460d21a1c607205834fd77663a0b893ad274..0000000000000000000000000000000000000000
|
| --- a/pkg/fletchc/lib/src/hub/hub_main.dart
|
| +++ /dev/null
|
| @@ -1,858 +0,0 @@
|
| -// Copyright (c) 2015, the Dartino project authors. Please see the AUTHORS file
|
| -// for details. All rights reserved. Use of this source code is governed by a
|
| -// BSD-style license that can be found in the LICENSE.md file.
|
| -
|
| -library fletchc.hub_main;
|
| -
|
| -import 'dart:collection' show
|
| - Queue;
|
| -
|
| -import 'dart:io' hide
|
| - exitCode,
|
| - stderr,
|
| - stdin,
|
| - stdout;
|
| -
|
| -import 'dart:io' as io;
|
| -
|
| -import 'dart:async' show
|
| - Completer,
|
| - Stream,
|
| - StreamController,
|
| - StreamSubscription,
|
| - StreamTransformer;
|
| -
|
| -import 'dart:typed_data' show
|
| - ByteData,
|
| - Endianness,
|
| - TypedData,
|
| - Uint8List;
|
| -
|
| -import 'dart:convert' show
|
| - UTF8;
|
| -
|
| -import 'dart:isolate' show
|
| - Isolate,
|
| - ReceivePort,
|
| - SendPort;
|
| -
|
| -import '../zone_helper.dart' show
|
| - acknowledgeControlMessages,
|
| - runGuarded;
|
| -
|
| -import 'exit_codes.dart' show
|
| - COMPILER_EXITCODE_CRASH,
|
| - DART_VM_EXITCODE_COMPILE_TIME_ERROR;
|
| -
|
| -import 'client_commands.dart' show
|
| - ClientCommandCode,
|
| - handleSocketErrors;
|
| -
|
| -import '../worker/worker_main.dart' show
|
| - workerMain;
|
| -
|
| -import '../verbs/infrastructure.dart';
|
| -
|
| -import 'sentence_parser.dart' show
|
| - Sentence,
|
| - parseSentence;
|
| -
|
| -import '../diagnostic.dart' show
|
| - InputError,
|
| - throwInternalError;
|
| -
|
| -import '../shared_command_infrastructure.dart' show
|
| - CommandBuffer,
|
| - CommandTransformerBuilder,
|
| - commandEndianness,
|
| - headerSize,
|
| - toUint8ListView;
|
| -
|
| -import '../worker/developer.dart' show
|
| - allocateWorker,
|
| - combineTasks,
|
| - configFileUri;
|
| -
|
| -import 'session_manager.dart' show
|
| - lookupSession;
|
| -
|
| -import '../verbs/create_verb.dart' show
|
| - CreateSessionTask;
|
| -
|
| -import '../please_report_crash.dart' show
|
| - crashReportRequested,
|
| - requestBugReportOnOtherCrashMessage;
|
| -
|
| -import '../verbs/options.dart' show
|
| - Options;
|
| -
|
| -import '../console_print.dart' show
|
| - printToConsole;
|
| -
|
| -import '../please_report_crash.dart' show
|
| - stringifyError;
|
| -
|
| -Function gracefulShutdown;
|
| -
|
| -final List<String> mainArguments = <String>[];
|
| -
|
| -class ClientCommandTransformerBuilder
|
| - extends CommandTransformerBuilder<ClientCommand> {
|
| - ClientCommand makeCommand(int commandCode, ByteData payload) {
|
| - ClientCommandCode code = ClientCommandCode.values[commandCode];
|
| - switch (code) {
|
| - case ClientCommandCode.Arguments:
|
| - return new ClientCommand(code, decodeArgumentsCommand(payload));
|
| -
|
| - case ClientCommandCode.Stdin:
|
| - int length = payload.getUint32(0, commandEndianness);
|
| - return new ClientCommand(code, toUint8ListView(payload, 4, length));
|
| -
|
| - case ClientCommandCode.Signal:
|
| - int signal = payload.getUint32(0, commandEndianness);
|
| - return new ClientCommand(code, signal);
|
| -
|
| - default:
|
| - return null;
|
| - }
|
| - }
|
| -
|
| - List<String> decodeArgumentsCommand(ByteData view) {
|
| - int offset = 0;
|
| - int argc = view.getUint32(offset, commandEndianness);
|
| - offset += 4;
|
| - List<String> argv = <String>[];
|
| - for (int i = 0; i < argc; i++) {
|
| - int length = view.getUint32(offset, commandEndianness);
|
| - offset += 4;
|
| - argv.add(UTF8.decode(toUint8ListView(view, offset, length)));
|
| - offset += length;
|
| - }
|
| - return argv;
|
| - }
|
| -}
|
| -
|
| -// Class for sending client commands from the hub (main isolate) to the
|
| -// fletch c++ client.
|
| -class ClientCommandSender extends CommandSender {
|
| - final Sink<List<int>> sink;
|
| -
|
| - ClientCommandSender(this.sink);
|
| -
|
| - void sendExitCode(int exitCode) {
|
| - new CommandBuffer<ClientCommandCode>()
|
| - ..addUint32(exitCode)
|
| - ..sendOn(sink, ClientCommandCode.ExitCode);
|
| - }
|
| -
|
| - void sendDataCommand(ClientCommandCode code, List<int> data) {
|
| - new CommandBuffer<ClientCommandCode>()
|
| - ..addUint32(data.length)
|
| - ..addUint8List(data)
|
| - ..sendOn(sink, code);
|
| - }
|
| -
|
| - void sendClose() {
|
| - throwInternalError("Client (C++) doesn't support ClientCommandCode.Close.");
|
| - }
|
| -
|
| - void sendEventLoopStarted() {
|
| - throwInternalError(
|
| - "Client (C++) doesn't support ClientCommandCode.EventLoopStarted.");
|
| - }
|
| -}
|
| -
|
| -Future main(List<String> arguments) async {
|
| - // When running this program, -Dfletch.version must be provided on the Dart
|
| - // VM command line.
|
| - assert(const String.fromEnvironment('fletch.version') != null);
|
| -
|
| - mainArguments.addAll(arguments);
|
| - configFileUri = Uri.base.resolve(arguments.first);
|
| - File configFile = new File.fromUri(configFileUri);
|
| - Directory tmpdir = Directory.systemTemp.createTempSync("fletch_client");
|
| -
|
| - File socketFile = new File("${tmpdir.path}/socket");
|
| - try {
|
| - socketFile.deleteSync();
|
| - } on FileSystemException catch (e) {
|
| - // Ignored. There's no way to check if a socket file exists.
|
| - }
|
| -
|
| - ServerSocket server;
|
| -
|
| - Completer shutdown = new Completer();
|
| -
|
| - gracefulShutdown = () {
|
| - try {
|
| - socketFile.deleteSync();
|
| - } catch (e) {
|
| - print("Unable to delete ${socketFile.path}: $e");
|
| - }
|
| -
|
| - try {
|
| - tmpdir.deleteSync(recursive: true);
|
| - } catch (e) {
|
| - print("Unable to delete ${tmpdir.path}: $e");
|
| - }
|
| -
|
| - if (server != null) {
|
| - server.close();
|
| - }
|
| - if (!shutdown.isCompleted) {
|
| - shutdown.complete();
|
| - }
|
| - };
|
| -
|
| - void handleSignal(StreamSubscription<ProcessSignal> subscription) {
|
| - subscription.onData((ProcessSignal signal) {
|
| - // Cancel the subscription to restore default signal handler.
|
| - subscription.cancel();
|
| - print("Received signal $signal");
|
| - gracefulShutdown();
|
| - // 0 means kill the current process group (including this process, which
|
| - // will now die as we restored the default signal handler above). In
|
| - // addition, killing this process ensures that any processes waiting for
|
| - // it will observe that it was killed due to a signal. There's no way to
|
| - // fake that status using exit.
|
| - Process.killPid(0, signal);
|
| - });
|
| - shutdown.future.then((_) {
|
| - subscription.cancel();
|
| - });
|
| - }
|
| -
|
| - // When receiving SIGTERM or gracefully shut down.
|
| - handleSignal(ProcessSignal.SIGTERM.watch().listen(null));
|
| - handleSignal(ProcessSignal.SIGINT.watch().listen(null));
|
| -
|
| - server = await ServerSocket.bind(InternetAddress.LOOPBACK_IP_V4, 0);
|
| -
|
| - // Write the TCP port to a config file. This lets multiple command line
|
| - // programs share this persistent driver process, which in turn eliminates
|
| - // start up overhead.
|
| - configFile.writeAsStringSync("${server.port}", flush: true);
|
| -
|
| - // Print the temporary directory so the launching process knows where to
|
| - // connect, and that the socket is ready.
|
| - print(server.port);
|
| -
|
| - IsolatePool pool = new IsolatePool(workerMain);
|
| - try {
|
| - await server.listen((Socket controlSocket) {
|
| - handleClient(pool, handleSocketErrors(controlSocket, "controlSocket"));
|
| - }).asFuture();
|
| - } finally {
|
| - gracefulShutdown();
|
| - }
|
| -}
|
| -
|
| -Future<Null> handleClient(IsolatePool pool, Socket controlSocket) async {
|
| - ClientLogger log = ClientLogger.allocate();
|
| -
|
| - ClientConnection clientConnection =
|
| - new ClientConnection(controlSocket, log)..start();
|
| - List<String> arguments = await clientConnection.arguments;
|
| - log.gotArguments(arguments);
|
| -
|
| - await handleVerb(arguments, clientConnection, pool);
|
| -}
|
| -
|
| -Future<Null> handleVerb(
|
| - List<String> arguments,
|
| - ClientConnection clientConnection,
|
| - IsolatePool pool) async {
|
| - crashReportRequested = false;
|
| -
|
| - Future<int> performVerb() async {
|
| - clientConnection.parseArguments(arguments);
|
| - String sessionName = clientConnection.sentence.sessionName;
|
| - UserSession session;
|
| - SharedTask initializer;
|
| - if (sessionName != null) {
|
| - session = lookupSession(sessionName);
|
| - if (session == null) {
|
| - session = await createSession(sessionName, () => allocateWorker(pool));
|
| - initializer = new CreateSessionTask(
|
| - sessionName, null, clientConnection.sentence.base, configFileUri);
|
| - }
|
| - }
|
| - ClientVerbContext context = new ClientVerbContext(
|
| - clientConnection, pool, session, initializer: initializer);
|
| - return await clientConnection.sentence.performVerb(context);
|
| - }
|
| -
|
| - int exitCode = await runGuarded(
|
| - performVerb,
|
| - printLineOnStdout: clientConnection.printLineOnStdout,
|
| - handleLateError: clientConnection.log.error)
|
| - .catchError(
|
| - clientConnection.reportErrorToClient, test: (e) => e is InputError)
|
| - .catchError((error, StackTrace stackTrace) {
|
| - if (!crashReportRequested) {
|
| - clientConnection.printLineOnStderr(
|
| - requestBugReportOnOtherCrashMessage);
|
| - crashReportRequested = true;
|
| - }
|
| - clientConnection.printLineOnStderr('$error');
|
| - if (stackTrace != null) {
|
| - clientConnection.printLineOnStderr('$stackTrace');
|
| - }
|
| - return COMPILER_EXITCODE_CRASH;
|
| - });
|
| - clientConnection.exit(exitCode);
|
| -}
|
| -
|
| -class ClientVerbContext extends VerbContext {
|
| - SharedTask initializer;
|
| -
|
| - ClientVerbContext(
|
| - ClientConnection clientConnection,
|
| - IsolatePool pool,
|
| - UserSession session,
|
| - {this.initializer})
|
| - : super(clientConnection, pool, session);
|
| -
|
| - ClientVerbContext copyWithSession(UserSession session) {
|
| - return new ClientVerbContext(clientConnection, pool, session);
|
| - }
|
| -
|
| - Future<int> performTaskInWorker(SharedTask task) async {
|
| - if (session.worker.isolate.wasKilled) {
|
| - throwInternalError(
|
| - "session ${session.name}: worker isolate terminated unexpectedly");
|
| - }
|
| - if (session.hasActiveWorkerTask) {
|
| - throwFatalError(DiagnosticKind.busySession, sessionName: session.name);
|
| - }
|
| - session.hasActiveWorkerTask = true;
|
| - return session.worker.performTask(
|
| - combineTasks(initializer, task), clientConnection, userSession: session)
|
| - .whenComplete(() {
|
| - session.hasActiveWorkerTask = false;
|
| - });
|
| - }
|
| -}
|
| -
|
| -/// Handles communication with the Fletch C++ client.
|
| -class ClientConnection {
|
| - /// Socket used for receiving and sending commands from/to the Fletch C++
|
| - /// client.
|
| - final Socket socket;
|
| -
|
| - /// Controller used to send commands to the from the ClientConnection to
|
| - /// anyone listening on ClientConnection.commands (see [commands] below). The
|
| - /// only listener as of now is the WorkerConnection which typically forwards
|
| - /// the commands to the worker isolate.
|
| - final StreamController<ClientCommand> controller =
|
| - new StreamController<ClientCommand>();
|
| -
|
| - final ClientLogger log;
|
| -
|
| - /// The commandSender is used to send commands back to the Fletch C++ client.
|
| - ClientCommandSender commandSender;
|
| -
|
| - StreamSubscription<ClientCommand> subscription;
|
| - Completer<Null> completer;
|
| -
|
| - Completer<List<String>> argumentsCompleter = new Completer<List<String>>();
|
| -
|
| - /// The analysed version of the request from the client.
|
| - /// Updated by [parseArguments].
|
| - AnalyzedSentence sentence;
|
| -
|
| - /// Path to the fletch VM. Updated by [parseArguments].
|
| - String fletchVm;
|
| -
|
| - ClientConnection(this.socket, this.log);
|
| -
|
| - /// Stream of commands from the Fletch C++ client to the hub (main isolate).
|
| - /// The commands are typically forwarded to a worker isolate, see
|
| - /// handleClientCommand.
|
| - Stream<ClientCommand> get commands => controller.stream;
|
| -
|
| - /// Completes when [endSession] is called.
|
| - Future<Null> get done => completer.future;
|
| -
|
| - /// Completes with the command-line arguments from the client.
|
| - Future<List<String>> get arguments => argumentsCompleter.future;
|
| -
|
| - /// Start processing commands from the client.
|
| - void start() {
|
| - // Setup a command sender used to send responses from the hub (main isolate)
|
| - // back to the Fletch C++ client.
|
| - commandSender = new ClientCommandSender(socket);
|
| -
|
| - // Setup a listener for handling commands coming from the Fletch C++
|
| - // client.
|
| - StreamTransformer<List<int>, ClientCommand> transformer =
|
| - new ClientCommandTransformerBuilder().build();
|
| - subscription = socket.transform(transformer).listen(null);
|
| - subscription
|
| - ..onData(handleClientCommand)
|
| - ..onError(handleClientCommandError)
|
| - ..onDone(handleClientCommandsDone);
|
| - completer = new Completer<Null>();
|
| - }
|
| -
|
| - void handleClientCommand(ClientCommand command) {
|
| - if (command.code == ClientCommandCode.Arguments) {
|
| - // This intentionally throws if arguments are sent more than once.
|
| - argumentsCompleter.complete(command.data);
|
| - } else {
|
| - sendCommandToWorker(command);
|
| - }
|
| - }
|
| -
|
| - void sendCommandToWorker(ClientCommand command) {
|
| - // TODO(ahe): It is a bit weird that this method is on the client proxy.
|
| - // Ideally, this would be a method on WorkerConnection. However the client
|
| - // is created before the WorkerConnection which is not created until/if
|
| - // needed. The WorkerConnection will start listening to the client's
|
| - // commands when attaching, see WorkerConnection.attachClient.
|
| - controller.add(command);
|
| - }
|
| -
|
| - void handleClientCommandError(error, StackTrace trace) {
|
| - print(stringifyError(error, trace));
|
| - completer.completeError(error, trace);
|
| - // Cancel the subscription if an error occurred, this prevents
|
| - // [handleCommandsDone] from being called and attempt to complete
|
| - // [completer].
|
| - subscription.cancel();
|
| - }
|
| -
|
| - void handleClientCommandsDone() {
|
| - completer.complete();
|
| - }
|
| -
|
| - // Send a command back to the Fletch C++ client.
|
| - void sendCommandToClient(ClientCommand command) {
|
| - switch (command.code) {
|
| - case ClientCommandCode.Stdout:
|
| - commandSender.sendStdoutBytes(command.data);
|
| - break;
|
| -
|
| - case ClientCommandCode.Stderr:
|
| - commandSender.sendStderrBytes(command.data);
|
| - break;
|
| -
|
| - case ClientCommandCode.ExitCode:
|
| - commandSender.sendExitCode(command.data);
|
| - break;
|
| -
|
| - default:
|
| - throwInternalError("Unexpected command: $command");
|
| - }
|
| - }
|
| -
|
| - void endSession() {
|
| - socket.flush().then((_) {
|
| - socket.close();
|
| - });
|
| - }
|
| -
|
| - void printLineOnStderr(String line) {
|
| - commandSender.sendStderrBytes(UTF8.encode("$line\n"));
|
| - }
|
| -
|
| - void printLineOnStdout(String line) {
|
| - commandSender.sendStdoutBytes(UTF8.encode('$line\n'));
|
| - }
|
| -
|
| - void exit(int exitCode) {
|
| - if (exitCode == null) {
|
| - exitCode = COMPILER_EXITCODE_CRASH;
|
| - try {
|
| - throwInternalError("Internal error: exitCode is null");
|
| - } on InputError catch (error, stackTrace) {
|
| - // We can't afford to throw an error here as it will take down the
|
| - // entire process.
|
| - exitCode = reportErrorToClient(error, stackTrace);
|
| - }
|
| - }
|
| - commandSender.sendExitCode(exitCode);
|
| - endSession();
|
| - }
|
| -
|
| - AnalyzedSentence parseArguments(List<String> arguments) {
|
| - Options options = Options.parse(arguments);
|
| - Sentence sentence =
|
| - parseSentence(options.nonOptionArguments, includesProgramName: true);
|
| - // [programName] is the canonicalized absolute path to the fletch
|
| - // executable (the C++ program).
|
| - String programName = sentence.programName;
|
| - String fletchVm = "$programName-vm";
|
| - this.sentence = analyzeSentence(sentence, options);
|
| - this.fletchVm = fletchVm;
|
| - return this.sentence;
|
| - }
|
| -
|
| - int reportErrorToClient(InputError error, StackTrace stackTrace) {
|
| - bool isInternalError = error.kind == DiagnosticKind.internalError;
|
| - if (isInternalError && !crashReportRequested) {
|
| - printLineOnStderr(requestBugReportOnOtherCrashMessage);
|
| - crashReportRequested = true;
|
| - }
|
| - printLineOnStderr(error.asDiagnostic().formatMessage());
|
| - if (isInternalError) {
|
| - printLineOnStderr('$stackTrace');
|
| - return COMPILER_EXITCODE_CRASH;
|
| - } else {
|
| - return DART_VM_EXITCODE_COMPILE_TIME_ERROR;
|
| - }
|
| - }
|
| -}
|
| -
|
| -/// The WorkerConnection represents a worker isolate in the hub (main isolate).
|
| -/// Ie. it is the hub's object for communicating with a worker isolate.
|
| -class WorkerConnection {
|
| - /// The worker isolate.
|
| - final ManagedIsolate isolate;
|
| -
|
| - /// A port used to send commands to the worker isolate.
|
| - SendPort sendPort;
|
| -
|
| - /// A port used to read commands from the worker isolate.
|
| - ReceivePort receivePort;
|
| -
|
| - /// workerCommands is an iterator over all the commands coming from the
|
| - /// worker isolate. These are typically the outbound messages destined for
|
| - /// the Fletch C++ client.
|
| - /// It iterates over the data coming on the receivePort.
|
| - StreamIterator<ClientCommand> workerCommands;
|
| -
|
| - /// When true, the worker can be shutdown by sending it a
|
| - /// ClientCommandCode.Signal command. Otherwise, it must be killed.
|
| - bool eventLoopStarted = false;
|
| -
|
| - /// Subscription for errors from [isolate].
|
| - StreamSubscription errorSubscription;
|
| -
|
| - bool crashReportRequested = false;
|
| -
|
| - WorkerConnection(this.isolate);
|
| -
|
| - /// Begin a session with the worker isolate.
|
| - Future<Null> beginSession() async {
|
| - errorSubscription = isolate.errors.listen(null);
|
| - errorSubscription.pause();
|
| - receivePort = isolate.beginSession();
|
| - // Setup the workerCommands iterator using a stream converting the
|
| - // incoming data to [ClientCommand]s.
|
| - Stream<ClientCommand> workerCommandStream = receivePort.map(
|
| - (message) => new ClientCommand(
|
| - ClientCommandCode.values[message[0]], message[1]));
|
| - workerCommands = new StreamIterator<ClientCommand>(workerCommandStream);
|
| - if (!await workerCommands.moveNext()) {
|
| - // The worker must have been killed, or died in some other way.
|
| - // TODO(ahe): Add this assertion: assert(isolate.wasKilled);
|
| - endSession();
|
| - return;
|
| - }
|
| - ClientCommand command = workerCommands.current;
|
| - assert(command.code == ClientCommandCode.SendPort);
|
| - assert(command.data != null);
|
| - sendPort = command.data;
|
| - }
|
| -
|
| - /// Attach to a fletch C++ client and forward commands to the worker isolate,
|
| - /// and vice versa. The returned future normally completes when the worker
|
| - /// isolate sends ClientCommandCode.ClosePort, or if the isolate is killed due
|
| - /// to ClientCommandCode.Signal arriving through client.commands.
|
| - Future<int> attachClient(
|
| - ClientConnection clientConnection,
|
| - UserSession userSession) async {
|
| -
|
| - // Method for handling commands coming from the client. The commands are
|
| - // typically forwarded to the worker isolate.
|
| - handleCommandsFromClient(ClientCommand command) {
|
| - if (command.code == ClientCommandCode.Signal && !eventLoopStarted) {
|
| - if (userSession != null) {
|
| - userSession.kill(clientConnection.printLineOnStderr);
|
| - } else {
|
| - isolate.kill();
|
| - }
|
| - receivePort.close();
|
| - } else {
|
| - sendPort.send([command.code.index, command.data]);
|
| - }
|
| - }
|
| -
|
| - // Method for handling commands coming back from the worker isolate.
|
| - // It typically forwards them to the Fletch C++ client via the
|
| - // clientConnection.
|
| - Future<int> handleCommandsFromWorker(
|
| - ClientConnection clientConnection) async {
|
| - int exitCode = COMPILER_EXITCODE_CRASH;
|
| - while (await workerCommands.moveNext()) {
|
| - ClientCommand command = workerCommands.current;
|
| - switch (command.code) {
|
| - case ClientCommandCode.ClosePort:
|
| - receivePort.close();
|
| - break;
|
| -
|
| - case ClientCommandCode.EventLoopStarted:
|
| - eventLoopStarted = true;
|
| - break;
|
| -
|
| - case ClientCommandCode.ExitCode:
|
| - exitCode = command.data;
|
| - break;
|
| -
|
| - default:
|
| - clientConnection.sendCommandToClient(command);
|
| - break;
|
| - }
|
| - }
|
| - return exitCode;
|
| - }
|
| -
|
| - eventLoopStarted = false;
|
| - crashReportRequested = false;
|
| - errorSubscription.onData((errorList) {
|
| - String error = errorList[0];
|
| - String stackTrace = errorList[1];
|
| - if (!crashReportRequested) {
|
| - clientConnection.printLineOnStderr(requestBugReportOnOtherCrashMessage);
|
| - crashReportRequested = true;
|
| - }
|
| - clientConnection.printLineOnStderr(error);
|
| - if (stackTrace != null) {
|
| - clientConnection.printLineOnStderr(stackTrace);
|
| - }
|
| - if (userSession != null) {
|
| - userSession.kill(clientConnection.printLineOnStderr);
|
| - } else {
|
| - isolate.kill();
|
| - }
|
| - receivePort.close();
|
| - });
|
| - errorSubscription.resume();
|
| -
|
| - // Start listening for commands coming from the Fletch C++ client (via
|
| - // clientConnection).
|
| - // TODO(ahe): Add onDone event handler to detach the client.
|
| - clientConnection.commands.listen(handleCommandsFromClient);
|
| -
|
| - // Start processing commands coming from the worker.
|
| - int exitCode = await handleCommandsFromWorker(clientConnection);
|
| -
|
| - errorSubscription.pause();
|
| - return exitCode;
|
| - }
|
| -
|
| - void endSession() {
|
| - receivePort.close();
|
| - isolate.endSession();
|
| - }
|
| -
|
| - Future<Null> detachClient() async {
|
| - if (isolate.wasKilled) {
|
| - // Setting these to null will ensure that [attachClient] causes a crash
|
| - // if called after isolate was killed.
|
| - errorSubscription = null;
|
| - receivePort = null;
|
| - workerCommands = null;
|
| - sendPort = null;
|
| -
|
| - // TODO(ahe): The session is dead. Tell the user about this.
|
| - return null;
|
| - }
|
| - // TODO(ahe): Perform the reverse of attachClient here.
|
| - await beginSession();
|
| - }
|
| -
|
| - Future<int> performTask(
|
| - SharedTask task,
|
| - ClientConnection clientConnection,
|
| - {
|
| - UserSession userSession,
|
| - /// End this session and return this isolate to the pool.
|
| - bool endSession: false}) async {
|
| - ClientLogger log = clientConnection.log;
|
| -
|
| - clientConnection.done.catchError((error, StackTrace stackTrace) {
|
| - log.error(error, stackTrace);
|
| - }).then((_) {
|
| - log.done();
|
| - });
|
| -
|
| - // Indirectly send the task to be performed to the worker isolate via the
|
| - // clientConnection.
|
| - clientConnection.sendCommandToWorker(
|
| - new ClientCommand(ClientCommandCode.PerformTask, task));
|
| -
|
| - // Forward commands between the C++ fletch client [clientConnection], and the
|
| - // worker isolate `this`. Also, Intercept the signal command and
|
| - // potentially kill the isolate (the isolate needs to tell if it is
|
| - // interuptible or needs to be killed, an example of the latter is, if
|
| - // compiler is running).
|
| - int exitCode = await attachClient(clientConnection, userSession);
|
| - // The verb (which was performed in the worker) is done.
|
| - log.note("After attachClient (exitCode = $exitCode)");
|
| -
|
| - if (endSession) {
|
| - // Return the isolate to the pool *before* shutting down the client. This
|
| - // ensures that the next client will be able to reuse the isolate instead
|
| - // of spawning a new.
|
| - this.endSession();
|
| - } else {
|
| - await detachClient();
|
| - }
|
| -
|
| - return exitCode;
|
| - }
|
| -}
|
| -
|
| -class ManagedIsolate {
|
| - final IsolatePool pool;
|
| - final Isolate isolate;
|
| - final SendPort port;
|
| - final Stream errors;
|
| - final ReceivePort exitPort;
|
| - final ReceivePort errorPort;
|
| - bool wasKilled = false;
|
| -
|
| - ManagedIsolate(
|
| - this.pool, this.isolate, this.port, this.errors,
|
| - this.exitPort, this.errorPort);
|
| -
|
| - ReceivePort beginSession() {
|
| - ReceivePort receivePort = new ReceivePort();
|
| - port.send(receivePort.sendPort);
|
| - return receivePort;
|
| - }
|
| -
|
| - void endSession() {
|
| - if (!wasKilled) {
|
| - pool.idleIsolates.addLast(this);
|
| - }
|
| - }
|
| -
|
| - void kill() {
|
| - wasKilled = true;
|
| - isolate.kill(priority: Isolate.IMMEDIATE);
|
| - isolate.removeOnExitListener(exitPort.sendPort);
|
| - isolate.removeErrorListener(errorPort.sendPort);
|
| - exitPort.close();
|
| - errorPort.close();
|
| - }
|
| -}
|
| -
|
| -class IsolatePool {
|
| - // Queue of idle isolates. When an isolate becomes idle, it is added at the
|
| - // end.
|
| - final Queue<ManagedIsolate> idleIsolates = new Queue<ManagedIsolate>();
|
| - final Function isolateEntryPoint;
|
| -
|
| - IsolatePool(this.isolateEntryPoint);
|
| -
|
| - Future<ManagedIsolate> getIsolate({bool exitOnError: true}) async {
|
| - if (idleIsolates.isEmpty) {
|
| - return await spawnIsolate(exitOnError: exitOnError);
|
| - } else {
|
| - return idleIsolates.removeFirst();
|
| - }
|
| - }
|
| -
|
| - Future<ManagedIsolate> spawnIsolate({bool exitOnError: true}) async {
|
| - StreamController errorController = new StreamController.broadcast();
|
| - ReceivePort receivePort = new ReceivePort();
|
| - Isolate isolate = await Isolate.spawn(
|
| - isolateEntryPoint, receivePort.sendPort, paused: true);
|
| - isolate.setErrorsFatal(true);
|
| - ReceivePort errorPort = new ReceivePort();
|
| - ManagedIsolate managedIsolate;
|
| - isolate.addErrorListener(errorPort.sendPort);
|
| - errorPort.listen((errorList) {
|
| - if (exitOnError) {
|
| - String error = errorList[0];
|
| - String stackTrace = errorList[1];
|
| - io.stderr.writeln(error);
|
| - if (stackTrace != null) {
|
| - io.stderr.writeln(stackTrace);
|
| - }
|
| - exit(COMPILER_EXITCODE_CRASH);
|
| - } else {
|
| - managedIsolate.wasKilled = true;
|
| - errorController.add(errorList);
|
| - }
|
| - });
|
| - ReceivePort exitPort = new ReceivePort();
|
| - isolate.addOnExitListener(exitPort.sendPort);
|
| - exitPort.listen((_) {
|
| - isolate.removeErrorListener(errorPort.sendPort);
|
| - isolate.removeOnExitListener(exitPort.sendPort);
|
| - errorPort.close();
|
| - exitPort.close();
|
| - idleIsolates.remove(managedIsolate);
|
| - });
|
| - await acknowledgeControlMessages(isolate, resume: isolate.pauseCapability);
|
| - StreamIterator iterator = new StreamIterator(receivePort);
|
| - bool hasElement = await iterator.moveNext();
|
| - if (!hasElement) {
|
| - throwInternalError("No port received from isolate");
|
| - }
|
| - SendPort port = iterator.current;
|
| - receivePort.close();
|
| - managedIsolate =
|
| - new ManagedIsolate(
|
| - this, isolate, port, errorController.stream, exitPort, errorPort);
|
| -
|
| - return managedIsolate;
|
| - }
|
| -
|
| - void shutdown() {
|
| - while (idleIsolates.isNotEmpty) {
|
| - idleIsolates.removeFirst().kill();
|
| - }
|
| - }
|
| -}
|
| -
|
| -class ClientLogger {
|
| - static int clientLoggersAllocated = 0;
|
| -
|
| - static Set<ClientLogger> pendingClients = new Set<ClientLogger>();
|
| -
|
| - static Set<ClientLogger> erroneousClients = new Set<ClientLogger>();
|
| -
|
| - static ClientLogger allocate() {
|
| - ClientLogger clientLogger = new ClientLogger(clientLoggersAllocated++);
|
| - pendingClients.add(clientLogger);
|
| - return clientLogger;
|
| - }
|
| -
|
| - final int id;
|
| -
|
| - final List<String> notes = <String>[];
|
| -
|
| - List<String> arguments = <String>[];
|
| -
|
| - ClientLogger(this.id);
|
| -
|
| - void note(object) {
|
| - String note = "$object";
|
| - notes.add(note);
|
| - printToConsole("$id: $note");
|
| - }
|
| -
|
| - void gotArguments(List<String> arguments) {
|
| - this.arguments = arguments;
|
| - note("Got arguments: ${arguments.join(' ')}.");
|
| - }
|
| -
|
| - void done() {
|
| - pendingClients.remove(this);
|
| - note("Client done ($pendingClients).");
|
| - }
|
| -
|
| - void error(error, StackTrace stackTrace) {
|
| - // TODO(ahe): Modify quit verb to report these errors.
|
| - erroneousClients.add(this);
|
| - note("Crash (${arguments.join(' ')}).\n"
|
| - "${stringifyError(error, stackTrace)}");
|
| - }
|
| -
|
| - String toString() => "$id";
|
| -}
|
|
|