Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright 2015 The LUCI Authors. All rights reserved. | 1 // Copyright 2015 The LUCI Authors. All rights reserved. |
|
dnj
2016/10/19 23:18:59
This file changed bigly.
| |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
| 3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
| 4 | 4 |
| 5 // Package archive implements a storage.Storage instance that retrieves logs | 5 // Package archive implements a storage.Storage instance that retrieves logs |
| 6 // from a Google Storage archive. | 6 // from a Google Storage archive. |
| 7 // | 7 // |
| 8 // This is a special implementation of storage.Storage, and does not fully | 8 // This is a special implementation of storage.Storage, and does not fully |
| 9 // conform to the API expectations. Namely: | 9 // conform to the API expectations. Namely: |
| 10 // - It is read-only. Mutation methods will return storage.ErrReadOnly. | 10 // - It is read-only. Mutation methods will return storage.ErrReadOnly. |
| 11 // - Storage methods ignore the supplied Path argument, instead opting for | 11 // - Storage methods ignore the supplied Path argument, instead opting for |
| 12 // the archive configured in its Options. | 12 // the archive configured in its Options. |
| 13 package archive | 13 package archive |
| 14 | 14 |
| 15 import ( | 15 import ( |
| 16 "bytes" | 16 "bytes" |
| 17 "fmt" | 17 "fmt" |
| 18 "io" | 18 "io" |
| 19 "io/ioutil" | 19 "io/ioutil" |
| 20 "sort" | 20 "sort" |
| 21 "sync" | 21 "sync" |
| 22 | 22 |
| 23 "golang.org/x/net/context" | |
| 24 | |
| 25 "github.com/golang/protobuf/proto" | |
| 26 "github.com/luci/luci-go/common/config" | 23 "github.com/luci/luci-go/common/config" |
| 27 "github.com/luci/luci-go/common/data/recordio" | 24 "github.com/luci/luci-go/common/data/recordio" |
| 25 "github.com/luci/luci-go/common/errors" | |
| 28 "github.com/luci/luci-go/common/gcloud/gs" | 26 "github.com/luci/luci-go/common/gcloud/gs" |
| 29 "github.com/luci/luci-go/common/iotools" | 27 "github.com/luci/luci-go/common/iotools" |
| 30 log "github.com/luci/luci-go/common/logging" | 28 log "github.com/luci/luci-go/common/logging" |
| 31 "github.com/luci/luci-go/logdog/api/logpb" | 29 "github.com/luci/luci-go/logdog/api/logpb" |
| 32 "github.com/luci/luci-go/logdog/common/storage" | 30 "github.com/luci/luci-go/logdog/common/storage" |
| 33 "github.com/luci/luci-go/logdog/common/types" | 31 "github.com/luci/luci-go/logdog/common/types" |
| 32 | |
| 33 cloudStorage "cloud.google.com/go/storage" | |
| 34 "github.com/golang/protobuf/proto" | |
| 35 "golang.org/x/net/context" | |
| 34 ) | 36 ) |
| 35 | 37 |
| 36 const ( | 38 const ( |
| 37 // maxStreamRecordSize is the maximum record size we're willing to read from | 39 // maxStreamRecordSize is the maximum record size we're willing to read from |
| 38 // our archived log stream. This will help prevent out-of-memory errors if the | 40 // our archived log stream. This will help prevent out-of-memory errors if the |
| 39 // archived log stream is malicious or corrupt. | 41 // archived log stream is malicious or corrupt. |
| 40 » maxStreamRecordSize = 16 * 1024 * 1024 | 42 » // |
| 43 » // 16MB is larger than the maximum log entry size | |
| 44 » maxStreamRecordSize = 2 * types.MaxLogEntryDataSize | |
| 41 ) | 45 ) |
| 42 | 46 |
| 43 // Options is the set of configuration options for this Storage instance. | 47 // Options is the set of configuration options for this Storage instance. |
| 44 // | 48 // |
| 45 // Unlike other Storage instances, this is bound to a single archived stream. | 49 // Unlike other Storage instances, this is bound to a single archived stream. |
| 46 // Project and Path parameters in requests will be ignored in favor of the | 50 // Project and Path parameters in requests will be ignored in favor of the |
| 47 // Google Storage URLs. | 51 // Google Storage URLs. |
| 48 type Options struct { | 52 type Options struct { |
| 49 // IndexURL is the Google Storage URL for the stream's index. | 53 // IndexURL is the Google Storage URL for the stream's index. |
| 50 IndexURL string | 54 IndexURL string |
| 51 // StreamURL is the Google Storage URL for the stream's entries. | 55 // StreamURL is the Google Storage URL for the stream's entries. |
| 52 StreamURL string | 56 StreamURL string |
| 53 | 57 |
| 54 // Client is the HTTP client to use for authentication. | 58 // Client is the HTTP client to use for authentication. |
| 55 // | 59 // |
| 56 // Closing this Storage instance does not close the underlying Client. | 60 // Closing this Storage instance does not close the underlying Client. |
| 57 Client gs.Client | 61 Client gs.Client |
| 58 | |
| 59 // MaxBytes, if >0, is the maximum number of bytes to fetch in any given | |
| 60 // request. This should be set for GAE fetches, as large log streams may | |
| 61 // exceed the urlfetch system's maximum response size otherwise. | |
| 62 // | |
| 63 // This is the number of bytes to request, not the number of bytes of log data | |
| 64 // to return. The difference is that the former includes the RecordIO frame | |
| 65 // headers. | |
| 66 MaxBytes int | |
| 67 } | 62 } |
| 68 | 63 |
| 69 type storageImpl struct { | 64 type storageImpl struct { |
| 70 *Options | 65 *Options |
| 71 context.Context | 66 context.Context |
| 72 | 67 |
| 73 streamPath gs.Path | 68 streamPath gs.Path |
| 74 indexPath gs.Path | 69 indexPath gs.Path |
| 75 | 70 |
| 76 » indexMu sync.Mutex | 71 » indexMu sync.Mutex |
| 77 » index *logpb.LogIndex | 72 » index *logpb.LogIndex |
| 78 » closeClient bool | |
| 79 } | 73 } |
| 80 | 74 |
| 81 // New instantiates a new Storage instance, bound to the supplied Options. | 75 // New instantiates a new Storage instance, bound to the supplied Options. |
| 82 func New(ctx context.Context, o Options) (storage.Storage, error) { | 76 func New(ctx context.Context, o Options) (storage.Storage, error) { |
| 83 s := storageImpl{ | 77 s := storageImpl{ |
| 84 Options: &o, | 78 Options: &o, |
| 85 Context: ctx, | 79 Context: ctx, |
| 86 | 80 |
| 87 streamPath: gs.Path(o.StreamURL), | 81 streamPath: gs.Path(o.StreamURL), |
| 88 indexPath: gs.Path(o.IndexURL), | 82 indexPath: gs.Path(o.IndexURL), |
| 89 } | 83 } |
| 90 | 84 |
| 91 if !s.streamPath.IsFullPath() { | 85 if !s.streamPath.IsFullPath() { |
| 92 return nil, fmt.Errorf("invalid stream URL: %q", s.streamPath) | 86 return nil, fmt.Errorf("invalid stream URL: %q", s.streamPath) |
| 93 } | 87 } |
| 94 » if !s.indexPath.IsFullPath() { | 88 » if s.indexPath != "" && !s.indexPath.IsFullPath() { |
| 95 return nil, fmt.Errorf("invalid index URL: %v", s.indexPath) | 89 return nil, fmt.Errorf("invalid index URL: %v", s.indexPath) |
| 96 } | 90 } |
| 97 | 91 |
| 98 return &s, nil | 92 return &s, nil |
| 99 } | 93 } |
| 100 | 94 |
| 101 func (s *storageImpl) Close() { | 95 func (s *storageImpl) Close() {} |
| 102 » if s.closeClient { | |
| 103 » » _ = s.Client.Close() | |
| 104 » } | |
| 105 } | |
| 106 | 96 |
| 107 func (s *storageImpl) Config(storage.Config) error { return storage.ErrReadOnly } | 97 func (s *storageImpl) Config(storage.Config) error { return storage.ErrReadOnly } |
| 108 func (s *storageImpl) Put(storage.PutRequest) error { return storage.ErrReadOnly } | 98 func (s *storageImpl) Put(storage.PutRequest) error { return storage.ErrReadOnly } |
| 109 | 99 |
| 110 func (s *storageImpl) Get(req storage.GetRequest, cb storage.GetCallback) error { | 100 func (s *storageImpl) Get(req storage.GetRequest, cb storage.GetCallback) error { |
| 111 idx, err := s.getIndex() | 101 idx, err := s.getIndex() |
| 112 if err != nil { | 102 if err != nil { |
| 113 return err | 103 return err |
| 114 } | 104 } |
| 115 | 105 |
| 116 // Identify the byte offsets that we want to fetch from the entries stream. | 106 // Identify the byte offsets that we want to fetch from the entries stream. |
| 117 » st := s.buildGetStrategy(&req, idx) | 107 » st := buildGetStrategy(&req, idx) |
| 118 » if st.lastIndex >= 0 && req.Index > st.lastIndex { | 108 » if st == nil { |
| 119 » » // We know the last index, and the user requested logs past it, so there are | 109 » » // No more records to read. |
| 120 » » // no records to read. | |
| 121 return nil | 110 return nil |
| 122 } | 111 } |
| 123 | 112 |
| 124 » offset := int64(st.startOffset) | 113 » switch err := s.getLogEntriesIter(st, cb); errors.Unwrap(err) { |
| 114 » case nil, io.EOF: | |
| 115 » » // We hit the end of our log stream. | |
| 116 » » return nil | |
| 117 | |
| 118 » case cloudStorage.ErrObjectNotExist, cloudStorage.ErrBucketNotExist: | |
| 119 » » return storage.ErrDoesNotExist | |
| 120 | |
| 121 » default: | |
| 122 » » return errors.Annotate(err).Reason("failed to read log stream"). Err() | |
| 123 » } | |
| 124 } | |
| 125 | |
| 126 // getLogEntriesImpl retrieves log entries from archive until complete. | |
| 127 func (s *storageImpl) getLogEntriesIter(st *getStrategy, cb storage.GetCallback) error { | |
| 128 » // Get our maximum byte limit. If we are externally constrained via MaxB ytes, | |
| 129 » // apply that limit too. | |
| 130 » // Get an archive reader. | |
| 131 » var ( | |
| 132 » » offset = st.startOffset | |
| 133 » » length = st.length() | |
| 134 » ) | |
| 135 | |
| 125 log.Fields{ | 136 log.Fields{ |
| 126 "offset": offset, | 137 "offset": offset, |
| 127 » » "length": st.length(), | 138 » » "length": length, |
| 128 "path": s.streamPath, | 139 "path": s.streamPath, |
| 129 }.Debugf(s, "Creating stream reader for range.") | 140 }.Debugf(s, "Creating stream reader for range.") |
| 130 » r, err := s.Client.NewReader(s.streamPath, offset, st.length()) | 141 » storageReader, err := s.Client.NewReader(s.streamPath, int64(offset), le ngth) |
| 131 if err != nil { | 142 if err != nil { |
| 132 log.WithError(err).Errorf(s, "Failed to create stream Reader.") | 143 log.WithError(err).Errorf(s, "Failed to create stream Reader.") |
| 133 » » return err | 144 » » return errors.Annotate(err).Reason("failed to create stream Read er").Err() |
| 134 } | 145 } |
| 135 defer func() { | 146 defer func() { |
| 136 » » if err := r.Close(); err != nil { | 147 » » if tmpErr := storageReader.Close(); tmpErr != nil { |
| 137 » » » log.WithError(err).Warningf(s, "Error closing stream Rea der.") | 148 » » » // (Non-fatal) |
| 149 » » » log.WithError(tmpErr).Warningf(s, "Error closing stream Reader.") | |
| 138 } | 150 } |
| 139 }() | 151 }() |
| 140 cr := iotools.CountingReader{Reader: r} | |
| 141 rio := recordio.NewReader(&cr, maxStreamRecordSize) | |
| 142 | 152 |
| 143 » buf := bytes.Buffer{} | 153 » // Count how many bytes we've read. |
| 144 » le := logpb.LogEntry{} | 154 » cr := iotools.CountingReader{Reader: storageReader} |
| 145 » max := st.count | 155 |
| 156 » // Iteratively update our strategy's start offset each time we read a co mplete | |
| 157 » // frame. | |
| 158 » var ( | |
| 159 » » rio = recordio.NewReader(&cr, maxStreamRecordSize) | |
| 160 » » buf bytes.Buffer | |
| 161 » » remaining = st.count | |
| 162 » ) | |
| 146 for { | 163 for { |
| 147 » » offset += cr.Count() | 164 » » // Reset the count so we know how much we read for this frame. |
| 165 » » cr.Count = 0 | |
| 148 | 166 |
| 149 sz, r, err := rio.ReadFrame() | 167 sz, r, err := rio.ReadFrame() |
| 150 » » switch err { | 168 » » if err != nil { |
| 151 » » case nil: | 169 » » » return errors.Annotate(err).Reason("failed to read frame ").Err() |
| 152 » » » break | |
| 153 | |
| 154 » » case io.EOF: | |
| 155 » » » return nil | |
| 156 | |
| 157 » » default: | |
| 158 » » » log.Fields{ | |
| 159 » » » » log.ErrorKey: err, | |
| 160 » » » » "index": idx, | |
| 161 » » » » "offset": offset, | |
| 162 » » » }.Errorf(s, "Failed to read next frame.") | |
| 163 » » » return err | |
| 164 } | 170 } |
| 165 | 171 |
| 166 buf.Reset() | 172 buf.Reset() |
| 167 buf.Grow(int(sz)) | 173 buf.Grow(int(sz)) |
| 168 » » if _, err := buf.ReadFrom(r); err != nil { | 174 |
| 175 » » switch amt, err := buf.ReadFrom(r); { | |
| 176 » » case err != nil: | |
| 169 log.Fields{ | 177 log.Fields{ |
| 170 » » » » log.ErrorKey: err, | 178 » » » » log.ErrorKey: err, |
| 171 » » » » "offset": offset, | 179 » » » » "frameOffset": offset, |
| 172 » » » » "frameSize": sz, | 180 » » » » "frameSize": sz, |
| 173 }.Errorf(s, "Failed to read frame data.") | 181 }.Errorf(s, "Failed to read frame data.") |
| 174 » » » return err | 182 » » » return errors.Annotate(err).Reason("failed to read frame data").Err() |
| 183 | |
| 184 » » case amt != sz: | |
| 185 » » » // If we didn't buffer the complete frame, we hit a prem ature EOF. | |
| 186 » » » return errors.Annotate(io.EOF).Reason("incomplete frame read").Err() | |
| 175 } | 187 } |
| 176 | 188 |
| 177 » » if err := proto.Unmarshal(buf.Bytes(), &le); err != nil { | 189 » » // If we read from offset 0, the first frame will be the log str eam's |
| 178 » » » log.Fields{ | 190 » » // descriptor, which we can discard. |
| 179 » » » » log.ErrorKey: err, | 191 » » discardFrame := (offset == 0) |
| 180 » » » » "offset": offset, | 192 » » offset += uint64(cr.Count) |
| 181 » » » » "frameSize": sz, | 193 » » if discardFrame { |
| 182 » » » }.Errorf(s, "Failed to unmarshal log data.") | 194 » » » continue |
| 183 » » » return err | |
| 184 } | 195 } |
| 185 | 196 |
| 186 » » idx := types.MessageIndex(le.StreamIndex) | 197 » » // Punt this log entry to our callback, if appropriate. |
| 187 » » if idx < req.Index { | 198 » » entry := storage.MakeEntry(buf.Bytes(), -1) |
| 199 » » switch idx, err := entry.GetStreamIndex(); { | |
| 200 » » case err != nil: | |
| 201 » » » log.Fields{ | |
| 202 » » » » log.ErrorKey: err, | |
| 203 » » » » "frameOffset": offset, | |
| 204 » » » » "frameSize": sz, | |
| 205 » » » }.Errorf(s, "Failed to get log entry index.") | |
| 206 » » » return errors.Annotate(err).Reason("failed to get log en try index").Err() | |
| 207 | |
| 208 » » case idx < st.startIndex: | |
| 188 // Skip this entry, as it's before the first requested e ntry. | 209 // Skip this entry, as it's before the first requested e ntry. |
| 189 continue | 210 continue |
| 190 } | 211 } |
| 191 | 212 |
| 192 » » d := make([]byte, buf.Len()) | 213 » » // We want to punt this entry, but we also want to re-use our Bu ffer. Clone |
| 193 » » copy(d, buf.Bytes()) | 214 » » // its data so it is independent. |
| 194 » » if !cb(idx, d) { | 215 » » entry.D = make([]byte, len(entry.D)) |
| 195 » » » break | 216 » » copy(entry.D, buf.Bytes()) |
| 217 » » if !cb(entry) { | |
| 218 » » » return nil | |
| 196 } | 219 } |
| 197 | 220 |
| 198 // Enforce our limit, if one is supplied. | 221 // Enforce our limit, if one is supplied. |
| 199 » » if max > 0 { | 222 » » if remaining > 0 { |
| 200 » » » max-- | 223 » » » remaining-- |
| 201 » » » if max == 0 { | 224 » » » if remaining == 0 { |
| 202 » » » » break | 225 » » » » return nil |
| 203 } | 226 } |
| 204 } | 227 } |
| 205 } | 228 } |
| 206 return nil | |
| 207 } | 229 } |
| 208 | 230 |
| 209 func (s *storageImpl) Tail(project config.ProjectName, path types.StreamPath) ([ ]byte, types.MessageIndex, error) { | 231 func (s *storageImpl) Tail(project config.ProjectName, path types.StreamPath) (* storage.Entry, error) { |
| 210 idx, err := s.getIndex() | 232 idx, err := s.getIndex() |
| 211 if err != nil { | 233 if err != nil { |
| 212 » » return nil, 0, err | 234 » » return nil, err |
| 213 } | 235 } |
| 214 | 236 |
| 215 » // Get the offset of the last record. | 237 » // Get the offset that is as close to our tail record as possible. If we know |
| 216 » if len(idx.Entries) == 0 { | 238 » // what that index is (from "idx"), we can request it directly. Otherwis e, we |
| 217 » » return nil, 0, nil | 239 » // will get as close as possible and read forwards from there. |
| 218 » } | 240 » req := storage.GetRequest{} |
| 219 » lle := idx.Entries[len(idx.Entries)-1] | 241 » switch { |
| 242 » case idx.LastStreamIndex > 0: | |
| 243 » » req.Index = types.MessageIndex(idx.LastStreamIndex) | |
| 244 » » req.Limit = 1 | |
| 220 | 245 |
| 221 » // Get a Reader for the Tail entry. | 246 » case len(idx.Entries) > 0: |
| 222 » r, err := s.Client.NewReader(s.streamPath, int64(lle.Offset), -1) | 247 » » req.Index = types.MessageIndex(idx.Entries[len(idx.Entries)-1].S treamIndex) |
| 223 » if err != nil { | |
| 224 » » log.Fields{ | |
| 225 » » » log.ErrorKey: err, | |
| 226 » » » "offset": lle.Offset, | |
| 227 » » }.Errorf(s, "Failed to create reader.") | |
| 228 » » return nil, 0, err | |
| 229 » } | |
| 230 » defer func() { | |
| 231 » » if err := r.Close(); err != nil { | |
| 232 » » » log.WithError(err).Warningf(s, "Failed to close Reader." ) | |
| 233 » » } | |
| 234 » }() | |
| 235 | |
| 236 » rio := recordio.NewReader(r, maxStreamRecordSize) | |
| 237 » d, err := rio.ReadFrameAll() | |
| 238 » if err != nil { | |
| 239 » » log.WithError(err).Errorf(s, "Failed to read log frame.") | |
| 240 » » return nil, 0, err | |
| 241 } | 248 } |
| 242 | 249 |
| 243 » return d, types.MessageIndex(lle.StreamIndex), nil | 250 » // Build a Get strategy for our closest-to-Tail index. |
| 251 » st := buildGetStrategy(&req, idx) | |
| 252 » if st == nil { | |
| 253 » » return nil, storage.ErrDoesNotExist | |
| 254 » } | |
| 255 | |
| 256 » // Read forwards to EOF. Retain the last entry that we read. | |
| 257 » var lastEntry *storage.Entry | |
| 258 » err = s.Get(req, func(e *storage.Entry) bool { | |
| 259 » » lastEntry = e | |
| 260 | |
| 261 » » // We can stop if we have the last stream index and this is that index. | |
| 262 » » if idx.LastStreamIndex > 0 { | |
| 263 » » » // Get the index for this entry. | |
| 264 » » » // | |
| 265 » » » // We can ignore this error, since "Get" will have alrea dy resolved the | |
| 266 » » » // index successfully. | |
| 267 » » » if sidx, _ := e.GetStreamIndex(); sidx == types.MessageI ndex(idx.LastStreamIndex) { | |
| 268 » » » » return false | |
| 269 » » » } | |
| 270 » » } | |
| 271 » » return true | |
| 272 » }) | |
| 273 » switch { | |
| 274 » case err != nil: | |
| 275 » » return nil, err | |
| 276 | |
| 277 » case lastEntry == nil: | |
| 278 » » return nil, storage.ErrDoesNotExist | |
| 279 | |
| 280 » default: | |
| 281 » » return lastEntry, nil | |
| 282 » } | |
| 244 } | 283 } |
| 245 | 284 |
| 246 // getIndex returns the cached log stream index, fetching it if necessary. | 285 // getIndex returns the cached log stream index, fetching it if necessary. |
| 247 func (s *storageImpl) getIndex() (*logpb.LogIndex, error) { | 286 func (s *storageImpl) getIndex() (*logpb.LogIndex, error) { |
| 248 s.indexMu.Lock() | 287 s.indexMu.Lock() |
| 249 defer s.indexMu.Unlock() | 288 defer s.indexMu.Unlock() |
| 250 | 289 |
| 251 if s.index == nil { | 290 if s.index == nil { |
| 252 » » r, err := s.Client.NewReader(s.indexPath, 0, -1) | 291 » » index, err := loadIndex(s, s.Client, s.indexPath) |
| 253 » » if err != nil { | 292 » » switch errors.Unwrap(err) { |
| 254 » » » log.WithError(err).Errorf(s, "Failed to create index Rea der.") | 293 » » case nil: |
| 255 » » » return nil, err | 294 » » » break |
| 256 » » } | 295 |
| 257 » » defer func() { | 296 » » case cloudStorage.ErrBucketNotExist, cloudStorage.ErrObjectNotEx ist: |
| 258 » » » if err := r.Close(); err != nil { | 297 » » » // Treat a missing index the same as an empty index. |
| 259 » » » » log.WithError(err).Warningf(s, "Error closing in dex Reader.") | 298 » » » log.WithError(err).Warningf(s, "Index is invalid, using empty index.") |
| 260 » » » } | 299 » » » index = &logpb.LogIndex{} |
| 261 » » }() | 300 |
| 262 » » indexData, err := ioutil.ReadAll(r) | 301 » » default: |
| 263 » » if err != nil { | |
| 264 » » » log.WithError(err).Errorf(s, "Failed to read index.") | |
| 265 return nil, err | 302 return nil, err |
| 266 } | 303 } |
| 267 | 304 |
| 268 » » index := logpb.LogIndex{} | 305 » » s.index = index |
| 269 » » if err := proto.Unmarshal(indexData, &index); err != nil { | |
| 270 » » » log.WithError(err).Errorf(s, "Failed to unmarshal index. ") | |
| 271 » » » return nil, err | |
| 272 » » } | |
| 273 | |
| 274 » » s.index = &index | |
| 275 } | 306 } |
| 276 return s.index, nil | 307 return s.index, nil |
| 277 } | 308 } |
| 278 | 309 |
| 310 func loadIndex(c context.Context, client gs.Client, path gs.Path) (*logpb.LogInd ex, error) { | |
| 311 // If there is no path, then return an empty index. | |
| 312 if path == "" { | |
| 313 log.Infof(c, "No index path, using empty index.") | |
| 314 return &logpb.LogIndex{}, nil | |
| 315 } | |
| 316 | |
| 317 r, err := client.NewReader(path, 0, -1) | |
| 318 if err != nil { | |
| 319 log.WithError(err).Errorf(c, "Failed to create index Reader.") | |
| 320 return nil, errors.Annotate(err).Reason("failed to create index Reader").Err() | |
| 321 } | |
| 322 defer func() { | |
| 323 if err := r.Close(); err != nil { | |
| 324 log.WithError(err).Warningf(c, "Error closing index Read er.") | |
| 325 } | |
| 326 }() | |
| 327 indexData, err := ioutil.ReadAll(r) | |
| 328 if err != nil { | |
| 329 log.WithError(err).Errorf(c, "Failed to read index.") | |
| 330 return nil, errors.Annotate(err).Reason("failed to read index"). Err() | |
| 331 } | |
| 332 | |
| 333 index := logpb.LogIndex{} | |
| 334 if err := proto.Unmarshal(indexData, &index); err != nil { | |
| 335 log.WithError(err).Errorf(c, "Failed to unmarshal index.") | |
| 336 return nil, errors.Annotate(err).Reason("failed to unmarshal ind ex").Err() | |
| 337 } | |
| 338 | |
| 339 return &index, nil | |
| 340 } | |
| 341 | |
| 279 type getStrategy struct { | 342 type getStrategy struct { |
| 280 » // startOffset is the beginning byte offset of the log entry stream. | 343 » // startIndex is desired initial log entry index. |
| 344 » startIndex types.MessageIndex | |
| 345 | |
| 346 » // startOffset is the beginning byte offset of the log entry stream. Thi s may | |
| 347 » // be lower than the offset of the starting record if the index is spars e. | |
| 281 startOffset uint64 | 348 startOffset uint64 |
| 282 » // endOffset is the ending byte offset of the log entry stream. | 349 » // endOffset is the ending byte offset of the log entry stream. This wil l be |
| 350 » // 0 if an end offset is not known. | |
| 283 endOffset uint64 | 351 endOffset uint64 |
| 284 | 352 |
| 285 » // count is the number of log entries that will be fetched. | 353 » // count is the number of log entries that will be fetched. If 0, no upp er |
| 286 » count int | 354 » // bound was calculated. |
| 287 » // lastIndex is the last log entry index in the stream. This will be -1 if | 355 » count uint64 |
| 288 » // there are no entries in the stream. | |
| 289 » lastIndex types.MessageIndex | |
| 290 } | 356 } |
| 291 | 357 |
| 292 func (gs *getStrategy) length() int64 { | 358 func (gs *getStrategy) length() int64 { |
| 293 if gs.startOffset < gs.endOffset { | 359 if gs.startOffset < gs.endOffset { |
| 294 return int64(gs.endOffset - gs.startOffset) | 360 return int64(gs.endOffset - gs.startOffset) |
| 295 } | 361 } |
| 296 return -1 | 362 return -1 |
| 297 } | 363 } |
| 298 | 364 |
| 299 // setCount sets the `count` field. If called multiple times, the smallest | 365 // setCount sets the `count` field. If called multiple times, the smallest |
| 300 // assigned value will be retained. | 366 // assigned value will be retained. |
| 301 func (gs *getStrategy) setCount(v int) { | 367 func (gs *getStrategy) setCount(v uint64) { |
| 302 » if gs.count <= 0 || gs.count > v { | 368 » if gs.count == 0 || gs.count > v { |
| 303 gs.count = v | 369 gs.count = v |
| 304 } | 370 } |
| 305 } | 371 } |
| 306 | 372 |
| 307 // setEndOffset sets the `length` field. If called multiple times, the smallest | 373 func buildGetStrategy(req *storage.GetRequest, idx *logpb.LogIndex) *getStrategy { |
| 308 // assigned value will be retained. | |
| 309 func (gs *getStrategy) setEndOffset(v uint64) { | |
| 310 » if gs.endOffset == 0 || gs.endOffset > v { | |
| 311 » » gs.endOffset = v | |
| 312 » } | |
| 313 } | |
| 314 | |
| 315 func (s *storageImpl) buildGetStrategy(req *storage.GetRequest, idx *logpb.LogIn dex) *getStrategy { | |
| 316 st := getStrategy{ | 374 st := getStrategy{ |
| 317 » » lastIndex: -1, | 375 » » startIndex: req.Index, |
| 318 } | 376 } |
| 319 | 377 |
| 320 » if len(idx.Entries) == 0 { | 378 » // If the user has requested an index past the end of the stream, return no |
| 321 » » return &st | 379 » // entries (count == 0). This only works if the last stream index is kno wn. |
| 380 » if idx.LastStreamIndex > 0 && req.Index > types.MessageIndex(idx.LastStr eamIndex) { | |
| 381 » » return nil | |
| 322 } | 382 } |
| 323 | 383 |
| 324 » // If we have a log entry count, mark the last log index. | 384 » // Identify the closest index entry to the requested log. |
| 325 » if idx.LogEntryCount > 0 { | 385 » // |
| 326 » » st.lastIndex = types.MessageIndex(idx.Entries[len(idx.Entries)-1 ].StreamIndex) | 386 » // If the requested log starts before the first index entry, we must rea d from |
| 387 » // record #0. | |
| 388 » startIndexEntry := indexEntryFor(idx.Entries, req.Index) | |
| 389 » if startIndexEntry >= 0 { | |
| 390 » » st.startOffset = idx.Entries[startIndexEntry].Offset | |
| 327 } | 391 } |
| 328 | 392 |
| 329 startIdx := indexEntryFor(idx.Entries, req.Index) | |
| 330 if startIdx < 0 { | |
| 331 startIdx = 0 | |
| 332 } | |
| 333 le := idx.Entries[startIdx] | |
| 334 st.startOffset = le.Offset | |
| 335 | |
| 336 // Determine an upper bound based on our limits. | 393 // Determine an upper bound based on our limits. |
| 337 // | 394 // |
| 338 » // If we have a count limit, and we have enough index entries to upper-b ound | 395 » // If we have a count limit, identify the maximum entry that can be load ed, |
| 339 » // our stream based on that limit, use that. Note that this may overshoo t if | 396 » // find the index entry closest to it, and use that to determine our upp er |
| 340 » // the index and/or stream is sparse. We know for sure that we have one | 397 » // bound. |
| 341 » // LogEntry per index entry, so that's the best we can do. | |
| 342 if req.Limit > 0 { | 398 if req.Limit > 0 { |
| 343 » » if ub := startIdx + req.Limit; ub < len(idx.Entries) { | 399 » » st.setCount(uint64(req.Limit)) |
| 344 » » » st.setEndOffset(idx.Entries[ub].Offset) | 400 |
| 401 » » // Find the index entry for the stream entry AFTER the last one we are going | |
| 402 » » // to return. | |
| 403 » » entryAfterGetBlock := req.Index + types.MessageIndex(req.Limit) | |
| 404 » » endIndexEntry := indexEntryFor(idx.Entries, entryAfterGetBlock) | |
| 405 » » switch { | |
| 406 » » case endIndexEntry < 0: | |
| 407 » » » // The last possible request log entry is before the fir st index entry. | |
| 408 » » » // Read up to the first index entry. | |
| 409 » » » endIndexEntry = 0 | |
| 410 | |
| 411 » » case endIndexEntry <= startIndexEntry: | |
| 412 » » » // The last possible request log entry is closest to the start index | |
| 413 » » » // entry. Use the index entry immediately after it. | |
| 414 » » » endIndexEntry = startIndexEntry + 1 | |
| 415 | |
| 416 » » default: | |
| 417 » » » // We have the index entry <= the stream entry after the last one that we | |
| 418 » » » // will return. | |
| 419 » » » // | |
| 420 » » » // If we're sparse, this could be the index at or before our last entry. | |
| 421 » » » // If this is the case, use the next index entry, which will be after | |
| 422 » » » // "entryAfterGetBlock" (EAGB). | |
| 423 » » » // | |
| 424 » » » // START ------ LIMIT (LIMIT+1) | |
| 425 » » » // | [IDX] | [IDX] | |
| 426 » » » // index | entryAfterGetBlock | | |
| 427 » » » // endIndexEntry (endIndexEntry+1) | |
| 428 » » » if types.MessageIndex(idx.Entries[endIndexEntry].StreamI ndex) < entryAfterGetBlock { | |
| 429 » » » » endIndexEntry++ | |
| 430 » » » } | |
| 345 } | 431 } |
| 346 » » st.setCount(req.Limit) | 432 |
| 433 » » // If we're pointing to a valid index entry, set our upper bound . | |
| 434 » » if endIndexEntry < len(idx.Entries) { | |
| 435 » » » st.endOffset = idx.Entries[endIndexEntry].Offset | |
| 436 » » } | |
| 347 } | 437 } |
| 348 | 438 |
| 349 // If we have a byte limit, count the entry sizes until we reach that li mit. | |
| 350 if mb := int64(s.MaxBytes); mb > 0 { | |
| 351 mb := uint64(mb) | |
| 352 | |
| 353 for i, e := range idx.Entries[startIdx:] { | |
| 354 if e.Offset < st.startOffset { | |
| 355 // This shouldn't really happen, but it could ha ppen if there is a | |
| 356 // corrupt index. | |
| 357 continue | |
| 358 } | |
| 359 | |
| 360 // Calculate the request offset and truncate if we've ex ceeded our maximum | |
| 361 // request bytes. | |
| 362 if size := (e.Offset - st.startOffset); size > mb { | |
| 363 st.setEndOffset(e.Offset) | |
| 364 st.setCount(i) | |
| 365 break | |
| 366 } | |
| 367 } | |
| 368 } | |
| 369 return &st | 439 return &st |
| 370 } | 440 } |
| 371 | 441 |
| 372 // indexEntryFor identifies the log index entry closest (<=) to the specified | 442 // indexEntryFor identifies the log index entry closest (<=) to the specified |
| 373 // index. | 443 // index. |
| 374 // | 444 // |
| 375 // If the first index entry is greater than our search index, -1 will be | 445 // If the first index entry is greater than our search index, -1 will be |
| 376 // returned. This should never happen in practice, though, since our index | 446 // returned. This should never happen in practice, though, since our index |
| 377 // construction always indexes log entry #0. | 447 // construction always indexes log entry #0. |
| 378 // | 448 // |
| 379 // It does this by performing a binary search over the index entries. | 449 // It does this by performing a binary search over the index entries. |
| 380 func indexEntryFor(entries []*logpb.LogIndex_Entry, i types.MessageIndex) int { | 450 func indexEntryFor(entries []*logpb.LogIndex_Entry, i types.MessageIndex) int { |
| 381 ui := uint64(i) | 451 ui := uint64(i) |
| 382 s := sort.Search(len(entries), func(i int) bool { | 452 s := sort.Search(len(entries), func(i int) bool { |
| 383 return entries[i].StreamIndex > ui | 453 return entries[i].StreamIndex > ui |
| 384 }) | 454 }) |
| 385 | 455 |
| 386 // The returned index is the one immediately after the index that we wan t. If | 456 // The returned index is the one immediately after the index that we wan t. If |
| 387 // our search returned 0, the first index entry is > our search entry, a nd we | 457 // our search returned 0, the first index entry is > our search entry, a nd we |
| 388 // will return nil. | 458 // will return nil. |
| 389 return s - 1 | 459 return s - 1 |
| 390 } | 460 } |
| OLD | NEW |