Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(79)

Side by Side Diff: server/logdog/storage/archive/storage.go

Issue 1904503003: LogDog: Fix archived log stream read errors. (Closed) Base URL: https://github.com/luci/luci-go@hierarchy-check-first
Patch Set: Delete "offset()" method. Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « common/recordio/size_test.go ('k') | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 // Package archive implements a storage.Storage instance that retrieves logs 5 // Package archive implements a storage.Storage instance that retrieves logs
6 // from a Google Storage archive. 6 // from a Google Storage archive.
7 // 7 //
8 // This is a special implementation of storage.Storage, and does not fully 8 // This is a special implementation of storage.Storage, and does not fully
9 // conform to the API expectations. Namely: 9 // conform to the API expectations. Namely:
10 // - It is read-only. Mutation methods will return storage.ErrReadOnly. 10 // - It is read-only. Mutation methods will return storage.ErrReadOnly.
(...skipping 36 matching lines...) Expand 10 before | Expand all | Expand 10 after
47 type Options struct { 47 type Options struct {
48 // IndexURL is the Google Storage URL for the stream's index. 48 // IndexURL is the Google Storage URL for the stream's index.
49 IndexURL string 49 IndexURL string
50 // StreamURL is the Google Storage URL for the stream's entries. 50 // StreamURL is the Google Storage URL for the stream's entries.
51 StreamURL string 51 StreamURL string
52 52
53 // Client is the HTTP client to use for authentication. 53 // Client is the HTTP client to use for authentication.
54 // 54 //
55 // Closing this Storage instance does not close the underlying Client. 55 // Closing this Storage instance does not close the underlying Client.
56 Client gs.Client 56 Client gs.Client
57
58 // MaxBytes, if >0, is the maximum number of bytes to fetch in any given
59 // request. This should be set for GAE fetches, as large log streams may
60 // exceed the urlfetch system's maximum response size otherwise.
61 //
62 // This is the number of bytes to request, not the number of bytes of log data
63 // to return. The difference is that the former includes the RecordIO frame
64 // headers.
65 MaxBytes int
57 } 66 }
58 67
59 type storageImpl struct { 68 type storageImpl struct {
60 *Options 69 *Options
61 context.Context 70 context.Context
62 71
63 streamPath gs.Path 72 streamPath gs.Path
64 indexPath gs.Path 73 indexPath gs.Path
65 74
66 indexMu sync.Mutex 75 indexMu sync.Mutex
(...skipping 36 matching lines...) Expand 10 before | Expand all | Expand 10 after
103 return err 112 return err
104 } 113 }
105 114
106 // Identify the byte offsets that we want to fetch from the entries stream. 115 // Identify the byte offsets that we want to fetch from the entries stream.
107 st := s.buildGetStrategy(&req, idx) 116 st := s.buildGetStrategy(&req, idx)
108 if st.lastIndex == -1 || req.Index > st.lastIndex { 117 if st.lastIndex == -1 || req.Index > st.lastIndex {
109 // No records to read. 118 // No records to read.
110 return nil 119 return nil
111 } 120 }
112 121
113 » r, err := s.Client.NewReader(s.streamPath, gs.Options{ 122 » offset := int64(st.startOffset)
114 » » From: st.from, 123 » log.Fields{
115 » » To: st.to, 124 » » "offset": offset,
116 » }) 125 » » "length": st.length(),
126 » » "path": s.streamPath,
127 » }.Debugf(s, "Creating stream reader for range.")
128 » r, err := s.Client.NewReader(s.streamPath, offset, st.length())
117 if err != nil { 129 if err != nil {
118 log.WithError(err).Errorf(s, "Failed to create stream Reader.") 130 log.WithError(err).Errorf(s, "Failed to create stream Reader.")
119 return err 131 return err
120 } 132 }
121 defer func() { 133 defer func() {
122 if err := r.Close(); err != nil { 134 if err := r.Close(); err != nil {
123 log.WithError(err).Warningf(s, "Error closing stream Reader.") 135 log.WithError(err).Warningf(s, "Error closing stream Reader.")
124 } 136 }
125 }() 137 }()
126 cr := iotools.CountingReader{Reader: r} 138 cr := iotools.CountingReader{Reader: r}
127 rio := recordio.NewReader(&cr, maxStreamRecordSize) 139 rio := recordio.NewReader(&cr, maxStreamRecordSize)
128 140
129 buf := bytes.Buffer{} 141 buf := bytes.Buffer{}
130 le := logpb.LogEntry{} 142 le := logpb.LogEntry{}
131 » max := req.Limit 143 » max := st.count
132 for { 144 for {
133 » » offset := st.from + cr.Count() 145 » » offset += cr.Count()
134 146
135 sz, r, err := rio.ReadFrame() 147 sz, r, err := rio.ReadFrame()
136 switch err { 148 switch err {
137 case nil: 149 case nil:
138 break 150 break
139 151
140 case io.EOF: 152 case io.EOF:
141 return nil 153 return nil
142 154
143 default: 155 default:
(...skipping 54 matching lines...) Expand 10 before | Expand all | Expand 10 after
198 return nil, 0, err 210 return nil, 0, err
199 } 211 }
200 212
201 // Get the offset of the last record. 213 // Get the offset of the last record.
202 if len(idx.Entries) == 0 { 214 if len(idx.Entries) == 0 {
203 return nil, 0, nil 215 return nil, 0, nil
204 } 216 }
205 lle := idx.Entries[len(idx.Entries)-1] 217 lle := idx.Entries[len(idx.Entries)-1]
206 218
207 // Get a Reader for the Tail entry. 219 // Get a Reader for the Tail entry.
208 » r, err := s.Client.NewReader(s.streamPath, gs.Options{ 220 » r, err := s.Client.NewReader(s.streamPath, int64(lle.Offset), -1)
209 » » From: int64(lle.Offset),
210 » })
211 if err != nil { 221 if err != nil {
212 log.Fields{ 222 log.Fields{
213 log.ErrorKey: err, 223 log.ErrorKey: err,
214 "offset": lle.Offset, 224 "offset": lle.Offset,
215 }.Errorf(s, "Failed to create reader.") 225 }.Errorf(s, "Failed to create reader.")
216 return nil, 0, err 226 return nil, 0, err
217 } 227 }
218 defer func() { 228 defer func() {
219 if err := r.Close(); err != nil { 229 if err := r.Close(); err != nil {
220 log.WithError(err).Warningf(s, "Failed to close Reader.") 230 log.WithError(err).Warningf(s, "Failed to close Reader.")
221 } 231 }
222 }() 232 }()
223 233
224 rio := recordio.NewReader(r, maxStreamRecordSize) 234 rio := recordio.NewReader(r, maxStreamRecordSize)
225 d, err := rio.ReadFrameAll() 235 d, err := rio.ReadFrameAll()
226 if err != nil { 236 if err != nil {
227 log.WithError(err).Errorf(s, "Failed to read log frame.") 237 log.WithError(err).Errorf(s, "Failed to read log frame.")
228 return nil, 0, err 238 return nil, 0, err
229 } 239 }
230 240
231 return d, types.MessageIndex(lle.StreamIndex), nil 241 return d, types.MessageIndex(lle.StreamIndex), nil
232 } 242 }
233 243
234 // getIndex returns the cached log stream index, fetching it if necessary. 244 // getIndex returns the cached log stream index, fetching it if necessary.
235 func (s *storageImpl) getIndex() (*logpb.LogIndex, error) { 245 func (s *storageImpl) getIndex() (*logpb.LogIndex, error) {
236 s.indexMu.Lock() 246 s.indexMu.Lock()
237 defer s.indexMu.Unlock() 247 defer s.indexMu.Unlock()
238 248
239 if s.index == nil { 249 if s.index == nil {
240 » » r, err := s.Client.NewReader(s.indexPath, gs.Options{}) 250 » » r, err := s.Client.NewReader(s.indexPath, 0, -1)
241 if err != nil { 251 if err != nil {
242 log.WithError(err).Errorf(s, "Failed to create index Reader.") 252 log.WithError(err).Errorf(s, "Failed to create index Reader.")
243 return nil, err 253 return nil, err
244 } 254 }
245 defer func() { 255 defer func() {
246 if err := r.Close(); err != nil { 256 if err := r.Close(); err != nil {
247 log.WithError(err).Warningf(s, "Error closing index Reader.") 257 log.WithError(err).Warningf(s, "Error closing index Reader.")
248 } 258 }
249 }() 259 }()
250 indexData, err := ioutil.ReadAll(r) 260 indexData, err := ioutil.ReadAll(r)
251 if err != nil { 261 if err != nil {
252 log.WithError(err).Errorf(s, "Failed to read index.") 262 log.WithError(err).Errorf(s, "Failed to read index.")
253 return nil, err 263 return nil, err
254 } 264 }
255 265
256 index := logpb.LogIndex{} 266 index := logpb.LogIndex{}
257 if err := proto.Unmarshal(indexData, &index); err != nil { 267 if err := proto.Unmarshal(indexData, &index); err != nil {
258 log.WithError(err).Errorf(s, "Failed to unmarshal index.") 268 log.WithError(err).Errorf(s, "Failed to unmarshal index.")
259 return nil, err 269 return nil, err
260 } 270 }
261 271
262 s.index = &index 272 s.index = &index
263 } 273 }
264 return s.index, nil 274 return s.index, nil
265 } 275 }
266 276
267 type getStrategy struct { 277 type getStrategy struct {
268 » // from is the beginning byte offset of the log entry stream. 278 » // startOffset is the beginning byte offset of the log entry stream.
269 » from int64 279 » startOffset uint64
270 » // to is the ending byte offset of the log entry stream. 280 » // endOffset is the ending byte offset of the log entry stream.
271 » to int64 281 » endOffset uint64
272 282
283 // count is the number of log entries that will be fetched.
284 count int
273 // lastIndex is the last log entry index in the stream. This will be -1 if 285 // lastIndex is the last log entry index in the stream. This will be -1 if
274 // there are no entries in the stream. 286 // there are no entries in the stream.
275 lastIndex types.MessageIndex 287 lastIndex types.MessageIndex
276 } 288 }
277 289
290 func (gs *getStrategy) length() int64 {
291 if gs.startOffset < gs.endOffset {
292 return int64(gs.endOffset - gs.startOffset)
293 }
294 return -1
295 }
296
297 // setCount sets the `count` field. If called multiple times, the smallest
298 // assigned value will be retained.
299 func (gs *getStrategy) setCount(v int) {
300 if gs.count <= 0 || gs.count > v {
301 gs.count = v
302 }
303 }
304
305 // setEndOffset sets the `endOffset` field. If called multiple times, the smallest
306 // assigned value will be retained.
307 func (gs *getStrategy) setEndOffset(v uint64) {
308 if gs.endOffset == 0 || gs.endOffset > v {
309 gs.endOffset = v
310 }
311 }
312
278 func (s *storageImpl) buildGetStrategy(req *storage.GetRequest, idx *logpb.LogIndex) *getStrategy { 313 func (s *storageImpl) buildGetStrategy(req *storage.GetRequest, idx *logpb.LogIndex) *getStrategy {
279 st := getStrategy{} 314 st := getStrategy{}
280 315
281 if len(idx.Entries) == 0 { 316 if len(idx.Entries) == 0 {
282 st.lastIndex = -1 317 st.lastIndex = -1
283 return &st 318 return &st
284 } 319 }
285 320
286 st.lastIndex = types.MessageIndex(idx.Entries[len(idx.Entries)-1].StreamIndex) 321 st.lastIndex = types.MessageIndex(idx.Entries[len(idx.Entries)-1].StreamIndex)
287 startIdx := indexEntryFor(idx.Entries, req.Index) 322 startIdx := indexEntryFor(idx.Entries, req.Index)
288 if startIdx < 0 { 323 if startIdx < 0 {
289 startIdx = 0 324 startIdx = 0
290 } 325 }
291 le := idx.Entries[startIdx] 326 le := idx.Entries[startIdx]
292 » st.from = int64(le.Offset) 327 » st.startOffset = le.Offset
293 328
294 » // If we have a limit, and we have enough index entries to upper-bound our 329 » // Determine an upper bound based on our limits.
295 » // stream based on that limit, use that.
296 // 330 //
297 » // Note that this may overshoot if the index and/or stream is sparse. We know 331 » // If we have a count limit, and we have enough index entries to upper-bound
298 » // for sure that we have one LogEntry per index entry, so that's the best we 332 » // our stream based on that limit, use that. Note that this may overshoot if
299 » // can do. 333 » // the index and/or stream is sparse. We know for sure that we have one
334 » // LogEntry per index entry, so that's the best we can do.
300 if req.Limit > 0 { 335 if req.Limit > 0 {
301 if ub := startIdx + req.Limit; ub < len(idx.Entries) { 336 if ub := startIdx + req.Limit; ub < len(idx.Entries) {
302 » » » st.to = int64(idx.Entries[ub].Offset) 337 » » » st.setEndOffset(idx.Entries[ub].Offset)
338 » » }
339 » » st.setCount(req.Limit)
340 » }
341
342 » // If we have a byte limit, count the entry sizes until we reach that limit.
343 » if mb := int64(s.MaxBytes); mb > 0 {
344 » » mb := uint64(mb)
345
346 » » for i, e := range idx.Entries[startIdx:] {
347 » » » if e.Offset < st.startOffset {
348 » » » » // This shouldn't really happen, but it could happen if there is a
349 » » » » // corrupt index.
350 » » » » continue
351 » » » }
352
353 » » » // Calculate the request offset and truncate if we've exceeded our maximum
354 » » » // request bytes.
355 » » » if size := (e.Offset - st.startOffset); size > mb {
356 » » » » st.setEndOffset(e.Offset)
357 » » » » st.setCount(i)
358 » » » » break
359 » » » }
303 } 360 }
304 } 361 }
305 return &st 362 return &st
306 } 363 }
307 364
308 // indexEntryFor identifies the log index entry closest (<=) to the specified 365 // indexEntryFor identifies the log index entry closest (<=) to the specified
309 // index. 366 // index.
310 // 367 //
311 // If the first index entry is greater than our search index, -1 will be 368 // If the first index entry is greater than our search index, -1 will be
312 // returned. This should never happen in practice, though, since our index 369 // returned. This should never happen in practice, though, since our index
313 // construction always indexes log entry #0. 370 // construction always indexes log entry #0.
314 // 371 //
315 // It does this by performing a binary search over the index entries. 372 // It does this by performing a binary search over the index entries.
316 func indexEntryFor(entries []*logpb.LogIndex_Entry, i types.MessageIndex) int { 373 func indexEntryFor(entries []*logpb.LogIndex_Entry, i types.MessageIndex) int {
317 ui := uint64(i) 374 ui := uint64(i)
318 s := sort.Search(len(entries), func(i int) bool { 375 s := sort.Search(len(entries), func(i int) bool {
319 return entries[i].StreamIndex > ui 376 return entries[i].StreamIndex > ui
320 }) 377 })
321 378
322 // The returned index is the one immediately after the index that we want. If 379 // The returned index is the one immediately after the index that we want. If
323 // our search returned 0, the first index entry is > our search entry, and we 380 // our search returned 0, the first index entry is > our search entry, and we
324 // will return -1. 381 // will return -1.
325 return s - 1 382 return s - 1
326 } 383 }
OLDNEW
« no previous file with comments | « common/recordio/size_test.go ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698