OLD | NEW |
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 part of dart.async; | 5 part of dart.async; |
6 | 6 |
7 /** | 7 /** |
8 * Utility function to attach a stack trace to an [error] if it doesn't have | 8 * Utility function to attach a stack trace to an [error] if it doesn't have |
9 * one already. | 9 * one already. |
10 */ | 10 */ |
(...skipping 51 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
62 StreamSubscription<T> _createSubscription(void onData(T value), | 62 StreamSubscription<T> _createSubscription(void onData(T value), |
63 void onError(error), | 63 void onError(error), |
64 void onDone(), | 64 void onDone(), |
65 bool cancelOnError) { | 65 bool cancelOnError) { |
66 return new _ForwardingStreamSubscription<S, T>( | 66 return new _ForwardingStreamSubscription<S, T>( |
67 this, onData, onError, onDone, cancelOnError); | 67 this, onData, onError, onDone, cancelOnError); |
68 } | 68 } |
69 | 69 |
70 // Override the following methods in subclasses to change the behavior. | 70 // Override the following methods in subclasses to change the behavior. |
71 | 71 |
72 void _handleData(S data, _EventSink<T> sink) { | 72 void _handleData(S data, _EventOutputSink<T> sink) { |
73 var outputData = data; | 73 var outputData = data; |
74 sink._add(outputData); | 74 sink._sendData(outputData); |
75 } | 75 } |
76 | 76 |
77 void _handleError(error, _EventSink<T> sink) { | 77 void _handleError(error, _EventOutputSink<T> sink) { |
78 sink._addError(error); | 78 sink._sendError(error); |
79 } | 79 } |
80 | 80 |
81 void _handleDone(_EventSink<T> sink) { | 81 void _handleDone(_EventOutputSink<T> sink) { |
82 sink._close(); | 82 sink._sendDone(); |
83 } | 83 } |
84 } | 84 } |
85 | 85 |
86 /** | 86 /** |
| 87 * Common behavior of [StreamSubscription] classes. |
| 88 * |
| 89 * Stores and allows updating of the event handlers of a [StreamSubscription]. |
| 90 */ |
| 91 abstract class _BaseStreamSubscription<T> implements StreamSubscription<T> { |
| 92 // TODO(ahe): Restore type when feature is implemented in dart2js |
| 93 // checked mode. http://dartbug.com/7733 |
| 94 var /* _DataHandler<T> */ _onData; |
| 95 _ErrorHandler _onError; |
| 96 _DoneHandler _onDone; |
| 97 |
| 98 _BaseStreamSubscription(this._onData, |
| 99 this._onError, |
| 100 this._onDone) { |
| 101 if (_onData == null) _onData = _nullDataHandler; |
| 102 if (_onError == null) _onError = _nullErrorHandler; |
| 103 if (_onDone == null) _onDone = _nullDoneHandler; |
| 104 } |
| 105 |
| 106 // StreamSubscription interface. |
| 107 void onData(void handleData(T event)) { |
| 108 if (handleData == null) handleData = _nullDataHandler; |
| 109 _onData = handleData; |
| 110 } |
| 111 |
| 112 void onError(void handleError(error)) { |
| 113 if (handleError == null) handleError = _nullErrorHandler; |
| 114 _onError = handleError; |
| 115 } |
| 116 |
| 117 void onDone(void handleDone()) { |
| 118 if (handleDone == null) handleDone = _nullDoneHandler; |
| 119 _onDone = handleDone; |
| 120 } |
| 121 |
| 122 void pause([Future resumeSignal]); |
| 123 |
| 124 void resume(); |
| 125 |
| 126 void cancel(); |
| 127 |
| 128 Future asFuture([var futureValue]) { |
| 129 _FutureImpl<T> result = new _FutureImpl<T>(); |
| 130 |
| 131 // Overwrite the onDone and onError handlers. |
| 132 onDone(() { result._setValue(futureValue); }); |
| 133 onError((error) { |
| 134 cancel(); |
| 135 result._setError(error); |
| 136 }); |
| 137 |
| 138 return result; |
| 139 } |
| 140 } |
| 141 |
| 142 |
| 143 /** |
87 * Abstract superclass for subscriptions that forward to other subscriptions. | 144 * Abstract superclass for subscriptions that forward to other subscriptions. |
88 */ | 145 */ |
89 class _ForwardingStreamSubscription<S, T> | 146 class _ForwardingStreamSubscription<S, T> |
90 extends _BufferingStreamSubscription<T> { | 147 extends _BaseStreamSubscription<T> implements _EventOutputSink<T> { |
91 final _ForwardingStream<S, T> _stream; | 148 final _ForwardingStream<S, T> _stream; |
| 149 final bool _cancelOnError; |
92 | 150 |
93 StreamSubscription<S> _subscription; | 151 StreamSubscription<S> _subscription; |
94 | 152 |
95 _ForwardingStreamSubscription(this._stream, | 153 _ForwardingStreamSubscription(this._stream, |
96 void onData(T data), | 154 void onData(T data), |
97 void onError(error), | 155 void onError(error), |
98 void onDone(), | 156 void onDone(), |
99 bool cancelOnError) | 157 this._cancelOnError) |
100 : super(onData, onError, onDone, cancelOnError) { | 158 : super(onData, onError, onDone) { |
| 159 // Don't unsubscribe on incoming error, only if we send an error forwards. |
101 _subscription = | 160 _subscription = |
102 _stream._source.listen(_handleData, | 161 _stream._source.listen(_handleData, |
103 onError: _handleError, | 162 onError: _handleError, |
104 onDone: _handleDone); | 163 onDone: _handleDone); |
105 } | 164 } |
106 | 165 |
107 // _StreamSink interface. | 166 // StreamSubscription interface. |
108 // Transformers sending more than one event have no way to know if the stream | |
109 // is canceled or closed after the first, so we just ignore remaining events. | |
110 | 167 |
111 void _add(T data) { | 168 void pause([Future resumeSignal]) { |
112 if (_isClosed) return; | 169 if (_subscription == null) return; |
113 super._add(data); | 170 _subscription.pause(resumeSignal); |
114 } | 171 } |
115 | 172 |
116 void _addError(Object error) { | 173 void resume() { |
117 if (_isClosed) return; | |
118 super._addError(error); | |
119 } | |
120 | |
121 // StreamSubscription callbacks. | |
122 | |
123 void _onPause() { | |
124 if (_subscription == null) return; | |
125 _subscription.pause(); | |
126 } | |
127 | |
128 void _onResume() { | |
129 if (_subscription == null) return; | 174 if (_subscription == null) return; |
130 _subscription.resume(); | 175 _subscription.resume(); |
131 } | 176 } |
132 | 177 |
133 void _onCancel() { | 178 bool get isPaused { |
| 179 if (_subscription == null) return false; |
| 180 return _subscription.isPaused; |
| 181 } |
| 182 |
| 183 void cancel() { |
134 if (_subscription != null) { | 184 if (_subscription != null) { |
135 StreamSubscription subscription = _subscription; | 185 _subscription.cancel(); |
136 _subscription = null; | 186 _subscription = null; |
137 subscription.cancel(); | |
138 } | 187 } |
139 } | 188 } |
140 | 189 |
| 190 // _EventOutputSink interface. Sends data to this subscription. |
| 191 |
| 192 void _sendData(T data) { |
| 193 _onData(data); |
| 194 } |
| 195 |
| 196 void _sendError(error) { |
| 197 _onError(error); |
| 198 if (_cancelOnError) { |
| 199 _subscription.cancel(); |
| 200 _subscription = null; |
| 201 } |
| 202 } |
| 203 |
| 204 void _sendDone() { |
| 205 // If the transformation sends a done signal, we stop the subscription. |
| 206 if (_subscription != null) { |
| 207 _subscription.cancel(); |
| 208 _subscription = null; |
| 209 } |
| 210 _onDone(); |
| 211 } |
| 212 |
141 // Methods used as listener on source subscription. | 213 // Methods used as listener on source subscription. |
142 | 214 |
143 // TODO(ahe): Restore type when feature is implemented in dart2js | 215 // TODO(ahe): Restore type when feature is implemented in dart2js |
144 // checked mode. http://dartbug.com/7733 | 216 // checked mode. http://dartbug.com/7733 |
145 void _handleData(/*S*/ data) { | 217 void _handleData(/*S*/ data) { |
146 _stream._handleData(data, this); | 218 _stream._handleData(data, this); |
147 } | 219 } |
148 | 220 |
149 void _handleError(error) { | 221 void _handleError(error) { |
150 _stream._handleError(error, this); | 222 _stream._handleError(error, this); |
(...skipping 11 matching lines...) Expand all Loading... |
162 // ------------------------------------------------------------------- | 234 // ------------------------------------------------------------------- |
163 | 235 |
164 typedef bool _Predicate<T>(T value); | 236 typedef bool _Predicate<T>(T value); |
165 | 237 |
166 class _WhereStream<T> extends _ForwardingStream<T, T> { | 238 class _WhereStream<T> extends _ForwardingStream<T, T> { |
167 final _Predicate<T> _test; | 239 final _Predicate<T> _test; |
168 | 240 |
169 _WhereStream(Stream<T> source, bool test(T value)) | 241 _WhereStream(Stream<T> source, bool test(T value)) |
170 : _test = test, super(source); | 242 : _test = test, super(source); |
171 | 243 |
172 void _handleData(T inputEvent, _EventSink<T> sink) { | 244 void _handleData(T inputEvent, _EventOutputSink<T> sink) { |
173 bool satisfies; | 245 bool satisfies; |
174 try { | 246 try { |
175 satisfies = _test(inputEvent); | 247 satisfies = _test(inputEvent); |
176 } catch (e, s) { | 248 } catch (e, s) { |
177 sink._addError(_asyncError(e, s)); | 249 sink._sendError(_asyncError(e, s)); |
178 return; | 250 return; |
179 } | 251 } |
180 if (satisfies) { | 252 if (satisfies) { |
181 sink._add(inputEvent); | 253 sink._sendData(inputEvent); |
182 } | 254 } |
183 } | 255 } |
184 } | 256 } |
185 | 257 |
186 | 258 |
187 typedef T _Transformation<S, T>(S value); | 259 typedef T _Transformation<S, T>(S value); |
188 | 260 |
189 /** | 261 /** |
190 * A stream pipe that converts data events before passing them on. | 262 * A stream pipe that converts data events before passing them on. |
191 */ | 263 */ |
192 class _MapStream<S, T> extends _ForwardingStream<S, T> { | 264 class _MapStream<S, T> extends _ForwardingStream<S, T> { |
193 final _Transformation _transform; | 265 final _Transformation _transform; |
194 | 266 |
195 _MapStream(Stream<S> source, T transform(S event)) | 267 _MapStream(Stream<S> source, T transform(S event)) |
196 : this._transform = transform, super(source); | 268 : this._transform = transform, super(source); |
197 | 269 |
198 void _handleData(S inputEvent, _EventSink<T> sink) { | 270 void _handleData(S inputEvent, _EventOutputSink<T> sink) { |
199 T outputEvent; | 271 T outputEvent; |
200 try { | 272 try { |
201 outputEvent = _transform(inputEvent); | 273 outputEvent = _transform(inputEvent); |
202 } catch (e, s) { | 274 } catch (e, s) { |
203 sink._addError(_asyncError(e, s)); | 275 sink._sendError(_asyncError(e, s)); |
204 return; | 276 return; |
205 } | 277 } |
206 sink._add(outputEvent); | 278 sink._sendData(outputEvent); |
207 } | 279 } |
208 } | 280 } |
209 | 281 |
210 /** | 282 /** |
211 * A stream pipe that converts data events before passing them on. | 283 * A stream pipe that converts data events before passing them on. |
212 */ | 284 */ |
213 class _ExpandStream<S, T> extends _ForwardingStream<S, T> { | 285 class _ExpandStream<S, T> extends _ForwardingStream<S, T> { |
214 final _Transformation<S, Iterable<T>> _expand; | 286 final _Transformation<S, Iterable<T>> _expand; |
215 | 287 |
216 _ExpandStream(Stream<S> source, Iterable<T> expand(S event)) | 288 _ExpandStream(Stream<S> source, Iterable<T> expand(S event)) |
217 : this._expand = expand, super(source); | 289 : this._expand = expand, super(source); |
218 | 290 |
219 void _handleData(S inputEvent, _EventSink<T> sink) { | 291 void _handleData(S inputEvent, _EventOutputSink<T> sink) { |
220 try { | 292 try { |
221 for (T value in _expand(inputEvent)) { | 293 for (T value in _expand(inputEvent)) { |
222 sink._add(value); | 294 sink._sendData(value); |
223 } | 295 } |
224 } catch (e, s) { | 296 } catch (e, s) { |
225 // If either _expand or iterating the generated iterator throws, | 297 // If either _expand or iterating the generated iterator throws, |
226 // we abort the iteration. | 298 // we abort the iteration. |
227 sink._addError(_asyncError(e, s)); | 299 sink._sendError(_asyncError(e, s)); |
228 } | 300 } |
229 } | 301 } |
230 } | 302 } |
231 | 303 |
232 | 304 |
233 typedef void _ErrorTransformation(error); | 305 typedef void _ErrorTransformation(error); |
234 typedef bool _ErrorTest(error); | 306 typedef bool _ErrorTest(error); |
235 | 307 |
236 /** | 308 /** |
237 * A stream pipe that converts or disposes error events | 309 * A stream pipe that converts or disposes error events |
238 * before passing them on. | 310 * before passing them on. |
239 */ | 311 */ |
240 class _HandleErrorStream<T> extends _ForwardingStream<T, T> { | 312 class _HandleErrorStream<T> extends _ForwardingStream<T, T> { |
241 final _ErrorTransformation _transform; | 313 final _ErrorTransformation _transform; |
242 final _ErrorTest _test; | 314 final _ErrorTest _test; |
243 | 315 |
244 _HandleErrorStream(Stream<T> source, | 316 _HandleErrorStream(Stream<T> source, |
245 void transform(event), | 317 void transform(event), |
246 bool test(error)) | 318 bool test(error)) |
247 : this._transform = transform, this._test = test, super(source); | 319 : this._transform = transform, this._test = test, super(source); |
248 | 320 |
249 void _handleError(Object error, _EventSink<T> sink) { | 321 void _handleError(Object error, _EventOutputSink<T> sink) { |
250 bool matches = true; | 322 bool matches = true; |
251 if (_test != null) { | 323 if (_test != null) { |
252 try { | 324 try { |
253 matches = _test(error); | 325 matches = _test(error); |
254 } catch (e, s) { | 326 } catch (e, s) { |
255 sink._addError(_asyncError(e, s)); | 327 sink._sendError(_asyncError(e, s)); |
256 return; | 328 return; |
257 } | 329 } |
258 } | 330 } |
259 if (matches) { | 331 if (matches) { |
260 try { | 332 try { |
261 _transform(error); | 333 _transform(error); |
262 } catch (e, s) { | 334 } catch (e, s) { |
263 sink._addError(_asyncError(e, s)); | 335 sink._sendError(_asyncError(e, s)); |
264 return; | 336 return; |
265 } | 337 } |
266 } else { | 338 } else { |
267 sink._addError(error); | 339 sink._sendError(error); |
268 } | 340 } |
269 } | 341 } |
270 } | 342 } |
271 | 343 |
272 | 344 |
273 class _TakeStream<T> extends _ForwardingStream<T, T> { | 345 class _TakeStream<T> extends _ForwardingStream<T, T> { |
274 int _remaining; | 346 int _remaining; |
275 | 347 |
276 _TakeStream(Stream<T> source, int count) | 348 _TakeStream(Stream<T> source, int count) |
277 : this._remaining = count, super(source) { | 349 : this._remaining = count, super(source) { |
278 // This test is done early to avoid handling an async error | 350 // This test is done early to avoid handling an async error |
279 // in the _handleData method. | 351 // in the _handleData method. |
280 if (count is! int) throw new ArgumentError(count); | 352 if (count is! int) throw new ArgumentError(count); |
281 } | 353 } |
282 | 354 |
283 void _handleData(T inputEvent, _EventSink<T> sink) { | 355 void _handleData(T inputEvent, _EventOutputSink<T> sink) { |
284 if (_remaining > 0) { | 356 if (_remaining > 0) { |
285 sink._add(inputEvent); | 357 sink._sendData(inputEvent); |
286 _remaining -= 1; | 358 _remaining -= 1; |
287 if (_remaining == 0) { | 359 if (_remaining == 0) { |
288 // Closing also unsubscribes all subscribers, which unsubscribes | 360 // Closing also unsubscribes all subscribers, which unsubscribes |
289 // this from source. | 361 // this from source. |
290 sink._close(); | 362 sink._sendDone(); |
291 } | 363 } |
292 } | 364 } |
293 } | 365 } |
294 } | 366 } |
295 | 367 |
296 | 368 |
297 class _TakeWhileStream<T> extends _ForwardingStream<T, T> { | 369 class _TakeWhileStream<T> extends _ForwardingStream<T, T> { |
298 final _Predicate<T> _test; | 370 final _Predicate<T> _test; |
299 | 371 |
300 _TakeWhileStream(Stream<T> source, bool test(T value)) | 372 _TakeWhileStream(Stream<T> source, bool test(T value)) |
301 : this._test = test, super(source); | 373 : this._test = test, super(source); |
302 | 374 |
303 void _handleData(T inputEvent, _EventSink<T> sink) { | 375 void _handleData(T inputEvent, _EventOutputSink<T> sink) { |
304 bool satisfies; | 376 bool satisfies; |
305 try { | 377 try { |
306 satisfies = _test(inputEvent); | 378 satisfies = _test(inputEvent); |
307 } catch (e, s) { | 379 } catch (e, s) { |
308 sink._addError(_asyncError(e, s)); | 380 sink._sendError(_asyncError(e, s)); |
309 // The test didn't say true. Didn't say false either, but we stop anyway. | 381 // The test didn't say true. Didn't say false either, but we stop anyway. |
310 sink._close(); | 382 sink._sendDone(); |
311 return; | 383 return; |
312 } | 384 } |
313 if (satisfies) { | 385 if (satisfies) { |
314 sink._add(inputEvent); | 386 sink._sendData(inputEvent); |
315 } else { | 387 } else { |
316 sink._close(); | 388 sink._sendDone(); |
317 } | 389 } |
318 } | 390 } |
319 } | 391 } |
320 | 392 |
321 class _SkipStream<T> extends _ForwardingStream<T, T> { | 393 class _SkipStream<T> extends _ForwardingStream<T, T> { |
322 int _remaining; | 394 int _remaining; |
323 | 395 |
324 _SkipStream(Stream<T> source, int count) | 396 _SkipStream(Stream<T> source, int count) |
325 : this._remaining = count, super(source) { | 397 : this._remaining = count, super(source) { |
326 // This test is done early to avoid handling an async error | 398 // This test is done early to avoid handling an async error |
327 // in the _handleData method. | 399 // in the _handleData method. |
328 if (count is! int || count < 0) throw new ArgumentError(count); | 400 if (count is! int || count < 0) throw new ArgumentError(count); |
329 } | 401 } |
330 | 402 |
331 void _handleData(T inputEvent, _EventSink<T> sink) { | 403 void _handleData(T inputEvent, _EventOutputSink<T> sink) { |
332 if (_remaining > 0) { | 404 if (_remaining > 0) { |
333 _remaining--; | 405 _remaining--; |
334 return; | 406 return; |
335 } | 407 } |
336 return sink._add(inputEvent); | 408 return sink._sendData(inputEvent); |
337 } | 409 } |
338 } | 410 } |
339 | 411 |
340 class _SkipWhileStream<T> extends _ForwardingStream<T, T> { | 412 class _SkipWhileStream<T> extends _ForwardingStream<T, T> { |
341 final _Predicate<T> _test; | 413 final _Predicate<T> _test; |
342 bool _hasFailed = false; | 414 bool _hasFailed = false; |
343 | 415 |
344 _SkipWhileStream(Stream<T> source, bool test(T value)) | 416 _SkipWhileStream(Stream<T> source, bool test(T value)) |
345 : this._test = test, super(source); | 417 : this._test = test, super(source); |
346 | 418 |
347 void _handleData(T inputEvent, _EventSink<T> sink) { | 419 void _handleData(T inputEvent, _EventOutputSink<T> sink) { |
348 if (_hasFailed) { | 420 if (_hasFailed) { |
349 sink._add(inputEvent); | 421 sink._sendData(inputEvent); |
350 return; | 422 return; |
351 } | 423 } |
352 bool satisfies; | 424 bool satisfies; |
353 try { | 425 try { |
354 satisfies = _test(inputEvent); | 426 satisfies = _test(inputEvent); |
355 } catch (e, s) { | 427 } catch (e, s) { |
356 sink._addError(_asyncError(e, s)); | 428 sink._sendError(_asyncError(e, s)); |
357 // A failure to return a boolean is considered "not matching". | 429 // A failure to return a boolean is considered "not matching". |
358 _hasFailed = true; | 430 _hasFailed = true; |
359 return; | 431 return; |
360 } | 432 } |
361 if (!satisfies) { | 433 if (!satisfies) { |
362 _hasFailed = true; | 434 _hasFailed = true; |
363 sink._add(inputEvent); | 435 sink._sendData(inputEvent); |
364 } | 436 } |
365 } | 437 } |
366 } | 438 } |
367 | 439 |
368 typedef bool _Equality<T>(T a, T b); | 440 typedef bool _Equality<T>(T a, T b); |
369 | 441 |
370 class _DistinctStream<T> extends _ForwardingStream<T, T> { | 442 class _DistinctStream<T> extends _ForwardingStream<T, T> { |
371 static var _SENTINEL = new Object(); | 443 static var _SENTINEL = new Object(); |
372 | 444 |
373 _Equality<T> _equals; | 445 _Equality<T> _equals; |
374 var _previous = _SENTINEL; | 446 var _previous = _SENTINEL; |
375 | 447 |
376 _DistinctStream(Stream<T> source, bool equals(T a, T b)) | 448 _DistinctStream(Stream<T> source, bool equals(T a, T b)) |
377 : _equals = equals, super(source); | 449 : _equals = equals, super(source); |
378 | 450 |
379 void _handleData(T inputEvent, _EventSink<T> sink) { | 451 void _handleData(T inputEvent, _EventOutputSink<T> sink) { |
380 if (identical(_previous, _SENTINEL)) { | 452 if (identical(_previous, _SENTINEL)) { |
381 _previous = inputEvent; | 453 _previous = inputEvent; |
382 return sink._add(inputEvent); | 454 return sink._sendData(inputEvent); |
383 } else { | 455 } else { |
384 bool isEqual; | 456 bool isEqual; |
385 try { | 457 try { |
386 if (_equals == null) { | 458 if (_equals == null) { |
387 isEqual = (_previous == inputEvent); | 459 isEqual = (_previous == inputEvent); |
388 } else { | 460 } else { |
389 isEqual = _equals(_previous, inputEvent); | 461 isEqual = _equals(_previous, inputEvent); |
390 } | 462 } |
391 } catch (e, s) { | 463 } catch (e, s) { |
392 sink._addError(_asyncError(e, s)); | 464 sink._sendError(_asyncError(e, s)); |
393 return null; | 465 return null; |
394 } | 466 } |
395 if (!isEqual) { | 467 if (!isEqual) { |
396 sink._add(inputEvent); | 468 sink._sendData(inputEvent); |
397 _previous = inputEvent; | 469 _previous = inputEvent; |
398 } | 470 } |
399 } | 471 } |
400 } | 472 } |
401 } | 473 } |
402 | 474 |
403 // Stream transformations and event transformations. | 475 // Stream transformations and event transformations. |
404 | 476 |
405 typedef void _TransformDataHandler<S, T>(S data, EventSink<T> sink); | 477 typedef void _TransformDataHandler<S, T>(S data, EventSink<T> sink); |
406 typedef void _TransformErrorHandler<T>(data, EventSink<T> sink); | 478 typedef void _TransformErrorHandler<T>(data, EventSink<T> sink); |
(...skipping 48 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
455 | 527 |
456 void handleError(error, EventSink<T> sink) { | 528 void handleError(error, EventSink<T> sink) { |
457 _handleError(error, sink); | 529 _handleError(error, sink); |
458 } | 530 } |
459 | 531 |
460 void handleDone(EventSink<T> sink) { | 532 void handleDone(EventSink<T> sink) { |
461 _handleDone(sink); | 533 _handleDone(sink); |
462 } | 534 } |
463 } | 535 } |
464 | 536 |
OLD | NEW |