Chromium Code Reviews| OLD | NEW |
|---|---|
| (Empty) | |
| 1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file | |
| 2 // for details. All rights reserved. Use of this source code is governed by a | |
| 3 // BSD-style license that can be found in the LICENSE file. | |
| 4 | |
| 5 library async.stream_events; | |
| 6 | |
| 7 import 'dart:async'; | |
| 8 import 'dart:collection'; | |
| 9 | |
| 10 import "subscription_stream.dart"; | |
| 11 import "stream_completer.dart"; | |
| 12 import "../result.dart"; | |
| 13 | |
| 14 /// An asynchronous pull-based interface for accessing stream events. | |
| 15 /// | |
| 16 /// Wraps a stream and makes individual events available on request. | |
| 17 /// | |
| 18 /// You can request (and reserve) one or more events from the stream, | |
| 19 /// and after all previous requestes have been fulfilled, stream events | |
|
nweiz
2015/06/18 23:44:26
"requestes" -> "requests"
Lasse Reichstein Nielsen
2015/06/30 10:34:13
Done.
| |
| 20 /// go towards fulfilling your request. | |
| 21 /// | |
| 22 /// For example, if you ask for [next] two times, the returned futures | |
| 23 /// will be completed by the next two unreserved events from the stream. | |
|
nweiz
2015/06/18 23:44:25
"unreserved" -> "unrequested"? Just to keep the te
Lasse Reichstein Nielsen
2015/06/30 10:34:13
Done.
| |
| 24 /// | |
| 25 /// The stream subscription is paused when there are no active | |
| 26 /// requests. | |
| 27 /// | |
| 28 /// Some streams, including broadcast streams, will buffer | |
| 29 /// events while paused, so waiting too long between requests may | |
| 30 /// cause memory bloat somewhere else. | |
| 31 /// | |
| 32 /// The individual requests are served in the order they are requested, | |
| 33 /// and the stream subscription is paused when there are no active requests. | |
|
nweiz
2015/06/18 23:44:27
Both of these are already mentioned above, so this
Lasse Reichstein Nielsen
2015/06/30 10:34:13
Done.
| |
| 34 /// | |
| 35 /// This is similar to, but more convenient than, a [StreamIterator]. | |
| 36 /// A `StreamIterator` requires you to manually check when a new event is | |
| 37 /// available and you can only access the value of that event until you | |
| 38 /// check for the next one. A `StreamQueue` allows you to request, for example, | |
| 39 /// three events at a time, either individually, as a group using [take] | |
| 40 /// or [skip], or in any combination. | |
| 41 /// | |
| 42 /// You can also ask to have the [rest] of the stream provided as | |
| 43 /// a new stream. This allows, for example, taking the first event | |
| 44 /// out of a stream and continue using the rest of the stream as a stream. | |
|
nweiz
2015/06/18 23:44:27
"continue using" -> "continuing to use"
Lasse Reichstein Nielsen
2015/06/30 10:34:14
Done.
| |
| 45 /// | |
| 46 /// Example: | |
| 47 /// | |
| 48 /// var events = new StreamQueue<String>(someStreamOfLines); | |
| 49 /// var first = await events.next; | |
| 50 /// while (first.startsWith('#')) { | |
| 51 /// // Skip comments. | |
| 52 /// first = await events.next; | |
| 53 /// } | |
| 54 /// | |
| 55 /// if (first.startsWith(MAGIC_MARKER)) { | |
| 56 /// var headerCount = | |
| 57 /// first.parseInt(first.substring(MAGIC_MARKER.length + 1)); | |
| 58 /// handleMessage(headers: await events.take(headerCount), | |
| 59 /// body: events.rest); | |
| 60 /// return; | |
| 61 /// } | |
| 62 /// // Error handling. | |
| 63 /// | |
| 64 /// When you need no further events the `StreamQueue` should be closed | |
| 65 /// using [cancel]. This releases the underlying stream subscription. | |
| 66 /// | |
| 67 /// The underlying stream subscription is paused when there | |
| 68 /// are no requeusts. Some subscriptions, including those of broadcast streams, | |
| 69 /// will still buffer events while paused. Creating a `StreamQueue` from | |
| 70 /// such a stream and stopping to request events, will cause memory to fill up | |
|
nweiz
2015/06/18 23:44:27
"stopping" -> "ceasing", get rid of the comma
| |
| 71 /// unnecessarily. | |
|
nweiz
2015/06/18 23:44:26
This paragraph also seems redundant with the sixth
Lasse Reichstein Nielsen
2015/06/30 10:34:12
Done.
| |
| 72 class StreamQueue<T> { | |
| 73 // This class maintains two queues: one of events and one of requests. | |
| 74 // The active request (the one in front of the queue) is called with | |
| 75 // the current event queue when it becomes active. | |
|
nweiz
2015/06/18 23:44:27
"it" -> "the request" (to clarify whether you're r
Lasse Reichstein Nielsen
2015/06/30 10:34:13
Done.
| |
| 76 // If it returns true, it's done and will be removed from the request queue. | |
|
nweiz
2015/06/18 23:44:26
"it" -> "the request" again. The next "it" is fine
Lasse Reichstein Nielsen
2015/06/30 10:34:13
Done.
| |
| 77 // If it returns false, it needs more events, and will be called again when | |
| 78 // new events are available. | |
| 79 // The request can remove events that it uses, or keep them in the event | |
| 80 // queue until it has all that it needs. | |
| 81 // | |
| 82 // This model is very flexible and easily extensible. | |
| 83 // It allows requests that don't consume events (like [hasNext]) or | |
| 84 // potentially a request that takes either five or zero events, determined | |
| 85 // by the content of the fifth event. | |
| 86 | |
| 87 /// Source of events. | |
| 88 final Stream _sourceStream; | |
| 89 | |
| 90 /// Number of events that may be prefetched when there is no request. | |
| 91 final int _prefetchCount; | |
| 92 | |
| 93 /// Subscription on [_sourceStream] while listening for events. | |
| 94 /// | |
| 95 /// Set to subscription when listening, and set to `null` when the | |
| 96 /// subscription is done (and [_isDone] is set to true). | |
| 97 StreamSubscription _subscription; | |
| 98 | |
| 99 /// Whether we have listened on [_sourceStream] and the subscription is done. | |
| 100 bool _isDone = false; | |
| 101 | |
| 102 /// Whether a closing operation has been performed on the stream queue. | |
| 103 /// | |
| 104 /// Closing operations are [cancel] and [rest]. | |
| 105 bool _isClosed = false; | |
| 106 | |
| 107 /// Queue of events not used by a request yet. | |
| 108 final Queue<Result> _eventQueue = new Queue(); | |
| 109 | |
| 110 /// Queue of pending requests. | |
| 111 /// Access through methods below to ensure consistency. | |
|
nweiz
2015/06/18 23:44:26
Nit: add a newline above
Lasse Reichstein Nielsen
2015/06/30 10:34:13
Done.
| |
| 112 final Queue<_EventRequest> _requestQueue = new Queue(); | |
| 113 | |
| 114 /// Create a `StreamQueue` of the events of source. | |
|
nweiz
2015/06/18 23:44:26
"source" -> "[source]"
Lasse Reichstein Nielsen
2015/06/30 10:34:12
Done.
| |
| 115 /// | |
| 116 /// Allow prefetching [prefetch] events before pausing the source | |
| 117 /// stream even if there are no current requests for them. | |
| 118 /// The default is to pause immediately when there is no pending request. | |
| 119 /// Even if `prefetch` is greater than zero, the stream won't listened on | |
| 120 /// before the first request. | |
|
nweiz
2015/06/18 23:44:25
Consider allowing "prefetch: -1" to mean "never pa
Lasse Reichstein Nielsen
2015/06/30 10:34:14
I don't like -1 being magical, the alternative wou
| |
| 121 StreamQueue(Stream source, {int prefetch: 0}) | |
|
nweiz
2015/06/18 23:44:26
Consider making [prefetch] default to null and ass
Lasse Reichstein Nielsen
2015/06/30 10:34:12
True. I'll just remove it for now (effectively fix
| |
| 122 : _sourceStream = source, _prefetchCount = prefetch; | |
| 123 | |
| 124 /// Asks if the stream has any more events. | |
| 125 /// | |
| 126 /// Returns a future that completes with `true` if the stream has any | |
| 127 /// more events, whether data or error. | |
| 128 /// If the stream closes without producing any more events, the returned | |
| 129 /// future completes with `false`. | |
| 130 /// | |
| 131 /// Can be used before using [next] to avoid getting an error in the | |
| 132 /// future returned by `next` in the case where there are no more events. | |
| 133 Future<bool> get hasNext { | |
| 134 if (!_isClosed) { | |
| 135 _HasNextRequest hasNextRequest = new _HasNextRequest(); | |
|
nweiz
2015/06/18 23:44:26
Nit: "var" (also below)
Lasse Reichstein Nielsen
2015/06/30 10:34:12
Done.
| |
| 136 _addRequest(hasNextRequest); | |
| 137 return hasNextRequest.future; | |
| 138 } | |
| 139 throw _failClosed(); | |
| 140 } | |
| 141 | |
| 142 /// Requests the next (yet unrequested) event from the stream. | |
| 143 /// | |
| 144 /// When the requested event arrives, the returned future is completed with | |
| 145 /// the event. | |
| 146 /// If the event is a data event, the returned future completes | |
| 147 /// with its value. | |
| 148 /// If the event is an error event, the returned future completes with | |
| 149 /// its error and stack trace. | |
| 150 /// If the stream closes before an event arrives, the returned future | |
| 151 /// completes with a [StateError]. | |
| 152 /// | |
| 153 /// It's possible to have several pending [next] calls (or other requests), | |
| 154 /// and they will be completed in the order they were requested, by the | |
| 155 /// first events that were not used by previous requeusts. | |
|
nweiz
2015/06/18 23:44:26
"used" -> "consumed" (slightly more correct for re
Lasse Reichstein Nielsen
2015/06/30 10:34:14
Done.
| |
| 156 Future<T> get next { | |
| 157 if (!_isClosed) { | |
| 158 _NextRequest nextRequest = new _NextRequest<T>(); | |
| 159 _addRequest(nextRequest); | |
| 160 return nextRequest.future; | |
| 161 } | |
| 162 throw _failClosed(); | |
| 163 } | |
| 164 | |
| 165 /// Returns a stream of all the remaning events of the source stream. | |
| 166 /// | |
| 167 /// All requested [next], [skip] or [take] operations are completed | |
| 168 /// first, and then any remaining events are provided as events of | |
| 169 /// the returned stream. | |
| 170 /// | |
| 171 /// Using `rest` closes the stream events object. After getting the | |
|
nweiz
2015/06/18 23:44:26
"the stream events object" -> "this stream queue"
Lasse Reichstein Nielsen
2015/06/30 10:34:13
Done.
| |
| 172 /// `rest` the caller may no longer request other events, like | |
| 173 /// after calling [cancel]. | |
| 174 Stream<T> get rest { | |
| 175 if (_isClosed) { | |
| 176 throw _failClosed(); | |
| 177 } | |
| 178 var request = new _RestRequest<T>(this); | |
| 179 _isClosed = true; | |
| 180 _addRequest(request); | |
| 181 return request.stream; | |
| 182 } | |
| 183 | |
| 184 /// Skips the next [count] *data* events. | |
| 185 /// | |
| 186 /// The [count] must be non-negative. | |
| 187 /// | |
| 188 /// When successful, this is equivalent to using [take] | |
| 189 /// and ignoring the result. | |
| 190 /// | |
| 191 /// If an error occurs before `count` data events have been skipped, | |
| 192 /// the returned future completes with that error instead. | |
| 193 /// | |
| 194 /// If the stream closes before `count` data events, | |
| 195 /// the remaining unskipped event count is returned. | |
| 196 /// If the returned future completes with the integer `0`, | |
| 197 /// then all events were succssfully skipped. If the value | |
| 198 /// is greater than zero then the stream ended early. | |
| 199 Future<int> skip(int count) { | |
| 200 if (count < 0) throw new RangeError.range(count, 0, null, "count"); | |
| 201 if (!_isClosed) { | |
| 202 var request = new _SkipRequest(count); | |
| 203 _addRequest(request); | |
| 204 return request.future; | |
| 205 } | |
| 206 throw _failClosed(); | |
| 207 } | |
| 208 | |
| 209 /// Requests the next [count] data events as a list. | |
| 210 /// | |
| 211 /// The [count] must be non-negative. | |
| 212 /// | |
| 213 /// Equivalent to calling [next] `count` times and | |
| 214 /// storing the data values in a list. | |
| 215 /// | |
| 216 /// If an error occurs before `count` data events has | |
| 217 /// been collected, the returned future completes with | |
| 218 /// that error instead. | |
| 219 /// | |
| 220 /// If the stream closes before `count` data events, | |
| 221 /// the returned future completes with the list | |
| 222 /// of data collected so far. That is, the returned | |
| 223 /// list may have fewer than [count] elements. | |
| 224 Future<List<T>> take(int count) { | |
| 225 if (count < 0) throw new RangeError.range(count, 0, null, "count"); | |
| 226 if (!_isClosed) { | |
| 227 var request = new _TakeRequest<T>(count); | |
| 228 _addRequest(request); | |
| 229 return request.future; | |
| 230 } | |
| 231 throw _failClosed(); | |
| 232 } | |
| 233 | |
| 234 /// Cancels the underlying stream subscription. | |
| 235 /// | |
| 236 /// The cancel operation waits until all previously requested | |
| 237 /// events have been processed, then it cancels the subscription | |
| 238 /// providing the events. | |
| 239 /// | |
| 240 /// The returned future completes with the result of calling | |
| 241 /// `cancel`. | |
| 242 /// | |
| 243 /// After calling `cancel`, no further events can be requested. | |
| 244 /// None of [next], [rest], [skip], [take] or [cancel] may be | |
| 245 /// called again. | |
| 246 Future cancel() { | |
| 247 if (!_isClosed) { | |
| 248 _isClosed = true; | |
| 249 var request = new _CancelRequest(this); | |
| 250 _addRequest(request); | |
| 251 return request.future; | |
| 252 } | |
| 253 throw _failClosed(); | |
| 254 } | |
| 255 | |
| 256 /// Returns an error for when a request is made after cancel. | |
| 257 /// | |
| 258 /// Returns a [StateError] with a message saying that either | |
| 259 /// [cancel] or [rest] have already been called. | |
| 260 Error _failClosed() { | |
| 261 return new StateError("Already cancelled"); | |
| 262 } | |
| 263 | |
| 264 // Callbacks receiving the events of the source stream. | |
| 265 | |
| 266 void _onData(T data) { | |
| 267 _eventQueue.add(new Result.value(data)); | |
| 268 _checkQueues(); | |
| 269 } | |
| 270 | |
| 271 void _onError(error, StackTrace stack) { | |
| 272 _eventQueue.add(new Result.error(error, stack)); | |
| 273 _checkQueues(); | |
| 274 } | |
| 275 | |
| 276 void _onDone() { | |
| 277 _subscription = null; | |
| 278 _isDone = true; | |
| 279 _closeAllRequests(); | |
| 280 } | |
| 281 | |
| 282 // Request queue management. | |
| 283 | |
| 284 /// Add a new request to the queue. | |
|
nweiz
2015/06/18 23:44:26
"Add" -> "Adds"
Lasse Reichstein Nielsen
2015/06/30 10:34:13
Done.
| |
| 285 void _addRequest(_EventRequest request) { | |
| 286 if (_isDone) { | |
| 287 assert(_requestQueue.isEmpty); | |
| 288 if (!request.addEvents(_eventQueue)) { | |
| 289 request.close(_eventQueue); | |
| 290 } | |
| 291 return; | |
| 292 } | |
| 293 if (_requestQueue.isEmpty) { | |
| 294 if (request.addEvents(_eventQueue)) return; | |
| 295 _ensureListening(); | |
| 296 } | |
| 297 _requestQueue.add(request); | |
| 298 | |
|
nweiz
2015/06/18 23:44:26
Nit: extra newline
Lasse Reichstein Nielsen
2015/06/30 10:34:13
Done.
| |
| 299 } | |
| 300 | |
| 301 /// Ensures that we are listening on events from [_sourceStream]. | |
| 302 /// | |
| 303 /// Resumes subscription on [_sourceStream], or create it if necessary. | |
|
nweiz
2015/06/18 23:44:26
"create" -> "creates"
Lasse Reichstein Nielsen
2015/06/30 10:34:14
Done.
| |
| 304 StreamSubscription _ensureListening() { | |
| 305 assert(!_isDone); | |
| 306 if (_subscription == null) { | |
| 307 _subscription = | |
| 308 _sourceStream.listen(_onData, onError: _onError, onDone: _onDone); | |
| 309 } else { | |
| 310 _subscription.resume(); | |
| 311 } | |
| 312 } | |
| 313 | |
| 314 | |
| 315 /// Remove all requests and close them. | |
|
nweiz
2015/06/18 23:44:26
"Remove" -> "Removes", "close" -> "closes"
Lasse Reichstein Nielsen
2015/06/30 10:34:14
Done.
| |
| 316 /// | |
| 317 /// Used when the source stream is done. | |
| 318 /// After this, no further requests will be added to the queue, | |
| 319 /// requests are immediately served entirely by events already in the event | |
| 320 /// queue, if any. | |
| 321 void _closeAllRequests() { | |
| 322 assert(_isDone); | |
| 323 while (_requestQueue.isNotEmpty) { | |
| 324 var request = _requestQueue.removeFirst(); | |
| 325 if (!request.addEvents(_eventQueue)) { | |
|
nweiz
2015/06/18 23:44:26
Isn't this guaranteed to return false? [request.ad
Lasse Reichstein Nielsen
2015/06/30 10:34:12
It's only guaranteed to return false for the first
| |
| 326 request.close(_eventQueue); | |
| 327 } | |
| 328 } | |
| 329 } | |
| 330 | |
| 331 /// Matches events with requests. | |
| 332 /// | |
| 333 /// Called after receiving an event. | |
| 334 void _checkQueues() { | |
| 335 while (_requestQueue.isNotEmpty) { | |
| 336 if (_requestQueue.first.addEvents(_eventQueue)) { | |
| 337 _requestQueue.removeFirst(); | |
| 338 } else { | |
| 339 return; | |
| 340 } | |
| 341 } | |
| 342 if (!_isDone && _eventQueue.length >= _prefetchCount) { | |
| 343 _subscription.pause(); | |
| 344 } | |
| 345 } | |
| 346 | |
| 347 /// Extracts the subscription and makes the events object unusable. | |
|
nweiz
2015/06/18 23:44:26
"events object" -> "stream queue"
Lasse Reichstein Nielsen
2015/06/30 10:34:13
Done.
| |
| 348 /// | |
| 349 /// Can only be used by the very last request. | |
| 350 StreamSubscription _dispose() { | |
| 351 assert(_isClosed); | |
| 352 StreamSubscription subscription = _subscription; | |
|
nweiz
2015/06/18 23:44:25
Nit: "var"
Lasse Reichstein Nielsen
2015/06/30 10:34:13
Done.
| |
| 353 _subscription = null; | |
| 354 _isDone = true; | |
| 355 return subscription; | |
| 356 } | |
| 357 } | |
| 358 | |
| 359 /// Request object that receives events when they arrive, until fulfilled. | |
| 360 /// | |
| 361 /// Each request that cannot be fulfilled immediately is represented by | |
| 362 /// an `_EventRequest` object in the request queue. | |
| 363 /// | |
| 364 /// Events from the source stream are sent to the first request in the | |
| 365 /// queue until it reports itself as [isComplete]. | |
| 366 /// | |
| 367 /// When the first request in the queue `isComplete`, either when becoming | |
| 368 /// the first request or after receiving an event, its [close] methods is | |
| 369 /// called. | |
| 370 /// | |
| 371 /// The [close] method is also called immediately when the source stream | |
| 372 /// is done. | |
| 373 abstract class _EventRequest implements EventSink { | |
| 374 /// Handle available events. | |
| 375 /// | |
| 376 /// The available events are provided as a queue. The `addEvents` function | |
| 377 /// should only access the queue from the start, e.g., using [removeFirst]. | |
|
nweiz
2015/06/18 23:44:27
"access the queue from the start" -> "remove event
Lasse Reichstein Nielsen
2015/06/30 10:34:13
Done.
| |
| 378 /// | |
| 379 /// Returns `true` if if the request is completed, or `false` if it needs | |
|
nweiz
2015/06/18 23:44:27
"if if" -> "if"
Lasse Reichstein Nielsen
2015/06/30 10:34:13
Done.
| |
| 380 /// more events. | |
| 381 /// The call may keep events in the queue until the requeust is complete, | |
| 382 /// or it may remove them immediately. | |
| 383 /// | |
| 384 /// This method is called when a request reaches the front of the request | |
| 385 /// queue, and if it returns `false`, it's called again every time an event | |
|
nweiz
2015/06/18 23:44:25
"an event" -> "a new event"
Lasse Reichstein Nielsen
2015/06/30 10:34:13
Done.
| |
| 386 /// becomes available. | |
|
nweiz
2015/06/18 23:44:27
"and finally when the stream closes"
Lasse Reichstein Nielsen
2015/06/30 10:34:13
Done.
| |
| 387 bool addEvents(Queue<Result> events); | |
| 388 | |
| 389 /// Complete the request. | |
| 390 /// | |
| 391 /// This is called when the source stream is done before the request | |
| 392 /// had a chance to receive events. If there are any events available, | |
| 393 /// they are in the [events] queue. No further events will become available. | |
| 394 /// | |
| 395 /// The queue should only be accessed from the start, e.g., | |
| 396 /// using [removeFirst]. | |
| 397 /// | |
| 398 /// If the requests kept events in the queue after an [addEvents] call, | |
|
nweiz
2015/06/18 23:44:26
"requests" -> "request"
Lasse Reichstein Nielsen
2015/06/30 10:34:13
Done.
| |
| 399 /// it should remove them here. | |
| 400 void close(Queue<Result> events); | |
| 401 } | |
| 402 | |
| 403 /// Request for a [StreamQueue.next] call. | |
| 404 /// | |
| 405 /// Completes the returned future when receiving the first event, | |
| 406 /// and is then complete. | |
| 407 class _NextRequest<T> implements _EventRequest { | |
| 408 /// Completer for the future returned by [StreamQueue.next]. | |
| 409 /// | |
| 410 /// Set to `null` when it is completed, to mark it as already complete. | |
|
nweiz
2015/06/18 23:44:27
This is no longer accurate.
| |
| 411 final Completer _completer; | |
| 412 | |
| 413 _NextRequest() : _completer = new Completer<T>(); | |
|
nweiz
2015/06/18 23:44:26
Nit: assign _completer in the declaration
Lasse Reichstein Nielsen
2015/06/30 10:34:12
Done.
| |
| 414 | |
| 415 Future<T> get future => _completer.future; | |
| 416 | |
| 417 bool addEvents(Queue<Result> events) { | |
| 418 if (events.isEmpty) return false; | |
| 419 events.removeFirst().complete(_completer); | |
| 420 return true; | |
| 421 } | |
| 422 | |
| 423 void close(Queue<Result> events) { | |
| 424 _completer.completeError(new StateError("no elements")); | |
|
nweiz
2015/06/18 23:44:27
"no" -> "No"
Also include a stack trace here so t
Lasse Reichstein Nielsen
2015/06/30 10:34:14
Done.
| |
| 425 } | |
| 426 } | |
| 427 | |
| 428 /// Request for a [StreamQueue.skip] call. | |
| 429 class _SkipRequest implements _EventRequest { | |
| 430 /// Completer for the future returned by the skip call. | |
| 431 final Completer _completer = new Completer<int>(); | |
| 432 | |
| 433 /// Number of remaining events to skip. | |
| 434 /// | |
| 435 /// The request [isComplete] when the values reaches zero. | |
| 436 /// | |
| 437 /// Decremented when an event is seen. | |
| 438 /// Set to zero when an error is seen since errors abort the skip request. | |
| 439 int _eventsToSkip; | |
| 440 | |
| 441 _SkipRequest(this._eventsToSkip); | |
| 442 | |
| 443 /// The future completed when the correct number of events have been skipped. | |
| 444 Future get future => _completer.future; | |
| 445 | |
| 446 bool addEvents(Queue<Result> events) { | |
| 447 while (_eventsToSkip > 0) { | |
| 448 if (events.isEmpty) return false; | |
| 449 _eventsToSkip--; | |
| 450 var event = events.removeFirst(); | |
| 451 if (event.isError) { | |
| 452 event.complete(_completer); | |
| 453 return true; | |
| 454 } | |
| 455 } | |
| 456 _completer.complete(0); | |
| 457 return true; | |
| 458 } | |
| 459 | |
| 460 void close(Queue<Result> events) { | |
| 461 _completer.complete(_eventsToSkip); | |
| 462 } | |
| 463 } | |
| 464 | |
| 465 /// Request for a [StreamQueue.take] call. | |
| 466 class _TakeRequest<T> implements _EventRequest { | |
| 467 /// Completer for the future returned by the take call. | |
| 468 final Completer _completer; | |
| 469 | |
| 470 /// List collecting events until enough have been seen. | |
| 471 final List _list = <T>[]; | |
| 472 | |
| 473 /// Number of events to capture. | |
| 474 /// | |
| 475 /// The request [isComplete] when the length of [_list] reaches | |
| 476 /// this value. | |
| 477 final int _eventsToTake; | |
| 478 | |
| 479 _TakeRequest(this._eventsToTake) : _completer = new Completer<List<T>>(); | |
| 480 | |
| 481 /// The future completed when the correct number of events have been captured. | |
| 482 Future get future => _completer.future; | |
| 483 | |
| 484 bool addEvents(Queue<Events> events) { | |
| 485 while (_list.length < _eventsToTake) { | |
| 486 if (events.isEmpty) return false; | |
| 487 var result = events.removeFirst(); | |
| 488 if (result.isError) { | |
| 489 result.complete(_completer); | |
| 490 return true; | |
| 491 } | |
| 492 _list.add(result.asValue.value); | |
| 493 } | |
| 494 _completer.complete(_list); | |
| 495 return true; | |
| 496 } | |
| 497 | |
| 498 void close(Queue<Events> events) { | |
| 499 _completer.complete(_list); | |
| 500 } | |
| 501 } | |
| 502 | |
| 503 /// Request for a [StreamQueue.cancel] call. | |
| 504 /// | |
| 505 /// The request is always complete, it just waits in the request queue | |
| 506 /// until all previous events are fulfilled, then it cancels the stream events | |
| 507 /// subscription. | |
| 508 class _CancelRequest implements _EventRequest { | |
| 509 /// Completer for the future returned by the `cancel` call. | |
| 510 final Completer _completer = new Completer(); | |
| 511 | |
| 512 /// The [StreamQueue] object that has this request queued. | |
| 513 /// | |
| 514 /// When the event is completed, it needs to cancel the active subscription | |
| 515 /// of the `StreamQueue` object, if any. | |
| 516 final StreamQueue _streamQueue; | |
| 517 | |
| 518 _CancelRequest(this._streamQueue); | |
| 519 | |
| 520 /// The future completed when the cancel request is completed. | |
| 521 Future get future => _completer.future; | |
| 522 | |
| 523 bool addEvents(Queue<Result> events) { | |
| 524 _shutdown(); | |
| 525 return true; | |
| 526 } | |
| 527 | |
| 528 void close(_) { | |
| 529 _shutdown(); | |
| 530 } | |
| 531 | |
| 532 void _shutdown() { | |
| 533 if (_streamQueue._subscription == null) { | |
| 534 _completer.complete(); | |
| 535 } else { | |
| 536 _completer.complete(_streamQueue._dispose().cancel()); | |
| 537 } | |
| 538 } | |
| 539 } | |
| 540 | |
| 541 /// Request for a [StreamQueue.rest] call. | |
| 542 /// | |
| 543 /// The request is always complete, it just waits in the request queue | |
| 544 /// until all previous events are fulfilled, then it takes over the | |
| 545 /// stream events subscription and creates a stream from it. | |
| 546 class _RestRequest<T> implements _EventRequest { | |
| 547 /// Completer for the stream returned by the `rest` call. | |
| 548 final StreamCompleter _completer; | |
| 549 | |
| 550 /// The [StreamQueue] object that has this request queued. | |
| 551 /// | |
| 552 /// When the event is completed, it needs to cancel the active subscription | |
| 553 /// of the `StreamQueue` object, if any. | |
| 554 final StreamQueue _streamQueue; | |
| 555 _RestRequest(this._streamQueue) : _completer = new StreamCompleter<T>(); | |
|
nweiz
2015/06/18 23:44:27
Nit: newline above.
Also move [_completer]'s init
Lasse Reichstein Nielsen
2015/06/30 10:34:12
Done.
| |
| 556 | |
| 557 /// The future which will contain the remaining events of [_streamQueue]. | |
|
nweiz
2015/06/18 23:44:26
"future" -> "stream"
Lasse Reichstein Nielsen
2015/06/30 10:34:13
Done.
| |
| 558 Stream<T> get stream => _completer.stream; | |
| 559 | |
| 560 bool addEvents(Queue<Result> events) { | |
| 561 _completeStream(events); | |
| 562 return true; | |
| 563 } | |
| 564 | |
| 565 void close(Queue<Result> events) { | |
| 566 _completeStream(events); | |
| 567 } | |
| 568 | |
| 569 void _completeStream(Queue<Result> events) { | |
| 570 Stream getRestStream() { | |
|
nweiz
2015/06/18 23:44:26
Why is this a local method (as opposed to a privat
Lasse Reichstein Nielsen
2015/06/30 10:34:14
It's only used here, so I didn't see a need to giv
nweiz
2015/06/30 23:39:47
There are a lot of private methods at the top leve
Lasse Reichstein Nielsen
2015/07/01 08:24:56
Agree. Moved to private instance method.
| |
| 571 if (_streamQueue._isDone) { | |
| 572 return new Stream<T>.empty(); | |
|
nweiz
2015/06/18 23:44:25
If you're going to use this here, you'll need to u
Lasse Reichstein Nielsen
2015/06/30 10:34:13
Ack. I SOOO wish 1.11 would be released soon. All
nweiz
2015/06/30 23:39:47
Yeah, this is one of the burdens of developing in
| |
| 573 } | |
| 574 if (_streamQueue._subscription == null) { | |
| 575 return _streamQueue._sourceStream; | |
| 576 } | |
| 577 StreamSubscription subscription = _streamQueue._dispose(); | |
| 578 subscription.resume(); | |
| 579 return new SubscriptionStream<T>(subscription); | |
| 580 } | |
| 581 if (events.isEmpty) { | |
| 582 if (_streamQueue._isDone) { | |
| 583 _completer.setEmpty(); | |
| 584 } else { | |
| 585 _completer.setSourceStream(getRestStream()); | |
| 586 } | |
| 587 } else { | |
| 588 // There are prefetched events which needs to be added before the | |
| 589 // remaining stream. | |
| 590 StreamController controller = new StreamController<T>(); | |
|
nweiz
2015/06/18 23:44:27
Nit: "var"
| |
| 591 for (var event in events) event.addTo(controller); | |
|
nweiz
2015/06/18 23:44:25
I like this style of loop, but unfortunately it's
Lasse Reichstein Nielsen
2015/06/30 10:34:13
Do they want me to write
events.forEach((even
| |
| 592 controller.addStream(getRestStream(), cancelOnError: false) | |
| 593 .whenComplete(controller.close); | |
| 594 _completer.setSourceStream(controller.stream); | |
| 595 } | |
| 596 } | |
| 597 } | |
| 598 | |
| 599 /// Request for a [StreamQueue.hasNext] call. | |
| 600 /// | |
| 601 /// Completes the [future] with `true` if it sees any event, | |
| 602 /// but doesn't consume the event. | |
| 603 /// If the request is closed without seeing an event, then | |
| 604 /// the [future] is completed with `false`. | |
| 605 class _HasNextRequest<T> implements _EventRequest { | |
| 606 final Completer _completer = new Completer<bool>(); | |
| 607 | |
| 608 Future<bool> get future => _completer.future; | |
| 609 | |
| 610 bool addEvents(Queue<Result> events) { | |
| 611 if (events.isNotEmpty) { | |
| 612 _completer.complete(true); | |
| 613 return true; | |
| 614 } | |
| 615 return false; | |
| 616 } | |
| 617 | |
| 618 void close(_) { | |
| 619 _completer.complete(false); | |
| 620 } | |
| 621 } | |
| OLD | NEW |