OLD | NEW |
(Empty) | |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. |
| 4 |
| 5 package cipd |
| 6 |
| 7 import ( |
| 8 "errors" |
| 9 "fmt" |
| 10 "io" |
| 11 "net/http" |
| 12 "os" |
| 13 "time" |
| 14 ) |
| 15 |
| 16 const ( |
| 17 // uploadChunkSize is the max size of a single PUT HTTP request during u
pload. |
| 18 uploadChunkSize int64 = 4 * 1024 * 1024 |
| 19 // uploadMaxErrors is the number of transient errors after which upload
is aborted. |
| 20 uploadMaxErrors = 20 |
| 21 // downloadReportInterval defines frequency of "downloaded X%" log lines
. |
| 22 downloadReportInterval = 5 * time.Second |
| 23 // downloadMaxAttempts is how many times to retry a download on errors. |
| 24 downloadMaxAttempts = 10 |
| 25 ) |
| 26 |
| 27 // errTransientError is returned by getNextOffset in case of retryable error. |
| 28 var errTransientError = errors.New("Transient error in getUploadedOffset") |
| 29 |
| 30 // storageImpl implements storage via Google Storage signed URLs. |
| 31 type storageImpl struct { |
| 32 client *Client |
| 33 chunkSize int64 |
| 34 } |
| 35 |
| 36 // Google Storage resumable upload protocol. |
| 37 // See https://cloud.google.com/storage/docs/concepts-techniques#resumable |
| 38 |
| 39 func (s *storageImpl) upload(url string, data io.ReadSeeker) error { |
| 40 // Grab the total length of the file. |
| 41 length, err := data.Seek(0, os.SEEK_END) |
| 42 if err != nil { |
| 43 return err |
| 44 } |
| 45 _, err = data.Seek(0, os.SEEK_SET) |
| 46 if err != nil { |
| 47 return err |
| 48 } |
| 49 |
| 50 // Offset of data known to be persisted in the storage or -1 if needed t
o ask |
| 51 // Google Storage for it. |
| 52 offset := int64(0) |
| 53 // Number of transient errors reported so far. |
| 54 errors := 0 |
| 55 |
| 56 // Called when some new data is uploaded. |
| 57 reportProgress := func(offset int64) { |
| 58 if length != 0 { |
| 59 s.client.Log.Infof("cipd: uploading - %d%%", offset*100/
length) |
| 60 } |
| 61 } |
| 62 |
| 63 // Called when transient error happens. |
| 64 reportTransientError := func() { |
| 65 // Need to query for latest uploaded offset to resume. |
| 66 errors++ |
| 67 offset = -1 |
| 68 if errors < uploadMaxErrors { |
| 69 s.client.Log.Warningf("cipd: transient upload error, ret
rying...") |
| 70 s.client.clock.sleep(2 * time.Second) |
| 71 } |
| 72 } |
| 73 |
| 74 reportProgress(0) |
| 75 for errors < uploadMaxErrors { |
| 76 // Grab latest uploaded offset if not known. |
| 77 if offset == -1 { |
| 78 offset, err = s.getNextOffset(url, length) |
| 79 if err == errTransientError { |
| 80 reportTransientError() |
| 81 continue |
| 82 } |
| 83 if err != nil { |
| 84 return err |
| 85 } |
| 86 reportProgress(offset) |
| 87 if offset == length { |
| 88 return nil |
| 89 } |
| 90 s.client.Log.Warningf("cipd: resuming upload from offset
%d", offset) |
| 91 } |
| 92 |
| 93 // Length of a chunk to upload. |
| 94 chunk := s.chunkSize |
| 95 if chunk > length-offset { |
| 96 chunk = length - offset |
| 97 } |
| 98 |
| 99 // Upload the chunk. |
| 100 data.Seek(offset, os.SEEK_SET) |
| 101 r, err := http.NewRequest("PUT", url, io.LimitReader(data, chunk
)) |
| 102 if err != nil { |
| 103 return err |
| 104 } |
| 105 rangeHeader := fmt.Sprintf("bytes %d-%d/%d", offset, offset+chun
k-1, length) |
| 106 r.Header.Set("Content-Range", rangeHeader) |
| 107 r.Header.Set("Content-Length", fmt.Sprintf("%d", chunk)) |
| 108 r.Header.Set("User-Agent", s.client.UserAgent) |
| 109 resp, err := s.client.doAnonymousHTTPRequest(r) |
| 110 if err != nil { |
| 111 if isTemporaryNetError(err) { |
| 112 reportTransientError() |
| 113 continue |
| 114 } |
| 115 return err |
| 116 } |
| 117 resp.Body.Close() |
| 118 |
| 119 // Partially or fully uploaded. |
| 120 if resp.StatusCode == 308 || resp.StatusCode == 200 { |
| 121 offset += chunk |
| 122 reportProgress(offset) |
| 123 if offset == length { |
| 124 return nil |
| 125 } |
| 126 continue |
| 127 } |
| 128 |
| 129 // Transient error. |
| 130 if isTemporaryHTTPError(resp.StatusCode) { |
| 131 reportTransientError() |
| 132 continue |
| 133 } |
| 134 |
| 135 // Fatal error. |
| 136 return fmt.Errorf("Unexpected response during file upload: HTTP
%d", resp.StatusCode) |
| 137 } |
| 138 |
| 139 return ErrUploadError |
| 140 } |
| 141 |
| 142 // getNextOffset queries the storage for size of persisted data. |
| 143 func (s *storageImpl) getNextOffset(url string, length int64) (offset int64, err
error) { |
| 144 r, err := http.NewRequest("PUT", url, nil) |
| 145 if err != nil { |
| 146 return |
| 147 } |
| 148 r.Header.Set("Content-Range", fmt.Sprintf("bytes */%d", length)) |
| 149 r.Header.Set("Content-Length", "0") |
| 150 r.Header.Set("User-Agent", s.client.UserAgent) |
| 151 resp, err := s.client.doAnonymousHTTPRequest(r) |
| 152 if err != nil { |
| 153 if isTemporaryNetError(err) { |
| 154 err = errTransientError |
| 155 } |
| 156 return |
| 157 } |
| 158 resp.Body.Close() |
| 159 |
| 160 if resp.StatusCode == 200 { |
| 161 // Fully uploaded. |
| 162 offset = length |
| 163 } else if resp.StatusCode == 308 { |
| 164 // Partially uploaded, extract last uploaded offset from Range h
eader. |
| 165 rangeHeader := resp.Header.Get("Range") |
| 166 if rangeHeader != "" { |
| 167 _, err = fmt.Sscanf(rangeHeader, "bytes=0-%d", &offset) |
| 168 if err == nil { |
| 169 // |offset| is an *offset* of a last uploaded by
te, not the data length. |
| 170 offset++ |
| 171 } |
| 172 } |
| 173 } else if isTemporaryHTTPError(resp.StatusCode) { |
| 174 err = errTransientError |
| 175 } else { |
| 176 err = fmt.Errorf("Unexpected response (HTTP %d) when querying fo
r uploaded offset", resp.StatusCode) |
| 177 } |
| 178 return |
| 179 } |
| 180 |
| 181 // TODO(vadimsh): Use resumable download protocol. |
| 182 |
| 183 func (s *storageImpl) download(url string, output io.WriteSeeker) error { |
| 184 // reportProgress print fetch progress, throttling the reports rate. |
| 185 var prevProgress int64 = 1000 // >100% |
| 186 var prevReportTs time.Time |
| 187 reportProgress := func(read int64, total int64) { |
| 188 now := s.client.clock.now() |
| 189 progress := read * 100 / total |
| 190 if progress < prevProgress || read == total || now.Sub(prevRepor
tTs) > downloadReportInterval { |
| 191 s.client.Log.Infof("cipd: fetching - %d%%", progress) |
| 192 prevReportTs = now |
| 193 prevProgress = progress |
| 194 } |
| 195 } |
| 196 |
| 197 // download is a separate function to be able to use deferred close. |
| 198 download := func(out io.WriteSeeker, src io.ReadCloser, totalLen int64)
error { |
| 199 defer src.Close() |
| 200 s.client.Log.Infof("cipd: about to fetch %.1f Mb", float32(total
Len)/1024.0/1024.0) |
| 201 reportProgress(0, totalLen) |
| 202 _, err := io.Copy(out, &readerWithProgress{ |
| 203 reader: src, |
| 204 callback: func(read int64) { reportProgress(read, totalL
en) }, |
| 205 }) |
| 206 if err == nil { |
| 207 s.client.Log.Infof("cipd: fetch finished successfully") |
| 208 } |
| 209 return err |
| 210 } |
| 211 |
| 212 // Download the actual data (several attempts). |
| 213 for attempt := 0; attempt < downloadMaxAttempts; attempt++ { |
| 214 // Rewind output to zero offset. |
| 215 _, err := output.Seek(0, os.SEEK_SET) |
| 216 if err != nil { |
| 217 return err |
| 218 } |
| 219 |
| 220 // Send the request. |
| 221 s.client.Log.Infof("cipd: initiating the fetch") |
| 222 var req *http.Request |
| 223 var resp *http.Response |
| 224 req, err = http.NewRequest("GET", url, nil) |
| 225 if err != nil { |
| 226 return err |
| 227 } |
| 228 req.Header.Set("User-Agent", s.client.UserAgent) |
| 229 resp, err = s.client.doAnonymousHTTPRequest(req) |
| 230 if err != nil { |
| 231 if isTemporaryNetError(err) { |
| 232 s.client.Log.Warningf("cipd: transient network e
rror: %s", err) |
| 233 continue |
| 234 } |
| 235 return err |
| 236 } |
| 237 |
| 238 // Transient error, retry. |
| 239 if isTemporaryHTTPError(resp.StatusCode) { |
| 240 resp.Body.Close() |
| 241 s.client.Log.Warningf("cipd: transient HTTP error %d whi
le fetching the file", resp.StatusCode) |
| 242 continue |
| 243 } |
| 244 |
| 245 // Fatal error, abort. |
| 246 if resp.StatusCode >= 400 { |
| 247 resp.Body.Close() |
| 248 return fmt.Errorf("Server replied with HTTP code %d", re
sp.StatusCode) |
| 249 } |
| 250 |
| 251 // Try to fetch (will close resp.Body when done). |
| 252 err = download(output, resp.Body, resp.ContentLength) |
| 253 if err != nil { |
| 254 s.client.Log.Warningf("cipd: transient error fetching th
e file: %s", err) |
| 255 continue |
| 256 } |
| 257 |
| 258 // Success. |
| 259 return nil |
| 260 } |
| 261 |
| 262 return ErrDownloadError |
| 263 } |
| 264 |
| 265 // readerWithProgress is io.Reader that calls callback whenever something is |
| 266 // read from it. |
| 267 type readerWithProgress struct { |
| 268 reader io.Reader |
| 269 total int64 |
| 270 callback func(total int64) |
| 271 } |
| 272 |
| 273 func (r *readerWithProgress) Read(p []byte) (int, error) { |
| 274 n, err := r.reader.Read(p) |
| 275 r.total += int64(n) |
| 276 r.callback(r.total) |
| 277 return n, err |
| 278 } |
OLD | NEW |