| OLD | NEW |
| 1 // Copyright 2015 The LUCI Authors. All rights reserved. | 1 // Copyright 2015 The LUCI Authors. All rights reserved. |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
| 3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
| 4 | 4 |
| 5 package logs | 5 package logs |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "time" | 8 "time" |
| 9 | 9 |
| 10 "github.com/luci/luci-go/common/errors" | 10 "github.com/luci/luci-go/common/errors" |
| 11 log "github.com/luci/luci-go/common/logging" | 11 log "github.com/luci/luci-go/common/logging" |
| 12 "github.com/luci/luci-go/common/proto/google" | 12 "github.com/luci/luci-go/common/proto/google" |
| 13 "github.com/luci/luci-go/common/retry" | 13 "github.com/luci/luci-go/common/retry" |
| 14 "github.com/luci/luci-go/common/retry/transient" |
| 14 "github.com/luci/luci-go/grpc/grpcutil" | 15 "github.com/luci/luci-go/grpc/grpcutil" |
| 15 "github.com/luci/luci-go/logdog/api/endpoints/coordinator/logs/v1" | 16 "github.com/luci/luci-go/logdog/api/endpoints/coordinator/logs/v1" |
| 16 "github.com/luci/luci-go/logdog/api/logpb" | 17 "github.com/luci/luci-go/logdog/api/logpb" |
| 17 "github.com/luci/luci-go/logdog/appengine/coordinator" | 18 "github.com/luci/luci-go/logdog/appengine/coordinator" |
| 18 "github.com/luci/luci-go/logdog/common/storage" | 19 "github.com/luci/luci-go/logdog/common/storage" |
| 19 "github.com/luci/luci-go/logdog/common/types" | 20 "github.com/luci/luci-go/logdog/common/types" |
| 20 "github.com/luci/luci-go/luci_config/common/cfgtypes" | 21 "github.com/luci/luci-go/luci_config/common/cfgtypes" |
| 21 | 22 |
| 22 ds "github.com/luci/gae/service/datastore" | 23 ds "github.com/luci/gae/service/datastore" |
| 23 | 24 |
| (...skipping 179 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 203 | 204 |
| 204 sreq := storage.GetRequest{ | 205 sreq := storage.GetRequest{ |
| 205 Project: project, | 206 Project: project, |
| 206 Path: path, | 207 Path: path, |
| 207 Index: types.MessageIndex(req.Index), | 208 Index: types.MessageIndex(req.Index), |
| 208 Limit: logCount, | 209 Limit: logCount, |
| 209 } | 210 } |
| 210 | 211 |
| 211 var ierr error | 212 var ierr error |
| 212 count := 0 | 213 count := 0 |
| 213 » err := retry.Retry(c, retry.TransientOnly(retry.Default), func() error { | 214 » err := retry.Retry(c, transient.Only(retry.Default), func() error { |
| 214 // Issue the Get request. This may return a transient error, in
which case | 215 // Issue the Get request. This may return a transient error, in
which case |
| 215 // we will retry. | 216 // we will retry. |
| 216 return st.Get(sreq, func(e *storage.Entry) bool { | 217 return st.Get(sreq, func(e *storage.Entry) bool { |
| 217 var le *logpb.LogEntry | 218 var le *logpb.LogEntry |
| 218 if le, ierr = e.GetLogEntry(); ierr != nil { | 219 if le, ierr = e.GetLogEntry(); ierr != nil { |
| 219 return false | 220 return false |
| 220 } | 221 } |
| 221 | 222 |
| 222 if count > 0 && byteLimit-len(e.D) < 0 { | 223 if count > 0 && byteLimit-len(e.D) < 0 { |
| 223 // Not the first log, and we've exceeded our byt
e limit. | 224 // Not the first log, and we've exceeded our byt
e limit. |
| (...skipping 43 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 267 } | 268 } |
| 268 | 269 |
| 269 func getTail(c context.Context, st coordinator.Storage, project cfgtypes.Project
Name, path types.StreamPath) ( | 270 func getTail(c context.Context, st coordinator.Storage, project cfgtypes.Project
Name, path types.StreamPath) ( |
| 270 []*logpb.LogEntry, error) { | 271 []*logpb.LogEntry, error) { |
| 271 log.Fields{ | 272 log.Fields{ |
| 272 "project": project, | 273 "project": project, |
| 273 "path": path, | 274 "path": path, |
| 274 }.Debugf(c, "Issuing Tail request.") | 275 }.Debugf(c, "Issuing Tail request.") |
| 275 | 276 |
| 276 var e *storage.Entry | 277 var e *storage.Entry |
| 277 » err := retry.Retry(c, retry.TransientOnly(retry.Default), func() (err er
ror) { | 278 » err := retry.Retry(c, transient.Only(retry.Default), func() (err error)
{ |
| 278 e, err = st.Tail(project, path) | 279 e, err = st.Tail(project, path) |
| 279 return | 280 return |
| 280 }, func(err error, delay time.Duration) { | 281 }, func(err error, delay time.Duration) { |
| 281 log.Fields{ | 282 log.Fields{ |
| 282 log.ErrorKey: err, | 283 log.ErrorKey: err, |
| 283 "delay": delay, | 284 "delay": delay, |
| 284 }.Warningf(c, "Transient error while fetching tail log; retrying
.") | 285 }.Warningf(c, "Transient error while fetching tail log; retrying
.") |
| 285 }) | 286 }) |
| 286 switch err { | 287 switch err { |
| 287 case nil: | 288 case nil: |
| 288 le, err := e.GetLogEntry() | 289 le, err := e.GetLogEntry() |
| 289 if err != nil { | 290 if err != nil { |
| 290 log.WithError(err).Errorf(c, "Failed to load tail entry
data.") | 291 log.WithError(err).Errorf(c, "Failed to load tail entry
data.") |
| 291 return nil, err | 292 return nil, err |
| 292 } | 293 } |
| 293 return []*logpb.LogEntry{le}, nil | 294 return []*logpb.LogEntry{le}, nil |
| 294 | 295 |
| 295 case storage.ErrDoesNotExist: | 296 case storage.ErrDoesNotExist: |
| 296 return nil, nil | 297 return nil, nil |
| 297 | 298 |
| 298 default: | 299 default: |
| 299 log.WithError(err).Errorf(c, "Failed to fetch tail log.") | 300 log.WithError(err).Errorf(c, "Failed to fetch tail log.") |
| 300 return nil, err | 301 return nil, err |
| 301 } | 302 } |
| 302 } | 303 } |
| OLD | NEW |