Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(712)

Side by Side Diff: client/cmd/isolate/fastarchive.go

Issue 2014243002: WIP: Archive command which is much faster (Closed) Base URL: https://github.com/luci/luci-go@master
Patch Set: Fixes. Created 4 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « no previous file | client/cmd/isolate/main.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright 2016 The LUCI Authors. All rights reserved.
2 // Use of this source code is governed under the Apache License, Version 2.0
3 // that can be found in the LICENSE file.
4
5 package main
6
7 import (
8 "bytes"
9 "errors"
10 "fmt"
11 "hash"
12 "io"
13 "io/ioutil"
14 "os"
15 "time"
16
17 "github.com/luci/luci-go/client/internal/common"
18 //"github.com/luci/luci-go/client/isolate"
M-A Ruel 2016/06/08 20:52:36 please trim comments
19 "github.com/luci/luci-go/client/isolatedclient"
20 "github.com/luci/luci-go/common/api/isolate/isolateservice/v1"
21 "github.com/luci/luci-go/common/archive/ar"
22 "github.com/luci/luci-go/common/dirtools"
23 "github.com/luci/luci-go/common/isolated"
24 //"github.com/luci/luci-go/common/units"
25 "github.com/maruel/subcommands"
26 )
27
// ReadSeekerCloser is a re-readable byte source: the push stage may retry an
// upload and must be able to Seek back to the start before each attempt.
// NOTE(review): io.Closer is commented out and sources are currently never
// closed (see the commented-out Close() calls below) — confirm whether the
// leak is intentional for this WIP.
type ReadSeekerCloser interface {
	io.Reader
	io.Seeker
	// io.Closer
}
33
// ToHash is a work item for the hashing stage: a file path whose digest
// still needs to be computed.
type ToHash struct {
	path string
}

// ToCheck is a work item for the presence-check stage: a digest plus a
// re-readable source, to be queried against the isolate server.
type ToCheck struct {
	digest isolateservice.HandlersEndpointsV1Digest
	name   string
	source ReadSeekerCloser
}

// ToPush is a work item for the upload stage: the PushState returned by
// Contains for an item the server does not yet have, plus its source.
type ToPush struct {
	state  *isolatedclient.PushState
	name   string
	source ReadSeekerCloser
}
47
48 func HashFile(is isolatedclient.IsolateServer, _ common.Canceler, src <-chan *To Hash, dst chan<- *ToCheck) {
49 for tohash := range src {
50 fmt.Printf("hashing %s\n", tohash.path)
51 d, _ := isolated.HashFile(tohash.path)
52 f, _ := os.Open(tohash.path)
53 dst <- &ToCheck{
54 digest: d,
55 source: f,
56 name: tohash.path,
57 }
58 }
59 close(dst)
60 }
61
// CHECK_BATCH_SIZE is the maximum number of digests bundled into a single
// Contains RPC by ChckFile.
const CHECK_BATCH_SIZE = 20
63
// ChckFile batches incoming ToCheck items (up to CHECK_BATCH_SIZE per batch,
// flushing early after a 10ms lull in input), asks the isolate server which
// digests are missing via Contains, and forwards the missing ones to dst as
// ToPush items. Contains calls run on a pool of 5 goroutines.
// NOTE(review): reviewer flagged the name (likely meant "CheckFile") and
// that a channel received as a parameter should be closed by the caller,
// not here.
func ChckFile(is isolatedclient.IsolateServer, canceler common.Canceler, src <-chan *ToCheck, dst chan<- *ToPush) {
	check_count := 0

	pool := common.NewGoroutinePool(5, canceler)
	defer func() {
		_ = pool.Wait()
	}()

	done := false
	for !done {
		// Fresh arrays per outer iteration: each scheduled closure below
		// captures its own batch, so concurrent batches do not share storage.
		var digests [CHECK_BATCH_SIZE]*isolateservice.HandlersEndpointsV1Digest
		var topush [CHECK_BATCH_SIZE]ToPush

		index := 0
	Loop:
		for index < CHECK_BATCH_SIZE && !done {
			select {
			case tocheck, more := <-src:
				if !more {
					// src closed by the producer: flush what we have and stop.
					done = true
					break Loop
				}
				digests[index] = &tocheck.digest
				topush[index] = ToPush{state: nil, source: tocheck.source, name: tocheck.name}
				index += 1
			case <-time.After(time.Millisecond * 10):
				// Lull in input: send a partial batch rather than waiting.
				break Loop
			}
		}

		if index > 0 {
			// Snapshot loop-scoped values so the closure sees this batch's
			// count and size, not later ones.
			inner_count := check_count
			inner_index := index
			pool.Schedule(func() {
				fmt.Printf("checking(%d) %d files\n", inner_count, inner_index)
				pushstates, err := is.Contains(digests[:inner_index])
				if err != nil {
					// Batch is dropped on error; items are neither pushed nor
					// retried — presumably acceptable for this WIP.
					fmt.Printf("checking(%d) error: %s\n", inner_count, err)
					return
				}
				for j, state := range pushstates {
					topush[j].state = state
					if state != nil {
						// Non-nil state: server does not have the item yet.
						fmt.Printf("need to push(%d): %s\n", inner_count, topush[j].name)
						dst <- &topush[j]
					} else {
						fmt.Printf("skipping(%d): %s\n", inner_count, topush[j].name)
						// sources[j].Close()
					}
				}
			}, func() {})
			check_count += 1
		}
	}
	// Wait for all in-flight Contains batches before closing dst, so no
	// scheduled closure sends on a closed channel.
	_ = pool.Wait()
	close(dst)
}
121
122 func PushFile(is isolatedclient.IsolateServer, canceler common.Canceler, src <-c han *ToPush, dst chan<- bool) {
123 pool := common.NewGoroutinePool(100, canceler)
124 defer func() {
125 _ = pool.Wait()
126 }()
127
128 for topush := range src {
129 pool.Schedule(func() {
130 fmt.Printf("pushing: %s\n", topush.name)
131 err := is.Push(topush.state, func() (io.ReadCloser, erro r) {
132 topush.source.Seek(0, 0)
133 return ioutil.NopCloser(topush.source), nil
134 })
135 if err != nil {
136 fmt.Println("pushing err:", err)
137 } else {
138 fmt.Println("pushed:", topush.state)
139 }
140 // topush.source.Close()
141 }, func() {})
142 }
143 _ = pool.Wait()
144 close(dst)
145 }
146
// SmallFilesCollection accumulates many small files into a single in-memory
// ar archive so they can be uploaded as one isolate item; the digest is
// computed incrementally as the archive is written.
type SmallFilesCollection struct {
	index  int           // sequence number of this archive within the upload
	buffer *bytes.Buffer // archive bytes accumulated so far
	hash   hash.Hash     // running hash over those same bytes
	ar     *ar.Writer    // archive writer feeding buffer and hash together
}
154
155 func NewSmallFilesCollection(index int) *SmallFilesCollection {
156 var o SmallFilesCollection
157 o.index = index
158 o.buffer = new(bytes.Buffer)
159 o.hash = isolated.GetHash()
160
161 var w io.Writer = o.buffer
162 w = io.MultiWriter(w, o.hash)
163 o.ar = ar.NewWriter(w)
164 return &o
165 }
166
167 func (b SmallFilesCollection) RequestCheck(dst chan<- *ToCheck) {
168 fmt.Printf("rotating smallfilescollection-%d (%d bytes)\n", b.index, b.b uffer.Len())
169 dst <- &ToCheck{
170 digest: isolateservice.HandlersEndpointsV1Digest{
171 Digest: string(isolated.Sum(b.hash)),
172 IsIsolated: false,
173 Size: int64(b.buffer.Len()),
174 },
175 source: bytes.NewReader(b.buffer.Bytes()),
176 name: fmt.Sprintf("smallfilescollection-%d", b.index),
177 }
178 }
179
180 //
181
// SMALLFILES_MAXSIZE: files at or below this size are packed into an ar
// archive instead of being hashed and uploaded individually.
const SMALLFILES_MAXSIZE = 1024 * 64 // 64kbytes

// SMALLFILES_AR_MAXSIZE: once an archive grows past this, it is flushed to
// the check stage and a fresh archive is started.
const SMALLFILES_AR_MAXSIZE = 1024 * 1024 * 100 // 100MBytes
184
// SmallFilesWalkObserver receives callbacks from dirtools.WalkNoStat and
// routes each file: small files are packed into the current
// SmallFilesCollection, large files are queued for individual hashing.
type SmallFilesWalkObserver struct {
	trim              string                // walk root; stripped from names to make archive paths relative
	chck_chan         chan<- *ToCheck       // where completed archives are sent
	smallfiles_buffer *SmallFilesCollection // archive currently being filled
	largefiles_queue  []string              // paths deferred to the per-file hash stage
}
191
192 func NewSmallFilesWalkObserver(trim string, chck_chan chan<- *ToCheck) *SmallFil esWalkObserver {
193 return &SmallFilesWalkObserver{
194 trim: trim,
195 chck_chan: chck_chan,
196 smallfiles_buffer: NewSmallFilesCollection(0),
197 largefiles_queue: make([]string, 0),
198 }
199 }
200
// SmallFile appends the file's content to the current in-memory archive
// under its path relative to the walk root, rotating to a fresh archive once
// the current one exceeds SMALLFILES_AR_MAXSIZE.
func (s *SmallFilesWalkObserver) SmallFile(name string, alldata []byte) {
	// len(s.trim)+1 drops the root prefix plus its trailing path separator;
	// assumes name is always strictly under s.trim — TODO confirm walker
	// guarantees this. Any error from ar.Add is currently discarded.
	s.smallfiles_buffer.ar.Add(name[len(s.trim)+1:], alldata)
	if s.smallfiles_buffer.buffer.Len() > SMALLFILES_AR_MAXSIZE {
		s.smallfiles_buffer.RequestCheck(s.chck_chan)
		// The right-hand side reads the old collection's index before the
		// assignment takes effect, so numbering is sequential.
		s.smallfiles_buffer = NewSmallFilesCollection(s.smallfiles_buffer.index + 1)
		// Sanity check: a freshly-created archive must be near-empty.
		if s.smallfiles_buffer.buffer.Len() > 100 {
			panic("Ahh!")
		}
	}
}
211
// LargeFile queues a file too big for the small-files archive; upload()
// later feeds the queue through the per-file hashing stage.
func (s *SmallFilesWalkObserver) LargeFile(name string) {
	s.largefiles_queue = append(s.largefiles_queue, name)
}
215
// Error logs a walk error and continues; failures do not abort the archive.
func (s *SmallFilesWalkObserver) Error(path string, err error) {
	fmt.Println(path, err)
}
219
// upload streams every file under path through the three-stage pipeline
// hash -> check -> push. Files <= SMALLFILES_MAXSIZE are packed into ar
// archives and injected directly at the check stage; larger files are queued
// and hashed individually.
func upload(is isolatedclient.IsolateServer, path string) {
	hash_chan := make(chan *ToHash, 10)
	chck_chan := make(chan *ToCheck, 1)
	push_chan := make(chan *ToPush, 10)
	// NOTE(review): reviewer suggested a sync.WaitGroup per stage instead of
	// a done channel.
	done_chan := make(chan bool)

	canceler := common.NewCanceler()

	// Each stage closes its output channel when its input is drained, so
	// closing hash_chan below eventually propagates to done_chan.
	go HashFile(is, canceler, hash_chan, chck_chan)
	go ChckFile(is, canceler, chck_chan, push_chan)
	go PushFile(is, canceler, push_chan, done_chan)

	obs := NewSmallFilesWalkObserver(path, chck_chan)
	dirtools.WalkNoStat(path, SMALLFILES_MAXSIZE, obs)
	// Flush the final, partially-filled archive. This must happen before
	// close(hash_chan), since HashFile closes chck_chan when it finishes.
	obs.smallfiles_buffer.RequestCheck(obs.chck_chan)

	for _, name := range obs.largefiles_queue {
		hash_chan <- &ToHash{name}
	}

	close(hash_chan)
	// Block until PushFile closes done_chan, i.e. all uploads completed.
	<-done_chan
}
243
// cmdFastArchive registers the "fastarchive" subcommand, an experimental
// faster variant of archive that batches small files into ar archives.
var cmdFastArchive = &subcommands.Command{
	UsageLine: "fastarchive <options>",
	ShortDesc: "creates a .isolated file and uploads the tree to an isolate server.",
	LongDesc:  "All the files listed in the .isolated file are put in the isolate server cache via isolateserver.py.",
	CommandRun: func() subcommands.CommandRun {
		c := fastArchiveRun{}
		c.commonServerFlags.Init()
		c.isolateFlags.Init(&c.Flags)
		return &c
	},
}
255
// fastArchiveRun carries the flags and state for one fastarchive invocation,
// composed from the shared server and isolate flag sets.
type fastArchiveRun struct {
	commonServerFlags
	isolateFlags
}
260
261 func (c *fastArchiveRun) Parse(a subcommands.Application, args []string) error {
262 if err := c.commonServerFlags.Parse(); err != nil {
263 return err
264 }
265 cwd, err := os.Getwd()
266 if err != nil {
267 return err
268 }
269 if err := c.isolateFlags.Parse(cwd, RequireIsolatedFile); err != nil {
270 return err
271 }
272 if len(args) != 0 {
273 return errors.New("position arguments not expected")
274 }
275 return nil
276 }
277
278 func (c *fastArchiveRun) main(a subcommands.Application, args []string) error {
279 /*
280 out := os.Stdout
281 prefix := "\n"
282 if c.defaultFlags.Quiet {
283 out = nil
284 prefix = ""
285 }
286 start := time.Now()
287 */
288 client, err := c.createAuthClient()
289 if err != nil {
290 return err
291 }
292
293 is := isolatedclient.New(client, c.isolatedFlags.ServerURL, c.isolatedFl ags.Namespace)
294 fmt.Println(c.Isolate)
295 upload(is, c.Isolate)
296
297 return nil
298 }
299
300 func (c *fastArchiveRun) Run(a subcommands.Application, args []string) int {
301 if err := c.Parse(a, args); err != nil {
302 fmt.Fprintf(a.GetErr(), "%s: %s\n", a.GetName(), err)
303 return 1
304 }
305 cl, err := c.defaultFlags.StartTracing()
306 if err != nil {
307 fmt.Fprintf(a.GetErr(), "%s: %s\n", a.GetName(), err)
308 return 1
309 }
310 defer cl.Close()
311 if err := c.main(a, args); err != nil {
312 fmt.Fprintf(a.GetErr(), "%s: %s\n", a.GetName(), err)
313 return 1
314 }
315 return 0
316 }
OLDNEW
« no previous file with comments | « no previous file | client/cmd/isolate/main.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698