Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(21)

Side by Side Diff: client/archiver/archiver.go

Issue 1846263002: Isolate: Use generators instead of seekers (Closed) Base URL: https://github.com/luci/luci-go@master
Patch Set: Tweaks from comments. Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package archiver 5 package archiver
6 6
7 import ( 7 import (
8 "errors" 8 "errors"
9 "fmt" 9 "fmt"
10 "io" 10 "io"
(...skipping 57 matching lines...) Expand 10 before | Expand all | Expand 10 after
68 Error() error 68 Error() error
69 // Digest returns the calculated digest once calculated, empty otherwise . 69 // Digest returns the calculated digest once calculated, empty otherwise .
70 Digest() isolated.HexDigest 70 Digest() isolated.HexDigest
71 } 71 }
72 72
73 // Archiver is an high level interface to an isolatedclient.IsolateServer. 73 // Archiver is an high level interface to an isolatedclient.IsolateServer.
74 type Archiver interface { 74 type Archiver interface {
75 common.Canceler 75 common.Canceler
76 // Push schedules item upload to the isolate server. 76 // Push schedules item upload to the isolate server.
77 // Smaller priority value means earlier processing. 77 // Smaller priority value means earlier processing.
78 » Push(displayName string, src io.ReadSeeker, priority int64) Future 78 » Push(displayName string, src isolatedclient.Source, priority int64) Futu re
79 // PushFile schedules file upload to the isolate server. 79 // PushFile schedules file upload to the isolate server.
80 // Smaller priority value means earlier processing. 80 // Smaller priority value means earlier processing.
81 PushFile(displayName, path string, priority int64) Future 81 PushFile(displayName, path string, priority int64) Future
82 Stats() *Stats 82 Stats() *Stats
83 } 83 }
84 84
85 // UploadStat is the statistic for a single upload. 85 // UploadStat is the statistic for a single upload.
86 type UploadStat struct { 86 type UploadStat struct {
87 Duration time.Duration 87 Duration time.Duration
88 Size units.Size 88 Size units.Size
(...skipping 94 matching lines...) Expand 10 before | Expand all | Expand 10 after
183 } 183 }
184 184
185 // Private details. 185 // Private details.
186 186
187 // archiverItem is an item to process. Implements Future. 187 // archiverItem is an item to process. Implements Future.
188 // 188 //
189 // It is caried over from pipeline stage to stage to do processing on it. 189 // It is caried over from pipeline stage to stage to do processing on it.
190 type archiverItem struct { 190 type archiverItem struct {
191 // Immutable. 191 // Immutable.
192 displayName string // Name to use to qualify this item 192 displayName string // Name to use to qualify this item
193 path string // Set when source is a file on disk
194 wgHashed sync.WaitGroup // Released once .digestItem.Digest is set 193 wgHashed sync.WaitGroup // Released once .digestItem.Digest is set
195 priority int64 // Lower values - earlier hashing and uploadi ng. 194 priority int64 // Lower values - earlier hashing and uploadi ng.
195 path string // Set when the source is a file on disk.
196 a *archiver 196 a *archiver
197 197
198 // Mutable. 198 // Mutable.
199 lock sync.Mutex 199 lock sync.Mutex
200 err error // Item specific err or 200 err error // Item specific err or
201 digestItem isolateservice.HandlersEndpointsV1Digest // Mutated by hashLo op(), used by doContains() 201 digestItem isolateservice.HandlersEndpointsV1Digest // Mutated by hashLo op(), used by doContains()
202 linked []*archiverItem // Deduplicated item . 202 linked []*archiverItem // Deduplicated item .
203 203
204 // Mutable but not accessible externally. 204 // Mutable but not accessible externally.
205 » src io.ReadSeeker // Source of data 205 » source isolatedclient.Source // Source of data
206 » state *isolatedclient.PushState // Server-side push state for cache miss 206 » state *isolatedclient.PushState // Server-side push state for cache mis s
207 } 207 }
208 208
209 func newArchiverItem(a *archiver, displayName, path string, src io.ReadSeeker, p riority int64) *archiverItem { 209 func newArchiverItem(a *archiver, displayName, path string, source isolatedclien t.Source, priority int64) *archiverItem {
210 tracer.CounterAdd(a, "itemsProcessing", 1) 210 tracer.CounterAdd(a, "itemsProcessing", 1)
211 » i := &archiverItem{a: a, displayName: displayName, path: path, src: src, priority: priority} 211 » i := &archiverItem{a: a, displayName: displayName, path: path, source: s ource, priority: priority}
212 i.wgHashed.Add(1) 212 i.wgHashed.Add(1)
213 return i 213 return i
214 } 214 }
215 215
216 func (i *archiverItem) Close() error { 216 func (i *archiverItem) Close() error {
217 tracer.CounterAdd(i.a, "itemsProcessing", -1) 217 tracer.CounterAdd(i.a, "itemsProcessing", -1)
218 i.a = nil 218 i.a = nil
219 return nil 219 return nil
220 } 220 }
221 221
(...skipping 10 matching lines...) Expand all
232 defer i.lock.Unlock() 232 defer i.lock.Unlock()
233 return i.err 233 return i.err
234 } 234 }
235 235
236 func (i *archiverItem) Digest() isolated.HexDigest { 236 func (i *archiverItem) Digest() isolated.HexDigest {
237 i.lock.Lock() 237 i.lock.Lock()
238 defer i.lock.Unlock() 238 defer i.lock.Unlock()
239 return isolated.HexDigest(i.digestItem.Digest) 239 return isolated.HexDigest(i.digestItem.Digest)
240 } 240 }
241 241
242 func (i *archiverItem) isFile() bool {
243 return len(i.path) != 0
244 }
245
242 func (i *archiverItem) setErr(err error) { 246 func (i *archiverItem) setErr(err error) {
243 if err == nil { 247 if err == nil {
244 panic("internal error") 248 panic("internal error")
245 } 249 }
246 i.lock.Lock() 250 i.lock.Lock()
247 defer i.lock.Unlock() 251 defer i.lock.Unlock()
248 if i.err == nil { 252 if i.err == nil {
249 i.err = err 253 i.err = err
250 for _, child := range i.linked { 254 for _, child := range i.linked {
251 child.lock.Lock() 255 child.lock.Lock()
252 child.err = err 256 child.err = err
253 child.lock.Unlock() 257 child.lock.Unlock()
254 } 258 }
255 } 259 }
256 // TODO(maruel): Support Close().
257 i.src = nil
258 } 260 }
259 261
260 func (i *archiverItem) calcDigest() error { 262 func (i *archiverItem) calcDigest() error {
261 defer i.wgHashed.Done() 263 defer i.wgHashed.Done()
262 var d isolateservice.HandlersEndpointsV1Digest 264 var d isolateservice.HandlersEndpointsV1Digest
263 » if i.path != "" { 265
264 » » // Open and hash the file. 266 » src, err := i.source()
265 » » var err error 267 » if err != nil {
266 » » if d, err = isolated.HashFile(i.path); err != nil { 268 » » return fmt.Errorf("source(%s) failed: %s\n", i.DisplayName(), er r)
267 » » » i.setErr(err)
268 » » » return fmt.Errorf("hash(%s) failed: %s\n", i.DisplayName (), err)
269 » » }
270 » } else {
271 » » // Use src instead.
272 » » h := isolated.GetHash()
273 » » size, err := io.Copy(h, i.src)
274 » » if err != nil {
275 » » » i.setErr(err)
276 » » » return fmt.Errorf("read(%s) failed: %s\n", i.DisplayName (), err)
277 » » }
278 » » if pos, err := i.src.Seek(0, os.SEEK_SET); err != nil || pos != 0 {
279 » » » err = fmt.Errorf("seek(%s) failed: %s\n", i.DisplayName( ), err)
280 » » » i.setErr(err)
281 » » » return err
282 » » }
283 » » d = isolateservice.HandlersEndpointsV1Digest{Digest: string(isol ated.Sum(h)), IsIsolated: true, Size: size}
284 } 269 }
270 defer src.Close()
271
272 h := isolated.GetHash()
273 size, err := io.Copy(h, src)
274 if err != nil {
275 i.setErr(err)
276 return fmt.Errorf("read(%s) failed: %s\n", i.DisplayName(), err)
277 }
278 d = isolateservice.HandlersEndpointsV1Digest{Digest: string(isolated.Sum (h)), IsIsolated: true, Size: size}
279
285 i.lock.Lock() 280 i.lock.Lock()
286 defer i.lock.Unlock() 281 defer i.lock.Unlock()
287 i.digestItem = d 282 i.digestItem = d
288 283
289 for _, child := range i.linked { 284 for _, child := range i.linked {
290 child.lock.Lock() 285 child.lock.Lock()
291 child.digestItem = d 286 child.digestItem = d
292 child.lock.Unlock() 287 child.lock.Unlock()
293 child.wgHashed.Done() 288 child.wgHashed.Done()
294 } 289 }
295 return nil 290 return nil
296 } 291 }
297 292
298 func (i *archiverItem) link(child *archiverItem) { 293 func (i *archiverItem) link(child *archiverItem) {
299 child.lock.Lock() 294 child.lock.Lock()
300 defer child.lock.Unlock() 295 defer child.lock.Unlock()
301 » if child.src != nil || child.state != nil || child.linked != nil || chil d.err != nil { 296 » if !child.isFile() || child.state != nil || child.linked != nil || child .err != nil {
302 panic("internal error") 297 panic("internal error")
303 } 298 }
304 i.lock.Lock() 299 i.lock.Lock()
305 defer i.lock.Unlock() 300 defer i.lock.Unlock()
306 i.linked = append(i.linked, child) 301 i.linked = append(i.linked, child)
307 if i.digestItem.Digest != "" { 302 if i.digestItem.Digest != "" {
308 child.digestItem = i.digestItem 303 child.digestItem = i.digestItem
309 child.err = i.err 304 child.err = i.err
310 child.wgHashed.Done() 305 child.wgHashed.Done()
311 } else if i.err != nil { 306 } else if i.err != nil {
(...skipping 61 matching lines...) Expand 10 before | Expand all | Expand 10 after
373 } 368 }
374 369
375 func (a *archiver) CancelationReason() error { 370 func (a *archiver) CancelationReason() error {
376 return a.canceler.CancelationReason() 371 return a.canceler.CancelationReason()
377 } 372 }
378 373
379 func (a *archiver) Channel() <-chan error { 374 func (a *archiver) Channel() <-chan error {
380 return a.canceler.Channel() 375 return a.canceler.Channel()
381 } 376 }
382 377
383 func (a *archiver) Push(displayName string, src io.ReadSeeker, priority int64) F uture { 378 func (a *archiver) Push(displayName string, source isolatedclient.Source, priori ty int64) Future {
384 » i := newArchiverItem(a, displayName, "", src, priority) 379 » return a.push(newArchiverItem(a, displayName, "", source, priority))
385 » if pos, err := i.src.Seek(0, os.SEEK_SET); pos != 0 || err != nil {
386 » » i.setErr(fmt.Errorf("seek(%s) failed: %s\n", i.DisplayName(), er r))
387 » » i.wgHashed.Done()
388 » » return i
389 » }
390 » return a.push(i)
391 } 380 }
392 381
393 func (a *archiver) PushFile(displayName, path string, priority int64) Future { 382 func (a *archiver) PushFile(displayName, path string, priority int64) Future {
394 » return a.push(newArchiverItem(a, displayName, path, nil, priority)) 383 » source := func() (io.ReadCloser, error) {
384 » » return os.Open(path)
385 » }
386 » return a.push(newArchiverItem(a, displayName, path, source, priority))
395 } 387 }
396 388
397 func (a *archiver) Stats() *Stats { 389 func (a *archiver) Stats() *Stats {
398 a.statsLock.Lock() 390 a.statsLock.Lock()
399 defer a.statsLock.Unlock() 391 defer a.statsLock.Unlock()
400 return a.stats.deepCopy() 392 return a.stats.deepCopy()
401 } 393 }
402 394
403 func (a *archiver) push(item *archiverItem) Future { 395 func (a *archiver) push(item *archiverItem) Future {
404 if a.pushLocked(item) { 396 if a.pushLocked(item) {
(...skipping 52 matching lines...) Expand 10 before | Expand all | Expand 10 after
457 // This loop must never block and must be as fast as it can as i t is 449 // This loop must never block and must be as fast as it can as i t is
458 // functionally equivalent to running with a.closeLock held. 450 // functionally equivalent to running with a.closeLock held.
459 if err := a.CancelationReason(); err != nil { 451 if err := a.CancelationReason(); err != nil {
460 item.setErr(err) 452 item.setErr(err)
461 item.Close() 453 item.Close()
462 item.wgHashed.Done() 454 item.wgHashed.Done()
463 continue 455 continue
464 } 456 }
465 // TODO(maruel): Resolve symlinks for further deduplication? Dep ends on the 457 // TODO(maruel): Resolve symlinks for further deduplication? Dep ends on the
466 // use case, not sure the trade off is worth. 458 // use case, not sure the trade off is worth.
467 » » if item.path != "" { 459 » » if item.isFile() {
468 if previous, ok := seen[item.path]; ok { 460 if previous, ok := seen[item.path]; ok {
469 previous.link(item) 461 previous.link(item)
470 // TODO(maruel): Semantically weird. 462 // TODO(maruel): Semantically weird.
471 item.Close() 463 item.Close()
472 continue 464 continue
473 } 465 }
474 } 466 }
475 467
476 buildUp = append(buildUp, item) 468 buildUp = append(buildUp, item)
477 seen[item.path] = item 469 seen[item.path] = item
(...skipping 139 matching lines...) Expand 10 before | Expand all | Expand 10 after
617 a.progress.Update(groupUpload, groupUploadTodo, 1) 609 a.progress.Update(groupUpload, groupUploadTodo, 1)
618 a.progress.Update(groupUpload, groupUploadTodoSize, item s[index].digestItem.Size) 610 a.progress.Update(groupUpload, groupUploadTodoSize, item s[index].digestItem.Size)
619 a.stage4UploadChan <- items[index] 611 a.stage4UploadChan <- items[index]
620 } 612 }
621 } 613 }
622 log.Printf("Looked up %d items\n", len(items)) 614 log.Printf("Looked up %d items\n", len(items))
623 } 615 }
624 616
625 // doUpload is called by stage 4. 617 // doUpload is called by stage 4.
626 func (a *archiver) doUpload(item *archiverItem) { 618 func (a *archiver) doUpload(item *archiverItem) {
627 var src io.ReadSeeker
628 if item.src == nil {
629 f, err := os.Open(item.path)
630 if err != nil {
631 a.Cancel(err)
632 item.setErr(err)
633 item.Close()
634 return
635 }
636 defer f.Close()
637 src = f
638 } else {
639 src = item.src
640 item.src = nil
641 }
642 start := time.Now() 619 start := time.Now()
643 » if err := a.is.Push(item.state, src); err != nil { 620 » if err := a.is.Push(item.state, item.source); err != nil {
644 err = fmt.Errorf("push(%s) failed: %s\n", item.path, err) 621 err = fmt.Errorf("push(%s) failed: %s\n", item.path, err)
645 a.Cancel(err) 622 a.Cancel(err)
646 item.setErr(err) 623 item.setErr(err)
647 } else { 624 } else {
648 a.progress.Update(groupUpload, groupUploadDone, 1) 625 a.progress.Update(groupUpload, groupUploadDone, 1)
649 a.progress.Update(groupUpload, groupUploadDoneSize, item.digestI tem.Size) 626 a.progress.Update(groupUpload, groupUploadDoneSize, item.digestI tem.Size)
650 } 627 }
651 item.Close() 628 item.Close()
652 size := units.Size(item.digestItem.Size) 629 size := units.Size(item.digestItem.Size)
653 u := &UploadStat{time.Since(start), size, item.DisplayName()} 630 u := &UploadStat{time.Since(start), size, item.DisplayName()}
654 a.statsLock.Lock() 631 a.statsLock.Lock()
655 a.stats.Pushed = append(a.stats.Pushed, u) 632 a.stats.Pushed = append(a.stats.Pushed, u)
656 a.statsLock.Unlock() 633 a.statsLock.Unlock()
657 log.Printf("Uploaded %7s: %s\n", size, item.DisplayName()) 634 log.Printf("Uploaded %7s: %s\n", size, item.DisplayName())
658 } 635 }
OLDNEW
« no previous file with comments | « no previous file | client/archiver/archiver_test.go » ('j') | client/isolatedclient/isolatedclient.go » ('J')

Powered by Google App Engine
This is Rietveld 408576698