Index: pkg/gcloud/lib/src/storage_impl.dart |
diff --git a/pkg/gcloud/lib/src/storage_impl.dart b/pkg/gcloud/lib/src/storage_impl.dart |
new file mode 100644 |
index 0000000000000000000000000000000000000000..21a9de92a48433fcc618424e439e9b1d66c6bb9c |
--- /dev/null |
+++ b/pkg/gcloud/lib/src/storage_impl.dart |
@@ -0,0 +1,605 @@ |
+// Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file |
+// for details. All rights reserved. Use of this source code is governed by a |
+// BSD-style license that can be found in the LICENSE file. |
+ |
+part of gcloud.storage; |
+ |
+const String _ABSOLUTE_PREFIX = 'gs://'; |
+const String _DIRECTORY_DELIMITER = 'gs://'; |
+ |
+/// Representation of an absolute name consisting of bucket name and object |
+/// name. |
+class _AbsoluteName { |
+ String bucketName; |
+ String objectName; |
+ |
+ _AbsoluteName.parse(String absoluteName) { |
+ if (!absoluteName.startsWith(_ABSOLUTE_PREFIX)) { |
+ throw new FormatException("Absolute name '$absoluteName' does not start " |
+ "with '$_ABSOLUTE_PREFIX'"); |
+ } |
+ int index = absoluteName.indexOf('/', _ABSOLUTE_PREFIX.length); |
+ if (index == -1 || index == _ABSOLUTE_PREFIX.length) { |
+ throw new FormatException("Absolute name '$absoluteName' does not have " |
+ "a bucket name"); |
+ } |
+ if (index == absoluteName.length - 1) { |
+ throw new FormatException("Absolute name '$absoluteName' does not have " |
+ "an object name"); |
+ } |
+ bucketName = absoluteName.substring(_ABSOLUTE_PREFIX.length, index); |
+ objectName = absoluteName.substring(index + 1); |
+ } |
+} |
+ |
+/// Storage API implementation providing access to buckets. |
+class _StorageImpl implements Storage { |
+ final String project; |
+ final storage_api.StorageApi _api; |
+ |
+ _StorageImpl(client, this.project) |
+ : _api = new storage_api.StorageApi(client); |
+ |
+ Future createBucket(String bucketName, |
+ {PredefinedAcl predefinedAcl, Acl acl}) { |
+ var bucket = new storage_api.Bucket()..name = bucketName; |
+ var predefinedName = predefinedAcl != null ? predefinedAcl._name : null; |
+ if (acl != null) { |
+ bucket.acl = acl._toBucketAccessControlList(); |
+ } |
+ return _api.buckets.insert(bucket, |
+ project, |
+ predefinedAcl: predefinedName) |
+ .then((bucket) => null); |
+ } |
+ |
+ Future deleteBucket(String bucketName) { |
+ return _api.buckets.delete(bucketName); |
+ } |
+ |
+ Bucket bucket(String bucketName, |
+ {PredefinedAcl defaultPredefinedObjectAcl, |
+ Acl defaultObjectAcl}) { |
+ return new _BucketImpl( |
+ this, bucketName, defaultPredefinedObjectAcl, defaultObjectAcl); |
+ } |
+ |
+ Future<bool> bucketExists(String bucketName) { |
+ notFoundError(e) => e is common.DetailedApiRequestError && e.status == 404; |
+ return _api.buckets.get(bucketName) |
+ .then((_) => true) |
+ .catchError((e) => false, test: notFoundError); |
+ |
+ } |
+ |
+ Future<BucketInfo> bucketInfo(String bucketName) { |
+ return _api.buckets.get(bucketName, projection: 'full') |
+ .then((bucket) => new _BucketInfoImpl(bucket)); |
+ } |
+ |
+ Stream<String> listBucketNames() { |
+ Future<Page<Bucket>> firstPage(pageSize) { |
+ return _listBuckets(pageSize, null) |
+ .then((response) => new _BucketPageImpl(this, pageSize, response)); |
+ } |
+ return new StreamFromPages<String>(firstPage).stream; |
+ } |
+ |
+ Future<Page<String>> pageBucketNames({int pageSize: 50}) { |
+ return _listBuckets(pageSize, null).then((response) { |
+ return new _BucketPageImpl(this, pageSize, response); |
+ }); |
+ } |
+ |
+ Future copyObject(String src, String dest) { |
+ var srcName = new _AbsoluteName.parse(src); |
+ var destName = new _AbsoluteName.parse(dest); |
+ return _api.objects.copy(null, |
+ srcName.bucketName, srcName.objectName, |
+ destName.bucketName, destName.objectName) |
+ .then((_) => null); |
+ } |
+ |
+ Future<storage_api.Buckets> _listBuckets(int pageSize, String nextPageToken) { |
+ return _api.buckets.list( |
+ project, |
+ maxResults: pageSize, |
+ pageToken: nextPageToken); |
+ } |
+} |
+ |
+class _BucketInfoImpl implements BucketInfo { |
+ final storage_api.Bucket _bucket; |
+ |
+ _BucketInfoImpl(this._bucket); |
+ |
+ String get bucketName => _bucket.name; |
+ |
+ String get etag => _bucket.etag; |
+ |
+ DateTime get created => _bucket.timeCreated; |
+ |
+ String get id => _bucket.id; |
+ |
+ Acl get acl => new Acl._fromBucketAcl(_bucket); |
+} |
+ |
+/// Bucket API implementation providing access to objects. |
+class _BucketImpl implements Bucket { |
+ final storage_api.StorageApi _api; |
+ PredefinedAcl _defaultPredefinedObjectAcl; |
+ Acl _defaultObjectAcl; |
+ final String bucketName; |
+ |
+ _BucketImpl(_StorageImpl storage, |
+ this.bucketName, |
+ this._defaultPredefinedObjectAcl, |
+ this._defaultObjectAcl) : |
+ this._api = storage._api; |
+ |
+ String absoluteObjectName(String objectName) { |
+ return '${_ABSOLUTE_PREFIX}$bucketName/$objectName'; |
+ } |
+ |
+ StreamSink<List<int>> write( |
+ String objectName, |
+ {int length, ObjectMetadata metadata, |
+ Acl acl, PredefinedAcl predefinedAcl, String contentType}) { |
+ storage_api.Object object; |
+ if (metadata == null) { |
+ metadata = new _ObjectMetadata(acl: acl, contentType: contentType); |
+ } else { |
+ if (acl != null) { |
+ metadata = metadata.replace(acl: acl); |
+ } |
+ if (contentType != null) { |
+ metadata = metadata.replace(contentType: contentType); |
+ } |
+ } |
+ _ObjectMetadata objectMetadata = metadata; |
+ object = objectMetadata._object; |
+ |
+ // If no predefined ACL is passed use the default (if any). |
+ var predefinedName; |
+ if (predefinedAcl != null || _defaultPredefinedObjectAcl != null) { |
+ var predefined = |
+ predefinedAcl != null ? predefinedAcl : _defaultPredefinedObjectAcl; |
+ predefinedName = predefined._name; |
+ } |
+ |
+ // If no ACL is passed use the default (if any). |
+ if (object.acl == null && _defaultObjectAcl != null) { |
+ object.acl = _defaultObjectAcl._toObjectAccessControlList(); |
+ } |
+ |
+ // Fill properties not passed in metadata. |
+ object.name = objectName; |
+ |
+ var sink = new _MediaUploadStreamSink( |
+ _api, bucketName, objectName, object, predefinedName, length); |
+ return sink; |
+ } |
+ |
+ Future writeBytes( |
+ String objectName, List<int> bytes, |
+ {ObjectMetadata metadata, |
+ Acl acl, PredefinedAcl predefinedAcl, String contentType}) { |
+ var sink = write(objectName, length: bytes.length, |
+ metadata: metadata, acl: acl, predefinedAcl: predefinedAcl, |
+ contentType: contentType); |
+ sink.add(bytes); |
+ return sink.close(); |
+ } |
+ |
+ Stream read(String objectName, {int offset: 0, int length}) { |
+ var controller = new StreamController(); |
+ _api.objects.get( |
+ bucketName, |
+ objectName, |
+ downloadOptions: common.DownloadOptions.FullMedia).then( |
+ (media) => media.stream.pipe(controller.sink)); |
+ return controller.stream; |
+ } |
+ |
+ Future<ObjectInfo> info(String objectName) { |
+ return _api.objects.get(bucketName, objectName, projection: 'full') |
+ .then((object) => new _ObjectInfoImpl(object)); |
+ } |
+ |
+ Future delete(String objectName) { |
+ return _api.objects.delete(bucketName, objectName); |
+ } |
+ |
+ Stream<BucketEntry> list({String prefix}) { |
+ Future<Page<Bucket>> firstPage(pageSize) { |
+ return _listObjects(bucketName, prefix, _DIRECTORY_DELIMITER, 50, null) |
+ .then((response) => new _ObjectPageImpl( |
+ this, prefix, pageSize, response)); |
+ } |
+ return new StreamFromPages<BucketEntry>(firstPage).stream; |
+ } |
+ |
+ Future<Page<BucketEntry>> page({String prefix, int pageSize: 50}) { |
+ return _listObjects( |
+ bucketName, prefix, _DIRECTORY_DELIMITER, pageSize, null) |
+ .then((response) { |
+ return new _ObjectPageImpl(this, prefix, pageSize, response); |
+ }); |
+ } |
+ |
+ Future updateMetadata(String objectName, ObjectMetadata metadata) { |
+ // TODO: support other ObjectMetadata implementations? |
+ _ObjectMetadata md = metadata; |
+ var object = md._object; |
+ if (md._object.acl == null && _defaultObjectAcl == null) { |
+ throw new ArgumentError('ACL is required for update'); |
+ } |
+ if (md.contentType == null) { |
+ throw new ArgumentError('Content-Type is required for update'); |
+ } |
+ if (md._object.acl == null) { |
+ md._object.acl = _defaultObjectAcl._toObjectAccessControlList(); |
+ } |
+ return _api.objects.update(object, bucketName, objectName); |
+ } |
+ |
+ Future<storage_api.Objects> _listObjects( |
+ String bucketName, String prefix, String delimiter, |
+ int pageSize, String nextPageToken) { |
+ return _api.objects.list( |
+ bucketName, |
+ prefix: prefix, |
+ delimiter: delimiter, |
+ maxResults: pageSize, |
+ pageToken: nextPageToken); |
+ } |
+} |
+ |
+class _BucketPageImpl implements Page<String> { |
+ final _StorageImpl _storage; |
+ final int _pageSize; |
+ final String _nextPageToken; |
+ final List<String> items; |
+ |
+ _BucketPageImpl(this._storage, this._pageSize, storage_api.Buckets response) |
+ : items = new List(response.items != null ? response.items.length : 0), |
+ _nextPageToken = response.nextPageToken { |
+ for (int i = 0; i < items.length; i++) { |
+ items[i] = response.items[i].name; |
+ } |
+ } |
+ |
+ bool get isLast => _nextPageToken == null; |
+ |
+ Future<Page<String>> next({int pageSize}) { |
+ if (isLast) return new Future.value(null); |
+ if (pageSize == null) pageSize = this._pageSize; |
+ |
+ return _storage._listBuckets(pageSize, _nextPageToken).then((response) { |
+ return new _BucketPageImpl(_storage, pageSize, response); |
+ }); |
+ } |
+} |
+ |
+class _ObjectPageImpl implements Page<BucketEntry> { |
+ final _BucketImpl _bucket; |
+ final String _prefix; |
+ final int _pageSize; |
+ final String _nextPageToken; |
+ final List<BucketEntry> items; |
+ |
+ _ObjectPageImpl( |
+ this._bucket, this._prefix, this._pageSize, |
+ storage_api.Objects response) |
+ : items = new List( |
+ (response.items != null ? response.items.length : 0) + |
+ (response.prefixes != null ? response.prefixes.length : 0)), |
+ _nextPageToken = response.nextPageToken { |
+ var prefixes = 0; |
+ if (response.prefixes != null) { |
+ for (int i = 0; i < response.prefixes.length; i++) { |
+ items[i] = new BucketEntry._directory(response.prefixes[i]); |
+ } |
+ prefixes = response.prefixes.length; |
+ } |
+ if (response.items != null) { |
+ for (int i = 0; i < response.items.length; i++) { |
+ items[prefixes + i] = new BucketEntry._object(response.items[i].name); |
+ } |
+ } |
+ } |
+ |
+ bool get isLast => _nextPageToken == null; |
+ |
+ Future<Page<BucketEntry>> next({int pageSize}) { |
+ if (isLast) return new Future.value(null); |
+ if (pageSize == null) pageSize = this._pageSize; |
+ |
+ return _bucket._listObjects( |
+ _bucket.bucketName, |
+ _prefix, |
+ _DIRECTORY_DELIMITER, |
+ pageSize, |
+ _nextPageToken).then((response) { |
+ return new _ObjectPageImpl( |
+ _bucket, _prefix, pageSize, response); |
+ }); |
+ } |
+} |
+ |
+class _ObjectGenerationImpl implements ObjectGeneration { |
+ final String objectGeneration; |
+ final int metaGeneration; |
+ |
+ _ObjectGenerationImpl(this.objectGeneration, this.metaGeneration); |
+} |
+ |
+class _ObjectInfoImpl implements ObjectInfo { |
+ final storage_api.Object _object; |
+ final ObjectMetadata _metadata; |
+ Uri _downloadLink; |
+ ObjectGeneration _generation; |
+ |
+ _ObjectInfoImpl(storage_api.Object object) : |
+ _object = object, _metadata = new _ObjectMetadata._(object); |
+ |
+ String get name => _object.name; |
+ |
+ int get length => int.parse(_object.size); |
+ |
+ DateTime get updated => _object.updated; |
+ |
+ String get etag => _object.etag; |
+ |
+ List<int> get md5Hash => |
+ crypto.CryptoUtils.base64StringToBytes(_object.md5Hash); |
+ |
+ int get crc32CChecksum { |
+ var list = crypto.CryptoUtils.base64StringToBytes(_object.crc32c); |
+ return (list[3] << 24) | (list[2] << 16) | (list[1] << 8) | list[0]; |
+ } |
+ |
+ Uri get downloadLink { |
+ if (_downloadLink == null) { |
+ _downloadLink = Uri.parse(_object.mediaLink); |
+ } |
+ return _downloadLink; |
+ } |
+ |
+ ObjectGeneration get generation { |
+ if (_generation == null) { |
+ _generation = new _ObjectGenerationImpl( |
+ _object.generation, int.parse(_object.metageneration)); |
+ } |
+ return _generation; |
+ } |
+ |
+ /// Additional metadata. |
+ ObjectMetadata get metadata => _metadata; |
+} |
+ |
+class _ObjectMetadata implements ObjectMetadata { |
+ final storage_api.Object _object; |
+ Acl _cachedAcl; |
+ ObjectGeneration _cachedGeneration; |
+ Map _cachedCustom; |
+ |
+ _ObjectMetadata({Acl acl, |
+ String contentType, |
+ String contentEncoding, |
+ String cacheControl, |
+ String contentDisposition, |
+ String contentLanguage, |
+ Map<String, String> custom}) |
+ : _object = new storage_api.Object() { |
+ _object.acl = acl != null ? acl._toObjectAccessControlList() : null; |
+ _object.contentType = contentType; |
+ _object.contentEncoding = contentEncoding; |
+ _object.cacheControl = cacheControl; |
+ _object.contentDisposition = contentDisposition; |
+ _object.contentLanguage = contentLanguage; |
+ if (custom != null) _object.metadata = custom; |
+ } |
+ |
+ _ObjectMetadata._(this._object); |
+ |
+ Acl get acl { |
+ if (_cachedAcl == null) { |
+ _cachedAcl = new Acl._fromObjectAcl(_object); |
+ } |
+ return _cachedAcl; |
+ } |
+ |
+ String get contentType => _object.contentType; |
+ |
+ String get contentEncoding => _object.contentEncoding; |
+ |
+ String get cacheControl => _object.cacheControl; |
+ |
+ String get contentDisposition => _object.contentDisposition; |
+ |
+ String get contentLanguage => _object.contentLanguage; |
+ |
+ ObjectGeneration get generation { |
+ if (_cachedGeneration == null) { |
+ _cachedGeneration = new ObjectGeneration( |
+ _object.generation, int.parse(_object.metageneration)); |
+ } |
+ return _cachedGeneration; |
+ } |
+ |
+ Map<String, String> get custom { |
+ if (_object.metadata == null) return null; |
+ if (_cachedCustom == null) { |
+ _cachedCustom = new UnmodifiableMapView(_object.metadata); |
+ } |
+ return _cachedCustom; |
+ } |
+ |
+ ObjectMetadata replace({Acl acl, |
+ String contentType, |
+ String contentEncoding, |
+ String cacheControl, |
+ String contentDisposition, |
+ String contentLanguage, |
+ Map<String, String> custom}) { |
+ return new _ObjectMetadata( |
+ acl: acl != null ? acl : this.acl, |
+ contentType: contentType != null ? contentType : this.contentType, |
+ contentEncoding: contentEncoding != null ? contentEncoding |
+ : this.contentEncoding, |
+ cacheControl: cacheControl != null ? cacheControl : this.cacheControl, |
+ contentDisposition: contentDisposition != null ? contentDisposition |
+ : this.contentEncoding, |
+ contentLanguage: contentLanguage != null ? contentLanguage |
+ : this.contentEncoding, |
+ custom: custom != null ? new Map.from(custom) : this.custom); |
+ } |
+} |
+ |
+/// Implementation of StreamSink which handles Google media upload. |
+/// It provides a StreamSink and logic which selects whether to use normal |
+/// media upload (multipart mime) or resumable media upload. |
+class _MediaUploadStreamSink implements StreamSink<List<int>> { |
+ static const int _DEFAULT_MAX_NORMAL_UPLOAD_LENGTH = 1024 * 1024; |
+ final storage_api.StorageApi _api; |
+ final String _bucketName; |
+ final String _objectName; |
+ final storage_api.Object _object; |
+ final String _predefinedAcl; |
+ final int _length; |
+ final int _maxNormalUploadLength; |
+ int _bufferLength = 0; |
+ final List<List<int>> buffer = new List<List<int>>(); |
+ final StreamController _controller = new StreamController(sync: true); |
+ StreamSubscription _subscription; |
+ StreamController _resumableController; |
+ final _doneCompleter = new Completer(); |
+ |
+ static const int _STATE_LENGTH_KNOWN = 0; |
+ static const int _STATE_PROBING_LENGTH = 1; |
+ static const int _STATE_DECIDED_RESUMABLE = 2; |
+ int _state; |
+ |
+ _MediaUploadStreamSink( |
+ this._api, this._bucketName, this._objectName, this._object, |
+ this._predefinedAcl, this._length, |
+ [this._maxNormalUploadLength = _DEFAULT_MAX_NORMAL_UPLOAD_LENGTH]) { |
+ if (_length != null) { |
+ // If the length is known in advance decide on the upload strategy |
+ // immediately |
+ _state = _STATE_LENGTH_KNOWN; |
+ if (_length <= _maxNormalUploadLength) { |
+ _startNormalUpload(_controller.stream, _length); |
+ } else { |
+ _startResumableUpload(_controller.stream, _length); |
+ } |
+ } else { |
+ _state = _STATE_PROBING_LENGTH; |
+ // If the length is not known in advance decide on the upload strategy |
+ // later. Start buffering until enough data has been read to decide. |
+ _subscription = _controller.stream.listen( |
+ _onData, onDone: _onDone, onError: _onError); |
+ } |
+ } |
+ |
+ void add(List<int> event) { |
+ _controller.add(event); |
+ } |
+ |
+ void addError(errorEvent, [StackTrace stackTrace]) { |
+ _controller.addError(errorEvent, stackTrace); |
+ } |
+ |
+ Future addStream(Stream<List<int>> stream) { |
+ return _controller.addStream(stream); |
+ } |
+ |
+ Future close() { |
+ _controller.close(); |
+ return _doneCompleter.future; |
+ } |
+ |
+ Future get done => _doneCompleter.future; |
+ |
+ _onData(List<int> data) { |
+ assert(_state != _STATE_LENGTH_KNOWN); |
+ if (_state == _STATE_PROBING_LENGTH) { |
+ buffer.add(data); |
+ _bufferLength += data.length; |
+ if (_bufferLength > _maxNormalUploadLength) { |
+ // Start resumable upload. |
+ // TODO: Avoid using another stream-controller. |
+ _resumableController = new StreamController(sync: true); |
+ buffer.forEach(_resumableController.add); |
+ var media = new common.Media(_resumableController.stream, null); |
+ _startResumableUpload(_resumableController.stream, _length); |
+ _state = _STATE_DECIDED_RESUMABLE; |
+ } |
+ } else { |
+ assert(_state == _STATE_DECIDED_RESUMABLE); |
+ _resumableController.add(data); |
+ } |
+ } |
+ |
+ _onDone() { |
+ if (_state == _STATE_PROBING_LENGTH) { |
+ // As the data is already cached don't bother to wait on somebody |
+ // listening on the stream before adding the data. |
+ var controller = new StreamController(); |
+ buffer.forEach(controller.add); |
+ controller.close(); |
+ _startNormalUpload(controller.stream, _bufferLength); |
+ } else { |
+ _resumableController.close(); |
+ } |
+ } |
+ |
+ _onError(e, s) { |
+ // If still deciding on the strategy complete with error. Otherwise |
+ // forward the error for default processing. |
+ if (_state == _STATE_PROBING_LENGTH) { |
+ _completeError(e, s); |
+ } else { |
+ _resumableController.addError(e, s); |
+ } |
+ } |
+ |
+ _completeError(e, s) { |
+ if (_state != _STATE_LENGTH_KNOWN) { |
+ // Always cancel subscription on error. |
+ _subscription.cancel(); |
+ } |
+ _doneCompleter.completeError(e, s); |
+ } |
+ |
+ void _startNormalUpload(Stream stream, int length) { |
+ var contentType = _object.contentType != null |
+ ? _object.contentType : 'application/octet-stream'; |
+ var media = new common.Media(stream, length, contentType: contentType); |
+ _api.objects.insert(_object, |
+ _bucketName, |
+ name: _objectName, |
+ predefinedAcl: _predefinedAcl, |
+ uploadMedia: media, |
+ uploadOptions: common.UploadOptions.Default) |
+ .then((response) { |
+ _doneCompleter.complete(new _ObjectInfoImpl(response)); |
+ }, onError: _completeError); |
+ } |
+ |
+ void _startResumableUpload(Stream stream, int length) { |
+ var contentType = _object.contentType != null |
+ ? _object.contentType : 'application/octet-stream'; |
+ var media = new common.Media(stream, length, contentType: contentType); |
+ _api.objects.insert(_object, |
+ _bucketName, |
+ name: _objectName, |
+ predefinedAcl: _predefinedAcl, |
+ uploadMedia: media, |
+ uploadOptions: common.UploadOptions.Resumable) |
+ .then((response) { |
+ _doneCompleter.complete(new _ObjectInfoImpl(response)); |
+ }, onError: _completeError); |
+ } |
+} |