| OLD | NEW |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 package logs | 5 package logs |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "net/url" | 8 "net/url" |
| 9 "time" | 9 "time" |
| 10 | 10 |
| 11 "github.com/golang/protobuf/proto" | 11 "github.com/golang/protobuf/proto" |
| 12 ds "github.com/luci/gae/service/datastore" | 12 ds "github.com/luci/gae/service/datastore" |
| 13 "github.com/luci/luci-go/appengine/logdog/coordinator" | 13 "github.com/luci/luci-go/appengine/logdog/coordinator" |
| 14 "github.com/luci/luci-go/appengine/logdog/coordinator/config" | 14 "github.com/luci/luci-go/appengine/logdog/coordinator/config" |
| 15 "github.com/luci/luci-go/common/api/logdog_coordinator/logs/v1" | 15 "github.com/luci/luci-go/common/api/logdog_coordinator/logs/v1" |
| 16 "github.com/luci/luci-go/common/grpcutil" | 16 "github.com/luci/luci-go/common/grpcutil" |
| 17 "github.com/luci/luci-go/common/logdog/types" | 17 "github.com/luci/luci-go/common/logdog/types" |
| 18 log "github.com/luci/luci-go/common/logging" | 18 log "github.com/luci/luci-go/common/logging" |
| 19 "github.com/luci/luci-go/common/proto/logdog/logpb" | 19 "github.com/luci/luci-go/common/proto/logdog/logpb" |
| 20 "github.com/luci/luci-go/common/retry" | 20 "github.com/luci/luci-go/common/retry" |
| 21 "github.com/luci/luci-go/server/logdog/storage" | 21 "github.com/luci/luci-go/server/logdog/storage" |
| 22 "github.com/luci/luci-go/server/logdog/storage/archive" | |
| 23 "golang.org/x/net/context" | 22 "golang.org/x/net/context" |
| 24 "google.golang.org/grpc/codes" | 23 "google.golang.org/grpc/codes" |
| 25 ) | 24 ) |
| 26 | 25 |
| 27 const ( | 26 const ( |
| 28 // getInitialArraySize is the initial amount of log slots to allocate for a | 27 // getInitialArraySize is the initial amount of log slots to allocate for a |
| 29 // Get request. | 28 // Get request. |
| 30 » getInitialArraySize = 256 | 29 » getInitialArraySize = int64(256) |
| 31 | 30 |
| 32 // getBytesLimit is the maximum amount of data that we are willing to query. | 31 // getBytesLimit is the maximum amount of data that we are willing to query. |
| 33 // AppEngine limits our response size to 32MB. However, this limit applies | 32 // AppEngine limits our response size to 32MB. However, this limit applies |
| 34 // to the raw recovered LogEntry data, so we'll artificially constrain this | 33 // to the raw recovered LogEntry data, so we'll artificially constrain this |
| 35 // to 16MB so the additional JSON overhead doesn't kill it. | 34 // to 16MB so the additional JSON overhead doesn't kill it. |
| 36 getBytesLimit = 16 * 1024 * 1024 | 35 getBytesLimit = 16 * 1024 * 1024 |
| 37 ) | 36 ) |
| 38 | 37 |
| 39 // Get returns state and log data for a single log stream. | 38 // Get returns state and log data for a single log stream. |
| 40 func (s *Server) Get(c context.Context, req *logs.GetRequest) (*logs.GetResponse, error) { | 39 func (s *Server) Get(c context.Context, req *logs.GetRequest) (*logs.GetResponse, error) { |
| (...skipping 87 matching lines...) |
| 128 } | 127 } |
| 129 | 128 |
| 130 log.Fields{ | 129 log.Fields{ |
| 131 "logCount": len(resp.Logs), | 130 "logCount": len(resp.Logs), |
| 132 }.Debugf(c, "Get request completed successfully.") | 131 }.Debugf(c, "Get request completed successfully.") |
| 133 return &resp, nil | 132 return &resp, nil |
| 134 } | 133 } |
| 135 | 134 |
| 136 func (s *Server) getLogs(c context.Context, req *logs.GetRequest, tail bool, ls *coordinator.LogStream) ( | 135 func (s *Server) getLogs(c context.Context, req *logs.GetRequest, tail bool, ls *coordinator.LogStream) ( |
| 137 []*logpb.LogEntry, error) { | 136 []*logpb.LogEntry, error) { |
| 138 » var st storage.Storage | 137 » st, err := s.StorageForLogStream(c, ls) |
| 139 » if !ls.Archived() { | 138 » if err != nil { |
| 140 » » log.Debugf(c, "Log is not archived. Fetching from intermediate s
torage.") | 139 » » return nil, err |
| 141 | |
| 142 » » // Logs are not archived. Fetch from intermediate storage. | |
| 143 » » var err error | |
| 144 » » st, err = s.Storage(c) | |
| 145 » » if err != nil { | |
| 146 » » » return nil, err | |
| 147 » » } | |
| 148 » } else { | |
| 149 » » log.Debugf(c, "Log is archived. Fetching from archive storage.") | |
| 150 » » var err error | |
| 151 » » gs, err := s.GSClient(c) | |
| 152 » » if err != nil { | |
| 153 » » » log.WithError(err).Errorf(c, "Failed to create Google St
orage client.") | |
| 154 » » » return nil, err | |
| 155 » » } | |
| 156 » » defer func() { | |
| 157 » » » if err := gs.Close(); err != nil { | |
| 158 » » » » log.WithError(err).Warningf(c, "Failed to close
Google Storage client.") | |
| 159 » » » } | |
| 160 » » }() | |
| 161 | |
| 162 » » st, err = archive.New(c, archive.Options{ | |
| 163 » » » IndexURL: ls.ArchiveIndexURL, | |
| 164 » » » StreamURL: ls.ArchiveStreamURL, | |
| 165 » » » Client: gs, | |
| 166 » » }) | |
| 167 » » if err != nil { | |
| 168 » » » log.WithError(err).Errorf(c, "Failed to create Google St
orage storage instance.") | |
| 169 » » » return nil, err | |
| 170 » » } | |
| 171 } | 140 } |
| 172 defer st.Close() | 141 defer st.Close() |
| 173 | 142 |
| 174 path := ls.Path() | 143 path := ls.Path() |
| 175 | 144 |
| 176 var fetchedLogs [][]byte | 145 var fetchedLogs [][]byte |
| 177 var err error | |
| 178 if tail { | 146 if tail { |
| 179 fetchedLogs, err = getTail(c, st, path) | 147 fetchedLogs, err = getTail(c, st, path) |
| 180 } else { | 148 } else { |
| 181 fetchedLogs, err = getHead(c, req, st, path) | 149 fetchedLogs, err = getHead(c, req, st, path) |
| 182 } | 150 } |
| 183 if err != nil { | 151 if err != nil { |
| 184 log.WithError(err).Errorf(c, "Failed to fetch log records.") | 152 log.WithError(err).Errorf(c, "Failed to fetch log records.") |
| 185 return nil, err | 153 return nil, err |
| 186 } | 154 } |
| 187 | 155 |
| (...skipping 21 matching lines...) |
| 209 "bytes": req.ByteCount, | 177 "bytes": req.ByteCount, |
| 210 "noncontiguous": req.NonContiguous, | 178 "noncontiguous": req.NonContiguous, |
| 211 }) | 179 }) |
| 212 | 180 |
| 213 byteLimit := int(req.ByteCount) | 181 byteLimit := int(req.ByteCount) |
| 214 if byteLimit <= 0 || byteLimit > getBytesLimit { | 182 if byteLimit <= 0 || byteLimit > getBytesLimit { |
| 215 byteLimit = getBytesLimit | 183 byteLimit = getBytesLimit |
| 216 } | 184 } |
| 217 | 185 |
| 218 // Allocate result logs array. | 186 // Allocate result logs array. |
| 219 » logCount := int(req.LogCount) | 187 » logCount := int64(req.LogCount) |
| 220 asz := getInitialArraySize | 188 asz := getInitialArraySize |
| 221 if logCount > 0 && logCount < asz { | 189 if logCount > 0 && logCount < asz { |
| 222 asz = logCount | 190 asz = logCount |
| 223 } | 191 } |
| 224 logs := make([][]byte, 0, asz) | 192 logs := make([][]byte, 0, asz) |
| 225 | 193 |
| 226 sreq := storage.GetRequest{ | 194 sreq := storage.GetRequest{ |
| 227 Path: p, | 195 Path: p, |
| 228 Index: types.MessageIndex(req.Index), | 196 Index: types.MessageIndex(req.Index), |
| 229 Limit: logCount, | 197 Limit: logCount, |
| 230 } | 198 } |
| 231 | 199 |
| 232 count := 0 | |
| 233 err := retry.Retry(c, retry.TransientOnly(retry.Default), func() error { | 200 err := retry.Retry(c, retry.TransientOnly(retry.Default), func() error { |
| 234 // Issue the Get request. This may return a transient error, in which case | 201 // Issue the Get request. This may return a transient error, in which case |
| 235 // we will retry. | 202 // we will retry. |
| 236 return st.Get(&sreq, func(idx types.MessageIndex, ld []byte) boo
l { | 203 return st.Get(&sreq, func(idx types.MessageIndex, ld []byte) boo
l { |
| 237 » » » if count > 0 && byteLimit-len(ld) < 0 { | 204 » » » if len(logs) > 0 && byteLimit-len(ld) < 0 { |
| 238 // Not the first log, and we've exceeded our byte limit. | 205 // Not the first log, and we've exceeded our byte limit. |
| 239 return false | 206 return false |
| 240 } | 207 } |
| 241 byteLimit -= len(ld) | 208 byteLimit -= len(ld) |
| 242 | 209 |
| 243 if !(req.NonContiguous || idx == sreq.Index) { | 210 if !(req.NonContiguous || idx == sreq.Index) { |
| 244 return false | 211 return false |
| 245 } | 212 } |
| 246 logs = append(logs, ld) | 213 logs = append(logs, ld) |
| 247 sreq.Index = idx + 1 | 214 sreq.Index = idx + 1 |
| 248 » » » count++ | 215 » » » return !(logCount > 0 && int64(len(logs)) >= logCount) |
| 249 » » » return !(logCount > 0 && count >= logCount) | |
| 250 }) | 216 }) |
| 251 }, func(err error, delay time.Duration) { | 217 }, func(err error, delay time.Duration) { |
| 252 log.Fields{ | 218 log.Fields{ |
| 253 log.ErrorKey: err, | 219 log.ErrorKey: err, |
| 254 "delay": delay, | 220 "delay": delay, |
| 255 "initialIndex": req.Index, | 221 "initialIndex": req.Index, |
| 256 "nextIndex": sreq.Index, | 222 "nextIndex": sreq.Index, |
| 257 "count": len(logs), | 223 "count": len(logs), |
| 258 }.Warningf(c, "Transient error while loading logs; retrying.") | 224 }.Warningf(c, "Transient error while loading logs; retrying.") |
| 259 }) | 225 }) |
| (...skipping 20 matching lines...) |
| 280 log.ErrorKey: err, | 246 log.ErrorKey: err, |
| 281 "delay": delay, | 247 "delay": delay, |
| 282 }.Warningf(c, "Transient error while fetching tail log; retrying
.") | 248 }.Warningf(c, "Transient error while fetching tail log; retrying
.") |
| 283 }) | 249 }) |
| 284 if err != nil { | 250 if err != nil { |
| 285 log.WithError(err).Errorf(c, "Failed to fetch tail log.") | 251 log.WithError(err).Errorf(c, "Failed to fetch tail log.") |
| 286 return nil, err | 252 return nil, err |
| 287 } | 253 } |
| 288 return [][]byte{data}, err | 254 return [][]byte{data}, err |
| 289 } | 255 } |
| OLD | NEW |
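
Note: the NEW side of this diff replaces the inline storage-selection branch in getLogs with a single call to s.StorageForLogStream(c, ls), whose implementation is not part of this file. For reference, below is a minimal sketch of what such a helper could look like, reconstructed from the code removed above. The helper name, s.Storage, s.GSClient, ls.Archived, and the archive.New call all come from this CL; everything else (exact signature, error messages, and how the Google Storage client gets closed, since the deleted deferred gs.Close() has no visible counterpart here) is an assumption, not the authoritative implementation.

// Sketch only: assumes the same package and imports as the file above, plus
// "github.com/luci/luci-go/server/logdog/storage/archive".
//
// StorageForLogStream returns intermediate storage for streams that are not
// yet archived, and an archive-backed storage instance for archived streams.
// The caller is expected to Close() the returned storage when finished.
func (s *Server) StorageForLogStream(c context.Context, ls *coordinator.LogStream) (storage.Storage, error) {
	if !ls.Archived() {
		// Logs are not archived. Fetch from intermediate storage.
		log.Debugf(c, "Log is not archived. Fetching from intermediate storage.")
		return s.Storage(c)
	}

	// Logs are archived. Fetch from archive (Google Storage) storage.
	log.Debugf(c, "Log is archived. Fetching from archive storage.")
	gs, err := s.GSClient(c)
	if err != nil {
		log.WithError(err).Errorf(c, "Failed to create Google Storage client.")
		return nil, err
	}

	st, err := archive.New(c, archive.Options{
		IndexURL:  ls.ArchiveIndexURL,
		StreamURL: ls.ArchiveStreamURL,
		Client:    gs,
	})
	if err != nil {
		log.WithError(err).Errorf(c, "Failed to create Google Storage storage instance.")
		return nil, err
	}
	return st, nil
}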