Chromium Code Reviews| OLD | NEW |
|---|---|
| (Empty) | |
| 1 // Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file | |
| 2 // for details. All rights reserved. Use of this source code is governed by a | |
| 3 // BSD-style license that can be found in the LICENSE file. | |
| 4 | |
| 5 library pub_server.copy_and_write_repository; | |
| 6 | |
| 7 import 'dart:async'; | |
| 8 | |
| 9 import 'package:logging/logging.dart'; | |
| 10 import 'package:pub_server/repository.dart'; | |
| 11 | |
| 12 final Logger _logger = new Logger('pub_server.cow_repository'); | |
| 13 | |
| 14 /// A [CopyAndWriteRepository] writes to one repository and directs | |
| 15 /// read-misses to another repository. | |
| 16 /// | |
| 17 /// Package versions not available from the read-write repository will be | |
| 18 /// fetched from a read-fallback repository and uploaded to the read-write | |
| 19 /// repository. This caches effectively all packages requested through this | |
|
Søren Gjesse
2015/02/16 11:54:51
caches effectively -> effectively caches
kustermann
2015/02/16 14:44:02
Done.
| |
| 20 /// [CopyAndWriteRepository]. | |
| 21 /// | |
| 22 /// New package versions which get uploaded will be stored only locally. | |
| 23 class CopyAndWriteRepository extends PackageRepository { | |
| 24 final PackageRepository local; | |
| 25 final PackageRepository remote; | |
| 26 final _RemoteMetadataCache _localCache; | |
| 27 final _RemoteMetadataCache _remoteCache; | |
| 28 | |
| 29 /// Construct a new proxy with [local] as the local [PackageRepository] which | |
| 30 /// is used for uploading new package versions to and [remote] as the | |
| 31 /// read-only [PackageRepository] which is consulted on misses in [local]. | |
| 32 CopyAndWriteRepository(PackageRepository local, PackageRepository remote) | |
| 33 : this.local = local, | |
| 34 this.remote = remote, | |
| 35 this._localCache = new _RemoteMetadataCache(local), | |
| 36 this._remoteCache = new _RemoteMetadataCache(remote); | |
| 37 | |
| 38 Stream<PackageVersion> versions(String package) { | |
| 39 var controller; | |
| 40 | |
| 41 onListen() { | |
| 42 Future.wait([_localCache.fetchVersionlist(package), | |
| 43 _remoteCache.fetchVersionlist(package)]).then((tuple) { | |
| 44 var versions = new Set() | |
| 45 ..addAll(tuple[0]) | |
| 46 ..addAll(tuple[1]); | |
| 47 for (var version in versions) controller.add(version); | |
| 48 controller.close(); | |
| 49 }); | |
| 50 } | |
| 51 | |
| 52 controller = new StreamController(onListen: onListen); | |
| 53 return controller.stream; | |
| 54 } | |
| 55 | |
| 56 Future<PackageVersion> lookupVersion(String package, String version) { | |
| 57 return versions(package) | |
| 58 .where((pv) => pv.versionString == version) | |
| 59 .toList().then((List<PackageVersion> versions) { | |
| 60 if (versions.length >= 1) return versions.first; | |
| 61 return null; | |
| 62 }); | |
| 63 } | |
| 64 | |
| 65 Future<Stream> download(String package, String version) async { | |
| 66 var packageVersion = await local.lookupVersion(package, version); | |
| 67 | |
| 68 if (packageVersion != null) { | |
| 69 _logger.info('Serving $package/$version from local repository.'); | |
| 70 return local.download(package, packageVersion.versionString); | |
| 71 } else { | |
| 72 // We first download the package from the remote repository and store | |
| 73 // it locally. Then we read the local version and return it. | |
| 74 | |
| 75 _logger.info('Downloading $package/$version from remote repository.'); | |
| 76 var stream = await remote.download(package, version); | |
| 77 | |
| 78 _logger.info('Upload $package/$version to local repository.'); | |
| 79 await local.upload(stream); | |
| 80 | |
| 81 _logger.info('Serving $package/$version from local repository.'); | |
| 82 return local.download(package, version); | |
| 83 } | |
| 84 } | |
| 85 | |
| 86 bool get supportsUpload => true; | |
| 87 | |
| 88 Future upload(Stream<List<int>> data) async { | |
| 89 _logger.info('Starting upload to local package repository.'); | |
| 90 var stream = await local.upload(data); | |
| 91 | |
| 92 // TODO: It's not really necessary to invalidate all. | |
| 93 _logger.info('Upload finished. Invalidating in-memory cache.'); | |
| 94 _localCache.invalidateAll(); | |
| 95 } | |
| 96 | |
| 97 bool get supportsAsyncUpload => false; | |
| 98 } | |
| 99 | |
| 100 /// A cache for [PackageVersion] objects for a given `package`. | |
| 101 /// | |
| 102 /// The constructor takes a [PackageRepository] which will be used to populate | |
| 103 /// the cache. | |
| 104 class _RemoteMetadataCache { | |
| 105 final PackageRepository remote; | |
| 106 | |
| 107 Map<String, Set<PackageVersion>> _versions = {}; | |
| 108 Map<String, Completer<Set<PackageVersion>>> _versionCompleters = {}; | |
| 109 | |
| 110 _RemoteMetadataCache(this.remote); | |
| 111 | |
| 112 // TODO: After a cache expiration we should invalidate entries and re-fetch | |
| 113 // them. | |
| 114 Future<List<PackageVersion>> fetchVersionlist(String package) { | |
| 115 return _versionCompleters.putIfAbsent(package, () { | |
| 116 var c = new Completer(); | |
| 117 | |
| 118 _versions.putIfAbsent(package, () => new Set()); | |
| 119 remote.versions(package).toList().then((versions) { | |
| 120 _versions[package].addAll(versions); | |
| 121 c.complete(_versions[package]); | |
| 122 }); | |
| 123 | |
| 124 return c; | |
| 125 }).future.then((set) => set.toList()); | |
| 126 } | |
| 127 | |
| 128 void addVersion(String package, PackageVersion version) { | |
| 129 _versions.putIfAbsent(version.packageName, () => new Set()).add(version); | |
| 130 } | |
| 131 | |
| 132 void invalidateAll() { | |
| 133 _versionCompleters.clear(); | |
| 134 _versions.clear(); | |
| 135 } | |
| 136 } | |
| OLD | NEW |