| OLD | NEW |
| (Empty) |
| 1 // Copyright 2014 The Chromium Authors. All rights reserved. | |
| 2 // Use of this source code is governed by a BSD-style license that can be | |
| 3 // found in the LICENSE file. | |
| 4 | |
| 5 package cipd | |
| 6 | |
| 7 import ( | |
| 8 "errors" | |
| 9 "fmt" | |
| 10 "io" | |
| 11 "net/http" | |
| 12 "os" | |
| 13 "time" | |
| 14 | |
| 15 "infra/libs/logging" | |
| 16 ) | |
| 17 | |
| 18 var ( | |
| 19 // ErrFinalizationTimeout is returned if CAS service can not finalize up
load fast enough. | |
| 20 ErrFinalizationTimeout = errors.New("Timeout while waiting for CAS servi
ce to finalize the upload") | |
| 21 // ErrAttachTagsTimeout is returned when service refuses to accept tags
for a long time. | |
| 22 ErrAttachTagsTimeout = errors.New("Timeout while attaching tags") | |
| 23 ) | |
| 24 | |
| 25 // UploadOptions contains upload related parameters shared by UploadToCAS and | |
| 26 // RegisterInstance functions. | |
| 27 type UploadOptions struct { | |
| 28 // ServiceURL is root URL of the backend service, or "" to use default s
ervice. | |
| 29 ServiceURL string | |
| 30 // FinalizationTimeout is how long to wait for CAS service to finalize t
he upload, default is 1 min. | |
| 31 FinalizationTimeout time.Duration | |
| 32 // Client is http.Client to use to make requests, default is http.Defaul
tClient. | |
| 33 Client *http.Client | |
| 34 // Log is a logger to use for logs, default is logging.DefaultLogger. | |
| 35 Log logging.Logger | |
| 36 } | |
| 37 | |
| 38 // UploadToCASOptions contains parameters for UploadToCAS function. | |
| 39 type UploadToCASOptions struct { | |
| 40 UploadOptions | |
| 41 | |
| 42 // SHA1 is a SHA1 hash of data to upload, usually package's InstanceID()
. | |
| 43 SHA1 string | |
| 44 // Data provides actual data to upload. It is seekable to support resuma
ble uploads. | |
| 45 Data io.ReadSeeker | |
| 46 // UploadSessionID identified existing upload session. Empty string to s
tart a new one. | |
| 47 UploadSessionID string | |
| 48 // UploadURL is where to upload the file to. Must be set if UploadSessio
nID is not empty. | |
| 49 UploadURL string | |
| 50 | |
| 51 // remote is an instance of remoteService to use (if set). | |
| 52 remote *remoteService | |
| 53 } | |
| 54 | |
| 55 // UploadToCAS uploads package data blob to Content Addressed Store if it is not | |
| 56 // there already. The data is addressed by SHA1 hash (also known as package's | |
| 57 // InstanceID). It can be used as a standalone function (if UploadSessionID | |
| 58 // is "") or as a part of more high level upload process (in that case upload | |
| 59 // session can be opened elsewhere and its properties passed here via | |
| 60 // UploadSessionID and UploadURL). Returns nil on successful upload. | |
| 61 func UploadToCAS(options UploadToCASOptions) error { | |
| 62 // Fill in default options. | |
| 63 if options.ServiceURL == "" { | |
| 64 options.ServiceURL = DefaultServiceURL() | |
| 65 } | |
| 66 if options.FinalizationTimeout == 0 { | |
| 67 options.FinalizationTimeout = 60 * time.Second | |
| 68 } | |
| 69 if options.Client == nil { | |
| 70 options.Client = http.DefaultClient | |
| 71 } | |
| 72 if options.Log == nil { | |
| 73 options.Log = logging.DefaultLogger | |
| 74 } | |
| 75 if options.remote == nil { | |
| 76 options.remote = newRemoteService(options.Client, options.Servic
eURL, options.Log) | |
| 77 } | |
| 78 log := options.Log | |
| 79 remote := options.remote | |
| 80 | |
| 81 // Open new upload session if existing is not provided. | |
| 82 var session *uploadSession | |
| 83 var err error | |
| 84 if options.UploadSessionID == "" { | |
| 85 log.Infof("cipd: uploading %s: initiating", options.SHA1) | |
| 86 session, err = remote.initiateUpload(options.SHA1) | |
| 87 if err != nil { | |
| 88 log.Warningf("cipd: can't upload %s - %s", options.SHA1,
err) | |
| 89 return err | |
| 90 } | |
| 91 if session == nil { | |
| 92 log.Infof("cipd: %s is already uploaded", options.SHA1) | |
| 93 return nil | |
| 94 } | |
| 95 } else { | |
| 96 if options.UploadURL == "" { | |
| 97 return errors.New("UploadURL must be set if UploadSessio
nID is used") | |
| 98 } | |
| 99 session = &uploadSession{ | |
| 100 ID: options.UploadSessionID, | |
| 101 URL: options.UploadURL, | |
| 102 } | |
| 103 } | |
| 104 | |
| 105 // Upload the file to CAS storage. | |
| 106 err = resumableUpload(session.URL, 8*1024*1024, options) | |
| 107 if err != nil { | |
| 108 return err | |
| 109 } | |
| 110 | |
| 111 // Finalize the upload, wait until server verifies and publishes the fil
e. | |
| 112 started := clock.Now() | |
| 113 delay := time.Second | |
| 114 for { | |
| 115 published, err := remote.finalizeUpload(session.ID) | |
| 116 if published { | |
| 117 log.Infof("cipd: successfully uploaded %s", options.SHA1
) | |
| 118 return nil | |
| 119 } | |
| 120 if err != nil { | |
| 121 log.Warningf("cipd: upload of %s failed: %s", options.SH
A1, err) | |
| 122 return err | |
| 123 } | |
| 124 if clock.Now().Sub(started) > options.FinalizationTimeout { | |
| 125 log.Warningf("cipd: upload of %s failed: timeout", optio
ns.SHA1) | |
| 126 return ErrFinalizationTimeout | |
| 127 } | |
| 128 log.Infof("cipd: uploading %s: verifying", options.SHA1) | |
| 129 clock.Sleep(delay) | |
| 130 if delay < 4*time.Second { | |
| 131 delay += 500 * time.Millisecond | |
| 132 } | |
| 133 } | |
| 134 } | |
| 135 | |
| 136 // RegisterInstanceOptions contains parameters for RegisterInstance function. | |
| 137 type RegisterInstanceOptions struct { | |
| 138 UploadOptions | |
| 139 | |
| 140 // PackageInstance is a package instance to register. | |
| 141 PackageInstance PackageInstance | |
| 142 // Tags is a list of tags to attach to an instance. Will be attached eve
n if | |
| 143 // the instance already existed before. | |
| 144 Tags []string | |
| 145 } | |
| 146 | |
| 147 // RegisterInstance makes the package instance available for clients by | |
| 148 // uploading it to the storage and registering it in the package repository. | |
| 149 func RegisterInstance(options RegisterInstanceOptions) error { | |
| 150 // Fill in default options. | |
| 151 if options.ServiceURL == "" { | |
| 152 options.ServiceURL = DefaultServiceURL() | |
| 153 } | |
| 154 if options.Client == nil { | |
| 155 options.Client = http.DefaultClient | |
| 156 } | |
| 157 if options.Log == nil { | |
| 158 options.Log = logging.DefaultLogger | |
| 159 } | |
| 160 log := options.Log | |
| 161 inst := options.PackageInstance | |
| 162 remote := newRemoteService(options.Client, options.ServiceURL, log) | |
| 163 | |
| 164 // Attempt to register. | |
| 165 result, err := remote.registerInstance(inst.PackageName(), inst.Instance
ID()) | |
| 166 if err != nil { | |
| 167 return err | |
| 168 } | |
| 169 | |
| 170 // Asked to upload the package file to CAS first? | |
| 171 if result.UploadSession != nil { | |
| 172 err = UploadToCAS(UploadToCASOptions{ | |
| 173 UploadOptions: options.UploadOptions, | |
| 174 SHA1: inst.InstanceID(), | |
| 175 Data: inst.DataReader(), | |
| 176 UploadSessionID: result.UploadSession.ID, | |
| 177 UploadURL: result.UploadSession.URL, | |
| 178 remote: remote, | |
| 179 }) | |
| 180 if err != nil { | |
| 181 return err | |
| 182 } | |
| 183 // Try again, now that file is uploaded. | |
| 184 result, err = remote.registerInstance(inst.PackageName(), inst.I
nstanceID()) | |
| 185 if err != nil { | |
| 186 return err | |
| 187 } | |
| 188 if result.UploadSession != nil { | |
| 189 return errors.New("Package file is uploaded, but servers
asks us to upload it again") | |
| 190 } | |
| 191 } | |
| 192 | |
| 193 if result.AlreadyRegistered { | |
| 194 log.Infof( | |
| 195 "cipd: instance %s:%s is already registered by %s on %s"
, | |
| 196 inst.PackageName(), inst.InstanceID(), | |
| 197 result.Info.RegisteredBy, result.Info.RegisteredTs) | |
| 198 } else { | |
| 199 log.Infof("cipd: instance %s:%s was successfully registered", in
st.PackageName(), inst.InstanceID()) | |
| 200 } | |
| 201 | |
| 202 return attachTagsWhenReady(remote, inst.PackageName(), inst.InstanceID()
, options.Tags, log) | |
| 203 } | |
| 204 | |
| 205 //////////////////////////////////////////////////////////////////////////////// | |
| 206 // Google Storage resumable upload protocol. | |
| 207 // See https://cloud.google.com/storage/docs/concepts-techniques#resumable | |
| 208 | |
| 209 // errTransientError is returned by getNextOffset in case of retryable error. | |
| 210 var errTransientError = errors.New("Transient error in getUploadedOffset") | |
| 211 | |
| 212 // resumableUpload is mocked in tests. | |
| 213 var resumableUpload = func(uploadURL string, chunkSize int64, opts UploadToCASOp
tions) error { | |
| 214 // Grab the total length of the file. | |
| 215 length, err := opts.Data.Seek(0, os.SEEK_END) | |
| 216 if err != nil { | |
| 217 return err | |
| 218 } | |
| 219 _, err = opts.Data.Seek(0, os.SEEK_SET) | |
| 220 if err != nil { | |
| 221 return err | |
| 222 } | |
| 223 | |
| 224 // Called when some new data is uploaded. | |
| 225 reportProgress := func(offset int64) { | |
| 226 if length != 0 { | |
| 227 opts.Log.Infof("cipd: uploading %s: %d%%", opts.SHA1, of
fset*100/length) | |
| 228 } | |
| 229 } | |
| 230 | |
| 231 // Called when transient error happens. | |
| 232 reportTransientError := func() { | |
| 233 opts.Log.Warningf("cipd: transient upload error, retrying...") | |
| 234 clock.Sleep(2 * time.Second) | |
| 235 } | |
| 236 | |
| 237 var offset int64 | |
| 238 reportProgress(0) | |
| 239 for { | |
| 240 // Grab latest uploaded offset if not known. | |
| 241 if offset == -1 { | |
| 242 offset, err = getNextOffset(uploadURL, length, opts.Clie
nt) | |
| 243 if err == errTransientError { | |
| 244 offset = -1 | |
| 245 reportTransientError() | |
| 246 continue | |
| 247 } | |
| 248 if err != nil { | |
| 249 return err | |
| 250 } | |
| 251 reportProgress(offset) | |
| 252 if offset == length { | |
| 253 return nil | |
| 254 } | |
| 255 opts.Log.Warningf("cipd: resuming upload from offset %d"
, offset) | |
| 256 } | |
| 257 | |
| 258 // Length of a chunk to upload. | |
| 259 var chunk int64 = chunkSize | |
| 260 if chunk > length-offset { | |
| 261 chunk = length - offset | |
| 262 } | |
| 263 | |
| 264 // Upload the chunk. | |
| 265 opts.Data.Seek(offset, os.SEEK_SET) | |
| 266 r, err := http.NewRequest("PUT", uploadURL, io.LimitReader(opts.
Data, chunk)) | |
| 267 if err != nil { | |
| 268 return err | |
| 269 } | |
| 270 rangeHeader := fmt.Sprintf("bytes %d-%d/%d", offset, offset+chun
k-1, length) | |
| 271 r.Header.Set("Content-Range", rangeHeader) | |
| 272 r.Header.Set("Content-Length", fmt.Sprintf("%d", chunk)) | |
| 273 r.Header.Set("User-Agent", userAgent()) | |
| 274 resp, err := opts.Client.Do(r) | |
| 275 if err != nil { | |
| 276 return err | |
| 277 } | |
| 278 resp.Body.Close() | |
| 279 | |
| 280 // Partially or fully uploaded. | |
| 281 if resp.StatusCode == 308 || resp.StatusCode == 200 { | |
| 282 offset += chunk | |
| 283 reportProgress(offset) | |
| 284 if offset == length { | |
| 285 return nil | |
| 286 } | |
| 287 } else if resp.StatusCode < 500 && resp.StatusCode != 408 { | |
| 288 return fmt.Errorf("Unexpected response during file uploa
d: HTTP %d", resp.StatusCode) | |
| 289 } else { | |
| 290 // Transient error. Need to query for latest uploaded of
fset to resume. | |
| 291 offset = -1 | |
| 292 reportTransientError() | |
| 293 } | |
| 294 } | |
| 295 } | |
| 296 | |
| 297 // getNextOffset queries the storage for size of persisted data. | |
| 298 func getNextOffset(uploadURL string, length int64, client *http.Client) (offset
int64, err error) { | |
| 299 r, err := http.NewRequest("PUT", uploadURL, nil) | |
| 300 if err != nil { | |
| 301 return | |
| 302 } | |
| 303 r.Header.Set("Content-Range", fmt.Sprintf("bytes */%d", length)) | |
| 304 r.Header.Set("Content-Length", "0") | |
| 305 r.Header.Set("User-Agent", userAgent()) | |
| 306 resp, err := client.Do(r) | |
| 307 if err != nil { | |
| 308 return | |
| 309 } | |
| 310 resp.Body.Close() | |
| 311 | |
| 312 if resp.StatusCode == 200 { | |
| 313 // Fully uploaded. | |
| 314 offset = length | |
| 315 } else if resp.StatusCode == 308 { | |
| 316 // Partially uploaded, extract last uploaded offset from Range h
eader. | |
| 317 rangeHeader := resp.Header.Get("Range") | |
| 318 if rangeHeader != "" { | |
| 319 _, err = fmt.Sscanf(rangeHeader, "bytes=0-%d", &offset) | |
| 320 if err == nil { | |
| 321 // |offset| is an *offset* of a last uploaded by
te, not the data length. | |
| 322 offset++ | |
| 323 } | |
| 324 } | |
| 325 } else if resp.StatusCode < 500 && resp.StatusCode != 408 { | |
| 326 err = fmt.Errorf("Unexpected response (HTTP %d) when querying fo
r uploaded offset", resp.StatusCode) | |
| 327 } else { | |
| 328 err = errTransientError | |
| 329 } | |
| 330 return | |
| 331 } | |
| 332 | |
| 333 //////////////////////////////////////////////////////////////////////////////// | |
| 334 // Tags related functions. | |
| 335 | |
| 336 // attachTagsWhenReady attaches tags to an instance retrying when receiving | |
| 337 // PROCESSING_NOT_FINISHED_YET errors. | |
| 338 func attachTagsWhenReady(remote *remoteService, packageName, instanceID string,
tags []string, log logging.Logger) error { | |
| 339 if len(tags) == 0 { | |
| 340 return nil | |
| 341 } | |
| 342 for _, tag := range tags { | |
| 343 log.Infof("cipd: attaching tag %s", tag) | |
| 344 } | |
| 345 deadline := clock.Now().Add(60 * time.Second) | |
| 346 for clock.Now().Before(deadline) { | |
| 347 err := remote.attachTags(packageName, instanceID, tags) | |
| 348 if err == nil { | |
| 349 log.Infof("cipd: all tags attached") | |
| 350 return nil | |
| 351 } | |
| 352 if _, ok := err.(*pendingProcessingError); ok { | |
| 353 log.Warningf("cipd: package instance is not ready yet -
%s", err) | |
| 354 clock.Sleep(5 * time.Second) | |
| 355 } else { | |
| 356 log.Errorf("cipd: failed to attach tags - %s", err) | |
| 357 return err | |
| 358 } | |
| 359 } | |
| 360 log.Errorf("cipd: failed to attach tags - deadline exceeded") | |
| 361 return ErrAttachTagsTimeout | |
| 362 } | |
| OLD | NEW |