Chromium Code Reviews| Index: go/src/infra/tools/cipd/client.go |
| diff --git a/go/src/infra/tools/cipd/client.go b/go/src/infra/tools/cipd/client.go |
| index 183f08182e76e7b70be374748836919b919c6642..9ea4ddfb31cdba8ca29572b4ce82c1db839d09a2 100644 |
| --- a/go/src/infra/tools/cipd/client.go |
| +++ b/go/src/infra/tools/cipd/client.go |
| @@ -41,13 +41,16 @@ import ( |
| "io" |
| "net/http" |
| "os" |
| + "path/filepath" |
| "sort" |
| "strings" |
| + "sync" |
| "time" |
| "github.com/luci/luci-go/common/logging" |
| "infra/tools/cipd/common" |
| + "infra/tools/cipd/internal" |
| "infra/tools/cipd/local" |
| ) |
| @@ -76,29 +79,29 @@ const ( |
| var ( |
| // ErrFinalizationTimeout is returned if CAS service can not finalize upload fast enough. |
| - ErrFinalizationTimeout = errors.New("Timeout while waiting for CAS service to finalize the upload") |
| + ErrFinalizationTimeout = errors.New("timeout while waiting for CAS service to finalize the upload") |
| // ErrBadUpload is returned when a package file is uploaded, but servers asks us to upload it again. |
| - ErrBadUpload = errors.New("Package file is uploaded, but servers asks us to upload it again") |
| + ErrBadUpload = errors.New("package file is uploaded, but servers asks us to upload it again") |
| // ErrBadUploadSession is returned by UploadToCAS if provided UploadSession is not valid. |
| - ErrBadUploadSession = errors.New("UploadURL must be set if UploadSessionID is used") |
| + ErrBadUploadSession = errors.New("uploadURL must be set if UploadSessionID is used") |
| // ErrUploadSessionDied is returned by UploadToCAS if upload session suddenly disappeared. |
| - ErrUploadSessionDied = errors.New("Upload session is unexpectedly missing") |
| + ErrUploadSessionDied = errors.New("upload session is unexpectedly missing") |
| // ErrNoUploadSessionID is returned by UploadToCAS if server didn't provide upload session ID. |
| - ErrNoUploadSessionID = errors.New("Server didn't provide upload session ID") |
| + ErrNoUploadSessionID = errors.New("server didn't provide upload session ID") |
| // ErrSetRefTimeout is returned when service refuses to move a ref for a long time. |
| - ErrSetRefTimeout = errors.New("Timeout while moving a ref") |
| + ErrSetRefTimeout = errors.New("timeout while moving a ref") |
| // ErrAttachTagsTimeout is returned when service refuses to accept tags for a long time. |
| - ErrAttachTagsTimeout = errors.New("Timeout while attaching tags") |
| + ErrAttachTagsTimeout = errors.New("timeout while attaching tags") |
| // ErrDownloadError is returned by FetchInstance on download errors. |
| - ErrDownloadError = errors.New("Failed to download the package file after multiple attempts") |
| + ErrDownloadError = errors.New("failed to download the package file after multiple attempts") |
| // ErrUploadError is returned by RegisterInstance and UploadToCAS on upload errors. |
| - ErrUploadError = errors.New("Failed to upload the package file after multiple attempts") |
| + ErrUploadError = errors.New("failed to upload the package file after multiple attempts") |
| // ErrAccessDenined is returned by calls talking to backend on 401 or 403 HTTP errors. |
| - ErrAccessDenined = errors.New("Access denied (not authenticated or not enough permissions)") |
| + ErrAccessDenined = errors.New("access denied (not authenticated or not enough permissions)") |
| // ErrBackendInaccessible is returned by calls talking to backed if it doesn't response. |
| - ErrBackendInaccessible = errors.New("Request to the backend failed after multiple attempts") |
| + ErrBackendInaccessible = errors.New("request to the backend failed after multiple attempts") |
| // ErrEnsurePackagesFailed is returned by EnsurePackages if something is not right. |
| - ErrEnsurePackagesFailed = errors.New("Failed to update packages, see the log") |
| + ErrEnsurePackagesFailed = errors.New("failed to update packages, see the log") |
| ) |
| // PackageACL is per package path per role access control list that is a part of |
| @@ -135,7 +138,7 @@ type UploadSession struct { |
| URL string |
| } |
| -// Client provides high-level CIPD client interface. |
| +// Client provides high-level CIPD client interface. Thread safe. |
| type Client interface { |
| // FetchACL returns a list of PackageACL objects (parent paths first) that |
| // together define the access control list for the given package subpath. |
| @@ -191,6 +194,9 @@ type Client interface { |
| // what packages (and versions) should be installed it will do all necessary |
| // actions to bring the state of the site root to the desired one. |
| EnsurePackages(pins []common.Pin) error |
| + |
| + // Close should be called to dump any cached state to disk. |
| + Close() |
| } |
| // HTTPClientFactory lazily creates http.Client to use for making requests. |
| @@ -200,15 +206,25 @@ type HTTPClientFactory func() (*http.Client, error) |
| type ClientOptions struct { |
| // ServiceURL is root URL of the backend service. |
| ServiceURL string |
| - // Root is a site root directory (where packages will be installed). It can |
| - // be empty string if client is not going to be used to deploy or remove local packages. |
| + |
| + // Root is a site root directory (a directory where packages will be |
| + // installed to). It also hosts .cipd/* directory that tracks internal state |
| + // of installed packages and keeps various cache files. 'Root' can be an empty |
| + // string if the client is not going to be used to deploy or remove local |
| + // packages. In that case caches are also disabled. |
| Root string |
| + |
| // Logger is a logger to use for logs (null-logger by default). |
| Logger logging.Logger |
| - // AuthenticatedClientFactory lazily creates http.Client to use for making RPC requests. |
| + |
| + // AuthenticatedClientFactory lazily creates http.Client to use for making |
| + // RPC requests. |
| AuthenticatedClientFactory HTTPClientFactory |
| - // AnonymousClientFactory lazily creates http.Client to use for making requests to storage. |
| + |
| + // AnonymousClientFactory lazily creates http.Client to use for making |
| + // requests to storage. |
| AnonymousClientFactory HTTPClientFactory |
| + |
| // UserAgent is put into User-Agent HTTP header with each request. |
| UserAgent string |
| } |
| @@ -243,43 +259,110 @@ func NewClient(opts ClientOptions) Client { |
| type clientImpl struct { |
| ClientOptions |
| - // clock provides current time and ability to sleep. |
| + // lock protects lazily initialized portions of the client. |
| + lock sync.Mutex |
| + |
| + // clock provides current time and ability to sleep. Thread safe. |
| clock clock |
| - // remote knows how to call backend REST API. |
| + |
| + // remote knows how to call backend REST API. Thread safe. |
| remote remote |
| + |
| // storage knows how to upload and download raw binaries using signed URLs. |
| + // Thread safe. |
| storage storage |
| - // deployer knows how to install packages to local file system. |
| + |
| + // deployer knows how to install packages to local file system. Thread safe. |
| deployer local.Deployer |
| - // authClient is a lazily created http.Client to use for authenticated requests. |
| + // tagCache is used to cache (pkgname, tag) -> instanceID mapping. |
| + // Thread safe, but lazily initialized under lock. |
| + tagCache internal.TagCache |
| + |
| + // authClient is a lazily created http.Client to use for authenticated |
| + // requests. Thread safe, but lazily initialized under lock. |
| authClient *http.Client |
| + |
| // anonClient is a lazily created http.Client to use for anonymous requests. |
| + // Thread safe, but lazily initialized under lock. |
| anonClient *http.Client |
| } |
| // doAuthenticatedHTTPRequest is used by remote implementation to make HTTP calls. |
| func (client *clientImpl) doAuthenticatedHTTPRequest(req *http.Request) (*http.Response, error) { |
| - if client.authClient == nil { |
| - var err error |
| - client.authClient, err = client.AuthenticatedClientFactory() |
| - if err != nil { |
| - return nil, err |
| - } |
| - } |
| - return client.authClient.Do(req) |
| + return client.doRequest(req, &client.authClient, client.AuthenticatedClientFactory) |
| } |
| // doAnonymousHTTPRequest is used by storage implementation to make HTTP calls. |
| func (client *clientImpl) doAnonymousHTTPRequest(req *http.Request) (*http.Response, error) { |
| - if client.anonClient == nil { |
| + return client.doRequest(req, &client.anonClient, client.AnonymousClientFactory) |
| +} |
| + |
| +// doRequest lazy-initializes http.Client by calling giving callback and then |
|
tandrii(chromium)
2015/09/30 18:11:15
s/by calling giving callback/using provided factor
Vadim Sh.
2015/09/30 22:50:59
Done.
|
| +// executes the request. |
| +func (client *clientImpl) doRequest(req *http.Request, c **http.Client, fac HTTPClientFactory) (*http.Response, error) { |
| + httpClient, err := func() (*http.Client, error) { |
| + client.lock.Lock() |
| + defer client.lock.Unlock() |
| var err error |
| - client.anonClient, err = client.AnonymousClientFactory() |
| - if err != nil { |
| - return nil, err |
| + if *c == nil { |
| + *c, err = fac() |
| + } |
| + return *c, err |
| + }() |
| + if err != nil { |
| + return nil, err |
| + } |
| + return httpClient.Do(req) |
| +} |
| + |
| +// tagCachePath returns path to a tag cache file or "" if no root dir. |
| +func (client *clientImpl) tagCachePath() string { |
| + if client.Root == "" { |
| + return "" |
| + } |
| + return filepath.Join(client.Root, local.SiteServiceDir, "tagcache.db") |
| +} |
| + |
| +// getTagCache lazy-initializes tagCache instance and returns it. |
| +func (client *clientImpl) getTagCache() internal.TagCache { |
| + client.lock.Lock() |
| + defer client.lock.Unlock() |
| + if client.tagCache == nil { |
| + if path := client.tagCachePath(); path != "" { |
| + var err error |
| + client.tagCache, err = internal.LoadTagCacheFromFile(path) |
| + if err != nil { |
| + client.Logger.Warningf("cipd: failed to load tag cache - %s", err) |
| + client.tagCache = internal.NewTagCache() |
|
nodir
2015/09/30 19:08:23
remove this
Vadim Sh.
2015/09/30 22:50:59
Done.
|
| + } |
| + } else { |
| + client.tagCache = internal.NewTagCache() |
|
nodir
2015/09/30 19:08:23
remove this
Vadim Sh.
2015/09/30 22:50:58
Done.
|
| } |
| } |
|
nodir
2015/09/30 19:08:23
add
if client.tagCache == nil {
client.TagCach
Vadim Sh.
2015/09/30 22:50:58
Done.
|
| - return client.anonClient.Do(req) |
| + return client.tagCache |
| +} |
| + |
| +// closeTagCache dumps any changes made to tag cache to disk, if necessary. |
| +// Called under lock. |
|
tandrii(chromium)
2015/09/30 18:11:15
nit suggestion: s/.../Must be called under lock.
Vadim Sh.
2015/09/30 22:50:59
Done. Whatever.
|
| +func (client *clientImpl) closeTagCache() { |
| + path := client.tagCachePath() |
| + if client.tagCache == nil || path == "" || !client.tagCache.Dirty() { |
| + client.tagCache = nil |
| + return |
| + } |
| + //It's tiny in size (and protobuf can't serialize to io.Reader anyway). Then |
|
tandrii(chromium)
2015/09/30 18:11:15
nit: space between // and It
nit: s/Then/So,
Vadim Sh.
2015/09/30 22:50:59
Done.
|
| + // dump it to disk via FileSystem object to deal with possible concurrent |
| + // updates, missing directories, etc. |
| + fs := local.NewFileSystem(filepath.Dir(path), client.Logger) |
| + out, err := client.tagCache.Save() |
| + if err == nil { |
| + err = fs.EnsureFile(path, out, 0666) |
| + } |
| + if err != nil { |
| + client.Logger.Warningf("cipd: failed to update tag cache - %s", err) |
| + } |
| + client.tagCache = nil |
| } |
| func (client *clientImpl) FetchACL(packagePath string) ([]PackageACL, error) { |
| @@ -368,7 +451,23 @@ func (client *clientImpl) ResolveVersion(packageName, version string) (common.Pi |
| if err := common.ValidateInstanceVersion(version); err != nil { |
| return common.Pin{}, err |
| } |
| - return client.remote.resolveVersion(packageName, version) |
| + // Use local cache when resolving tags to avoid round trips to backed when |
|
tandrii(chromium)
2015/09/30 18:11:15
s/backed/backend
Vadim Sh.
2015/09/30 22:50:59
Done.
|
| + // calling same 'cipd ensure' command again and again. |
| + isTag := common.ValidateInstanceTag(version) == nil |
| + if isTag { |
| + cached := client.getTagCache().ResolveTag(packageName, version) |
| + if cached.InstanceID != "" { |
| + return cached, nil |
| + } |
| + } |
| + pin, err := client.remote.resolveVersion(packageName, version) |
| + if err != nil { |
| + return pin, err |
| + } |
| + if isTag { |
| + client.getTagCache().AddTag(pin, version) |
| + } |
| + return pin, nil |
| } |
| func (client *clientImpl) RegisterInstance(instance local.PackageInstance) error { |
| @@ -629,6 +728,14 @@ func (client *clientImpl) EnsurePackages(pins []common.Pin) error { |
| return ErrEnsurePackagesFailed |
| } |
| +func (client *clientImpl) Close() { |
| + client.lock.Lock() |
| + defer client.lock.Unlock() |
| + client.closeTagCache() |
| + client.authClient = nil |
| + client.anonClient = nil |
| +} |
| + |
| //////////////////////////////////////////////////////////////////////////////// |
| // Private structs and interfaces. |