Chromium Code Reviews| Index: client/archiver/archiver.go |
| diff --git a/client/archiver/archiver.go b/client/archiver/archiver.go |
| index 360538174b50094ebec2e86d582b006cdd53cf72..33aa9a181bab511171a041017d074eaf9d9ebfee 100644 |
| --- a/client/archiver/archiver.go |
| +++ b/client/archiver/archiver.go |
| @@ -75,7 +75,7 @@ type Archiver interface { |
| common.Canceler |
| // Push schedules item upload to the isolate server. |
| // Smaller priority value means earlier processing. |
| - Push(displayName string, src io.ReadSeeker, priority int64) Future |
| + Push(displayName string, src isolatedclient.Source, priority int64) Future |
| // PushFile schedules file upload to the isolate server. |
| // Smaller priority value means earlier processing. |
| PushFile(displayName, path string, priority int64) Future |
| @@ -190,9 +190,9 @@ func New(is isolatedclient.IsolateServer, out io.Writer) Archiver { |
| type archiverItem struct { |
| // Immutable. |
| displayName string // Name to use to qualify this item |
| - path string // Set when source is a file on disk |
| wgHashed sync.WaitGroup // Released once .digestItem.Digest is set |
| priority int64 // Lower values - earlier hashing and uploading. |
| + path string // True if this is file backed. |
|
M-A Ruel
2016/04/01 17:26:04
True?
I prefer the old doc.
dnj
2016/04/01 18:49:11
oops sorry this was a boolean for a moment
|
| a *archiver |
| // Mutable. |
| @@ -202,13 +202,13 @@ type archiverItem struct { |
| linked []*archiverItem // Deduplicated item. |
| // Mutable but not accessible externally. |
| - src io.ReadSeeker // Source of data |
| - state *isolatedclient.PushState // Server-side push state for cache miss |
| + source isolatedclient.Source // Source of data |
| + state *isolatedclient.PushState // Server-side push state for cache miss |
| } |
| -func newArchiverItem(a *archiver, displayName, path string, src io.ReadSeeker, priority int64) *archiverItem { |
| +func newArchiverItem(a *archiver, displayName, path string, source isolatedclient.Source, priority int64) *archiverItem { |
| tracer.CounterAdd(a, "itemsProcessing", 1) |
| - i := &archiverItem{a: a, displayName: displayName, path: path, src: src, priority: priority} |
| + i := &archiverItem{a: a, displayName: displayName, path: path, source: source, priority: priority} |
| i.wgHashed.Add(1) |
| return i |
| } |
| @@ -239,6 +239,10 @@ func (i *archiverItem) Digest() isolated.HexDigest { |
| return isolated.HexDigest(i.digestItem.Digest) |
| } |
| +func (i *archiverItem) isFile() bool { |
| + return i.path != "" |
|
M-A Ruel
2016/04/01 17:26:04
I learned that this is actually faster:
return len
dnj
2016/04/01 18:49:11
Interesting, sounds like a bug in Go compiler. Don
|
| +} |
| + |
| func (i *archiverItem) setErr(err error) { |
| if err == nil { |
| panic("internal error") |
| @@ -253,35 +257,26 @@ func (i *archiverItem) setErr(err error) { |
| child.lock.Unlock() |
| } |
| } |
| - // TODO(maruel): Support Close(). |
| - i.src = nil |
| } |
| func (i *archiverItem) calcDigest() error { |
| defer i.wgHashed.Done() |
| var d isolateservice.HandlersEndpointsV1Digest |
| - if i.path != "" { |
| - // Open and hash the file. |
| - var err error |
| - if d, err = isolated.HashFile(i.path); err != nil { |
| - i.setErr(err) |
| - return fmt.Errorf("hash(%s) failed: %s\n", i.DisplayName(), err) |
| - } |
| - } else { |
| - // Use src instead. |
| - h := isolated.GetHash() |
| - size, err := io.Copy(h, i.src) |
| - if err != nil { |
| - i.setErr(err) |
| - return fmt.Errorf("read(%s) failed: %s\n", i.DisplayName(), err) |
| - } |
| - if pos, err := i.src.Seek(0, os.SEEK_SET); err != nil || pos != 0 { |
| - err = fmt.Errorf("seek(%s) failed: %s\n", i.DisplayName(), err) |
| - i.setErr(err) |
| - return err |
| - } |
| - d = isolateservice.HandlersEndpointsV1Digest{Digest: string(isolated.Sum(h)), IsIsolated: true, Size: size} |
| + |
| + src, err := i.source() |
| + if err != nil { |
| + return fmt.Errorf("source(%s) failed: %s\n", i.DisplayName(), err) |
| } |
| + defer src.Close() |
| + |
| + h := isolated.GetHash() |
| + size, err := io.Copy(h, src) |
| + if err != nil { |
| + i.setErr(err) |
| + return fmt.Errorf("read(%s) failed: %s\n", i.DisplayName(), err) |
| + } |
| + d = isolateservice.HandlersEndpointsV1Digest{Digest: string(isolated.Sum(h)), IsIsolated: true, Size: size} |
| + |
| i.lock.Lock() |
| defer i.lock.Unlock() |
| i.digestItem = d |
| @@ -298,7 +293,7 @@ func (i *archiverItem) calcDigest() error { |
| func (i *archiverItem) link(child *archiverItem) { |
| child.lock.Lock() |
| defer child.lock.Unlock() |
| - if child.src != nil || child.state != nil || child.linked != nil || child.err != nil { |
| + if !child.isFile() || child.state != nil || child.linked != nil || child.err != nil { |
| panic("internal error") |
| } |
| i.lock.Lock() |
| @@ -380,18 +375,15 @@ func (a *archiver) Channel() <-chan error { |
| return a.canceler.Channel() |
| } |
| -func (a *archiver) Push(displayName string, src io.ReadSeeker, priority int64) Future { |
| - i := newArchiverItem(a, displayName, "", src, priority) |
| - if pos, err := i.src.Seek(0, os.SEEK_SET); pos != 0 || err != nil { |
| - i.setErr(fmt.Errorf("seek(%s) failed: %s\n", i.DisplayName(), err)) |
| - i.wgHashed.Done() |
| - return i |
| - } |
| - return a.push(i) |
| +func (a *archiver) Push(displayName string, source isolatedclient.Source, priority int64) Future { |
| + return a.push(newArchiverItem(a, displayName, "", source, priority)) |
| } |
| func (a *archiver) PushFile(displayName, path string, priority int64) Future { |
| - return a.push(newArchiverItem(a, displayName, path, nil, priority)) |
| + source := func() (io.ReadCloser, error) { |
| + return os.Open(path) |
| + } |
| + return a.push(newArchiverItem(a, displayName, path, source, priority)) |
| } |
| func (a *archiver) Stats() *Stats { |
| @@ -464,7 +456,7 @@ func (a *archiver) stage1DedupeLoop() { |
| } |
| // TODO(maruel): Resolve symlinks for further deduplication? Depends on the |
| // use case, not sure the trade off is worth. |
| - if item.path != "" { |
| + if item.isFile() { |
| if previous, ok := seen[item.path]; ok { |
| previous.link(item) |
| // TODO(maruel): Semantically weird. |
| @@ -624,23 +616,8 @@ func (a *archiver) doContains(items []*archiverItem) { |
| // doUpload is called by stage 4. |
| func (a *archiver) doUpload(item *archiverItem) { |
| - var src io.ReadSeeker |
| - if item.src == nil { |
| - f, err := os.Open(item.path) |
| - if err != nil { |
| - a.Cancel(err) |
| - item.setErr(err) |
| - item.Close() |
| - return |
| - } |
| - defer f.Close() |
| - src = f |
| - } else { |
| - src = item.src |
| - item.src = nil |
| - } |
| start := time.Now() |
| - if err := a.is.Push(item.state, src); err != nil { |
| + if err := a.is.Push(item.state, item.source); err != nil { |
| err = fmt.Errorf("push(%s) failed: %s\n", item.path, err) |
| a.Cancel(err) |
| item.setErr(err) |