| OLD | NEW |
| 1 // Copyright 2014 The Chromium Authors. All rights reserved. | 1 // Copyright 2014 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 package cipd | 5 package cipd |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "errors" | 8 "errors" |
| 9 "fmt" | 9 "fmt" |
| 10 "io" | 10 "io" |
| 11 "net/http" | 11 "net/http" |
| 12 "os" | 12 "os" |
| 13 "time" | 13 "time" |
| 14 | 14 |
| 15 "infra/libs/logging" | 15 "infra/libs/logging" |
| 16 ) | 16 ) |
| 17 | 17 |
| 18 var ( | 18 var ( |
| 19 // ErrFinalizationTimeout is returned if CAS service can not finalize up
load fast enough. | 19 // ErrFinalizationTimeout is returned if CAS service can not finalize up
load fast enough. |
| 20 ErrFinalizationTimeout = errors.New("Timeout while waiting for CAS servi
ce to finalize the upload") | 20 ErrFinalizationTimeout = errors.New("Timeout while waiting for CAS servi
ce to finalize the upload") |
| 21 // ErrAttachTagsTimeout is returned when service refuses to accept tags
for a long time. |
| 22 ErrAttachTagsTimeout = errors.New("Timeout while attaching tags") |
| 21 ) | 23 ) |
| 22 | 24 |
| 23 // UploadOptions contains upload related parameters shared by UploadToCAS and | 25 // UploadOptions contains upload related parameters shared by UploadToCAS and |
| 24 // RegisterInstance functions. | 26 // RegisterInstance functions. |
| 25 type UploadOptions struct { | 27 type UploadOptions struct { |
| 26 // ServiceURL is root URL of the backend service, or "" to use default s
ervice. | 28 // ServiceURL is root URL of the backend service, or "" to use default s
ervice. |
| 27 ServiceURL string | 29 ServiceURL string |
| 28 // FinalizationTimeout is how long to wait for CAS service to finalize t
he upload, default is 1 min. | 30 // FinalizationTimeout is how long to wait for CAS service to finalize t
he upload, default is 1 min. |
| 29 FinalizationTimeout time.Duration | 31 FinalizationTimeout time.Duration |
| 30 // Client is http.Client to use to make requests, default is http.Defaul
tClient. | 32 // Client is http.Client to use to make requests, default is http.Defaul
tClient. |
| (...skipping 97 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 128 if delay < 4*time.Second { | 130 if delay < 4*time.Second { |
| 129 delay += 500 * time.Millisecond | 131 delay += 500 * time.Millisecond |
| 130 } | 132 } |
| 131 } | 133 } |
| 132 } | 134 } |
| 133 | 135 |
| 134 // RegisterInstanceOptions contains parameters for RegisterInstance function. | 136 // RegisterInstanceOptions contains parameters for RegisterInstance function. |
| 135 type RegisterInstanceOptions struct { | 137 type RegisterInstanceOptions struct { |
| 136 UploadOptions | 138 UploadOptions |
| 137 | 139 |
| 138 » // PackageInstance is a package to upload. | 140 » // PackageInstance is a package instance to register. |
| 139 PackageInstance PackageInstance | 141 PackageInstance PackageInstance |
| 142 // Tags is a list of tags to attach to an instance. Will be attached eve
n if |
| 143 // the instance already existed before. |
| 144 Tags []string |
| 140 } | 145 } |
| 141 | 146 |
| 142 // RegisterInstance makes the package instance available for clients by | 147 // RegisterInstance makes the package instance available for clients by |
| 143 // uploading it to the storage and registering it in the package repository. | 148 // uploading it to the storage and registering it in the package repository. |
| 144 func RegisterInstance(options RegisterInstanceOptions) error { | 149 func RegisterInstance(options RegisterInstanceOptions) error { |
| 145 // Fill in default options. | 150 // Fill in default options. |
| 146 if options.ServiceURL == "" { | 151 if options.ServiceURL == "" { |
| 147 options.ServiceURL = DefaultServiceURL() | 152 options.ServiceURL = DefaultServiceURL() |
| 148 } | 153 } |
| 149 if options.Client == nil { | 154 if options.Client == nil { |
| (...skipping 37 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 187 | 192 |
| 188 if result.AlreadyRegistered { | 193 if result.AlreadyRegistered { |
| 189 log.Infof( | 194 log.Infof( |
| 190 "cipd: instance %s:%s is already registered by %s on %s"
, | 195 "cipd: instance %s:%s is already registered by %s on %s"
, |
| 191 inst.PackageName(), inst.InstanceID(), | 196 inst.PackageName(), inst.InstanceID(), |
| 192 result.Info.RegisteredBy, result.Info.RegisteredTs) | 197 result.Info.RegisteredBy, result.Info.RegisteredTs) |
| 193 } else { | 198 } else { |
| 194 log.Infof("cipd: instance %s:%s was successfully registered", in
st.PackageName(), inst.InstanceID()) | 199 log.Infof("cipd: instance %s:%s was successfully registered", in
st.PackageName(), inst.InstanceID()) |
| 195 } | 200 } |
| 196 | 201 |
| 197 » return nil | 202 » return attachTagsWhenReady(remote, inst.PackageName(), inst.InstanceID()
, options.Tags, log) |
| 198 } | 203 } |
| 199 | 204 |
| 200 //////////////////////////////////////////////////////////////////////////////// | 205 //////////////////////////////////////////////////////////////////////////////// |
| 201 // Google Storage resumable upload protocol. | 206 // Google Storage resumable upload protocol. |
| 202 // See https://cloud.google.com/storage/docs/concepts-techniques#resumable | 207 // See https://cloud.google.com/storage/docs/concepts-techniques#resumable |
| 203 | 208 |
| 204 // errTransientError is returned by getNextOffset in case of retryable error. | 209 // errTransientError is returned by getNextOffset in case of retryable error. |
| 205 var errTransientError = errors.New("Transient error in getUploadedOffset") | 210 var errTransientError = errors.New("Transient error in getUploadedOffset") |
| 206 | 211 |
| 207 // resumableUpload is mocked in tests. | 212 // resumableUpload is mocked in tests. |
| (...skipping 109 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 317 offset++ | 322 offset++ |
| 318 } | 323 } |
| 319 } | 324 } |
| 320 } else if resp.StatusCode < 500 && resp.StatusCode != 408 { | 325 } else if resp.StatusCode < 500 && resp.StatusCode != 408 { |
| 321 err = fmt.Errorf("Unexpected response (HTTP %d) when querying fo
r uploaded offset", resp.StatusCode) | 326 err = fmt.Errorf("Unexpected response (HTTP %d) when querying fo
r uploaded offset", resp.StatusCode) |
| 322 } else { | 327 } else { |
| 323 err = errTransientError | 328 err = errTransientError |
| 324 } | 329 } |
| 325 return | 330 return |
| 326 } | 331 } |
| 332 |
| 333 //////////////////////////////////////////////////////////////////////////////// |
| 334 // Tags related functions. |
| 335 |
| 336 // attachTagsWhenReady attaches tags to an instance retrying when receiving |
| 337 // PROCESSING_NOT_FINISHED_YET errors. |
| 338 func attachTagsWhenReady(remote *remoteService, packageName, instanceID string,
tags []string, log logging.Logger) error { |
| 339 if len(tags) == 0 { |
| 340 return nil |
| 341 } |
| 342 for _, tag := range tags { |
| 343 log.Infof("cipd: attaching tag %s", tag) |
| 344 } |
| 345 deadline := clock.Now().Add(60 * time.Second) |
| 346 for clock.Now().Before(deadline) { |
| 347 err := remote.attachTags(packageName, instanceID, tags) |
| 348 if err == nil { |
| 349 log.Infof("cipd: all tags attached") |
| 350 return nil |
| 351 } |
| 352 if _, ok := err.(*pendingProcessingError); ok { |
| 353 log.Warningf("cipd: package instance is not ready yet -
%s", err) |
| 354 clock.Sleep(5 * time.Second) |
| 355 } else { |
| 356 log.Errorf("cipd: failed to attach tags - %s", err) |
| 357 return err |
| 358 } |
| 359 } |
| 360 log.Errorf("cipd: failed to attach tags - deadline exceeded") |
| 361 return ErrAttachTagsTimeout |
| 362 } |
| OLD | NEW |