| Index: client/archiver/archiver.go
|
| diff --git a/client/archiver/archiver.go b/client/archiver/archiver.go
|
| index 360538174b50094ebec2e86d582b006cdd53cf72..2b58260bc732b976faab8f037c8f7c7a3845ff78 100644
|
| --- a/client/archiver/archiver.go
|
| +++ b/client/archiver/archiver.go
|
| @@ -75,7 +75,7 @@ type Archiver interface {
|
| common.Canceler
|
| // Push schedules item upload to the isolate server.
|
| // Smaller priority value means earlier processing.
|
| - Push(displayName string, src io.ReadSeeker, priority int64) Future
|
| + Push(displayName string, src isolatedclient.Source, priority int64) Future
|
| // PushFile schedules file upload to the isolate server.
|
| // Smaller priority value means earlier processing.
|
| PushFile(displayName, path string, priority int64) Future
|
| @@ -190,9 +190,9 @@ func New(is isolatedclient.IsolateServer, out io.Writer) Archiver {
|
| type archiverItem struct {
|
| // Immutable.
|
| displayName string // Name to use to qualify this item
|
| - path string // Set when source is a file on disk
|
| wgHashed sync.WaitGroup // Released once .digestItem.Digest is set
|
| priority int64 // Lower values - earlier hashing and uploading.
|
| + path string // Set when the source is a file on disk.
|
| a *archiver
|
|
|
| // Mutable.
|
| @@ -202,13 +202,13 @@ type archiverItem struct {
|
| linked []*archiverItem // Deduplicated item.
|
|
|
| // Mutable but not accessible externally.
|
| - src io.ReadSeeker // Source of data
|
| - state *isolatedclient.PushState // Server-side push state for cache miss
|
| + source isolatedclient.Source // Source of data
|
| + state *isolatedclient.PushState // Server-side push state for cache miss
|
| }
|
|
|
| -func newArchiverItem(a *archiver, displayName, path string, src io.ReadSeeker, priority int64) *archiverItem {
|
| +func newArchiverItem(a *archiver, displayName, path string, source isolatedclient.Source, priority int64) *archiverItem {
|
| tracer.CounterAdd(a, "itemsProcessing", 1)
|
| - i := &archiverItem{a: a, displayName: displayName, path: path, src: src, priority: priority}
|
| + i := &archiverItem{a: a, displayName: displayName, path: path, source: source, priority: priority}
|
| i.wgHashed.Add(1)
|
| return i
|
| }
|
| @@ -239,6 +239,10 @@ func (i *archiverItem) Digest() isolated.HexDigest {
|
| return isolated.HexDigest(i.digestItem.Digest)
|
| }
|
|
|
| +func (i *archiverItem) isFile() bool {
|
| + return len(i.path) != 0
|
| +}
|
| +
|
| func (i *archiverItem) setErr(err error) {
|
| if err == nil {
|
| panic("internal error")
|
| @@ -253,35 +257,26 @@ func (i *archiverItem) setErr(err error) {
|
| child.lock.Unlock()
|
| }
|
| }
|
| - // TODO(maruel): Support Close().
|
| - i.src = nil
|
| }
|
|
|
| func (i *archiverItem) calcDigest() error {
|
| defer i.wgHashed.Done()
|
| var d isolateservice.HandlersEndpointsV1Digest
|
| - if i.path != "" {
|
| - // Open and hash the file.
|
| - var err error
|
| - if d, err = isolated.HashFile(i.path); err != nil {
|
| - i.setErr(err)
|
| - return fmt.Errorf("hash(%s) failed: %s\n", i.DisplayName(), err)
|
| - }
|
| - } else {
|
| - // Use src instead.
|
| - h := isolated.GetHash()
|
| - size, err := io.Copy(h, i.src)
|
| - if err != nil {
|
| - i.setErr(err)
|
| - return fmt.Errorf("read(%s) failed: %s\n", i.DisplayName(), err)
|
| - }
|
| - if pos, err := i.src.Seek(0, os.SEEK_SET); err != nil || pos != 0 {
|
| - err = fmt.Errorf("seek(%s) failed: %s\n", i.DisplayName(), err)
|
| - i.setErr(err)
|
| - return err
|
| - }
|
| - d = isolateservice.HandlersEndpointsV1Digest{Digest: string(isolated.Sum(h)), IsIsolated: true, Size: size}
|
| +
|
| + src, err := i.source()
|
| + if err != nil {
|
| + return fmt.Errorf("source(%s) failed: %s\n", i.DisplayName(), err)
|
| }
|
| + defer src.Close()
|
| +
|
| + h := isolated.GetHash()
|
| + size, err := io.Copy(h, src)
|
| + if err != nil {
|
| + i.setErr(err)
|
| + return fmt.Errorf("read(%s) failed: %s\n", i.DisplayName(), err)
|
| + }
|
| + d = isolateservice.HandlersEndpointsV1Digest{Digest: string(isolated.Sum(h)), IsIsolated: true, Size: size}
|
| +
|
| i.lock.Lock()
|
| defer i.lock.Unlock()
|
| i.digestItem = d
|
| @@ -298,7 +293,7 @@ func (i *archiverItem) calcDigest() error {
|
| func (i *archiverItem) link(child *archiverItem) {
|
| child.lock.Lock()
|
| defer child.lock.Unlock()
|
| - if child.src != nil || child.state != nil || child.linked != nil || child.err != nil {
|
| + if !child.isFile() || child.state != nil || child.linked != nil || child.err != nil {
|
| panic("internal error")
|
| }
|
| i.lock.Lock()
|
| @@ -380,18 +375,15 @@ func (a *archiver) Channel() <-chan error {
|
| return a.canceler.Channel()
|
| }
|
|
|
| -func (a *archiver) Push(displayName string, src io.ReadSeeker, priority int64) Future {
|
| - i := newArchiverItem(a, displayName, "", src, priority)
|
| - if pos, err := i.src.Seek(0, os.SEEK_SET); pos != 0 || err != nil {
|
| - i.setErr(fmt.Errorf("seek(%s) failed: %s\n", i.DisplayName(), err))
|
| - i.wgHashed.Done()
|
| - return i
|
| - }
|
| - return a.push(i)
|
| +func (a *archiver) Push(displayName string, source isolatedclient.Source, priority int64) Future {
|
| + return a.push(newArchiverItem(a, displayName, "", source, priority))
|
| }
|
|
|
| func (a *archiver) PushFile(displayName, path string, priority int64) Future {
|
| - return a.push(newArchiverItem(a, displayName, path, nil, priority))
|
| + source := func() (io.ReadCloser, error) {
|
| + return os.Open(path)
|
| + }
|
| + return a.push(newArchiverItem(a, displayName, path, source, priority))
|
| }
|
|
|
| func (a *archiver) Stats() *Stats {
|
| @@ -464,7 +456,7 @@ func (a *archiver) stage1DedupeLoop() {
|
| }
|
| // TODO(maruel): Resolve symlinks for further deduplication? Depends on the
|
| // use case, not sure the trade off is worth.
|
| - if item.path != "" {
|
| + if item.isFile() {
|
| if previous, ok := seen[item.path]; ok {
|
| previous.link(item)
|
| // TODO(maruel): Semantically weird.
|
| @@ -624,23 +616,8 @@ func (a *archiver) doContains(items []*archiverItem) {
|
|
|
| // doUpload is called by stage 4.
|
| func (a *archiver) doUpload(item *archiverItem) {
|
| - var src io.ReadSeeker
|
| - if item.src == nil {
|
| - f, err := os.Open(item.path)
|
| - if err != nil {
|
| - a.Cancel(err)
|
| - item.setErr(err)
|
| - item.Close()
|
| - return
|
| - }
|
| - defer f.Close()
|
| - src = f
|
| - } else {
|
| - src = item.src
|
| - item.src = nil
|
| - }
|
| start := time.Now()
|
| - if err := a.is.Push(item.state, src); err != nil {
|
| + if err := a.is.Push(item.state, item.source); err != nil {
|
| err = fmt.Errorf("push(%s) failed: %s\n", item.path, err)
|
| a.Cancel(err)
|
| item.setErr(err)
|
|
|