Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(67)

Unified Diff: go/src/infra/tools/cipd/storage.go

Issue 1129043003: cipd: Refactor client to make it more readable. (Closed) Base URL: https://chromium.googlesource.com/infra/infra.git@master
Patch Set: Created 5 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « go/src/infra/tools/cipd/remote_test.go ('k') | go/src/infra/tools/cipd/storage_test.go » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: go/src/infra/tools/cipd/storage.go
diff --git a/go/src/infra/tools/cipd/storage.go b/go/src/infra/tools/cipd/storage.go
new file mode 100644
index 0000000000000000000000000000000000000000..787dfeff33dcbc41db0a5a16dadde4578ff26356
--- /dev/null
+++ b/go/src/infra/tools/cipd/storage.go
@@ -0,0 +1,278 @@
+// Copyright 2015 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+package cipd
+
+import (
+ "errors"
+ "fmt"
+ "io"
+ "net/http"
+ "os"
+ "time"
+)
+
+const (
+ // uploadChunkSize is the max size of a single PUT HTTP request during upload.
+ uploadChunkSize int64 = 4 * 1024 * 1024
+ // uploadMaxErrors is the number of transient errors after which upload is aborted.
+ uploadMaxErrors = 20
+ // downloadReportInterval defines frequency of "downloaded X%" log lines.
+ downloadReportInterval = 5 * time.Second
+ // downloadMaxAttempts is how many times to retry a download on errors.
+ downloadMaxAttempts = 10
+)
+
+// errTransientError is returned by getNextOffset in case of retryable error.
+var errTransientError = errors.New("Transient error in getUploadedOffset")
+
+// storageImpl implements storage via Google Storage signed URLs.
+type storageImpl struct {
+ client *Client
+ chunkSize int64
+}
+
+// Google Storage resumable upload protocol.
+// See https://cloud.google.com/storage/docs/concepts-techniques#resumable
+
+func (s *storageImpl) upload(url string, data io.ReadSeeker) error {
+ // Grab the total length of the file.
+ length, err := data.Seek(0, os.SEEK_END)
+ if err != nil {
+ return err
+ }
+ _, err = data.Seek(0, os.SEEK_SET)
+ if err != nil {
+ return err
+ }
+
+ // Offset of data known to be persisted in the storage or -1 if needed to ask
+ // Google Storage for it.
+ offset := int64(0)
+ // Number of transient errors reported so far.
+ errors := 0
+
+ // Called when some new data is uploaded.
+ reportProgress := func(offset int64) {
+ if length != 0 {
+ s.client.Log.Infof("cipd: uploading - %d%%", offset*100/length)
+ }
+ }
+
+ // Called when transient error happens.
+ reportTransientError := func() {
+ // Need to query for latest uploaded offset to resume.
+ errors++
+ offset = -1
+ if errors < uploadMaxErrors {
+ s.client.Log.Warningf("cipd: transient upload error, retrying...")
+ s.client.clock.sleep(2 * time.Second)
+ }
+ }
+
+ reportProgress(0)
+ for errors < uploadMaxErrors {
+ // Grab latest uploaded offset if not known.
+ if offset == -1 {
+ offset, err = s.getNextOffset(url, length)
+ if err == errTransientError {
+ reportTransientError()
+ continue
+ }
+ if err != nil {
+ return err
+ }
+ reportProgress(offset)
+ if offset == length {
+ return nil
+ }
+ s.client.Log.Warningf("cipd: resuming upload from offset %d", offset)
+ }
+
+ // Length of a chunk to upload.
+ chunk := s.chunkSize
+ if chunk > length-offset {
+ chunk = length - offset
+ }
+
+ // Upload the chunk.
+ data.Seek(offset, os.SEEK_SET)
+ r, err := http.NewRequest("PUT", url, io.LimitReader(data, chunk))
+ if err != nil {
+ return err
+ }
+ rangeHeader := fmt.Sprintf("bytes %d-%d/%d", offset, offset+chunk-1, length)
+ r.Header.Set("Content-Range", rangeHeader)
+ r.Header.Set("Content-Length", fmt.Sprintf("%d", chunk))
+ r.Header.Set("User-Agent", s.client.UserAgent)
+ resp, err := s.client.doAnonymousHTTPRequest(r)
+ if err != nil {
+ if isTemporaryNetError(err) {
+ reportTransientError()
+ continue
+ }
+ return err
+ }
+ resp.Body.Close()
+
+ // Partially or fully uploaded.
+ if resp.StatusCode == 308 || resp.StatusCode == 200 {
+ offset += chunk
+ reportProgress(offset)
+ if offset == length {
+ return nil
+ }
+ continue
+ }
+
+ // Transient error.
+ if isTemporaryHTTPError(resp.StatusCode) {
+ reportTransientError()
+ continue
+ }
+
+ // Fatal error.
+ return fmt.Errorf("Unexpected response during file upload: HTTP %d", resp.StatusCode)
+ }
+
+ return ErrUploadError
+}
+
+// getNextOffset queries the storage for size of persisted data.
+func (s *storageImpl) getNextOffset(url string, length int64) (offset int64, err error) {
+ r, err := http.NewRequest("PUT", url, nil)
+ if err != nil {
+ return
+ }
+ r.Header.Set("Content-Range", fmt.Sprintf("bytes */%d", length))
+ r.Header.Set("Content-Length", "0")
+ r.Header.Set("User-Agent", s.client.UserAgent)
+ resp, err := s.client.doAnonymousHTTPRequest(r)
+ if err != nil {
+ if isTemporaryNetError(err) {
+ err = errTransientError
+ }
+ return
+ }
+ resp.Body.Close()
+
+ if resp.StatusCode == 200 {
+ // Fully uploaded.
+ offset = length
+ } else if resp.StatusCode == 308 {
+ // Partially uploaded, extract last uploaded offset from Range header.
+ rangeHeader := resp.Header.Get("Range")
+ if rangeHeader != "" {
+ _, err = fmt.Sscanf(rangeHeader, "bytes=0-%d", &offset)
+ if err == nil {
+ // |offset| is an *offset* of a last uploaded byte, not the data length.
+ offset++
+ }
+ }
+ } else if isTemporaryHTTPError(resp.StatusCode) {
+ err = errTransientError
+ } else {
+ err = fmt.Errorf("Unexpected response (HTTP %d) when querying for uploaded offset", resp.StatusCode)
+ }
+ return
+}
+
+// TODO(vadimsh): Use resumable download protocol.
+
+func (s *storageImpl) download(url string, output io.WriteSeeker) error {
+ // reportProgress print fetch progress, throttling the reports rate.
+ var prevProgress int64 = 1000 // >100%
+ var prevReportTs time.Time
+ reportProgress := func(read int64, total int64) {
+ now := s.client.clock.now()
+ progress := read * 100 / total
+ if progress < prevProgress || read == total || now.Sub(prevReportTs) > downloadReportInterval {
+ s.client.Log.Infof("cipd: fetching - %d%%", progress)
+ prevReportTs = now
+ prevProgress = progress
+ }
+ }
+
+ // download is a separate function to be able to use deferred close.
+ download := func(out io.WriteSeeker, src io.ReadCloser, totalLen int64) error {
+ defer src.Close()
+ s.client.Log.Infof("cipd: about to fetch %.1f Mb", float32(totalLen)/1024.0/1024.0)
+ reportProgress(0, totalLen)
+ _, err := io.Copy(out, &readerWithProgress{
+ reader: src,
+ callback: func(read int64) { reportProgress(read, totalLen) },
+ })
+ if err == nil {
+ s.client.Log.Infof("cipd: fetch finished successfully")
+ }
+ return err
+ }
+
+ // Download the actual data (several attempts).
+ for attempt := 0; attempt < downloadMaxAttempts; attempt++ {
+ // Rewind output to zero offset.
+ _, err := output.Seek(0, os.SEEK_SET)
+ if err != nil {
+ return err
+ }
+
+ // Send the request.
+ s.client.Log.Infof("cipd: initiating the fetch")
+ var req *http.Request
+ var resp *http.Response
+ req, err = http.NewRequest("GET", url, nil)
+ if err != nil {
+ return err
+ }
+ req.Header.Set("User-Agent", s.client.UserAgent)
+ resp, err = s.client.doAnonymousHTTPRequest(req)
+ if err != nil {
+ if isTemporaryNetError(err) {
+ s.client.Log.Warningf("cipd: transient network error: %s", err)
+ continue
+ }
+ return err
+ }
+
+ // Transient error, retry.
+ if isTemporaryHTTPError(resp.StatusCode) {
+ resp.Body.Close()
+ s.client.Log.Warningf("cipd: transient HTTP error %d while fetching the file", resp.StatusCode)
+ continue
+ }
+
+ // Fatal error, abort.
+ if resp.StatusCode >= 400 {
+ resp.Body.Close()
+ return fmt.Errorf("Server replied with HTTP code %d", resp.StatusCode)
+ }
+
+ // Try to fetch (will close resp.Body when done).
+ err = download(output, resp.Body, resp.ContentLength)
+ if err != nil {
+ s.client.Log.Warningf("cipd: transient error fetching the file: %s", err)
+ continue
+ }
+
+ // Success.
+ return nil
+ }
+
+ return ErrDownloadError
+}
+
+// readerWithProgress is io.Reader that calls callback whenever something is
+// read from it.
+type readerWithProgress struct {
+ reader io.Reader
+ total int64
+ callback func(total int64)
+}
+
+func (r *readerWithProgress) Read(p []byte) (int, error) {
+ n, err := r.reader.Read(p)
+ r.total += int64(n)
+ r.callback(r.total)
+ return n, err
+}
« no previous file with comments | « go/src/infra/tools/cipd/remote_test.go ('k') | go/src/infra/tools/cipd/storage_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698