| OLD | NEW |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 package archiver | 5 package archiver |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "errors" | 8 "errors" |
| 9 "fmt" | 9 "fmt" |
| 10 "io" | 10 "io" |
| (...skipping 57 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 68 Error() error | 68 Error() error |
| 69 // Digest returns the calculated digest once calculated, empty otherwise
. | 69 // Digest returns the calculated digest once calculated, empty otherwise
. |
| 70 Digest() isolated.HexDigest | 70 Digest() isolated.HexDigest |
| 71 } | 71 } |
| 72 | 72 |
| 73 // Archiver is a high-level interface to an isolatedclient.IsolateServer. | 73 // Archiver is a high-level interface to an isolatedclient.IsolateServer. |
| 74 type Archiver interface { | 74 type Archiver interface { |
| 75 common.Canceler | 75 common.Canceler |
| 76 // Push schedules item upload to the isolate server. | 76 // Push schedules item upload to the isolate server. |
| 77 // Smaller priority value means earlier processing. | 77 // Smaller priority value means earlier processing. |
| 78 » Push(displayName string, src io.ReadSeeker, priority int64) Future | 78 » Push(displayName string, src isolatedclient.Source, priority int64) Futu
re |
| 79 // PushFile schedules file upload to the isolate server. | 79 // PushFile schedules file upload to the isolate server. |
| 80 // Smaller priority value means earlier processing. | 80 // Smaller priority value means earlier processing. |
| 81 PushFile(displayName, path string, priority int64) Future | 81 PushFile(displayName, path string, priority int64) Future |
| 82 Stats() *Stats | 82 Stats() *Stats |
| 83 } | 83 } |
| 84 | 84 |
| 85 // UploadStat is the statistic for a single upload. | 85 // UploadStat is the statistic for a single upload. |
| 86 type UploadStat struct { | 86 type UploadStat struct { |
| 87 Duration time.Duration | 87 Duration time.Duration |
| 88 Size units.Size | 88 Size units.Size |
| (...skipping 94 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 183 } | 183 } |
| 184 | 184 |
| 185 // Private details. | 185 // Private details. |
| 186 | 186 |
| 187 // archiverItem is an item to process. Implements Future. | 187 // archiverItem is an item to process. Implements Future. |
| 188 // | 188 // |
| 189 // It is carried over from pipeline stage to stage to do processing on it. | 189 // It is carried over from pipeline stage to stage to do processing on it. |
| 190 type archiverItem struct { | 190 type archiverItem struct { |
| 191 // Immutable. | 191 // Immutable. |
| 192 displayName string // Name to use to qualify this item | 192 displayName string // Name to use to qualify this item |
| 193 path string // Set when source is a file on disk | |
| 194 wgHashed sync.WaitGroup // Released once .digestItem.Digest is set | 193 wgHashed sync.WaitGroup // Released once .digestItem.Digest is set |
| 195 priority int64 // Lower values - earlier hashing and uploadi
ng. | 194 priority int64 // Lower values - earlier hashing and uploadi
ng. |
| 195 path string // Set when the source is a file on disk. |
| 196 a *archiver | 196 a *archiver |
| 197 | 197 |
| 198 // Mutable. | 198 // Mutable. |
| 199 lock sync.Mutex | 199 lock sync.Mutex |
| 200 err error // Item specific err
or | 200 err error // Item specific err
or |
| 201 digestItem isolateservice.HandlersEndpointsV1Digest // Mutated by hashLo
op(), used by doContains() | 201 digestItem isolateservice.HandlersEndpointsV1Digest // Mutated by hashLo
op(), used by doContains() |
| 202 linked []*archiverItem // Deduplicated item
. | 202 linked []*archiverItem // Deduplicated item
. |
| 203 | 203 |
| 204 // Mutable but not accessible externally. | 204 // Mutable but not accessible externally. |
| 205 » src io.ReadSeeker // Source of data | 205 » source isolatedclient.Source // Source of data |
| 206 » state *isolatedclient.PushState // Server-side push state for cache miss | 206 » state *isolatedclient.PushState // Server-side push state for cache mis
s |
| 207 } | 207 } |
| 208 | 208 |
| 209 func newArchiverItem(a *archiver, displayName, path string, src io.ReadSeeker, p
riority int64) *archiverItem { | 209 func newArchiverItem(a *archiver, displayName, path string, source isolatedclien
t.Source, priority int64) *archiverItem { |
| 210 tracer.CounterAdd(a, "itemsProcessing", 1) | 210 tracer.CounterAdd(a, "itemsProcessing", 1) |
| 211 » i := &archiverItem{a: a, displayName: displayName, path: path, src: src,
priority: priority} | 211 » i := &archiverItem{a: a, displayName: displayName, path: path, source: s
ource, priority: priority} |
| 212 i.wgHashed.Add(1) | 212 i.wgHashed.Add(1) |
| 213 return i | 213 return i |
| 214 } | 214 } |
| 215 | 215 |
| 216 func (i *archiverItem) Close() error { | 216 func (i *archiverItem) Close() error { |
| 217 tracer.CounterAdd(i.a, "itemsProcessing", -1) | 217 tracer.CounterAdd(i.a, "itemsProcessing", -1) |
| 218 i.a = nil | 218 i.a = nil |
| 219 return nil | 219 return nil |
| 220 } | 220 } |
| 221 | 221 |
| (...skipping 10 matching lines...) Expand all Loading... |
| 232 defer i.lock.Unlock() | 232 defer i.lock.Unlock() |
| 233 return i.err | 233 return i.err |
| 234 } | 234 } |
| 235 | 235 |
| 236 func (i *archiverItem) Digest() isolated.HexDigest { | 236 func (i *archiverItem) Digest() isolated.HexDigest { |
| 237 i.lock.Lock() | 237 i.lock.Lock() |
| 238 defer i.lock.Unlock() | 238 defer i.lock.Unlock() |
| 239 return isolated.HexDigest(i.digestItem.Digest) | 239 return isolated.HexDigest(i.digestItem.Digest) |
| 240 } | 240 } |
| 241 | 241 |
| 242 func (i *archiverItem) isFile() bool { |
| 243 return len(i.path) != 0 |
| 244 } |
| 245 |
| 242 func (i *archiverItem) setErr(err error) { | 246 func (i *archiverItem) setErr(err error) { |
| 243 if err == nil { | 247 if err == nil { |
| 244 panic("internal error") | 248 panic("internal error") |
| 245 } | 249 } |
| 246 i.lock.Lock() | 250 i.lock.Lock() |
| 247 defer i.lock.Unlock() | 251 defer i.lock.Unlock() |
| 248 if i.err == nil { | 252 if i.err == nil { |
| 249 i.err = err | 253 i.err = err |
| 250 for _, child := range i.linked { | 254 for _, child := range i.linked { |
| 251 child.lock.Lock() | 255 child.lock.Lock() |
| 252 child.err = err | 256 child.err = err |
| 253 child.lock.Unlock() | 257 child.lock.Unlock() |
| 254 } | 258 } |
| 255 } | 259 } |
| 256 // TODO(maruel): Support Close(). | |
| 257 i.src = nil | |
| 258 } | 260 } |
| 259 | 261 |
| 260 func (i *archiverItem) calcDigest() error { | 262 func (i *archiverItem) calcDigest() error { |
| 261 defer i.wgHashed.Done() | 263 defer i.wgHashed.Done() |
| 262 var d isolateservice.HandlersEndpointsV1Digest | 264 var d isolateservice.HandlersEndpointsV1Digest |
| 263 » if i.path != "" { | 265 |
| 264 » » // Open and hash the file. | 266 » src, err := i.source() |
| 265 » » var err error | 267 » if err != nil { |
| 266 » » if d, err = isolated.HashFile(i.path); err != nil { | 268 » » return fmt.Errorf("source(%s) failed: %s\n", i.DisplayName(), er
r) |
| 267 » » » i.setErr(err) | |
| 268 » » » return fmt.Errorf("hash(%s) failed: %s\n", i.DisplayName
(), err) | |
| 269 » » } | |
| 270 » } else { | |
| 271 » » // Use src instead. | |
| 272 » » h := isolated.GetHash() | |
| 273 » » size, err := io.Copy(h, i.src) | |
| 274 » » if err != nil { | |
| 275 » » » i.setErr(err) | |
| 276 » » » return fmt.Errorf("read(%s) failed: %s\n", i.DisplayName
(), err) | |
| 277 » » } | |
| 278 » » if pos, err := i.src.Seek(0, os.SEEK_SET); err != nil || pos !=
0 { | |
| 279 » » » err = fmt.Errorf("seek(%s) failed: %s\n", i.DisplayName(
), err) | |
| 280 » » » i.setErr(err) | |
| 281 » » » return err | |
| 282 » » } | |
| 283 » » d = isolateservice.HandlersEndpointsV1Digest{Digest: string(isol
ated.Sum(h)), IsIsolated: true, Size: size} | |
| 284 } | 269 } |
| 270 defer src.Close() |
| 271 |
| 272 h := isolated.GetHash() |
| 273 size, err := io.Copy(h, src) |
| 274 if err != nil { |
| 275 i.setErr(err) |
| 276 return fmt.Errorf("read(%s) failed: %s\n", i.DisplayName(), err) |
| 277 } |
| 278 d = isolateservice.HandlersEndpointsV1Digest{Digest: string(isolated.Sum
(h)), IsIsolated: true, Size: size} |
| 279 |
| 285 i.lock.Lock() | 280 i.lock.Lock() |
| 286 defer i.lock.Unlock() | 281 defer i.lock.Unlock() |
| 287 i.digestItem = d | 282 i.digestItem = d |
| 288 | 283 |
| 289 for _, child := range i.linked { | 284 for _, child := range i.linked { |
| 290 child.lock.Lock() | 285 child.lock.Lock() |
| 291 child.digestItem = d | 286 child.digestItem = d |
| 292 child.lock.Unlock() | 287 child.lock.Unlock() |
| 293 child.wgHashed.Done() | 288 child.wgHashed.Done() |
| 294 } | 289 } |
| 295 return nil | 290 return nil |
| 296 } | 291 } |
| 297 | 292 |
| 298 func (i *archiverItem) link(child *archiverItem) { | 293 func (i *archiverItem) link(child *archiverItem) { |
| 299 child.lock.Lock() | 294 child.lock.Lock() |
| 300 defer child.lock.Unlock() | 295 defer child.lock.Unlock() |
| 301 » if child.src != nil || child.state != nil || child.linked != nil || chil
d.err != nil { | 296 » if !child.isFile() || child.state != nil || child.linked != nil || child
.err != nil { |
| 302 panic("internal error") | 297 panic("internal error") |
| 303 } | 298 } |
| 304 i.lock.Lock() | 299 i.lock.Lock() |
| 305 defer i.lock.Unlock() | 300 defer i.lock.Unlock() |
| 306 i.linked = append(i.linked, child) | 301 i.linked = append(i.linked, child) |
| 307 if i.digestItem.Digest != "" { | 302 if i.digestItem.Digest != "" { |
| 308 child.digestItem = i.digestItem | 303 child.digestItem = i.digestItem |
| 309 child.err = i.err | 304 child.err = i.err |
| 310 child.wgHashed.Done() | 305 child.wgHashed.Done() |
| 311 } else if i.err != nil { | 306 } else if i.err != nil { |
| (...skipping 61 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 373 } | 368 } |
| 374 | 369 |
| 375 func (a *archiver) CancelationReason() error { | 370 func (a *archiver) CancelationReason() error { |
| 376 return a.canceler.CancelationReason() | 371 return a.canceler.CancelationReason() |
| 377 } | 372 } |
| 378 | 373 |
| 379 func (a *archiver) Channel() <-chan error { | 374 func (a *archiver) Channel() <-chan error { |
| 380 return a.canceler.Channel() | 375 return a.canceler.Channel() |
| 381 } | 376 } |
| 382 | 377 |
| 383 func (a *archiver) Push(displayName string, src io.ReadSeeker, priority int64) F
uture { | 378 func (a *archiver) Push(displayName string, source isolatedclient.Source, priori
ty int64) Future { |
| 384 » i := newArchiverItem(a, displayName, "", src, priority) | 379 » return a.push(newArchiverItem(a, displayName, "", source, priority)) |
| 385 » if pos, err := i.src.Seek(0, os.SEEK_SET); pos != 0 || err != nil { | |
| 386 » » i.setErr(fmt.Errorf("seek(%s) failed: %s\n", i.DisplayName(), er
r)) | |
| 387 » » i.wgHashed.Done() | |
| 388 » » return i | |
| 389 » } | |
| 390 » return a.push(i) | |
| 391 } | 380 } |
| 392 | 381 |
| 393 func (a *archiver) PushFile(displayName, path string, priority int64) Future { | 382 func (a *archiver) PushFile(displayName, path string, priority int64) Future { |
| 394 » return a.push(newArchiverItem(a, displayName, path, nil, priority)) | 383 » source := func() (io.ReadCloser, error) { |
| 384 » » return os.Open(path) |
| 385 » } |
| 386 » return a.push(newArchiverItem(a, displayName, path, source, priority)) |
| 395 } | 387 } |
| 396 | 388 |
| 397 func (a *archiver) Stats() *Stats { | 389 func (a *archiver) Stats() *Stats { |
| 398 a.statsLock.Lock() | 390 a.statsLock.Lock() |
| 399 defer a.statsLock.Unlock() | 391 defer a.statsLock.Unlock() |
| 400 return a.stats.deepCopy() | 392 return a.stats.deepCopy() |
| 401 } | 393 } |
| 402 | 394 |
| 403 func (a *archiver) push(item *archiverItem) Future { | 395 func (a *archiver) push(item *archiverItem) Future { |
| 404 if a.pushLocked(item) { | 396 if a.pushLocked(item) { |
| (...skipping 52 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 457 // This loop must never block and must be as fast as it can as i
t is | 449 // This loop must never block and must be as fast as it can as i
t is |
| 458 // functionally equivalent to running with a.closeLock held. | 450 // functionally equivalent to running with a.closeLock held. |
| 459 if err := a.CancelationReason(); err != nil { | 451 if err := a.CancelationReason(); err != nil { |
| 460 item.setErr(err) | 452 item.setErr(err) |
| 461 item.Close() | 453 item.Close() |
| 462 item.wgHashed.Done() | 454 item.wgHashed.Done() |
| 463 continue | 455 continue |
| 464 } | 456 } |
| 465 // TODO(maruel): Resolve symlinks for further deduplication? Dep
ends on the | 457 // TODO(maruel): Resolve symlinks for further deduplication? Dep
ends on the |
| 466 // use case, not sure the trade off is worth. | 458 // use case, not sure the trade off is worth. |
| 467 » » if item.path != "" { | 459 » » if item.isFile() { |
| 468 if previous, ok := seen[item.path]; ok { | 460 if previous, ok := seen[item.path]; ok { |
| 469 previous.link(item) | 461 previous.link(item) |
| 470 // TODO(maruel): Semantically weird. | 462 // TODO(maruel): Semantically weird. |
| 471 item.Close() | 463 item.Close() |
| 472 continue | 464 continue |
| 473 } | 465 } |
| 474 } | 466 } |
| 475 | 467 |
| 476 buildUp = append(buildUp, item) | 468 buildUp = append(buildUp, item) |
| 477 seen[item.path] = item | 469 seen[item.path] = item |
| (...skipping 139 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 617 a.progress.Update(groupUpload, groupUploadTodo, 1) | 609 a.progress.Update(groupUpload, groupUploadTodo, 1) |
| 618 a.progress.Update(groupUpload, groupUploadTodoSize, item
s[index].digestItem.Size) | 610 a.progress.Update(groupUpload, groupUploadTodoSize, item
s[index].digestItem.Size) |
| 619 a.stage4UploadChan <- items[index] | 611 a.stage4UploadChan <- items[index] |
| 620 } | 612 } |
| 621 } | 613 } |
| 622 log.Printf("Looked up %d items\n", len(items)) | 614 log.Printf("Looked up %d items\n", len(items)) |
| 623 } | 615 } |
| 624 | 616 |
| 625 // doUpload is called by stage 4. | 617 // doUpload is called by stage 4. |
| 626 func (a *archiver) doUpload(item *archiverItem) { | 618 func (a *archiver) doUpload(item *archiverItem) { |
| 627 var src io.ReadSeeker | |
| 628 if item.src == nil { | |
| 629 f, err := os.Open(item.path) | |
| 630 if err != nil { | |
| 631 a.Cancel(err) | |
| 632 item.setErr(err) | |
| 633 item.Close() | |
| 634 return | |
| 635 } | |
| 636 defer f.Close() | |
| 637 src = f | |
| 638 } else { | |
| 639 src = item.src | |
| 640 item.src = nil | |
| 641 } | |
| 642 start := time.Now() | 619 start := time.Now() |
| 643 » if err := a.is.Push(item.state, src); err != nil { | 620 » if err := a.is.Push(item.state, item.source); err != nil { |
| 644 err = fmt.Errorf("push(%s) failed: %s\n", item.path, err) | 621 err = fmt.Errorf("push(%s) failed: %s\n", item.path, err) |
| 645 a.Cancel(err) | 622 a.Cancel(err) |
| 646 item.setErr(err) | 623 item.setErr(err) |
| 647 } else { | 624 } else { |
| 648 a.progress.Update(groupUpload, groupUploadDone, 1) | 625 a.progress.Update(groupUpload, groupUploadDone, 1) |
| 649 a.progress.Update(groupUpload, groupUploadDoneSize, item.digestI
tem.Size) | 626 a.progress.Update(groupUpload, groupUploadDoneSize, item.digestI
tem.Size) |
| 650 } | 627 } |
| 651 item.Close() | 628 item.Close() |
| 652 size := units.Size(item.digestItem.Size) | 629 size := units.Size(item.digestItem.Size) |
| 653 u := &UploadStat{time.Since(start), size, item.DisplayName()} | 630 u := &UploadStat{time.Since(start), size, item.DisplayName()} |
| 654 a.statsLock.Lock() | 631 a.statsLock.Lock() |
| 655 a.stats.Pushed = append(a.stats.Pushed, u) | 632 a.stats.Pushed = append(a.stats.Pushed, u) |
| 656 a.statsLock.Unlock() | 633 a.statsLock.Unlock() |
| 657 log.Printf("Uploaded %7s: %s\n", size, item.DisplayName()) | 634 log.Printf("Uploaded %7s: %s\n", size, item.DisplayName()) |
| 658 } | 635 } |
| OLD | NEW |