OLD | NEW |
(Empty) | |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. |
| 4 |
| 5 package cipd |
| 6 |
| 7 import ( |
| 8 "errors" |
| 9 "fmt" |
| 10 "io" |
| 11 "net/http" |
| 12 "os" |
| 13 "time" |
| 14 ) |
| 15 |
| 16 const ( |
| 17 // uploadChunkSize is the max size of a single PUT HTTP request during u
pload. |
| 18 uploadChunkSize int64 = 4 * 1024 * 1024 |
| 19 // uploadMaxErrors is the number of transient errors after which upload
is aborted. |
| 20 uploadMaxErrors = 20 |
| 21 // downloadReportInterval defines frequency of "downloaded X%" log lines
. |
| 22 downloadReportInterval = 5 * time.Second |
| 23 // downloadMaxAttempts is how many times to retry a download on errors. |
| 24 downloadMaxAttempts = 10 |
| 25 ) |
| 26 |
| 27 // errTransientError is returned by getNextOffset in case of retryable error. |
| 28 var errTransientError = errors.New("Transient error in getUploadedOffset") |
| 29 |
| 30 // storageImpl implements storage via Google Storage signed URLs. |
| 31 type storageImpl struct { |
| 32 client *Client |
| 33 chunkSize int64 |
| 34 } |
| 35 |
| 36 // Google Storage resumable upload protocol. |
| 37 // See https://cloud.google.com/storage/docs/concepts-techniques#resumable |
| 38 |
| 39 func (s *storageImpl) upload(url string, data io.ReadSeeker) error { |
| 40 // Grab the total length of the file. |
| 41 length, err := data.Seek(0, os.SEEK_END) |
| 42 if err != nil { |
| 43 return err |
| 44 } |
| 45 _, err = data.Seek(0, os.SEEK_SET) |
| 46 if err != nil { |
| 47 return err |
| 48 } |
| 49 |
| 50 // Offset of data known to be persisted in the storage or -1 if needed t
o ask |
| 51 // Google Storage for it. |
| 52 offset := int64(0) |
| 53 // Number of transient errors reported so far. |
| 54 errors := 0 |
| 55 |
| 56 // Called when some new data is uploaded. |
| 57 reportProgress := func(offset int64) { |
| 58 if length != 0 { |
| 59 s.client.Log.Infof("cipd: uploading - %d%%", offset*100/
length) |
| 60 } |
| 61 } |
| 62 |
| 63 // Called when transient error happens. |
| 64 reportTransientError := func() { |
| 65 // Need to query for latest uploaded offset to resume. |
| 66 offset = -1 |
| 67 s.client.Log.Warningf("cipd: transient upload error, retrying...
") |
| 68 s.client.clock.sleep(2 * time.Second) |
| 69 } |
| 70 |
| 71 reportProgress(0) |
| 72 for errors < uploadMaxErrors { |
| 73 // Grab latest uploaded offset if not known. |
| 74 if offset == -1 { |
| 75 offset, err = s.getNextOffset(url, length) |
| 76 if err == errTransientError { |
| 77 reportTransientError() |
| 78 continue |
| 79 } |
| 80 if err != nil { |
| 81 return err |
| 82 } |
| 83 reportProgress(offset) |
| 84 if offset == length { |
| 85 return nil |
| 86 } |
| 87 s.client.Log.Warningf("cipd: resuming upload from offset
%d", offset) |
| 88 } |
| 89 |
| 90 // Length of a chunk to upload. |
| 91 chunk := s.chunkSize |
| 92 if chunk > length-offset { |
| 93 chunk = length - offset |
| 94 } |
| 95 |
| 96 // Upload the chunk. |
| 97 data.Seek(offset, os.SEEK_SET) |
| 98 r, err := http.NewRequest("PUT", url, io.LimitReader(data, chunk
)) |
| 99 if err != nil { |
| 100 return err |
| 101 } |
| 102 rangeHeader := fmt.Sprintf("bytes %d-%d/%d", offset, offset+chun
k-1, length) |
| 103 r.Header.Set("Content-Range", rangeHeader) |
| 104 r.Header.Set("Content-Length", fmt.Sprintf("%d", chunk)) |
| 105 r.Header.Set("User-Agent", s.client.UserAgent) |
| 106 resp, err := s.client.doAnonymousHTTPRequest(r) |
| 107 if err != nil { |
| 108 if isTemporaryNetError(err) { |
| 109 reportTransientError() |
| 110 continue |
| 111 } |
| 112 return err |
| 113 } |
| 114 resp.Body.Close() |
| 115 |
| 116 // Partially or fully uploaded. |
| 117 if resp.StatusCode == 308 || resp.StatusCode == 200 { |
| 118 offset += chunk |
| 119 reportProgress(offset) |
| 120 if offset == length { |
| 121 return nil |
| 122 } |
| 123 continue |
| 124 } |
| 125 |
| 126 // Fatal error. |
| 127 if resp.StatusCode < 500 && resp.StatusCode != 408 { |
| 128 return fmt.Errorf("Unexpected response during file uploa
d: HTTP %d", resp.StatusCode) |
| 129 } |
| 130 |
| 131 // Transient error. |
| 132 reportTransientError() |
| 133 } |
| 134 |
| 135 return ErrUploadError |
| 136 } |
| 137 |
| 138 // getNextOffset queries the storage for size of persisted data. |
| 139 func (s *storageImpl) getNextOffset(url string, length int64) (offset int64, err
error) { |
| 140 r, err := http.NewRequest("PUT", url, nil) |
| 141 if err != nil { |
| 142 return |
| 143 } |
| 144 r.Header.Set("Content-Range", fmt.Sprintf("bytes */%d", length)) |
| 145 r.Header.Set("Content-Length", "0") |
| 146 r.Header.Set("User-Agent", s.client.UserAgent) |
| 147 resp, err := s.client.doAnonymousHTTPRequest(r) |
| 148 if err != nil { |
| 149 if isTemporaryNetError(err) { |
| 150 err = errTransientError |
| 151 } |
| 152 return |
| 153 } |
| 154 resp.Body.Close() |
| 155 |
| 156 if resp.StatusCode == 200 { |
| 157 // Fully uploaded. |
| 158 offset = length |
| 159 } else if resp.StatusCode == 308 { |
| 160 // Partially uploaded, extract last uploaded offset from Range h
eader. |
| 161 rangeHeader := resp.Header.Get("Range") |
| 162 if rangeHeader != "" { |
| 163 _, err = fmt.Sscanf(rangeHeader, "bytes=0-%d", &offset) |
| 164 if err == nil { |
| 165 // |offset| is an *offset* of a last uploaded by
te, not the data length. |
| 166 offset++ |
| 167 } |
| 168 } |
| 169 } else if resp.StatusCode < 500 && resp.StatusCode != 408 { |
| 170 err = fmt.Errorf("Unexpected response (HTTP %d) when querying fo
r uploaded offset", resp.StatusCode) |
| 171 } else { |
| 172 err = errTransientError |
| 173 } |
| 174 return |
| 175 } |
| 176 |
| 177 // TODO(vadimsh): Use resumable download protocol. |
| 178 |
| 179 func (s *storageImpl) download(url string, output io.WriteSeeker) error { |
| 180 // reportProgress print fetch progress, throttling the reports rate. |
| 181 var prevProgress int64 = 1000 // >100% |
| 182 var prevReportTs time.Time |
| 183 reportProgress := func(read int64, total int64) { |
| 184 now := s.client.clock.now() |
| 185 progress := read * 100 / total |
| 186 if progress < prevProgress || read == total || now.Sub(prevRepor
tTs) > downloadReportInterval { |
| 187 s.client.Log.Infof("cipd: fetching - %d%%", progress) |
| 188 prevReportTs = now |
| 189 prevProgress = progress |
| 190 } |
| 191 } |
| 192 |
| 193 // download is a separate function to be able to use deferred close. |
| 194 download := func(out io.WriteSeeker, src io.ReadCloser, totalLen int64)
error { |
| 195 defer src.Close() |
| 196 s.client.Log.Infof("cipd: about to fetch %.1f Mb", float32(total
Len)/1024.0/1024.0) |
| 197 reportProgress(0, totalLen) |
| 198 _, err := io.Copy(out, &readerWithProgress{ |
| 199 reader: src, |
| 200 callback: func(read int64) { reportProgress(read, totalL
en) }, |
| 201 }) |
| 202 if err == nil { |
| 203 s.client.Log.Infof("cipd: fetch finished successfully") |
| 204 } |
| 205 return err |
| 206 } |
| 207 |
| 208 // Download the actual data (several attempts). |
| 209 for attempt := 0; attempt < downloadMaxAttempts; attempt++ { |
| 210 // Rewind output to zero offset. |
| 211 _, err := output.Seek(0, os.SEEK_SET) |
| 212 if err != nil { |
| 213 return err |
| 214 } |
| 215 |
| 216 // Send the request. |
| 217 s.client.Log.Infof("cipd: initiating the fetch") |
| 218 var req *http.Request |
| 219 var resp *http.Response |
| 220 req, err = http.NewRequest("GET", url, nil) |
| 221 if err != nil { |
| 222 return err |
| 223 } |
| 224 req.Header.Set("User-Agent", s.client.UserAgent) |
| 225 resp, err = s.client.doAnonymousHTTPRequest(req) |
| 226 if err != nil { |
| 227 if isTemporaryNetError(err) { |
| 228 s.client.Log.Warningf("cipd: transient network e
rror: %s", err) |
| 229 continue |
| 230 } |
| 231 return err |
| 232 } |
| 233 |
| 234 // Transient error, retry. |
| 235 if resp.StatusCode == 408 || resp.StatusCode >= 500 { |
| 236 resp.Body.Close() |
| 237 s.client.Log.Warningf("cipd: transient HTTP error %d whi
le fetching the file", resp.StatusCode) |
| 238 continue |
| 239 } |
| 240 |
| 241 // Fatal error, abort. |
| 242 if resp.StatusCode >= 400 { |
| 243 resp.Body.Close() |
| 244 return fmt.Errorf("Server replied with HTTP code %d", re
sp.StatusCode) |
| 245 } |
| 246 |
| 247 // Try to fetch (will close resp.Body when done). |
| 248 err = download(output, resp.Body, resp.ContentLength) |
| 249 if err != nil { |
| 250 s.client.Log.Warningf("cipd: transient error fetching th
e file: %s", err) |
| 251 continue |
| 252 } |
| 253 |
| 254 // Success. |
| 255 return nil |
| 256 } |
| 257 |
| 258 return ErrDownloadError |
| 259 } |
| 260 |
| 261 // readerWithProgress is io.Reader that calls callback whenever something is |
| 262 // read from it. |
| 263 type readerWithProgress struct { |
| 264 reader io.Reader |
| 265 total int64 |
| 266 callback func(total int64) |
| 267 } |
| 268 |
| 269 func (r *readerWithProgress) Read(p []byte) (int, error) { |
| 270 n, err := r.reader.Read(p) |
| 271 r.total += int64(n) |
| 272 r.callback(r.total) |
| 273 return n, err |
| 274 } |
OLD | NEW |