| Index: logdog/client/cli/subcommandCat.go
|
| diff --git a/logdog/client/cli/subcommandCat.go b/logdog/client/cli/subcommandCat.go
|
| index a832127b591a81875c2fe70021e6133b46ce81a3..a770eaef718bcd0e6e5038f0d67968cb625583b2 100644
|
| --- a/logdog/client/cli/subcommandCat.go
|
| +++ b/logdog/client/cli/subcommandCat.go
|
| @@ -5,13 +5,13 @@
|
| package cli
|
|
|
| import (
|
| - "errors"
|
| "io"
|
| "os"
|
| "strconv"
|
| "strings"
|
| "time"
|
|
|
| + "github.com/luci/luci-go/common/errors"
|
| "github.com/luci/luci-go/common/flag/flagenum"
|
| log "github.com/luci/luci-go/common/logging"
|
| "github.com/luci/luci-go/common/proto/google"
|
| @@ -21,7 +21,6 @@ import (
|
| "github.com/luci/luci-go/logdog/common/fetcher"
|
| "github.com/luci/luci-go/logdog/common/renderer"
|
| "github.com/luci/luci-go/logdog/common/types"
|
| - "github.com/luci/luci-go/luci_config/common/cfgtypes"
|
|
|
| "github.com/golang/protobuf/proto"
|
| "github.com/maruel/subcommands"
|
| @@ -94,9 +93,15 @@ func (cmd *catCommandRun) Run(scApp subcommands.Application, args []string, _ su
|
| return 1
|
| }
|
|
|
| - // Validate and construct our cat paths.
|
| - paths := make([]*streamPath, len(args))
|
| + // Validate and construct our cat addresses.
|
| + addrs := make([]*types.StreamAddr, len(args))
|
| for i, arg := range args {
|
| + // If the address parses as a URL, use it directly.
|
| + var err error
|
| + if addrs[i], err = types.ParseURL(arg); err == nil {
|
| + continue
|
| + }
|
| +
|
| // User-friendly: trim any leading or trailing slashes from the path.
|
| project, path, _, err := a.splitPath(arg)
|
| if err != nil {
|
| @@ -104,18 +109,25 @@ func (cmd *catCommandRun) Run(scApp subcommands.Application, args []string, _ su
|
| return 1
|
| }
|
|
|
| - sp := streamPath{project, types.StreamPath(path)}
|
| - if err := sp.path.Validate(); err != nil {
|
| + addr := types.StreamAddr{Project: project, Path: types.StreamPath(path)}
|
| + if err := addr.Path.Validate(); err != nil {
|
| log.Fields{
|
| log.ErrorKey: err,
|
| "index": i,
|
| - "project": sp.project,
|
| - "path": sp.path,
|
| + "project": addr.Project,
|
| + "path": addr.Path,
|
| }.Errorf(a, "Invalid command-line stream path.")
|
| return 1
|
| }
|
|
|
| - paths[i] = &sp
|
| + if addr.Host, err = a.resolveHost(""); err != nil {
|
| + err = errors.Annotate(err).Reason("failed to resolve host: %(host)q").
|
| + D("host", addr.Host).Err()
|
| + errors.Log(a, err)
|
| + return 1
|
| + }
|
| +
|
| + addrs[i] = &addr
|
| }
|
| if cmd.buffer <= 0 {
|
| log.Fields{
|
| @@ -123,13 +135,29 @@ func (cmd *catCommandRun) Run(scApp subcommands.Application, args []string, _ su
|
| }.Errorf(a, "Buffer size must be >0.")
|
| }
|
|
|
| + coords := make(map[string]*coordinator.Client, len(addrs))
|
| + for _, addr := range addrs {
|
| + if _, ok := coords[addr.Host]; ok {
|
| + continue
|
| + }
|
| +
|
| + var err error
|
| + if coords[addr.Host], err = a.coordinatorClient(addr.Host); err != nil {
|
| + err = errors.Annotate(err).Reason("failed to create Coordinator client for %(host)q").
|
| + D("host", addr.Host).Err()
|
| +
|
| + errors.Log(a, err)
|
| + return 1
|
| + }
|
| + }
|
| +
|
| tctx, _ := a.timeoutCtx(a)
|
| - for i, sp := range paths {
|
| - if err := cmd.catPath(tctx, a.coord, sp); err != nil {
|
| + for i, addr := range addrs {
|
| + if err := cmd.catPath(tctx, coords[addr.Host], addr); err != nil {
|
| log.Fields{
|
| log.ErrorKey: err,
|
| - "project": sp.project,
|
| - "path": sp.path,
|
| + "project": addr.Project,
|
| + "path": addr.Path,
|
| "index": i,
|
| }.Errorf(a, "Failed to fetch log stream.")
|
|
|
| @@ -143,16 +171,10 @@ func (cmd *catCommandRun) Run(scApp subcommands.Application, args []string, _ su
|
| return 0
|
| }
|
|
|
| -// streamPath is a single path to fetch.
|
| -type streamPath struct {
|
| - project cfgtypes.ProjectName
|
| - path types.StreamPath
|
| -}
|
| -
|
| -func (cmd *catCommandRun) catPath(c context.Context, coord *coordinator.Client, sp *streamPath) error {
|
| +func (cmd *catCommandRun) catPath(c context.Context, coord *coordinator.Client, addr *types.StreamAddr) error {
|
| // Pull stream information.
|
| src := coordinatorSource{
|
| - stream: coord.Stream(sp.project, sp.path),
|
| + stream: coord.Stream(addr.Project, addr.Path),
|
| }
|
| src.tidx = -1 // Must be set to probe for state.
|
|
|
|
|