Chromium Code Reviews
| Index: go/src/infra/tools/cipd/storage.go |
| diff --git a/go/src/infra/tools/cipd/storage.go b/go/src/infra/tools/cipd/storage.go |
| new file mode 100644 |
| index 0000000000000000000000000000000000000000..34bc23925acfc4215c39bcedf8f10e4d1ad03c62 |
| --- /dev/null |
| +++ b/go/src/infra/tools/cipd/storage.go |
| @@ -0,0 +1,274 @@ |
| +// Copyright 2015 The Chromium Authors. All rights reserved. |
| +// Use of this source code is governed by a BSD-style license that can be |
| +// found in the LICENSE file. |
| + |
| +package cipd |
| + |
| +import ( |
| + "errors" |
| + "fmt" |
| + "io" |
| + "net/http" |
| + "os" |
| + "time" |
| +) |
| + |
| +const ( |
| + // uploadChunkSize is the max size of a single PUT HTTP request during upload. |
| + uploadChunkSize int64 = 4 * 1024 * 1024 |
| + // uploadMaxErrors is the number of transient errors after which upload is aborted. |
| + uploadMaxErrors = 20 |
| + // downloadReportInterval defines how often "fetching - X%" progress lines are logged. |
| + downloadReportInterval = 5 * time.Second |
| + // downloadMaxAttempts is how many times to retry a download on errors. |
| + downloadMaxAttempts = 10 |
| +) |
| + |
| +// errTransientError is returned by getNextOffset in case of retryable error. |
| +var errTransientError = errors.New("Transient error in getNextOffset") |
| + |
| +// storageImpl implements storage via Google Storage signed URLs. |
| +type storageImpl struct { |
| + client *Client |
| + chunkSize int64 |
| +} |
| + |
| +// Google Storage resumable upload protocol. |
| +// See https://cloud.google.com/storage/docs/concepts-techniques#resumable |
| + |
| +func (s *storageImpl) upload(url string, data io.ReadSeeker) error { |
| + // Grab the total length of the file. |
| + length, err := data.Seek(0, os.SEEK_END) |
| + if err != nil { |
| + return err |
| + } |
| + _, err = data.Seek(0, os.SEEK_SET) |
| + if err != nil { |
| + return err |
| + } |
| + |
| + // Offset of data known to be persisted in the storage, or -1 if we need to |
| + // ask Google Storage for it. |
| + offset := int64(0) |
| + // Number of transient errors reported so far. |
| + errors := 0 |
| + |
| + // Called when some new data is uploaded. |
| + reportProgress := func(offset int64) { |
| + if length != 0 { |
| + s.client.Log.Infof("cipd: uploading - %d%%", offset*100/length) |
| + } |
| + } |
| + |
| + // Called when a transient error happens. |
| + reportTransientError := func() { |
| + // Need to query for latest uploaded offset to resume. |
| + offset = -1 |
nodir 2015/05/13 01:25:29: errors += 1
Vadim Sh. 2015/05/13 03:08:06: Done. Also deduplicated HTTP status code handling.
| + errors++ |
| + s.client.Log.Warningf("cipd: transient upload error, retrying...") |
| + s.client.clock.sleep(2 * time.Second) |
| + } |
| + |
| + reportProgress(0) |
| + for errors < uploadMaxErrors { |
| + // Grab latest uploaded offset if not known. |
| + if offset == -1 { |
| + offset, err = s.getNextOffset(url, length) |
| + if err == errTransientError { |
| + reportTransientError() |
| + continue |
| + } |
| + if err != nil { |
| + return err |
| + } |
| + reportProgress(offset) |
| + if offset == length { |
| + return nil |
| + } |
| + s.client.Log.Warningf("cipd: resuming upload from offset %d", offset) |
| + } |
| + |
| + // Length of a chunk to upload. |
| + chunk := s.chunkSize |
| + if chunk > length-offset { |
| + chunk = length - offset |
| + } |
| + |
| + // Upload the chunk. |
| + data.Seek(offset, os.SEEK_SET) |
| + r, err := http.NewRequest("PUT", url, io.LimitReader(data, chunk)) |
| + if err != nil { |
| + return err |
| + } |
| + rangeHeader := fmt.Sprintf("bytes %d-%d/%d", offset, offset+chunk-1, length) |
| + r.Header.Set("Content-Range", rangeHeader) |
| + r.Header.Set("Content-Length", fmt.Sprintf("%d", chunk)) |
| + r.Header.Set("User-Agent", s.client.UserAgent) |
| + resp, err := s.client.doAnonymousHTTPRequest(r) |
| + if err != nil { |
| + if isTemporaryNetError(err) { |
| + reportTransientError() |
| + continue |
| + } |
| + return err |
| + } |
| + resp.Body.Close() |
| + |
| + // Partially or fully uploaded. |
| + if resp.StatusCode == 308 || resp.StatusCode == 200 { |
| + offset += chunk |
| + reportProgress(offset) |
| + if offset == length { |
| + return nil |
| + } |
| + continue |
| + } |
| + |
| + // Fatal error. |
| + if resp.StatusCode < 500 && resp.StatusCode != 408 { |
| + return fmt.Errorf("Unexpected response during file upload: HTTP %d", resp.StatusCode) |
| + } |
| + |
| + // Transient error. |
| + reportTransientError() |
| + } |
| + |
| + return ErrUploadError |
| +} |
| + |
| +// getNextOffset queries the storage for the size of the data persisted so far. |
| +func (s *storageImpl) getNextOffset(url string, length int64) (offset int64, err error) { |
| + r, err := http.NewRequest("PUT", url, nil) |
| + if err != nil { |
| + return |
| + } |
| + r.Header.Set("Content-Range", fmt.Sprintf("bytes */%d", length)) |
| + r.Header.Set("Content-Length", "0") |
| + r.Header.Set("User-Agent", s.client.UserAgent) |
| + resp, err := s.client.doAnonymousHTTPRequest(r) |
| + if err != nil { |
| + if isTemporaryNetError(err) { |
| + err = errTransientError |
| + } |
| + return |
| + } |
| + resp.Body.Close() |
| + |
| + if resp.StatusCode == 200 { |
| + // Fully uploaded. |
| + offset = length |
| + } else if resp.StatusCode == 308 { |
| + // Partially uploaded, extract last uploaded offset from Range header. |
| + rangeHeader := resp.Header.Get("Range") |
| + if rangeHeader != "" { |
| + _, err = fmt.Sscanf(rangeHeader, "bytes=0-%d", &offset) |
| + if err == nil { |
| + // |offset| is the *offset* of the last uploaded byte, not the data length. |
| + offset++ |
| + } |
| + } |
| + } else if resp.StatusCode < 500 && resp.StatusCode != 408 { |
| + err = fmt.Errorf("Unexpected response (HTTP %d) when querying for uploaded offset", resp.StatusCode) |
| + } else { |
| + err = errTransientError |
| + } |
| + return |
| +} |
| + |
| +// TODO(vadimsh): Use resumable download protocol. |
| + |
| +func (s *storageImpl) download(url string, output io.WriteSeeker) error { |
| + // reportProgress prints fetch progress, throttling the report rate. |
| + var prevProgress int64 = 1000 // >100% |
| + var prevReportTs time.Time |
| + reportProgress := func(read int64, total int64) { |
| + now := s.client.clock.now() |
| + progress := read * 100 / total |
| + if progress < prevProgress || read == total || now.Sub(prevReportTs) > downloadReportInterval { |
| + s.client.Log.Infof("cipd: fetching - %d%%", progress) |
| + prevReportTs = now |
| + prevProgress = progress |
| + } |
| + } |
| + |
| + // download is a separate function so that it can use a deferred close. |
| + download := func(out io.WriteSeeker, src io.ReadCloser, totalLen int64) error { |
| + defer src.Close() |
| + s.client.Log.Infof("cipd: about to fetch %.1f Mb", float32(totalLen)/1024.0/1024.0) |
| + reportProgress(0, totalLen) |
| + _, err := io.Copy(out, &readerWithProgress{ |
| + reader: src, |
| + callback: func(read int64) { reportProgress(read, totalLen) }, |
| + }) |
| + if err == nil { |
| + s.client.Log.Infof("cipd: fetch finished successfully") |
| + } |
| + return err |
| + } |
| + |
| + // Download the actual data (several attempts). |
| + for attempt := 0; attempt < downloadMaxAttempts; attempt++ { |
| + // Rewind output to zero offset. |
| + _, err := output.Seek(0, os.SEEK_SET) |
| + if err != nil { |
| + return err |
| + } |
| + |
| + // Send the request. |
| + s.client.Log.Infof("cipd: initiating the fetch") |
| + var req *http.Request |
| + var resp *http.Response |
| + req, err = http.NewRequest("GET", url, nil) |
| + if err != nil { |
| + return err |
| + } |
| + req.Header.Set("User-Agent", s.client.UserAgent) |
| + resp, err = s.client.doAnonymousHTTPRequest(req) |
| + if err != nil { |
| + if isTemporaryNetError(err) { |
| + s.client.Log.Warningf("cipd: transient network error: %s", err) |
| + continue |
| + } |
| + return err |
| + } |
| + |
| + // Transient error, retry. |
| + if resp.StatusCode == 408 || resp.StatusCode >= 500 { |
| + resp.Body.Close() |
| + s.client.Log.Warningf("cipd: transient HTTP error %d while fetching the file", resp.StatusCode) |
| + continue |
| + } |
| + |
| + // Fatal error, abort. |
| + if resp.StatusCode >= 400 { |
| + resp.Body.Close() |
| + return fmt.Errorf("Server replied with HTTP code %d", resp.StatusCode) |
| + } |
| + |
| + // Try to fetch (will close resp.Body when done). |
| + err = download(output, resp.Body, resp.ContentLength) |
| + if err != nil { |
| + s.client.Log.Warningf("cipd: transient error fetching the file: %s", err) |
| + continue |
| + } |
| + |
| + // Success. |
| + return nil |
| + } |
| + |
| + return ErrDownloadError |
| +} |
| + |
| +// readerWithProgress is an io.Reader that calls a callback whenever something |
| +// is read from it. |
| +type readerWithProgress struct { |
| + reader io.Reader |
| + total int64 |
| + callback func(total int64) |
| +} |
| + |
| +func (r *readerWithProgress) Read(p []byte) (int, error) { |
| + n, err := r.reader.Read(p) |
| + r.total += int64(n) |
| + r.callback(r.total) |
| + return n, err |
| +} |
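
For reference, the resumable upload handshake that upload() and getNextOffset() implement can be exercised with plain net/http outside the CIPD client. The sketch below is not part of this CL: it assumes a hypothetical, already-created signed session URL (sessionURL) and only illustrates the offset query and a single chunk PUT described at https://cloud.google.com/storage/docs/concepts-techniques#resumable.

package gcsresumable

import (
	"bytes"
	"fmt"
	"net/http"
)

// queryUploadedOffset mirrors getNextOffset above: it asks the storage how
// many bytes of the resumable session have already been persisted.
func queryUploadedOffset(sessionURL string, total int64) (int64, error) {
	req, err := http.NewRequest("PUT", sessionURL, nil)
	if err != nil {
		return 0, err
	}
	// "bytes */<total>" sends no data and just asks for the current offset.
	req.Header.Set("Content-Range", fmt.Sprintf("bytes */%d", total))
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return 0, err
	}
	defer resp.Body.Close()
	switch resp.StatusCode {
	case 200, 201:
		return total, nil // the whole object is already uploaded
	case 308:
		// "Range: bytes=0-N" names the last persisted byte, so resume at N+1.
		var last int64
		if _, err := fmt.Sscanf(resp.Header.Get("Range"), "bytes=0-%d", &last); err != nil {
			return 0, nil // no Range header: nothing persisted yet
		}
		return last + 1, nil
	default:
		return 0, fmt.Errorf("unexpected HTTP %d when querying offset", resp.StatusCode)
	}
}

// uploadChunk sends bytes [offset, offset+len(chunk)) of an object of size
// total. A 308 reply means "send more", 200 or 201 means the upload is done.
func uploadChunk(sessionURL string, chunk []byte, offset, total int64) (done bool, err error) {
	req, err := http.NewRequest("PUT", sessionURL, bytes.NewReader(chunk))
	if err != nil {
		return false, err
	}
	end := offset + int64(len(chunk)) - 1
	req.Header.Set("Content-Range", fmt.Sprintf("bytes %d-%d/%d", offset, end, total))
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return false, err
	}
	defer resp.Body.Close()
	switch resp.StatusCode {
	case 200, 201:
		return true, nil
	case 308:
		return false, nil
	default:
		return false, fmt.Errorf("unexpected HTTP %d during chunk upload", resp.StatusCode)
	}
}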