OLD | NEW |
1 // Copyright 2015 The LUCI Authors. All rights reserved. | 1 // Copyright 2015 The LUCI Authors. All rights reserved. |
2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
4 | 4 |
5 package logs | 5 package logs |
6 | 6 |
7 import ( | 7 import ( |
8 "time" | 8 "time" |
9 | 9 |
10 "github.com/luci/luci-go/common/errors" | 10 "github.com/luci/luci-go/common/errors" |
11 log "github.com/luci/luci-go/common/logging" | 11 log "github.com/luci/luci-go/common/logging" |
12 "github.com/luci/luci-go/common/proto/google" | 12 "github.com/luci/luci-go/common/proto/google" |
13 "github.com/luci/luci-go/common/retry" | 13 "github.com/luci/luci-go/common/retry" |
| 14 "github.com/luci/luci-go/common/retry/transient" |
14 "github.com/luci/luci-go/grpc/grpcutil" | 15 "github.com/luci/luci-go/grpc/grpcutil" |
15 "github.com/luci/luci-go/logdog/api/endpoints/coordinator/logs/v1" | 16 "github.com/luci/luci-go/logdog/api/endpoints/coordinator/logs/v1" |
16 "github.com/luci/luci-go/logdog/api/logpb" | 17 "github.com/luci/luci-go/logdog/api/logpb" |
17 "github.com/luci/luci-go/logdog/appengine/coordinator" | 18 "github.com/luci/luci-go/logdog/appengine/coordinator" |
18 "github.com/luci/luci-go/logdog/common/storage" | 19 "github.com/luci/luci-go/logdog/common/storage" |
19 "github.com/luci/luci-go/logdog/common/types" | 20 "github.com/luci/luci-go/logdog/common/types" |
20 "github.com/luci/luci-go/luci_config/common/cfgtypes" | 21 "github.com/luci/luci-go/luci_config/common/cfgtypes" |
21 | 22 |
22 ds "github.com/luci/gae/service/datastore" | 23 ds "github.com/luci/gae/service/datastore" |
23 | 24 |
(...skipping 179 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
203 | 204 |
204 sreq := storage.GetRequest{ | 205 sreq := storage.GetRequest{ |
205 Project: project, | 206 Project: project, |
206 Path: path, | 207 Path: path, |
207 Index: types.MessageIndex(req.Index), | 208 Index: types.MessageIndex(req.Index), |
208 Limit: logCount, | 209 Limit: logCount, |
209 } | 210 } |
210 | 211 |
211 var ierr error | 212 var ierr error |
212 count := 0 | 213 count := 0 |
213 » err := retry.Retry(c, retry.TransientOnly(retry.Default), func() error { | 214 » err := retry.Retry(c, transient.Only(retry.Default), func() error { |
214 // Issue the Get request. This may return a transient error, in
which case | 215 // Issue the Get request. This may return a transient error, in
which case |
215 // we will retry. | 216 // we will retry. |
216 return st.Get(sreq, func(e *storage.Entry) bool { | 217 return st.Get(sreq, func(e *storage.Entry) bool { |
217 var le *logpb.LogEntry | 218 var le *logpb.LogEntry |
218 if le, ierr = e.GetLogEntry(); ierr != nil { | 219 if le, ierr = e.GetLogEntry(); ierr != nil { |
219 return false | 220 return false |
220 } | 221 } |
221 | 222 |
222 if count > 0 && byteLimit-len(e.D) < 0 { | 223 if count > 0 && byteLimit-len(e.D) < 0 { |
223 // Not the first log, and we've exceeded our byt
e limit. | 224 // Not the first log, and we've exceeded our byt
e limit. |
(...skipping 43 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
267 } | 268 } |
268 | 269 |
269 func getTail(c context.Context, st coordinator.Storage, project cfgtypes.Project
Name, path types.StreamPath) ( | 270 func getTail(c context.Context, st coordinator.Storage, project cfgtypes.Project
Name, path types.StreamPath) ( |
270 []*logpb.LogEntry, error) { | 271 []*logpb.LogEntry, error) { |
271 log.Fields{ | 272 log.Fields{ |
272 "project": project, | 273 "project": project, |
273 "path": path, | 274 "path": path, |
274 }.Debugf(c, "Issuing Tail request.") | 275 }.Debugf(c, "Issuing Tail request.") |
275 | 276 |
276 var e *storage.Entry | 277 var e *storage.Entry |
277 » err := retry.Retry(c, retry.TransientOnly(retry.Default), func() (err er
ror) { | 278 » err := retry.Retry(c, transient.Only(retry.Default), func() (err error)
{ |
278 e, err = st.Tail(project, path) | 279 e, err = st.Tail(project, path) |
279 return | 280 return |
280 }, func(err error, delay time.Duration) { | 281 }, func(err error, delay time.Duration) { |
281 log.Fields{ | 282 log.Fields{ |
282 log.ErrorKey: err, | 283 log.ErrorKey: err, |
283 "delay": delay, | 284 "delay": delay, |
284 }.Warningf(c, "Transient error while fetching tail log; retrying
.") | 285 }.Warningf(c, "Transient error while fetching tail log; retrying
.") |
285 }) | 286 }) |
286 switch err { | 287 switch err { |
287 case nil: | 288 case nil: |
288 le, err := e.GetLogEntry() | 289 le, err := e.GetLogEntry() |
289 if err != nil { | 290 if err != nil { |
290 log.WithError(err).Errorf(c, "Failed to load tail entry
data.") | 291 log.WithError(err).Errorf(c, "Failed to load tail entry
data.") |
291 return nil, err | 292 return nil, err |
292 } | 293 } |
293 return []*logpb.LogEntry{le}, nil | 294 return []*logpb.LogEntry{le}, nil |
294 | 295 |
295 case storage.ErrDoesNotExist: | 296 case storage.ErrDoesNotExist: |
296 return nil, nil | 297 return nil, nil |
297 | 298 |
298 default: | 299 default: |
299 log.WithError(err).Errorf(c, "Failed to fetch tail log.") | 300 log.WithError(err).Errorf(c, "Failed to fetch tail log.") |
300 return nil, err | 301 return nil, err |
301 } | 302 } |
302 } | 303 } |
OLD | NEW |