Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(994)

Side by Side Diff: logdog/common/storage/archive/storage.go

Issue 2422393002: Fix sparse index handling, add index params. (Closed)
Patch Set: Created 4 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The LUCI Authors. All rights reserved. 1 // Copyright 2015 The LUCI Authors. All rights reserved.
2 // Use of this source code is governed under the Apache License, Version 2.0 2 // Use of this source code is governed under the Apache License, Version 2.0
3 // that can be found in the LICENSE file. 3 // that can be found in the LICENSE file.
4 4
5 // Package archive implements a storage.Storage instance that retrieves logs 5 // Package archive implements a storage.Storage instance that retrieves logs
6 // from a Google Storage archive. 6 // from a Google Storage archive.
7 // 7 //
8 // This is a special implementation of storage.Storage, and does not fully 8 // This is a special implementation of storage.Storage, and does not fully
9 // conform to the API expecations. Namely: 9 // conform to the API expecations. Namely:
10 // - It is read-only. Mutation methods will return storage.ErrReadOnly. 10 // - It is read-only. Mutation methods will return storage.ErrReadOnly.
(...skipping 97 matching lines...) Expand 10 before | Expand all | Expand 10 after
108 func (s *storageImpl) Put(storage.PutRequest) error { return storage.ErrReadOnly } 108 func (s *storageImpl) Put(storage.PutRequest) error { return storage.ErrReadOnly }
109 109
110 func (s *storageImpl) Get(req storage.GetRequest, cb storage.GetCallback) error { 110 func (s *storageImpl) Get(req storage.GetRequest, cb storage.GetCallback) error {
111 idx, err := s.getIndex() 111 idx, err := s.getIndex()
112 if err != nil { 112 if err != nil {
113 return err 113 return err
114 } 114 }
115 115
116 // Identify the byte offsets that we want to fetch from the entries stre am. 116 // Identify the byte offsets that we want to fetch from the entries stre am.
117 st := s.buildGetStrategy(&req, idx) 117 st := s.buildGetStrategy(&req, idx)
118 » if st.lastIndex == -1 || req.Index > st.lastIndex { 118 » if st.lastIndex >= 0 && req.Index > st.lastIndex {
119 » » // No records to read. 119 » » // We know the last index, and the user requested logs past it, so there are
120 » » // no records to read.
120 return nil 121 return nil
121 } 122 }
122 123
123 offset := int64(st.startOffset) 124 offset := int64(st.startOffset)
124 log.Fields{ 125 log.Fields{
125 "offset": offset, 126 "offset": offset,
126 "length": st.length(), 127 "length": st.length(),
127 "path": s.streamPath, 128 "path": s.streamPath,
128 }.Debugf(s, "Creating stream reader for range.") 129 }.Debugf(s, "Creating stream reader for range.")
129 r, err := s.Client.NewReader(s.streamPath, offset, st.length()) 130 r, err := s.Client.NewReader(s.streamPath, offset, st.length())
(...skipping 175 matching lines...) Expand 10 before | Expand all | Expand 10 after
305 306
306 // setEndOffset sets the `length` field. If called multiple times, the smallest 307 // setEndOffset sets the `length` field. If called multiple times, the smallest
307 // assigned value will be retained. 308 // assigned value will be retained.
308 func (gs *getStrategy) setEndOffset(v uint64) { 309 func (gs *getStrategy) setEndOffset(v uint64) {
309 if gs.endOffset == 0 || gs.endOffset > v { 310 if gs.endOffset == 0 || gs.endOffset > v {
310 gs.endOffset = v 311 gs.endOffset = v
311 } 312 }
312 } 313 }
313 314
314 func (s *storageImpl) buildGetStrategy(req *storage.GetRequest, idx *logpb.LogIn dex) *getStrategy { 315 func (s *storageImpl) buildGetStrategy(req *storage.GetRequest, idx *logpb.LogIn dex) *getStrategy {
315 » st := getStrategy{} 316 » st := getStrategy{
317 » » lastIndex: -1,
318 » }
316 319
317 if len(idx.Entries) == 0 { 320 if len(idx.Entries) == 0 {
318 st.lastIndex = -1
319 return &st 321 return &st
320 } 322 }
321 323
322 » st.lastIndex = types.MessageIndex(idx.Entries[len(idx.Entries)-1].Stream Index) 324 » // If we have a log entry count, mark the last log index.
dnj 2016/10/17 22:12:12 The actual bug: if there is a sparse index whose l
325 » if idx.LogEntryCount > 0 {
326 » » st.lastIndex = types.MessageIndex(idx.Entries[len(idx.Entries)-1 ].StreamIndex)
327 » }
328
323 startIdx := indexEntryFor(idx.Entries, req.Index) 329 startIdx := indexEntryFor(idx.Entries, req.Index)
324 if startIdx < 0 { 330 if startIdx < 0 {
325 startIdx = 0 331 startIdx = 0
326 } 332 }
327 le := idx.Entries[startIdx] 333 le := idx.Entries[startIdx]
328 st.startOffset = le.Offset 334 st.startOffset = le.Offset
329 335
330 // Determine an upper bound based on our limits. 336 // Determine an upper bound based on our limits.
331 // 337 //
332 // If we have a count limit, and we have enough index entries to upper-b ound 338 // If we have a count limit, and we have enough index entries to upper-b ound
(...skipping 42 matching lines...) Expand 10 before | Expand all | Expand 10 after
375 ui := uint64(i) 381 ui := uint64(i)
376 s := sort.Search(len(entries), func(i int) bool { 382 s := sort.Search(len(entries), func(i int) bool {
377 return entries[i].StreamIndex > ui 383 return entries[i].StreamIndex > ui
378 }) 384 })
379 385
380 // The returned index is the one immediately after the index that we wan t. If 386 // The returned index is the one immediately after the index that we wan t. If
381 // our search returned 0, the first index entry is > our search entry, a nd we 387 // our search returned 0, the first index entry is > our search entry, a nd we
382 // will return nil. 388 // will return nil.
383 return s - 1 389 return s - 1
384 } 390 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698