Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(411)

Side by Side Diff: pkg/gcloud/lib/src/storage_impl.dart

Issue 804973002: Add appengine/gcloud/mustache dependencies. (Closed) Base URL: git@github.com:dart-lang/pub-dartlang-dart.git@master
Patch Set: Added AUTHORS/LICENSE/PATENTS files Created 6 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « pkg/gcloud/lib/src/db/models.dart ('k') | pkg/gcloud/lib/storage.dart » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file.
4
5 part of gcloud.storage;
6
7 const String _ABSOLUTE_PREFIX = 'gs://';
8 const String _DIRECTORY_DELIMITER = 'gs://';
9
10 /// Representation of an absolute name consisting of bucket name and object
11 /// name.
12 class _AbsoluteName {
13 String bucketName;
14 String objectName;
15
16 _AbsoluteName.parse(String absoluteName) {
17 if (!absoluteName.startsWith(_ABSOLUTE_PREFIX)) {
18 throw new FormatException("Absolute name '$absoluteName' does not start "
19 "with '$_ABSOLUTE_PREFIX'");
20 }
21 int index = absoluteName.indexOf('/', _ABSOLUTE_PREFIX.length);
22 if (index == -1 || index == _ABSOLUTE_PREFIX.length) {
23 throw new FormatException("Absolute name '$absoluteName' does not have "
24 "a bucket name");
25 }
26 if (index == absoluteName.length - 1) {
27 throw new FormatException("Absolute name '$absoluteName' does not have "
28 "an object name");
29 }
30 bucketName = absoluteName.substring(_ABSOLUTE_PREFIX.length, index);
31 objectName = absoluteName.substring(index + 1);
32 }
33 }
34
35 /// Storage API implementation providing access to buckets.
36 class _StorageImpl implements Storage {
37 final String project;
38 final storage_api.StorageApi _api;
39
40 _StorageImpl(client, this.project)
41 : _api = new storage_api.StorageApi(client);
42
43 Future createBucket(String bucketName,
44 {PredefinedAcl predefinedAcl, Acl acl}) {
45 var bucket = new storage_api.Bucket()..name = bucketName;
46 var predefinedName = predefinedAcl != null ? predefinedAcl._name : null;
47 if (acl != null) {
48 bucket.acl = acl._toBucketAccessControlList();
49 }
50 return _api.buckets.insert(bucket,
51 project,
52 predefinedAcl: predefinedName)
53 .then((bucket) => null);
54 }
55
56 Future deleteBucket(String bucketName) {
57 return _api.buckets.delete(bucketName);
58 }
59
60 Bucket bucket(String bucketName,
61 {PredefinedAcl defaultPredefinedObjectAcl,
62 Acl defaultObjectAcl}) {
63 return new _BucketImpl(
64 this, bucketName, defaultPredefinedObjectAcl, defaultObjectAcl);
65 }
66
67 Future<bool> bucketExists(String bucketName) {
68 notFoundError(e) => e is common.DetailedApiRequestError && e.status == 404;
69 return _api.buckets.get(bucketName)
70 .then((_) => true)
71 .catchError((e) => false, test: notFoundError);
72
73 }
74
75 Future<BucketInfo> bucketInfo(String bucketName) {
76 return _api.buckets.get(bucketName, projection: 'full')
77 .then((bucket) => new _BucketInfoImpl(bucket));
78 }
79
80 Stream<String> listBucketNames() {
81 Future<Page<Bucket>> firstPage(pageSize) {
82 return _listBuckets(pageSize, null)
83 .then((response) => new _BucketPageImpl(this, pageSize, response));
84 }
85 return new StreamFromPages<String>(firstPage).stream;
86 }
87
88 Future<Page<String>> pageBucketNames({int pageSize: 50}) {
89 return _listBuckets(pageSize, null).then((response) {
90 return new _BucketPageImpl(this, pageSize, response);
91 });
92 }
93
94 Future copyObject(String src, String dest) {
95 var srcName = new _AbsoluteName.parse(src);
96 var destName = new _AbsoluteName.parse(dest);
97 return _api.objects.copy(null,
98 srcName.bucketName, srcName.objectName,
99 destName.bucketName, destName.objectName)
100 .then((_) => null);
101 }
102
103 Future<storage_api.Buckets> _listBuckets(int pageSize, String nextPageToken) {
104 return _api.buckets.list(
105 project,
106 maxResults: pageSize,
107 pageToken: nextPageToken);
108 }
109 }
110
111 class _BucketInfoImpl implements BucketInfo {
112 final storage_api.Bucket _bucket;
113
114 _BucketInfoImpl(this._bucket);
115
116 String get bucketName => _bucket.name;
117
118 String get etag => _bucket.etag;
119
120 DateTime get created => _bucket.timeCreated;
121
122 String get id => _bucket.id;
123
124 Acl get acl => new Acl._fromBucketAcl(_bucket);
125 }
126
127 /// Bucket API implementation providing access to objects.
128 class _BucketImpl implements Bucket {
129 final storage_api.StorageApi _api;
130 PredefinedAcl _defaultPredefinedObjectAcl;
131 Acl _defaultObjectAcl;
132 final String bucketName;
133
134 _BucketImpl(_StorageImpl storage,
135 this.bucketName,
136 this._defaultPredefinedObjectAcl,
137 this._defaultObjectAcl) :
138 this._api = storage._api;
139
140 String absoluteObjectName(String objectName) {
141 return '${_ABSOLUTE_PREFIX}$bucketName/$objectName';
142 }
143
144 StreamSink<List<int>> write(
145 String objectName,
146 {int length, ObjectMetadata metadata,
147 Acl acl, PredefinedAcl predefinedAcl, String contentType}) {
148 storage_api.Object object;
149 if (metadata == null) {
150 metadata = new _ObjectMetadata(acl: acl, contentType: contentType);
151 } else {
152 if (acl != null) {
153 metadata = metadata.replace(acl: acl);
154 }
155 if (contentType != null) {
156 metadata = metadata.replace(contentType: contentType);
157 }
158 }
159 _ObjectMetadata objectMetadata = metadata;
160 object = objectMetadata._object;
161
162 // If no predefined ACL is passed use the default (if any).
163 var predefinedName;
164 if (predefinedAcl != null || _defaultPredefinedObjectAcl != null) {
165 var predefined =
166 predefinedAcl != null ? predefinedAcl : _defaultPredefinedObjectAcl;
167 predefinedName = predefined._name;
168 }
169
170 // If no ACL is passed use the default (if any).
171 if (object.acl == null && _defaultObjectAcl != null) {
172 object.acl = _defaultObjectAcl._toObjectAccessControlList();
173 }
174
175 // Fill properties not passed in metadata.
176 object.name = objectName;
177
178 var sink = new _MediaUploadStreamSink(
179 _api, bucketName, objectName, object, predefinedName, length);
180 return sink;
181 }
182
183 Future writeBytes(
184 String objectName, List<int> bytes,
185 {ObjectMetadata metadata,
186 Acl acl, PredefinedAcl predefinedAcl, String contentType}) {
187 var sink = write(objectName, length: bytes.length,
188 metadata: metadata, acl: acl, predefinedAcl: predefinedAcl,
189 contentType: contentType);
190 sink.add(bytes);
191 return sink.close();
192 }
193
194 Stream read(String objectName, {int offset: 0, int length}) {
195 var controller = new StreamController();
196 _api.objects.get(
197 bucketName,
198 objectName,
199 downloadOptions: common.DownloadOptions.FullMedia).then(
200 (media) => media.stream.pipe(controller.sink));
201 return controller.stream;
202 }
203
204 Future<ObjectInfo> info(String objectName) {
205 return _api.objects.get(bucketName, objectName, projection: 'full')
206 .then((object) => new _ObjectInfoImpl(object));
207 }
208
209 Future delete(String objectName) {
210 return _api.objects.delete(bucketName, objectName);
211 }
212
213 Stream<BucketEntry> list({String prefix}) {
214 Future<Page<Bucket>> firstPage(pageSize) {
215 return _listObjects(bucketName, prefix, _DIRECTORY_DELIMITER, 50, null)
216 .then((response) => new _ObjectPageImpl(
217 this, prefix, pageSize, response));
218 }
219 return new StreamFromPages<BucketEntry>(firstPage).stream;
220 }
221
222 Future<Page<BucketEntry>> page({String prefix, int pageSize: 50}) {
223 return _listObjects(
224 bucketName, prefix, _DIRECTORY_DELIMITER, pageSize, null)
225 .then((response) {
226 return new _ObjectPageImpl(this, prefix, pageSize, response);
227 });
228 }
229
230 Future updateMetadata(String objectName, ObjectMetadata metadata) {
231 // TODO: support other ObjectMetadata implementations?
232 _ObjectMetadata md = metadata;
233 var object = md._object;
234 if (md._object.acl == null && _defaultObjectAcl == null) {
235 throw new ArgumentError('ACL is required for update');
236 }
237 if (md.contentType == null) {
238 throw new ArgumentError('Content-Type is required for update');
239 }
240 if (md._object.acl == null) {
241 md._object.acl = _defaultObjectAcl._toObjectAccessControlList();
242 }
243 return _api.objects.update(object, bucketName, objectName);
244 }
245
246 Future<storage_api.Objects> _listObjects(
247 String bucketName, String prefix, String delimiter,
248 int pageSize, String nextPageToken) {
249 return _api.objects.list(
250 bucketName,
251 prefix: prefix,
252 delimiter: delimiter,
253 maxResults: pageSize,
254 pageToken: nextPageToken);
255 }
256 }
257
258 class _BucketPageImpl implements Page<String> {
259 final _StorageImpl _storage;
260 final int _pageSize;
261 final String _nextPageToken;
262 final List<String> items;
263
264 _BucketPageImpl(this._storage, this._pageSize, storage_api.Buckets response)
265 : items = new List(response.items != null ? response.items.length : 0),
266 _nextPageToken = response.nextPageToken {
267 for (int i = 0; i < items.length; i++) {
268 items[i] = response.items[i].name;
269 }
270 }
271
272 bool get isLast => _nextPageToken == null;
273
274 Future<Page<String>> next({int pageSize}) {
275 if (isLast) return new Future.value(null);
276 if (pageSize == null) pageSize = this._pageSize;
277
278 return _storage._listBuckets(pageSize, _nextPageToken).then((response) {
279 return new _BucketPageImpl(_storage, pageSize, response);
280 });
281 }
282 }
283
284 class _ObjectPageImpl implements Page<BucketEntry> {
285 final _BucketImpl _bucket;
286 final String _prefix;
287 final int _pageSize;
288 final String _nextPageToken;
289 final List<BucketEntry> items;
290
291 _ObjectPageImpl(
292 this._bucket, this._prefix, this._pageSize,
293 storage_api.Objects response)
294 : items = new List(
295 (response.items != null ? response.items.length : 0) +
296 (response.prefixes != null ? response.prefixes.length : 0)),
297 _nextPageToken = response.nextPageToken {
298 var prefixes = 0;
299 if (response.prefixes != null) {
300 for (int i = 0; i < response.prefixes.length; i++) {
301 items[i] = new BucketEntry._directory(response.prefixes[i]);
302 }
303 prefixes = response.prefixes.length;
304 }
305 if (response.items != null) {
306 for (int i = 0; i < response.items.length; i++) {
307 items[prefixes + i] = new BucketEntry._object(response.items[i].name);
308 }
309 }
310 }
311
312 bool get isLast => _nextPageToken == null;
313
314 Future<Page<BucketEntry>> next({int pageSize}) {
315 if (isLast) return new Future.value(null);
316 if (pageSize == null) pageSize = this._pageSize;
317
318 return _bucket._listObjects(
319 _bucket.bucketName,
320 _prefix,
321 _DIRECTORY_DELIMITER,
322 pageSize,
323 _nextPageToken).then((response) {
324 return new _ObjectPageImpl(
325 _bucket, _prefix, pageSize, response);
326 });
327 }
328 }
329
330 class _ObjectGenerationImpl implements ObjectGeneration {
331 final String objectGeneration;
332 final int metaGeneration;
333
334 _ObjectGenerationImpl(this.objectGeneration, this.metaGeneration);
335 }
336
337 class _ObjectInfoImpl implements ObjectInfo {
338 final storage_api.Object _object;
339 final ObjectMetadata _metadata;
340 Uri _downloadLink;
341 ObjectGeneration _generation;
342
343 _ObjectInfoImpl(storage_api.Object object) :
344 _object = object, _metadata = new _ObjectMetadata._(object);
345
346 String get name => _object.name;
347
348 int get length => int.parse(_object.size);
349
350 DateTime get updated => _object.updated;
351
352 String get etag => _object.etag;
353
354 List<int> get md5Hash =>
355 crypto.CryptoUtils.base64StringToBytes(_object.md5Hash);
356
357 int get crc32CChecksum {
358 var list = crypto.CryptoUtils.base64StringToBytes(_object.crc32c);
359 return (list[3] << 24) | (list[2] << 16) | (list[1] << 8) | list[0];
360 }
361
362 Uri get downloadLink {
363 if (_downloadLink == null) {
364 _downloadLink = Uri.parse(_object.mediaLink);
365 }
366 return _downloadLink;
367 }
368
369 ObjectGeneration get generation {
370 if (_generation == null) {
371 _generation = new _ObjectGenerationImpl(
372 _object.generation, int.parse(_object.metageneration));
373 }
374 return _generation;
375 }
376
377 /// Additional metadata.
378 ObjectMetadata get metadata => _metadata;
379 }
380
381 class _ObjectMetadata implements ObjectMetadata {
382 final storage_api.Object _object;
383 Acl _cachedAcl;
384 ObjectGeneration _cachedGeneration;
385 Map _cachedCustom;
386
387 _ObjectMetadata({Acl acl,
388 String contentType,
389 String contentEncoding,
390 String cacheControl,
391 String contentDisposition,
392 String contentLanguage,
393 Map<String, String> custom})
394 : _object = new storage_api.Object() {
395 _object.acl = acl != null ? acl._toObjectAccessControlList() : null;
396 _object.contentType = contentType;
397 _object.contentEncoding = contentEncoding;
398 _object.cacheControl = cacheControl;
399 _object.contentDisposition = contentDisposition;
400 _object.contentLanguage = contentLanguage;
401 if (custom != null) _object.metadata = custom;
402 }
403
404 _ObjectMetadata._(this._object);
405
406 Acl get acl {
407 if (_cachedAcl == null) {
408 _cachedAcl = new Acl._fromObjectAcl(_object);
409 }
410 return _cachedAcl;
411 }
412
413 String get contentType => _object.contentType;
414
415 String get contentEncoding => _object.contentEncoding;
416
417 String get cacheControl => _object.cacheControl;
418
419 String get contentDisposition => _object.contentDisposition;
420
421 String get contentLanguage => _object.contentLanguage;
422
423 ObjectGeneration get generation {
424 if (_cachedGeneration == null) {
425 _cachedGeneration = new ObjectGeneration(
426 _object.generation, int.parse(_object.metageneration));
427 }
428 return _cachedGeneration;
429 }
430
431 Map<String, String> get custom {
432 if (_object.metadata == null) return null;
433 if (_cachedCustom == null) {
434 _cachedCustom = new UnmodifiableMapView(_object.metadata);
435 }
436 return _cachedCustom;
437 }
438
439 ObjectMetadata replace({Acl acl,
440 String contentType,
441 String contentEncoding,
442 String cacheControl,
443 String contentDisposition,
444 String contentLanguage,
445 Map<String, String> custom}) {
446 return new _ObjectMetadata(
447 acl: acl != null ? acl : this.acl,
448 contentType: contentType != null ? contentType : this.contentType,
449 contentEncoding: contentEncoding != null ? contentEncoding
450 : this.contentEncoding,
451 cacheControl: cacheControl != null ? cacheControl : this.cacheControl,
452 contentDisposition: contentDisposition != null ? contentDisposition
453 : this.contentEncoding,
454 contentLanguage: contentLanguage != null ? contentLanguage
455 : this.contentEncoding,
456 custom: custom != null ? new Map.from(custom) : this.custom);
457 }
458 }
459
460 /// Implementation of StreamSink which handles Google media upload.
461 /// It provides a StreamSink and logic which selects whether to use normal
462 /// media upload (multipart mime) or resumable media upload.
463 class _MediaUploadStreamSink implements StreamSink<List<int>> {
464 static const int _DEFAULT_MAX_NORMAL_UPLOAD_LENGTH = 1024 * 1024;
465 final storage_api.StorageApi _api;
466 final String _bucketName;
467 final String _objectName;
468 final storage_api.Object _object;
469 final String _predefinedAcl;
470 final int _length;
471 final int _maxNormalUploadLength;
472 int _bufferLength = 0;
473 final List<List<int>> buffer = new List<List<int>>();
474 final StreamController _controller = new StreamController(sync: true);
475 StreamSubscription _subscription;
476 StreamController _resumableController;
477 final _doneCompleter = new Completer();
478
479 static const int _STATE_LENGTH_KNOWN = 0;
480 static const int _STATE_PROBING_LENGTH = 1;
481 static const int _STATE_DECIDED_RESUMABLE = 2;
482 int _state;
483
484 _MediaUploadStreamSink(
485 this._api, this._bucketName, this._objectName, this._object,
486 this._predefinedAcl, this._length,
487 [this._maxNormalUploadLength = _DEFAULT_MAX_NORMAL_UPLOAD_LENGTH]) {
488 if (_length != null) {
489 // If the length is known in advance decide on the upload strategy
490 // immediately
491 _state = _STATE_LENGTH_KNOWN;
492 if (_length <= _maxNormalUploadLength) {
493 _startNormalUpload(_controller.stream, _length);
494 } else {
495 _startResumableUpload(_controller.stream, _length);
496 }
497 } else {
498 _state = _STATE_PROBING_LENGTH;
499 // If the length is not known in advance decide on the upload strategy
500 // later. Start buffering until enough data has been read to decide.
501 _subscription = _controller.stream.listen(
502 _onData, onDone: _onDone, onError: _onError);
503 }
504 }
505
506 void add(List<int> event) {
507 _controller.add(event);
508 }
509
510 void addError(errorEvent, [StackTrace stackTrace]) {
511 _controller.addError(errorEvent, stackTrace);
512 }
513
514 Future addStream(Stream<List<int>> stream) {
515 return _controller.addStream(stream);
516 }
517
518 Future close() {
519 _controller.close();
520 return _doneCompleter.future;
521 }
522
523 Future get done => _doneCompleter.future;
524
525 _onData(List<int> data) {
526 assert(_state != _STATE_LENGTH_KNOWN);
527 if (_state == _STATE_PROBING_LENGTH) {
528 buffer.add(data);
529 _bufferLength += data.length;
530 if (_bufferLength > _maxNormalUploadLength) {
531 // Start resumable upload.
532 // TODO: Avoid using another stream-controller.
533 _resumableController = new StreamController(sync: true);
534 buffer.forEach(_resumableController.add);
535 var media = new common.Media(_resumableController.stream, null);
536 _startResumableUpload(_resumableController.stream, _length);
537 _state = _STATE_DECIDED_RESUMABLE;
538 }
539 } else {
540 assert(_state == _STATE_DECIDED_RESUMABLE);
541 _resumableController.add(data);
542 }
543 }
544
545 _onDone() {
546 if (_state == _STATE_PROBING_LENGTH) {
547 // As the data is already cached don't bother to wait on somebody
548 // listening on the stream before adding the data.
549 var controller = new StreamController();
550 buffer.forEach(controller.add);
551 controller.close();
552 _startNormalUpload(controller.stream, _bufferLength);
553 } else {
554 _resumableController.close();
555 }
556 }
557
558 _onError(e, s) {
559 // If still deciding on the strategy complete with error. Otherwise
560 // forward the error for default processing.
561 if (_state == _STATE_PROBING_LENGTH) {
562 _completeError(e, s);
563 } else {
564 _resumableController.addError(e, s);
565 }
566 }
567
568 _completeError(e, s) {
569 if (_state != _STATE_LENGTH_KNOWN) {
570 // Always cancel subscription on error.
571 _subscription.cancel();
572 }
573 _doneCompleter.completeError(e, s);
574 }
575
576 void _startNormalUpload(Stream stream, int length) {
577 var contentType = _object.contentType != null
578 ? _object.contentType : 'application/octet-stream';
579 var media = new common.Media(stream, length, contentType: contentType);
580 _api.objects.insert(_object,
581 _bucketName,
582 name: _objectName,
583 predefinedAcl: _predefinedAcl,
584 uploadMedia: media,
585 uploadOptions: common.UploadOptions.Default)
586 .then((response) {
587 _doneCompleter.complete(new _ObjectInfoImpl(response));
588 }, onError: _completeError);
589 }
590
591 void _startResumableUpload(Stream stream, int length) {
592 var contentType = _object.contentType != null
593 ? _object.contentType : 'application/octet-stream';
594 var media = new common.Media(stream, length, contentType: contentType);
595 _api.objects.insert(_object,
596 _bucketName,
597 name: _objectName,
598 predefinedAcl: _predefinedAcl,
599 uploadMedia: media,
600 uploadOptions: common.UploadOptions.Resumable)
601 .then((response) {
602 _doneCompleter.complete(new _ObjectInfoImpl(response));
603 }, onError: _completeError);
604 }
605 }
OLDNEW
« no previous file with comments | « pkg/gcloud/lib/src/db/models.dart ('k') | pkg/gcloud/lib/storage.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698