Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright 2015 The LUCI Authors. All rights reserved. | 1 // Copyright 2015 The LUCI Authors. All rights reserved. |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
| 3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
| 4 | 4 |
| 5 // Package archive implements a storage.Storage instance that retrieves logs | 5 // Package archive implements a storage.Storage instance that retrieves logs |
| 6 // from a Google Storage archive. | 6 // from a Google Storage archive. |
| 7 // | 7 // |
| 8 // This is a special implementation of storage.Storage, and does not fully | 8 // This is a special implementation of storage.Storage, and does not fully |
| 9 // conform to the API expecations. Namely: | 9 // conform to the API expecations. Namely: |
| 10 // - It is read-only. Mutation methods will return storage.ErrReadOnly. | 10 // - It is read-only. Mutation methods will return storage.ErrReadOnly. |
| (...skipping 97 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 108 func (s *storageImpl) Put(storage.PutRequest) error { return storage.ErrReadOnly } | 108 func (s *storageImpl) Put(storage.PutRequest) error { return storage.ErrReadOnly } |
| 109 | 109 |
| 110 func (s *storageImpl) Get(req storage.GetRequest, cb storage.GetCallback) error { | 110 func (s *storageImpl) Get(req storage.GetRequest, cb storage.GetCallback) error { |
| 111 idx, err := s.getIndex() | 111 idx, err := s.getIndex() |
| 112 if err != nil { | 112 if err != nil { |
| 113 return err | 113 return err |
| 114 } | 114 } |
| 115 | 115 |
| 116 // Identify the byte offsets that we want to fetch from the entries stre am. | 116 // Identify the byte offsets that we want to fetch from the entries stre am. |
| 117 st := s.buildGetStrategy(&req, idx) | 117 st := s.buildGetStrategy(&req, idx) |
| 118 » if st.lastIndex == -1 || req.Index > st.lastIndex { | 118 » if st.lastIndex >= 0 && req.Index > st.lastIndex { |
| 119 » » // No records to read. | 119 » » // We know the last index, and the user requested logs past it, so there are |
| 120 » » // no records to read. | |
| 120 return nil | 121 return nil |
| 121 } | 122 } |
| 122 | 123 |
| 123 offset := int64(st.startOffset) | 124 offset := int64(st.startOffset) |
| 124 log.Fields{ | 125 log.Fields{ |
| 125 "offset": offset, | 126 "offset": offset, |
| 126 "length": st.length(), | 127 "length": st.length(), |
| 127 "path": s.streamPath, | 128 "path": s.streamPath, |
| 128 }.Debugf(s, "Creating stream reader for range.") | 129 }.Debugf(s, "Creating stream reader for range.") |
| 129 r, err := s.Client.NewReader(s.streamPath, offset, st.length()) | 130 r, err := s.Client.NewReader(s.streamPath, offset, st.length()) |
| (...skipping 175 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 305 | 306 |
| 306 // setEndOffset sets the `length` field. If called multiple times, the smallest | 307 // setEndOffset sets the `length` field. If called multiple times, the smallest |
| 307 // assigned value will be retained. | 308 // assigned value will be retained. |
| 308 func (gs *getStrategy) setEndOffset(v uint64) { | 309 func (gs *getStrategy) setEndOffset(v uint64) { |
| 309 if gs.endOffset == 0 || gs.endOffset > v { | 310 if gs.endOffset == 0 || gs.endOffset > v { |
| 310 gs.endOffset = v | 311 gs.endOffset = v |
| 311 } | 312 } |
| 312 } | 313 } |
| 313 | 314 |
| 314 func (s *storageImpl) buildGetStrategy(req *storage.GetRequest, idx *logpb.LogIn dex) *getStrategy { | 315 func (s *storageImpl) buildGetStrategy(req *storage.GetRequest, idx *logpb.LogIn dex) *getStrategy { |
| 315 » st := getStrategy{} | 316 » st := getStrategy{ |
| 317 » » lastIndex: -1, | |
| 318 » } | |
| 316 | 319 |
| 317 if len(idx.Entries) == 0 { | 320 if len(idx.Entries) == 0 { |
| 318 st.lastIndex = -1 | |
| 319 return &st | 321 return &st |
| 320 } | 322 } |
| 321 | 323 |
| 322 » st.lastIndex = types.MessageIndex(idx.Entries[len(idx.Entries)-1].Stream Index) | 324 » // If we have a log entry count, mark the last log index. |
|
dnj
2016/10/17 22:12:12
The actual bug: if there is a sparse index whose l
| |
| 325 » if idx.LogEntryCount > 0 { | |
| 326 » » st.lastIndex = types.MessageIndex(idx.Entries[len(idx.Entries)-1 ].StreamIndex) | |
| 327 » } | |
| 328 | |
| 323 startIdx := indexEntryFor(idx.Entries, req.Index) | 329 startIdx := indexEntryFor(idx.Entries, req.Index) |
| 324 if startIdx < 0 { | 330 if startIdx < 0 { |
| 325 startIdx = 0 | 331 startIdx = 0 |
| 326 } | 332 } |
| 327 le := idx.Entries[startIdx] | 333 le := idx.Entries[startIdx] |
| 328 st.startOffset = le.Offset | 334 st.startOffset = le.Offset |
| 329 | 335 |
| 330 // Determine an upper bound based on our limits. | 336 // Determine an upper bound based on our limits. |
| 331 // | 337 // |
| 332 // If we have a count limit, and we have enough index entries to upper-b ound | 338 // If we have a count limit, and we have enough index entries to upper-b ound |
| (...skipping 42 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 375 ui := uint64(i) | 381 ui := uint64(i) |
| 376 s := sort.Search(len(entries), func(i int) bool { | 382 s := sort.Search(len(entries), func(i int) bool { |
| 377 return entries[i].StreamIndex > ui | 383 return entries[i].StreamIndex > ui |
| 378 }) | 384 }) |
| 379 | 385 |
| 380 // The returned index is the one immediately after the index that we wan t. If | 386 // The returned index is the one immediately after the index that we wan t. If |
| 381 // our search returned 0, the first index entry is > our search entry, a nd we | 387 // our search returned 0, the first index entry is > our search entry, a nd we |
| 382 // will return nil. | 388 // will return nil. |
| 383 return s - 1 | 389 return s - 1 |
| 384 } | 390 } |
| OLD | NEW |