Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(152)

Side by Side Diff: utils/pub/io.dart

Issue 12316036: Merge IO v2 branch to bleeding edge (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Rebased to r18818 Created 7 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « utils/pub/http.dart ('k') | utils/pub/oauth2.dart » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 /// Helper functionality to make working with IO easier. 5 /// Helper functionality to make working with IO easier.
6 library io; 6 library io;
7 7
8 import 'dart:async'; 8 import 'dart:async';
9 import 'dart:io'; 9 import 'dart:io';
10 import 'dart:isolate'; 10 import 'dart:isolate';
(...skipping 63 matching lines...) Expand 10 before | Expand all | Expand 10 after
74 ..closeSync(); 74 ..closeSync();
75 log.fine("Wrote text file $file."); 75 log.fine("Wrote text file $file.");
76 return file; 76 return file;
77 } 77 }
78 78
79 /// Writes [stream] to a new file at path [file]. Will replace any file already 79 /// Writes [stream] to a new file at path [file]. Will replace any file already
80 /// at that path. Completes when the file is done being written. 80 /// at that path. Completes when the file is done being written.
81 Future<String> createFileFromStream(Stream<List<int>> stream, String file) { 81 Future<String> createFileFromStream(Stream<List<int>> stream, String file) {
82 log.io("Creating $file from stream."); 82 log.io("Creating $file from stream.");
83 83
84 var outputStream = new File(file).openOutputStream(); 84 return stream.pipe(new File(file).openWrite()).then((_) {
85 return stream.pipe(wrapOutputStream(outputStream)).then((_) {
86 log.fine("Created $file from stream."); 85 log.fine("Created $file from stream.");
87 return file; 86 return file;
88 }); 87 });
89 } 88 }
90 89
91 /// Creates a directory [dir]. 90 /// Creates a directory [dir].
92 String createDir(String dir) { 91 String createDir(String dir) {
93 new Directory(dir).createSync(); 92 new Directory(dir).createSync();
94 return dir; 93 return dir;
95 } 94 }
(...skipping 55 matching lines...) Expand 10 before | Expand all | Expand 10 after
151 if (listedDirectories.contains(resolvedPath)) { 150 if (listedDirectories.contains(resolvedPath)) {
152 return new Future.immediate([]); 151 return new Future.immediate([]);
153 } 152 }
154 153
155 listedDirectories = new Set<String>.from(listedDirectories); 154 listedDirectories = new Set<String>.from(listedDirectories);
156 listedDirectories.add(resolvedPath); 155 listedDirectories.add(resolvedPath);
157 156
158 log.io("Listing directory $dir."); 157 log.io("Listing directory $dir.");
159 var lister = new Directory(dir).list(); 158 var lister = new Directory(dir).list();
160 159
161 lister.onDone = (done) {
162 // TODO(rnystrom): May need to sort here if it turns out onDir and onFile
163 // aren't guaranteed to be called in a certain order. So far, they seem to .
164 if (done) {
165 log.fine("Listed directory $dir:\n${contents.join('\n')}");
166 completer.complete(contents);
167 }
168 };
169
170 // TODO(nweiz): remove this when issue 4061 is fixed.
171 var stackTrace;
172 try {
173 throw "";
174 } catch (_, localStackTrace) {
175 stackTrace = localStackTrace;
176 }
177
178 var children = []; 160 var children = [];
179 lister.onError = (error) => completer.completeError(error, stackTrace); 161 lister.listen(
180 lister.onDir = (file) { 162 (entity) {
181 if (!includeHiddenFiles && path.basename(file).startsWith('.')) return; 163 if (entity is File) {
182 file = path.join(dir, path.basename(file)); 164 var file = entity.name;
183 contents.add(file); 165 if (!includeHiddenFiles && path.basename(file).startsWith('.')) {
184 // TODO(nweiz): don't manually recurse once issue 7358 is fixed. Note that 166 return;
185 // once we remove the manual recursion, we'll need to explicitly filter 167 }
186 // out files in hidden directories. 168 contents.add(path.join(dir, path.basename(file)));
187 if (recursive) { 169 } else if (entity is Directory) {
188 children.add(doList(file, listedDirectories)); 170 var file = entity.path;
189 } 171 if (!includeHiddenFiles && path.basename(file).startsWith('.')) {
190 }; 172 return;
191 173 }
192 lister.onFile = (file) { 174 file = path.join(dir, path.basename(file));
193 if (!includeHiddenFiles && path.basename(file).startsWith('.')) return; 175 contents.add(file);
194 contents.add(path.join(dir, path.basename(file))); 176 // TODO(nweiz): don't manually recurse once issue 7358 is fixed.
195 }; 177 // Note that once we remove the manual recursion, we'll need to
178 // explicitly filter out files in hidden directories.
179 if (recursive) {
180 children.add(doList(file, listedDirectories));
181 }
182 }
183 },
184 onDone: () {
185 // TODO(rnystrom): May need to sort here if it turns out
186 // onDir and onFile aren't guaranteed to be called in a
187 // certain order. So far, they seem to.
188 log.fine("Listed directory $dir:\n${contents.join('\n')}");
189 completer.complete(contents);
190 },
191 onError: (error) => completer.completeError(error, stackTrace));
196 192
197 return completer.future.then((contents) { 193 return completer.future.then((contents) {
198 return Future.wait(children).then((childContents) { 194 return Future.wait(children).then((childContents) {
199 contents.addAll(flatten(childContents)); 195 contents.addAll(flatten(childContents));
200 return contents; 196 return contents;
201 }); 197 });
202 }); 198 });
203 } 199 }
204 200
205 return doList(dir, new Set<String>()); 201 return doList(dir, new Set<String>());
(...skipping 151 matching lines...) Expand 10 before | Expand all | Expand 10 after
357 /// A sink that writes to standard output. Errors piped to this stream will be 353 /// A sink that writes to standard output. Errors piped to this stream will be
358 /// surfaced to the top-level error handler. 354 /// surfaced to the top-level error handler.
359 final StreamSink<List<int>> stdoutSink = _wrapStdio(stdout, "stdout"); 355 final StreamSink<List<int>> stdoutSink = _wrapStdio(stdout, "stdout");
360 356
361 /// A sink that writes to standard error. Errors piped to this stream will be 357 /// A sink that writes to standard error. Errors piped to this stream will be
362 /// surfaced to the top-level error handler. 358 /// surfaced to the top-level error handler.
363 final StreamSink<List<int>> stderrSink = _wrapStdio(stderr, "stderr"); 359 final StreamSink<List<int>> stderrSink = _wrapStdio(stderr, "stderr");
364 360
365 /// Wrap the standard output or error [stream] in a [StreamSink]. Any errors are 361 /// Wrap the standard output or error [stream] in a [StreamSink]. Any errors are
366 /// logged, and then the program is terminated. [name] is used for debugging. 362 /// logged, and then the program is terminated. [name] is used for debugging.
367 StreamSink<List<int>> _wrapStdio(OutputStream stream, String name) { 363 StreamSink<List<int>> _wrapStdio(IOSink sink, String name) {
368 var pair = consumerToSink(wrapOutputStream(stream)); 364 var pair = consumerToSink(sink);
369 pair.last.catchError((e) { 365 pair.last.catchError((e) {
370 // This log may or may not work, depending on how the stream failed. Not 366 // This log may or may not work, depending on how the stream failed. Not
371 // much we can do about that. 367 // much we can do about that.
372 log.error("Error writing to $name: $e"); 368 log.error("Error writing to $name: $e");
373 exit(exit_codes.IO); 369 exit(exit_codes.IO);
374 }); 370 });
375 return pair.first; 371 return pair.first;
376 } 372 }
377 373
378 /// A line-by-line stream of standard input. 374 /// A line-by-line stream of standard input.
379 final Stream<String> stdinLines = 375 final Stream<String> stdinLines = streamToLines(
380 streamToLines(wrapInputStream(stdin).toStringStream()); 376 new ByteStream(stdin).toStringStream());
381 377
382 /// Displays a message and reads a yes/no confirmation from the user. Returns 378 /// Displays a message and reads a yes/no confirmation from the user. Returns
383 /// a [Future] that completes to `true` if the user confirms or `false` if they 379 /// a [Future] that completes to `true` if the user confirms or `false` if they
384 /// do not. 380 /// do not.
385 /// 381 ///
386 /// This will automatically append " (y/n)?" to the message, so [message] 382 /// This will automatically append " (y/n)?" to the message, so [message]
387 /// should just be a fragment like, "Are you sure you want to proceed". 383 /// should just be a fragment like, "Are you sure you want to proceed".
388 Future<bool> confirm(String message) { 384 Future<bool> confirm(String message) {
389 log.fine('Showing confirm message: $message'); 385 log.fine('Showing confirm message: $message');
390 stdoutSink.add("$message (y/n)? ".charCodes); 386 stdoutSink.add("$message (y/n)? ".charCodes);
391 return streamFirst(stdinLines) 387 return streamFirst(stdinLines)
392 .then((line) => new RegExp(r"^[yY]").hasMatch(line)); 388 .then((line) => new RegExp(r"^[yY]").hasMatch(line));
393 } 389 }
394 390
395 /// Reads and discards all output from [inputStream]. Returns a [Future] that 391 /// Reads and discards all output from [stream]. Returns a [Future] that
396 /// completes when the stream is closed. 392 /// completes when the stream is closed.
397 Future drainInputStream(InputStream inputStream) { 393 Future drainStream(Stream stream) {
398 var completer = new Completer(); 394 return stream.reduce(null, (x, y) {});
399 if (inputStream.closed) {
400 completer.complete();
401 return completer.future;
402 }
403
404 inputStream.onClosed = () => completer.complete();
405 inputStream.onData = inputStream.read;
406 inputStream.onError = (error) => completer.completeError(error);
407 return completer.future;
408 }
409
410 /// Wraps [stream] in a single-subscription [Stream] that emits the same data.
411 ByteStream wrapInputStream(InputStream stream) {
412 var controller = new StreamController();
413 if (stream.closed) {
414 controller.close();
415 return new ByteStream(controller.stream);
416 }
417
418 stream.onClosed = controller.close;
419 stream.onData = () => controller.add(stream.read());
420 stream.onError = (e) => controller.signalError(new AsyncError(e));
421 return new ByteStream(controller.stream);
422 }
423
424 /// Wraps [stream] in a [StreamConsumer] so that [Stream]s can by piped into it
425 /// using [Stream.pipe]. Errors piped to the returned [StreamConsumer] will be
426 /// forwarded to the [Future] returned by [Stream.pipe].
427 StreamConsumer<List<int>, dynamic> wrapOutputStream(OutputStream stream) =>
428 new _OutputStreamConsumer(stream);
429
430 /// A [StreamConsumer] that pipes data into an [OutputStream].
431 class _OutputStreamConsumer implements StreamConsumer<List<int>, dynamic> {
432 final OutputStream _outputStream;
433
434 _OutputStreamConsumer(this._outputStream);
435
436 Future consume(Stream<List<int>> stream) {
437 // TODO(nweiz): we have to manually keep track of whether or not the
438 // completer has completed since the output stream could signal an error
439 // after close() has been called but before it has shut down internally. See
440 // the following TODO.
441 var completed = false;
442 var completer = new Completer();
443 stream.listen((data) {
444 // Writing empty data to a closed stream can cause errors.
445 if (data.isEmpty) return;
446
447 // TODO(nweiz): remove this try/catch when issue 7836 is fixed.
448 try {
449 _outputStream.write(data);
450 } catch (e, stack) {
451 if (!completed) completer.completeError(e, stack);
452 completed = true;
453 }
454 }, onError: (e) {
455 if (!completed) completer.completeError(e.error, e.stackTrace);
456 completed = true;
457 }, onDone: () => _outputStream.close());
458
459 _outputStream.onError = (e) {
460 if (!completed) completer.completeError(e);
461 completed = true;
462 };
463
464 _outputStream.onClosed = () {
465 if (!completed) completer.complete();
466 completed = true;
467 };
468
469 return completer.future;
470 }
471 } 395 }
472 396
473 /// Returns a [StreamSink] that pipes all data to [consumer] and a [Future] that 397 /// Returns a [StreamSink] that pipes all data to [consumer] and a [Future] that
474 /// will succeed when [StreamSink] is closed or fail with any errors that occur 398 /// will succeed when [StreamSink] is closed or fail with any errors that occur
475 /// while writing. 399 /// while writing.
476 Pair<StreamSink, Future> consumerToSink(StreamConsumer consumer) { 400 Pair<StreamSink, Future> consumerToSink(StreamConsumer consumer) {
477 var controller = new StreamController(); 401 var controller = new StreamController();
478 var done = controller.stream.pipe(consumer); 402 var done = controller.stream.pipe(consumer);
479 return new Pair<StreamSink, Future>(controller.sink, done); 403 return new Pair<StreamSink, Future>(controller.sink, done);
480 } 404 }
(...skipping 121 matching lines...) Expand 10 before | Expand all | Expand 10 after
602 /// This is in an [ErrorGroup] with [stdinClosed], [stdout], and [stderr], so 526 /// This is in an [ErrorGroup] with [stdinClosed], [stdout], and [stderr], so
603 /// any error in process will be passed to it, but won't reach the top-level 527 /// any error in process will be passed to it, but won't reach the top-level
604 /// error handler unless nothing has handled it. 528 /// error handler unless nothing has handled it.
605 Future<int> get exitCode => _exitCode; 529 Future<int> get exitCode => _exitCode;
606 530
607 /// Creates a new [PubProcess] wrapping [process]. 531 /// Creates a new [PubProcess] wrapping [process].
608 PubProcess(Process process) 532 PubProcess(Process process)
609 : _process = process { 533 : _process = process {
610 var errorGroup = new ErrorGroup(); 534 var errorGroup = new ErrorGroup();
611 535
612 var pair = consumerToSink(wrapOutputStream(process.stdin)); 536 var pair = consumerToSink(process.stdin);
613 _stdin = pair.first; 537 _stdin = pair.first;
614 _stdinClosed = errorGroup.registerFuture(pair.last); 538 _stdinClosed = errorGroup.registerFuture(pair.last);
615 539
616 _stdout = new ByteStream( 540 _stdout = new ByteStream(
617 errorGroup.registerStream(wrapInputStream(process.stdout))); 541 errorGroup.registerStream(process.stdout));
618 _stderr = new ByteStream( 542 _stderr = new ByteStream(
619 errorGroup.registerStream(wrapInputStream(process.stderr))); 543 errorGroup.registerStream(process.stderr));
620 544
621 var exitCodeCompleter = new Completer(); 545 var exitCodeCompleter = new Completer();
622 _exitCode = errorGroup.registerFuture(exitCodeCompleter.future); 546 _exitCode = errorGroup.registerFuture(exitCodeCompleter.future);
623 _process.onExit = (code) => exitCodeCompleter.complete(code); 547 _process.exitCode.then((code) => exitCodeCompleter.complete(code));
624 } 548 }
625 549
626 /// Sends [signal] to the underlying process. 550 /// Sends [signal] to the underlying process.
627 bool kill([ProcessSignal signal = ProcessSignal.SIGTERM]) => 551 bool kill([ProcessSignal signal = ProcessSignal.SIGTERM]) =>
628 _process.kill(signal); 552 _process.kill(signal);
629 } 553 }
630 554
631 /// Calls [fn] with appropriately modified arguments. [fn] should have the same 555 /// Calls [fn] with appropriately modified arguments. [fn] should have the same
632 /// signature as [Process.start], except that the returned [Future] may have a 556 /// signature as [Process.start], except that the returned [Future] may have a
633 /// type other than [Process]. 557 /// type other than [Process].
(...skipping 155 matching lines...) Expand 10 before | Expand all | Expand 10 after
789 /// Create a .tar.gz archive from a list of entries. Each entry can be a 713 /// Create a .tar.gz archive from a list of entries. Each entry can be a
790 /// [String], [Directory], or [File] object. The root of the archive is 714 /// [String], [Directory], or [File] object. The root of the archive is
791 /// considered to be [baseDir], which defaults to the current working directory. 715 /// considered to be [baseDir], which defaults to the current working directory.
792 /// Returns a [ByteStream] that will emit the contents of the archive. 716 /// Returns a [ByteStream] that will emit the contents of the archive.
793 ByteStream createTarGz(List contents, {baseDir}) { 717 ByteStream createTarGz(List contents, {baseDir}) {
794 var buffer = new StringBuffer(); 718 var buffer = new StringBuffer();
795 buffer.add('Creating .tag.gz stream containing:\n'); 719 buffer.add('Creating .tag.gz stream containing:\n');
796 contents.forEach((file) => buffer.add('$file\n')); 720 contents.forEach((file) => buffer.add('$file\n'));
797 log.fine(buffer.toString()); 721 log.fine(buffer.toString());
798 722
799 // TODO(nweiz): Propagate errors to the returned stream (including non-zero
800 // exit codes). See issue 3657.
801 var controller = new StreamController<List<int>>(); 723 var controller = new StreamController<List<int>>();
802 724
803 if (baseDir == null) baseDir = path.current; 725 if (baseDir == null) baseDir = path.current;
804 baseDir = path.absolute(baseDir); 726 baseDir = path.absolute(baseDir);
805 contents = contents.map((entry) { 727 contents = contents.map((entry) {
806 entry = path.absolute(entry); 728 entry = path.absolute(entry);
807 if (!isBeneath(entry, baseDir)) { 729 if (!isBeneath(entry, baseDir)) {
808 throw 'Entry $entry is not inside $baseDir.'; 730 throw 'Entry $entry is not inside $baseDir.';
809 } 731 }
810 return path.relative(entry, from: baseDir); 732 return path.relative(entry, from: baseDir);
(...skipping 71 matching lines...) Expand 10 before | Expand all | Expand 10 after
882 const PubProcessResult(this.stdout, this.stderr, this.exitCode); 804 const PubProcessResult(this.stdout, this.stderr, this.exitCode);
883 805
884 bool get success => exitCode == 0; 806 bool get success => exitCode == 0;
885 } 807 }
886 808
887 /// Gets a [Uri] for [uri], which can either already be one, or be a [String]. 809 /// Gets a [Uri] for [uri], which can either already be one, or be a [String].
888 Uri _getUri(uri) { 810 Uri _getUri(uri) {
889 if (uri is Uri) return uri; 811 if (uri is Uri) return uri;
890 return Uri.parse(uri); 812 return Uri.parse(uri);
891 } 813 }
OLDNEW
« no previous file with comments | « utils/pub/http.dart ('k') | utils/pub/oauth2.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698