Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(389)

Unified Diff: server/logdog/storage/bigtable/storage.go

Issue 1872903002: LogDog: Enable keys-only BigTable queries. (Closed) Base URL: https://github.com/luci/luci-go@logdog-archive-v2
Patch Set: Rebase Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: server/logdog/storage/bigtable/storage.go
diff --git a/server/logdog/storage/bigtable/storage.go b/server/logdog/storage/bigtable/storage.go
index 87ad1d909d34af62a9a90a362d6908051c2e1c76..997e169ca54d609aa1f7fb0ef12780a9de8a0df3 100644
--- a/server/logdog/storage/bigtable/storage.go
+++ b/server/logdog/storage/bigtable/storage.go
@@ -47,9 +47,18 @@ const (
tailRowMaxSize = 1024 * 1024 * 16
)
-// errStop is an internal sentinel error used to indicate "stop iteration"
-// to btTable.getLogData iterator. It will
-var errStop = errors.New("bigtable: stop iteration")
+var (
+ // errStop is an internal sentinel error used to indicate "stop iteration"
+ // to btTable.getLogData iterator.
+ errStop = errors.New("bigtable: stop iteration")
+
+ // errRepeatRequest is a transition sentinel that instructs us to repeat
+ // our query with KeysOnly set to true.
+ //
+ // This can be deleted once BigTable rows without a count have been aged
+ // off.
+ errRepeatRequest = errors.New("bigtable: repeat request")
+)
// Options is a set of configuration options for BigTable storage.
type Options struct {
@@ -189,43 +198,109 @@ func (s *btStorage) Put(r storage.PutRequest) error {
}
func (s *btStorage) Get(r storage.GetRequest, cb storage.GetCallback) error {
- startKey := newRowKey(string(r.Path), int64(r.Index))
+ startKey := newRowKey(string(r.Path), int64(r.Index), 0)
ctx := log.SetFields(s, log.Fields{
"path": r.Path,
"index": r.Index,
+ "limit": r.Limit,
"startRowKey": startKey,
+ "keysOnly": r.KeysOnly,
})
+ // If we issue a keys-only query and get back a legacy row, it will have no
+ // count associated with it. We will fast-exit the iteration and repeat the
+ // request with KeysOnly set to false.
+
limit := r.Limit
- err := s.raw.getLogData(ctx, startKey, r.Limit, false, func(rk *rowKey, data []byte) error {
+ err := s.raw.getLogData(ctx, startKey, r.Limit, r.KeysOnly, func(rk *rowKey, data []byte) error {
// Does this key match our requested log stream? If not, we've moved past
// this stream's records and must stop iteration.
if !rk.sharesPathWith(startKey) {
return errStop
}
- // We have a row. Split it into individual records.
- records, err := recordio.Split(data)
- if err != nil {
+ // Split our data into records. Leave the records slice nil if we're doing
+ // a keys-only get.
+ var records [][]byte
+ if !r.KeysOnly {
+ var err error
+ if records, err = recordio.Split(data); err != nil {
+ return storage.ErrBadData
+ }
+ }
+
+ switch {
+ case r.KeysOnly:
+ // Keys only query, so count is the authority.
+ if rk.count == 0 {
+ // If it's zero, we are dealing with a legacy row, before we started
+ // keeping count. A keys-only query is insufficient to get the
+ // information that we need, so repeat this Get request with KeysOnly
+ // set to false.
+ //
+ // NOTE: This logic can be removed once all uncounted rows are aged off.
+ r.KeysOnly = false
+ return errRepeatRequest
+ }
+ break
+
+ case rk.count == 0:
+ // This is a non-keys-only query, but we are dealing with a legacy row.
+ // Use the record count as the authority.
+ //
+ // NOTE: This logic can be removed once all uncounted rows are aged off.
+ rk.count = int64(len(records))
+
+ case rk.count == int64(len(records)):
+ // This is the expected case, so we're set.
+ //
+ // NOTE: Once uncounted rows are aged off, this is the only logic that
+ // we need, in the form of a sanity assertion.
+ break
+
+ default:
+ log.Fields{
+ "count": rk.count,
+ "recordCount": len(records),
+ }.Errorf(ctx, "Record count doesn't match declared count.")
return storage.ErrBadData
}
// Issue our callback for each row. Since we index the row on the LAST entry
// in the row, count backwards to get the index of the first entry.
- firstIndex := types.MessageIndex(rk.index - int64(len(records)) + 1)
- if firstIndex < 0 {
+ startIndex := rk.index - rk.count + 1
+ if startIndex < 0 {
return storage.ErrBadData
}
- for i, row := range records {
- index := firstIndex + types.MessageIndex(i)
- if index < r.Index {
- // An offset was specified, and this row is before it, so skip.
- continue
+
+ // If we are indexed somewhere within this entry's records, discard any
+ // records before our index.
+ if discard := int64(r.Index) - startIndex; discard > 0 {
+ if discard > int64(len(records)) {
+ // This should never happen unless there is corrupt or conflicting data.
+ return nil
+ }
+ startIndex += discard
+ records = records[discard:]
+ }
+
+ log.Fields{
+ "rk": rk.encode(),
+ "rkIndex": rk.index,
+ "rkCount": rk.count,
+ "startIndex": startIndex,
+ }.Debugf(ctx, "Punting row key range [%d - %d]...", startIndex, rk.index)
+
+ for index := startIndex; index <= rk.index; index++ {
+ // If we're not doing keys-only, consume the row.
+ var row []byte
+ if !r.KeysOnly {
+ row, records = records[0], records[1:]
}
- if !cb(index, row) {
+ if !cb(types.MessageIndex(index), row) {
return errStop
}
+ r.Index = types.MessageIndex(index + 1)
// Artificially apply limit within our row records.
if limit > 0 {
@@ -242,6 +317,9 @@ func (s *btStorage) Get(r storage.GetRequest, cb storage.GetCallback) error {
case nil, errStop:
return nil
+ case errRepeatRequest:
+ return s.Get(r, cb)
+
default:
log.WithError(err).Errorf(ctx, "Failed to retrieve row range.")
return err
@@ -254,7 +332,7 @@ func (s *btStorage) Tail(p types.StreamPath) ([]byte, types.MessageIndex, error)
})
// Iterate through all log keys in the stream. Record the latest one.
- rk := newRowKey(string(p), 0)
+ rk := newRowKey(string(p), 0, 0)
var latest *rowKey
err := s.raw.getLogData(ctx, rk, 0, true, func(rk *rowKey, data []byte) error {
latest = rk
@@ -343,7 +421,7 @@ func (w *rowWriter) flush(ctx context.Context, raw btTable, index types.MessageI
// Write the current set of buffered rows to the table. Index on the LAST
// row index.
lastIndex := int64(index) + int64(flushCount) - 1
- rk := newRowKey(string(path), lastIndex)
+ rk := newRowKey(string(path), lastIndex, int64(w.count))
log.Fields{
"rowKey": rk,

Powered by Google App Engine
This is Rietveld 408576698