Chromium Code Reviews| OLD | NEW |
|---|---|
| (Empty) | |
| 1 // Copyright 2016 The LUCI Authors. All rights reserved. | |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 | |
| 3 // that can be found in the LICENSE file. | |
| 4 | |
| 5 package main | |
| 6 | |
| 7 import ( | |
| 8 "bytes" | |
| 9 "errors" | |
| 10 "fmt" | |
| 11 "hash" | |
| 12 "io" | |
| 13 "io/ioutil" | |
| 14 "os" | |
| 15 "time" | |
| 16 | |
| 17 "github.com/luci/luci-go/client/internal/common" | |
| 18 //"github.com/luci/luci-go/client/isolate" | |
|
M-A Ruel
2016/06/08 20:52:36
please trim comments
| |
| 19 "github.com/luci/luci-go/client/isolatedclient" | |
| 20 "github.com/luci/luci-go/common/api/isolate/isolateservice/v1" | |
| 21 "github.com/luci/luci-go/common/archive/ar" | |
| 22 "github.com/luci/luci-go/common/dirtools" | |
| 23 "github.com/luci/luci-go/common/isolated" | |
| 24 //"github.com/luci/luci-go/common/units" | |
| 25 "github.com/maruel/subcommands" | |
| 26 ) | |
| 27 | |
// ReadSeekerCloser is a readable, seekable data source for upload items;
// seeking is required because a push may re-read the content from the
// start (see PushFile's Seek(0, 0) before each retry).
// io.Closer is commented out: some sources (bytes.Reader in RequestCheck)
// have no Close, and sources are currently never closed — flagged in review.
type ReadSeekerCloser interface {
	io.Reader
	io.Seeker
	// io.Closer
}
| 33 | |
// ToHash is a work item for the hashing stage: a file path whose content
// still needs to be digested.
// NOTE(review): these three types do not need to be exported.
type ToHash struct {
	path string
}

// ToCheck is a work item for the presence-check stage: a computed digest
// plus a re-readable source of the data, and a name used only for logging.
type ToCheck struct {
	digest isolateservice.HandlersEndpointsV1Digest
	name   string
	source ReadSeekerCloser
}

// ToPush is a work item for the upload stage; state is the push ticket
// returned by the server's Contains call (non-nil means upload is needed).
type ToPush struct {
	state  *isolatedclient.PushState
	name   string
	source ReadSeekerCloser
}
| 47 | |
| 48 func HashFile(is isolatedclient.IsolateServer, _ common.Canceler, src <-chan *To Hash, dst chan<- *ToCheck) { | |
| 49 for tohash := range src { | |
| 50 fmt.Printf("hashing %s\n", tohash.path) | |
| 51 d, _ := isolated.HashFile(tohash.path) | |
| 52 f, _ := os.Open(tohash.path) | |
| 53 dst <- &ToCheck{ | |
| 54 digest: d, | |
| 55 source: f, | |
| 56 name: tohash.path, | |
| 57 } | |
| 58 } | |
| 59 close(dst) | |
| 60 } | |
| 61 | |
// CHECK_BATCH_SIZE is the number of digests bundled into a single
// Contains RPC by ChckFile.
// NOTE(review): consts are conventionally grouped near the top of the file.
const CHECK_BATCH_SIZE = 20
| 63 | |
// ChckFile batches digests from src and asks the isolate server which
// ones it is missing (Contains); entries the server lacks are forwarded
// to dst for upload. Up to 5 Contains RPCs run concurrently via the pool.
// NOTE(review): name should likely be CheckFile; also, closing dst here
// was flagged — conventionally the goroutine that owns a channel closes
// it, not a callee it was handed to.
func ChckFile(is isolatedclient.IsolateServer, canceler common.Canceler, src <-chan *ToCheck, dst chan<- *ToPush) {
	check_count := 0

	pool := common.NewGoroutinePool(5, canceler)
	defer func() {
		_ = pool.Wait()
	}()

	done := false
	for !done {
		// Fresh arrays per batch: the closure scheduled below captures
		// them, so they must not be reused by the next outer iteration.
		var digests [CHECK_BATCH_SIZE]*isolateservice.HandlersEndpointsV1Digest
		var topush [CHECK_BATCH_SIZE]ToPush

		index := 0
	Loop:
		// Fill a batch: stop at CHECK_BATCH_SIZE items, at channel close,
		// or after a 10ms lull so partially-filled batches still progress.
		for index < CHECK_BATCH_SIZE && !done {
			select {
			case tocheck, more := <-src:
				if !more {
					done = true
					break Loop
				}
				digests[index] = &tocheck.digest
				topush[index] = ToPush{state: nil, source: tocheck.source, name: tocheck.name}
				index += 1
			case <-time.After(time.Millisecond * 10):
				break Loop
			}
		}

		if index > 0 {
			// Snapshot the counters for the closure; check_count and index
			// keep changing on the outer loop.
			inner_count := check_count
			inner_index := index
			pool.Schedule(func() {
				fmt.Printf("checking(%d) %d files\n", inner_count, inner_index)
				pushstates, err := is.Contains(digests[:inner_index])
				if err != nil {
					// Batch is dropped on RPC error; only logged.
					fmt.Printf("checking(%d) error: %s\n", inner_count, err)
					return
				}
				// A non-nil push state means the server does not yet have
				// the blob and it must be uploaded.
				for j, state := range pushstates {
					topush[j].state = state
					if state != nil {
						fmt.Printf("need to push(%d): %s\n", inner_count, topush[j].name)
						dst <- &topush[j]
					} else {
						fmt.Printf("skipping(%d): %s\n", inner_count, topush[j].name)
						// sources[j].Close()
					}
				}
			}, func() {})
			check_count += 1
		}
	}
	_ = pool.Wait()
	close(dst)
}
| 121 | |
| 122 func PushFile(is isolatedclient.IsolateServer, canceler common.Canceler, src <-c han *ToPush, dst chan<- bool) { | |
| 123 pool := common.NewGoroutinePool(100, canceler) | |
| 124 defer func() { | |
| 125 _ = pool.Wait() | |
| 126 }() | |
| 127 | |
| 128 for topush := range src { | |
| 129 pool.Schedule(func() { | |
| 130 fmt.Printf("pushing: %s\n", topush.name) | |
| 131 err := is.Push(topush.state, func() (io.ReadCloser, erro r) { | |
| 132 topush.source.Seek(0, 0) | |
| 133 return ioutil.NopCloser(topush.source), nil | |
| 134 }) | |
| 135 if err != nil { | |
| 136 fmt.Println("pushing err:", err) | |
| 137 } else { | |
| 138 fmt.Println("pushed:", topush.state) | |
| 139 } | |
| 140 // topush.source.Close() | |
| 141 }, func() {}) | |
| 142 } | |
| 143 _ = pool.Wait() | |
| 144 close(dst) | |
| 145 } | |
| 146 | |
// ---

// SmallFilesCollection accumulates small files into an in-memory ar
// archive while digesting the archive bytes as they are written.
type SmallFilesCollection struct {
	index  int           // rotation sequence number; used in the upload name
	buffer *bytes.Buffer // archive bytes accumulated so far
	hash   hash.Hash     // digest of everything written to buffer
	ar     *ar.Writer    // archive writer fanning out to buffer and hash
}
| 154 | |
| 155 func NewSmallFilesCollection(index int) *SmallFilesCollection { | |
| 156 var o SmallFilesCollection | |
| 157 o.index = index | |
| 158 o.buffer = new(bytes.Buffer) | |
| 159 o.hash = isolated.GetHash() | |
| 160 | |
| 161 var w io.Writer = o.buffer | |
| 162 w = io.MultiWriter(w, o.hash) | |
| 163 o.ar = ar.NewWriter(w) | |
| 164 return &o | |
| 165 } | |
| 166 | |
| 167 func (b SmallFilesCollection) RequestCheck(dst chan<- *ToCheck) { | |
| 168 fmt.Printf("rotating smallfilescollection-%d (%d bytes)\n", b.index, b.b uffer.Len()) | |
| 169 dst <- &ToCheck{ | |
| 170 digest: isolateservice.HandlersEndpointsV1Digest{ | |
| 171 Digest: string(isolated.Sum(b.hash)), | |
| 172 IsIsolated: false, | |
| 173 Size: int64(b.buffer.Len()), | |
| 174 }, | |
| 175 source: bytes.NewReader(b.buffer.Bytes()), | |
| 176 name: fmt.Sprintf("smallfilescollection-%d", b.index), | |
| 177 } | |
| 178 } | |
| 179 | |
| 180 // | |
| 181 | |
// SMALLFILES_MAXSIZE is the threshold (in bytes) at or below which a file
// is bundled into an ar archive instead of being uploaded individually.
const SMALLFILES_MAXSIZE = 1024 * 64 // 64kbytes

// SMALLFILES_AR_MAXSIZE is the size at which the current bundle archive is
// rotated out and queued for upload.
const SMALLFILES_AR_MAXSIZE = 1024 * 1024 * 100 // 100MBytes
| 184 | |
// SmallFilesWalkObserver receives directory-walk callbacks, bundling
// small files into rotating ar collections and queueing large files to be
// hashed and pushed individually.
type SmallFilesWalkObserver struct {
	trim              string                // path prefix stripped from archive member names
	chck_chan         chan<- *ToCheck       // where filled collections are sent for a presence check
	smallfiles_buffer *SmallFilesCollection // current, partially filled archive
	largefiles_queue  []string              // paths of files larger than SMALLFILES_MAXSIZE
}
| 191 | |
| 192 func NewSmallFilesWalkObserver(trim string, chck_chan chan<- *ToCheck) *SmallFil esWalkObserver { | |
| 193 return &SmallFilesWalkObserver{ | |
| 194 trim: trim, | |
| 195 chck_chan: chck_chan, | |
| 196 smallfiles_buffer: NewSmallFilesCollection(0), | |
| 197 largefiles_queue: make([]string, 0), | |
| 198 } | |
| 199 } | |
| 200 | |
| 201 func (s *SmallFilesWalkObserver) SmallFile(name string, alldata []byte) { | |
| 202 s.smallfiles_buffer.ar.Add(name[len(s.trim)+1:], alldata) | |
| 203 if s.smallfiles_buffer.buffer.Len() > SMALLFILES_AR_MAXSIZE { | |
| 204 s.smallfiles_buffer.RequestCheck(s.chck_chan) | |
| 205 s.smallfiles_buffer = NewSmallFilesCollection(s.smallfiles_buffe r.index + 1) | |
| 206 if s.smallfiles_buffer.buffer.Len() > 100 { | |
| 207 panic("Ahh!") | |
| 208 } | |
| 209 } | |
| 210 } | |
| 211 | |
// LargeFile queues a file too large for the ar bundle; it will be hashed
// and uploaded individually by the pipeline in upload().
func (s *SmallFilesWalkObserver) LargeFile(name string) {
	s.largefiles_queue = append(s.largefiles_queue, name)
}
| 215 | |
// Error reports a walk error; errors are only printed and the walk
// continues.
func (s *SmallFilesWalkObserver) Error(path string, err error) {
	fmt.Println(path, err)
}
| 219 | |
// upload walks path, streaming small files into bundled ar archives and
// queueing large files individually, then drives the three-stage
// hash -> check -> push pipeline until every upload has completed.
// Channel ownership: upload closes hash_chan; HashFile closes chck_chan;
// ChckFile closes push_chan; PushFile closes done_chan as the completion
// signal (nothing is ever sent on it).
// NOTE(review): a sync.WaitGroup per stage was suggested instead of the
// done_chan close signal.
func upload(is isolatedclient.IsolateServer, path string) {
	hash_chan := make(chan *ToHash, 10)
	chck_chan := make(chan *ToCheck, 1)
	push_chan := make(chan *ToPush, 10)
	done_chan := make(chan bool)

	canceler := common.NewCanceler()

	go HashFile(is, canceler, hash_chan, chck_chan)
	go ChckFile(is, canceler, chck_chan, push_chan)
	go PushFile(is, canceler, push_chan, done_chan)

	obs := NewSmallFilesWalkObserver(path, chck_chan)
	dirtools.WalkNoStat(path, SMALLFILES_MAXSIZE, obs)
	// Flush the final, partially filled archive.
	obs.smallfiles_buffer.RequestCheck(obs.chck_chan)

	// Large files bypass the bundler and go through the hashing stage.
	for _, name := range obs.largefiles_queue {
		hash_chan <- &ToHash{name}
	}

	close(hash_chan)
	<-done_chan // wait for PushFile to finish all uploads
}
| 243 | |
// cmdFastArchive is the "fastarchive" subcommand definition: it archives
// a tree and uploads the contents to an isolate server.
var cmdFastArchive = &subcommands.Command{
	UsageLine: "fastarchive <options>",
	ShortDesc: "creates a .isolated file and uploads the tree to an isolate server.",
	LongDesc:  "All the files listed in the .isolated file are put in the isolate server cache via isolateserver.py.",
	CommandRun: func() subcommands.CommandRun {
		c := fastArchiveRun{}
		c.commonServerFlags.Init()
		c.isolateFlags.Init(&c.Flags)
		return &c
	},
}
| 255 | |
// fastArchiveRun holds the combined flag state for the fastarchive
// subcommand (server connection flags plus isolate-file flags).
type fastArchiveRun struct {
	commonServerFlags
	isolateFlags
}
| 260 | |
| 261 func (c *fastArchiveRun) Parse(a subcommands.Application, args []string) error { | |
| 262 if err := c.commonServerFlags.Parse(); err != nil { | |
| 263 return err | |
| 264 } | |
| 265 cwd, err := os.Getwd() | |
| 266 if err != nil { | |
| 267 return err | |
| 268 } | |
| 269 if err := c.isolateFlags.Parse(cwd, RequireIsolatedFile); err != nil { | |
| 270 return err | |
| 271 } | |
| 272 if len(args) != 0 { | |
| 273 return errors.New("position arguments not expected") | |
| 274 } | |
| 275 return nil | |
| 276 } | |
| 277 | |
| 278 func (c *fastArchiveRun) main(a subcommands.Application, args []string) error { | |
| 279 /* | |
| 280 out := os.Stdout | |
| 281 prefix := "\n" | |
| 282 if c.defaultFlags.Quiet { | |
| 283 out = nil | |
| 284 prefix = "" | |
| 285 } | |
| 286 start := time.Now() | |
| 287 */ | |
| 288 client, err := c.createAuthClient() | |
| 289 if err != nil { | |
| 290 return err | |
| 291 } | |
| 292 | |
| 293 is := isolatedclient.New(client, c.isolatedFlags.ServerURL, c.isolatedFl ags.Namespace) | |
| 294 fmt.Println(c.Isolate) | |
| 295 upload(is, c.Isolate) | |
| 296 | |
| 297 return nil | |
| 298 } | |
| 299 | |
// Run parses flags, starts tracing, and executes the archive, returning a
// process exit code (0 on success, 1 on any error); errors are printed to
// the application's error stream prefixed with the program name.
func (c *fastArchiveRun) Run(a subcommands.Application, args []string) int {
	if err := c.Parse(a, args); err != nil {
		fmt.Fprintf(a.GetErr(), "%s: %s\n", a.GetName(), err)
		return 1
	}
	cl, err := c.defaultFlags.StartTracing()
	if err != nil {
		fmt.Fprintf(a.GetErr(), "%s: %s\n", a.GetName(), err)
		return 1
	}
	defer cl.Close()
	if err := c.main(a, args); err != nil {
		fmt.Fprintf(a.GetErr(), "%s: %s\n", a.GetName(), err)
		return 1
	}
	return 0
}
| OLD | NEW |