OLD | NEW |
| (Empty) |
1 // Copyright (c) 2015, the Dartino project authors. Please see the AUTHORS file | |
2 // for details. All rights reserved. Use of this source code is governed by a | |
3 // BSD-style license that can be found in the LICENSE.md file. | |
4 | |
5 library fletchc.worker_isolate; | |
6 | |
7 import 'dart:async' show | |
8 Completer, | |
9 EventSink, | |
10 Future, | |
11 Stream, | |
12 StreamController, | |
13 StreamIterator, | |
14 StreamSubscription, | |
15 StreamTransformer, | |
16 ZoneSpecification, | |
17 runZoned; | |
18 | |
19 import 'dart:isolate' show | |
20 ReceivePort, | |
21 SendPort; | |
22 | |
23 import '../hub/client_commands.dart' show | |
24 ClientCommand, | |
25 ClientCommandCode, | |
26 CommandSender; | |
27 | |
28 import '../diagnostic.dart' show | |
29 DiagnosticKind, | |
30 InputError, | |
31 throwInternalError; | |
32 | |
33 import '../hub/exit_codes.dart' show | |
34 COMPILER_EXITCODE_CRASH; | |
35 | |
36 // This class is used to send commands from the worker isolate back to the | |
37 // hub (main isolate). | |
38 // TODO(ahe): Send ClientCommands directly when they are canonicalized | |
39 // correctly, see issue 23244. | |
40 class HubCommandSender extends CommandSender { | |
41 final SendPort port; | |
42 | |
43 HubCommandSender(this.port); | |
44 | |
45 void sendExitCode(int exitCode) { | |
46 port.send([ClientCommandCode.ExitCode.index, exitCode]); | |
47 } | |
48 | |
49 void sendDataCommand(ClientCommandCode commandCode, List<int> data) { | |
50 port.send([commandCode.index, data]); | |
51 } | |
52 | |
53 void sendClose() { | |
54 port.send([ClientCommandCode.ClosePort.index, null]); | |
55 } | |
56 | |
57 void sendEventLoopStarted() { | |
58 port.send([ClientCommandCode.EventLoopStarted.index, null]); | |
59 } | |
60 } | |
61 | |
62 Future<Null> workerMain(SendPort port) async { | |
63 ReceivePort receivePort = new ReceivePort(); | |
64 port.send(receivePort.sendPort); | |
65 port = null; | |
66 StreamIterator iterator = new StreamIterator(receivePort); | |
67 while (await iterator.moveNext()) { | |
68 await beginSession(iterator.current); | |
69 } | |
70 } | |
71 | |
72 Future<Null> beginSession(SendPort port) { | |
73 ReceivePort receivePort = new ReceivePort(); | |
74 port.send([ClientCommandCode.SendPort.index, receivePort.sendPort]); | |
75 return handleClient(port, receivePort); | |
76 } | |
77 | |
78 Future<int> doInZone(void printLineOnStdout(line), Future<int> f()) { | |
79 ZoneSpecification specification = new ZoneSpecification( | |
80 print: (_1, _2, _3, String line) => printLineOnStdout(line)); | |
81 return runZoned(f, zoneSpecification: specification); | |
82 } | |
83 | |
84 Future<Null> handleClient(SendPort clientOutgoing, ReceivePort clientIncoming) { | |
85 WorkerSideTask task = | |
86 new WorkerSideTask(clientIncoming, new HubCommandSender(clientOutgoing)); | |
87 | |
88 return doInZone(task.printLineOnStdout, task.perform).then((int exitCode) { | |
89 task.endTask(exitCode); | |
90 }); | |
91 } | |
92 | |
93 /// Represents a task running in this worker isolate. | |
94 class WorkerSideTask { | |
95 final ReceivePort clientIncoming; | |
96 | |
97 final HubCommandSender commandSender; | |
98 | |
99 final StreamController<ClientCommand> filteredIncomingCommands = | |
100 new StreamController<ClientCommand>(); | |
101 | |
102 final Completer<int> taskCompleter = new Completer<int>(); | |
103 | |
104 List<String> receivedArguments; | |
105 | |
106 WorkerSideTask(this.clientIncoming, this.commandSender); | |
107 | |
108 void printLineOnStdout(String line) { | |
109 commandSender.sendStdout("$line\n"); | |
110 } | |
111 | |
112 Stream<ClientCommand> buildIncomingCommandStream() { | |
113 void handleData(List message, EventSink<ClientCommand> sink) { | |
114 int code = message[0]; | |
115 var data = message[1]; | |
116 sink.add(new ClientCommand(ClientCommandCode.values[code], data)); | |
117 } | |
118 StreamTransformer<List, ClientCommand> commandDecoder = | |
119 new StreamTransformer<List, ClientCommand>.fromHandlers( | |
120 handleData: handleData); | |
121 return clientIncoming.transform(commandDecoder); | |
122 } | |
123 | |
124 void handleIncomingCommand(ClientCommand command) { | |
125 if (command.code == ClientCommandCode.PerformTask) { | |
126 performTask(command.data).then(taskCompleter.complete); | |
127 } else { | |
128 filteredIncomingCommands.add(command); | |
129 } | |
130 } | |
131 | |
132 void handleError(error, StackTrace stackTrace) { | |
133 filteredIncomingCommands.addError(error, stackTrace); | |
134 } | |
135 | |
136 void handleDone() { | |
137 filteredIncomingCommands.close(); | |
138 } | |
139 | |
140 Future<int> performTask( | |
141 Future<int> task( | |
142 CommandSender commandSender, | |
143 StreamIterator<ClientCommand> commandIterator)) async { | |
144 StreamIterator<ClientCommand> commandIterator = | |
145 new StreamIterator<ClientCommand>(filteredIncomingCommands.stream); | |
146 | |
147 try { | |
148 return await task(commandSender, commandIterator); | |
149 } on InputError catch (error, stackTrace) { | |
150 // TODO(ahe): Send [error] instead. | |
151 commandSender.sendStderr("${error.asDiagnostic().formatMessage()}\n"); | |
152 if (error.kind == DiagnosticKind.internalError) { | |
153 commandSender.sendStderr("$stackTrace\n"); | |
154 return COMPILER_EXITCODE_CRASH; | |
155 } else { | |
156 return 1; | |
157 } | |
158 } | |
159 } | |
160 | |
161 Future<int> perform() { | |
162 StreamSubscription<ClientCommand> subscription = | |
163 buildIncomingCommandStream().listen(null); | |
164 subscription | |
165 ..onData(handleIncomingCommand) | |
166 ..onError(handleError) | |
167 ..onDone(handleDone); | |
168 return taskCompleter.future; | |
169 } | |
170 | |
171 void endTask(int exitCode) { | |
172 clientIncoming.close(); | |
173 commandSender.sendExitCode(exitCode); | |
174 commandSender.sendClose(); | |
175 } | |
176 } | |
OLD | NEW |