Chromium Code Reviews

Unified Diff: appengine/logdog/coordinator/endpoints/logs/get.go

Issue 1904503003: LogDog: Fix archived log stream read errors. (Closed)
Base URL: https://github.com/luci/luci-go@hierarchy-check-first
Patch Set: Delete "offset()" method. Created 4 years, 8 months ago
// Copyright 2015 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

package logs

import (
    "net/url"
    "time"

(...skipping 11 matching lines...)

    "golang.org/x/net/context"
    "google.golang.org/grpc/codes"
)

const (
    // getInitialArraySize is the initial amount of log slots to allocate for a
    // Get request.
    getInitialArraySize = 256

    // getBytesLimit is the maximum amount of data that we are willing to query.
-   // AppEngine limits our response size to 32MB. However, this limit applies
-   // to the raw recovered LogEntry data, so we'll artificially constrain this
-   // to 16MB so the additional JSON overhead doesn't kill it.
+   //
+   // We will limit byte responses to 16MB, based on the following constraints:
+   //  - AppEngine cannot respond with more than 32MB of data. This includes JSON
+   //    overhead, including notation and base64 data expansion.
+   //  - `urlfetch`, which is used for Google Cloud Storage (archival) responses,
+   //    cannot handle responses larger than 32MB.
    getBytesLimit = 16 * 1024 * 1024
)
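A quick sanity check on the new comment's arithmetic (a sketch added for context, not part of this CL; the 4/3 base64 expansion factor is a standard property of the encoding, and the 32MB caps are those cited above):

package main

import "fmt"

func main() {
    // Assumed arithmetic, not from the CL: base64 encodes 3 raw bytes
    // as 4 output bytes, so the encoded size is roughly raw * 4/3.
    const getBytesLimit = 16 * 1024 * 1024 // raw LogEntry budget from this CL
    const responseCap = 32 * 1024 * 1024   // AppEngine / urlfetch limit cited above

    encoded := getBytesLimit * 4 / 3
    fmt.Printf("encoded payload: ~%dMB, headroom: ~%dMB\n",
        encoded>>20, (responseCap-encoded)>>20)
    // Prints: encoded payload: ~21MB, headroom: ~10MB
}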
// Get returns state and log data for a single log stream.
func (s *Server) Get(c context.Context, req *logdog.GetRequest) (*logdog.GetResponse, error) {
    return s.getImpl(c, req, false)
}

// Tail returns the last log entry for a given log stream.
func (s *Server) Tail(c context.Context, req *logdog.TailRequest) (*logdog.GetResponse, error) {
    r := logdog.GetRequest{
        Path:  req.Path,
        State: req.State,
    }
    return s.getImpl(c, &r, true)
}

// getImpl is common code shared between Get and Tail endpoints.
func (s *Server) getImpl(c context.Context, req *logdog.GetRequest, tail bool) (*logdog.GetResponse, error) {
    svc := s.GetServices()
+   log.Fields{
+       "path":  req.Path,
+       "index": req.Index,
+       "tail":  tail,
+   }.Debugf(c, "Received get request.")

    // Fetch the log stream state for this log stream.
    u, err := url.Parse(req.Path)
    if err != nil {
        log.Fields{
            log.ErrorKey: err,
            "path":       req.Path,
        }.Errorf(c, "Could not parse path URL.")
        return nil, grpcutil.Errf(codes.InvalidArgument, "invalid path encoding")
    }

(...skipping 65 matching lines...)

    log.Fields{
        "logCount": len(resp.Logs),
    }.Debugf(c, "Get request completed successfully.")
    return &resp, nil
}
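For reference, a hypothetical caller of the Get endpoint. This compiles only within the surrounding package (the *Server value, context, and proto imports are assumed); field names are taken from the request fields this diff logs and reads:

// fetchSome is a hypothetical caller sketch, not part of this CL.
func fetchSome(ctx context.Context, srv *Server) (*logdog.GetResponse, error) {
    return srv.Get(ctx, &logdog.GetRequest{
        Path:      "myproject/+/some/stream", // hypothetical stream path
        Index:     0,                         // start at the first entry
        LogCount:  100,                       // at most 100 entries
        ByteCount: 1 << 20,                   // request ~1MB; the server caps at 16MB
    })
}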
func (s *Server) getLogs(c context.Context, svc coordinator.Services, req *logdog.GetRequest, tail bool,
    ls *coordinator.LogStream) (
    []*logpb.LogEntry, error) {
+   byteLimit := int(req.ByteCount)
+   if byteLimit <= 0 || byteLimit > getBytesLimit {
+       byteLimit = getBytesLimit
+   }
+
    var st storage.Storage
    if !ls.Archived() {
        log.Debugf(c, "Log is not archived. Fetching from intermediate storage.")

        // Logs are not archived. Fetch from intermediate storage.
        var err error
        st, err = svc.IntermediateStorage(c)
        if err != nil {
            return nil, err
        }
    } else {
-       log.Debugf(c, "Log is archived. Fetching from archive storage.")
+       log.Fields{
+           "indexURL":    ls.ArchiveIndexURL,
+           "streamURL":   ls.ArchiveStreamURL,
+           "archiveTime": ls.ArchivedTime,
+       }.Debugf(c, "Log is archived. Fetching from archive storage.")
+
        var err error
        gs, err := svc.GSClient(c)
        if err != nil {
            log.WithError(err).Errorf(c, "Failed to create Google Storage client.")
            return nil, err
        }
        defer func() {
            if err := gs.Close(); err != nil {
                log.WithError(err).Warningf(c, "Failed to close Google Storage client.")
            }
        }()

        st, err = archive.New(c, archive.Options{
            IndexURL:  ls.ArchiveIndexURL,
            StreamURL: ls.ArchiveStreamURL,
            Client:    gs,
+           MaxBytes:  byteLimit,
        })
        if err != nil {
            log.WithError(err).Errorf(c, "Failed to create Google Storage storage instance.")
            return nil, err
        }
    }
    defer st.Close()

    path := ls.Path()

    var fetchedLogs [][]byte
    var err error
    if tail {
        fetchedLogs, err = getTail(c, st, path)
    } else {
-       fetchedLogs, err = getHead(c, req, st, path)
+       fetchedLogs, err = getHead(c, req, st, path, byteLimit)
    }
    if err != nil {
        log.WithError(err).Errorf(c, "Failed to fetch log records.")
        return nil, err
    }

    logEntries := make([]*logpb.LogEntry, len(fetchedLogs))
    for idx, ld := range fetchedLogs {
        // Deserialize the log entry, then convert it to output value.
        le := logpb.LogEntry{}
        if err := proto.Unmarshal(ld, &le); err != nil {
            log.Fields{
                log.ErrorKey: err,
                "index":      idx,
            }.Errorf(c, "Failed to generate response log entry.")
            return nil, err
        }
        logEntries[idx] = &le
    }
    return logEntries, nil
}
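The relocated clamp is the substance of the fix: byteLimit is now computed before the storage backend is chosen, so archived (Google Cloud Storage) reads are capped via MaxBytes just like intermediate-storage reads, rather than only inside getHead. A minimal standalone sketch of the clamping behavior, mirroring the logic above (example values are hypothetical):

package main

import "fmt"

const getBytesLimit = 16 * 1024 * 1024

// clamp mirrors the byteLimit logic in getLogs: non-positive or
// oversized requests fall back to the server-side 16MB cap.
func clamp(byteCount int) int {
    if byteCount <= 0 || byteCount > getBytesLimit {
        return getBytesLimit
    }
    return byteCount
}

func main() {
    for _, bc := range []int{0, 4096, 64 << 20} {
        fmt.Printf("ByteCount=%d -> byteLimit=%d\n", bc, clamp(bc))
    }
    // 0 and 64MB both clamp to 16MB; 4096 passes through unchanged.
}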
-func getHead(c context.Context, req *logdog.GetRequest, st storage.Storage, p types.StreamPath) ([][]byte, error) {
+func getHead(c context.Context, req *logdog.GetRequest, st storage.Storage, p types.StreamPath, byteLimit int) (
+   [][]byte, error) {
    c = log.SetFields(c, log.Fields{
        "path":          p,
        "index":         req.Index,
        "count":         req.LogCount,
        "bytes":         req.ByteCount,
        "noncontiguous": req.NonContiguous,
    })

-   byteLimit := int(req.ByteCount)
-   if byteLimit <= 0 || byteLimit > getBytesLimit {
-       byteLimit = getBytesLimit
-   }
-
    // Allocate result logs array.
    logCount := int(req.LogCount)
    asz := getInitialArraySize
    if logCount > 0 && logCount < asz {
        asz = logCount
    }
    logs := make([][]byte, 0, asz)

    sreq := storage.GetRequest{
        Path: p,

(...skipping 52 matching lines...)

            log.ErrorKey: err,
            "delay":      delay,
        }.Warningf(c, "Transient error while fetching tail log; retrying.")
    })
    if err != nil {
        log.WithError(err).Errorf(c, "Failed to fetch tail log.")
        return nil, err
    }
    return [][]byte{data}, err
}
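getHead now receives byteLimit from getLogs rather than deriving it from req.ByteCount itself; the loop that consumes it is elided above. A simplified, self-contained sketch of the byte-budget accumulation pattern it implies (an assumption for illustration; the real iteration goes through storage callbacks not shown in this diff):

package main

import "fmt"

func main() {
    byteLimit := 10 // tiny budget for demonstration; the real cap is 16MB
    entries := [][]byte{[]byte("abcd"), []byte("efgh"), []byte("ijkl")}

    var logs [][]byte
    remaining := byteLimit
    for _, e := range entries {
        if len(e) > remaining {
            break // stop before exceeding the response budget
        }
        logs = append(logs, e)
        remaining -= len(e)
    }
    fmt.Printf("returned %d of %d entries\n", len(logs), len(entries))
    // Prints: returned 2 of 3 entries
}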