OLD | NEW |
---|---|
(Empty) | |
1 // Copyright 2016 The LUCI Authors. All rights reserved. | |
2 // Use of this source code is governed under the Apache License, Version 2.0 | |
3 // that can be found in the LICENSE file. | |
4 | |
5 package main | |
6 | |
7 import ( | |
8 "bytes" | |
9 "errors" | |
10 "fmt" | |
11 "hash" | |
12 "io" | |
13 "io/ioutil" | |
14 "os" | |
15 "time" | |
16 | |
17 "github.com/luci/luci-go/client/internal/common" | |
18 //"github.com/luci/luci-go/client/isolate" | |
M-A Ruel
2016/06/08 20:52:36
please trim comments
| |
19 "github.com/luci/luci-go/client/isolatedclient" | |
20 "github.com/luci/luci-go/common/api/isolate/isolateservice/v1" | |
21 "github.com/luci/luci-go/common/archive/ar" | |
22 "github.com/luci/luci-go/common/dirtools" | |
23 "github.com/luci/luci-go/common/isolated" | |
24 //"github.com/luci/luci-go/common/units" | |
25 "github.com/maruel/subcommands" | |
26 ) | |
27 | |
28 type ReadSeekerCloser interface { | |
29 io.Reader | |
30 io.Seeker | |
31 // io.Closer | |
32 } | |
33 | |
34 type ToHash struct { | |
M-A Ruel
2016/06/08 20:52:36
these do not need to be exported
| |
35 path string | |
36 } | |
37 type ToCheck struct { | |
38 digest isolateservice.HandlersEndpointsV1Digest | |
39 name string | |
40 source ReadSeekerCloser | |
41 } | |
42 type ToPush struct { | |
43 state *isolatedclient.PushState | |
44 name string | |
45 source ReadSeekerCloser | |
46 } | |
47 | |
48 func HashFile(is isolatedclient.IsolateServer, _ common.Canceler, src <-chan *To Hash, dst chan<- *ToCheck) { | |
49 for tohash := range src { | |
50 fmt.Printf("hashing %s\n", tohash.path) | |
51 d, _ := isolated.HashFile(tohash.path) | |
52 f, _ := os.Open(tohash.path) | |
53 dst <- &ToCheck{ | |
54 digest: d, | |
55 source: f, | |
56 name: tohash.path, | |
57 } | |
58 } | |
59 close(dst) | |
60 } | |
61 | |
62 const CHECK_BATCH_SIZE = 20 | |
M-A Ruel
2016/06/08 20:52:36
I prefer const to be at the top for consistency, i
| |
63 | |
64 func ChckFile(is isolatedclient.IsolateServer, canceler common.Canceler, src <-c han *ToCheck, dst chan<- *ToPush) { | |
M-A Ruel
2016/06/08 20:52:36
Check?
| |
65 check_count := 0 | |
66 | |
67 pool := common.NewGoroutinePool(5, canceler) | |
68 defer func() { | |
69 _ = pool.Wait() | |
70 }() | |
71 | |
72 done := false | |
73 for !done { | |
74 var digests [CHECK_BATCH_SIZE]*isolateservice.HandlersEndpointsV 1Digest | |
75 var topush [CHECK_BATCH_SIZE]ToPush | |
76 | |
77 index := 0 | |
78 Loop: | |
79 for index < CHECK_BATCH_SIZE && !done { | |
80 select { | |
81 case tocheck, more := <-src: | |
82 if !more { | |
83 done = true | |
84 break Loop | |
85 } | |
86 digests[index] = &tocheck.digest | |
87 topush[index] = ToPush{state: nil, source: toche ck.source, name: tocheck.name} | |
88 index += 1 | |
89 case <-time.After(time.Millisecond * 10): | |
90 break Loop | |
91 } | |
92 } | |
93 | |
94 if index > 0 { | |
95 inner_count := check_count | |
96 inner_index := index | |
97 pool.Schedule(func() { | |
98 fmt.Printf("checking(%d) %d files\n", inner_coun t, inner_index) | |
99 pushstates, err := is.Contains(digests[:inner_in dex]) | |
100 if err != nil { | |
101 fmt.Printf("checking(%d) error: %s\n", i nner_count, err) | |
102 return | |
103 } | |
104 for j, state := range pushstates { | |
105 topush[j].state = state | |
106 if state != nil { | |
107 fmt.Printf("need to push(%d): %s \n", inner_count, topush[j].name) | |
108 dst <- &topush[j] | |
109 } else { | |
110 fmt.Printf("skipping(%d): %s\n", inner_count, topush[j].name) | |
111 // sources[j].Close() | |
112 } | |
113 } | |
114 }, func() {}) | |
115 check_count += 1 | |
116 } | |
117 } | |
118 _ = pool.Wait() | |
119 close(dst) | |
M-A Ruel
2016/06/08 20:52:36
don't close a channel handed in, have the caller c
| |
120 } | |
121 | |
122 func PushFile(is isolatedclient.IsolateServer, canceler common.Canceler, src <-c han *ToPush, dst chan<- bool) { | |
123 pool := common.NewGoroutinePool(100, canceler) | |
124 defer func() { | |
125 _ = pool.Wait() | |
126 }() | |
127 | |
128 for topush := range src { | |
129 pool.Schedule(func() { | |
130 fmt.Printf("pushing: %s\n", topush.name) | |
131 err := is.Push(topush.state, func() (io.ReadCloser, erro r) { | |
132 topush.source.Seek(0, 0) | |
133 return ioutil.NopCloser(topush.source), nil | |
134 }) | |
135 if err != nil { | |
136 fmt.Println("pushing err:", err) | |
137 } else { | |
138 fmt.Println("pushed:", topush.state) | |
139 } | |
140 // topush.source.Close() | |
141 }, func() {}) | |
142 } | |
143 _ = pool.Wait() | |
144 close(dst) | |
145 } | |
146 | |
147 // --- | |
148 type SmallFilesCollection struct { | |
149 index int | |
150 buffer *bytes.Buffer | |
151 hash hash.Hash | |
152 ar *ar.Writer | |
153 } | |
154 | |
155 func NewSmallFilesCollection(index int) *SmallFilesCollection { | |
156 var o SmallFilesCollection | |
157 o.index = index | |
158 o.buffer = new(bytes.Buffer) | |
159 o.hash = isolated.GetHash() | |
160 | |
161 var w io.Writer = o.buffer | |
162 w = io.MultiWriter(w, o.hash) | |
163 o.ar = ar.NewWriter(w) | |
164 return &o | |
165 } | |
166 | |
167 func (b SmallFilesCollection) RequestCheck(dst chan<- *ToCheck) { | |
168 fmt.Printf("rotating smallfilescollection-%d (%d bytes)\n", b.index, b.b uffer.Len()) | |
169 dst <- &ToCheck{ | |
170 digest: isolateservice.HandlersEndpointsV1Digest{ | |
171 Digest: string(isolated.Sum(b.hash)), | |
172 IsIsolated: false, | |
173 Size: int64(b.buffer.Len()), | |
174 }, | |
175 source: bytes.NewReader(b.buffer.Bytes()), | |
176 name: fmt.Sprintf("smallfilescollection-%d", b.index), | |
177 } | |
178 } | |
179 | |
180 // | |
181 | |
182 const SMALLFILES_MAXSIZE = 1024 * 64 // 64kbytes | |
183 const SMALLFILES_AR_MAXSIZE = 1024 * 1024 * 100 // 100MBytes | |
184 | |
185 type SmallFilesWalkObserver struct { | |
186 trim string | |
187 chck_chan chan<- *ToCheck | |
188 smallfiles_buffer *SmallFilesCollection | |
189 largefiles_queue []string | |
190 } | |
191 | |
192 func NewSmallFilesWalkObserver(trim string, chck_chan chan<- *ToCheck) *SmallFil esWalkObserver { | |
193 return &SmallFilesWalkObserver{ | |
194 trim: trim, | |
195 chck_chan: chck_chan, | |
196 smallfiles_buffer: NewSmallFilesCollection(0), | |
197 largefiles_queue: make([]string, 0), | |
198 } | |
199 } | |
200 | |
201 func (s *SmallFilesWalkObserver) SmallFile(name string, alldata []byte) { | |
202 s.smallfiles_buffer.ar.Add(name[len(s.trim)+1:], alldata) | |
203 if s.smallfiles_buffer.buffer.Len() > SMALLFILES_AR_MAXSIZE { | |
204 s.smallfiles_buffer.RequestCheck(s.chck_chan) | |
205 s.smallfiles_buffer = NewSmallFilesCollection(s.smallfiles_buffe r.index + 1) | |
206 if s.smallfiles_buffer.buffer.Len() > 100 { | |
207 panic("Ahh!") | |
208 } | |
209 } | |
210 } | |
211 | |
212 func (s *SmallFilesWalkObserver) LargeFile(name string) { | |
213 s.largefiles_queue = append(s.largefiles_queue, name) | |
214 } | |
215 | |
216 func (s *SmallFilesWalkObserver) Error(path string, err error) { | |
217 fmt.Println(path, err) | |
218 } | |
219 | |
220 func upload(is isolatedclient.IsolateServer, path string) { | |
221 hash_chan := make(chan *ToHash, 10) | |
222 chck_chan := make(chan *ToCheck, 1) | |
223 push_chan := make(chan *ToPush, 10) | |
224 done_chan := make(chan bool) | |
M-A Ruel
2016/06/08 20:52:36
technically, you want a sync.WaitGroup so in each
| |
225 | |
226 canceler := common.NewCanceler() | |
227 | |
228 go HashFile(is, canceler, hash_chan, chck_chan) | |
M-A Ruel
2016/06/08 20:52:36
- these do not need to be exported
- I prefer the
| |
229 go ChckFile(is, canceler, chck_chan, push_chan) | |
230 go PushFile(is, canceler, push_chan, done_chan) | |
231 | |
232 obs := NewSmallFilesWalkObserver(path, chck_chan) | |
233 dirtools.WalkNoStat(path, SMALLFILES_MAXSIZE, obs) | |
234 obs.smallfiles_buffer.RequestCheck(obs.chck_chan) | |
235 | |
236 for _, name := range obs.largefiles_queue { | |
237 hash_chan <- &ToHash{name} | |
238 } | |
239 | |
240 close(hash_chan) | |
241 <-done_chan | |
242 } | |
243 | |
244 var cmdFastArchive = &subcommands.Command{ | |
245 UsageLine: "fastarchive <options>", | |
246 ShortDesc: "creates a .isolated file and uploads the tree to an isolate server.", | |
247 LongDesc: "All the files listed in the .isolated file are put in the is olate server cache via isolateserver.py.", | |
248 CommandRun: func() subcommands.CommandRun { | |
249 c := fastArchiveRun{} | |
250 c.commonServerFlags.Init() | |
251 c.isolateFlags.Init(&c.Flags) | |
252 return &c | |
253 }, | |
254 } | |
255 | |
256 type fastArchiveRun struct { | |
257 commonServerFlags | |
258 isolateFlags | |
259 } | |
260 | |
261 func (c *fastArchiveRun) Parse(a subcommands.Application, args []string) error { | |
262 if err := c.commonServerFlags.Parse(); err != nil { | |
263 return err | |
264 } | |
265 cwd, err := os.Getwd() | |
266 if err != nil { | |
267 return err | |
268 } | |
269 if err := c.isolateFlags.Parse(cwd, RequireIsolatedFile); err != nil { | |
270 return err | |
271 } | |
272 if len(args) != 0 { | |
273 return errors.New("position arguments not expected") | |
274 } | |
275 return nil | |
276 } | |
277 | |
278 func (c *fastArchiveRun) main(a subcommands.Application, args []string) error { | |
279 /* | |
280 out := os.Stdout | |
281 prefix := "\n" | |
282 if c.defaultFlags.Quiet { | |
283 out = nil | |
284 prefix = "" | |
285 } | |
286 start := time.Now() | |
287 */ | |
288 client, err := c.createAuthClient() | |
289 if err != nil { | |
290 return err | |
291 } | |
292 | |
293 is := isolatedclient.New(client, c.isolatedFlags.ServerURL, c.isolatedFl ags.Namespace) | |
294 fmt.Println(c.Isolate) | |
295 upload(is, c.Isolate) | |
296 | |
297 return nil | |
298 } | |
299 | |
300 func (c *fastArchiveRun) Run(a subcommands.Application, args []string) int { | |
301 if err := c.Parse(a, args); err != nil { | |
302 fmt.Fprintf(a.GetErr(), "%s: %s\n", a.GetName(), err) | |
303 return 1 | |
304 } | |
305 cl, err := c.defaultFlags.StartTracing() | |
306 if err != nil { | |
307 fmt.Fprintf(a.GetErr(), "%s: %s\n", a.GetName(), err) | |
308 return 1 | |
309 } | |
310 defer cl.Close() | |
311 if err := c.main(a, args); err != nil { | |
312 fmt.Fprintf(a.GetErr(), "%s: %s\n", a.GetName(), err) | |
313 return 1 | |
314 } | |
315 return 0 | |
316 } | |
OLD | NEW |