| OLD | NEW |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 package logs | 5 package logs |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "net/url" | |
| 9 "time" | 8 "time" |
| 10 | 9 |
| 11 "github.com/golang/protobuf/proto" | 10 "github.com/golang/protobuf/proto" |
| 12 ds "github.com/luci/gae/service/datastore" | 11 ds "github.com/luci/gae/service/datastore" |
| 13 "github.com/luci/luci-go/appengine/logdog/coordinator" | 12 "github.com/luci/luci-go/appengine/logdog/coordinator" |
| 14 "github.com/luci/luci-go/common/api/logdog_coordinator/logs/v1" | 13 "github.com/luci/luci-go/common/api/logdog_coordinator/logs/v1" |
| 14 "github.com/luci/luci-go/common/config" |
| 15 "github.com/luci/luci-go/common/grpcutil" | 15 "github.com/luci/luci-go/common/grpcutil" |
| 16 "github.com/luci/luci-go/common/logdog/types" | 16 "github.com/luci/luci-go/common/logdog/types" |
| 17 log "github.com/luci/luci-go/common/logging" | 17 log "github.com/luci/luci-go/common/logging" |
| 18 "github.com/luci/luci-go/common/proto/logdog/logpb" | 18 "github.com/luci/luci-go/common/proto/logdog/logpb" |
| 19 "github.com/luci/luci-go/common/retry" | 19 "github.com/luci/luci-go/common/retry" |
| 20 "github.com/luci/luci-go/server/logdog/storage" | 20 "github.com/luci/luci-go/server/logdog/storage" |
| 21 "github.com/luci/luci-go/server/logdog/storage/archive" | 21 "github.com/luci/luci-go/server/logdog/storage/archive" |
| 22 "golang.org/x/net/context" | 22 "golang.org/x/net/context" |
| 23 "google.golang.org/grpc/codes" | 23 "google.golang.org/grpc/codes" |
| 24 ) | 24 ) |
| (...skipping 14 matching lines...) Expand all Loading... |
| 39 ) | 39 ) |
| 40 | 40 |
| 41 // Get returns state and log data for a single log stream. | 41 // Get returns state and log data for a single log stream. |
| 42 func (s *server) Get(c context.Context, req *logdog.GetRequest) (*logdog.GetResp
onse, error) { | 42 func (s *server) Get(c context.Context, req *logdog.GetRequest) (*logdog.GetResp
onse, error) { |
| 43 return s.getImpl(c, req, false) | 43 return s.getImpl(c, req, false) |
| 44 } | 44 } |
| 45 | 45 |
| 46 // Tail returns the last log entry for a given log stream. | 46 // Tail returns the last log entry for a given log stream. |
| 47 func (s *server) Tail(c context.Context, req *logdog.TailRequest) (*logdog.GetRe
sponse, error) { | 47 func (s *server) Tail(c context.Context, req *logdog.TailRequest) (*logdog.GetRe
sponse, error) { |
| 48 r := logdog.GetRequest{ | 48 r := logdog.GetRequest{ |
| 49 » » Path: req.Path, | 49 » » Project: req.Project, |
| 50 » » State: req.State, | 50 » » Path: req.Path, |
| 51 » » State: req.State, |
| 51 } | 52 } |
| 52 return s.getImpl(c, &r, true) | 53 return s.getImpl(c, &r, true) |
| 53 } | 54 } |
| 54 | 55 |
| 55 // getImpl is common code shared between Get and Tail endpoints. | 56 // getImpl is common code shared between Get and Tail endpoints. |
| 56 func (s *server) getImpl(c context.Context, req *logdog.GetRequest, tail bool) (
*logdog.GetResponse, error) { | 57 func (s *server) getImpl(c context.Context, req *logdog.GetRequest, tail bool) (
*logdog.GetResponse, error) { |
| 57 log.Fields{ | 58 log.Fields{ |
| 58 » » "path": req.Path, | 59 » » "project": req.Project, |
| 59 » » "index": req.Index, | 60 » » "path": req.Path, |
| 60 » » "tail": tail, | 61 » » "index": req.Index, |
| 62 » » "tail": tail, |
| 61 }.Debugf(c, "Received get request.") | 63 }.Debugf(c, "Received get request.") |
| 62 | 64 |
| 63 » // Fetch the log stream state for this log stream. | 65 » ls, err := coordinator.NewLogStream(req.Path) |
| 64 » u, err := url.Parse(req.Path) | |
| 65 if err != nil { | 66 if err != nil { |
| 66 » » log.Fields{ | 67 » » log.WithError(err).Errorf(c, "Invalid path supplied.") |
| 67 » » » log.ErrorKey: err, | |
| 68 » » » "path": req.Path, | |
| 69 » » }.Errorf(c, "Could not parse path URL.") | |
| 70 » » return nil, grpcutil.Errf(codes.InvalidArgument, "invalid path e
ncoding") | |
| 71 » } | |
| 72 » ls, err := coordinator.NewLogStream(u.Path) | |
| 73 » if err != nil { | |
| 74 » » log.Fields{ | |
| 75 » » » log.ErrorKey: err, | |
| 76 » » » "path": u.Path, | |
| 77 » » }.Errorf(c, "Invalid path supplied.") | |
| 78 return nil, grpcutil.Errf(codes.InvalidArgument, "invalid path v
alue") | 68 return nil, grpcutil.Errf(codes.InvalidArgument, "invalid path v
alue") |
| 79 } | 69 } |
| 80 | 70 |
| 71 // The user may supply a hash instead of a full path. Once resolved, log |
| 72 // the original log stream. |
| 73 path := ls.Path() |
| 74 if req.Path != string(path) { |
| 75 log.Fields{ |
| 76 "hashPath": req.Path, |
| 77 "streamPath": path, |
| 78 }.Debugf(c, "Resolved hash path.") |
| 79 } |
| 80 |
| 81 // If this log entry is Purged and we're not admin, pretend it doesn't e
xist. | 81 // If this log entry is Purged and we're not admin, pretend it doesn't e
xist. |
| 82 err = ds.Get(c).Get(ls) | 82 err = ds.Get(c).Get(ls) |
| 83 switch err { | 83 switch err { |
| 84 case nil: | 84 case nil: |
| 85 if ls.Purged { | 85 if ls.Purged { |
| 86 if authErr := coordinator.IsAdminUser(c); authErr != nil
{ | 86 if authErr := coordinator.IsAdminUser(c); authErr != nil
{ |
| 87 log.Fields{ | 87 log.Fields{ |
| 88 log.ErrorKey: authErr, | 88 log.ErrorKey: authErr, |
| 89 }.Warningf(c, "Non-superuser requested purged lo
g.") | 89 }.Warningf(c, "Non-superuser requested purged lo
g.") |
| 90 return nil, grpcutil.Errf(codes.NotFound, "path
not found") | 90 return nil, grpcutil.Errf(codes.NotFound, "path
not found") |
| 91 } | 91 } |
| 92 } | 92 } |
| 93 | 93 |
| 94 case ds.ErrNoSuchEntity: | 94 case ds.ErrNoSuchEntity: |
| 95 » » log.Fields{ | 95 » » log.Errorf(c, "Log stream does not exist.") |
| 96 » » » "path": u.Path, | |
| 97 » » }.Errorf(c, "Log stream does not exist.") | |
| 98 return nil, grpcutil.Errf(codes.NotFound, "path not found") | 96 return nil, grpcutil.Errf(codes.NotFound, "path not found") |
| 99 | 97 |
| 100 default: | 98 default: |
| 101 » » log.Fields{ | 99 » » log.WithError(err).Errorf(c, "Failed to look up log stream.") |
| 102 » » » log.ErrorKey: err, | |
| 103 » » » "path": u.Path, | |
| 104 » » }.Errorf(c, "Failed to look up log stream.") | |
| 105 return nil, grpcutil.Internal | 100 return nil, grpcutil.Internal |
| 106 } | 101 } |
| 107 path := ls.Path() | |
| 108 | 102 |
| 109 // If nothing was requested, return nothing. | 103 // If nothing was requested, return nothing. |
| 110 resp := logdog.GetResponse{} | 104 resp := logdog.GetResponse{} |
| 111 if !(req.State || tail) && req.LogCount < 0 { | 105 if !(req.State || tail) && req.LogCount < 0 { |
| 112 return &resp, nil | 106 return &resp, nil |
| 113 } | 107 } |
| 114 | 108 |
| 115 if req.State { | 109 if req.State { |
| 116 resp.State = loadLogStreamState(ls) | 110 resp.State = loadLogStreamState(ls) |
| 117 | 111 |
| 118 var err error | 112 var err error |
| 119 resp.Desc, err = ls.DescriptorValue() | 113 resp.Desc, err = ls.DescriptorValue() |
| 120 if err != nil { | 114 if err != nil { |
| 121 log.WithError(err).Errorf(c, "Failed to deserialize desc
riptor protobuf.") | 115 log.WithError(err).Errorf(c, "Failed to deserialize desc
riptor protobuf.") |
| 122 return nil, grpcutil.Internal | 116 return nil, grpcutil.Internal |
| 123 } | 117 } |
| 124 } | 118 } |
| 125 | 119 |
| 126 // Retrieve requested logs from storage, if requested. | 120 // Retrieve requested logs from storage, if requested. |
| 127 if tail || req.LogCount >= 0 { | 121 if tail || req.LogCount >= 0 { |
| 128 resp.Logs, err = s.getLogs(c, req, tail, ls) | 122 resp.Logs, err = s.getLogs(c, req, tail, ls) |
| 129 if err != nil { | 123 if err != nil { |
| 130 » » » log.Fields{ | 124 » » » log.WithError(err).Errorf(c, "Failed to get logs.") |
| 131 » » » » log.ErrorKey: err, | |
| 132 » » » » "path": path, | |
| 133 » » » }.Errorf(c, "Failed to get logs.") | |
| 134 return nil, grpcutil.Internal | 125 return nil, grpcutil.Internal |
| 135 } | 126 } |
| 136 } | 127 } |
| 137 | 128 |
| 138 log.Fields{ | 129 log.Fields{ |
| 139 "logCount": len(resp.Logs), | 130 "logCount": len(resp.Logs), |
| 140 }.Debugf(c, "Get request completed successfully.") | 131 }.Debugf(c, "Get request completed successfully.") |
| 141 return &resp, nil | 132 return &resp, nil |
| 142 } | 133 } |
| 143 | 134 |
| (...skipping 40 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 184 Client: gs, | 175 Client: gs, |
| 185 MaxBytes: byteLimit, | 176 MaxBytes: byteLimit, |
| 186 }) | 177 }) |
| 187 if err != nil { | 178 if err != nil { |
| 188 log.WithError(err).Errorf(c, "Failed to create Google St
orage storage instance.") | 179 log.WithError(err).Errorf(c, "Failed to create Google St
orage storage instance.") |
| 189 return nil, err | 180 return nil, err |
| 190 } | 181 } |
| 191 } | 182 } |
| 192 defer st.Close() | 183 defer st.Close() |
| 193 | 184 |
| 194 » path := ls.Path() | 185 » project, path := coordinator.Project(c), ls.Path() |
| 195 | 186 |
| 196 var fetchedLogs [][]byte | 187 var fetchedLogs [][]byte |
| 197 var err error | 188 var err error |
| 198 if tail { | 189 if tail { |
| 199 » » fetchedLogs, err = getTail(c, st, path) | 190 » » fetchedLogs, err = getTail(c, st, project, path) |
| 200 } else { | 191 } else { |
| 201 » » fetchedLogs, err = getHead(c, req, st, path, byteLimit) | 192 » » fetchedLogs, err = getHead(c, req, st, project, path, byteLimit) |
| 202 } | 193 } |
| 203 if err != nil { | 194 if err != nil { |
| 204 log.WithError(err).Errorf(c, "Failed to fetch log records.") | 195 log.WithError(err).Errorf(c, "Failed to fetch log records.") |
| 205 return nil, err | 196 return nil, err |
| 206 } | 197 } |
| 207 | 198 |
| 208 logEntries := make([]*logpb.LogEntry, len(fetchedLogs)) | 199 logEntries := make([]*logpb.LogEntry, len(fetchedLogs)) |
| 209 for idx, ld := range fetchedLogs { | 200 for idx, ld := range fetchedLogs { |
| 210 // Deserialize the log entry, then convert it to output value. | 201 // Deserialize the log entry, then convert it to output value. |
| 211 le := logpb.LogEntry{} | 202 le := logpb.LogEntry{} |
| 212 if err := proto.Unmarshal(ld, &le); err != nil { | 203 if err := proto.Unmarshal(ld, &le); err != nil { |
| 213 log.Fields{ | 204 log.Fields{ |
| 214 log.ErrorKey: err, | 205 log.ErrorKey: err, |
| 215 "index": idx, | 206 "index": idx, |
| 216 }.Errorf(c, "Failed to generate response log entry.") | 207 }.Errorf(c, "Failed to generate response log entry.") |
| 217 return nil, err | 208 return nil, err |
| 218 } | 209 } |
| 219 logEntries[idx] = &le | 210 logEntries[idx] = &le |
| 220 } | 211 } |
| 221 return logEntries, nil | 212 return logEntries, nil |
| 222 } | 213 } |
| 223 | 214 |
| 224 func getHead(c context.Context, req *logdog.GetRequest, st storage.Storage, p ty
pes.StreamPath, byteLimit int) ( | 215 func getHead(c context.Context, req *logdog.GetRequest, st storage.Storage, proj
ect config.ProjectName, |
| 225 » [][]byte, error) { | 216 » path types.StreamPath, byteLimit int) ([][]byte, error) { |
| 226 » c = log.SetFields(c, log.Fields{ | 217 » log.Fields{ |
| 227 » » "path": p, | 218 » » "project": project, |
| 219 » » "path": path, |
| 228 "index": req.Index, | 220 "index": req.Index, |
| 229 "count": req.LogCount, | 221 "count": req.LogCount, |
| 230 "bytes": req.ByteCount, | 222 "bytes": req.ByteCount, |
| 231 "noncontiguous": req.NonContiguous, | 223 "noncontiguous": req.NonContiguous, |
| 232 » }) | 224 » }.Debugf(c, "Issuing Get request.") |
| 233 | 225 |
| 234 // Allocate result logs array. | 226 // Allocate result logs array. |
| 235 logCount := int(req.LogCount) | 227 logCount := int(req.LogCount) |
| 236 asz := getInitialArraySize | 228 asz := getInitialArraySize |
| 237 if logCount > 0 && logCount < asz { | 229 if logCount > 0 && logCount < asz { |
| 238 asz = logCount | 230 asz = logCount |
| 239 } | 231 } |
| 240 logs := make([][]byte, 0, asz) | 232 logs := make([][]byte, 0, asz) |
| 241 | 233 |
| 242 sreq := storage.GetRequest{ | 234 sreq := storage.GetRequest{ |
| 243 » » Path: p, | 235 » » Project: project, |
| 244 » » Index: types.MessageIndex(req.Index), | 236 » » Path: path, |
| 245 » » Limit: logCount, | 237 » » Index: types.MessageIndex(req.Index), |
| 238 » » Limit: logCount, |
| 246 } | 239 } |
| 247 | 240 |
| 248 count := 0 | 241 count := 0 |
| 249 err := retry.Retry(c, retry.TransientOnly(retry.Default), func() error { | 242 err := retry.Retry(c, retry.TransientOnly(retry.Default), func() error { |
| 250 // Issue the Get request. This may return a transient error, in
which case | 243 // Issue the Get request. This may return a transient error, in
which case |
| 251 // we will retry. | 244 // we will retry. |
| 252 return st.Get(sreq, func(idx types.MessageIndex, ld []byte) bool
{ | 245 return st.Get(sreq, func(idx types.MessageIndex, ld []byte) bool
{ |
| 253 if count > 0 && byteLimit-len(ld) < 0 { | 246 if count > 0 && byteLimit-len(ld) < 0 { |
| 254 // Not the first log, and we've exceeded our byt
e limit. | 247 // Not the first log, and we've exceeded our byt
e limit. |
| 255 return false | 248 return false |
| (...skipping 10 matching lines...) Expand all Loading... |
| 266 }) | 259 }) |
| 267 }, func(err error, delay time.Duration) { | 260 }, func(err error, delay time.Duration) { |
| 268 log.Fields{ | 261 log.Fields{ |
| 269 log.ErrorKey: err, | 262 log.ErrorKey: err, |
| 270 "delay": delay, | 263 "delay": delay, |
| 271 "initialIndex": req.Index, | 264 "initialIndex": req.Index, |
| 272 "nextIndex": sreq.Index, | 265 "nextIndex": sreq.Index, |
| 273 "count": len(logs), | 266 "count": len(logs), |
| 274 }.Warningf(c, "Transient error while loading logs; retrying.") | 267 }.Warningf(c, "Transient error while loading logs; retrying.") |
| 275 }) | 268 }) |
| 276 » if err != nil { | 269 » switch err { |
| 270 » case nil: |
| 271 » » return logs, nil |
| 272 |
| 273 » case storage.ErrDoesNotExist: |
| 274 » » return nil, nil |
| 275 |
| 276 » default: |
| 277 log.Fields{ | 277 log.Fields{ |
| 278 log.ErrorKey: err, | 278 log.ErrorKey: err, |
| 279 "initialIndex": req.Index, | 279 "initialIndex": req.Index, |
| 280 "nextIndex": sreq.Index, | 280 "nextIndex": sreq.Index, |
| 281 "count": len(logs), | 281 "count": len(logs), |
| 282 }.Errorf(c, "Failed to execute range request.") | 282 }.Errorf(c, "Failed to execute range request.") |
| 283 return nil, err | 283 return nil, err |
| 284 } | 284 } |
| 285 | |
| 286 return logs, nil | |
| 287 } | 285 } |
| 288 | 286 |
| 289 func getTail(c context.Context, st storage.Storage, p types.StreamPath) ([][]byt
e, error) { | 287 func getTail(c context.Context, st storage.Storage, project config.ProjectName,
path types.StreamPath) ([][]byte, error) { |
| 288 » log.Fields{ |
| 289 » » "project": project, |
| 290 » » "path": path, |
| 291 » }.Debugf(c, "Issuing Tail request.") |
| 292 |
| 290 var data []byte | 293 var data []byte |
| 291 err := retry.Retry(c, retry.TransientOnly(retry.Default), func() (err er
ror) { | 294 err := retry.Retry(c, retry.TransientOnly(retry.Default), func() (err er
ror) { |
| 292 » » data, _, err = st.Tail("", p) | 295 » » data, _, err = st.Tail(project, path) |
| 293 return | 296 return |
| 294 }, func(err error, delay time.Duration) { | 297 }, func(err error, delay time.Duration) { |
| 295 log.Fields{ | 298 log.Fields{ |
| 296 log.ErrorKey: err, | 299 log.ErrorKey: err, |
| 297 "delay": delay, | 300 "delay": delay, |
| 298 }.Warningf(c, "Transient error while fetching tail log; retrying
.") | 301 }.Warningf(c, "Transient error while fetching tail log; retrying
.") |
| 299 }) | 302 }) |
| 300 » if err != nil { | 303 » switch err { |
| 304 » case nil: |
| 305 » » return [][]byte{data}, nil |
| 306 |
| 307 » case storage.ErrDoesNotExist: |
| 308 » » return nil, nil |
| 309 |
| 310 » default: |
| 301 log.WithError(err).Errorf(c, "Failed to fetch tail log.") | 311 log.WithError(err).Errorf(c, "Failed to fetch tail log.") |
| 302 return nil, err | 312 return nil, err |
| 303 } | 313 } |
| 304 return [][]byte{data}, err | |
| 305 } | 314 } |
| OLD | NEW |