Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright 2015 The LUCI Authors. All rights reserved. | 1 // Copyright 2015 The LUCI Authors. All rights reserved. |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
| 3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
| 4 | 4 |
| 5 package logs | 5 package logs |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "time" | 8 "time" |
| 9 | 9 |
| 10 "github.com/golang/protobuf/proto" | |
| 11 ds "github.com/luci/gae/service/datastore" | 10 ds "github.com/luci/gae/service/datastore" |
| 12 "github.com/luci/luci-go/common/config" | 11 "github.com/luci/luci-go/common/config" |
| 13 log "github.com/luci/luci-go/common/logging" | 12 log "github.com/luci/luci-go/common/logging" |
| 14 "github.com/luci/luci-go/common/retry" | 13 "github.com/luci/luci-go/common/retry" |
| 15 "github.com/luci/luci-go/grpc/grpcutil" | 14 "github.com/luci/luci-go/grpc/grpcutil" |
| 16 "github.com/luci/luci-go/logdog/api/endpoints/coordinator/logs/v1" | 15 "github.com/luci/luci-go/logdog/api/endpoints/coordinator/logs/v1" |
| 17 "github.com/luci/luci-go/logdog/api/logpb" | 16 "github.com/luci/luci-go/logdog/api/logpb" |
| 18 "github.com/luci/luci-go/logdog/appengine/coordinator" | 17 "github.com/luci/luci-go/logdog/appengine/coordinator" |
| 19 "github.com/luci/luci-go/logdog/common/storage" | 18 "github.com/luci/luci-go/logdog/common/storage" |
| 20 "github.com/luci/luci-go/logdog/common/storage/archive" | 19 "github.com/luci/luci-go/logdog/common/storage/archive" |
| (...skipping 141 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 162 defer func() { | 161 defer func() { |
| 163 if err := gs.Close(); err != nil { | 162 if err := gs.Close(); err != nil { |
| 164 log.WithError(err).Warningf(c, "Failed to close Google Storage client.") | 163 log.WithError(err).Warningf(c, "Failed to close Google Storage client.") |
| 165 } | 164 } |
| 166 }() | 165 }() |
| 167 | 166 |
| 168 st, err = archive.New(c, archive.Options{ | 167 st, err = archive.New(c, archive.Options{ |
| 169 IndexURL: lst.ArchiveIndexURL, | 168 IndexURL: lst.ArchiveIndexURL, |
| 170 StreamURL: lst.ArchiveStreamURL, | 169 StreamURL: lst.ArchiveStreamURL, |
| 171 Client: gs, | 170 Client: gs, |
| 172 MaxBytes: byteLimit, | |
|
dnj
2016/10/19 23:18:59
Getting rid of this is no big deal, since it's enf
| |
| 173 }) | 171 }) |
| 174 if err != nil { | 172 if err != nil { |
| 175 log.WithError(err).Errorf(c, "Failed to create Google St orage storage instance.") | 173 log.WithError(err).Errorf(c, "Failed to create Google St orage storage instance.") |
| 176 return nil, err | 174 return nil, err |
| 177 } | 175 } |
| 178 } | 176 } |
| 179 defer st.Close() | 177 defer st.Close() |
| 180 | 178 |
| 181 project, path := coordinator.Project(c), ls.Path() | 179 project, path := coordinator.Project(c), ls.Path() |
| 182 | 180 |
| 183 » var fetchedLogs [][]byte | 181 » var fetchedLogs []*logpb.LogEntry |
| 184 var err error | 182 var err error |
| 185 if tail { | 183 if tail { |
| 186 fetchedLogs, err = getTail(c, st, project, path) | 184 fetchedLogs, err = getTail(c, st, project, path) |
| 187 } else { | 185 } else { |
| 188 fetchedLogs, err = getHead(c, req, st, project, path, byteLimit) | 186 fetchedLogs, err = getHead(c, req, st, project, path, byteLimit) |
| 189 } | 187 } |
| 190 if err != nil { | 188 if err != nil { |
| 191 log.WithError(err).Errorf(c, "Failed to fetch log records.") | 189 log.WithError(err).Errorf(c, "Failed to fetch log records.") |
| 192 return nil, err | 190 return nil, err |
| 193 } | 191 } |
| 194 | 192 |
| 195 » logEntries := make([]*logpb.LogEntry, len(fetchedLogs)) | 193 » return fetchedLogs, nil |
| 196 » for idx, ld := range fetchedLogs { | |
| 197 » » // Deserialize the log entry, then convert it to output value. | |
| 198 » » le := logpb.LogEntry{} | |
| 199 » » if err := proto.Unmarshal(ld, &le); err != nil { | |
| 200 » » » log.Fields{ | |
| 201 » » » » log.ErrorKey: err, | |
| 202 » » » » "index": idx, | |
| 203 » » » }.Errorf(c, "Failed to generate response log entry.") | |
| 204 » » » return nil, err | |
| 205 » » } | |
| 206 » » logEntries[idx] = &le | |
| 207 » } | |
| 208 » return logEntries, nil | |
| 209 } | 194 } |
| 210 | 195 |
| 211 func getHead(c context.Context, req *logdog.GetRequest, st storage.Storage, proj ect config.ProjectName, | 196 func getHead(c context.Context, req *logdog.GetRequest, st storage.Storage, proj ect config.ProjectName, |
| 212 » path types.StreamPath, byteLimit int) ([][]byte, error) { | 197 » path types.StreamPath, byteLimit int) ([]*logpb.LogEntry, error) { |
| 213 log.Fields{ | 198 log.Fields{ |
| 214 "project": project, | 199 "project": project, |
| 215 "path": path, | 200 "path": path, |
| 216 "index": req.Index, | 201 "index": req.Index, |
| 217 "count": req.LogCount, | 202 "count": req.LogCount, |
| 218 "bytes": req.ByteCount, | 203 "bytes": req.ByteCount, |
| 219 "noncontiguous": req.NonContiguous, | 204 "noncontiguous": req.NonContiguous, |
| 220 }.Debugf(c, "Issuing Get request.") | 205 }.Debugf(c, "Issuing Get request.") |
| 221 | 206 |
| 222 // Allocate result logs array. | 207 // Allocate result logs array. |
| 223 logCount := int(req.LogCount) | 208 logCount := int(req.LogCount) |
| 224 asz := getInitialArraySize | 209 asz := getInitialArraySize |
| 225 if logCount > 0 && logCount < asz { | 210 if logCount > 0 && logCount < asz { |
| 226 asz = logCount | 211 asz = logCount |
| 227 } | 212 } |
| 228 » logs := make([][]byte, 0, asz) | 213 » logs := make([]*logpb.LogEntry, 0, asz) |
| 229 | 214 |
| 230 sreq := storage.GetRequest{ | 215 sreq := storage.GetRequest{ |
| 231 Project: project, | 216 Project: project, |
| 232 Path: path, | 217 Path: path, |
| 233 Index: types.MessageIndex(req.Index), | 218 Index: types.MessageIndex(req.Index), |
| 234 Limit: logCount, | 219 Limit: logCount, |
| 235 } | 220 } |
| 236 | 221 |
| 237 count := 0 | 222 count := 0 |
| 223 var ierr error | |
| 238 err := retry.Retry(c, retry.TransientOnly(retry.Default), func() error { | 224 err := retry.Retry(c, retry.TransientOnly(retry.Default), func() error { |
| 239 // Issue the Get request. This may return a transient error, in which case | 225 // Issue the Get request. This may return a transient error, in which case |
| 240 // we will retry. | 226 // we will retry. |
| 241 » » return st.Get(sreq, func(idx types.MessageIndex, ld []byte) bool { | 227 » » return st.Get(sreq, func(e *storage.Entry) bool { |
| 242 » » » if count > 0 && byteLimit-len(ld) < 0 { | 228 » » » var le *logpb.LogEntry |
| 229 » » » if le, ierr = e.GetLogEntry(); ierr != nil { | |
| 230 » » » » return false | |
| 231 » » » } | |
| 232 | |
| 233 » » » if count > 0 && byteLimit-len(e.D) < 0 { | |
| 243 // Not the first log, and we've exceeded our byt e limit. | 234 // Not the first log, and we've exceeded our byt e limit. |
| 244 return false | 235 return false |
| 245 } | 236 } |
| 246 » » » byteLimit -= len(ld) | 237 » » » byteLimit -= len(e.D) |
|
nodir
2016/10/26 20:25:33
you are mutating byteLimit in a retry loop. I thin
| |
| 247 | 238 |
| 248 » » » if !(req.NonContiguous || idx == sreq.Index) { | 239 » » » sidx, _ := e.GetStreamIndex() // GetLogEntry succeeded, so this must. |
| 240 » » » if !(req.NonContiguous || sidx == sreq.Index) { | |
| 249 return false | 241 return false |
| 250 } | 242 } |
| 251 » » » logs = append(logs, ld) | 243 » » » logs = append(logs, le) |
| 252 » » » sreq.Index = idx + 1 | 244 » » » sreq.Index = sidx + 1 |
| 253 count++ | 245 count++ |
|
nodir
2016/10/26 20:25:33
same here
| |
| 254 return !(logCount > 0 && count >= logCount) | 246 return !(logCount > 0 && count >= logCount) |
| 255 }) | 247 }) |
| 256 }, func(err error, delay time.Duration) { | 248 }, func(err error, delay time.Duration) { |
| 257 log.Fields{ | 249 log.Fields{ |
| 258 log.ErrorKey: err, | 250 log.ErrorKey: err, |
| 259 "delay": delay, | 251 "delay": delay, |
| 260 "initialIndex": req.Index, | 252 "initialIndex": req.Index, |
| 261 "nextIndex": sreq.Index, | 253 "nextIndex": sreq.Index, |
| 262 "count": len(logs), | 254 "count": len(logs), |
| 263 }.Warningf(c, "Transient error while loading logs; retrying.") | 255 }.Warningf(c, "Transient error while loading logs; retrying.") |
| 264 }) | 256 }) |
| 265 switch err { | 257 switch err { |
| 266 case nil: | 258 case nil: |
| 259 if ierr != nil { | |
| 260 log.WithError(ierr).Errorf(c, "Bad log entry data.") | |
| 261 return nil, ierr | |
| 262 } | |
| 267 return logs, nil | 263 return logs, nil |
| 268 | 264 |
| 269 case storage.ErrDoesNotExist: | 265 case storage.ErrDoesNotExist: |
| 270 return nil, nil | 266 return nil, nil |
| 271 | 267 |
| 272 default: | 268 default: |
| 273 log.Fields{ | 269 log.Fields{ |
| 274 log.ErrorKey: err, | 270 log.ErrorKey: err, |
| 275 "initialIndex": req.Index, | 271 "initialIndex": req.Index, |
| 276 "nextIndex": sreq.Index, | 272 "nextIndex": sreq.Index, |
| 277 "count": len(logs), | 273 "count": len(logs), |
| 278 }.Errorf(c, "Failed to execute range request.") | 274 }.Errorf(c, "Failed to execute range request.") |
| 279 return nil, err | 275 return nil, err |
| 280 } | 276 } |
| 281 } | 277 } |
| 282 | 278 |
| 283 func getTail(c context.Context, st storage.Storage, project config.ProjectName, path types.StreamPath) ([][]byte, error) { | 279 func getTail(c context.Context, st storage.Storage, project config.ProjectName, path types.StreamPath) ( |
| 280 » []*logpb.LogEntry, error) { | |
| 284 log.Fields{ | 281 log.Fields{ |
| 285 "project": project, | 282 "project": project, |
| 286 "path": path, | 283 "path": path, |
| 287 }.Debugf(c, "Issuing Tail request.") | 284 }.Debugf(c, "Issuing Tail request.") |
| 288 | 285 |
| 289 » var data []byte | 286 » var e *storage.Entry |
| 290 err := retry.Retry(c, retry.TransientOnly(retry.Default), func() (err er ror) { | 287 err := retry.Retry(c, retry.TransientOnly(retry.Default), func() (err er ror) { |
| 291 » » data, _, err = st.Tail(project, path) | 288 » » e, err = st.Tail(project, path) |
| 292 return | 289 return |
| 293 }, func(err error, delay time.Duration) { | 290 }, func(err error, delay time.Duration) { |
| 294 log.Fields{ | 291 log.Fields{ |
| 295 log.ErrorKey: err, | 292 log.ErrorKey: err, |
| 296 "delay": delay, | 293 "delay": delay, |
| 297 }.Warningf(c, "Transient error while fetching tail log; retrying .") | 294 }.Warningf(c, "Transient error while fetching tail log; retrying .") |
| 298 }) | 295 }) |
| 299 switch err { | 296 switch err { |
| 300 case nil: | 297 case nil: |
| 301 » » return [][]byte{data}, nil | 298 » » le, err := e.GetLogEntry() |
| 299 » » if err != nil { | |
| 300 » » » log.WithError(err).Errorf(c, "Failed to load tail entry data.") | |
| 301 » » » return nil, err | |
| 302 » » } | |
| 303 » » return []*logpb.LogEntry{le}, nil | |
| 302 | 304 |
| 303 case storage.ErrDoesNotExist: | 305 case storage.ErrDoesNotExist: |
| 304 return nil, nil | 306 return nil, nil |
| 305 | 307 |
| 306 default: | 308 default: |
| 307 log.WithError(err).Errorf(c, "Failed to fetch tail log.") | 309 log.WithError(err).Errorf(c, "Failed to fetch tail log.") |
| 308 return nil, err | 310 return nil, err |
| 309 } | 311 } |
| 310 } | 312 } |
| OLD | NEW |