| Index: server/logdog/storage/bigtable/storage.go
|
| diff --git a/server/logdog/storage/bigtable/storage.go b/server/logdog/storage/bigtable/storage.go
|
| index 997e169ca54d609aa1f7fb0ef12780a9de8a0df3..9c586255741a4cbe6495ed2f298592f1ead9f881 100644
|
| --- a/server/logdog/storage/bigtable/storage.go
|
| +++ b/server/logdog/storage/bigtable/storage.go
|
| @@ -9,6 +9,7 @@ import (
|
| "errors"
|
| "fmt"
|
|
|
| + "github.com/luci/luci-go/common/config"
|
| "github.com/luci/luci-go/common/logdog/types"
|
| log "github.com/luci/luci-go/common/logging"
|
| "github.com/luci/luci-go/common/recordio"
|
| @@ -55,8 +56,8 @@ var (
|
| // errRepeatRequest is a transition sentinel that instructs us to repeat
|
| // our query with KeysOnly set to true.
|
| //
|
| - // This can be deleted once BigTable rows without a count have been aged
|
| - // off.
|
| + // TODO(dnj): This can be deleted once BigTable rows without a count have been
|
| + // aged off.
|
| errRepeatRequest = errors.New("bigtable: repeat request")
|
| )
|
|
|
| @@ -171,7 +172,7 @@ func (s *btStorage) Put(r storage.PutRequest) error {
|
| if appended := rw.append(r.Values[0]); !appended {
|
| // We have failed to append our maximum BigTable row size. Flush any
|
| // currently-buffered row data and try again with an empty buffer.
|
| - count, err := rw.flush(s, s.raw, r.Index, r.Path)
|
| + count, err := rw.flush(s, s.raw, r.Index, r.Project, r.Path)
|
| if err != nil {
|
| return err
|
| }
|
| @@ -191,15 +192,16 @@ func (s *btStorage) Put(r storage.PutRequest) error {
|
| }
|
|
|
| // Flush any buffered rows.
|
| - if _, err := rw.flush(s, s.raw, r.Index, r.Path); err != nil {
|
| + if _, err := rw.flush(s, s.raw, r.Index, r.Project, r.Path); err != nil {
|
| return err
|
| }
|
| return nil
|
| }
|
|
|
| func (s *btStorage) Get(r storage.GetRequest, cb storage.GetCallback) error {
|
| - startKey := newRowKey(string(r.Path), int64(r.Index), 0)
|
| + startKey := newRowKey(string(r.Project), string(r.Path), int64(r.Index), 0)
|
| ctx := log.SetFields(s, log.Fields{
|
| + "project": r.Project,
|
| "path": r.Path,
|
| "index": r.Index,
|
| "limit": r.Limit,
|
| @@ -237,7 +239,8 @@ func (s *btStorage) Get(r storage.GetRequest, cb storage.GetCallback) error {
|
| // inforamtion that we need, so repeat this Get request with KeysOnly
|
| // set to false.
|
| //
|
| - // NOTE: This logic can be removed once all uncounted rows are aged off.
|
| + // TODO(dnj): This logic can be removed once all uncounted rows are aged
|
| + // off.
|
| r.KeysOnly = false
|
| return errRepeatRequest
|
| }
|
| @@ -247,14 +250,15 @@ func (s *btStorage) Get(r storage.GetRequest, cb storage.GetCallback) error {
|
| // This is a non-keys-only query, but we are dealing with a legacy row.
|
| // Use the record count as the authority.
|
| //
|
| - // NOTE: This logic can be removed once all uncounted rows are aged off.
|
| + // TODO(dnj): This logic can be removed once all uncounted rows are aged
|
| + // off.
|
| rk.count = int64(len(records))
|
|
|
| case rk.count == int64(len(records)):
|
| // This is the expected case, so we're set.
|
| //
|
| - // NOTE: Once uncounted rows are aged off, this is the only logic that
|
| - // we need, in the form of a sanity assertion.
|
| + // TODO(dnj): Once uncounted rows are aged off, this is the only logic
|
| + // that we need, in the form of a sanity assertion.
|
| break
|
|
|
| default:
|
| @@ -326,13 +330,14 @@ func (s *btStorage) Get(r storage.GetRequest, cb storage.GetCallback) error {
|
| }
|
| }
|
|
|
| -func (s *btStorage) Tail(p types.StreamPath) ([]byte, types.MessageIndex, error) {
|
| +func (s *btStorage) Tail(project config.ProjectName, path types.StreamPath) ([]byte, types.MessageIndex, error) {
|
| ctx := log.SetFields(s, log.Fields{
|
| - "path": p,
|
| + "project": project,
|
| + "path": path,
|
| })
|
|
|
| // Iterate through all log keys in the stream. Record the latest one.
|
| - rk := newRowKey(string(p), 0, 0)
|
| + rk := newRowKey(string(project), string(path), 0, 0)
|
| var latest *rowKey
|
| err := s.raw.getLogData(ctx, rk, 0, true, func(rk *rowKey, data []byte) error {
|
| latest = rk
|
| @@ -412,7 +417,8 @@ func (w *rowWriter) append(d []byte) (appended bool) {
|
| return
|
| }
|
|
|
| -func (w *rowWriter) flush(ctx context.Context, raw btTable, index types.MessageIndex, path types.StreamPath) (int, error) {
|
| +func (w *rowWriter) flush(ctx context.Context, raw btTable, index types.MessageIndex,
|
| + project config.ProjectName, path types.StreamPath) (int, error) {
|
| flushCount := w.count
|
| if flushCount == 0 {
|
| return 0, nil
|
| @@ -421,10 +427,11 @@ func (w *rowWriter) flush(ctx context.Context, raw btTable, index types.MessageI
|
| // Write the current set of buffered rows to the table. Index on the LAST
|
| // row index.
|
| lastIndex := int64(index) + int64(flushCount) - 1
|
| - rk := newRowKey(string(path), lastIndex, int64(w.count))
|
| + rk := newRowKey(string(project), string(path), lastIndex, int64(w.count))
|
|
|
| log.Fields{
|
| "rowKey": rk,
|
| + "project": project,
|
| "path": path,
|
| "index": index,
|
| "lastIndex": lastIndex,
|
|
|