Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(166)

Unified Diff: client/archiver/archiver.go

Issue 1846263002: Isolate: Use generators instead of seekers (Closed) Base URL: https://github.com/luci/luci-go@master
Patch Set: Tweaks from comments. Created 4 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « no previous file | client/archiver/archiver_test.go » ('j') | client/isolatedclient/isolatedclient.go » ('J')
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: client/archiver/archiver.go
diff --git a/client/archiver/archiver.go b/client/archiver/archiver.go
index 360538174b50094ebec2e86d582b006cdd53cf72..2b58260bc732b976faab8f037c8f7c7a3845ff78 100644
--- a/client/archiver/archiver.go
+++ b/client/archiver/archiver.go
@@ -75,7 +75,7 @@ type Archiver interface {
common.Canceler
// Push schedules item upload to the isolate server.
// Smaller priority value means earlier processing.
- Push(displayName string, src io.ReadSeeker, priority int64) Future
+ Push(displayName string, src isolatedclient.Source, priority int64) Future
// PushFile schedules file upload to the isolate server.
// Smaller priority value means earlier processing.
PushFile(displayName, path string, priority int64) Future
@@ -190,9 +190,9 @@ func New(is isolatedclient.IsolateServer, out io.Writer) Archiver {
type archiverItem struct {
// Immutable.
displayName string // Name to use to qualify this item
- path string // Set when source is a file on disk
wgHashed sync.WaitGroup // Released once .digestItem.Digest is set
priority int64 // Lower values - earlier hashing and uploading.
+ path string // Set when the source is a file on disk.
a *archiver
// Mutable.
@@ -202,13 +202,13 @@ type archiverItem struct {
linked []*archiverItem // Deduplicated item.
// Mutable but not accessible externally.
- src io.ReadSeeker // Source of data
- state *isolatedclient.PushState // Server-side push state for cache miss
+ source isolatedclient.Source // Source of data
+ state *isolatedclient.PushState // Server-side push state for cache miss
}
-func newArchiverItem(a *archiver, displayName, path string, src io.ReadSeeker, priority int64) *archiverItem {
+func newArchiverItem(a *archiver, displayName, path string, source isolatedclient.Source, priority int64) *archiverItem {
tracer.CounterAdd(a, "itemsProcessing", 1)
- i := &archiverItem{a: a, displayName: displayName, path: path, src: src, priority: priority}
+ i := &archiverItem{a: a, displayName: displayName, path: path, source: source, priority: priority}
i.wgHashed.Add(1)
return i
}
@@ -239,6 +239,10 @@ func (i *archiverItem) Digest() isolated.HexDigest {
return isolated.HexDigest(i.digestItem.Digest)
}
+func (i *archiverItem) isFile() bool {
+ return len(i.path) != 0
+}
+
func (i *archiverItem) setErr(err error) {
if err == nil {
panic("internal error")
@@ -253,35 +257,26 @@ func (i *archiverItem) setErr(err error) {
child.lock.Unlock()
}
}
- // TODO(maruel): Support Close().
- i.src = nil
}
func (i *archiverItem) calcDigest() error {
defer i.wgHashed.Done()
var d isolateservice.HandlersEndpointsV1Digest
- if i.path != "" {
- // Open and hash the file.
- var err error
- if d, err = isolated.HashFile(i.path); err != nil {
- i.setErr(err)
- return fmt.Errorf("hash(%s) failed: %s\n", i.DisplayName(), err)
- }
- } else {
- // Use src instead.
- h := isolated.GetHash()
- size, err := io.Copy(h, i.src)
- if err != nil {
- i.setErr(err)
- return fmt.Errorf("read(%s) failed: %s\n", i.DisplayName(), err)
- }
- if pos, err := i.src.Seek(0, os.SEEK_SET); err != nil || pos != 0 {
- err = fmt.Errorf("seek(%s) failed: %s\n", i.DisplayName(), err)
- i.setErr(err)
- return err
- }
- d = isolateservice.HandlersEndpointsV1Digest{Digest: string(isolated.Sum(h)), IsIsolated: true, Size: size}
+
+ src, err := i.source()
+ if err != nil {
+ return fmt.Errorf("source(%s) failed: %s\n", i.DisplayName(), err)
}
+ defer src.Close()
+
+ h := isolated.GetHash()
+ size, err := io.Copy(h, src)
+ if err != nil {
+ i.setErr(err)
+ return fmt.Errorf("read(%s) failed: %s\n", i.DisplayName(), err)
+ }
+ d = isolateservice.HandlersEndpointsV1Digest{Digest: string(isolated.Sum(h)), IsIsolated: true, Size: size}
+
i.lock.Lock()
defer i.lock.Unlock()
i.digestItem = d
@@ -298,7 +293,7 @@ func (i *archiverItem) calcDigest() error {
func (i *archiverItem) link(child *archiverItem) {
child.lock.Lock()
defer child.lock.Unlock()
- if child.src != nil || child.state != nil || child.linked != nil || child.err != nil {
+ if !child.isFile() || child.state != nil || child.linked != nil || child.err != nil {
panic("internal error")
}
i.lock.Lock()
@@ -380,18 +375,15 @@ func (a *archiver) Channel() <-chan error {
return a.canceler.Channel()
}
-func (a *archiver) Push(displayName string, src io.ReadSeeker, priority int64) Future {
- i := newArchiverItem(a, displayName, "", src, priority)
- if pos, err := i.src.Seek(0, os.SEEK_SET); pos != 0 || err != nil {
- i.setErr(fmt.Errorf("seek(%s) failed: %s\n", i.DisplayName(), err))
- i.wgHashed.Done()
- return i
- }
- return a.push(i)
+func (a *archiver) Push(displayName string, source isolatedclient.Source, priority int64) Future {
+ return a.push(newArchiverItem(a, displayName, "", source, priority))
}
func (a *archiver) PushFile(displayName, path string, priority int64) Future {
- return a.push(newArchiverItem(a, displayName, path, nil, priority))
+ source := func() (io.ReadCloser, error) {
+ return os.Open(path)
+ }
+ return a.push(newArchiverItem(a, displayName, path, source, priority))
}
func (a *archiver) Stats() *Stats {
@@ -464,7 +456,7 @@ func (a *archiver) stage1DedupeLoop() {
}
// TODO(maruel): Resolve symlinks for further deduplication? Depends on the
// use case, not sure the trade off is worth.
- if item.path != "" {
+ if item.isFile() {
if previous, ok := seen[item.path]; ok {
previous.link(item)
// TODO(maruel): Semantically weird.
@@ -624,23 +616,8 @@ func (a *archiver) doContains(items []*archiverItem) {
// doUpload is called by stage 4.
func (a *archiver) doUpload(item *archiverItem) {
- var src io.ReadSeeker
- if item.src == nil {
- f, err := os.Open(item.path)
- if err != nil {
- a.Cancel(err)
- item.setErr(err)
- item.Close()
- return
- }
- defer f.Close()
- src = f
- } else {
- src = item.src
- item.src = nil
- }
start := time.Now()
- if err := a.is.Push(item.state, src); err != nil {
+ if err := a.is.Push(item.state, item.source); err != nil {
err = fmt.Errorf("push(%s) failed: %s\n", item.path, err)
a.Cancel(err)
item.setErr(err)
« no previous file with comments | « no previous file | client/archiver/archiver_test.go » ('j') | client/isolatedclient/isolatedclient.go » ('J')

Powered by Google App Engine
This is Rietveld 408576698