OLD | NEW |
1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 package logs | 5 package logs |
6 | 6 |
7 import ( | 7 import ( |
8 "net/url" | 8 "net/url" |
9 "time" | 9 "time" |
10 | 10 |
11 "github.com/golang/protobuf/proto" | 11 "github.com/golang/protobuf/proto" |
12 ds "github.com/luci/gae/service/datastore" | 12 ds "github.com/luci/gae/service/datastore" |
13 "github.com/luci/luci-go/appengine/logdog/coordinator" | 13 "github.com/luci/luci-go/appengine/logdog/coordinator" |
14 "github.com/luci/luci-go/appengine/logdog/coordinator/config" | 14 "github.com/luci/luci-go/appengine/logdog/coordinator/config" |
15 "github.com/luci/luci-go/common/api/logdog_coordinator/logs/v1" | 15 "github.com/luci/luci-go/common/api/logdog_coordinator/logs/v1" |
16 "github.com/luci/luci-go/common/grpcutil" | 16 "github.com/luci/luci-go/common/grpcutil" |
17 "github.com/luci/luci-go/common/logdog/types" | 17 "github.com/luci/luci-go/common/logdog/types" |
18 log "github.com/luci/luci-go/common/logging" | 18 log "github.com/luci/luci-go/common/logging" |
19 "github.com/luci/luci-go/common/proto/logdog/logpb" | 19 "github.com/luci/luci-go/common/proto/logdog/logpb" |
20 "github.com/luci/luci-go/common/retry" | 20 "github.com/luci/luci-go/common/retry" |
21 "github.com/luci/luci-go/server/logdog/storage" | 21 "github.com/luci/luci-go/server/logdog/storage" |
22 "github.com/luci/luci-go/server/logdog/storage/archive" | |
23 "golang.org/x/net/context" | 22 "golang.org/x/net/context" |
24 "google.golang.org/grpc/codes" | 23 "google.golang.org/grpc/codes" |
25 ) | 24 ) |
26 | 25 |
27 const ( | 26 const ( |
28 // getInitialArraySize is the initial amount of log slots to allocate fo
r a | 27 // getInitialArraySize is the initial amount of log slots to allocate fo
r a |
29 // Get request. | 28 // Get request. |
30 » getInitialArraySize = 256 | 29 » getInitialArraySize = int64(256) |
31 | 30 |
32 // getBytesLimit is the maximum amount of data that we are willing to qu
ery. | 31 // getBytesLimit is the maximum amount of data that we are willing to qu
ery. |
33 // AppEngine limits our response size to 32MB. However, this limit appli
es | 32 // AppEngine limits our response size to 32MB. However, this limit appli
es |
34 // to the raw recovered LogEntry data, so we'll artificially constrain t
his | 33 // to the raw recovered LogEntry data, so we'll artificially constrain t
his |
35 // to 16MB so the additional JSON overhead doesn't kill it. | 34 // to 16MB so the additional JSON overhead doesn't kill it. |
36 getBytesLimit = 16 * 1024 * 1024 | 35 getBytesLimit = 16 * 1024 * 1024 |
37 ) | 36 ) |
38 | 37 |
39 // Get returns state and log data for a single log stream. | 38 // Get returns state and log data for a single log stream. |
40 func (s *Server) Get(c context.Context, req *logs.GetRequest) (*logs.GetResponse
, error) { | 39 func (s *Server) Get(c context.Context, req *logs.GetRequest) (*logs.GetResponse
, error) { |
(...skipping 87 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
128 } | 127 } |
129 | 128 |
130 log.Fields{ | 129 log.Fields{ |
131 "logCount": len(resp.Logs), | 130 "logCount": len(resp.Logs), |
132 }.Debugf(c, "Get request completed successfully.") | 131 }.Debugf(c, "Get request completed successfully.") |
133 return &resp, nil | 132 return &resp, nil |
134 } | 133 } |
135 | 134 |
136 func (s *Server) getLogs(c context.Context, req *logs.GetRequest, tail bool, ls
*coordinator.LogStream) ( | 135 func (s *Server) getLogs(c context.Context, req *logs.GetRequest, tail bool, ls
*coordinator.LogStream) ( |
137 []*logpb.LogEntry, error) { | 136 []*logpb.LogEntry, error) { |
138 » var st storage.Storage | 137 » st, err := s.StorageForLogStream(c, ls) |
139 » if !ls.Archived() { | 138 » if err != nil { |
140 » » log.Debugf(c, "Log is not archived. Fetching from intermediate s
torage.") | 139 » » return nil, err |
141 | |
142 » » // Logs are not archived. Fetch from intermediate storage. | |
143 » » var err error | |
144 » » st, err = s.Storage(c) | |
145 » » if err != nil { | |
146 » » » return nil, err | |
147 » » } | |
148 » } else { | |
149 » » log.Debugf(c, "Log is archived. Fetching from archive storage.") | |
150 » » var err error | |
151 » » gs, err := s.GSClient(c) | |
152 » » if err != nil { | |
153 » » » log.WithError(err).Errorf(c, "Failed to create Google St
orage client.") | |
154 » » » return nil, err | |
155 » » } | |
156 » » defer func() { | |
157 » » » if err := gs.Close(); err != nil { | |
158 » » » » log.WithError(err).Warningf(c, "Failed to close
Google Storage client.") | |
159 » » » } | |
160 » » }() | |
161 | |
162 » » st, err = archive.New(c, archive.Options{ | |
163 » » » IndexURL: ls.ArchiveIndexURL, | |
164 » » » StreamURL: ls.ArchiveStreamURL, | |
165 » » » Client: gs, | |
166 » » }) | |
167 » » if err != nil { | |
168 » » » log.WithError(err).Errorf(c, "Failed to create Google St
orage storage instance.") | |
169 » » » return nil, err | |
170 » » } | |
171 } | 140 } |
172 defer st.Close() | 141 defer st.Close() |
173 | 142 |
174 path := ls.Path() | 143 path := ls.Path() |
175 | 144 |
176 var fetchedLogs [][]byte | 145 var fetchedLogs [][]byte |
177 var err error | |
178 if tail { | 146 if tail { |
179 fetchedLogs, err = getTail(c, st, path) | 147 fetchedLogs, err = getTail(c, st, path) |
180 } else { | 148 } else { |
181 fetchedLogs, err = getHead(c, req, st, path) | 149 fetchedLogs, err = getHead(c, req, st, path) |
182 } | 150 } |
183 if err != nil { | 151 if err != nil { |
184 log.WithError(err).Errorf(c, "Failed to fetch log records.") | 152 log.WithError(err).Errorf(c, "Failed to fetch log records.") |
185 return nil, err | 153 return nil, err |
186 } | 154 } |
187 | 155 |
(...skipping 21 matching lines...) Expand all Loading... |
209 "bytes": req.ByteCount, | 177 "bytes": req.ByteCount, |
210 "noncontiguous": req.NonContiguous, | 178 "noncontiguous": req.NonContiguous, |
211 }) | 179 }) |
212 | 180 |
213 byteLimit := int(req.ByteCount) | 181 byteLimit := int(req.ByteCount) |
214 if byteLimit <= 0 || byteLimit > getBytesLimit { | 182 if byteLimit <= 0 || byteLimit > getBytesLimit { |
215 byteLimit = getBytesLimit | 183 byteLimit = getBytesLimit |
216 } | 184 } |
217 | 185 |
218 // Allocate result logs array. | 186 // Allocate result logs array. |
219 » logCount := int(req.LogCount) | 187 » logCount := int64(req.LogCount) |
220 asz := getInitialArraySize | 188 asz := getInitialArraySize |
221 if logCount > 0 && logCount < asz { | 189 if logCount > 0 && logCount < asz { |
222 asz = logCount | 190 asz = logCount |
223 } | 191 } |
224 logs := make([][]byte, 0, asz) | 192 logs := make([][]byte, 0, asz) |
225 | 193 |
226 sreq := storage.GetRequest{ | 194 sreq := storage.GetRequest{ |
227 Path: p, | 195 Path: p, |
228 Index: types.MessageIndex(req.Index), | 196 Index: types.MessageIndex(req.Index), |
229 Limit: logCount, | 197 Limit: logCount, |
230 } | 198 } |
231 | 199 |
232 count := 0 | |
233 err := retry.Retry(c, retry.TransientOnly(retry.Default), func() error { | 200 err := retry.Retry(c, retry.TransientOnly(retry.Default), func() error { |
234 // Issue the Get request. This may return a transient error, in
which case | 201 // Issue the Get request. This may return a transient error, in
which case |
235 // we will retry. | 202 // we will retry. |
236 return st.Get(&sreq, func(idx types.MessageIndex, ld []byte) boo
l { | 203 return st.Get(&sreq, func(idx types.MessageIndex, ld []byte) boo
l { |
237 » » » if count > 0 && byteLimit-len(ld) < 0 { | 204 » » » if len(logs) > 0 && byteLimit-len(ld) < 0 { |
238 // Not the first log, and we've exceeded our byt
e limit. | 205 // Not the first log, and we've exceeded our byt
e limit. |
239 return false | 206 return false |
240 } | 207 } |
241 byteLimit -= len(ld) | 208 byteLimit -= len(ld) |
242 | 209 |
243 if !(req.NonContiguous || idx == sreq.Index) { | 210 if !(req.NonContiguous || idx == sreq.Index) { |
244 return false | 211 return false |
245 } | 212 } |
246 logs = append(logs, ld) | 213 logs = append(logs, ld) |
247 sreq.Index = idx + 1 | 214 sreq.Index = idx + 1 |
248 » » » count++ | 215 » » » return !(logCount > 0 && int64(len(logs)) >= logCount) |
249 » » » return !(logCount > 0 && count >= logCount) | |
250 }) | 216 }) |
251 }, func(err error, delay time.Duration) { | 217 }, func(err error, delay time.Duration) { |
252 log.Fields{ | 218 log.Fields{ |
253 log.ErrorKey: err, | 219 log.ErrorKey: err, |
254 "delay": delay, | 220 "delay": delay, |
255 "initialIndex": req.Index, | 221 "initialIndex": req.Index, |
256 "nextIndex": sreq.Index, | 222 "nextIndex": sreq.Index, |
257 "count": len(logs), | 223 "count": len(logs), |
258 }.Warningf(c, "Transient error while loading logs; retrying.") | 224 }.Warningf(c, "Transient error while loading logs; retrying.") |
259 }) | 225 }) |
(...skipping 20 matching lines...) Expand all Loading... |
280 log.ErrorKey: err, | 246 log.ErrorKey: err, |
281 "delay": delay, | 247 "delay": delay, |
282 }.Warningf(c, "Transient error while fetching tail log; retrying
.") | 248 }.Warningf(c, "Transient error while fetching tail log; retrying
.") |
283 }) | 249 }) |
284 if err != nil { | 250 if err != nil { |
285 log.WithError(err).Errorf(c, "Failed to fetch tail log.") | 251 log.WithError(err).Errorf(c, "Failed to fetch tail log.") |
286 return nil, err | 252 return nil, err |
287 } | 253 } |
288 return [][]byte{data}, err | 254 return [][]byte{data}, err |
289 } | 255 } |
OLD | NEW |