Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(615)

Side by Side Diff: logdog/common/storage/archive/storage.go

Issue 2435883002: LogDog: Fix archival Get/Tail implementations. (Closed)
Patch Set: LogDog: Fix archival Get/Tail implementations. Created 4 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The LUCI Authors. All rights reserved. 1 // Copyright 2015 The LUCI Authors. All rights reserved.
dnj 2016/10/19 23:18:59 This file changed bigly.
2 // Use of this source code is governed under the Apache License, Version 2.0 2 // Use of this source code is governed under the Apache License, Version 2.0
3 // that can be found in the LICENSE file. 3 // that can be found in the LICENSE file.
4 4
5 // Package archive implements a storage.Storage instance that retrieves logs 5 // Package archive implements a storage.Storage instance that retrieves logs
6 // from a Google Storage archive. 6 // from a Google Storage archive.
7 // 7 //
8 // This is a special implementation of storage.Storage, and does not fully 8 // This is a special implementation of storage.Storage, and does not fully
9 // conform to the API expectations. Namely: 9 // conform to the API expectations. Namely:
10 // - It is read-only. Mutation methods will return storage.ErrReadOnly. 10 // - It is read-only. Mutation methods will return storage.ErrReadOnly.
11 // - Storage methods ignore the supplied Path argument, instead opting for 11 // - Storage methods ignore the supplied Path argument, instead opting for
12 // the archive configured in its Options. 12 // the archive configured in its Options.
13 package archive 13 package archive
14 14
15 import ( 15 import (
16 "bytes" 16 "bytes"
17 "fmt" 17 "fmt"
18 "io" 18 "io"
19 "io/ioutil" 19 "io/ioutil"
20 "sort" 20 "sort"
21 "sync" 21 "sync"
22 22
23 "golang.org/x/net/context"
24
25 "github.com/golang/protobuf/proto"
26 "github.com/luci/luci-go/common/config" 23 "github.com/luci/luci-go/common/config"
27 "github.com/luci/luci-go/common/data/recordio" 24 "github.com/luci/luci-go/common/data/recordio"
25 "github.com/luci/luci-go/common/errors"
28 "github.com/luci/luci-go/common/gcloud/gs" 26 "github.com/luci/luci-go/common/gcloud/gs"
29 "github.com/luci/luci-go/common/iotools" 27 "github.com/luci/luci-go/common/iotools"
30 log "github.com/luci/luci-go/common/logging" 28 log "github.com/luci/luci-go/common/logging"
31 "github.com/luci/luci-go/logdog/api/logpb" 29 "github.com/luci/luci-go/logdog/api/logpb"
32 "github.com/luci/luci-go/logdog/common/storage" 30 "github.com/luci/luci-go/logdog/common/storage"
33 "github.com/luci/luci-go/logdog/common/types" 31 "github.com/luci/luci-go/logdog/common/types"
32
33 cloudStorage "cloud.google.com/go/storage"
34 "github.com/golang/protobuf/proto"
35 "golang.org/x/net/context"
34 ) 36 )
35 37
36 const ( 38 const (
37 // maxStreamRecordSize is the maximum record size we're willing to read from 39 // maxStreamRecordSize is the maximum record size we're willing to read from
38 // our archived log stream. This will help prevent out-of-memory errors if the 40 // our archived log stream. This will help prevent out-of-memory errors if the
39 // arhived log stream is malicious or corrupt. 41 // arhived log stream is malicious or corrupt.
40 » maxStreamRecordSize = 16 * 1024 * 1024 42 » //
43 » // 16MB is larger than the maximum log entry size
44 » maxStreamRecordSize = 2 * types.MaxLogEntryDataSize
41 ) 45 )
42 46
43 // Options is the set of configuration options for this Storage instance. 47 // Options is the set of configuration options for this Storage instance.
44 // 48 //
45 // Unlike other Storage instances, this is bound to a single archived stream. 49 // Unlike other Storage instances, this is bound to a single archived stream.
46 // Project and Path parameters in requests will be ignored in favor of the 50 // Project and Path parameters in requests will be ignored in favor of the
47 // Google Storage URLs. 51 // Google Storage URLs.
48 type Options struct { 52 type Options struct {
49 // IndexURL is the Google Storage URL for the stream's index. 53 // IndexURL is the Google Storage URL for the stream's index.
50 IndexURL string 54 IndexURL string
51 // StreamURL is the Google Storage URL for the stream's entries. 55 // StreamURL is the Google Storage URL for the stream's entries.
52 StreamURL string 56 StreamURL string
53 57
54 // Client is the HTTP client to use for authentication. 58 // Client is the HTTP client to use for authentication.
55 // 59 //
56 // Closing this Storage instance does not close the underlying Client. 60 // Closing this Storage instance does not close the underlying Client.
57 Client gs.Client 61 Client gs.Client
58
59 // MaxBytes, if >0, is the maximum number of bytes to fetch in any given
60 // request. This should be set for GAE fetches, as large log streams may
61 // exceed the urlfetch system's maximum response size otherwise.
62 //
63 // This is the number of bytes to request, not the number of bytes of lo g data
64 // to return. The difference is that the former includes the RecordIO fr ame
65 // headers.
66 MaxBytes int
67 } 62 }
68 63
69 type storageImpl struct { 64 type storageImpl struct {
70 *Options 65 *Options
71 context.Context 66 context.Context
72 67
73 streamPath gs.Path 68 streamPath gs.Path
74 indexPath gs.Path 69 indexPath gs.Path
75 70
76 » indexMu sync.Mutex 71 » indexMu sync.Mutex
77 » index *logpb.LogIndex 72 » index *logpb.LogIndex
78 » closeClient bool
79 } 73 }
80 74
81 // New instantiates a new Storage instance, bound to the supplied Options. 75 // New instantiates a new Storage instance, bound to the supplied Options.
82 func New(ctx context.Context, o Options) (storage.Storage, error) { 76 func New(ctx context.Context, o Options) (storage.Storage, error) {
83 s := storageImpl{ 77 s := storageImpl{
84 Options: &o, 78 Options: &o,
85 Context: ctx, 79 Context: ctx,
86 80
87 streamPath: gs.Path(o.StreamURL), 81 streamPath: gs.Path(o.StreamURL),
88 indexPath: gs.Path(o.IndexURL), 82 indexPath: gs.Path(o.IndexURL),
89 } 83 }
90 84
91 if !s.streamPath.IsFullPath() { 85 if !s.streamPath.IsFullPath() {
92 return nil, fmt.Errorf("invalid stream URL: %q", s.streamPath) 86 return nil, fmt.Errorf("invalid stream URL: %q", s.streamPath)
93 } 87 }
94 » if !s.indexPath.IsFullPath() { 88 » if s.indexPath != "" && !s.indexPath.IsFullPath() {
95 return nil, fmt.Errorf("invalid index URL: %v", s.indexPath) 89 return nil, fmt.Errorf("invalid index URL: %v", s.indexPath)
96 } 90 }
97 91
98 return &s, nil 92 return &s, nil
99 } 93 }
100 94
101 func (s *storageImpl) Close() { 95 func (s *storageImpl) Close() {}
102 » if s.closeClient {
103 » » _ = s.Client.Close()
104 » }
105 }
106 96
107 func (s *storageImpl) Config(storage.Config) error { return storage.ErrReadOnly } 97 func (s *storageImpl) Config(storage.Config) error { return storage.ErrReadOnly }
108 func (s *storageImpl) Put(storage.PutRequest) error { return storage.ErrReadOnly } 98 func (s *storageImpl) Put(storage.PutRequest) error { return storage.ErrReadOnly }
109 99
110 func (s *storageImpl) Get(req storage.GetRequest, cb storage.GetCallback) error { 100 func (s *storageImpl) Get(req storage.GetRequest, cb storage.GetCallback) error {
111 idx, err := s.getIndex() 101 idx, err := s.getIndex()
112 if err != nil { 102 if err != nil {
113 return err 103 return err
114 } 104 }
115 105
116 // Identify the byte offsets that we want to fetch from the entries stre am. 106 // Identify the byte offsets that we want to fetch from the entries stre am.
117 » st := s.buildGetStrategy(&req, idx) 107 » st := buildGetStrategy(&req, idx)
118 » if st.lastIndex >= 0 && req.Index > st.lastIndex { 108 » if st == nil {
119 » » // We know the last index, and the user requested logs past it, so there are 109 » » // No more records to read.
120 » » // no records to read.
121 return nil 110 return nil
122 } 111 }
123 112
124 » offset := int64(st.startOffset) 113 » switch err := s.getLogEntriesIter(st, cb); errors.Unwrap(err) {
114 » case nil, io.EOF:
115 » » // We hit the end of our log stream.
116 » » return nil
117
118 » case cloudStorage.ErrObjectNotExist, cloudStorage.ErrBucketNotExist:
119 » » return storage.ErrDoesNotExist
120
121 » default:
122 » » return errors.Annotate(err).Reason("failed to read log stream"). Err()
123 » }
124 }
125
126 // getLogEntriesImpl retrieves log entries from archive until complete.
127 func (s *storageImpl) getLogEntriesIter(st *getStrategy, cb storage.GetCallback) error {
128 » // Get our maximum byte limit. If we are externally constrained via MaxB ytes,
129 » // apply that limit too.
130 » // Get an archive reader.
131 » var (
132 » » offset = st.startOffset
133 » » length = st.length()
134 » )
135
125 log.Fields{ 136 log.Fields{
126 "offset": offset, 137 "offset": offset,
127 » » "length": st.length(), 138 » » "length": length,
128 "path": s.streamPath, 139 "path": s.streamPath,
129 }.Debugf(s, "Creating stream reader for range.") 140 }.Debugf(s, "Creating stream reader for range.")
130 » r, err := s.Client.NewReader(s.streamPath, offset, st.length()) 141 » storageReader, err := s.Client.NewReader(s.streamPath, int64(offset), le ngth)
131 if err != nil { 142 if err != nil {
132 log.WithError(err).Errorf(s, "Failed to create stream Reader.") 143 log.WithError(err).Errorf(s, "Failed to create stream Reader.")
133 » » return err 144 » » return errors.Annotate(err).Reason("failed to create stream Read er").Err()
134 } 145 }
135 defer func() { 146 defer func() {
136 » » if err := r.Close(); err != nil { 147 » » if tmpErr := storageReader.Close(); tmpErr != nil {
137 » » » log.WithError(err).Warningf(s, "Error closing stream Rea der.") 148 » » » // (Non-fatal)
149 » » » log.WithError(tmpErr).Warningf(s, "Error closing stream Reader.")
138 } 150 }
139 }() 151 }()
140 cr := iotools.CountingReader{Reader: r}
141 rio := recordio.NewReader(&cr, maxStreamRecordSize)
142 152
143 » buf := bytes.Buffer{} 153 » // Count how many bytes we've read.
144 » le := logpb.LogEntry{} 154 » cr := iotools.CountingReader{Reader: storageReader}
145 » max := st.count 155
156 » // Iteratively update our strategy's start offset each time we read a co mplete
157 » // frame.
158 » var (
159 » » rio = recordio.NewReader(&cr, maxStreamRecordSize)
160 » » buf bytes.Buffer
161 » » remaining = st.count
162 » )
146 for { 163 for {
147 » » offset += cr.Count() 164 » » // Reset the count so we know how much we read for this frame.
165 » » cr.Count = 0
148 166
149 sz, r, err := rio.ReadFrame() 167 sz, r, err := rio.ReadFrame()
150 » » switch err { 168 » » if err != nil {
151 » » case nil: 169 » » » return errors.Annotate(err).Reason("failed to read frame ").Err()
152 » » » break
153
154 » » case io.EOF:
155 » » » return nil
156
157 » » default:
158 » » » log.Fields{
159 » » » » log.ErrorKey: err,
160 » » » » "index": idx,
161 » » » » "offset": offset,
162 » » » }.Errorf(s, "Failed to read next frame.")
163 » » » return err
164 } 170 }
165 171
166 buf.Reset() 172 buf.Reset()
167 buf.Grow(int(sz)) 173 buf.Grow(int(sz))
168 » » if _, err := buf.ReadFrom(r); err != nil { 174
175 » » switch amt, err := buf.ReadFrom(r); {
176 » » case err != nil:
169 log.Fields{ 177 log.Fields{
170 » » » » log.ErrorKey: err, 178 » » » » log.ErrorKey: err,
171 » » » » "offset": offset, 179 » » » » "frameOffset": offset,
172 » » » » "frameSize": sz, 180 » » » » "frameSize": sz,
173 }.Errorf(s, "Failed to read frame data.") 181 }.Errorf(s, "Failed to read frame data.")
174 » » » return err 182 » » » return errors.Annotate(err).Reason("failed to read frame data").Err()
183
184 » » case amt != sz:
185 » » » // If we didn't buffer the complete frame, we hit a prem ature EOF.
186 » » » return errors.Annotate(io.EOF).Reason("incomplete frame read").Err()
175 } 187 }
176 188
177 » » if err := proto.Unmarshal(buf.Bytes(), &le); err != nil { 189 » » // If we read from offset 0, the first frame will be the log str eam's
178 » » » log.Fields{ 190 » » // descriptor, which we can discard.
179 » » » » log.ErrorKey: err, 191 » » discardFrame := (offset == 0)
180 » » » » "offset": offset, 192 » » offset += uint64(cr.Count)
181 » » » » "frameSize": sz, 193 » » if discardFrame {
182 » » » }.Errorf(s, "Failed to unmarshal log data.") 194 » » » continue
183 » » » return err
184 } 195 }
185 196
186 » » idx := types.MessageIndex(le.StreamIndex) 197 » » // Punt this log entry to our callback, if appropriate.
187 » » if idx < req.Index { 198 » » entry := storage.MakeEntry(buf.Bytes(), -1)
199 » » switch idx, err := entry.GetStreamIndex(); {
200 » » case err != nil:
201 » » » log.Fields{
202 » » » » log.ErrorKey: err,
203 » » » » "frameOffset": offset,
204 » » » » "frameSize": sz,
205 » » » }.Errorf(s, "Failed to get log entry index.")
206 » » » return errors.Annotate(err).Reason("failed to get log en try index").Err()
207
208 » » case idx < st.startIndex:
188 // Skip this entry, as it's before the first requested e ntry. 209 // Skip this entry, as it's before the first requested e ntry.
189 continue 210 continue
190 } 211 }
191 212
192 » » d := make([]byte, buf.Len()) 213 » » // We want to punt this entry, but we also want to re-use our Bu ffer. Clone
193 » » copy(d, buf.Bytes()) 214 » » // its data so it is independent.
194 » » if !cb(idx, d) { 215 » » entry.D = make([]byte, len(entry.D))
195 » » » break 216 » » copy(entry.D, buf.Bytes())
217 » » if !cb(entry) {
218 » » » return nil
196 } 219 }
197 220
198 // Enforce our limit, if one is supplied. 221 // Enforce our limit, if one is supplied.
199 » » if max > 0 { 222 » » if remaining > 0 {
200 » » » max-- 223 » » » remaining--
201 » » » if max == 0 { 224 » » » if remaining == 0 {
202 » » » » break 225 » » » » return nil
203 } 226 }
204 } 227 }
205 } 228 }
206 return nil
207 } 229 }
208 230
209 func (s *storageImpl) Tail(project config.ProjectName, path types.StreamPath) ([ ]byte, types.MessageIndex, error) { 231 func (s *storageImpl) Tail(project config.ProjectName, path types.StreamPath) (* storage.Entry, error) {
210 idx, err := s.getIndex() 232 idx, err := s.getIndex()
211 if err != nil { 233 if err != nil {
212 » » return nil, 0, err 234 » » return nil, err
213 } 235 }
214 236
215 » // Get the offset of the last record. 237 » // Get the offset that is as close to our tail record as possible. If we know
216 » if len(idx.Entries) == 0 { 238 » // what that index is (from "idx"), we can request it directly. Otherwis e, we
217 » » return nil, 0, nil 239 » // will get as close as possible and read forwards from there.
218 » } 240 » req := storage.GetRequest{}
219 » lle := idx.Entries[len(idx.Entries)-1] 241 » switch {
242 » case idx.LastStreamIndex > 0:
243 » » req.Index = types.MessageIndex(idx.LastStreamIndex)
244 » » req.Limit = 1
220 245
221 » // Get a Reader for the Tail entry. 246 » case len(idx.Entries) > 0:
222 » r, err := s.Client.NewReader(s.streamPath, int64(lle.Offset), -1) 247 » » req.Index = types.MessageIndex(idx.Entries[len(idx.Entries)-1].S treamIndex)
223 » if err != nil {
224 » » log.Fields{
225 » » » log.ErrorKey: err,
226 » » » "offset": lle.Offset,
227 » » }.Errorf(s, "Failed to create reader.")
228 » » return nil, 0, err
229 » }
230 » defer func() {
231 » » if err := r.Close(); err != nil {
232 » » » log.WithError(err).Warningf(s, "Failed to close Reader." )
233 » » }
234 » }()
235
236 » rio := recordio.NewReader(r, maxStreamRecordSize)
237 » d, err := rio.ReadFrameAll()
238 » if err != nil {
239 » » log.WithError(err).Errorf(s, "Failed to read log frame.")
240 » » return nil, 0, err
241 } 248 }
242 249
243 » return d, types.MessageIndex(lle.StreamIndex), nil 250 » // Build a Get strategy for our closest-to-Tail index.
251 » st := buildGetStrategy(&req, idx)
252 » if st == nil {
253 » » return nil, storage.ErrDoesNotExist
254 » }
255
256 » // Read forwards to EOF. Retain the last entry that we read.
257 » var lastEntry *storage.Entry
258 » err = s.Get(req, func(e *storage.Entry) bool {
259 » » lastEntry = e
260
261 » » // We can stop if we have the last stream index and this is that index.
262 » » if idx.LastStreamIndex > 0 {
263 » » » // Get the index for this entry.
264 » » » //
265 » » » // We can ignore this error, since "Get" will have alrea dy resolved the
266 » » » // index successfully.
267 » » » if sidx, _ := e.GetStreamIndex(); sidx == types.MessageI ndex(idx.LastStreamIndex) {
268 » » » » return false
269 » » » }
270 » » }
271 » » return true
272 » })
273 » switch {
274 » case err != nil:
275 » » return nil, err
276
277 » case lastEntry == nil:
278 » » return nil, storage.ErrDoesNotExist
279
280 » default:
281 » » return lastEntry, nil
282 » }
244 } 283 }
245 284
246 // getIndex returns the cached log stream index, fetching it if necessary. 285 // getIndex returns the cached log stream index, fetching it if necessary.
247 func (s *storageImpl) getIndex() (*logpb.LogIndex, error) { 286 func (s *storageImpl) getIndex() (*logpb.LogIndex, error) {
248 s.indexMu.Lock() 287 s.indexMu.Lock()
249 defer s.indexMu.Unlock() 288 defer s.indexMu.Unlock()
250 289
251 if s.index == nil { 290 if s.index == nil {
252 » » r, err := s.Client.NewReader(s.indexPath, 0, -1) 291 » » index, err := loadIndex(s, s.Client, s.indexPath)
253 » » if err != nil { 292 » » switch errors.Unwrap(err) {
254 » » » log.WithError(err).Errorf(s, "Failed to create index Rea der.") 293 » » case nil:
255 » » » return nil, err 294 » » » break
256 » » } 295
257 » » defer func() { 296 » » case cloudStorage.ErrBucketNotExist, cloudStorage.ErrObjectNotEx ist:
258 » » » if err := r.Close(); err != nil { 297 » » » // Treat a missing index the same as an empty index.
259 » » » » log.WithError(err).Warningf(s, "Error closing in dex Reader.") 298 » » » log.WithError(err).Warningf(s, "Index is invalid, using empty index.")
260 » » » } 299 » » » index = &logpb.LogIndex{}
261 » » }() 300
262 » » indexData, err := ioutil.ReadAll(r) 301 » » default:
263 » » if err != nil {
264 » » » log.WithError(err).Errorf(s, "Failed to read index.")
265 return nil, err 302 return nil, err
266 } 303 }
267 304
268 » » index := logpb.LogIndex{} 305 » » s.index = index
269 » » if err := proto.Unmarshal(indexData, &index); err != nil {
270 » » » log.WithError(err).Errorf(s, "Failed to unmarshal index. ")
271 » » » return nil, err
272 » » }
273
274 » » s.index = &index
275 } 306 }
276 return s.index, nil 307 return s.index, nil
277 } 308 }
278 309
310 func loadIndex(c context.Context, client gs.Client, path gs.Path) (*logpb.LogInd ex, error) {
311 // If there is no path, then return an empty index.
312 if path == "" {
313 log.Infof(c, "No index path, using empty index.")
314 return &logpb.LogIndex{}, nil
315 }
316
317 r, err := client.NewReader(path, 0, -1)
318 if err != nil {
319 log.WithError(err).Errorf(c, "Failed to create index Reader.")
320 return nil, errors.Annotate(err).Reason("failed to create index Reader").Err()
321 }
322 defer func() {
323 if err := r.Close(); err != nil {
324 log.WithError(err).Warningf(c, "Error closing index Read er.")
325 }
326 }()
327 indexData, err := ioutil.ReadAll(r)
328 if err != nil {
329 log.WithError(err).Errorf(c, "Failed to read index.")
330 return nil, errors.Annotate(err).Reason("failed to read index"). Err()
331 }
332
333 index := logpb.LogIndex{}
334 if err := proto.Unmarshal(indexData, &index); err != nil {
335 log.WithError(err).Errorf(c, "Failed to unmarshal index.")
336 return nil, errors.Annotate(err).Reason("failed to unmarshal ind ex").Err()
337 }
338
339 return &index, nil
340 }
341
279 type getStrategy struct { 342 type getStrategy struct {
280 » // startOffset is the beginning byte offset of the log entry stream. 343 » // startIndex is desired initial log entry index.
344 » startIndex types.MessageIndex
345
346 » // startOffset is the beginning byte offset of the log entry stream. Thi s may
347 » // be lower than the offset of the starting record if the index is spars e.
281 startOffset uint64 348 startOffset uint64
282 » // endOffset is the ending byte offset of the log entry stream. 349 » // endOffset is the ending byte offset of the log entry stream. This wil l be
350 » // 0 if an end offset is not known.
283 endOffset uint64 351 endOffset uint64
284 352
285 » // count is the number of log entries that will be fetched. 353 » // count is the number of log entries that will be fetched. If 0, no upp er
286 » count int 354 » // bound was calculated.
287 » // lastIndex is the last log entry index in the stream. This will be -1 if 355 » count uint64
288 » // there are no entries in the stream.
289 » lastIndex types.MessageIndex
290 } 356 }
291 357
292 func (gs *getStrategy) length() int64 { 358 func (gs *getStrategy) length() int64 {
293 if gs.startOffset < gs.endOffset { 359 if gs.startOffset < gs.endOffset {
294 return int64(gs.endOffset - gs.startOffset) 360 return int64(gs.endOffset - gs.startOffset)
295 } 361 }
296 return -1 362 return -1
297 } 363 }
298 364
299 // setCount sets the `count` field. If called multiple times, the smallest 365 // setCount sets the `count` field. If called multiple times, the smallest
300 // assigned value will be retained. 366 // assigned value will be retained.
301 func (gs *getStrategy) setCount(v int) { 367 func (gs *getStrategy) setCount(v uint64) {
302 » if gs.count <= 0 || gs.count > v { 368 » if gs.count == 0 || gs.count > v {
303 gs.count = v 369 gs.count = v
304 } 370 }
305 } 371 }
306 372
307 // setEndOffset sets the `length` field. If called multiple times, the smallest 373 func buildGetStrategy(req *storage.GetRequest, idx *logpb.LogIndex) *getStrategy {
308 // assigned value will be retained.
309 func (gs *getStrategy) setEndOffset(v uint64) {
310 » if gs.endOffset == 0 || gs.endOffset > v {
311 » » gs.endOffset = v
312 » }
313 }
314
315 func (s *storageImpl) buildGetStrategy(req *storage.GetRequest, idx *logpb.LogIn dex) *getStrategy {
316 st := getStrategy{ 374 st := getStrategy{
317 » » lastIndex: -1, 375 » » startIndex: req.Index,
318 } 376 }
319 377
320 » if len(idx.Entries) == 0 { 378 » // If the user has requested an index past the end of the stream, return no
321 » » return &st 379 » // entries (count == 0). This only works if the last stream index is kno wn.
380 » if idx.LastStreamIndex > 0 && req.Index > types.MessageIndex(idx.LastStr eamIndex) {
381 » » return nil
322 } 382 }
323 383
324 » // If we have a log entry count, mark the last log index. 384 » // Identify the closest index entry to the requested log.
325 » if idx.LogEntryCount > 0 { 385 » //
326 » » st.lastIndex = types.MessageIndex(idx.Entries[len(idx.Entries)-1 ].StreamIndex) 386 » // If the requested log starts before the first index entry, we must rea d from
387 » // record #0.
388 » startIndexEntry := indexEntryFor(idx.Entries, req.Index)
389 » if startIndexEntry >= 0 {
390 » » st.startOffset = idx.Entries[startIndexEntry].Offset
327 } 391 }
328 392
329 startIdx := indexEntryFor(idx.Entries, req.Index)
330 if startIdx < 0 {
331 startIdx = 0
332 }
333 le := idx.Entries[startIdx]
334 st.startOffset = le.Offset
335
336 // Determine an upper bound based on our limits. 393 // Determine an upper bound based on our limits.
337 // 394 //
338 » // If we have a count limit, and we have enough index entries to upper-b ound 395 » // If we have a count limit, identify the maximum entry that can be load ed,
339 » // our stream based on that limit, use that. Note that this may overshoo t if 396 » // find the index entry closest to it, and use that to determine our upp er
340 » // the index and/or stream is sparse. We know for sure that we have one 397 » // bound.
341 » // LogEntry per index entry, so that's the best we can do.
342 if req.Limit > 0 { 398 if req.Limit > 0 {
343 » » if ub := startIdx + req.Limit; ub < len(idx.Entries) { 399 » » st.setCount(uint64(req.Limit))
344 » » » st.setEndOffset(idx.Entries[ub].Offset) 400
401 » » // Find the index entry for the stream entry AFTER the last one we are going
402 » » // to return.
403 » » entryAfterGetBlock := req.Index + types.MessageIndex(req.Limit)
404 » » endIndexEntry := indexEntryFor(idx.Entries, entryAfterGetBlock)
405 » » switch {
406 » » case endIndexEntry < 0:
407 » » » // The last possible request log entry is before the fir st index entry.
408 » » » // Read up to the first index entry.
409 » » » endIndexEntry = 0
410
411 » » case endIndexEntry <= startIndexEntry:
412 » » » // The last possible request log entry is closest to the start index
413 » » » // entry. Use the index entry immediately after it.
414 » » » endIndexEntry = startIndexEntry + 1
415
416 » » default:
417 » » » // We have the index entry <= the stream entry after the last one that we
418 » » » // will return.
419 » » » //
420 » » » // If we're sparse, this could be the index at or before our last entry.
421 » » » // If this is the case, use the next index entry, which will be after
422 » » » // "entryAfterGetBlock" (EAGB).
423 » » » //
424 » » » // START ------ LIMIT (LIMIT+1)
425 » » » // | [IDX] | [IDX]
426 » » » // index | entryAfterGetBlock |
427 » » » // endIndexEntry (endIndexEntry+1)
428 » » » if types.MessageIndex(idx.Entries[endIndexEntry].StreamI ndex) < entryAfterGetBlock {
429 » » » » endIndexEntry++
430 » » » }
345 } 431 }
346 » » st.setCount(req.Limit) 432
433 » » // If we're pointing to a valid index entry, set our upper bound .
434 » » if endIndexEntry < len(idx.Entries) {
435 » » » st.endOffset = idx.Entries[endIndexEntry].Offset
436 » » }
347 } 437 }
348 438
349 // If we have a byte limit, count the entry sizes until we reach that li mit.
350 if mb := int64(s.MaxBytes); mb > 0 {
351 mb := uint64(mb)
352
353 for i, e := range idx.Entries[startIdx:] {
354 if e.Offset < st.startOffset {
355 // This shouldn't really happen, but it could ha ppen if there is a
356 // corrupt index.
357 continue
358 }
359
360 // Calculate the request offset and truncate if we've ex ceeded our maximum
361 // request bytes.
362 if size := (e.Offset - st.startOffset); size > mb {
363 st.setEndOffset(e.Offset)
364 st.setCount(i)
365 break
366 }
367 }
368 }
369 return &st 439 return &st
370 } 440 }
371 441
372 // indexEntryFor identifies the log index entry closest (<=) to the specified 442 // indexEntryFor identifies the log index entry closest (<=) to the specified
373 // index. 443 // index.
374 // 444 //
375 // If the first index entry is greater than our search index, -1 will be 445 // If the first index entry is greater than our search index, -1 will be
376 // returned. This should never happen in practice, though, since our index 446 // returned. This should never happen in practice, though, since our index
377 // construction always indexes log entry #0. 447 // construction always indexes log entry #0.
378 // 448 //
379 // It does this by performing a binary search over the index entries. 449 // It does this by performing a binary search over the index entries.
380 func indexEntryFor(entries []*logpb.LogIndex_Entry, i types.MessageIndex) int { 450 func indexEntryFor(entries []*logpb.LogIndex_Entry, i types.MessageIndex) int {
381 ui := uint64(i) 451 ui := uint64(i)
382 s := sort.Search(len(entries), func(i int) bool { 452 s := sort.Search(len(entries), func(i int) bool {
383 return entries[i].StreamIndex > ui 453 return entries[i].StreamIndex > ui
384 }) 454 })
385 455
386 // The returned index is the one immediately after the index that we wan t. If 456 // The returned index is the one immediately after the index that we wan t. If
387 // our search returned 0, the first index entry is > our search entry, a nd we 457 // our search returned 0, the first index entry is > our search entry, a nd we
388 // will return nil. 458 // will return nil.
389 return s - 1 459 return s - 1
390 } 460 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698