OLD | NEW |
| (Empty) |
1 // Copyright (c) 2015, the Dartino project authors. Please see the AUTHORS file | |
2 // for details. All rights reserved. Use of this source code is governed by a | |
3 // BSD-style license that can be found in the LICENSE.md file. | |
4 | |
5 library fletch_agent.agent; | |
6 | |
7 import 'dart:convert' show UTF8; | |
8 import 'dart:fletch'; | |
9 import 'dart:fletch.ffi'; | |
10 import 'dart:fletch.os' as os; | |
11 import 'dart:typed_data'; | |
12 | |
13 import 'package:ffi/ffi.dart'; | |
14 import 'package:file/file.dart'; | |
15 import 'package:fletch/fletch.dart' as fletch; | |
16 import 'package:os/os.dart' show sys; | |
17 import 'package:socket/socket.dart'; | |
18 | |
19 import '../lib/messages.dart'; | |
20 | |
21 class Logger { | |
22 final String _prefix; | |
23 final String _path; | |
24 final bool _logToStdout; | |
25 | |
26 factory Logger(String prefix, String logPath, {stdout: true}) { | |
27 return new Logger._(prefix, logPath, stdout); | |
28 } | |
29 | |
30 const Logger._(this._prefix, this._path, this._logToStdout); | |
31 | |
32 void info(String msg) => _write('$_prefix INFO: $msg'); | |
33 void warn(String msg) => _write('$_prefix WARNING: $msg'); | |
34 void error(String msg) => _write('$_prefix ERROR: $msg'); | |
35 | |
36 void _write(String msg) { | |
37 msg = '${new DateTime.now().toString()} $msg'; | |
38 if (_logToStdout) { | |
39 print(msg); | |
40 } | |
41 File log; | |
42 try { | |
43 log = new File.open(_path, mode: File.APPEND); | |
44 var encoded = UTF8.encode('$msg\n'); | |
45 var data = new Uint8List.fromList(encoded); | |
46 log.write(data.buffer); | |
47 } finally { | |
48 if (log != null) log.close(); | |
49 } | |
50 } | |
51 } | |
52 | |
53 class AgentContext { | |
54 static final ForeignFunction _getenv = ForeignLibrary.main.lookup('getenv'); | |
55 | |
56 static String _getEnv(String varName) { | |
57 ForeignPointer ptr; | |
58 var arg; | |
59 try { | |
60 arg = new ForeignMemory.fromStringAsUTF8(varName); | |
61 ptr = _getenv.pcall$1(arg); | |
62 } finally { | |
63 arg.free(); | |
64 } | |
65 if (ptr.address == 0) return null; | |
66 return cStringToString(ptr); | |
67 } | |
68 | |
69 // Agent specific info. | |
70 final String ip; | |
71 final int port; | |
72 final String pidFile; | |
73 final Logger logger; | |
74 final bool applyUpgrade; | |
75 | |
76 // Fletch-vm path and args. | |
77 final String vmBinPath; | |
78 final String vmLogDir; | |
79 final String tmpDir; | |
80 | |
81 factory AgentContext() { | |
82 String ip = _getEnv('AGENT_IP'); | |
83 if (ip == null) { | |
84 ip = '0.0.0.0'; | |
85 } | |
86 int port; | |
87 try { | |
88 String portStr = _getEnv('AGENT_PORT'); | |
89 port = int.parse(portStr); | |
90 } catch (_) { | |
91 port = AGENT_DEFAULT_PORT; // default | |
92 } | |
93 String logFile = _getEnv('AGENT_LOG_FILE'); | |
94 if (logFile == null) { | |
95 print('Agent requires a valid log file. Please specify file path in ' | |
96 'the AGENT_LOG_FILE environment variable.'); | |
97 Process.exit(); | |
98 } | |
99 var logger = new Logger('Agent', logFile); | |
100 String pidFile = _getEnv('AGENT_PID_FILE'); | |
101 if (pidFile == null) { | |
102 logger.error('Agent requires a valid pid file. Please specify file path ' | |
103 'in the AGENT_PID_FILE environment variable.'); | |
104 Process.exit(); | |
105 } | |
106 String vmBinPath = _getEnv('FLETCH_VM'); | |
107 String vmLogDir = _getEnv('VM_LOG_DIR'); | |
108 String tmpDir = _getEnv('TMPDIR'); | |
109 if (tmpDir == null) tmpDir = '/tmp'; | |
110 | |
111 // If the below ENV variable is set the agent will just store the agent | |
112 // debian package but not apply it. | |
113 bool applyUpgrade = _getEnv('AGENT_UPGRADE_DRY_RUN') == null; | |
114 | |
115 logger.info('Agent log file: $logFile'); | |
116 logger.info('Agent pid file: $pidFile'); | |
117 logger.info('Vm path: $vmBinPath'); | |
118 logger.info('Log path: $vmLogDir'); | |
119 | |
120 // Make sure we have a fletch-vm binary we can use for launching a vm. | |
121 if (!File.existsAsFile(vmBinPath)) { | |
122 logger.error('Cannot find fletch vm at path: $vmBinPath'); | |
123 Process.exit(); | |
124 } | |
125 // Make sure we have a valid log directory. | |
126 if (!File.existsAsFile(vmLogDir)) { | |
127 logger.error('Cannot find log directory: $vmLogDir'); | |
128 Process.exit(); | |
129 } | |
130 return new AgentContext._( | |
131 ip, port, pidFile, logger, vmBinPath, vmLogDir, tmpDir, applyUpgrade); | |
132 } | |
133 | |
134 const AgentContext._( | |
135 this.ip, this.port, this.pidFile, this.logger, this.vmBinPath, | |
136 this.vmLogDir, this.tmpDir, this.applyUpgrade); | |
137 } | |
138 | |
139 class Agent { | |
140 final AgentContext _context; | |
141 | |
142 Agent(this._context); | |
143 | |
144 void start() { | |
145 var ip = _context.ip; | |
146 var port = _context.port; | |
147 _context.logger.info('starting server on $ip:$port'); | |
148 var socket = new ServerSocket(ip, port); | |
149 // We have to make a final reference to the context to not have the | |
150 // containing instance passed into the closure given to spawnAccept. | |
151 final detachedContext = _context; | |
152 while (true) { | |
153 socket.spawnAccept((Socket s) => _handleCommand(s, detachedContext)); | |
154 } | |
155 // We run until killed. | |
156 } | |
157 | |
158 static void _handleCommand(Socket socket, AgentContext context) { | |
159 try { | |
160 var handler = new CommandHandler(socket, context); | |
161 handler.run(); | |
162 } catch (error) { | |
163 context.logger.warn('Caught error: $error. Closing socket'); | |
164 socket.close(); | |
165 } | |
166 } | |
167 } | |
168 | |
169 class CommandHandler { | |
170 static const int SIGHUB = 1; | |
171 static const int SIGINT = 2; | |
172 static const int SIGQUIT = 3; | |
173 static const int SIGKILL = 9; | |
174 static const int SIGTERM = 15; | |
175 static final ForeignFunction _kill = ForeignLibrary.main.lookup('kill'); | |
176 static final ForeignFunction _unlink = ForeignLibrary.main.lookup('unlink'); | |
177 | |
178 final Socket _socket; | |
179 final AgentContext _context; | |
180 RequestHeader _requestHeader; | |
181 | |
182 factory CommandHandler(Socket socket, AgentContext context) { | |
183 var bytes = socket.read(RequestHeader.HEADER_SIZE); | |
184 if (bytes == null) { | |
185 throw 'Connection closed by peer'; | |
186 } else if (bytes.lengthInBytes < RequestHeader.HEADER_SIZE) { | |
187 throw 'Insufficient bytes ($bytes.lengthInBytes) received in request'; | |
188 } | |
189 var header = new RequestHeader.fromBuffer(bytes); | |
190 return new CommandHandler._(socket, context, header); | |
191 } | |
192 | |
193 CommandHandler._(this._socket, this._context, this._requestHeader); | |
194 | |
195 void run() { | |
196 if (_requestHeader.version > AGENT_VERSION) { | |
197 _context.logger.warn('Received message with unsupported version ' | |
198 '${_requestHeader.version} and command ${_requestHeader.command}'); | |
199 _sendReply( | |
200 new ReplyHeader(_requestHeader.id, ReplyHeader.UNSUPPORTED_VERSION)); | |
201 } | |
202 switch (_requestHeader.command) { | |
203 case RequestHeader.START_VM: | |
204 _startVm(); | |
205 break; | |
206 case RequestHeader.STOP_VM: | |
207 _stopVm(); | |
208 break; | |
209 case RequestHeader.LIST_VMS: | |
210 _listVms(); | |
211 break; | |
212 case RequestHeader.UPGRADE_AGENT: | |
213 _upgradeAgent(); | |
214 break; | |
215 case RequestHeader.FLETCH_VERSION: | |
216 _fletchVersion(); | |
217 break; | |
218 case RequestHeader.SIGNAL_VM: | |
219 _signalVm(); | |
220 break; | |
221 default: | |
222 _context.logger.warn('Unknown command: ${_requestHeader.command}.'); | |
223 _sendReply( | |
224 new ReplyHeader(_requestHeader.id, ReplyHeader.UNKNOWN_COMMAND)); | |
225 break; | |
226 } | |
227 } | |
228 | |
229 void _sendReply(ReplyHeader reply) { | |
230 _socket.write(reply.toBuffer()); | |
231 _socket.close(); | |
232 } | |
233 | |
234 void _startVm() { | |
235 int vmPid = 0; | |
236 var reply; | |
237 // Create a tmp file for reading the port the vm is listening on. | |
238 File portFile = new File.temporary("${_context.tmpDir}/vm-port-"); | |
239 try { | |
240 List<String> args = ['--log-dir=${_context.vmLogDir}', | |
241 '--port-file=${portFile.path}', '--host=0.0.0.0']; | |
242 vmPid = os.NativeProcess.startDetached(_context.vmBinPath, args); | |
243 // Find out what port the vm is listening on. | |
244 _context.logger.info('Reading port from ${portFile.path} for vm $vmPid'); | |
245 int port = _retrieveVmPort(portFile.path); | |
246 reply = new StartVmReply( | |
247 _requestHeader.id, ReplyHeader.SUCCESS, vmId: vmPid, vmPort: port); | |
248 _context.logger.info('Started fletch vm with pid $vmPid on port $port'); | |
249 } catch (e) { | |
250 reply = new StartVmReply(_requestHeader.id, ReplyHeader.START_VM_FAILED); | |
251 // TODO(wibling): could extend the result with caught error string. | |
252 _context.logger.warn('Failed to start vm with error: $e'); | |
253 if (vmPid > 0) { | |
254 // Kill the vm. | |
255 _kill.icall$2(vmPid, SIGTERM); | |
256 } | |
257 } finally { | |
258 File.delete(portFile.path); | |
259 } | |
260 _sendReply(reply); | |
261 } | |
262 | |
263 int _retrieveVmPort(String portPath) { | |
264 // The fletch-vm will write the port it is listening on into the file | |
265 // specified by 'portPath' above. The agent waits for the file to be | |
266 // created (retries the File.open until it succeeds) and then reads the | |
267 // port from the file. | |
268 // To make sure we are reading a consistent value from the file, ie. the | |
269 // vm could have written a partial value at the time we read it, we continue | |
270 // reading the value from the file until we have read the same value from | |
271 // file in two consecutive reads. | |
272 // An alternative to the consecutive reading would be to use cooperative | |
273 // locking, but consecutive reading is not relying on the fletch-vm to | |
274 // behave. | |
275 // TODO(wibling): Look into passing a socket port to the fletch-vm and | |
276 // have it write the port to the socket. This allows the agent to just | |
277 // wait on the socket and wake up when it is ready. | |
278 int previousPort = -1; | |
279 for (int retries = 500; retries >= 0; --retries) { | |
280 int port = _tryReadPort(portPath, retries == 0); | |
281 // Check if we read the same port value twice in a row. | |
282 if (previousPort != -1 && previousPort == port) return port; | |
283 previousPort = port; | |
284 os.sleep(10); | |
285 } | |
286 throw 'Failed to read port from $portPath'; | |
287 } | |
288 | |
289 int _tryReadPort(String portPath, bool lastAttempt) { | |
290 File portFile; | |
291 var data; | |
292 try { | |
293 portFile = new File.open(portPath); | |
294 data = portFile.read(10); | |
295 } on FileException catch (_) { | |
296 if (lastAttempt) rethrow; | |
297 return -1; | |
298 } finally { | |
299 if (portFile != null) portFile.close(); | |
300 } | |
301 try { | |
302 if (data.lengthInBytes > 0) { | |
303 var portString = UTF8.decode(data.asUint8List().toList()); | |
304 return int.parse(portString); | |
305 } | |
306 } on FormatException catch (_) { | |
307 if (lastAttempt) rethrow; | |
308 } | |
309 // Retry if no data was read. | |
310 return -1; | |
311 } | |
312 | |
313 void _stopVm() { | |
314 if (_requestHeader.payloadLength != 4) { | |
315 _sendReply( | |
316 new StopVmReply(_requestHeader.id, ReplyHeader.INVALID_PAYLOAD)); | |
317 return; | |
318 } | |
319 var reply; | |
320 // Read in the vm id. | |
321 var pidBytes = _socket.read(4); | |
322 if (pidBytes == null) { | |
323 reply = new StopVmReply(_requestHeader.id, ReplyHeader.INVALID_PAYLOAD); | |
324 _context.logger.warn('Missing pid of the fletch vm to stop.'); | |
325 } else { | |
326 int pid = readUint32(pidBytes, 0); | |
327 int err = _kill.icall$2(pid, SIGTERM); | |
328 if (err != 0) { | |
329 reply = new StopVmReply(_requestHeader.id, ReplyHeader.UNKNOWN_VM_ID); | |
330 _context.logger.warn( | |
331 'Failed to stop pid $pid with error: ${Foreign.errno}'); | |
332 } else { | |
333 reply = new StopVmReply(_requestHeader.id, ReplyHeader.SUCCESS); | |
334 _context.logger.info('Stopped pid: $pid'); | |
335 } | |
336 } | |
337 _sendReply(reply); | |
338 } | |
339 | |
340 void _signalVm() { | |
341 if (_requestHeader.payloadLength != 8) { | |
342 _sendReply( | |
343 new SignalVmReply(_requestHeader.id, ReplyHeader.INVALID_PAYLOAD)); | |
344 return; | |
345 } | |
346 var reply; | |
347 // Read in the vm id and the signal to send. | |
348 var pidBytes = _socket.read(8); | |
349 if (pidBytes == null) { | |
350 reply = new SignalVmReply(_requestHeader.id, ReplyHeader.INVALID_PAYLOAD); | |
351 _context.logger.warn('Missing pid of the fletch vm to signal.'); | |
352 } else { | |
353 int pid = readUint32(pidBytes, 0); | |
354 int signal = readUint32(pidBytes, 4); | |
355 // Hack to make ctrl-c work for stopping spawned vms work on Raspbian | |
356 // wheezy. For some reason SIGINT doesn't work so we map it to SIGTERM as | |
357 // a workaround. | |
358 if (signal == SIGINT && sys.info().release.startsWith('3.18')) { | |
359 _context.logger.info('Remapping SIGINT to SIGTERM on Raspbian wheezy'); | |
360 signal = SIGTERM; | |
361 } | |
362 int err = _kill.icall$2(pid, signal); | |
363 if (err != 0) { | |
364 reply = new SignalVmReply(_requestHeader.id, ReplyHeader.UNKNOWN_VM_ID); | |
365 _context.logger.warn('Failed to send signal $signal to pid $pid with ' | |
366 'error: ${Foreign.errno}'); | |
367 } else { | |
368 reply = new SignalVmReply(_requestHeader.id, ReplyHeader.SUCCESS); | |
369 _context.logger.info('Sent signal $signal to pid: $pid'); | |
370 } | |
371 } | |
372 _sendReply(reply); | |
373 } | |
374 | |
375 void _listVms() { | |
376 // TODO(wibling): implement this method. For now just hardcode some values. | |
377 _sendReply( | |
378 new ListVmsReply(_requestHeader.id, ReplyHeader.UNKNOWN_COMMAND)); | |
379 } | |
380 | |
381 void _upgradeAgent() { | |
382 int result; | |
383 ByteBuffer binary = _socket.read(_requestHeader.payloadLength); | |
384 if (binary == null) { | |
385 _context.logger.warn('Could not read fletch-agent package binary' | |
386 ' of length ${_requestHeader.payloadLength} bytes'); | |
387 result = ReplyHeader.INVALID_PAYLOAD; | |
388 } else { | |
389 _context.logger.info('Read fletch-agent package binary' | |
390 ' of length ${binary.lengthInBytes} bytes.'); | |
391 File file = new File.open(PACKAGE_FILE_NAME, mode: File.WRITE); | |
392 try { | |
393 file.write(binary); | |
394 } catch (e) { | |
395 _context.logger.warn('UpgradeAgent failed: $e'); | |
396 _sendReply(new UpgradeAgentReply(_requestHeader.id, | |
397 ReplyHeader.UPGRADE_FAILED)); | |
398 } finally { | |
399 file.close(); | |
400 } | |
401 _context.logger.info('Package file written successfully.'); | |
402 if (_context.applyUpgrade) { | |
403 int pid = os.NativeProcess.startDetached('/usr/bin/dpkg', | |
404 [// Force dpkg to overwrite configuration files installed by | |
405 // the agent. | |
406 '--force-confnew', | |
407 '--install', | |
408 PACKAGE_FILE_NAME]); | |
409 _context.logger.info('started package update (PID $pid)'); | |
410 } | |
411 result = ReplyHeader.SUCCESS; | |
412 } | |
413 _context.logger.info('sending reply'); | |
414 _sendReply(new UpgradeAgentReply(_requestHeader.id, result)); | |
415 } | |
416 | |
417 void _fletchVersion() { | |
418 String version = fletch.version(); | |
419 _context.logger.info('Returning fletch version $version'); | |
420 _sendReply(new FletchVersionReply( | |
421 _requestHeader.id, ReplyHeader.SUCCESS, version: version)); | |
422 } | |
423 } | |
424 | |
425 void main(List<String> arguments) { | |
426 // The agent context will initialize itself from the runtime environment. | |
427 var context = new AgentContext(); | |
428 | |
429 // Write the program's pid to the pid file if set. | |
430 _writePid(context.pidFile); | |
431 | |
432 // Run fletch agent on given ip address and port. | |
433 var agent = new Agent(context); | |
434 agent.start(); | |
435 } | |
436 | |
437 void _writePid(String pidFilePath) { | |
438 final ForeignFunction _getpid = ForeignLibrary.main.lookup('getpid'); | |
439 | |
440 int pid = _getpid.icall$0(); | |
441 List<int> encodedPid = UTF8.encode('$pid'); | |
442 ByteBuffer buffer = new Uint8List.fromList(encodedPid).buffer; | |
443 var pidFile = new File.open(pidFilePath, mode: File.WRITE); | |
444 try { | |
445 pidFile.write(buffer); | |
446 } finally { | |
447 pidFile.close(); | |
448 } | |
449 } | |
450 | |
451 void printUsage() { | |
452 print('Usage:'); | |
453 print('The Fletch agent supports the following flags'); | |
454 print(''); | |
455 print(' --port: specify the port on which to listen, default: ' | |
456 '$AGENT_DEFAULT_PORT'); | |
457 print(' --ip: specify the ip address on which to listen, default: 0.0.0.0'); | |
458 print(' --vm: specify the path to the vm binary, default: ' | |
459 '/opt/fletch/bin/fletch-vm.'); | |
460 print(''); | |
461 Process.exit(); | |
462 } | |
OLD | NEW |