| Index: server/logdog/storage/bigtable/storage.go
|
| diff --git a/server/logdog/storage/bigtable/storage.go b/server/logdog/storage/bigtable/storage.go
|
| index 87ad1d909d34af62a9a90a362d6908051c2e1c76..997e169ca54d609aa1f7fb0ef12780a9de8a0df3 100644
|
| --- a/server/logdog/storage/bigtable/storage.go
|
| +++ b/server/logdog/storage/bigtable/storage.go
|
| @@ -47,9 +47,18 @@ const (
|
| tailRowMaxSize = 1024 * 1024 * 16
|
| )
|
|
|
| -// errStop is an internal sentinel error used to indicate "stop iteration"
|
| -// to btTable.getLogData iterator. It will
|
| -var errStop = errors.New("bigtable: stop iteration")
|
| +var (
|
| + // errStop is an internal sentinel error used to indicate "stop iteration"
|
| + // to btTable.getLogData iterator.
|
| + errStop = errors.New("bigtable: stop iteration")
|
| +
|
| + // errRepeatRequest is a transition sentinel that instructs us to repeat
|
| + // our query with KeysOnly set to true.
|
| + //
|
| + // This can be deleted once BigTable rows without a count have been aged
|
| + // off.
|
| + errRepeatRequest = errors.New("bigtable: repeat request")
|
| +)
|
|
|
| // Options is a set of configuration options for BigTable storage.
|
| type Options struct {
|
| @@ -189,43 +198,109 @@ func (s *btStorage) Put(r storage.PutRequest) error {
|
| }
|
|
|
| func (s *btStorage) Get(r storage.GetRequest, cb storage.GetCallback) error {
|
| - startKey := newRowKey(string(r.Path), int64(r.Index))
|
| + startKey := newRowKey(string(r.Path), int64(r.Index), 0)
|
| ctx := log.SetFields(s, log.Fields{
|
| "path": r.Path,
|
| "index": r.Index,
|
| + "limit": r.Limit,
|
| "startRowKey": startKey,
|
| + "keysOnly": r.KeysOnly,
|
| })
|
|
|
| + // If we issue a query and get back a legacy row, it will have no count
|
| + // associated with it. We will fast-exit
|
| +
|
| limit := r.Limit
|
| - err := s.raw.getLogData(ctx, startKey, r.Limit, false, func(rk *rowKey, data []byte) error {
|
| + err := s.raw.getLogData(ctx, startKey, r.Limit, r.KeysOnly, func(rk *rowKey, data []byte) error {
|
| // Does this key match our requested log stream? If not, we've moved past
|
| // this stream's records and must stop iteration.
|
| if !rk.sharesPathWith(startKey) {
|
| return errStop
|
| }
|
|
|
| - // We have a row. Split it into individual records.
|
| - records, err := recordio.Split(data)
|
| - if err != nil {
|
| + // Split our data into records. Leave the records slice nil if we're doing
|
| + // a keys-only get.
|
| + var records [][]byte
|
| + if !r.KeysOnly {
|
| + var err error
|
| + if records, err = recordio.Split(data); err != nil {
|
| + return storage.ErrBadData
|
| + }
|
| + }
|
| +
|
| + switch {
|
| + case r.KeysOnly:
|
| + // Keys only query, so count is the authority.
|
| + if rk.count == 0 {
|
| + // If it's zero, we are dealing with a legacy row, before we started
|
| + // keeping count. A keys-only query is insufficient to get the
|
| + // inforamtion that we need, so repeat this Get request with KeysOnly
|
| + // set to false.
|
| + //
|
| + // NOTE: This logic can be removed once all uncounted rows are aged off.
|
| + r.KeysOnly = false
|
| + return errRepeatRequest
|
| + }
|
| + break
|
| +
|
| + case rk.count == 0:
|
| + // This is a non-keys-only query, but we are dealing with a legacy row.
|
| + // Use the record count as the authority.
|
| + //
|
| + // NOTE: This logic can be removed once all uncounted rows are aged off.
|
| + rk.count = int64(len(records))
|
| +
|
| + case rk.count == int64(len(records)):
|
| + // This is the expected case, so we're set.
|
| + //
|
| + // NOTE: Once uncounted rows are aged off, this is the only logic that
|
| + // we need, in the form of a sanity assertion.
|
| + break
|
| +
|
| + default:
|
| + log.Fields{
|
| + "count": rk.count,
|
| + "recordCount": len(records),
|
| + }.Errorf(ctx, "Record count doesn't match declared count.")
|
| return storage.ErrBadData
|
| }
|
|
|
| // Issue our callback for each row. Since we index the row on the LAST entry
|
| // in the row, count backwards to get the index of the first entry.
|
| - firstIndex := types.MessageIndex(rk.index - int64(len(records)) + 1)
|
| - if firstIndex < 0 {
|
| + startIndex := rk.index - rk.count + 1
|
| + if startIndex < 0 {
|
| return storage.ErrBadData
|
| }
|
| - for i, row := range records {
|
| - index := firstIndex + types.MessageIndex(i)
|
| - if index < r.Index {
|
| - // An offset was specified, and this row is before it, so skip.
|
| - continue
|
| +
|
| + // If we are indexed somewhere within this entry's records, discard any
|
| + // records before our index.
|
| + if discard := int64(r.Index) - startIndex; discard > 0 {
|
| + if discard > int64(len(records)) {
|
| + // This should never happen unless there is corrupt or conflicting data.
|
| + return nil
|
| + }
|
| + startIndex += discard
|
| + records = records[discard:]
|
| + }
|
| +
|
| + log.Fields{
|
| + "rk": rk.encode(),
|
| + "rkIndex": rk.index,
|
| + "rkCount": rk.count,
|
| + "startIndex": startIndex,
|
| + }.Debugf(ctx, "Punting row key range [%d - %d]...", startIndex, rk.index)
|
| +
|
| + for index := startIndex; index <= rk.index; index++ {
|
| + // If we're not doing keys-only, consume the row.
|
| + var row []byte
|
| + if !r.KeysOnly {
|
| + row, records = records[0], records[1:]
|
| }
|
|
|
| - if !cb(index, row) {
|
| + if !cb(types.MessageIndex(index), row) {
|
| return errStop
|
| }
|
| + r.Index = types.MessageIndex(index + 1)
|
|
|
| // Artificially apply limit within our row records.
|
| if limit > 0 {
|
| @@ -242,6 +317,9 @@ func (s *btStorage) Get(r storage.GetRequest, cb storage.GetCallback) error {
|
| case nil, errStop:
|
| return nil
|
|
|
| + case errRepeatRequest:
|
| + return s.Get(r, cb)
|
| +
|
| default:
|
| log.WithError(err).Errorf(ctx, "Failed to retrieve row range.")
|
| return err
|
| @@ -254,7 +332,7 @@ func (s *btStorage) Tail(p types.StreamPath) ([]byte, types.MessageIndex, error)
|
| })
|
|
|
| // Iterate through all log keys in the stream. Record the latest one.
|
| - rk := newRowKey(string(p), 0)
|
| + rk := newRowKey(string(p), 0, 0)
|
| var latest *rowKey
|
| err := s.raw.getLogData(ctx, rk, 0, true, func(rk *rowKey, data []byte) error {
|
| latest = rk
|
| @@ -343,7 +421,7 @@ func (w *rowWriter) flush(ctx context.Context, raw btTable, index types.MessageI
|
| // Write the current set of buffered rows to the table. Index on the LAST
|
| // row index.
|
| lastIndex := int64(index) + int64(flushCount) - 1
|
| - rk := newRowKey(string(path), lastIndex)
|
| + rk := newRowKey(string(path), lastIndex, int64(w.count))
|
|
|
| log.Fields{
|
| "rowKey": rk,
|
|
|