| OLD | NEW |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 package logs | 5 package logs |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "net/url" | 8 "net/url" |
| 9 "time" | 9 "time" |
| 10 | 10 |
| (...skipping 11 matching lines...) Expand all Loading... |
| 22 "golang.org/x/net/context" | 22 "golang.org/x/net/context" |
| 23 "google.golang.org/grpc/codes" | 23 "google.golang.org/grpc/codes" |
| 24 ) | 24 ) |
| 25 | 25 |
| 26 const ( | 26 const ( |
| 27 // getInitialArraySize is the initial amount of log slots to allocate fo
r a | 27 // getInitialArraySize is the initial amount of log slots to allocate fo
r a |
| 28 // Get request. | 28 // Get request. |
| 29 getInitialArraySize = 256 | 29 getInitialArraySize = 256 |
| 30 | 30 |
| 31 // getBytesLimit is the maximum amount of data that we are willing to qu
ery. | 31 // getBytesLimit is the maximum amount of data that we are willing to qu
ery. |
| 32 » // AppEngine limits our response size to 32MB. However, this limit appli
es | 32 » // |
| 33 » // to the raw recovered LogEntry data, so we'll artificially constrain t
his | 33 » // We will limit byte responses to 16MB, based on the following constrai
nts: |
| 34 » // to 16MB so the additional JSON overhead doesn't kill it. | 34 » //» - AppEngine cannot respond with more than 32MB of data. This inc
ludes JSON |
| 35 » //» overhead, including notation and base64 data expansion. |
| 36 » //» - `urlfetch`, which is used for Google Cloud Storage (archival)
responses, |
| 37 » //» cannot handle responses larger than 32MB. |
| 35 getBytesLimit = 16 * 1024 * 1024 | 38 getBytesLimit = 16 * 1024 * 1024 |
| 36 ) | 39 ) |
| 37 | 40 |
| 38 // Get returns state and log data for a single log stream. | 41 // Get returns state and log data for a single log stream. |
| 39 func (s *Server) Get(c context.Context, req *logdog.GetRequest) (*logdog.GetResp
onse, error) { | 42 func (s *Server) Get(c context.Context, req *logdog.GetRequest) (*logdog.GetResp
onse, error) { |
| 40 return s.getImpl(c, req, false) | 43 return s.getImpl(c, req, false) |
| 41 } | 44 } |
| 42 | 45 |
| 43 // Tail returns the last log entry for a given log stream. | 46 // Tail returns the last log entry for a given log stream. |
| 44 func (s *Server) Tail(c context.Context, req *logdog.TailRequest) (*logdog.GetRe
sponse, error) { | 47 func (s *Server) Tail(c context.Context, req *logdog.TailRequest) (*logdog.GetRe
sponse, error) { |
| 45 r := logdog.GetRequest{ | 48 r := logdog.GetRequest{ |
| 46 Path: req.Path, | 49 Path: req.Path, |
| 47 State: req.State, | 50 State: req.State, |
| 48 } | 51 } |
| 49 return s.getImpl(c, &r, true) | 52 return s.getImpl(c, &r, true) |
| 50 } | 53 } |
| 51 | 54 |
| 52 // getImpl is common code shared between Get and Tail endpoints. | 55 // getImpl is common code shared between Get and Tail endpoints. |
| 53 func (s *Server) getImpl(c context.Context, req *logdog.GetRequest, tail bool) (
*logdog.GetResponse, error) { | 56 func (s *Server) getImpl(c context.Context, req *logdog.GetRequest, tail bool) (
*logdog.GetResponse, error) { |
| 54 svc := s.GetServices() | 57 svc := s.GetServices() |
| 58 log.Fields{ |
| 59 "path": req.Path, |
| 60 "index": req.Index, |
| 61 "tail": tail, |
| 62 }.Debugf(c, "Received get request.") |
| 55 | 63 |
| 56 // Fetch the log stream state for this log stream. | 64 // Fetch the log stream state for this log stream. |
| 57 u, err := url.Parse(req.Path) | 65 u, err := url.Parse(req.Path) |
| 58 if err != nil { | 66 if err != nil { |
| 59 log.Fields{ | 67 log.Fields{ |
| 60 log.ErrorKey: err, | 68 log.ErrorKey: err, |
| 61 "path": req.Path, | 69 "path": req.Path, |
| 62 }.Errorf(c, "Could not parse path URL.") | 70 }.Errorf(c, "Could not parse path URL.") |
| 63 return nil, grpcutil.Errf(codes.InvalidArgument, "invalid path e
ncoding") | 71 return nil, grpcutil.Errf(codes.InvalidArgument, "invalid path e
ncoding") |
| 64 } | 72 } |
| (...skipping 65 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 130 | 138 |
| 131 log.Fields{ | 139 log.Fields{ |
| 132 "logCount": len(resp.Logs), | 140 "logCount": len(resp.Logs), |
| 133 }.Debugf(c, "Get request completed successfully.") | 141 }.Debugf(c, "Get request completed successfully.") |
| 134 return &resp, nil | 142 return &resp, nil |
| 135 } | 143 } |
| 136 | 144 |
| 137 func (s *Server) getLogs(c context.Context, svc coordinator.Services, req *logdo
g.GetRequest, tail bool, | 145 func (s *Server) getLogs(c context.Context, svc coordinator.Services, req *logdo
g.GetRequest, tail bool, |
| 138 ls *coordinator.LogStream) ( | 146 ls *coordinator.LogStream) ( |
| 139 []*logpb.LogEntry, error) { | 147 []*logpb.LogEntry, error) { |
| 148 byteLimit := int(req.ByteCount) |
| 149 if byteLimit <= 0 || byteLimit > getBytesLimit { |
| 150 byteLimit = getBytesLimit |
| 151 } |
| 152 |
| 140 var st storage.Storage | 153 var st storage.Storage |
| 141 if !ls.Archived() { | 154 if !ls.Archived() { |
| 142 log.Debugf(c, "Log is not archived. Fetching from intermediate s
torage.") | 155 log.Debugf(c, "Log is not archived. Fetching from intermediate s
torage.") |
| 143 | 156 |
| 144 // Logs are not archived. Fetch from intermediate storage. | 157 // Logs are not archived. Fetch from intermediate storage. |
| 145 var err error | 158 var err error |
| 146 st, err = svc.IntermediateStorage(c) | 159 st, err = svc.IntermediateStorage(c) |
| 147 if err != nil { | 160 if err != nil { |
| 148 return nil, err | 161 return nil, err |
| 149 } | 162 } |
| 150 } else { | 163 } else { |
| 151 » » log.Debugf(c, "Log is archived. Fetching from archive storage.") | 164 » » log.Fields{ |
| 165 » » » "indexURL": ls.ArchiveIndexURL, |
| 166 » » » "streamURL": ls.ArchiveStreamURL, |
| 167 » » » "archiveTime": ls.ArchivedTime, |
| 168 » » }.Debugf(c, "Log is archived. Fetching from archive storage.") |
| 169 |
| 152 var err error | 170 var err error |
| 153 gs, err := svc.GSClient(c) | 171 gs, err := svc.GSClient(c) |
| 154 if err != nil { | 172 if err != nil { |
| 155 log.WithError(err).Errorf(c, "Failed to create Google St
orage client.") | 173 log.WithError(err).Errorf(c, "Failed to create Google St
orage client.") |
| 156 return nil, err | 174 return nil, err |
| 157 } | 175 } |
| 158 defer func() { | 176 defer func() { |
| 159 if err := gs.Close(); err != nil { | 177 if err := gs.Close(); err != nil { |
| 160 log.WithError(err).Warningf(c, "Failed to close
Google Storage client.") | 178 log.WithError(err).Warningf(c, "Failed to close
Google Storage client.") |
| 161 } | 179 } |
| 162 }() | 180 }() |
| 163 | 181 |
| 164 st, err = archive.New(c, archive.Options{ | 182 st, err = archive.New(c, archive.Options{ |
| 165 IndexURL: ls.ArchiveIndexURL, | 183 IndexURL: ls.ArchiveIndexURL, |
| 166 StreamURL: ls.ArchiveStreamURL, | 184 StreamURL: ls.ArchiveStreamURL, |
| 167 Client: gs, | 185 Client: gs, |
| 186 MaxBytes: byteLimit, |
| 168 }) | 187 }) |
| 169 if err != nil { | 188 if err != nil { |
| 170 log.WithError(err).Errorf(c, "Failed to create Google St
orage storage instance.") | 189 log.WithError(err).Errorf(c, "Failed to create Google St
orage storage instance.") |
| 171 return nil, err | 190 return nil, err |
| 172 } | 191 } |
| 173 } | 192 } |
| 174 defer st.Close() | 193 defer st.Close() |
| 175 | 194 |
| 176 path := ls.Path() | 195 path := ls.Path() |
| 177 | 196 |
| 178 var fetchedLogs [][]byte | 197 var fetchedLogs [][]byte |
| 179 var err error | 198 var err error |
| 180 if tail { | 199 if tail { |
| 181 fetchedLogs, err = getTail(c, st, path) | 200 fetchedLogs, err = getTail(c, st, path) |
| 182 } else { | 201 } else { |
| 183 » » fetchedLogs, err = getHead(c, req, st, path) | 202 » » fetchedLogs, err = getHead(c, req, st, path, byteLimit) |
| 184 } | 203 } |
| 185 if err != nil { | 204 if err != nil { |
| 186 log.WithError(err).Errorf(c, "Failed to fetch log records.") | 205 log.WithError(err).Errorf(c, "Failed to fetch log records.") |
| 187 return nil, err | 206 return nil, err |
| 188 } | 207 } |
| 189 | 208 |
| 190 logEntries := make([]*logpb.LogEntry, len(fetchedLogs)) | 209 logEntries := make([]*logpb.LogEntry, len(fetchedLogs)) |
| 191 for idx, ld := range fetchedLogs { | 210 for idx, ld := range fetchedLogs { |
| 192 // Deserialize the log entry, then convert it to output value. | 211 // Deserialize the log entry, then convert it to output value. |
| 193 le := logpb.LogEntry{} | 212 le := logpb.LogEntry{} |
| 194 if err := proto.Unmarshal(ld, &le); err != nil { | 213 if err := proto.Unmarshal(ld, &le); err != nil { |
| 195 log.Fields{ | 214 log.Fields{ |
| 196 log.ErrorKey: err, | 215 log.ErrorKey: err, |
| 197 "index": idx, | 216 "index": idx, |
| 198 }.Errorf(c, "Failed to generate response log entry.") | 217 }.Errorf(c, "Failed to generate response log entry.") |
| 199 return nil, err | 218 return nil, err |
| 200 } | 219 } |
| 201 logEntries[idx] = &le | 220 logEntries[idx] = &le |
| 202 } | 221 } |
| 203 return logEntries, nil | 222 return logEntries, nil |
| 204 } | 223 } |
| 205 | 224 |
| 206 func getHead(c context.Context, req *logdog.GetRequest, st storage.Storage, p ty
pes.StreamPath) ([][]byte, error) { | 225 func getHead(c context.Context, req *logdog.GetRequest, st storage.Storage, p ty
pes.StreamPath, byteLimit int) ( |
| 226 » [][]byte, error) { |
| 207 c = log.SetFields(c, log.Fields{ | 227 c = log.SetFields(c, log.Fields{ |
| 208 "path": p, | 228 "path": p, |
| 209 "index": req.Index, | 229 "index": req.Index, |
| 210 "count": req.LogCount, | 230 "count": req.LogCount, |
| 211 "bytes": req.ByteCount, | 231 "bytes": req.ByteCount, |
| 212 "noncontiguous": req.NonContiguous, | 232 "noncontiguous": req.NonContiguous, |
| 213 }) | 233 }) |
| 214 | 234 |
| 215 byteLimit := int(req.ByteCount) | |
| 216 if byteLimit <= 0 || byteLimit > getBytesLimit { | |
| 217 byteLimit = getBytesLimit | |
| 218 } | |
| 219 | |
| 220 // Allocate result logs array. | 235 // Allocate result logs array. |
| 221 logCount := int(req.LogCount) | 236 logCount := int(req.LogCount) |
| 222 asz := getInitialArraySize | 237 asz := getInitialArraySize |
| 223 if logCount > 0 && logCount < asz { | 238 if logCount > 0 && logCount < asz { |
| 224 asz = logCount | 239 asz = logCount |
| 225 } | 240 } |
| 226 logs := make([][]byte, 0, asz) | 241 logs := make([][]byte, 0, asz) |
| 227 | 242 |
| 228 sreq := storage.GetRequest{ | 243 sreq := storage.GetRequest{ |
| 229 Path: p, | 244 Path: p, |
| (...skipping 52 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 282 log.ErrorKey: err, | 297 log.ErrorKey: err, |
| 283 "delay": delay, | 298 "delay": delay, |
| 284 }.Warningf(c, "Transient error while fetching tail log; retrying
.") | 299 }.Warningf(c, "Transient error while fetching tail log; retrying
.") |
| 285 }) | 300 }) |
| 286 if err != nil { | 301 if err != nil { |
| 287 log.WithError(err).Errorf(c, "Failed to fetch tail log.") | 302 log.WithError(err).Errorf(c, "Failed to fetch tail log.") |
| 288 return nil, err | 303 return nil, err |
| 289 } | 304 } |
| 290 return [][]byte{data}, err | 305 return [][]byte{data}, err |
| 291 } | 306 } |
| OLD | NEW |