Index: server/logdog/storage/bigtable/storage.go |
diff --git a/server/logdog/storage/bigtable/storage.go b/server/logdog/storage/bigtable/storage.go |
index 997e169ca54d609aa1f7fb0ef12780a9de8a0df3..9c586255741a4cbe6495ed2f298592f1ead9f881 100644 |
--- a/server/logdog/storage/bigtable/storage.go |
+++ b/server/logdog/storage/bigtable/storage.go |
@@ -9,6 +9,7 @@ import ( |
"errors" |
"fmt" |
+ "github.com/luci/luci-go/common/config" |
"github.com/luci/luci-go/common/logdog/types" |
log "github.com/luci/luci-go/common/logging" |
"github.com/luci/luci-go/common/recordio" |
@@ -55,8 +56,8 @@ var ( |
// errRepeatRequest is a transition sentinel that instructs us to repeat |
// our query with KeysOnly set to true. |
// |
- // This can be deleted once BigTable rows without a count have been aged |
- // off. |
+ // TODO(dnj): This can be deleted once BigTable rows without a count have been |
+ // aged off. |
errRepeatRequest = errors.New("bigtable: repeat request") |
) |
@@ -171,7 +172,7 @@ func (s *btStorage) Put(r storage.PutRequest) error { |
if appended := rw.append(r.Values[0]); !appended { |
// We have failed to append our maximum BigTable row size. Flush any |
// currently-buffered row data and try again with an empty buffer. |
- count, err := rw.flush(s, s.raw, r.Index, r.Path) |
+ count, err := rw.flush(s, s.raw, r.Index, r.Project, r.Path) |
if err != nil { |
return err |
} |
@@ -191,15 +192,16 @@ func (s *btStorage) Put(r storage.PutRequest) error { |
} |
// Flush any buffered rows. |
- if _, err := rw.flush(s, s.raw, r.Index, r.Path); err != nil { |
+ if _, err := rw.flush(s, s.raw, r.Index, r.Project, r.Path); err != nil { |
return err |
} |
return nil |
} |
func (s *btStorage) Get(r storage.GetRequest, cb storage.GetCallback) error { |
- startKey := newRowKey(string(r.Path), int64(r.Index), 0) |
+ startKey := newRowKey(string(r.Project), string(r.Path), int64(r.Index), 0) |
ctx := log.SetFields(s, log.Fields{ |
+ "project": r.Project, |
"path": r.Path, |
"index": r.Index, |
"limit": r.Limit, |
@@ -237,7 +239,8 @@ func (s *btStorage) Get(r storage.GetRequest, cb storage.GetCallback) error { |
// inforamtion that we need, so repeat this Get request with KeysOnly |
// set to false. |
// |
- // NOTE: This logic can be removed once all uncounted rows are aged off. |
+ // TODO(dnj): This logic can be removed once all uncounted rows are aged |
+ // off. |
r.KeysOnly = false |
return errRepeatRequest |
} |
@@ -247,14 +250,15 @@ func (s *btStorage) Get(r storage.GetRequest, cb storage.GetCallback) error { |
// This is a non-keys-only query, but we are dealing with a legacy row. |
// Use the record count as the authority. |
// |
- // NOTE: This logic can be removed once all uncounted rows are aged off. |
+ // TODO(dnj): This logic can be removed once all uncounted rows are aged |
+ // off. |
rk.count = int64(len(records)) |
case rk.count == int64(len(records)): |
// This is the expected case, so we're set. |
// |
- // NOTE: Once uncounted rows are aged off, this is the only logic that |
- // we need, in the form of a sanity assertion. |
+ // TODO(dnj): Once uncounted rows are aged off, this is the only logic |
+ // that we need, in the form of a sanity assertion. |
break |
default: |
@@ -326,13 +330,14 @@ func (s *btStorage) Get(r storage.GetRequest, cb storage.GetCallback) error { |
} |
} |
-func (s *btStorage) Tail(p types.StreamPath) ([]byte, types.MessageIndex, error) { |
+func (s *btStorage) Tail(project config.ProjectName, path types.StreamPath) ([]byte, types.MessageIndex, error) { |
ctx := log.SetFields(s, log.Fields{ |
- "path": p, |
+ "project": project, |
+ "path": path, |
}) |
// Iterate through all log keys in the stream. Record the latest one. |
- rk := newRowKey(string(p), 0, 0) |
+ rk := newRowKey(string(project), string(path), 0, 0) |
var latest *rowKey |
err := s.raw.getLogData(ctx, rk, 0, true, func(rk *rowKey, data []byte) error { |
latest = rk |
@@ -412,7 +417,8 @@ func (w *rowWriter) append(d []byte) (appended bool) { |
return |
} |
-func (w *rowWriter) flush(ctx context.Context, raw btTable, index types.MessageIndex, path types.StreamPath) (int, error) { |
+func (w *rowWriter) flush(ctx context.Context, raw btTable, index types.MessageIndex, |
+ project config.ProjectName, path types.StreamPath) (int, error) { |
flushCount := w.count |
if flushCount == 0 { |
return 0, nil |
@@ -421,10 +427,11 @@ func (w *rowWriter) flush(ctx context.Context, raw btTable, index types.MessageI |
// Write the current set of buffered rows to the table. Index on the LAST |
// row index. |
lastIndex := int64(index) + int64(flushCount) - 1 |
- rk := newRowKey(string(path), lastIndex, int64(w.count)) |
+ rk := newRowKey(string(project), string(path), lastIndex, int64(w.count)) |
log.Fields{ |
"rowKey": rk, |
+ "project": project, |
"path": path, |
"index": index, |
"lastIndex": lastIndex, |