Index: client/cmd/isolate/fastarchive.go |
diff --git a/client/cmd/isolate/fastarchive.go b/client/cmd/isolate/fastarchive.go |
new file mode 100644 |
index 0000000000000000000000000000000000000000..338a5b6abfb21e9e439ef9c1fe2bb685ed160d7d |
--- /dev/null |
+++ b/client/cmd/isolate/fastarchive.go |
@@ -0,0 +1,316 @@ |
+// Copyright 2016 The LUCI Authors. All rights reserved. |
+// Use of this source code is governed under the Apache License, Version 2.0 |
+// that can be found in the LICENSE file. |
+ |
+package main |
+ |
+import ( |
+ "bytes" |
+ "errors" |
+ "fmt" |
+ "hash" |
+ "io" |
+ "io/ioutil" |
+ "os" |
+ "time" |
+ |
+ "github.com/luci/luci-go/client/internal/common" |
+ //"github.com/luci/luci-go/client/isolate" |
M-A Ruel
2016/06/08 20:52:36
please trim comments
|
+ "github.com/luci/luci-go/client/isolatedclient" |
+ "github.com/luci/luci-go/common/api/isolate/isolateservice/v1" |
+ "github.com/luci/luci-go/common/archive/ar" |
+ "github.com/luci/luci-go/common/dirtools" |
+ "github.com/luci/luci-go/common/isolated" |
+ //"github.com/luci/luci-go/common/units" |
+ "github.com/maruel/subcommands" |
+) |
+ |
// ReadSeekerCloser is the subset of file-like behavior the upload
// pipeline needs: a source must be re-readable (seek back to 0) so a
// push can restart from the beginning on retry.
// NOTE(review): Close is deliberately commented out for now — sources
// are never closed (see the commented-out Close calls below). TODO:
// add io.Closer and close sources once pushed or skipped.
type ReadSeekerCloser interface {
	io.Reader
	io.Seeker
	// io.Closer
}
+ |
// ToHash is a request to hash one file on disk (stage 1 of the
// upload pipeline).
// NOTE(review): these three types do not need to be exported.
type ToHash struct {
	path string
}

// ToCheck asks the isolate server whether it already holds the given
// digest (stage 2). source must be re-readable in case the content
// then needs to be pushed.
type ToCheck struct {
	digest isolateservice.HandlersEndpointsV1Digest
	name   string
	source ReadSeekerCloser
}

// ToPush is an upload the server requested (stage 3); state is the
// non-nil push state returned by IsolateServer.Contains.
type ToPush struct {
	state  *isolatedclient.PushState
	name   string
	source ReadSeekerCloser
}
+ |
+func HashFile(is isolatedclient.IsolateServer, _ common.Canceler, src <-chan *ToHash, dst chan<- *ToCheck) { |
+ for tohash := range src { |
+ fmt.Printf("hashing %s\n", tohash.path) |
+ d, _ := isolated.HashFile(tohash.path) |
+ f, _ := os.Open(tohash.path) |
+ dst <- &ToCheck{ |
+ digest: d, |
+ source: f, |
+ name: tohash.path, |
+ } |
+ } |
+ close(dst) |
+} |
+ |
// CHECK_BATCH_SIZE is the maximum number of digests sent per
// Contains RPC in ChckFile.
// NOTE(review): consts should live at the top of the file for
// consistency; Go naming would also prefer checkBatchSize over
// ALL_CAPS. Left in place so this change stands alone.
const CHECK_BATCH_SIZE = 20
+ |
// ChckFile batches digests from src — up to CHECK_BATCH_SIZE items,
// or whatever arrives within a 10ms window — and asks the isolate
// server which of them it is missing; missing items are forwarded to
// dst for upload. Up to 5 Contains RPCs run concurrently.
// NOTE(review): rename to CheckFile (typo), and don't close a channel
// handed in — the caller should close dst.
func ChckFile(is isolatedclient.IsolateServer, canceler common.Canceler, src <-chan *ToCheck, dst chan<- *ToPush) {
	// Count of Contains batches issued so far; used only in log output.
	check_count := 0

	pool := common.NewGoroutinePool(5, canceler)
	defer func() {
		_ = pool.Wait()
	}()

	done := false
	for !done {
		// Fresh arrays per outer iteration: each scheduled closure below
		// captures its own pair, so in-flight batches do not share storage.
		var digests [CHECK_BATCH_SIZE]*isolateservice.HandlersEndpointsV1Digest
		var topush [CHECK_BATCH_SIZE]ToPush

		index := 0
	Loop:
		for index < CHECK_BATCH_SIZE && !done {
			select {
			case tocheck, more := <-src:
				if !more {
					done = true
					break Loop
				}
				digests[index] = &tocheck.digest
				topush[index] = ToPush{state: nil, source: tocheck.source, name: tocheck.name}
				index += 1
			case <-time.After(time.Millisecond * 10):
				// Flush a partial batch rather than waiting for it to fill.
				break Loop
			}
		}

		if index > 0 {
			// Copy loop-locals; the closure runs asynchronously on the pool.
			inner_count := check_count
			inner_index := index
			pool.Schedule(func() {
				fmt.Printf("checking(%d) %d files\n", inner_count, inner_index)
				pushstates, err := is.Contains(digests[:inner_index])
				if err != nil {
					// NOTE(review): on RPC error the whole batch is silently
					// dropped — the files are never uploaded or retried.
					fmt.Printf("checking(%d) error: %s\n", inner_count, err)
					return
				}
				for j, state := range pushstates {
					topush[j].state = state
					if state != nil {
						// Non-nil push state: the server wants this content.
						fmt.Printf("need to push(%d): %s\n", inner_count, topush[j].name)
						dst <- &topush[j]
					} else {
						fmt.Printf("skipping(%d): %s\n", inner_count, topush[j].name)
						// sources[j].Close()
					}
				}
			}, func() {})
			check_count += 1
		}
	}
	_ = pool.Wait()
	close(dst)
}
+ |
+func PushFile(is isolatedclient.IsolateServer, canceler common.Canceler, src <-chan *ToPush, dst chan<- bool) { |
+ pool := common.NewGoroutinePool(100, canceler) |
+ defer func() { |
+ _ = pool.Wait() |
+ }() |
+ |
+ for topush := range src { |
+ pool.Schedule(func() { |
+ fmt.Printf("pushing: %s\n", topush.name) |
+ err := is.Push(topush.state, func() (io.ReadCloser, error) { |
+ topush.source.Seek(0, 0) |
+ return ioutil.NopCloser(topush.source), nil |
+ }) |
+ if err != nil { |
+ fmt.Println("pushing err:", err) |
+ } else { |
+ fmt.Println("pushed:", topush.state) |
+ } |
+ // topush.source.Close() |
+ }, func() {}) |
+ } |
+ _ = pool.Wait() |
+ close(dst) |
+} |
+ |
// SmallFilesCollection accumulates small files into one in-memory ar
// archive so they can be checked/uploaded as a single blob. The hash
// is updated as bytes are written, so the digest is available at
// rotation time without re-reading the buffer.
type SmallFilesCollection struct {
	index  int           // rotation number, used to name the blob
	buffer *bytes.Buffer // archive bytes accumulated so far
	hash   hash.Hash     // running digest of everything in buffer
	ar     *ar.Writer    // writes tee into buffer and hash
}
+ |
+func NewSmallFilesCollection(index int) *SmallFilesCollection { |
+ var o SmallFilesCollection |
+ o.index = index |
+ o.buffer = new(bytes.Buffer) |
+ o.hash = isolated.GetHash() |
+ |
+ var w io.Writer = o.buffer |
+ w = io.MultiWriter(w, o.hash) |
+ o.ar = ar.NewWriter(w) |
+ return &o |
+} |
+ |
+func (b SmallFilesCollection) RequestCheck(dst chan<- *ToCheck) { |
+ fmt.Printf("rotating smallfilescollection-%d (%d bytes)\n", b.index, b.buffer.Len()) |
+ dst <- &ToCheck{ |
+ digest: isolateservice.HandlersEndpointsV1Digest{ |
+ Digest: string(isolated.Sum(b.hash)), |
+ IsIsolated: false, |
+ Size: int64(b.buffer.Len()), |
+ }, |
+ source: bytes.NewReader(b.buffer.Bytes()), |
+ name: fmt.Sprintf("smallfilescollection-%d", b.index), |
+ } |
+} |
+ |
+// |
+ |
// Files at or below this size are packed into an ar archive rather
// than hashed and uploaded individually.
const SMALLFILES_MAXSIZE = 1024 * 64 // 64kbytes

// The current archive is rotated off to the check stage once it grows
// past this size.
const SMALLFILES_AR_MAXSIZE = 1024 * 1024 * 100 // 100MBytes
+ |
// SmallFilesWalkObserver receives dirtools walk callbacks: small files
// are appended to a rotating in-memory archive, large files are queued
// for individual hashing by the caller.
type SmallFilesWalkObserver struct {
	trim              string                // path prefix stripped from archive member names
	chck_chan         chan<- *ToCheck       // where rotated archives are sent
	smallfiles_buffer *SmallFilesCollection // archive currently being filled
	largefiles_queue  []string              // paths deferred to the hash pipeline
}
+ |
+func NewSmallFilesWalkObserver(trim string, chck_chan chan<- *ToCheck) *SmallFilesWalkObserver { |
+ return &SmallFilesWalkObserver{ |
+ trim: trim, |
+ chck_chan: chck_chan, |
+ smallfiles_buffer: NewSmallFilesCollection(0), |
+ largefiles_queue: make([]string, 0), |
+ } |
+} |
+ |
+func (s *SmallFilesWalkObserver) SmallFile(name string, alldata []byte) { |
+ s.smallfiles_buffer.ar.Add(name[len(s.trim)+1:], alldata) |
+ if s.smallfiles_buffer.buffer.Len() > SMALLFILES_AR_MAXSIZE { |
+ s.smallfiles_buffer.RequestCheck(s.chck_chan) |
+ s.smallfiles_buffer = NewSmallFilesCollection(s.smallfiles_buffer.index + 1) |
+ if s.smallfiles_buffer.buffer.Len() > 100 { |
+ panic("Ahh!") |
+ } |
+ } |
+} |
+ |
+func (s *SmallFilesWalkObserver) LargeFile(name string) { |
+ s.largefiles_queue = append(s.largefiles_queue, name) |
+} |
+ |
+func (s *SmallFilesWalkObserver) Error(path string, err error) { |
+ fmt.Println(path, err) |
+} |
+ |
// upload walks path and uploads its contents to the isolate server:
// small files are packed into rotating ar archives, large files are
// hashed and pushed individually. Blocks until every push completed.
// NOTE(review): a sync.WaitGroup per stage would be cleaner than the
// done_chan handshake.
func upload(is isolatedclient.IsolateServer, path string) {
	hash_chan := make(chan *ToHash, 10)
	chck_chan := make(chan *ToCheck, 1)
	push_chan := make(chan *ToPush, 10)
	done_chan := make(chan bool)

	// NOTE(review): canceler is never cancelled/released — confirm
	// whether common.Canceler needs explicit cleanup.
	canceler := common.NewCanceler()

	// Pipeline: hash -> check -> push. Each stage closes its output
	// channel when its input is exhausted, so closing hash_chan below
	// tears the whole pipeline down in order.
	go HashFile(is, canceler, hash_chan, chck_chan)
	go ChckFile(is, canceler, chck_chan, push_chan)
	go PushFile(is, canceler, push_chan, done_chan)

	obs := NewSmallFilesWalkObserver(path, chck_chan)
	dirtools.WalkNoStat(path, SMALLFILES_MAXSIZE, obs)
	// Flush the final, partially filled archive.
	obs.smallfiles_buffer.RequestCheck(obs.chck_chan)

	for _, name := range obs.largefiles_queue {
		hash_chan <- &ToHash{name}
	}

	// All chck_chan sends above complete before this close, and
	// HashFile only closes chck_chan after hash_chan is drained, so no
	// send can race a close.
	close(hash_chan)
	// PushFile closes done_chan once every push finished; the receive
	// unblocks on that close.
	<-done_chan
}
+ |
// cmdFastArchive defines the "fastarchive" subcommand: walk a tree and
// upload its contents to an isolate server via the pipeline above.
var cmdFastArchive = &subcommands.Command{
	UsageLine: "fastarchive <options>",
	ShortDesc: "creates a .isolated file and uploads the tree to an isolate server.",
	LongDesc:  "All the files listed in the .isolated file are put in the isolate server cache via isolateserver.py.",
	CommandRun: func() subcommands.CommandRun {
		c := fastArchiveRun{}
		c.commonServerFlags.Init()
		c.isolateFlags.Init(&c.Flags)
		return &c
	},
}
+ |
// fastArchiveRun carries the flag state for one invocation of the
// fastarchive subcommand (server flags plus .isolate flags).
type fastArchiveRun struct {
	commonServerFlags
	isolateFlags
}
+ |
+func (c *fastArchiveRun) Parse(a subcommands.Application, args []string) error { |
+ if err := c.commonServerFlags.Parse(); err != nil { |
+ return err |
+ } |
+ cwd, err := os.Getwd() |
+ if err != nil { |
+ return err |
+ } |
+ if err := c.isolateFlags.Parse(cwd, RequireIsolatedFile); err != nil { |
+ return err |
+ } |
+ if len(args) != 0 { |
+ return errors.New("position arguments not expected") |
+ } |
+ return nil |
+} |
+ |
+func (c *fastArchiveRun) main(a subcommands.Application, args []string) error { |
+ /* |
+ out := os.Stdout |
+ prefix := "\n" |
+ if c.defaultFlags.Quiet { |
+ out = nil |
+ prefix = "" |
+ } |
+ start := time.Now() |
+ */ |
+ client, err := c.createAuthClient() |
+ if err != nil { |
+ return err |
+ } |
+ |
+ is := isolatedclient.New(client, c.isolatedFlags.ServerURL, c.isolatedFlags.Namespace) |
+ fmt.Println(c.Isolate) |
+ upload(is, c.Isolate) |
+ |
+ return nil |
+} |
+ |
+func (c *fastArchiveRun) Run(a subcommands.Application, args []string) int { |
+ if err := c.Parse(a, args); err != nil { |
+ fmt.Fprintf(a.GetErr(), "%s: %s\n", a.GetName(), err) |
+ return 1 |
+ } |
+ cl, err := c.defaultFlags.StartTracing() |
+ if err != nil { |
+ fmt.Fprintf(a.GetErr(), "%s: %s\n", a.GetName(), err) |
+ return 1 |
+ } |
+ defer cl.Close() |
+ if err := c.main(a, args); err != nil { |
+ fmt.Fprintf(a.GetErr(), "%s: %s\n", a.GetName(), err) |
+ return 1 |
+ } |
+ return 0 |
+} |