| Index: pkg/gcloud/lib/common.dart
|
| diff --git a/pkg/gcloud/lib/common.dart b/pkg/gcloud/lib/common.dart
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..430e3dfd1672e0519582d180c93576af50739e3c
|
| --- /dev/null
|
| +++ b/pkg/gcloud/lib/common.dart
|
| @@ -0,0 +1,85 @@
|
| +// Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file
|
| +// for details. All rights reserved. Use of this source code is governed by a
|
| +// BSD-style license that can be found in the LICENSE file.
|
| +
|
| +library gcloud.common;
|
| +
|
| +import 'dart:async';
|
| +
|
| +/// A single page of paged results from a query.
|
| +///
|
| +/// Use `next` to move to the next page. If this is the last page `next`
|
| +/// completes with `null`
|
| +abstract class Page<T> {
|
| + /// The items in this page.
|
| + List<T> get items;
|
| +
|
| + /// Whether this is the last page of results.
|
| + bool get isLast;
|
| +
|
| + /// Move to the next page.
|
| + ///
|
| + /// The future returned completes with the next page or results.
|
| + ///
|
| + /// If [next] is called on the last page the returned future completes
|
| + /// with `null`.
|
| + Future<Page<T>> next({int pageSize});
|
| +}
|
| +
|
| +typedef Future<Page<T>> FirstPageProvider<T>(int pageSize);
|
| +
|
| +/// Helper class to turn a series of pages into a stream.
|
| +class StreamFromPages<T> {
|
| + static const int _PAGE_SIZE = 50;
|
| + final FirstPageProvider _firstPageProvider;
|
| + bool _pendingRequest = false;
|
| + bool _paused = false;
|
| + bool _cancelled = false;
|
| + Page _currentPage;
|
| + StreamController _controller;
|
| +
|
| + StreamFromPages(this._firstPageProvider) {
|
| + _controller = new StreamController(sync: true, onListen: _onListen,
|
| + onPause: _onPause, onResume: _onResume,
|
| + onCancel: _onCancel);
|
| + }
|
| +
|
| + Stream<T> get stream => _controller.stream;
|
| +
|
| + void _handleError(e, s) {
|
| + _controller.addError(e, s);
|
| + _controller.close();
|
| + }
|
| +
|
| + void _handlePage(Page<T> page) {
|
| + if (_cancelled) return;
|
| + _pendingRequest = false;
|
| + _currentPage = page;
|
| + page.items.forEach(_controller.add);
|
| + if (page.isLast) {
|
| + _controller.close();
|
| + } else if (!_paused && !_cancelled) {
|
| + page.next().then(_handlePage, onError: _handleError);
|
| + }
|
| + }
|
| +
|
| + _onListen() {
|
| + int pageSize = _PAGE_SIZE;
|
| + _pendingRequest = true;
|
| + _firstPageProvider(pageSize).then(_handlePage, onError: _handleError);
|
| + }
|
| +
|
| + _onPause() { _paused = true; }
|
| +
|
| + _onResume() {
|
| + _paused = false;
|
| + if (_pendingRequest) return;
|
| + _pendingRequest = true;
|
| + _currentPage.next().then(_handlePage, onError: _handleError);
|
| + }
|
| +
|
| + _onCancel() {
|
| + _cancelled = true;
|
| + }
|
| +
|
| +}
|
|
|