| OLD | NEW |
| (Empty) |
| 1 // Copyright (c) 2015, the Dartino project authors. Please see the AUTHORS file | |
| 2 // for details. All rights reserved. Use of this source code is governed by a | |
| 3 // BSD-style license that can be found in the LICENSE.md file. | |
| 4 | |
| 5 library fletch_agent.agent; | |
| 6 | |
| 7 import 'dart:convert' show UTF8; | |
| 8 import 'dart:fletch'; | |
| 9 import 'dart:fletch.ffi'; | |
| 10 import 'dart:fletch.os' as os; | |
| 11 import 'dart:typed_data'; | |
| 12 | |
| 13 import 'package:ffi/ffi.dart'; | |
| 14 import 'package:file/file.dart'; | |
| 15 import 'package:fletch/fletch.dart' as fletch; | |
| 16 import 'package:os/os.dart' show sys; | |
| 17 import 'package:socket/socket.dart'; | |
| 18 | |
| 19 import '../lib/messages.dart'; | |
| 20 | |
| 21 class Logger { | |
| 22 final String _prefix; | |
| 23 final String _path; | |
| 24 final bool _logToStdout; | |
| 25 | |
| 26 factory Logger(String prefix, String logPath, {stdout: true}) { | |
| 27 return new Logger._(prefix, logPath, stdout); | |
| 28 } | |
| 29 | |
| 30 const Logger._(this._prefix, this._path, this._logToStdout); | |
| 31 | |
| 32 void info(String msg) => _write('$_prefix INFO: $msg'); | |
| 33 void warn(String msg) => _write('$_prefix WARNING: $msg'); | |
| 34 void error(String msg) => _write('$_prefix ERROR: $msg'); | |
| 35 | |
| 36 void _write(String msg) { | |
| 37 msg = '${new DateTime.now().toString()} $msg'; | |
| 38 if (_logToStdout) { | |
| 39 print(msg); | |
| 40 } | |
| 41 File log; | |
| 42 try { | |
| 43 log = new File.open(_path, mode: File.APPEND); | |
| 44 var encoded = UTF8.encode('$msg\n'); | |
| 45 var data = new Uint8List.fromList(encoded); | |
| 46 log.write(data.buffer); | |
| 47 } finally { | |
| 48 if (log != null) log.close(); | |
| 49 } | |
| 50 } | |
| 51 } | |
| 52 | |
| 53 class AgentContext { | |
| 54 static final ForeignFunction _getenv = ForeignLibrary.main.lookup('getenv'); | |
| 55 | |
| 56 static String _getEnv(String varName) { | |
| 57 ForeignPointer ptr; | |
| 58 var arg; | |
| 59 try { | |
| 60 arg = new ForeignMemory.fromStringAsUTF8(varName); | |
| 61 ptr = _getenv.pcall$1(arg); | |
| 62 } finally { | |
| 63 arg.free(); | |
| 64 } | |
| 65 if (ptr.address == 0) return null; | |
| 66 return cStringToString(ptr); | |
| 67 } | |
| 68 | |
| 69 // Agent specific info. | |
| 70 final String ip; | |
| 71 final int port; | |
| 72 final String pidFile; | |
| 73 final Logger logger; | |
| 74 final bool applyUpgrade; | |
| 75 | |
| 76 // Fletch-vm path and args. | |
| 77 final String vmBinPath; | |
| 78 final String vmLogDir; | |
| 79 final String tmpDir; | |
| 80 | |
| 81 factory AgentContext() { | |
| 82 String ip = _getEnv('AGENT_IP'); | |
| 83 if (ip == null) { | |
| 84 ip = '0.0.0.0'; | |
| 85 } | |
| 86 int port; | |
| 87 try { | |
| 88 String portStr = _getEnv('AGENT_PORT'); | |
| 89 port = int.parse(portStr); | |
| 90 } catch (_) { | |
| 91 port = AGENT_DEFAULT_PORT; // default | |
| 92 } | |
| 93 String logFile = _getEnv('AGENT_LOG_FILE'); | |
| 94 if (logFile == null) { | |
| 95 print('Agent requires a valid log file. Please specify file path in ' | |
| 96 'the AGENT_LOG_FILE environment variable.'); | |
| 97 Process.exit(); | |
| 98 } | |
| 99 var logger = new Logger('Agent', logFile); | |
| 100 String pidFile = _getEnv('AGENT_PID_FILE'); | |
| 101 if (pidFile == null) { | |
| 102 logger.error('Agent requires a valid pid file. Please specify file path ' | |
| 103 'in the AGENT_PID_FILE environment variable.'); | |
| 104 Process.exit(); | |
| 105 } | |
| 106 String vmBinPath = _getEnv('FLETCH_VM'); | |
| 107 String vmLogDir = _getEnv('VM_LOG_DIR'); | |
| 108 String tmpDir = _getEnv('TMPDIR'); | |
| 109 if (tmpDir == null) tmpDir = '/tmp'; | |
| 110 | |
| 111 // If the below ENV variable is set the agent will just store the agent | |
| 112 // debian package but not apply it. | |
| 113 bool applyUpgrade = _getEnv('AGENT_UPGRADE_DRY_RUN') == null; | |
| 114 | |
| 115 logger.info('Agent log file: $logFile'); | |
| 116 logger.info('Agent pid file: $pidFile'); | |
| 117 logger.info('Vm path: $vmBinPath'); | |
| 118 logger.info('Log path: $vmLogDir'); | |
| 119 | |
| 120 // Make sure we have a fletch-vm binary we can use for launching a vm. | |
| 121 if (!File.existsAsFile(vmBinPath)) { | |
| 122 logger.error('Cannot find fletch vm at path: $vmBinPath'); | |
| 123 Process.exit(); | |
| 124 } | |
| 125 // Make sure we have a valid log directory. | |
| 126 if (!File.existsAsFile(vmLogDir)) { | |
| 127 logger.error('Cannot find log directory: $vmLogDir'); | |
| 128 Process.exit(); | |
| 129 } | |
| 130 return new AgentContext._( | |
| 131 ip, port, pidFile, logger, vmBinPath, vmLogDir, tmpDir, applyUpgrade); | |
| 132 } | |
| 133 | |
| 134 const AgentContext._( | |
| 135 this.ip, this.port, this.pidFile, this.logger, this.vmBinPath, | |
| 136 this.vmLogDir, this.tmpDir, this.applyUpgrade); | |
| 137 } | |
| 138 | |
| 139 class Agent { | |
| 140 final AgentContext _context; | |
| 141 | |
| 142 Agent(this._context); | |
| 143 | |
| 144 void start() { | |
| 145 var ip = _context.ip; | |
| 146 var port = _context.port; | |
| 147 _context.logger.info('starting server on $ip:$port'); | |
| 148 var socket = new ServerSocket(ip, port); | |
| 149 // We have to make a final reference to the context to not have the | |
| 150 // containing instance passed into the closure given to spawnAccept. | |
| 151 final detachedContext = _context; | |
| 152 while (true) { | |
| 153 socket.spawnAccept((Socket s) => _handleCommand(s, detachedContext)); | |
| 154 } | |
| 155 // We run until killed. | |
| 156 } | |
| 157 | |
| 158 static void _handleCommand(Socket socket, AgentContext context) { | |
| 159 try { | |
| 160 var handler = new CommandHandler(socket, context); | |
| 161 handler.run(); | |
| 162 } catch (error) { | |
| 163 context.logger.warn('Caught error: $error. Closing socket'); | |
| 164 socket.close(); | |
| 165 } | |
| 166 } | |
| 167 } | |
| 168 | |
| 169 class CommandHandler { | |
| 170 static const int SIGHUB = 1; | |
| 171 static const int SIGINT = 2; | |
| 172 static const int SIGQUIT = 3; | |
| 173 static const int SIGKILL = 9; | |
| 174 static const int SIGTERM = 15; | |
| 175 static final ForeignFunction _kill = ForeignLibrary.main.lookup('kill'); | |
| 176 static final ForeignFunction _unlink = ForeignLibrary.main.lookup('unlink'); | |
| 177 | |
| 178 final Socket _socket; | |
| 179 final AgentContext _context; | |
| 180 RequestHeader _requestHeader; | |
| 181 | |
| 182 factory CommandHandler(Socket socket, AgentContext context) { | |
| 183 var bytes = socket.read(RequestHeader.HEADER_SIZE); | |
| 184 if (bytes == null) { | |
| 185 throw 'Connection closed by peer'; | |
| 186 } else if (bytes.lengthInBytes < RequestHeader.HEADER_SIZE) { | |
| 187 throw 'Insufficient bytes ($bytes.lengthInBytes) received in request'; | |
| 188 } | |
| 189 var header = new RequestHeader.fromBuffer(bytes); | |
| 190 return new CommandHandler._(socket, context, header); | |
| 191 } | |
| 192 | |
| 193 CommandHandler._(this._socket, this._context, this._requestHeader); | |
| 194 | |
| 195 void run() { | |
| 196 if (_requestHeader.version > AGENT_VERSION) { | |
| 197 _context.logger.warn('Received message with unsupported version ' | |
| 198 '${_requestHeader.version} and command ${_requestHeader.command}'); | |
| 199 _sendReply( | |
| 200 new ReplyHeader(_requestHeader.id, ReplyHeader.UNSUPPORTED_VERSION)); | |
| 201 } | |
| 202 switch (_requestHeader.command) { | |
| 203 case RequestHeader.START_VM: | |
| 204 _startVm(); | |
| 205 break; | |
| 206 case RequestHeader.STOP_VM: | |
| 207 _stopVm(); | |
| 208 break; | |
| 209 case RequestHeader.LIST_VMS: | |
| 210 _listVms(); | |
| 211 break; | |
| 212 case RequestHeader.UPGRADE_AGENT: | |
| 213 _upgradeAgent(); | |
| 214 break; | |
| 215 case RequestHeader.FLETCH_VERSION: | |
| 216 _fletchVersion(); | |
| 217 break; | |
| 218 case RequestHeader.SIGNAL_VM: | |
| 219 _signalVm(); | |
| 220 break; | |
| 221 default: | |
| 222 _context.logger.warn('Unknown command: ${_requestHeader.command}.'); | |
| 223 _sendReply( | |
| 224 new ReplyHeader(_requestHeader.id, ReplyHeader.UNKNOWN_COMMAND)); | |
| 225 break; | |
| 226 } | |
| 227 } | |
| 228 | |
| 229 void _sendReply(ReplyHeader reply) { | |
| 230 _socket.write(reply.toBuffer()); | |
| 231 _socket.close(); | |
| 232 } | |
| 233 | |
| 234 void _startVm() { | |
| 235 int vmPid = 0; | |
| 236 var reply; | |
| 237 // Create a tmp file for reading the port the vm is listening on. | |
| 238 File portFile = new File.temporary("${_context.tmpDir}/vm-port-"); | |
| 239 try { | |
| 240 List<String> args = ['--log-dir=${_context.vmLogDir}', | |
| 241 '--port-file=${portFile.path}', '--host=0.0.0.0']; | |
| 242 vmPid = os.NativeProcess.startDetached(_context.vmBinPath, args); | |
| 243 // Find out what port the vm is listening on. | |
| 244 _context.logger.info('Reading port from ${portFile.path} for vm $vmPid'); | |
| 245 int port = _retrieveVmPort(portFile.path); | |
| 246 reply = new StartVmReply( | |
| 247 _requestHeader.id, ReplyHeader.SUCCESS, vmId: vmPid, vmPort: port); | |
| 248 _context.logger.info('Started fletch vm with pid $vmPid on port $port'); | |
| 249 } catch (e) { | |
| 250 reply = new StartVmReply(_requestHeader.id, ReplyHeader.START_VM_FAILED); | |
| 251 // TODO(wibling): could extend the result with caught error string. | |
| 252 _context.logger.warn('Failed to start vm with error: $e'); | |
| 253 if (vmPid > 0) { | |
| 254 // Kill the vm. | |
| 255 _kill.icall$2(vmPid, SIGTERM); | |
| 256 } | |
| 257 } finally { | |
| 258 File.delete(portFile.path); | |
| 259 } | |
| 260 _sendReply(reply); | |
| 261 } | |
| 262 | |
| 263 int _retrieveVmPort(String portPath) { | |
| 264 // The fletch-vm will write the port it is listening on into the file | |
| 265 // specified by 'portPath' above. The agent waits for the file to be | |
| 266 // created (retries the File.open until it succeeds) and then reads the | |
| 267 // port from the file. | |
| 268 // To make sure we are reading a consistent value from the file, ie. the | |
| 269 // vm could have written a partial value at the time we read it, we continue | |
| 270 // reading the value from the file until we have read the same value from | |
| 271 // file in two consecutive reads. | |
| 272 // An alternative to the consecutive reading would be to use cooperative | |
| 273 // locking, but consecutive reading is not relying on the fletch-vm to | |
| 274 // behave. | |
| 275 // TODO(wibling): Look into passing a socket port to the fletch-vm and | |
| 276 // have it write the port to the socket. This allows the agent to just | |
| 277 // wait on the socket and wake up when it is ready. | |
| 278 int previousPort = -1; | |
| 279 for (int retries = 500; retries >= 0; --retries) { | |
| 280 int port = _tryReadPort(portPath, retries == 0); | |
| 281 // Check if we read the same port value twice in a row. | |
| 282 if (previousPort != -1 && previousPort == port) return port; | |
| 283 previousPort = port; | |
| 284 os.sleep(10); | |
| 285 } | |
| 286 throw 'Failed to read port from $portPath'; | |
| 287 } | |
| 288 | |
| 289 int _tryReadPort(String portPath, bool lastAttempt) { | |
| 290 File portFile; | |
| 291 var data; | |
| 292 try { | |
| 293 portFile = new File.open(portPath); | |
| 294 data = portFile.read(10); | |
| 295 } on FileException catch (_) { | |
| 296 if (lastAttempt) rethrow; | |
| 297 return -1; | |
| 298 } finally { | |
| 299 if (portFile != null) portFile.close(); | |
| 300 } | |
| 301 try { | |
| 302 if (data.lengthInBytes > 0) { | |
| 303 var portString = UTF8.decode(data.asUint8List().toList()); | |
| 304 return int.parse(portString); | |
| 305 } | |
| 306 } on FormatException catch (_) { | |
| 307 if (lastAttempt) rethrow; | |
| 308 } | |
| 309 // Retry if no data was read. | |
| 310 return -1; | |
| 311 } | |
| 312 | |
| 313 void _stopVm() { | |
| 314 if (_requestHeader.payloadLength != 4) { | |
| 315 _sendReply( | |
| 316 new StopVmReply(_requestHeader.id, ReplyHeader.INVALID_PAYLOAD)); | |
| 317 return; | |
| 318 } | |
| 319 var reply; | |
| 320 // Read in the vm id. | |
| 321 var pidBytes = _socket.read(4); | |
| 322 if (pidBytes == null) { | |
| 323 reply = new StopVmReply(_requestHeader.id, ReplyHeader.INVALID_PAYLOAD); | |
| 324 _context.logger.warn('Missing pid of the fletch vm to stop.'); | |
| 325 } else { | |
| 326 int pid = readUint32(pidBytes, 0); | |
| 327 int err = _kill.icall$2(pid, SIGTERM); | |
| 328 if (err != 0) { | |
| 329 reply = new StopVmReply(_requestHeader.id, ReplyHeader.UNKNOWN_VM_ID); | |
| 330 _context.logger.warn( | |
| 331 'Failed to stop pid $pid with error: ${Foreign.errno}'); | |
| 332 } else { | |
| 333 reply = new StopVmReply(_requestHeader.id, ReplyHeader.SUCCESS); | |
| 334 _context.logger.info('Stopped pid: $pid'); | |
| 335 } | |
| 336 } | |
| 337 _sendReply(reply); | |
| 338 } | |
| 339 | |
| 340 void _signalVm() { | |
| 341 if (_requestHeader.payloadLength != 8) { | |
| 342 _sendReply( | |
| 343 new SignalVmReply(_requestHeader.id, ReplyHeader.INVALID_PAYLOAD)); | |
| 344 return; | |
| 345 } | |
| 346 var reply; | |
| 347 // Read in the vm id and the signal to send. | |
| 348 var pidBytes = _socket.read(8); | |
| 349 if (pidBytes == null) { | |
| 350 reply = new SignalVmReply(_requestHeader.id, ReplyHeader.INVALID_PAYLOAD); | |
| 351 _context.logger.warn('Missing pid of the fletch vm to signal.'); | |
| 352 } else { | |
| 353 int pid = readUint32(pidBytes, 0); | |
| 354 int signal = readUint32(pidBytes, 4); | |
| 355 // Hack to make ctrl-c work for stopping spawned vms work on Raspbian | |
| 356 // wheezy. For some reason SIGINT doesn't work so we map it to SIGTERM as | |
| 357 // a workaround. | |
| 358 if (signal == SIGINT && sys.info().release.startsWith('3.18')) { | |
| 359 _context.logger.info('Remapping SIGINT to SIGTERM on Raspbian wheezy'); | |
| 360 signal = SIGTERM; | |
| 361 } | |
| 362 int err = _kill.icall$2(pid, signal); | |
| 363 if (err != 0) { | |
| 364 reply = new SignalVmReply(_requestHeader.id, ReplyHeader.UNKNOWN_VM_ID); | |
| 365 _context.logger.warn('Failed to send signal $signal to pid $pid with ' | |
| 366 'error: ${Foreign.errno}'); | |
| 367 } else { | |
| 368 reply = new SignalVmReply(_requestHeader.id, ReplyHeader.SUCCESS); | |
| 369 _context.logger.info('Sent signal $signal to pid: $pid'); | |
| 370 } | |
| 371 } | |
| 372 _sendReply(reply); | |
| 373 } | |
| 374 | |
| 375 void _listVms() { | |
| 376 // TODO(wibling): implement this method. For now just hardcode some values. | |
| 377 _sendReply( | |
| 378 new ListVmsReply(_requestHeader.id, ReplyHeader.UNKNOWN_COMMAND)); | |
| 379 } | |
| 380 | |
| 381 void _upgradeAgent() { | |
| 382 int result; | |
| 383 ByteBuffer binary = _socket.read(_requestHeader.payloadLength); | |
| 384 if (binary == null) { | |
| 385 _context.logger.warn('Could not read fletch-agent package binary' | |
| 386 ' of length ${_requestHeader.payloadLength} bytes'); | |
| 387 result = ReplyHeader.INVALID_PAYLOAD; | |
| 388 } else { | |
| 389 _context.logger.info('Read fletch-agent package binary' | |
| 390 ' of length ${binary.lengthInBytes} bytes.'); | |
| 391 File file = new File.open(PACKAGE_FILE_NAME, mode: File.WRITE); | |
| 392 try { | |
| 393 file.write(binary); | |
| 394 } catch (e) { | |
| 395 _context.logger.warn('UpgradeAgent failed: $e'); | |
| 396 _sendReply(new UpgradeAgentReply(_requestHeader.id, | |
| 397 ReplyHeader.UPGRADE_FAILED)); | |
| 398 } finally { | |
| 399 file.close(); | |
| 400 } | |
| 401 _context.logger.info('Package file written successfully.'); | |
| 402 if (_context.applyUpgrade) { | |
| 403 int pid = os.NativeProcess.startDetached('/usr/bin/dpkg', | |
| 404 [// Force dpkg to overwrite configuration files installed by | |
| 405 // the agent. | |
| 406 '--force-confnew', | |
| 407 '--install', | |
| 408 PACKAGE_FILE_NAME]); | |
| 409 _context.logger.info('started package update (PID $pid)'); | |
| 410 } | |
| 411 result = ReplyHeader.SUCCESS; | |
| 412 } | |
| 413 _context.logger.info('sending reply'); | |
| 414 _sendReply(new UpgradeAgentReply(_requestHeader.id, result)); | |
| 415 } | |
| 416 | |
| 417 void _fletchVersion() { | |
| 418 String version = fletch.version(); | |
| 419 _context.logger.info('Returning fletch version $version'); | |
| 420 _sendReply(new FletchVersionReply( | |
| 421 _requestHeader.id, ReplyHeader.SUCCESS, version: version)); | |
| 422 } | |
| 423 } | |
| 424 | |
| 425 void main(List<String> arguments) { | |
| 426 // The agent context will initialize itself from the runtime environment. | |
| 427 var context = new AgentContext(); | |
| 428 | |
| 429 // Write the program's pid to the pid file if set. | |
| 430 _writePid(context.pidFile); | |
| 431 | |
| 432 // Run fletch agent on given ip address and port. | |
| 433 var agent = new Agent(context); | |
| 434 agent.start(); | |
| 435 } | |
| 436 | |
| 437 void _writePid(String pidFilePath) { | |
| 438 final ForeignFunction _getpid = ForeignLibrary.main.lookup('getpid'); | |
| 439 | |
| 440 int pid = _getpid.icall$0(); | |
| 441 List<int> encodedPid = UTF8.encode('$pid'); | |
| 442 ByteBuffer buffer = new Uint8List.fromList(encodedPid).buffer; | |
| 443 var pidFile = new File.open(pidFilePath, mode: File.WRITE); | |
| 444 try { | |
| 445 pidFile.write(buffer); | |
| 446 } finally { | |
| 447 pidFile.close(); | |
| 448 } | |
| 449 } | |
| 450 | |
| 451 void printUsage() { | |
| 452 print('Usage:'); | |
| 453 print('The Fletch agent supports the following flags'); | |
| 454 print(''); | |
| 455 print(' --port: specify the port on which to listen, default: ' | |
| 456 '$AGENT_DEFAULT_PORT'); | |
| 457 print(' --ip: specify the ip address on which to listen, default: 0.0.0.0'); | |
| 458 print(' --vm: specify the path to the vm binary, default: ' | |
| 459 '/opt/fletch/bin/fletch-vm.'); | |
| 460 print(''); | |
| 461 Process.exit(); | |
| 462 } | |
| OLD | NEW |