Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(135)

Side by Side Diff: pkg/fletch_agent/bin/agent.dart

Issue 1659163007: Rename fletch -> dartino (Closed) Base URL: https://github.com/dartino/sdk.git@master
Patch Set: address comments Created 4 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « pkg/fletch/pubspec.yaml ('k') | pkg/fletch_agent/lib/agent_connection.dart » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright (c) 2015, the Dartino project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE.md file.
4
5 library fletch_agent.agent;
6
7 import 'dart:convert' show UTF8;
8 import 'dart:fletch';
9 import 'dart:fletch.ffi';
10 import 'dart:fletch.os' as os;
11 import 'dart:typed_data';
12
13 import 'package:ffi/ffi.dart';
14 import 'package:file/file.dart';
15 import 'package:fletch/fletch.dart' as fletch;
16 import 'package:os/os.dart' show sys;
17 import 'package:socket/socket.dart';
18
19 import '../lib/messages.dart';
20
21 class Logger {
22 final String _prefix;
23 final String _path;
24 final bool _logToStdout;
25
26 factory Logger(String prefix, String logPath, {stdout: true}) {
27 return new Logger._(prefix, logPath, stdout);
28 }
29
30 const Logger._(this._prefix, this._path, this._logToStdout);
31
32 void info(String msg) => _write('$_prefix INFO: $msg');
33 void warn(String msg) => _write('$_prefix WARNING: $msg');
34 void error(String msg) => _write('$_prefix ERROR: $msg');
35
36 void _write(String msg) {
37 msg = '${new DateTime.now().toString()} $msg';
38 if (_logToStdout) {
39 print(msg);
40 }
41 File log;
42 try {
43 log = new File.open(_path, mode: File.APPEND);
44 var encoded = UTF8.encode('$msg\n');
45 var data = new Uint8List.fromList(encoded);
46 log.write(data.buffer);
47 } finally {
48 if (log != null) log.close();
49 }
50 }
51 }
52
53 class AgentContext {
54 static final ForeignFunction _getenv = ForeignLibrary.main.lookup('getenv');
55
56 static String _getEnv(String varName) {
57 ForeignPointer ptr;
58 var arg;
59 try {
60 arg = new ForeignMemory.fromStringAsUTF8(varName);
61 ptr = _getenv.pcall$1(arg);
62 } finally {
63 arg.free();
64 }
65 if (ptr.address == 0) return null;
66 return cStringToString(ptr);
67 }
68
69 // Agent specific info.
70 final String ip;
71 final int port;
72 final String pidFile;
73 final Logger logger;
74 final bool applyUpgrade;
75
76 // Fletch-vm path and args.
77 final String vmBinPath;
78 final String vmLogDir;
79 final String tmpDir;
80
81 factory AgentContext() {
82 String ip = _getEnv('AGENT_IP');
83 if (ip == null) {
84 ip = '0.0.0.0';
85 }
86 int port;
87 try {
88 String portStr = _getEnv('AGENT_PORT');
89 port = int.parse(portStr);
90 } catch (_) {
91 port = AGENT_DEFAULT_PORT; // default
92 }
93 String logFile = _getEnv('AGENT_LOG_FILE');
94 if (logFile == null) {
95 print('Agent requires a valid log file. Please specify file path in '
96 'the AGENT_LOG_FILE environment variable.');
97 Process.exit();
98 }
99 var logger = new Logger('Agent', logFile);
100 String pidFile = _getEnv('AGENT_PID_FILE');
101 if (pidFile == null) {
102 logger.error('Agent requires a valid pid file. Please specify file path '
103 'in the AGENT_PID_FILE environment variable.');
104 Process.exit();
105 }
106 String vmBinPath = _getEnv('FLETCH_VM');
107 String vmLogDir = _getEnv('VM_LOG_DIR');
108 String tmpDir = _getEnv('TMPDIR');
109 if (tmpDir == null) tmpDir = '/tmp';
110
111 // If the below ENV variable is set the agent will just store the agent
112 // debian package but not apply it.
113 bool applyUpgrade = _getEnv('AGENT_UPGRADE_DRY_RUN') == null;
114
115 logger.info('Agent log file: $logFile');
116 logger.info('Agent pid file: $pidFile');
117 logger.info('Vm path: $vmBinPath');
118 logger.info('Log path: $vmLogDir');
119
120 // Make sure we have a fletch-vm binary we can use for launching a vm.
121 if (!File.existsAsFile(vmBinPath)) {
122 logger.error('Cannot find fletch vm at path: $vmBinPath');
123 Process.exit();
124 }
125 // Make sure we have a valid log directory.
126 if (!File.existsAsFile(vmLogDir)) {
127 logger.error('Cannot find log directory: $vmLogDir');
128 Process.exit();
129 }
130 return new AgentContext._(
131 ip, port, pidFile, logger, vmBinPath, vmLogDir, tmpDir, applyUpgrade);
132 }
133
134 const AgentContext._(
135 this.ip, this.port, this.pidFile, this.logger, this.vmBinPath,
136 this.vmLogDir, this.tmpDir, this.applyUpgrade);
137 }
138
139 class Agent {
140 final AgentContext _context;
141
142 Agent(this._context);
143
144 void start() {
145 var ip = _context.ip;
146 var port = _context.port;
147 _context.logger.info('starting server on $ip:$port');
148 var socket = new ServerSocket(ip, port);
149 // We have to make a final reference to the context to not have the
150 // containing instance passed into the closure given to spawnAccept.
151 final detachedContext = _context;
152 while (true) {
153 socket.spawnAccept((Socket s) => _handleCommand(s, detachedContext));
154 }
155 // We run until killed.
156 }
157
158 static void _handleCommand(Socket socket, AgentContext context) {
159 try {
160 var handler = new CommandHandler(socket, context);
161 handler.run();
162 } catch (error) {
163 context.logger.warn('Caught error: $error. Closing socket');
164 socket.close();
165 }
166 }
167 }
168
169 class CommandHandler {
170 static const int SIGHUB = 1;
171 static const int SIGINT = 2;
172 static const int SIGQUIT = 3;
173 static const int SIGKILL = 9;
174 static const int SIGTERM = 15;
175 static final ForeignFunction _kill = ForeignLibrary.main.lookup('kill');
176 static final ForeignFunction _unlink = ForeignLibrary.main.lookup('unlink');
177
178 final Socket _socket;
179 final AgentContext _context;
180 RequestHeader _requestHeader;
181
182 factory CommandHandler(Socket socket, AgentContext context) {
183 var bytes = socket.read(RequestHeader.HEADER_SIZE);
184 if (bytes == null) {
185 throw 'Connection closed by peer';
186 } else if (bytes.lengthInBytes < RequestHeader.HEADER_SIZE) {
187 throw 'Insufficient bytes ($bytes.lengthInBytes) received in request';
188 }
189 var header = new RequestHeader.fromBuffer(bytes);
190 return new CommandHandler._(socket, context, header);
191 }
192
193 CommandHandler._(this._socket, this._context, this._requestHeader);
194
195 void run() {
196 if (_requestHeader.version > AGENT_VERSION) {
197 _context.logger.warn('Received message with unsupported version '
198 '${_requestHeader.version} and command ${_requestHeader.command}');
199 _sendReply(
200 new ReplyHeader(_requestHeader.id, ReplyHeader.UNSUPPORTED_VERSION));
201 }
202 switch (_requestHeader.command) {
203 case RequestHeader.START_VM:
204 _startVm();
205 break;
206 case RequestHeader.STOP_VM:
207 _stopVm();
208 break;
209 case RequestHeader.LIST_VMS:
210 _listVms();
211 break;
212 case RequestHeader.UPGRADE_AGENT:
213 _upgradeAgent();
214 break;
215 case RequestHeader.FLETCH_VERSION:
216 _fletchVersion();
217 break;
218 case RequestHeader.SIGNAL_VM:
219 _signalVm();
220 break;
221 default:
222 _context.logger.warn('Unknown command: ${_requestHeader.command}.');
223 _sendReply(
224 new ReplyHeader(_requestHeader.id, ReplyHeader.UNKNOWN_COMMAND));
225 break;
226 }
227 }
228
229 void _sendReply(ReplyHeader reply) {
230 _socket.write(reply.toBuffer());
231 _socket.close();
232 }
233
234 void _startVm() {
235 int vmPid = 0;
236 var reply;
237 // Create a tmp file for reading the port the vm is listening on.
238 File portFile = new File.temporary("${_context.tmpDir}/vm-port-");
239 try {
240 List<String> args = ['--log-dir=${_context.vmLogDir}',
241 '--port-file=${portFile.path}', '--host=0.0.0.0'];
242 vmPid = os.NativeProcess.startDetached(_context.vmBinPath, args);
243 // Find out what port the vm is listening on.
244 _context.logger.info('Reading port from ${portFile.path} for vm $vmPid');
245 int port = _retrieveVmPort(portFile.path);
246 reply = new StartVmReply(
247 _requestHeader.id, ReplyHeader.SUCCESS, vmId: vmPid, vmPort: port);
248 _context.logger.info('Started fletch vm with pid $vmPid on port $port');
249 } catch (e) {
250 reply = new StartVmReply(_requestHeader.id, ReplyHeader.START_VM_FAILED);
251 // TODO(wibling): could extend the result with caught error string.
252 _context.logger.warn('Failed to start vm with error: $e');
253 if (vmPid > 0) {
254 // Kill the vm.
255 _kill.icall$2(vmPid, SIGTERM);
256 }
257 } finally {
258 File.delete(portFile.path);
259 }
260 _sendReply(reply);
261 }
262
263 int _retrieveVmPort(String portPath) {
264 // The fletch-vm will write the port it is listening on into the file
265 // specified by 'portPath' above. The agent waits for the file to be
266 // created (retries the File.open until it succeeds) and then reads the
267 // port from the file.
268 // To make sure we are reading a consistent value from the file, ie. the
269 // vm could have written a partial value at the time we read it, we continue
270 // reading the value from the file until we have read the same value from
271 // file in two consecutive reads.
272 // An alternative to the consecutive reading would be to use cooperative
273 // locking, but consecutive reading is not relying on the fletch-vm to
274 // behave.
275 // TODO(wibling): Look into passing a socket port to the fletch-vm and
276 // have it write the port to the socket. This allows the agent to just
277 // wait on the socket and wake up when it is ready.
278 int previousPort = -1;
279 for (int retries = 500; retries >= 0; --retries) {
280 int port = _tryReadPort(portPath, retries == 0);
281 // Check if we read the same port value twice in a row.
282 if (previousPort != -1 && previousPort == port) return port;
283 previousPort = port;
284 os.sleep(10);
285 }
286 throw 'Failed to read port from $portPath';
287 }
288
289 int _tryReadPort(String portPath, bool lastAttempt) {
290 File portFile;
291 var data;
292 try {
293 portFile = new File.open(portPath);
294 data = portFile.read(10);
295 } on FileException catch (_) {
296 if (lastAttempt) rethrow;
297 return -1;
298 } finally {
299 if (portFile != null) portFile.close();
300 }
301 try {
302 if (data.lengthInBytes > 0) {
303 var portString = UTF8.decode(data.asUint8List().toList());
304 return int.parse(portString);
305 }
306 } on FormatException catch (_) {
307 if (lastAttempt) rethrow;
308 }
309 // Retry if no data was read.
310 return -1;
311 }
312
313 void _stopVm() {
314 if (_requestHeader.payloadLength != 4) {
315 _sendReply(
316 new StopVmReply(_requestHeader.id, ReplyHeader.INVALID_PAYLOAD));
317 return;
318 }
319 var reply;
320 // Read in the vm id.
321 var pidBytes = _socket.read(4);
322 if (pidBytes == null) {
323 reply = new StopVmReply(_requestHeader.id, ReplyHeader.INVALID_PAYLOAD);
324 _context.logger.warn('Missing pid of the fletch vm to stop.');
325 } else {
326 int pid = readUint32(pidBytes, 0);
327 int err = _kill.icall$2(pid, SIGTERM);
328 if (err != 0) {
329 reply = new StopVmReply(_requestHeader.id, ReplyHeader.UNKNOWN_VM_ID);
330 _context.logger.warn(
331 'Failed to stop pid $pid with error: ${Foreign.errno}');
332 } else {
333 reply = new StopVmReply(_requestHeader.id, ReplyHeader.SUCCESS);
334 _context.logger.info('Stopped pid: $pid');
335 }
336 }
337 _sendReply(reply);
338 }
339
340 void _signalVm() {
341 if (_requestHeader.payloadLength != 8) {
342 _sendReply(
343 new SignalVmReply(_requestHeader.id, ReplyHeader.INVALID_PAYLOAD));
344 return;
345 }
346 var reply;
347 // Read in the vm id and the signal to send.
348 var pidBytes = _socket.read(8);
349 if (pidBytes == null) {
350 reply = new SignalVmReply(_requestHeader.id, ReplyHeader.INVALID_PAYLOAD);
351 _context.logger.warn('Missing pid of the fletch vm to signal.');
352 } else {
353 int pid = readUint32(pidBytes, 0);
354 int signal = readUint32(pidBytes, 4);
355 // Hack to make ctrl-c work for stopping spawned vms work on Raspbian
356 // wheezy. For some reason SIGINT doesn't work so we map it to SIGTERM as
357 // a workaround.
358 if (signal == SIGINT && sys.info().release.startsWith('3.18')) {
359 _context.logger.info('Remapping SIGINT to SIGTERM on Raspbian wheezy');
360 signal = SIGTERM;
361 }
362 int err = _kill.icall$2(pid, signal);
363 if (err != 0) {
364 reply = new SignalVmReply(_requestHeader.id, ReplyHeader.UNKNOWN_VM_ID);
365 _context.logger.warn('Failed to send signal $signal to pid $pid with '
366 'error: ${Foreign.errno}');
367 } else {
368 reply = new SignalVmReply(_requestHeader.id, ReplyHeader.SUCCESS);
369 _context.logger.info('Sent signal $signal to pid: $pid');
370 }
371 }
372 _sendReply(reply);
373 }
374
375 void _listVms() {
376 // TODO(wibling): implement this method. For now just hardcode some values.
377 _sendReply(
378 new ListVmsReply(_requestHeader.id, ReplyHeader.UNKNOWN_COMMAND));
379 }
380
381 void _upgradeAgent() {
382 int result;
383 ByteBuffer binary = _socket.read(_requestHeader.payloadLength);
384 if (binary == null) {
385 _context.logger.warn('Could not read fletch-agent package binary'
386 ' of length ${_requestHeader.payloadLength} bytes');
387 result = ReplyHeader.INVALID_PAYLOAD;
388 } else {
389 _context.logger.info('Read fletch-agent package binary'
390 ' of length ${binary.lengthInBytes} bytes.');
391 File file = new File.open(PACKAGE_FILE_NAME, mode: File.WRITE);
392 try {
393 file.write(binary);
394 } catch (e) {
395 _context.logger.warn('UpgradeAgent failed: $e');
396 _sendReply(new UpgradeAgentReply(_requestHeader.id,
397 ReplyHeader.UPGRADE_FAILED));
398 } finally {
399 file.close();
400 }
401 _context.logger.info('Package file written successfully.');
402 if (_context.applyUpgrade) {
403 int pid = os.NativeProcess.startDetached('/usr/bin/dpkg',
404 [// Force dpkg to overwrite configuration files installed by
405 // the agent.
406 '--force-confnew',
407 '--install',
408 PACKAGE_FILE_NAME]);
409 _context.logger.info('started package update (PID $pid)');
410 }
411 result = ReplyHeader.SUCCESS;
412 }
413 _context.logger.info('sending reply');
414 _sendReply(new UpgradeAgentReply(_requestHeader.id, result));
415 }
416
417 void _fletchVersion() {
418 String version = fletch.version();
419 _context.logger.info('Returning fletch version $version');
420 _sendReply(new FletchVersionReply(
421 _requestHeader.id, ReplyHeader.SUCCESS, version: version));
422 }
423 }
424
425 void main(List<String> arguments) {
426 // The agent context will initialize itself from the runtime environment.
427 var context = new AgentContext();
428
429 // Write the program's pid to the pid file if set.
430 _writePid(context.pidFile);
431
432 // Run fletch agent on given ip address and port.
433 var agent = new Agent(context);
434 agent.start();
435 }
436
437 void _writePid(String pidFilePath) {
438 final ForeignFunction _getpid = ForeignLibrary.main.lookup('getpid');
439
440 int pid = _getpid.icall$0();
441 List<int> encodedPid = UTF8.encode('$pid');
442 ByteBuffer buffer = new Uint8List.fromList(encodedPid).buffer;
443 var pidFile = new File.open(pidFilePath, mode: File.WRITE);
444 try {
445 pidFile.write(buffer);
446 } finally {
447 pidFile.close();
448 }
449 }
450
451 void printUsage() {
452 print('Usage:');
453 print('The Fletch agent supports the following flags');
454 print('');
455 print(' --port: specify the port on which to listen, default: '
456 '$AGENT_DEFAULT_PORT');
457 print(' --ip: specify the ip address on which to listen, default: 0.0.0.0');
458 print(' --vm: specify the path to the vm binary, default: '
459 '/opt/fletch/bin/fletch-vm.');
460 print('');
461 Process.exit();
462 }
OLDNEW
« no previous file with comments | « pkg/fletch/pubspec.yaml ('k') | pkg/fletch_agent/lib/agent_connection.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698