| OLD | NEW |
| 1 // Copyright 2015 The LUCI Authors. All rights reserved. | 1 // Copyright 2015 The LUCI Authors. All rights reserved. |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
| 3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
| 4 | 4 |
| 5 package cli | 5 package cli |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "errors" | |
| 9 "io" | 8 "io" |
| 10 "os" | 9 "os" |
| 11 "strconv" | 10 "strconv" |
| 12 "strings" | 11 "strings" |
| 13 "time" | 12 "time" |
| 14 | 13 |
| 14 "github.com/luci/luci-go/common/errors" |
| 15 "github.com/luci/luci-go/common/flag/flagenum" | 15 "github.com/luci/luci-go/common/flag/flagenum" |
| 16 log "github.com/luci/luci-go/common/logging" | 16 log "github.com/luci/luci-go/common/logging" |
| 17 "github.com/luci/luci-go/common/proto/google" | 17 "github.com/luci/luci-go/common/proto/google" |
| 18 "github.com/luci/luci-go/common/proto/milo" | 18 "github.com/luci/luci-go/common/proto/milo" |
| 19 "github.com/luci/luci-go/logdog/api/logpb" | 19 "github.com/luci/luci-go/logdog/api/logpb" |
| 20 "github.com/luci/luci-go/logdog/client/coordinator" | 20 "github.com/luci/luci-go/logdog/client/coordinator" |
| 21 "github.com/luci/luci-go/logdog/common/fetcher" | 21 "github.com/luci/luci-go/logdog/common/fetcher" |
| 22 "github.com/luci/luci-go/logdog/common/renderer" | 22 "github.com/luci/luci-go/logdog/common/renderer" |
| 23 "github.com/luci/luci-go/logdog/common/types" | 23 "github.com/luci/luci-go/logdog/common/types" |
| 24 "github.com/luci/luci-go/luci_config/common/cfgtypes" | |
| 25 | 24 |
| 26 "github.com/golang/protobuf/proto" | 25 "github.com/golang/protobuf/proto" |
| 27 "github.com/maruel/subcommands" | 26 "github.com/maruel/subcommands" |
| 28 "golang.org/x/net/context" | 27 "golang.org/x/net/context" |
| 29 ) | 28 ) |
| 30 | 29 |
| 31 var errDatagramNotSupported = errors.New("datagram not supported") | 30 var errDatagramNotSupported = errors.New("datagram not supported") |
| 32 | 31 |
| 33 type timestampsFlag string | 32 type timestampsFlag string |
| 34 | 33 |
| (...skipping 52 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 87 } | 86 } |
| 88 | 87 |
| 89 func (cmd *catCommandRun) Run(scApp subcommands.Application, args []string, _ su
bcommands.Env) int { | 88 func (cmd *catCommandRun) Run(scApp subcommands.Application, args []string, _ su
bcommands.Env) int { |
| 90 a := scApp.(*application) | 89 a := scApp.(*application) |
| 91 | 90 |
| 92 if len(args) == 0 { | 91 if len(args) == 0 { |
| 93 log.Errorf(a, "At least one log path must be supplied.") | 92 log.Errorf(a, "At least one log path must be supplied.") |
| 94 return 1 | 93 return 1 |
| 95 } | 94 } |
| 96 | 95 |
| 97 » // Validate and construct our cat paths. | 96 » // Validate and construct our cat addresses. |
| 98 » paths := make([]*streamPath, len(args)) | 97 » addrs := make([]*types.StreamAddr, len(args)) |
| 99 for i, arg := range args { | 98 for i, arg := range args { |
| 99 // If the address parses as a URL, use it directly. |
| 100 var err error |
| 101 if addrs[i], err = types.ParseURL(arg); err == nil { |
| 102 continue |
| 103 } |
| 104 |
| 100 // User-friendly: trim any leading or trailing slashes from the
path. | 105 // User-friendly: trim any leading or trailing slashes from the
path. |
| 101 project, path, _, err := a.splitPath(arg) | 106 project, path, _, err := a.splitPath(arg) |
| 102 if err != nil { | 107 if err != nil { |
| 103 log.WithError(err).Errorf(a, "Invalid path specifier.") | 108 log.WithError(err).Errorf(a, "Invalid path specifier.") |
| 104 return 1 | 109 return 1 |
| 105 } | 110 } |
| 106 | 111 |
| 107 » » sp := streamPath{project, types.StreamPath(path)} | 112 » » addr := types.StreamAddr{Project: project, Path: types.StreamPat
h(path)} |
| 108 » » if err := sp.path.Validate(); err != nil { | 113 » » if err := addr.Path.Validate(); err != nil { |
| 109 log.Fields{ | 114 log.Fields{ |
| 110 log.ErrorKey: err, | 115 log.ErrorKey: err, |
| 111 "index": i, | 116 "index": i, |
| 112 » » » » "project": sp.project, | 117 » » » » "project": addr.Project, |
| 113 » » » » "path": sp.path, | 118 » » » » "path": addr.Path, |
| 114 }.Errorf(a, "Invalid command-line stream path.") | 119 }.Errorf(a, "Invalid command-line stream path.") |
| 115 return 1 | 120 return 1 |
| 116 } | 121 } |
| 117 | 122 |
| 118 » » paths[i] = &sp | 123 » » if addr.Host, err = a.resolveHost(""); err != nil { |
| 124 » » » err = errors.Annotate(err).Reason("failed to resolve hos
t: %(host)q"). |
| 125 » » » » D("host", addr.Host).Err() |
| 126 » » » errors.Log(a, err) |
| 127 » » » return 1 |
| 128 » » } |
| 129 |
| 130 » » addrs[i] = &addr |
| 119 } | 131 } |
| 120 if cmd.buffer <= 0 { | 132 if cmd.buffer <= 0 { |
| 121 log.Fields{ | 133 log.Fields{ |
| 122 "value": cmd.buffer, | 134 "value": cmd.buffer, |
| 123 }.Errorf(a, "Buffer size must be >0.") | 135 }.Errorf(a, "Buffer size must be >0.") |
| 124 } | 136 } |
| 125 | 137 |
| 138 coords := make(map[string]*coordinator.Client, len(addrs)) |
| 139 for _, addr := range addrs { |
| 140 if _, ok := coords[addr.Host]; ok { |
| 141 continue |
| 142 } |
| 143 |
| 144 var err error |
| 145 if coords[addr.Host], err = a.coordinatorClient(addr.Host); err
!= nil { |
| 146 err = errors.Annotate(err).Reason("failed to create Coor
dinator client for %(host)q"). |
| 147 D("host", addr.Host).Err() |
| 148 |
| 149 errors.Log(a, err) |
| 150 return 1 |
| 151 } |
| 152 } |
| 153 |
| 126 tctx, _ := a.timeoutCtx(a) | 154 tctx, _ := a.timeoutCtx(a) |
| 127 » for i, sp := range paths { | 155 » for i, addr := range addrs { |
| 128 » » if err := cmd.catPath(tctx, a.coord, sp); err != nil { | 156 » » if err := cmd.catPath(tctx, coords[addr.Host], addr); err != nil
{ |
| 129 log.Fields{ | 157 log.Fields{ |
| 130 log.ErrorKey: err, | 158 log.ErrorKey: err, |
| 131 » » » » "project": sp.project, | 159 » » » » "project": addr.Project, |
| 132 » » » » "path": sp.path, | 160 » » » » "path": addr.Path, |
| 133 "index": i, | 161 "index": i, |
| 134 }.Errorf(a, "Failed to fetch log stream.") | 162 }.Errorf(a, "Failed to fetch log stream.") |
| 135 | 163 |
| 136 if err == context.DeadlineExceeded { | 164 if err == context.DeadlineExceeded { |
| 137 return 2 | 165 return 2 |
| 138 } | 166 } |
| 139 return 1 | 167 return 1 |
| 140 } | 168 } |
| 141 } | 169 } |
| 142 | 170 |
| 143 return 0 | 171 return 0 |
| 144 } | 172 } |
| 145 | 173 |
| 146 // streamPath is a single path to fetch. | 174 func (cmd *catCommandRun) catPath(c context.Context, coord *coordinator.Client,
addr *types.StreamAddr) error { |
| 147 type streamPath struct { | |
| 148 » project cfgtypes.ProjectName | |
| 149 » path types.StreamPath | |
| 150 } | |
| 151 | |
| 152 func (cmd *catCommandRun) catPath(c context.Context, coord *coordinator.Client,
sp *streamPath) error { | |
| 153 // Pull stream information. | 175 // Pull stream information. |
| 154 src := coordinatorSource{ | 176 src := coordinatorSource{ |
| 155 » » stream: coord.Stream(sp.project, sp.path), | 177 » » stream: coord.Stream(addr.Project, addr.Path), |
| 156 } | 178 } |
| 157 src.tidx = -1 // Must be set to probe for state. | 179 src.tidx = -1 // Must be set to probe for state. |
| 158 | 180 |
| 159 f := fetcher.New(c, fetcher.Options{ | 181 f := fetcher.New(c, fetcher.Options{ |
| 160 Source: &src, | 182 Source: &src, |
| 161 Index: types.MessageIndex(cmd.index), | 183 Index: types.MessageIndex(cmd.index), |
| 162 Count: cmd.count, | 184 Count: cmd.count, |
| 163 BufferCount: cmd.fetchSize, | 185 BufferCount: cmd.fetchSize, |
| 164 BufferBytes: int64(cmd.fetchBytes), | 186 BufferBytes: int64(cmd.fetchBytes), |
| 165 }) | 187 }) |
| (...skipping 67 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 233 } | 255 } |
| 234 | 256 |
| 235 if err := proto.MarshalText(w, pb); err != nil { | 257 if err := proto.MarshalText(w, pb); err != nil { |
| 236 log.WithError(err).Errorf(c, "Failed to marshal datagram
as text.") | 258 log.WithError(err).Errorf(c, "Failed to marshal datagram
as text.") |
| 237 return false | 259 return false |
| 238 } | 260 } |
| 239 | 261 |
| 240 return true | 262 return true |
| 241 } | 263 } |
| 242 } | 264 } |
| OLD | NEW |