Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(783)

Side by Side Diff: appengine/logdog/coordinator/endpoints/logs/get.go

Issue 1672833003: LogDog: Add log rendering view. Base URL: https://github.com/luci/luci-go@master
Patch Set: Clean up, add tests, little reorg. Created 4 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package logs 5 package logs
6 6
7 import ( 7 import (
8 "net/url" 8 "net/url"
9 "time" 9 "time"
10 10
11 "github.com/golang/protobuf/proto" 11 "github.com/golang/protobuf/proto"
12 ds "github.com/luci/gae/service/datastore" 12 ds "github.com/luci/gae/service/datastore"
13 "github.com/luci/luci-go/appengine/logdog/coordinator" 13 "github.com/luci/luci-go/appengine/logdog/coordinator"
14 "github.com/luci/luci-go/appengine/logdog/coordinator/config" 14 "github.com/luci/luci-go/appengine/logdog/coordinator/config"
15 "github.com/luci/luci-go/common/api/logdog_coordinator/logs/v1" 15 "github.com/luci/luci-go/common/api/logdog_coordinator/logs/v1"
16 "github.com/luci/luci-go/common/grpcutil" 16 "github.com/luci/luci-go/common/grpcutil"
17 "github.com/luci/luci-go/common/logdog/types" 17 "github.com/luci/luci-go/common/logdog/types"
18 log "github.com/luci/luci-go/common/logging" 18 log "github.com/luci/luci-go/common/logging"
19 "github.com/luci/luci-go/common/proto/logdog/logpb" 19 "github.com/luci/luci-go/common/proto/logdog/logpb"
20 "github.com/luci/luci-go/common/retry" 20 "github.com/luci/luci-go/common/retry"
21 "github.com/luci/luci-go/server/logdog/storage" 21 "github.com/luci/luci-go/server/logdog/storage"
22 "github.com/luci/luci-go/server/logdog/storage/archive"
23 "golang.org/x/net/context" 22 "golang.org/x/net/context"
24 "google.golang.org/grpc/codes" 23 "google.golang.org/grpc/codes"
25 ) 24 )
26 25
27 const ( 26 const (
28 // getInitialArraySize is the initial amount of log slots to allocate fo r a 27 // getInitialArraySize is the initial amount of log slots to allocate fo r a
29 // Get request. 28 // Get request.
30 » getInitialArraySize = 256 29 » getInitialArraySize = int64(256)
31 30
32 // getBytesLimit is the maximum amount of data that we are willing to qu ery. 31 // getBytesLimit is the maximum amount of data that we are willing to qu ery.
33 // AppEngine limits our response size to 32MB. However, this limit appli es 32 // AppEngine limits our response size to 32MB. However, this limit appli es
34 // to the raw recovered LogEntry data, so we'll artificially constrain t his 33 // to the raw recovered LogEntry data, so we'll artificially constrain t his
35 // to 16MB so the additional JSON overhead doesn't kill it. 34 // to 16MB so the additional JSON overhead doesn't kill it.
36 getBytesLimit = 16 * 1024 * 1024 35 getBytesLimit = 16 * 1024 * 1024
37 ) 36 )
38 37
39 // Get returns state and log data for a single log stream. 38 // Get returns state and log data for a single log stream.
40 func (s *Server) Get(c context.Context, req *logs.GetRequest) (*logs.GetResponse , error) { 39 func (s *Server) Get(c context.Context, req *logs.GetRequest) (*logs.GetResponse , error) {
(...skipping 87 matching lines...) Expand 10 before | Expand all | Expand 10 after
128 } 127 }
129 128
130 log.Fields{ 129 log.Fields{
131 "logCount": len(resp.Logs), 130 "logCount": len(resp.Logs),
132 }.Debugf(c, "Get request completed successfully.") 131 }.Debugf(c, "Get request completed successfully.")
133 return &resp, nil 132 return &resp, nil
134 } 133 }
135 134
136 func (s *Server) getLogs(c context.Context, req *logs.GetRequest, tail bool, ls *coordinator.LogStream) ( 135 func (s *Server) getLogs(c context.Context, req *logs.GetRequest, tail bool, ls *coordinator.LogStream) (
137 []*logpb.LogEntry, error) { 136 []*logpb.LogEntry, error) {
138 » var st storage.Storage 137 » st, err := s.StorageForLogStream(c, ls)
139 » if !ls.Archived() { 138 » if err != nil {
140 » » log.Debugf(c, "Log is not archived. Fetching from intermediate s torage.") 139 » » return nil, err
141
142 » » // Logs are not archived. Fetch from intermediate storage.
143 » » var err error
144 » » st, err = s.Storage(c)
145 » » if err != nil {
146 » » » return nil, err
147 » » }
148 » } else {
149 » » log.Debugf(c, "Log is archived. Fetching from archive storage.")
150 » » var err error
151 » » gs, err := s.GSClient(c)
152 » » if err != nil {
153 » » » log.WithError(err).Errorf(c, "Failed to create Google St orage client.")
154 » » » return nil, err
155 » » }
156 » » defer func() {
157 » » » if err := gs.Close(); err != nil {
158 » » » » log.WithError(err).Warningf(c, "Failed to close Google Storage client.")
159 » » » }
160 » » }()
161
162 » » st, err = archive.New(c, archive.Options{
163 » » » IndexURL: ls.ArchiveIndexURL,
164 » » » StreamURL: ls.ArchiveStreamURL,
165 » » » Client: gs,
166 » » })
167 » » if err != nil {
168 » » » log.WithError(err).Errorf(c, "Failed to create Google St orage storage instance.")
169 » » » return nil, err
170 » » }
171 } 140 }
172 defer st.Close() 141 defer st.Close()
173 142
174 path := ls.Path() 143 path := ls.Path()
175 144
176 var fetchedLogs [][]byte 145 var fetchedLogs [][]byte
177 var err error
178 if tail { 146 if tail {
179 fetchedLogs, err = getTail(c, st, path) 147 fetchedLogs, err = getTail(c, st, path)
180 } else { 148 } else {
181 fetchedLogs, err = getHead(c, req, st, path) 149 fetchedLogs, err = getHead(c, req, st, path)
182 } 150 }
183 if err != nil { 151 if err != nil {
184 log.WithError(err).Errorf(c, "Failed to fetch log records.") 152 log.WithError(err).Errorf(c, "Failed to fetch log records.")
185 return nil, err 153 return nil, err
186 } 154 }
187 155
(...skipping 21 matching lines...) Expand all
209 "bytes": req.ByteCount, 177 "bytes": req.ByteCount,
210 "noncontiguous": req.NonContiguous, 178 "noncontiguous": req.NonContiguous,
211 }) 179 })
212 180
213 byteLimit := int(req.ByteCount) 181 byteLimit := int(req.ByteCount)
214 if byteLimit <= 0 || byteLimit > getBytesLimit { 182 if byteLimit <= 0 || byteLimit > getBytesLimit {
215 byteLimit = getBytesLimit 183 byteLimit = getBytesLimit
216 } 184 }
217 185
218 // Allocate result logs array. 186 // Allocate result logs array.
219 » logCount := int(req.LogCount) 187 » logCount := int64(req.LogCount)
220 asz := getInitialArraySize 188 asz := getInitialArraySize
221 if logCount > 0 && logCount < asz { 189 if logCount > 0 && logCount < asz {
222 asz = logCount 190 asz = logCount
223 } 191 }
224 logs := make([][]byte, 0, asz) 192 logs := make([][]byte, 0, asz)
225 193
226 sreq := storage.GetRequest{ 194 sreq := storage.GetRequest{
227 Path: p, 195 Path: p,
228 Index: types.MessageIndex(req.Index), 196 Index: types.MessageIndex(req.Index),
229 Limit: logCount, 197 Limit: logCount,
230 } 198 }
231 199
232 count := 0
233 err := retry.Retry(c, retry.TransientOnly(retry.Default), func() error { 200 err := retry.Retry(c, retry.TransientOnly(retry.Default), func() error {
234 // Issue the Get request. This may return a transient error, in which case 201 // Issue the Get request. This may return a transient error, in which case
235 // we will retry. 202 // we will retry.
236 return st.Get(&sreq, func(idx types.MessageIndex, ld []byte) boo l { 203 return st.Get(&sreq, func(idx types.MessageIndex, ld []byte) boo l {
237 » » » if count > 0 && byteLimit-len(ld) < 0 { 204 » » » if len(logs) > 0 && byteLimit-len(ld) < 0 {
238 // Not the first log, and we've exceeded our byt e limit. 205 // Not the first log, and we've exceeded our byt e limit.
239 return false 206 return false
240 } 207 }
241 byteLimit -= len(ld) 208 byteLimit -= len(ld)
242 209
243 if !(req.NonContiguous || idx == sreq.Index) { 210 if !(req.NonContiguous || idx == sreq.Index) {
244 return false 211 return false
245 } 212 }
246 logs = append(logs, ld) 213 logs = append(logs, ld)
247 sreq.Index = idx + 1 214 sreq.Index = idx + 1
248 » » » count++ 215 » » » return !(logCount > 0 && int64(len(logs)) >= logCount)
249 » » » return !(logCount > 0 && count >= logCount)
250 }) 216 })
251 }, func(err error, delay time.Duration) { 217 }, func(err error, delay time.Duration) {
252 log.Fields{ 218 log.Fields{
253 log.ErrorKey: err, 219 log.ErrorKey: err,
254 "delay": delay, 220 "delay": delay,
255 "initialIndex": req.Index, 221 "initialIndex": req.Index,
256 "nextIndex": sreq.Index, 222 "nextIndex": sreq.Index,
257 "count": len(logs), 223 "count": len(logs),
258 }.Warningf(c, "Transient error while loading logs; retrying.") 224 }.Warningf(c, "Transient error while loading logs; retrying.")
259 }) 225 })
(...skipping 20 matching lines...) Expand all
280 log.ErrorKey: err, 246 log.ErrorKey: err,
281 "delay": delay, 247 "delay": delay,
282 }.Warningf(c, "Transient error while fetching tail log; retrying .") 248 }.Warningf(c, "Transient error while fetching tail log; retrying .")
283 }) 249 })
284 if err != nil { 250 if err != nil {
285 log.WithError(err).Errorf(c, "Failed to fetch tail log.") 251 log.WithError(err).Errorf(c, "Failed to fetch tail log.")
286 return nil, err 252 return nil, err
287 } 253 }
288 return [][]byte{data}, err 254 return [][]byte{data}, err
289 } 255 }
OLDNEW
« no previous file with comments | « appengine/logdog/coordinator/backend/storageCleanup_test.go ('k') | appengine/logdog/coordinator/endpoints/logs/get_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698