Chromium Code Reviews| OLD | NEW |
|---|---|
| (Empty) | |
| 1 // Copyright 2016 The LUCI Authors. All rights reserved. | |
|
dnj
2016/10/17 22:12:12
DON'T review this tool too closely. It's a useful
nodir
2016/10/17 22:24:06
why is it in common? it seems it should be somewhe
dnj
2016/10/17 22:26:19
It's in logdog/common.
| |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 | |
| 3 // that can be found in the LICENSE file. | |
| 4 | |
| 5 // Package main implements a simple CLI tool to load and interact with Google | |
| 6 // Storage archived data. | |
| 7 package main | |
| 8 | |
| 9 import ( | |
| 10 "bytes" | |
| 11 "flag" | |
| 12 "io" | |
| 13 "io/ioutil" | |
| 14 "math" | |
| 15 "os" | |
| 16 | |
| 17 "github.com/luci/luci-go/client/authcli" | |
| 18 "github.com/luci/luci-go/common/auth" | |
| 19 "github.com/luci/luci-go/common/cli" | |
| 20 "github.com/luci/luci-go/common/data/recordio" | |
| 21 "github.com/luci/luci-go/common/errors" | |
| 22 "github.com/luci/luci-go/common/gcloud/gs" | |
| 23 log "github.com/luci/luci-go/common/logging" | |
| 24 "github.com/luci/luci-go/common/logging/gologger" | |
| 25 "github.com/luci/luci-go/logdog/api/logpb" | |
| 26 "github.com/luci/luci-go/logdog/common/storage" | |
| 27 "github.com/luci/luci-go/logdog/common/storage/archive" | |
| 28 "github.com/luci/luci-go/logdog/common/types" | |
| 29 | |
| 30 "github.com/golang/protobuf/proto" | |
| 31 "github.com/maruel/subcommands" | |
| 32 "golang.org/x/net/context" | |
| 33 ) | |
| 34 | |
| 35 //////////////////////////////////////////////////////////////////////////////// | |
| 36 // main | |
| 37 //////////////////////////////////////////////////////////////////////////////// | |
| 38 | |
| 39 type application struct { | |
| 40 cli.Application | |
| 41 | |
| 42 authOpts auth.Options | |
| 43 } | |
| 44 | |
| 45 func getApplication(base subcommands.Application) (*application, context.Context ) { | |
| 46 app := base.(*application) | |
| 47 return app, app.Context(context.Background()) | |
| 48 } | |
| 49 | |
| 50 func (a *application) getGSClient(c context.Context) (gs.Client, error) { | |
| 51 authenticator := auth.NewAuthenticator(c, auth.OptionalLogin, a.authOpts ) | |
| 52 transport, err := authenticator.Transport() | |
| 53 if err != nil { | |
| 54 return nil, errors.Annotate(err).Reason("failed to get auth tran sport").Err() | |
| 55 } | |
| 56 return gs.NewProdClient(c, transport) | |
| 57 } | |
| 58 | |
| 59 func mainImpl(c context.Context, args []string) int { | |
| 60 c = gologger.StdConfig.Use(c) | |
| 61 | |
| 62 logConfig := log.Config{ | |
| 63 Level: log.Warning, | |
| 64 } | |
| 65 | |
| 66 authOpts := auth.Options{ | |
| 67 Scopes: append([]string{auth.OAuthScopeEmail}, gs.ReadOnlyScopes ...), | |
| 68 } | |
| 69 var authFlags authcli.Flags | |
| 70 | |
| 71 a := application{ | |
| 72 Application: cli.Application{ | |
| 73 Name: "Archive Storage Utility", | |
| 74 Title: "Archive Storage Utility", | |
| 75 Context: func(c context.Context) context.Context { | |
| 76 // Install configured logger. | |
| 77 c = logConfig.Set(gologger.StdConfig.Use(c)) | |
| 78 return c | |
| 79 }, | |
| 80 | |
| 81 Commands: []*subcommands.Command{ | |
| 82 subcommands.CmdHelp, | |
| 83 | |
| 84 &subcommandDumpIndex, | |
| 85 &subcommandDumpStream, | |
| 86 &subcommandGet, | |
| 87 &subcommandTail, | |
| 88 | |
| 89 authcli.SubcommandLogin(authOpts, "auth-login"), | |
| 90 authcli.SubcommandLogout(authOpts, "auth-logout" ), | |
| 91 authcli.SubcommandInfo(authOpts, "auth-info"), | |
| 92 }, | |
| 93 }, | |
| 94 } | |
| 95 | |
| 96 fs := flag.NewFlagSet("flags", flag.ExitOnError) | |
| 97 logConfig.AddFlags(fs) | |
| 98 authFlags.Register(fs, authOpts) | |
| 99 fs.Parse(args) | |
| 100 | |
| 101 // Process authentication options. | |
| 102 var err error | |
| 103 a.authOpts, err = authFlags.Options() | |
| 104 if err != nil { | |
| 105 log.WithError(err).Errorf(c, "Failed to create auth options.") | |
| 106 return 1 | |
| 107 } | |
| 108 | |
| 109 // Execute our subcommand. | |
| 110 return subcommands.Run(&a, fs.Args()) | |
| 111 } | |
| 112 | |
| 113 func main() { | |
| 114 os.Exit(mainImpl(context.Background(), os.Args[1:])) | |
| 115 } | |
| 116 | |
| 117 //////////////////////////////////////////////////////////////////////////////// | |
| 118 // Subcommand: dump-index | |
| 119 //////////////////////////////////////////////////////////////////////////////// | |
| 120 | |
| 121 type cmdRunDumpIndex struct { | |
| 122 subcommands.CommandRunBase | |
| 123 | |
| 124 path string | |
| 125 } | |
| 126 | |
| 127 var subcommandDumpIndex = subcommands.Command{ | |
| 128 UsageLine: "dump-index", | |
| 129 ShortDesc: "Dumps the contents of an index protobuf.", | |
| 130 CommandRun: func() subcommands.CommandRun { | |
| 131 var cmd cmdRunDumpIndex | |
| 132 | |
| 133 cmd.Flags.StringVar(&cmd.path, "path", "", "Google Storage path to the index protobuf.") | |
| 134 | |
| 135 return &cmd | |
| 136 }, | |
| 137 } | |
| 138 | |
| 139 func (cmd *cmdRunDumpIndex) Run(baseApp subcommands.Application, args []string) int { | |
| 140 app, c := getApplication(baseApp) | |
| 141 | |
| 142 if cmd.path == "" { | |
| 143 log.Errorf(c, "Missing required argument (-path).") | |
| 144 return 1 | |
| 145 } | |
| 146 path := gs.Path(cmd.path) | |
| 147 | |
| 148 client, err := app.getGSClient(c) | |
| 149 if err != nil { | |
| 150 log.WithError(err).Errorf(c, "Failed to create GS client.") | |
| 151 return 1 | |
| 152 } | |
| 153 defer client.Close() | |
| 154 | |
| 155 reader, err := client.NewReader(path, 0, -1) | |
| 156 if err != nil { | |
| 157 log.WithError(err).Errorf(c, "Failed to create GS reader.") | |
| 158 return 1 | |
| 159 } | |
| 160 defer reader.Close() | |
| 161 | |
| 162 data, err := ioutil.ReadAll(reader) | |
| 163 if err != nil { | |
| 164 log.WithError(err).Errorf(c, "Failed to read index data from GS. ") | |
| 165 return 1 | |
| 166 } | |
| 167 log.Debugf(c, "Loaded %d byte(s).", len(data)) | |
| 168 | |
| 169 var index logpb.LogIndex | |
| 170 if err := unmarshalAndDump(c, os.Stdout, data, &index); err != nil { | |
| 171 log.WithError(err).Errorf(c, "Failed to dump index protobuf.") | |
| 172 return 1 | |
| 173 } | |
| 174 return 0 | |
| 175 } | |
| 176 | |
| 177 func unmarshalAndDump(c context.Context, out io.Writer, data []byte, msg proto.M essage) error { | |
| 178 if err := proto.Unmarshal(data, msg); err != nil { | |
| 179 log.WithError(err).Errorf(c, "Failed to unmarshal protobuf.") | |
| 180 return err | |
| 181 } | |
| 182 if err := proto.MarshalText(out, msg); err != nil { | |
| 183 log.WithError(err).Errorf(c, "Failed to dump protobuf to output. ") | |
| 184 return err | |
| 185 } | |
| 186 return nil | |
| 187 } | |
| 188 | |
| 189 //////////////////////////////////////////////////////////////////////////////// | |
| 190 // Subcommand: dump-stream | |
| 191 //////////////////////////////////////////////////////////////////////////////// | |
| 192 | |
| 193 type cmdRunDumpStream struct { | |
| 194 subcommands.CommandRunBase | |
| 195 | |
| 196 path string | |
| 197 } | |
| 198 | |
| 199 var subcommandDumpStream = subcommands.Command{ | |
| 200 UsageLine: "dump-stream", | |
| 201 ShortDesc: "Dumps the contents of a log stream protobuf.", | |
| 202 CommandRun: func() subcommands.CommandRun { | |
| 203 var cmd cmdRunDumpStream | |
| 204 | |
| 205 cmd.Flags.StringVar(&cmd.path, "path", "", "Google Storage path to the stream protobuf.") | |
| 206 | |
| 207 return &cmd | |
| 208 }, | |
| 209 } | |
| 210 | |
| 211 func (cmd *cmdRunDumpStream) Run(baseApp subcommands.Application, args []string) int { | |
| 212 app, c := getApplication(baseApp) | |
| 213 | |
| 214 if cmd.path == "" { | |
| 215 log.Errorf(c, "Missing required argument (-path).") | |
| 216 return 1 | |
| 217 } | |
| 218 path := gs.Path(cmd.path) | |
| 219 | |
| 220 client, err := app.getGSClient(c) | |
| 221 if err != nil { | |
| 222 log.WithError(err).Errorf(c, "Failed to create GS client.") | |
| 223 return 1 | |
| 224 } | |
| 225 defer client.Close() | |
| 226 | |
| 227 reader, err := client.NewReader(path, 0, 0) | |
| 228 if err != nil { | |
| 229 log.WithError(err).Errorf(c, "Failed to create GS reader.") | |
| 230 return 1 | |
| 231 } | |
| 232 defer reader.Close() | |
| 233 | |
| 234 // Re-use the same buffer for each RecordIO frame. | |
| 235 var ( | |
| 236 frameReader = recordio.NewReader(reader, math.MaxInt64) | |
| 237 frameIndex = 0 | |
| 238 buf bytes.Buffer | |
| 239 | |
| 240 entry logpb.LogEntry | |
| 241 ) | |
| 242 for { | |
| 243 frameSize, r, err := frameReader.ReadFrame() | |
| 244 switch err { | |
| 245 case nil: | |
| 246 break | |
| 247 | |
| 248 case io.EOF: | |
| 249 log.Debugf(c, "Encountered EOF.") | |
| 250 return 0 | |
| 251 | |
| 252 default: | |
| 253 log.Fields{ | |
| 254 log.ErrorKey: err, | |
| 255 "index": frameIndex, | |
| 256 }.Errorf(c, "Encountered error reading log stream.") | |
| 257 return 1 | |
| 258 } | |
| 259 | |
| 260 buf.Reset() | |
| 261 buf.Grow(int(frameSize)) | |
| 262 | |
| 263 if _, err := buf.ReadFrom(r); err != nil { | |
| 264 log.Fields{ | |
| 265 log.ErrorKey: err, | |
| 266 "index": frameIndex, | |
| 267 }.Errorf(c, "Failed to read log stream frame.") | |
| 268 return 1 | |
| 269 } | |
| 270 | |
| 271 log.Fields{ | |
| 272 "index": frameIndex, | |
| 273 "size": buf.Len(), | |
| 274 }.Debugf(c, "Read frame.") | |
| 275 frameIndex++ | |
| 276 | |
| 277 if err := unmarshalAndDump(c, os.Stdout, buf.Bytes(), &entry); e rr != nil { | |
| 278 log.Fields{ | |
| 279 log.ErrorKey: err, | |
| 280 "index": frameIndex, | |
| 281 }.Errorf(c, "Failed to dump log entry descriptor.") | |
| 282 return 1 | |
| 283 } | |
| 284 } | |
| 285 } | |
| 286 | |
| 287 //////////////////////////////////////////////////////////////////////////////// | |
| 288 // Subcommand: get | |
| 289 //////////////////////////////////////////////////////////////////////////////// | |
| 290 | |
| 291 type cmdRunGet struct { | |
| 292 subcommands.CommandRunBase | |
| 293 | |
| 294 indexPath string | |
| 295 streamPath string | |
| 296 | |
| 297 index int | |
| 298 limit int | |
| 299 } | |
| 300 | |
| 301 var subcommandGet = subcommands.Command{ | |
| 302 UsageLine: "get", | |
| 303 ShortDesc: "Performs a Storage Get operation.", | |
| 304 CommandRun: func() subcommands.CommandRun { | |
| 305 var cmd cmdRunGet | |
| 306 | |
| 307 cmd.Flags.StringVar(&cmd.indexPath, "index-path", "", "Google St orage path to the index protobuf.") | |
| 308 cmd.Flags.StringVar(&cmd.streamPath, "stream-path", "", "Google Storage path to the stream protobuf.") | |
| 309 cmd.Flags.IntVar(&cmd.index, "index", 0, "The index to fetch.") | |
| 310 cmd.Flags.IntVar(&cmd.limit, "limit", 0, "The log entry limit.") | |
| 311 | |
| 312 return &cmd | |
| 313 }, | |
| 314 } | |
| 315 | |
| 316 func (cmd *cmdRunGet) Run(baseApp subcommands.Application, args []string) int { | |
| 317 app, c := getApplication(baseApp) | |
| 318 | |
| 319 switch { | |
| 320 case cmd.indexPath == "": | |
| 321 log.Errorf(c, "Missing required argument (-index-path).") | |
| 322 return 1 | |
| 323 case cmd.streamPath == "": | |
| 324 log.Errorf(c, "Missing required argument (-stream-path).") | |
| 325 return 1 | |
| 326 } | |
| 327 | |
| 328 client, err := app.getGSClient(c) | |
| 329 if err != nil { | |
| 330 log.WithError(err).Errorf(c, "Failed to create GS client.") | |
| 331 return 1 | |
| 332 } | |
| 333 defer client.Close() | |
| 334 | |
| 335 stClient, err := archive.New(c, archive.Options{ | |
| 336 IndexURL: cmd.indexPath, | |
| 337 StreamURL: cmd.streamPath, | |
| 338 Client: client, | |
| 339 }) | |
| 340 if err != nil { | |
| 341 log.WithError(err).Errorf(c, "Failed to create storage client.") | |
| 342 return 1 | |
| 343 } | |
| 344 defer stClient.Close() | |
| 345 | |
| 346 var innerErr error | |
| 347 err = stClient.Get(storage.GetRequest{ | |
| 348 Index: types.MessageIndex(cmd.index), | |
| 349 Limit: cmd.limit, | |
| 350 }, func(idx types.MessageIndex, data []byte) bool { | |
| 351 log.Fields{ | |
| 352 "index": idx, | |
| 353 }.Infof(c, "Fetched log entry.") | |
| 354 | |
| 355 var log logpb.LogEntry | |
| 356 if innerErr = unmarshalAndDump(c, os.Stdout, data, &log); innerE rr != nil { | |
| 357 return false | |
| 358 } | |
| 359 return true | |
| 360 }) | |
| 361 switch { | |
| 362 case innerErr != nil: | |
| 363 log.WithError(innerErr).Errorf(c, "Failed to process fetched log entries.") | |
| 364 return 1 | |
| 365 | |
| 366 case err != nil: | |
| 367 log.WithError(err).Errorf(c, "Failed to Get log entries.") | |
| 368 return 1 | |
| 369 | |
| 370 default: | |
| 371 return 0 | |
| 372 } | |
| 373 } | |
| 374 | |
| 375 //////////////////////////////////////////////////////////////////////////////// | |
| 376 // Subcommand: tail | |
| 377 //////////////////////////////////////////////////////////////////////////////// | |
| 378 | |
| 379 type cmdRunTail struct { | |
| 380 subcommands.CommandRunBase | |
| 381 | |
| 382 indexPath string | |
| 383 streamPath string | |
| 384 } | |
| 385 | |
| 386 var subcommandTail = subcommands.Command{ | |
| 387 UsageLine: "tail", | |
| 388 ShortDesc: "Performs a Storage Tail operation.", | |
| 389 CommandRun: func() subcommands.CommandRun { | |
| 390 var cmd cmdRunTail | |
| 391 | |
| 392 cmd.Flags.StringVar(&cmd.indexPath, "index-path", "", "Google St orage path to the index protobuf.") | |
| 393 cmd.Flags.StringVar(&cmd.streamPath, "stream-path", "", "Google Storage path to the stream protobuf.") | |
| 394 | |
| 395 return &cmd | |
| 396 }, | |
| 397 } | |
| 398 | |
| 399 func (cmd *cmdRunTail) Run(baseApp subcommands.Application, args []string) int { | |
| 400 app, c := getApplication(baseApp) | |
| 401 | |
| 402 switch { | |
| 403 case cmd.indexPath == "": | |
| 404 log.Errorf(c, "Missing required argument (-index-path).") | |
| 405 return 1 | |
| 406 case cmd.streamPath == "": | |
| 407 log.Errorf(c, "Missing required argument (-stream-path).") | |
| 408 return 1 | |
| 409 } | |
| 410 | |
| 411 client, err := app.getGSClient(c) | |
| 412 if err != nil { | |
| 413 log.WithError(err).Errorf(c, "Failed to create GS client.") | |
| 414 return 1 | |
| 415 } | |
| 416 defer client.Close() | |
| 417 | |
| 418 stClient, err := archive.New(c, archive.Options{ | |
| 419 IndexURL: cmd.indexPath, | |
| 420 StreamURL: cmd.streamPath, | |
| 421 Client: client, | |
| 422 }) | |
| 423 if err != nil { | |
| 424 log.WithError(err).Errorf(c, "Failed to create storage client.") | |
| 425 return 1 | |
| 426 } | |
| 427 defer stClient.Close() | |
| 428 | |
| 429 data, idx, err := stClient.Tail("", "") | |
| 430 if err != nil { | |
| 431 log.WithError(err).Errorf(c, "Failed to Tail log entries.") | |
| 432 return 1 | |
| 433 } | |
| 434 | |
| 435 if data == nil { | |
| 436 log.Infof(c, "No log data to tail.") | |
| 437 return 0 | |
| 438 } | |
| 439 | |
| 440 log.Fields{ | |
| 441 "index": idx, | |
| 442 "size": len(data), | |
| 443 }.Debugf(c, "Dumping tail entry.") | |
| 444 var entry logpb.LogEntry | |
| 445 if err := unmarshalAndDump(c, os.Stdout, data, &entry); err != nil { | |
| 446 log.WithError(err).Errorf(c, "Failed to dump tail entry.") | |
| 447 return 1 | |
| 448 } | |
| 449 return 0 | |
| 450 } | |
| OLD | NEW |