Index: go/src/infra/tools/cipd/uploader.go |
diff --git a/go/src/infra/tools/cipd/uploader.go b/go/src/infra/tools/cipd/uploader.go |
deleted file mode 100644 |
index c76bb80ef9187db99fd2a00765973976a10037f3..0000000000000000000000000000000000000000 |
--- a/go/src/infra/tools/cipd/uploader.go |
+++ /dev/null |
@@ -1,362 +0,0 @@ |
-// Copyright 2014 The Chromium Authors. All rights reserved. |
-// Use of this source code is governed by a BSD-style license that can be |
-// found in the LICENSE file. |
- |
-package cipd |
- |
-import ( |
- "errors" |
- "fmt" |
- "io" |
- "net/http" |
- "os" |
- "time" |
- |
- "infra/libs/logging" |
-) |
- |
-var ( |
- // ErrFinalizationTimeout is returned if CAS service can not finalize upload fast enough. |
- ErrFinalizationTimeout = errors.New("Timeout while waiting for CAS service to finalize the upload") |
- // ErrAttachTagsTimeout is returned when service refuses to accept tags for a long time. |
- ErrAttachTagsTimeout = errors.New("Timeout while attaching tags") |
-) |
- |
-// UploadOptions contains upload related parameters shared by UploadToCAS and |
-// RegisterInstance functions. |
-type UploadOptions struct { |
- // ServiceURL is root URL of the backend service, or "" to use default service. |
- ServiceURL string |
- // FinalizationTimeout is how long to wait for CAS service to finalize the upload, default is 1 min. |
- FinalizationTimeout time.Duration |
- // Client is http.Client to use to make requests, default is http.DefaultClient. |
- Client *http.Client |
- // Log is a logger to use for logs, default is logging.DefaultLogger. |
- Log logging.Logger |
-} |
- |
-// UploadToCASOptions contains parameters for UploadToCAS function. |
-type UploadToCASOptions struct { |
- UploadOptions |
- |
- // SHA1 is a SHA1 hash of data to upload, usually package's InstanceID(). |
- SHA1 string |
- // Data provides actual data to upload. It is seekable to support resumable uploads. |
- Data io.ReadSeeker |
- // UploadSessionID identified existing upload session. Empty string to start a new one. |
- UploadSessionID string |
- // UploadURL is where to upload the file to. Must be set if UploadSessionID is not empty. |
- UploadURL string |
- |
- // remote is an instance of remoteService to use (if set). |
- remote *remoteService |
-} |
- |
-// UploadToCAS uploads package data blob to Content Addressed Store if it is not |
-// there already. The data is addressed by SHA1 hash (also known as package's |
-// InstanceID). It can be used as a standalone function (if UploadSessionID |
-// is "") or as a part of more high level upload process (in that case upload |
-// session can be opened elsewhere and its properties passed here via |
-// UploadSessionID and UploadURL). Returns nil on successful upload. |
-func UploadToCAS(options UploadToCASOptions) error { |
- // Fill in default options. |
- if options.ServiceURL == "" { |
- options.ServiceURL = DefaultServiceURL() |
- } |
- if options.FinalizationTimeout == 0 { |
- options.FinalizationTimeout = 60 * time.Second |
- } |
- if options.Client == nil { |
- options.Client = http.DefaultClient |
- } |
- if options.Log == nil { |
- options.Log = logging.DefaultLogger |
- } |
- if options.remote == nil { |
- options.remote = newRemoteService(options.Client, options.ServiceURL, options.Log) |
- } |
- log := options.Log |
- remote := options.remote |
- |
- // Open new upload session if existing is not provided. |
- var session *uploadSession |
- var err error |
- if options.UploadSessionID == "" { |
- log.Infof("cipd: uploading %s: initiating", options.SHA1) |
- session, err = remote.initiateUpload(options.SHA1) |
- if err != nil { |
- log.Warningf("cipd: can't upload %s - %s", options.SHA1, err) |
- return err |
- } |
- if session == nil { |
- log.Infof("cipd: %s is already uploaded", options.SHA1) |
- return nil |
- } |
- } else { |
- if options.UploadURL == "" { |
- return errors.New("UploadURL must be set if UploadSessionID is used") |
- } |
- session = &uploadSession{ |
- ID: options.UploadSessionID, |
- URL: options.UploadURL, |
- } |
- } |
- |
- // Upload the file to CAS storage. |
- err = resumableUpload(session.URL, 8*1024*1024, options) |
- if err != nil { |
- return err |
- } |
- |
- // Finalize the upload, wait until server verifies and publishes the file. |
- started := clock.Now() |
- delay := time.Second |
- for { |
- published, err := remote.finalizeUpload(session.ID) |
- if published { |
- log.Infof("cipd: successfully uploaded %s", options.SHA1) |
- return nil |
- } |
- if err != nil { |
- log.Warningf("cipd: upload of %s failed: %s", options.SHA1, err) |
- return err |
- } |
- if clock.Now().Sub(started) > options.FinalizationTimeout { |
- log.Warningf("cipd: upload of %s failed: timeout", options.SHA1) |
- return ErrFinalizationTimeout |
- } |
- log.Infof("cipd: uploading %s: verifying", options.SHA1) |
- clock.Sleep(delay) |
- if delay < 4*time.Second { |
- delay += 500 * time.Millisecond |
- } |
- } |
-} |
- |
-// RegisterInstanceOptions contains parameters for RegisterInstance function. |
-type RegisterInstanceOptions struct { |
- UploadOptions |
- |
- // PackageInstance is a package instance to register. |
- PackageInstance PackageInstance |
- // Tags is a list of tags to attach to an instance. Will be attached even if |
- // the instance already existed before. |
- Tags []string |
-} |
- |
-// RegisterInstance makes the package instance available for clients by |
-// uploading it to the storage and registering it in the package repository. |
-func RegisterInstance(options RegisterInstanceOptions) error { |
- // Fill in default options. |
- if options.ServiceURL == "" { |
- options.ServiceURL = DefaultServiceURL() |
- } |
- if options.Client == nil { |
- options.Client = http.DefaultClient |
- } |
- if options.Log == nil { |
- options.Log = logging.DefaultLogger |
- } |
- log := options.Log |
- inst := options.PackageInstance |
- remote := newRemoteService(options.Client, options.ServiceURL, log) |
- |
- // Attempt to register. |
- result, err := remote.registerInstance(inst.PackageName(), inst.InstanceID()) |
- if err != nil { |
- return err |
- } |
- |
- // Asked to upload the package file to CAS first? |
- if result.UploadSession != nil { |
- err = UploadToCAS(UploadToCASOptions{ |
- UploadOptions: options.UploadOptions, |
- SHA1: inst.InstanceID(), |
- Data: inst.DataReader(), |
- UploadSessionID: result.UploadSession.ID, |
- UploadURL: result.UploadSession.URL, |
- remote: remote, |
- }) |
- if err != nil { |
- return err |
- } |
- // Try again, now that file is uploaded. |
- result, err = remote.registerInstance(inst.PackageName(), inst.InstanceID()) |
- if err != nil { |
- return err |
- } |
- if result.UploadSession != nil { |
- return errors.New("Package file is uploaded, but servers asks us to upload it again") |
- } |
- } |
- |
- if result.AlreadyRegistered { |
- log.Infof( |
- "cipd: instance %s:%s is already registered by %s on %s", |
- inst.PackageName(), inst.InstanceID(), |
- result.Info.RegisteredBy, result.Info.RegisteredTs) |
- } else { |
- log.Infof("cipd: instance %s:%s was successfully registered", inst.PackageName(), inst.InstanceID()) |
- } |
- |
- return attachTagsWhenReady(remote, inst.PackageName(), inst.InstanceID(), options.Tags, log) |
-} |
- |
-//////////////////////////////////////////////////////////////////////////////// |
-// Google Storage resumable upload protocol. |
-// See https://cloud.google.com/storage/docs/concepts-techniques#resumable |
- |
-// errTransientError is returned by getNextOffset in case of retryable error. |
-var errTransientError = errors.New("Transient error in getUploadedOffset") |
- |
-// resumableUpload is mocked in tests. |
-var resumableUpload = func(uploadURL string, chunkSize int64, opts UploadToCASOptions) error { |
- // Grab the total length of the file. |
- length, err := opts.Data.Seek(0, os.SEEK_END) |
- if err != nil { |
- return err |
- } |
- _, err = opts.Data.Seek(0, os.SEEK_SET) |
- if err != nil { |
- return err |
- } |
- |
- // Called when some new data is uploaded. |
- reportProgress := func(offset int64) { |
- if length != 0 { |
- opts.Log.Infof("cipd: uploading %s: %d%%", opts.SHA1, offset*100/length) |
- } |
- } |
- |
- // Called when transient error happens. |
- reportTransientError := func() { |
- opts.Log.Warningf("cipd: transient upload error, retrying...") |
- clock.Sleep(2 * time.Second) |
- } |
- |
- var offset int64 |
- reportProgress(0) |
- for { |
- // Grab latest uploaded offset if not known. |
- if offset == -1 { |
- offset, err = getNextOffset(uploadURL, length, opts.Client) |
- if err == errTransientError { |
- offset = -1 |
- reportTransientError() |
- continue |
- } |
- if err != nil { |
- return err |
- } |
- reportProgress(offset) |
- if offset == length { |
- return nil |
- } |
- opts.Log.Warningf("cipd: resuming upload from offset %d", offset) |
- } |
- |
- // Length of a chunk to upload. |
- var chunk int64 = chunkSize |
- if chunk > length-offset { |
- chunk = length - offset |
- } |
- |
- // Upload the chunk. |
- opts.Data.Seek(offset, os.SEEK_SET) |
- r, err := http.NewRequest("PUT", uploadURL, io.LimitReader(opts.Data, chunk)) |
- if err != nil { |
- return err |
- } |
- rangeHeader := fmt.Sprintf("bytes %d-%d/%d", offset, offset+chunk-1, length) |
- r.Header.Set("Content-Range", rangeHeader) |
- r.Header.Set("Content-Length", fmt.Sprintf("%d", chunk)) |
- r.Header.Set("User-Agent", userAgent()) |
- resp, err := opts.Client.Do(r) |
- if err != nil { |
- return err |
- } |
- resp.Body.Close() |
- |
- // Partially or fully uploaded. |
- if resp.StatusCode == 308 || resp.StatusCode == 200 { |
- offset += chunk |
- reportProgress(offset) |
- if offset == length { |
- return nil |
- } |
- } else if resp.StatusCode < 500 && resp.StatusCode != 408 { |
- return fmt.Errorf("Unexpected response during file upload: HTTP %d", resp.StatusCode) |
- } else { |
- // Transient error. Need to query for latest uploaded offset to resume. |
- offset = -1 |
- reportTransientError() |
- } |
- } |
-} |
- |
-// getNextOffset queries the storage for size of persisted data. |
-func getNextOffset(uploadURL string, length int64, client *http.Client) (offset int64, err error) { |
- r, err := http.NewRequest("PUT", uploadURL, nil) |
- if err != nil { |
- return |
- } |
- r.Header.Set("Content-Range", fmt.Sprintf("bytes */%d", length)) |
- r.Header.Set("Content-Length", "0") |
- r.Header.Set("User-Agent", userAgent()) |
- resp, err := client.Do(r) |
- if err != nil { |
- return |
- } |
- resp.Body.Close() |
- |
- if resp.StatusCode == 200 { |
- // Fully uploaded. |
- offset = length |
- } else if resp.StatusCode == 308 { |
- // Partially uploaded, extract last uploaded offset from Range header. |
- rangeHeader := resp.Header.Get("Range") |
- if rangeHeader != "" { |
- _, err = fmt.Sscanf(rangeHeader, "bytes=0-%d", &offset) |
- if err == nil { |
- // |offset| is an *offset* of a last uploaded byte, not the data length. |
- offset++ |
- } |
- } |
- } else if resp.StatusCode < 500 && resp.StatusCode != 408 { |
- err = fmt.Errorf("Unexpected response (HTTP %d) when querying for uploaded offset", resp.StatusCode) |
- } else { |
- err = errTransientError |
- } |
- return |
-} |
- |
-//////////////////////////////////////////////////////////////////////////////// |
-// Tags related functions. |
- |
-// attachTagsWhenReady attaches tags to an instance retrying when receiving |
-// PROCESSING_NOT_FINISHED_YET errors. |
-func attachTagsWhenReady(remote *remoteService, packageName, instanceID string, tags []string, log logging.Logger) error { |
- if len(tags) == 0 { |
- return nil |
- } |
- for _, tag := range tags { |
- log.Infof("cipd: attaching tag %s", tag) |
- } |
- deadline := clock.Now().Add(60 * time.Second) |
- for clock.Now().Before(deadline) { |
- err := remote.attachTags(packageName, instanceID, tags) |
- if err == nil { |
- log.Infof("cipd: all tags attached") |
- return nil |
- } |
- if _, ok := err.(*pendingProcessingError); ok { |
- log.Warningf("cipd: package instance is not ready yet - %s", err) |
- clock.Sleep(5 * time.Second) |
- } else { |
- log.Errorf("cipd: failed to attach tags - %s", err) |
- return err |
- } |
- } |
- log.Errorf("cipd: failed to attach tags - deadline exceeded") |
- return ErrAttachTagsTimeout |
-} |