Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(159)

Side by Side Diff: logdog/appengine/coordinator/endpoints/logs/get.go

Issue 2435113002: LogDog: Add Storage-layer data caching. (Closed)
Patch Set: Fix byteLimit bug. Created 4 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The LUCI Authors. All rights reserved. 1 // Copyright 2015 The LUCI Authors. All rights reserved.
2 // Use of this source code is governed under the Apache License, Version 2.0 2 // Use of this source code is governed under the Apache License, Version 2.0
3 // that can be found in the LICENSE file. 3 // that can be found in the LICENSE file.
4 4
5 package logs 5 package logs
6 6
7 import ( 7 import (
8 "time" 8 "time"
9 9
10 ds "github.com/luci/gae/service/datastore" 10 ds "github.com/luci/gae/service/datastore"
(...skipping 150 matching lines...) Expand 10 before | Expand all | Expand 10 after
161 defer func() { 161 defer func() {
162 if err := gs.Close(); err != nil { 162 if err := gs.Close(); err != nil {
163 log.WithError(err).Warningf(c, "Failed to close Google Storage client.") 163 log.WithError(err).Warningf(c, "Failed to close Google Storage client.")
164 } 164 }
165 }() 165 }()
166 166
167 st, err = archive.New(c, archive.Options{ 167 st, err = archive.New(c, archive.Options{
168 IndexURL: lst.ArchiveIndexURL, 168 IndexURL: lst.ArchiveIndexURL,
169 StreamURL: lst.ArchiveStreamURL, 169 StreamURL: lst.ArchiveStreamURL,
170 Client: gs, 170 Client: gs,
171 Cache: svc.StorageCache(),
171 }) 172 })
172 if err != nil { 173 if err != nil {
173 log.WithError(err).Errorf(c, "Failed to create Google St orage storage instance.") 174 log.WithError(err).Errorf(c, "Failed to create Google St orage storage instance.")
174 return nil, err 175 return nil, err
175 } 176 }
176 } 177 }
177 defer st.Close() 178 defer st.Close()
178 179
179 project, path := coordinator.Project(c), ls.Path() 180 project, path := coordinator.Project(c), ls.Path()
180 181
(...skipping 31 matching lines...) Expand 10 before | Expand all | Expand 10 after
212 } 213 }
213 logs := make([]*logpb.LogEntry, 0, asz) 214 logs := make([]*logpb.LogEntry, 0, asz)
214 215
215 sreq := storage.GetRequest{ 216 sreq := storage.GetRequest{
216 Project: project, 217 Project: project,
217 Path: path, 218 Path: path,
218 Index: types.MessageIndex(req.Index), 219 Index: types.MessageIndex(req.Index),
219 Limit: logCount, 220 Limit: logCount,
220 } 221 }
221 222
223 var ierr error
222 count := 0 224 count := 0
223 var ierr error
224 err := retry.Retry(c, retry.TransientOnly(retry.Default), func() error { 225 err := retry.Retry(c, retry.TransientOnly(retry.Default), func() error {
225 // Issue the Get request. This may return a transient error, in which case 226 // Issue the Get request. This may return a transient error, in which case
226 // we will retry. 227 // we will retry.
227 return st.Get(sreq, func(e *storage.Entry) bool { 228 return st.Get(sreq, func(e *storage.Entry) bool {
228 var le *logpb.LogEntry 229 var le *logpb.LogEntry
229 if le, ierr = e.GetLogEntry(); ierr != nil { 230 if le, ierr = e.GetLogEntry(); ierr != nil {
230 return false 231 return false
231 } 232 }
232 233
233 if count > 0 && byteLimit-len(e.D) < 0 { 234 if count > 0 && byteLimit-len(e.D) < 0 {
234 // Not the first log, and we've exceeded our byt e limit. 235 // Not the first log, and we've exceeded our byt e limit.
235 return false 236 return false
236 } 237 }
237 byteLimit -= len(e.D)
238 238
239 sidx, _ := e.GetStreamIndex() // GetLogEntry succeeded, so this must. 239 sidx, _ := e.GetStreamIndex() // GetLogEntry succeeded, so this must.
240 if !(req.NonContiguous || sidx == sreq.Index) { 240 if !(req.NonContiguous || sidx == sreq.Index) {
241 return false 241 return false
242 } 242 }
243
243 logs = append(logs, le) 244 logs = append(logs, le)
244 sreq.Index = sidx + 1 245 sreq.Index = sidx + 1
246 byteLimit -= len(e.D)
245 count++ 247 count++
246 return !(logCount > 0 && count >= logCount) 248 return !(logCount > 0 && count >= logCount)
247 }) 249 })
248 }, func(err error, delay time.Duration) { 250 }, func(err error, delay time.Duration) {
249 log.Fields{ 251 log.Fields{
250 log.ErrorKey: err, 252 log.ErrorKey: err,
251 "delay": delay, 253 "delay": delay,
252 "initialIndex": req.Index, 254 "initialIndex": req.Index,
253 "nextIndex": sreq.Index, 255 "nextIndex": sreq.Index,
254 "count": len(logs), 256 "count": len(logs),
(...skipping 48 matching lines...) Expand 10 before | Expand all | Expand 10 after
303 return []*logpb.LogEntry{le}, nil 305 return []*logpb.LogEntry{le}, nil
304 306
305 case storage.ErrDoesNotExist: 307 case storage.ErrDoesNotExist:
306 return nil, nil 308 return nil, nil
307 309
308 default: 310 default:
309 log.WithError(err).Errorf(c, "Failed to fetch tail log.") 311 log.WithError(err).Errorf(c, "Failed to fetch tail log.")
310 return nil, err 312 return nil, err
311 } 313 }
312 } 314 }
OLDNEW
« no previous file with comments | « logdog/appengine/coordinator/coordinatorTest/storage_cache.go ('k') | logdog/appengine/coordinator/endpoints/logs/get_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698