Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(47)

Side by Side Diff: logdog/appengine/coordinator/endpoints/logs/get.go

Issue 2435883002: LogDog: Fix archival Get/Tail implementations. (Closed)
Patch Set: LogDog: Fix archival Get/Tail implementations. Created 4 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The LUCI Authors. All rights reserved. 1 // Copyright 2015 The LUCI Authors. All rights reserved.
2 // Use of this source code is governed under the Apache License, Version 2.0 2 // Use of this source code is governed under the Apache License, Version 2.0
3 // that can be found in the LICENSE file. 3 // that can be found in the LICENSE file.
4 4
5 package logs 5 package logs
6 6
7 import ( 7 import (
8 "time" 8 "time"
9 9
10 "github.com/golang/protobuf/proto"
11 ds "github.com/luci/gae/service/datastore" 10 ds "github.com/luci/gae/service/datastore"
12 "github.com/luci/luci-go/common/config" 11 "github.com/luci/luci-go/common/config"
13 log "github.com/luci/luci-go/common/logging" 12 log "github.com/luci/luci-go/common/logging"
14 "github.com/luci/luci-go/common/retry" 13 "github.com/luci/luci-go/common/retry"
15 "github.com/luci/luci-go/grpc/grpcutil" 14 "github.com/luci/luci-go/grpc/grpcutil"
16 "github.com/luci/luci-go/logdog/api/endpoints/coordinator/logs/v1" 15 "github.com/luci/luci-go/logdog/api/endpoints/coordinator/logs/v1"
17 "github.com/luci/luci-go/logdog/api/logpb" 16 "github.com/luci/luci-go/logdog/api/logpb"
18 "github.com/luci/luci-go/logdog/appengine/coordinator" 17 "github.com/luci/luci-go/logdog/appengine/coordinator"
19 "github.com/luci/luci-go/logdog/common/storage" 18 "github.com/luci/luci-go/logdog/common/storage"
20 "github.com/luci/luci-go/logdog/common/storage/archive" 19 "github.com/luci/luci-go/logdog/common/storage/archive"
(...skipping 141 matching lines...) Expand 10 before | Expand all | Expand 10 after
162 defer func() { 161 defer func() {
163 if err := gs.Close(); err != nil { 162 if err := gs.Close(); err != nil {
164 log.WithError(err).Warningf(c, "Failed to close Google Storage client.") 163 log.WithError(err).Warningf(c, "Failed to close Google Storage client.")
165 } 164 }
166 }() 165 }()
167 166
168 st, err = archive.New(c, archive.Options{ 167 st, err = archive.New(c, archive.Options{
169 IndexURL: lst.ArchiveIndexURL, 168 IndexURL: lst.ArchiveIndexURL,
170 StreamURL: lst.ArchiveStreamURL, 169 StreamURL: lst.ArchiveStreamURL,
171 Client: gs, 170 Client: gs,
172 MaxBytes: byteLimit,
dnj 2016/10/19 23:18:59 Getting rid of this is no big deal, since it's enf
173 }) 171 })
174 if err != nil { 172 if err != nil {
 175 log.WithError(err).Errorf(c, "Failed to create Google Storage storage instance.") 173 log.WithError(err).Errorf(c, "Failed to create Google Storage storage instance.")
176 return nil, err 174 return nil, err
177 } 175 }
178 } 176 }
179 defer st.Close() 177 defer st.Close()
180 178
181 project, path := coordinator.Project(c), ls.Path() 179 project, path := coordinator.Project(c), ls.Path()
182 180
183 » var fetchedLogs [][]byte 181 » var fetchedLogs []*logpb.LogEntry
184 var err error 182 var err error
185 if tail { 183 if tail {
186 fetchedLogs, err = getTail(c, st, project, path) 184 fetchedLogs, err = getTail(c, st, project, path)
187 } else { 185 } else {
188 fetchedLogs, err = getHead(c, req, st, project, path, byteLimit) 186 fetchedLogs, err = getHead(c, req, st, project, path, byteLimit)
189 } 187 }
190 if err != nil { 188 if err != nil {
191 log.WithError(err).Errorf(c, "Failed to fetch log records.") 189 log.WithError(err).Errorf(c, "Failed to fetch log records.")
192 return nil, err 190 return nil, err
193 } 191 }
194 192
195 » logEntries := make([]*logpb.LogEntry, len(fetchedLogs)) 193 » return fetchedLogs, nil
196 » for idx, ld := range fetchedLogs {
197 » » // Deserialize the log entry, then convert it to output value.
198 » » le := logpb.LogEntry{}
199 » » if err := proto.Unmarshal(ld, &le); err != nil {
200 » » » log.Fields{
201 » » » » log.ErrorKey: err,
202 » » » » "index": idx,
203 » » » }.Errorf(c, "Failed to generate response log entry.")
204 » » » return nil, err
205 » » }
206 » » logEntries[idx] = &le
207 » }
208 » return logEntries, nil
209 } 194 }
210 195
 211 func getHead(c context.Context, req *logdog.GetRequest, st storage.Storage, project config.ProjectName, 196 func getHead(c context.Context, req *logdog.GetRequest, st storage.Storage, project config.ProjectName,
212 » path types.StreamPath, byteLimit int) ([][]byte, error) { 197 » path types.StreamPath, byteLimit int) ([]*logpb.LogEntry, error) {
213 log.Fields{ 198 log.Fields{
214 "project": project, 199 "project": project,
215 "path": path, 200 "path": path,
216 "index": req.Index, 201 "index": req.Index,
217 "count": req.LogCount, 202 "count": req.LogCount,
218 "bytes": req.ByteCount, 203 "bytes": req.ByteCount,
219 "noncontiguous": req.NonContiguous, 204 "noncontiguous": req.NonContiguous,
220 }.Debugf(c, "Issuing Get request.") 205 }.Debugf(c, "Issuing Get request.")
221 206
222 // Allocate result logs array. 207 // Allocate result logs array.
223 logCount := int(req.LogCount) 208 logCount := int(req.LogCount)
224 asz := getInitialArraySize 209 asz := getInitialArraySize
225 if logCount > 0 && logCount < asz { 210 if logCount > 0 && logCount < asz {
226 asz = logCount 211 asz = logCount
227 } 212 }
228 » logs := make([][]byte, 0, asz) 213 » logs := make([]*logpb.LogEntry, 0, asz)
229 214
230 sreq := storage.GetRequest{ 215 sreq := storage.GetRequest{
231 Project: project, 216 Project: project,
232 Path: path, 217 Path: path,
233 Index: types.MessageIndex(req.Index), 218 Index: types.MessageIndex(req.Index),
234 Limit: logCount, 219 Limit: logCount,
235 } 220 }
236 221
237 count := 0 222 count := 0
223 var ierr error
238 err := retry.Retry(c, retry.TransientOnly(retry.Default), func() error { 224 err := retry.Retry(c, retry.TransientOnly(retry.Default), func() error {
239 // Issue the Get request. This may return a transient error, in which case 225 // Issue the Get request. This may return a transient error, in which case
240 // we will retry. 226 // we will retry.
241 » » return st.Get(sreq, func(idx types.MessageIndex, ld []byte) bool { 227 » » return st.Get(sreq, func(e *storage.Entry) bool {
242 » » » if count > 0 && byteLimit-len(ld) < 0 { 228 » » » var le *logpb.LogEntry
229 » » » if le, ierr = e.GetLogEntry(); ierr != nil {
230 » » » » return false
231 » » » }
232
233 » » » if count > 0 && byteLimit-len(e.D) < 0 {
 243 // Not the first log, and we've exceeded our byte limit. 234
244 return false 235 return false
245 } 236 }
246 » » » byteLimit -= len(ld) 237 » » » byteLimit -= len(e.D)
nodir 2016/10/26 20:25:33 you are mutating byteLimit in a retry loop. I thin
247 238
248 » » » if !(req.NonContiguous || idx == sreq.Index) { 239 » » » sidx, _ := e.GetStreamIndex() // GetLogEntry succeeded, so this must.
240 » » » if !(req.NonContiguous || sidx == sreq.Index) {
249 return false 241 return false
250 } 242 }
251 » » » logs = append(logs, ld) 243 » » » logs = append(logs, le)
252 » » » sreq.Index = idx + 1 244 » » » sreq.Index = sidx + 1
253 count++ 245 count++
nodir 2016/10/26 20:25:33 same here
254 return !(logCount > 0 && count >= logCount) 246 return !(logCount > 0 && count >= logCount)
255 }) 247 })
256 }, func(err error, delay time.Duration) { 248 }, func(err error, delay time.Duration) {
257 log.Fields{ 249 log.Fields{
258 log.ErrorKey: err, 250 log.ErrorKey: err,
259 "delay": delay, 251 "delay": delay,
260 "initialIndex": req.Index, 252 "initialIndex": req.Index,
261 "nextIndex": sreq.Index, 253 "nextIndex": sreq.Index,
262 "count": len(logs), 254 "count": len(logs),
263 }.Warningf(c, "Transient error while loading logs; retrying.") 255 }.Warningf(c, "Transient error while loading logs; retrying.")
264 }) 256 })
265 switch err { 257 switch err {
266 case nil: 258 case nil:
259 if ierr != nil {
260 log.WithError(ierr).Errorf(c, "Bad log entry data.")
261 return nil, ierr
262 }
267 return logs, nil 263 return logs, nil
268 264
269 case storage.ErrDoesNotExist: 265 case storage.ErrDoesNotExist:
270 return nil, nil 266 return nil, nil
271 267
272 default: 268 default:
273 log.Fields{ 269 log.Fields{
274 log.ErrorKey: err, 270 log.ErrorKey: err,
275 "initialIndex": req.Index, 271 "initialIndex": req.Index,
276 "nextIndex": sreq.Index, 272 "nextIndex": sreq.Index,
277 "count": len(logs), 273 "count": len(logs),
278 }.Errorf(c, "Failed to execute range request.") 274 }.Errorf(c, "Failed to execute range request.")
279 return nil, err 275 return nil, err
280 } 276 }
281 } 277 }
282 278
283 func getTail(c context.Context, st storage.Storage, project config.ProjectName, path types.StreamPath) ([][]byte, error) { 279 func getTail(c context.Context, st storage.Storage, project config.ProjectName, path types.StreamPath) (
280 » []*logpb.LogEntry, error) {
284 log.Fields{ 281 log.Fields{
285 "project": project, 282 "project": project,
286 "path": path, 283 "path": path,
287 }.Debugf(c, "Issuing Tail request.") 284 }.Debugf(c, "Issuing Tail request.")
288 285
289 » var data []byte 286 » var e *storage.Entry
 290 err := retry.Retry(c, retry.TransientOnly(retry.Default), func() (err error) { 287 err := retry.Retry(c, retry.TransientOnly(retry.Default), func() (err error) {
291 » » data, _, err = st.Tail(project, path) 288 » » e, err = st.Tail(project, path)
292 return 289 return
293 }, func(err error, delay time.Duration) { 290 }, func(err error, delay time.Duration) {
294 log.Fields{ 291 log.Fields{
295 log.ErrorKey: err, 292 log.ErrorKey: err,
296 "delay": delay, 293 "delay": delay,
 297 }.Warningf(c, "Transient error while fetching tail log; retrying.") 294 }.Warningf(c, "Transient error while fetching tail log; retrying.")
298 }) 295 })
299 switch err { 296 switch err {
300 case nil: 297 case nil:
301 » » return [][]byte{data}, nil 298 » » le, err := e.GetLogEntry()
299 » » if err != nil {
300 » » » log.WithError(err).Errorf(c, "Failed to load tail entry data.")
301 » » » return nil, err
302 » » }
303 » » return []*logpb.LogEntry{le}, nil
302 304
303 case storage.ErrDoesNotExist: 305 case storage.ErrDoesNotExist:
304 return nil, nil 306 return nil, nil
305 307
306 default: 308 default:
307 log.WithError(err).Errorf(c, "Failed to fetch tail log.") 309 log.WithError(err).Errorf(c, "Failed to fetch tail log.")
308 return nil, err 310 return nil, err
309 } 311 }
310 } 312 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698