OLD | NEW |
---|---|
1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 import "dart:_internal"; | 5 import "dart:_internal"; |
6 | 6 |
7 // We need to pass the value as first argument and leave the second and third | |
8 // arguments empty (used for error handling). | |
9 // See vm/ast_transformer.cc for usage. | |
10 Function _asyncThenWrapperHelper(continuation) { | |
11 // Any function that is used as an asynchronous callback must be registered | |
12 // in the current Zone. Normally, this is done by the future when a | |
13 // callback is registered (for example with `.then` or `.catchError`). In our | |
14 // case we want to reuse the same callback multiple times and therefore avoid | |
15 // the multiple registrations. For our internal futures (`_Future`) we can | |
16 // use the shortcut-version of `.then`, and skip the registration. However, | |
17 // that means that the continuation must be registered by us. | |
18 // | |
19 // Furthermore, we know that the root-zone doesn't actually do anything and | |
20 // we can therefore skip the registration call for it. | |
21 // | |
22 // Note, that the contination accepts up to three arguments. If the current | |
hausner
2015/08/11 21:22:07
continuation
floitsch
2015/08/12 15:03:14
Done.
| |
23 // zone is the root zone, we don't wrap the continuation, and a bad | |
24 // `Future` implementation could potentially invoke the callback with the | |
25 // wrong number of arguments. | |
26 if (Zone.current == Zone.ROOT) return continuation; | |
27 return Zone.current.registerUnaryCallback((x) => continuation(x, null, null)); | |
28 } | |
29 | |
30 // We need to pass the exception and stack trace objects as second and third | |
31 // parameter to the continuation. See vm/ast_transformer.cc for usage. | |
32 Function _asyncErrorWrapperHelper(continuation) { | |
33 // See comments of `_asyncThenWrapperHelper`. | |
34 var errorCallback = (e, s) => continuation(null, e, s); | |
35 if (Zone.current == Zone.ROOT) return errorCallback; | |
36 return Zone.current.registerBinaryCallback(errorCallback); | |
37 } | |
38 | |
39 /// Registers the [thenCallback] and [errorCallback] on the given [object]. | |
40 /// | |
41 /// If [object] is not a future, then it is wrapped into one. | |
42 /// | |
43 /// Returns the result of registering with `.then`. | |
44 Future _awaitHelper( | |
45 var object, Function thenCallback, Function errorCallback) { | |
46 if (object is! Future) { | |
47 object = new _Future().._setValue(object); | |
48 } else if (object is! _Future) { | |
49 return object.then(thenCallback, onError: errorCallback); | |
50 } | |
51 // `object` is a `_Future`. | |
52 // | |
53 // Since the callbacks have been registered in the current zone (see | |
54 // [_asyncThenWrapperHelper] and [_asyncErrorWrapperHelper]), we can avoid | |
55 // another registration and directly invoke the no-zone-registration `.then`. | |
56 // | |
57 // We can only do this for our internal futures (the default implementation of | |
58 // all futures that are constructed by the `dart:async` library). | |
59 return object._thenNoZoneRegistration(thenCallback, errorCallback); | |
60 } | |
61 | |
62 // _AsyncStarStreamController is used by the compiler to implement | |
63 // async* generator functions. | |
64 class _AsyncStarStreamController { | |
65 StreamController controller; | |
66 Function asyncStarBody; | |
67 bool isAdding = false; | |
68 bool onListenReceived = false; | |
69 bool isScheduled = false; | |
70 bool isSuspendedAtYield = false; | |
71 Completer cancellationCompleter = null; | |
72 | |
73 Stream get stream => controller.stream; | |
74 | |
75 void runBody() { | |
76 isScheduled = false; | |
77 isSuspendedAtYield = false; | |
78 asyncStarBody(); | |
79 } | |
80 | |
81 void scheduleGenerator() { | |
82 if (isScheduled || controller.isPaused || isAdding) { | |
83 return; | |
84 } | |
85 isScheduled = true; | |
86 scheduleMicrotask(runBody); | |
87 } | |
88 | |
89 // Adds element to steam, returns true if the caller should terminate | |
90 // execution of the generator. | |
91 // | |
92 // TODO(hausner): Per spec, the generator should be suspended before | |
93 // exiting when the stream is closed. We could add a getter like this: | |
94 // get isCancelled => controller.hasListener; | |
95 // The generator would translate a 'yield e' statement to | |
96 // controller.add(e); | |
97 // suspend; | |
98 // if (controller.isCancelled) return; | |
99 bool add(event) { | |
100 if (!onListenReceived) _fatal("yield before stream is listened to!"); | |
101 if (isSuspendedAtYield) _fatal("unexpected yield"); | |
102 // If stream is cancelled, tell caller to exit the async generator. | |
103 if (!controller.hasListener) { | |
104 return true; | |
105 } | |
106 controller.add(event); | |
107 scheduleGenerator(); | |
108 isSuspendedAtYield = true; | |
109 return false; | |
110 } | |
111 | |
112 // Adds the elements of stream into this controller's stream. | |
113 // The generator will be scheduled again when all of the | |
114 // elements of the added stream have been consumed. | |
115 // Returns true if the caller should terminate | |
116 // execution of the generator. | |
117 bool addStream(Stream stream) { | |
118 if (!onListenReceived) _fatal("yield before stream is listened to!"); | |
119 // If stream is cancelled, tell caller to exit the async generator. | |
120 if (!controller.hasListener) return true; | |
121 isAdding = true; | |
122 var whenDoneAdding = | |
123 controller.addStream(stream as Stream, cancelOnError: false); | |
124 whenDoneAdding.then((_) { | |
125 isAdding = false; | |
126 scheduleGenerator(); | |
127 }); | |
128 return false; | |
129 } | |
130 | |
131 void addError(error, stackTrace) { | |
132 if ((cancellationCompleter != null) && !cancellationCompleter.isCompleted) { | |
133 // If the stream has been cancelled, complete the cancellation future | |
134 // with the error. | |
135 cancellationCompleter.completeError(error, stackTrace); | |
136 return; | |
137 } | |
138 // If stream is cancelled, tell caller to exit the async generator. | |
139 if (!controller.hasListener) return; | |
140 controller.addError(error, stackTrace); | |
141 // No need to schedule the generator body here. This code is only | |
142 // called from the catch clause of the implicit try-catch-finally | |
143 // around the generator body. That is, we are on the error path out | |
144 // of the generator and do not need to run the generator again. | |
145 } | |
146 | |
147 close() { | |
148 if ((cancellationCompleter != null) && !cancellationCompleter.isCompleted) { | |
149 // If the stream has been cancelled, complete the cancellation future | |
150 // with the error. | |
151 cancellationCompleter.complete(); | |
152 } | |
153 controller.close(); | |
154 } | |
155 | |
156 _AsyncStarStreamController(this.asyncStarBody) { | |
157 controller = new StreamController(onListen: this.onListen, | |
158 onResume: this.onResume, | |
159 onCancel: this.onCancel); | |
160 } | |
161 | |
162 onListen() { | |
163 assert(!onListenReceived); | |
164 onListenReceived = true; | |
165 scheduleGenerator(); | |
166 } | |
167 | |
168 onResume() { | |
169 if (isSuspendedAtYield) { | |
170 scheduleGenerator(); | |
171 } | |
172 } | |
173 | |
174 onCancel() { | |
175 if (controller.isClosed) { | |
176 return null; | |
177 } | |
178 if (cancellationCompleter == null) { | |
179 cancellationCompleter = new Completer(); | |
180 scheduleGenerator(); | |
181 } | |
182 return cancellationCompleter.future; | |
183 } | |
184 } | |
OLD | NEW |