Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(72)

Unified Diff: server/logdog/storage/bigtable/storage.go

Issue 1909943003: LogDog: Add project support to Storage. (Closed) Base URL: https://github.com/luci/luci-go@logdog-project-coordinator-services
Patch Set: Rebase? Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « server/logdog/storage/bigtable/rowKey_test.go ('k') | server/logdog/storage/bigtable/storage_test.go » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: server/logdog/storage/bigtable/storage.go
diff --git a/server/logdog/storage/bigtable/storage.go b/server/logdog/storage/bigtable/storage.go
index 997e169ca54d609aa1f7fb0ef12780a9de8a0df3..9c586255741a4cbe6495ed2f298592f1ead9f881 100644
--- a/server/logdog/storage/bigtable/storage.go
+++ b/server/logdog/storage/bigtable/storage.go
@@ -9,6 +9,7 @@ import (
"errors"
"fmt"
+ "github.com/luci/luci-go/common/config"
"github.com/luci/luci-go/common/logdog/types"
log "github.com/luci/luci-go/common/logging"
"github.com/luci/luci-go/common/recordio"
@@ -55,8 +56,8 @@ var (
// errRepeatRequest is a transition sentinel that instructs us to repeat
// our query with KeysOnly set to true.
//
- // This can be deleted once BigTable rows without a count have been aged
- // off.
+ // TODO(dnj): This can be deleted once BigTable rows without a count have been
+ // aged off.
errRepeatRequest = errors.New("bigtable: repeat request")
)
@@ -171,7 +172,7 @@ func (s *btStorage) Put(r storage.PutRequest) error {
if appended := rw.append(r.Values[0]); !appended {
// We have failed to append our maximum BigTable row size. Flush any
// currently-buffered row data and try again with an empty buffer.
- count, err := rw.flush(s, s.raw, r.Index, r.Path)
+ count, err := rw.flush(s, s.raw, r.Index, r.Project, r.Path)
if err != nil {
return err
}
@@ -191,15 +192,16 @@ func (s *btStorage) Put(r storage.PutRequest) error {
}
// Flush any buffered rows.
- if _, err := rw.flush(s, s.raw, r.Index, r.Path); err != nil {
+ if _, err := rw.flush(s, s.raw, r.Index, r.Project, r.Path); err != nil {
return err
}
return nil
}
func (s *btStorage) Get(r storage.GetRequest, cb storage.GetCallback) error {
- startKey := newRowKey(string(r.Path), int64(r.Index), 0)
+ startKey := newRowKey(string(r.Project), string(r.Path), int64(r.Index), 0)
ctx := log.SetFields(s, log.Fields{
+ "project": r.Project,
"path": r.Path,
"index": r.Index,
"limit": r.Limit,
@@ -237,7 +239,8 @@ func (s *btStorage) Get(r storage.GetRequest, cb storage.GetCallback) error {
// inforamtion that we need, so repeat this Get request with KeysOnly
// set to false.
//
- // NOTE: This logic can be removed once all uncounted rows are aged off.
+ // TODO(dnj): This logic can be removed once all uncounted rows are aged
+ // off.
r.KeysOnly = false
return errRepeatRequest
}
@@ -247,14 +250,15 @@ func (s *btStorage) Get(r storage.GetRequest, cb storage.GetCallback) error {
// This is a non-keys-only query, but we are dealing with a legacy row.
// Use the record count as the authority.
//
- // NOTE: This logic can be removed once all uncounted rows are aged off.
+ // TODO(dnj): This logic can be removed once all uncounted rows are aged
+ // off.
rk.count = int64(len(records))
case rk.count == int64(len(records)):
// This is the expected case, so we're set.
//
- // NOTE: Once uncounted rows are aged off, this is the only logic that
- // we need, in the form of a sanity assertion.
+ // TODO(dnj): Once uncounted rows are aged off, this is the only logic
+ // that we need, in the form of a sanity assertion.
break
default:
@@ -326,13 +330,14 @@ func (s *btStorage) Get(r storage.GetRequest, cb storage.GetCallback) error {
}
}
-func (s *btStorage) Tail(p types.StreamPath) ([]byte, types.MessageIndex, error) {
+func (s *btStorage) Tail(project config.ProjectName, path types.StreamPath) ([]byte, types.MessageIndex, error) {
ctx := log.SetFields(s, log.Fields{
- "path": p,
+ "project": project,
+ "path": path,
})
// Iterate through all log keys in the stream. Record the latest one.
- rk := newRowKey(string(p), 0, 0)
+ rk := newRowKey(string(project), string(path), 0, 0)
var latest *rowKey
err := s.raw.getLogData(ctx, rk, 0, true, func(rk *rowKey, data []byte) error {
latest = rk
@@ -412,7 +417,8 @@ func (w *rowWriter) append(d []byte) (appended bool) {
return
}
-func (w *rowWriter) flush(ctx context.Context, raw btTable, index types.MessageIndex, path types.StreamPath) (int, error) {
+func (w *rowWriter) flush(ctx context.Context, raw btTable, index types.MessageIndex,
+ project config.ProjectName, path types.StreamPath) (int, error) {
flushCount := w.count
if flushCount == 0 {
return 0, nil
@@ -421,10 +427,11 @@ func (w *rowWriter) flush(ctx context.Context, raw btTable, index types.MessageI
// Write the current set of buffered rows to the table. Index on the LAST
// row index.
lastIndex := int64(index) + int64(flushCount) - 1
- rk := newRowKey(string(path), lastIndex, int64(w.count))
+ rk := newRowKey(string(project), string(path), lastIndex, int64(w.count))
log.Fields{
"rowKey": rk,
+ "project": project,
"path": path,
"index": index,
"lastIndex": lastIndex,
« no previous file with comments | « server/logdog/storage/bigtable/rowKey_test.go ('k') | server/logdog/storage/bigtable/storage_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698