Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(206)

Side by Side Diff: watcher/lib/src/utils.dart

Issue 1400473008: Roll Observatory packages and add a roll script (Closed) Base URL: git@github.com:dart-lang/observatory_pub_packages.git@master
Patch Set: Created 5 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « watcher/lib/src/stat.dart ('k') | watcher/lib/src/watch_event.dart » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file.
4
5 library watcher.utils;
6
7 import 'dart:async';
8 import 'dart:io';
9 import 'dart:collection';
10
11 /// Returns `true` if [error] is a [FileSystemException] for a missing
12 /// directory.
13 bool isDirectoryNotFoundException(error) {
14 if (error is! FileSystemException) return false;
15
16 // See dartbug.com/12461 and tests/standalone/io/directory_error_test.dart.
17 var notFoundCode = Platform.operatingSystem == "windows" ? 3 : 2;
18 return error.osError.errorCode == notFoundCode;
19 }
20
21 /// Returns the union of all elements in each set in [sets].
22 Set unionAll(Iterable<Set> sets) =>
23 sets.fold(new Set(), (union, set) => union.union(set));
24
25 /// Returns a buffered stream that will emit the same values as the stream
26 /// returned by [future] once [future] completes.
27 ///
28 /// If [future] completes to an error, the return value will emit that error and
29 /// then close.
30 ///
31 /// If [broadcast] is true, a broadcast stream is returned. This assumes that
32 /// the stream returned by [future] will be a broadcast stream as well.
33 /// [broadcast] defaults to false.
34 Stream futureStream(Future<Stream> future, {bool broadcast: false}) {
35 var subscription;
36 var controller;
37
38 future = future.catchError((e, stackTrace) {
39 // Since [controller] is synchronous, it's likely that emitting an error
40 // will cause it to be cancelled before we call close.
41 if (controller != null) controller.addError(e, stackTrace);
42 if (controller != null) controller.close();
43 controller = null;
44 });
45
46 onListen() {
47 future.then((stream) {
48 if (controller == null) return;
49 subscription = stream.listen(
50 controller.add,
51 onError: controller.addError,
52 onDone: controller.close);
53 });
54 }
55
56 onCancel() {
57 if (subscription != null) subscription.cancel();
58 subscription = null;
59 controller = null;
60 }
61
62 if (broadcast) {
63 controller = new StreamController.broadcast(
64 sync: true, onListen: onListen, onCancel: onCancel);
65 } else {
66 controller = new StreamController(
67 sync: true, onListen: onListen, onCancel: onCancel);
68 }
69 return controller.stream;
70 }
71
72 /// Like [new Future], but avoids around issue 11911 by using [new Future.value]
73 /// under the covers.
74 Future newFuture(callback()) => new Future.value().then((_) => callback());
75
76 /// Returns a [Future] that completes after pumping the event queue [times]
77 /// times. By default, this should pump the event queue enough times to allow
78 /// any code to run, as long as it's not waiting on some external event.
79 Future pumpEventQueue([int times = 20]) {
80 if (times == 0) return new Future.value();
81 // We use a delayed future to allow microtask events to finish. The
82 // Future.value or Future() constructors use scheduleMicrotask themselves and
83 // would therefore not wait for microtask callbacks that are scheduled after
84 // invoking this method.
85 return new Future.delayed(Duration.ZERO, () => pumpEventQueue(times - 1));
86 }
87
88 /// A stream transformer that batches all events that are sent at the same time.
89 ///
90 /// When multiple events are synchronously added to a stream controller, the
91 /// [StreamController] implementation uses [scheduleMicrotask] to schedule the
92 /// asynchronous firing of each event. In order to recreate the synchronous
93 /// batches, this collates all the events that are received in "nearby"
94 /// microtasks.
95 class BatchedStreamTransformer<T> implements StreamTransformer<T, List<T>> {
96 Stream<List<T>> bind(Stream<T> input) {
97 var batch = new Queue();
98 return new StreamTransformer<T, List<T>>.fromHandlers(
99 handleData: (event, sink) {
100 batch.add(event);
101
102 // [Timer.run] schedules an event that runs after any microtasks that have
103 // been scheduled.
104 Timer.run(() {
105 if (batch.isEmpty) return;
106 sink.add(batch.toList());
107 batch.clear();
108 });
109 }, handleDone: (sink) {
110 if (batch.isNotEmpty) {
111 sink.add(batch.toList());
112 batch.clear();
113 }
114 sink.close();
115 }).bind(input);
116 }
117 }
OLDNEW
« no previous file with comments | « watcher/lib/src/stat.dart ('k') | watcher/lib/src/watch_event.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698