| OLD | NEW |
| (Empty) | |
| 1 // Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. |
| 4 |
| 5 library pub_server.copy_and_write_repository; |
| 6 |
| 7 import 'dart:async'; |
| 8 |
| 9 import 'package:logging/logging.dart'; |
| 10 import 'package:pub_server/repository.dart'; |
| 11 |
| 12 final Logger _logger = new Logger('pub_server.cow_repository'); |
| 13 |
| 14 /// A [CopyAndWriteRepository] writes to one repository and directs |
| 15 /// read-misses to another repository. |
| 16 /// |
| 17 /// Package versions not available from the read-write repository will be |
| 18 /// fetched from a read-fallback repository and uploaded to the read-write |
| 19 /// repository. This effectively caches all packages requested through this |
| 20 /// [CopyAndWriteRepository]. |
| 21 /// |
| 22 /// New package versions which get uploaded will be stored only locally. |
| 23 class CopyAndWriteRepository extends PackageRepository { |
| 24 final PackageRepository local; |
| 25 final PackageRepository remote; |
| 26 final _RemoteMetadataCache _localCache; |
| 27 final _RemoteMetadataCache _remoteCache; |
| 28 |
| 29 /// Construct a new proxy with [local] as the local [PackageRepository] which |
| 30 /// is used for uploading new package versions to and [remote] as the |
| 31 /// read-only [PackageRepository] which is consulted on misses in [local]. |
| 32 CopyAndWriteRepository(PackageRepository local, PackageRepository remote) |
| 33 : this.local = local, |
| 34 this.remote = remote, |
| 35 this._localCache = new _RemoteMetadataCache(local), |
| 36 this._remoteCache = new _RemoteMetadataCache(remote); |
| 37 |
| 38 Stream<PackageVersion> versions(String package) { |
| 39 var controller; |
| 40 |
| 41 onListen() { |
| 42 Future.wait([_localCache.fetchVersionlist(package), |
| 43 _remoteCache.fetchVersionlist(package)]).then((tuple) { |
| 44 var versions = new Set() |
| 45 ..addAll(tuple[0]) |
| 46 ..addAll(tuple[1]); |
| 47 for (var version in versions) controller.add(version); |
| 48 controller.close(); |
| 49 }); |
| 50 } |
| 51 |
| 52 controller = new StreamController(onListen: onListen); |
| 53 return controller.stream; |
| 54 } |
| 55 |
| 56 Future<PackageVersion> lookupVersion(String package, String version) { |
| 57 return versions(package) |
| 58 .where((pv) => pv.versionString == version) |
| 59 .toList().then((List<PackageVersion> versions) { |
| 60 if (versions.length >= 1) return versions.first; |
| 61 return null; |
| 62 }); |
| 63 } |
| 64 |
| 65 Future<Stream> download(String package, String version) async { |
| 66 var packageVersion = await local.lookupVersion(package, version); |
| 67 |
| 68 if (packageVersion != null) { |
| 69 _logger.info('Serving $package/$version from local repository.'); |
| 70 return local.download(package, packageVersion.versionString); |
| 71 } else { |
| 72 // We first download the package from the remote repository and store |
| 73 // it locally. Then we read the local version and return it. |
| 74 |
| 75 _logger.info('Downloading $package/$version from remote repository.'); |
| 76 var stream = await remote.download(package, version); |
| 77 |
| 78 _logger.info('Upload $package/$version to local repository.'); |
| 79 await local.upload(stream); |
| 80 |
| 81 _logger.info('Serving $package/$version from local repository.'); |
| 82 return local.download(package, version); |
| 83 } |
| 84 } |
| 85 |
| 86 bool get supportsUpload => true; |
| 87 |
| 88 Future upload(Stream<List<int>> data) { |
| 89 _logger.info('Starting upload to local package repository.'); |
| 90 // TODO: Converting this to an async scope makes the stream not get any data |
| 91 // or done event. Seems like there is still an issue in |
| 92 // package:mime - making this an async scope results in this stream getting |
| 93 // no data. |
| 94 return local.upload(data).then((data) { |
| 95 // TODO: It's not really necessary to invalidate all. |
| 96 _logger.info('Upload finished. Invalidating in-memory cache.'); |
| 97 _localCache.invalidateAll(); |
| 98 }); |
| 99 } |
| 100 |
| 101 bool get supportsAsyncUpload => false; |
| 102 } |
| 103 |
| 104 /// A cache for [PackageVersion] objects for a given `package`. |
| 105 /// |
| 106 /// The constructor takes a [PackageRepository] which will be used to populate |
| 107 /// the cache. |
| 108 class _RemoteMetadataCache { |
| 109 final PackageRepository remote; |
| 110 |
| 111 Map<String, Set<PackageVersion>> _versions = {}; |
| 112 Map<String, Completer<Set<PackageVersion>>> _versionCompleters = {}; |
| 113 |
| 114 _RemoteMetadataCache(this.remote); |
| 115 |
| 116 // TODO: After a cache expiration we should invalidate entries and re-fetch |
| 117 // them. |
| 118 Future<List<PackageVersion>> fetchVersionlist(String package) { |
| 119 return _versionCompleters.putIfAbsent(package, () { |
| 120 var c = new Completer(); |
| 121 |
| 122 _versions.putIfAbsent(package, () => new Set()); |
| 123 remote.versions(package).toList().then((versions) { |
| 124 _versions[package].addAll(versions); |
| 125 c.complete(_versions[package]); |
| 126 }); |
| 127 |
| 128 return c; |
| 129 }).future.then((set) => set.toList()); |
| 130 } |
| 131 |
| 132 void addVersion(String package, PackageVersion version) { |
| 133 _versions.putIfAbsent(version.packageName, () => new Set()).add(version); |
| 134 } |
| 135 |
| 136 void invalidateAll() { |
| 137 _versionCompleters.clear(); |
| 138 _versions.clear(); |
| 139 } |
| 140 } |
| OLD | NEW |