Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(26)

Side by Side Diff: logdog/common/storage/archive/logdog_archive_test/main.go

Issue 2422393002: Fix sparse index handling, add index params. (Closed)
Patch Set: Created 4 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2016 The LUCI Authors. All rights reserved.
dnj 2016/10/17 22:12:12 DON'T review this tool too closely. It's a useful
nodir 2016/10/17 22:24:06 why is it in common? it seems it should be somewhe
dnj 2016/10/17 22:26:19 It's in logdog/common.
2 // Use of this source code is governed under the Apache License, Version 2.0
3 // that can be found in the LICENSE file.
4
5 // Package main implements a simple CLI tool to load and interact with Google
6 // Storage archived data.
7 package main
8
9 import (
10 "bytes"
11 "flag"
12 "io"
13 "io/ioutil"
14 "math"
15 "os"
16
17 "github.com/luci/luci-go/client/authcli"
18 "github.com/luci/luci-go/common/auth"
19 "github.com/luci/luci-go/common/cli"
20 "github.com/luci/luci-go/common/data/recordio"
21 "github.com/luci/luci-go/common/errors"
22 "github.com/luci/luci-go/common/gcloud/gs"
23 log "github.com/luci/luci-go/common/logging"
24 "github.com/luci/luci-go/common/logging/gologger"
25 "github.com/luci/luci-go/logdog/api/logpb"
26 "github.com/luci/luci-go/logdog/common/storage"
27 "github.com/luci/luci-go/logdog/common/storage/archive"
28 "github.com/luci/luci-go/logdog/common/types"
29
30 "github.com/golang/protobuf/proto"
31 "github.com/maruel/subcommands"
32 "golang.org/x/net/context"
33 )
34
35 ////////////////////////////////////////////////////////////////////////////////
36 // main
37 ////////////////////////////////////////////////////////////////////////////////
38
39 type application struct {
40 cli.Application
41
42 authOpts auth.Options
43 }
44
45 func getApplication(base subcommands.Application) (*application, context.Context ) {
46 app := base.(*application)
47 return app, app.Context(context.Background())
48 }
49
50 func (a *application) getGSClient(c context.Context) (gs.Client, error) {
51 authenticator := auth.NewAuthenticator(c, auth.OptionalLogin, a.authOpts )
52 transport, err := authenticator.Transport()
53 if err != nil {
54 return nil, errors.Annotate(err).Reason("failed to get auth tran sport").Err()
55 }
56 return gs.NewProdClient(c, transport)
57 }
58
59 func mainImpl(c context.Context, args []string) int {
60 c = gologger.StdConfig.Use(c)
61
62 logConfig := log.Config{
63 Level: log.Warning,
64 }
65
66 authOpts := auth.Options{
67 Scopes: append([]string{auth.OAuthScopeEmail}, gs.ReadOnlyScopes ...),
68 }
69 var authFlags authcli.Flags
70
71 a := application{
72 Application: cli.Application{
73 Name: "Archive Storage Utility",
74 Title: "Archive Storage Utility",
75 Context: func(c context.Context) context.Context {
76 // Install configured logger.
77 c = logConfig.Set(gologger.StdConfig.Use(c))
78 return c
79 },
80
81 Commands: []*subcommands.Command{
82 subcommands.CmdHelp,
83
84 &subcommandDumpIndex,
85 &subcommandDumpStream,
86 &subcommandGet,
87 &subcommandTail,
88
89 authcli.SubcommandLogin(authOpts, "auth-login"),
90 authcli.SubcommandLogout(authOpts, "auth-logout" ),
91 authcli.SubcommandInfo(authOpts, "auth-info"),
92 },
93 },
94 }
95
96 fs := flag.NewFlagSet("flags", flag.ExitOnError)
97 logConfig.AddFlags(fs)
98 authFlags.Register(fs, authOpts)
99 fs.Parse(args)
100
101 // Process authentication options.
102 var err error
103 a.authOpts, err = authFlags.Options()
104 if err != nil {
105 log.WithError(err).Errorf(c, "Failed to create auth options.")
106 return 1
107 }
108
109 // Execute our subcommand.
110 return subcommands.Run(&a, fs.Args())
111 }
112
113 func main() {
114 os.Exit(mainImpl(context.Background(), os.Args[1:]))
115 }
116
117 ////////////////////////////////////////////////////////////////////////////////
118 // Subcommand: dump-index
119 ////////////////////////////////////////////////////////////////////////////////
120
121 type cmdRunDumpIndex struct {
122 subcommands.CommandRunBase
123
124 path string
125 }
126
127 var subcommandDumpIndex = subcommands.Command{
128 UsageLine: "dump-index",
129 ShortDesc: "Dumps the contents of an index protobuf.",
130 CommandRun: func() subcommands.CommandRun {
131 var cmd cmdRunDumpIndex
132
133 cmd.Flags.StringVar(&cmd.path, "path", "", "Google Storage path to the index protobuf.")
134
135 return &cmd
136 },
137 }
138
139 func (cmd *cmdRunDumpIndex) Run(baseApp subcommands.Application, args []string) int {
140 app, c := getApplication(baseApp)
141
142 if cmd.path == "" {
143 log.Errorf(c, "Missing required argument (-path).")
144 return 1
145 }
146 path := gs.Path(cmd.path)
147
148 client, err := app.getGSClient(c)
149 if err != nil {
150 log.WithError(err).Errorf(c, "Failed to create GS client.")
151 return 1
152 }
153 defer client.Close()
154
155 reader, err := client.NewReader(path, 0, -1)
156 if err != nil {
157 log.WithError(err).Errorf(c, "Failed to create GS reader.")
158 return 1
159 }
160 defer reader.Close()
161
162 data, err := ioutil.ReadAll(reader)
163 if err != nil {
164 log.WithError(err).Errorf(c, "Failed to read index data from GS. ")
165 return 1
166 }
167 log.Debugf(c, "Loaded %d byte(s).", len(data))
168
169 var index logpb.LogIndex
170 if err := unmarshalAndDump(c, os.Stdout, data, &index); err != nil {
171 log.WithError(err).Errorf(c, "Failed to dump index protobuf.")
172 return 1
173 }
174 return 0
175 }
176
177 func unmarshalAndDump(c context.Context, out io.Writer, data []byte, msg proto.M essage) error {
178 if err := proto.Unmarshal(data, msg); err != nil {
179 log.WithError(err).Errorf(c, "Failed to unmarshal protobuf.")
180 return err
181 }
182 if err := proto.MarshalText(out, msg); err != nil {
183 log.WithError(err).Errorf(c, "Failed to dump protobuf to output. ")
184 return err
185 }
186 return nil
187 }
188
189 ////////////////////////////////////////////////////////////////////////////////
190 // Subcommand: dump-stream
191 ////////////////////////////////////////////////////////////////////////////////
192
193 type cmdRunDumpStream struct {
194 subcommands.CommandRunBase
195
196 path string
197 }
198
199 var subcommandDumpStream = subcommands.Command{
200 UsageLine: "dump-stream",
201 ShortDesc: "Dumps the contents of a log stream protobuf.",
202 CommandRun: func() subcommands.CommandRun {
203 var cmd cmdRunDumpStream
204
205 cmd.Flags.StringVar(&cmd.path, "path", "", "Google Storage path to the stream protobuf.")
206
207 return &cmd
208 },
209 }
210
211 func (cmd *cmdRunDumpStream) Run(baseApp subcommands.Application, args []string) int {
212 app, c := getApplication(baseApp)
213
214 if cmd.path == "" {
215 log.Errorf(c, "Missing required argument (-path).")
216 return 1
217 }
218 path := gs.Path(cmd.path)
219
220 client, err := app.getGSClient(c)
221 if err != nil {
222 log.WithError(err).Errorf(c, "Failed to create GS client.")
223 return 1
224 }
225 defer client.Close()
226
227 reader, err := client.NewReader(path, 0, 0)
228 if err != nil {
229 log.WithError(err).Errorf(c, "Failed to create GS reader.")
230 return 1
231 }
232 defer reader.Close()
233
234 // Re-use the same buffer for each RecordIO frame.
235 var (
236 frameReader = recordio.NewReader(reader, math.MaxInt64)
237 frameIndex = 0
238 buf bytes.Buffer
239
240 entry logpb.LogEntry
241 )
242 for {
243 frameSize, r, err := frameReader.ReadFrame()
244 switch err {
245 case nil:
246 break
247
248 case io.EOF:
249 log.Debugf(c, "Encountered EOF.")
250 return 0
251
252 default:
253 log.Fields{
254 log.ErrorKey: err,
255 "index": frameIndex,
256 }.Errorf(c, "Encountered error reading log stream.")
257 return 1
258 }
259
260 buf.Reset()
261 buf.Grow(int(frameSize))
262
263 if _, err := buf.ReadFrom(r); err != nil {
264 log.Fields{
265 log.ErrorKey: err,
266 "index": frameIndex,
267 }.Errorf(c, "Failed to read log stream frame.")
268 return 1
269 }
270
271 log.Fields{
272 "index": frameIndex,
273 "size": buf.Len(),
274 }.Debugf(c, "Read frame.")
275 frameIndex++
276
277 if err := unmarshalAndDump(c, os.Stdout, buf.Bytes(), &entry); e rr != nil {
278 log.Fields{
279 log.ErrorKey: err,
280 "index": frameIndex,
281 }.Errorf(c, "Failed to dump log entry descriptor.")
282 return 1
283 }
284 }
285 }
286
287 ////////////////////////////////////////////////////////////////////////////////
288 // Subcommand: get
289 ////////////////////////////////////////////////////////////////////////////////
290
291 type cmdRunGet struct {
292 subcommands.CommandRunBase
293
294 indexPath string
295 streamPath string
296
297 index int
298 limit int
299 }
300
301 var subcommandGet = subcommands.Command{
302 UsageLine: "get",
303 ShortDesc: "Performs a Storage Get operation.",
304 CommandRun: func() subcommands.CommandRun {
305 var cmd cmdRunGet
306
307 cmd.Flags.StringVar(&cmd.indexPath, "index-path", "", "Google St orage path to the index protobuf.")
308 cmd.Flags.StringVar(&cmd.streamPath, "stream-path", "", "Google Storage path to the stream protobuf.")
309 cmd.Flags.IntVar(&cmd.index, "index", 0, "The index to fetch.")
310 cmd.Flags.IntVar(&cmd.limit, "limit", 0, "The log entry limit.")
311
312 return &cmd
313 },
314 }
315
316 func (cmd *cmdRunGet) Run(baseApp subcommands.Application, args []string) int {
317 app, c := getApplication(baseApp)
318
319 switch {
320 case cmd.indexPath == "":
321 log.Errorf(c, "Missing required argument (-index-path).")
322 return 1
323 case cmd.streamPath == "":
324 log.Errorf(c, "Missing required argument (-stream-path).")
325 return 1
326 }
327
328 client, err := app.getGSClient(c)
329 if err != nil {
330 log.WithError(err).Errorf(c, "Failed to create GS client.")
331 return 1
332 }
333 defer client.Close()
334
335 stClient, err := archive.New(c, archive.Options{
336 IndexURL: cmd.indexPath,
337 StreamURL: cmd.streamPath,
338 Client: client,
339 })
340 if err != nil {
341 log.WithError(err).Errorf(c, "Failed to create storage client.")
342 return 1
343 }
344 defer stClient.Close()
345
346 var innerErr error
347 err = stClient.Get(storage.GetRequest{
348 Index: types.MessageIndex(cmd.index),
349 Limit: cmd.limit,
350 }, func(idx types.MessageIndex, data []byte) bool {
351 log.Fields{
352 "index": idx,
353 }.Infof(c, "Fetched log entry.")
354
355 var log logpb.LogEntry
356 if innerErr = unmarshalAndDump(c, os.Stdout, data, &log); innerE rr != nil {
357 return false
358 }
359 return true
360 })
361 switch {
362 case innerErr != nil:
363 log.WithError(innerErr).Errorf(c, "Failed to process fetched log entries.")
364 return 1
365
366 case err != nil:
367 log.WithError(err).Errorf(c, "Failed to Get log entries.")
368 return 1
369
370 default:
371 return 0
372 }
373 }
374
375 ////////////////////////////////////////////////////////////////////////////////
376 // Subcommand: tail
377 ////////////////////////////////////////////////////////////////////////////////
378
379 type cmdRunTail struct {
380 subcommands.CommandRunBase
381
382 indexPath string
383 streamPath string
384 }
385
386 var subcommandTail = subcommands.Command{
387 UsageLine: "tail",
388 ShortDesc: "Performs a Storage Tail operation.",
389 CommandRun: func() subcommands.CommandRun {
390 var cmd cmdRunTail
391
392 cmd.Flags.StringVar(&cmd.indexPath, "index-path", "", "Google St orage path to the index protobuf.")
393 cmd.Flags.StringVar(&cmd.streamPath, "stream-path", "", "Google Storage path to the stream protobuf.")
394
395 return &cmd
396 },
397 }
398
399 func (cmd *cmdRunTail) Run(baseApp subcommands.Application, args []string) int {
400 app, c := getApplication(baseApp)
401
402 switch {
403 case cmd.indexPath == "":
404 log.Errorf(c, "Missing required argument (-index-path).")
405 return 1
406 case cmd.streamPath == "":
407 log.Errorf(c, "Missing required argument (-stream-path).")
408 return 1
409 }
410
411 client, err := app.getGSClient(c)
412 if err != nil {
413 log.WithError(err).Errorf(c, "Failed to create GS client.")
414 return 1
415 }
416 defer client.Close()
417
418 stClient, err := archive.New(c, archive.Options{
419 IndexURL: cmd.indexPath,
420 StreamURL: cmd.streamPath,
421 Client: client,
422 })
423 if err != nil {
424 log.WithError(err).Errorf(c, "Failed to create storage client.")
425 return 1
426 }
427 defer stClient.Close()
428
429 data, idx, err := stClient.Tail("", "")
430 if err != nil {
431 log.WithError(err).Errorf(c, "Failed to Tail log entries.")
432 return 1
433 }
434
435 if data == nil {
436 log.Infof(c, "No log data to tail.")
437 return 0
438 }
439
440 log.Fields{
441 "index": idx,
442 "size": len(data),
443 }.Debugf(c, "Dumping tail entry.")
444 var entry logpb.LogEntry
445 if err := unmarshalAndDump(c, os.Stdout, data, &entry); err != nil {
446 log.WithError(err).Errorf(c, "Failed to dump tail entry.")
447 return 1
448 }
449 return 0
450 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698