OLD | NEW |
| (Empty) |
1 // Copyright 2014 The Chromium Authors. All rights reserved. | |
2 // Use of this source code is governed by a BSD-style license that can be | |
3 // found in the LICENSE file. | |
4 | |
5 package cipd | |
6 | |
7 import ( | |
8 "errors" | |
9 "fmt" | |
10 "io" | |
11 "net/http" | |
12 "os" | |
13 "time" | |
14 | |
15 "infra/libs/logging" | |
16 ) | |
17 | |
18 var ( | |
19 // ErrFinalizationTimeout is returned if CAS service can not finalize up
load fast enough. | |
20 ErrFinalizationTimeout = errors.New("Timeout while waiting for CAS servi
ce to finalize the upload") | |
21 // ErrAttachTagsTimeout is returned when service refuses to accept tags
for a long time. | |
22 ErrAttachTagsTimeout = errors.New("Timeout while attaching tags") | |
23 ) | |
24 | |
25 // UploadOptions contains upload related parameters shared by UploadToCAS and | |
26 // RegisterInstance functions. | |
27 type UploadOptions struct { | |
28 // ServiceURL is root URL of the backend service, or "" to use default s
ervice. | |
29 ServiceURL string | |
30 // FinalizationTimeout is how long to wait for CAS service to finalize t
he upload, default is 1 min. | |
31 FinalizationTimeout time.Duration | |
32 // Client is http.Client to use to make requests, default is http.Defaul
tClient. | |
33 Client *http.Client | |
34 // Log is a logger to use for logs, default is logging.DefaultLogger. | |
35 Log logging.Logger | |
36 } | |
37 | |
38 // UploadToCASOptions contains parameters for UploadToCAS function. | |
39 type UploadToCASOptions struct { | |
40 UploadOptions | |
41 | |
42 // SHA1 is a SHA1 hash of data to upload, usually package's InstanceID()
. | |
43 SHA1 string | |
44 // Data provides actual data to upload. It is seekable to support resuma
ble uploads. | |
45 Data io.ReadSeeker | |
46 // UploadSessionID identified existing upload session. Empty string to s
tart a new one. | |
47 UploadSessionID string | |
48 // UploadURL is where to upload the file to. Must be set if UploadSessio
nID is not empty. | |
49 UploadURL string | |
50 | |
51 // remote is an instance of remoteService to use (if set). | |
52 remote *remoteService | |
53 } | |
54 | |
55 // UploadToCAS uploads package data blob to Content Addressed Store if it is not | |
56 // there already. The data is addressed by SHA1 hash (also known as package's | |
57 // InstanceID). It can be used as a standalone function (if UploadSessionID | |
58 // is "") or as a part of more high level upload process (in that case upload | |
59 // session can be opened elsewhere and its properties passed here via | |
60 // UploadSessionID and UploadURL). Returns nil on successful upload. | |
61 func UploadToCAS(options UploadToCASOptions) error { | |
62 // Fill in default options. | |
63 if options.ServiceURL == "" { | |
64 options.ServiceURL = DefaultServiceURL() | |
65 } | |
66 if options.FinalizationTimeout == 0 { | |
67 options.FinalizationTimeout = 60 * time.Second | |
68 } | |
69 if options.Client == nil { | |
70 options.Client = http.DefaultClient | |
71 } | |
72 if options.Log == nil { | |
73 options.Log = logging.DefaultLogger | |
74 } | |
75 if options.remote == nil { | |
76 options.remote = newRemoteService(options.Client, options.Servic
eURL, options.Log) | |
77 } | |
78 log := options.Log | |
79 remote := options.remote | |
80 | |
81 // Open new upload session if existing is not provided. | |
82 var session *uploadSession | |
83 var err error | |
84 if options.UploadSessionID == "" { | |
85 log.Infof("cipd: uploading %s: initiating", options.SHA1) | |
86 session, err = remote.initiateUpload(options.SHA1) | |
87 if err != nil { | |
88 log.Warningf("cipd: can't upload %s - %s", options.SHA1,
err) | |
89 return err | |
90 } | |
91 if session == nil { | |
92 log.Infof("cipd: %s is already uploaded", options.SHA1) | |
93 return nil | |
94 } | |
95 } else { | |
96 if options.UploadURL == "" { | |
97 return errors.New("UploadURL must be set if UploadSessio
nID is used") | |
98 } | |
99 session = &uploadSession{ | |
100 ID: options.UploadSessionID, | |
101 URL: options.UploadURL, | |
102 } | |
103 } | |
104 | |
105 // Upload the file to CAS storage. | |
106 err = resumableUpload(session.URL, 8*1024*1024, options) | |
107 if err != nil { | |
108 return err | |
109 } | |
110 | |
111 // Finalize the upload, wait until server verifies and publishes the fil
e. | |
112 started := clock.Now() | |
113 delay := time.Second | |
114 for { | |
115 published, err := remote.finalizeUpload(session.ID) | |
116 if published { | |
117 log.Infof("cipd: successfully uploaded %s", options.SHA1
) | |
118 return nil | |
119 } | |
120 if err != nil { | |
121 log.Warningf("cipd: upload of %s failed: %s", options.SH
A1, err) | |
122 return err | |
123 } | |
124 if clock.Now().Sub(started) > options.FinalizationTimeout { | |
125 log.Warningf("cipd: upload of %s failed: timeout", optio
ns.SHA1) | |
126 return ErrFinalizationTimeout | |
127 } | |
128 log.Infof("cipd: uploading %s: verifying", options.SHA1) | |
129 clock.Sleep(delay) | |
130 if delay < 4*time.Second { | |
131 delay += 500 * time.Millisecond | |
132 } | |
133 } | |
134 } | |
135 | |
136 // RegisterInstanceOptions contains parameters for RegisterInstance function. | |
137 type RegisterInstanceOptions struct { | |
138 UploadOptions | |
139 | |
140 // PackageInstance is a package instance to register. | |
141 PackageInstance PackageInstance | |
142 // Tags is a list of tags to attach to an instance. Will be attached eve
n if | |
143 // the instance already existed before. | |
144 Tags []string | |
145 } | |
146 | |
147 // RegisterInstance makes the package instance available for clients by | |
148 // uploading it to the storage and registering it in the package repository. | |
149 func RegisterInstance(options RegisterInstanceOptions) error { | |
150 // Fill in default options. | |
151 if options.ServiceURL == "" { | |
152 options.ServiceURL = DefaultServiceURL() | |
153 } | |
154 if options.Client == nil { | |
155 options.Client = http.DefaultClient | |
156 } | |
157 if options.Log == nil { | |
158 options.Log = logging.DefaultLogger | |
159 } | |
160 log := options.Log | |
161 inst := options.PackageInstance | |
162 remote := newRemoteService(options.Client, options.ServiceURL, log) | |
163 | |
164 // Attempt to register. | |
165 result, err := remote.registerInstance(inst.PackageName(), inst.Instance
ID()) | |
166 if err != nil { | |
167 return err | |
168 } | |
169 | |
170 // Asked to upload the package file to CAS first? | |
171 if result.UploadSession != nil { | |
172 err = UploadToCAS(UploadToCASOptions{ | |
173 UploadOptions: options.UploadOptions, | |
174 SHA1: inst.InstanceID(), | |
175 Data: inst.DataReader(), | |
176 UploadSessionID: result.UploadSession.ID, | |
177 UploadURL: result.UploadSession.URL, | |
178 remote: remote, | |
179 }) | |
180 if err != nil { | |
181 return err | |
182 } | |
183 // Try again, now that file is uploaded. | |
184 result, err = remote.registerInstance(inst.PackageName(), inst.I
nstanceID()) | |
185 if err != nil { | |
186 return err | |
187 } | |
188 if result.UploadSession != nil { | |
189 return errors.New("Package file is uploaded, but servers
asks us to upload it again") | |
190 } | |
191 } | |
192 | |
193 if result.AlreadyRegistered { | |
194 log.Infof( | |
195 "cipd: instance %s:%s is already registered by %s on %s"
, | |
196 inst.PackageName(), inst.InstanceID(), | |
197 result.Info.RegisteredBy, result.Info.RegisteredTs) | |
198 } else { | |
199 log.Infof("cipd: instance %s:%s was successfully registered", in
st.PackageName(), inst.InstanceID()) | |
200 } | |
201 | |
202 return attachTagsWhenReady(remote, inst.PackageName(), inst.InstanceID()
, options.Tags, log) | |
203 } | |
204 | |
205 //////////////////////////////////////////////////////////////////////////////// | |
206 // Google Storage resumable upload protocol. | |
207 // See https://cloud.google.com/storage/docs/concepts-techniques#resumable | |
208 | |
209 // errTransientError is returned by getNextOffset in case of retryable error. | |
210 var errTransientError = errors.New("Transient error in getUploadedOffset") | |
211 | |
212 // resumableUpload is mocked in tests. | |
213 var resumableUpload = func(uploadURL string, chunkSize int64, opts UploadToCASOp
tions) error { | |
214 // Grab the total length of the file. | |
215 length, err := opts.Data.Seek(0, os.SEEK_END) | |
216 if err != nil { | |
217 return err | |
218 } | |
219 _, err = opts.Data.Seek(0, os.SEEK_SET) | |
220 if err != nil { | |
221 return err | |
222 } | |
223 | |
224 // Called when some new data is uploaded. | |
225 reportProgress := func(offset int64) { | |
226 if length != 0 { | |
227 opts.Log.Infof("cipd: uploading %s: %d%%", opts.SHA1, of
fset*100/length) | |
228 } | |
229 } | |
230 | |
231 // Called when transient error happens. | |
232 reportTransientError := func() { | |
233 opts.Log.Warningf("cipd: transient upload error, retrying...") | |
234 clock.Sleep(2 * time.Second) | |
235 } | |
236 | |
237 var offset int64 | |
238 reportProgress(0) | |
239 for { | |
240 // Grab latest uploaded offset if not known. | |
241 if offset == -1 { | |
242 offset, err = getNextOffset(uploadURL, length, opts.Clie
nt) | |
243 if err == errTransientError { | |
244 offset = -1 | |
245 reportTransientError() | |
246 continue | |
247 } | |
248 if err != nil { | |
249 return err | |
250 } | |
251 reportProgress(offset) | |
252 if offset == length { | |
253 return nil | |
254 } | |
255 opts.Log.Warningf("cipd: resuming upload from offset %d"
, offset) | |
256 } | |
257 | |
258 // Length of a chunk to upload. | |
259 var chunk int64 = chunkSize | |
260 if chunk > length-offset { | |
261 chunk = length - offset | |
262 } | |
263 | |
264 // Upload the chunk. | |
265 opts.Data.Seek(offset, os.SEEK_SET) | |
266 r, err := http.NewRequest("PUT", uploadURL, io.LimitReader(opts.
Data, chunk)) | |
267 if err != nil { | |
268 return err | |
269 } | |
270 rangeHeader := fmt.Sprintf("bytes %d-%d/%d", offset, offset+chun
k-1, length) | |
271 r.Header.Set("Content-Range", rangeHeader) | |
272 r.Header.Set("Content-Length", fmt.Sprintf("%d", chunk)) | |
273 r.Header.Set("User-Agent", userAgent()) | |
274 resp, err := opts.Client.Do(r) | |
275 if err != nil { | |
276 return err | |
277 } | |
278 resp.Body.Close() | |
279 | |
280 // Partially or fully uploaded. | |
281 if resp.StatusCode == 308 || resp.StatusCode == 200 { | |
282 offset += chunk | |
283 reportProgress(offset) | |
284 if offset == length { | |
285 return nil | |
286 } | |
287 } else if resp.StatusCode < 500 && resp.StatusCode != 408 { | |
288 return fmt.Errorf("Unexpected response during file uploa
d: HTTP %d", resp.StatusCode) | |
289 } else { | |
290 // Transient error. Need to query for latest uploaded of
fset to resume. | |
291 offset = -1 | |
292 reportTransientError() | |
293 } | |
294 } | |
295 } | |
296 | |
297 // getNextOffset queries the storage for size of persisted data. | |
298 func getNextOffset(uploadURL string, length int64, client *http.Client) (offset
int64, err error) { | |
299 r, err := http.NewRequest("PUT", uploadURL, nil) | |
300 if err != nil { | |
301 return | |
302 } | |
303 r.Header.Set("Content-Range", fmt.Sprintf("bytes */%d", length)) | |
304 r.Header.Set("Content-Length", "0") | |
305 r.Header.Set("User-Agent", userAgent()) | |
306 resp, err := client.Do(r) | |
307 if err != nil { | |
308 return | |
309 } | |
310 resp.Body.Close() | |
311 | |
312 if resp.StatusCode == 200 { | |
313 // Fully uploaded. | |
314 offset = length | |
315 } else if resp.StatusCode == 308 { | |
316 // Partially uploaded, extract last uploaded offset from Range h
eader. | |
317 rangeHeader := resp.Header.Get("Range") | |
318 if rangeHeader != "" { | |
319 _, err = fmt.Sscanf(rangeHeader, "bytes=0-%d", &offset) | |
320 if err == nil { | |
321 // |offset| is an *offset* of a last uploaded by
te, not the data length. | |
322 offset++ | |
323 } | |
324 } | |
325 } else if resp.StatusCode < 500 && resp.StatusCode != 408 { | |
326 err = fmt.Errorf("Unexpected response (HTTP %d) when querying fo
r uploaded offset", resp.StatusCode) | |
327 } else { | |
328 err = errTransientError | |
329 } | |
330 return | |
331 } | |
332 | |
333 //////////////////////////////////////////////////////////////////////////////// | |
334 // Tags related functions. | |
335 | |
336 // attachTagsWhenReady attaches tags to an instance retrying when receiving | |
337 // PROCESSING_NOT_FINISHED_YET errors. | |
338 func attachTagsWhenReady(remote *remoteService, packageName, instanceID string,
tags []string, log logging.Logger) error { | |
339 if len(tags) == 0 { | |
340 return nil | |
341 } | |
342 for _, tag := range tags { | |
343 log.Infof("cipd: attaching tag %s", tag) | |
344 } | |
345 deadline := clock.Now().Add(60 * time.Second) | |
346 for clock.Now().Before(deadline) { | |
347 err := remote.attachTags(packageName, instanceID, tags) | |
348 if err == nil { | |
349 log.Infof("cipd: all tags attached") | |
350 return nil | |
351 } | |
352 if _, ok := err.(*pendingProcessingError); ok { | |
353 log.Warningf("cipd: package instance is not ready yet -
%s", err) | |
354 clock.Sleep(5 * time.Second) | |
355 } else { | |
356 log.Errorf("cipd: failed to attach tags - %s", err) | |
357 return err | |
358 } | |
359 } | |
360 log.Errorf("cipd: failed to attach tags - deadline exceeded") | |
361 return ErrAttachTagsTimeout | |
362 } | |
OLD | NEW |