| Index: logdog/common/storage/bigtable/storage.go
|
| diff --git a/logdog/common/storage/bigtable/storage.go b/logdog/common/storage/bigtable/storage.go
|
| index 4be49071382ab23974e3ef80a8019939933c3da2..8fdd1a076617fda89b2f78b20d565b37c8d801d3 100644
|
| --- a/logdog/common/storage/bigtable/storage.go
|
| +++ b/logdog/common/storage/bigtable/storage.go
|
| @@ -13,6 +13,7 @@ import (
|
| "github.com/luci/luci-go/common/data/recordio"
|
| log "github.com/luci/luci-go/common/logging"
|
| "github.com/luci/luci-go/logdog/common/storage"
|
| + "github.com/luci/luci-go/logdog/common/storage/caching"
|
| "github.com/luci/luci-go/logdog/common/types"
|
|
|
| "cloud.google.com/go/bigtable"
|
| @@ -67,6 +68,9 @@ type Options struct {
|
|
|
| // Table is the name of the BigTable table to use for logs.
|
| LogTable string
|
| +
|
| + // Cache, if not nil, will be used to cache data.
|
| + Cache caching.Cache
|
| }
|
|
|
| func (o *Options) client(ctx context.Context) (*bigtable.Client, error) {
|
| @@ -91,8 +95,10 @@ type btStorage struct {
|
| logTable *bigtable.Table
|
| adminClient *bigtable.AdminClient
|
|
|
| - // raw is the underlying btTable instance to use for raw operations.
|
| + // raw, if not nil, is the raw BigTable interface to use. This is useful for
|
| + // testing. If nil, this will default to the production isntance.
|
| raw btTable
|
| +
|
| // maxRowSize is the maxmium number of bytes that can be stored in a single
|
| // BigTable row. This is a function of BigTable, and constant in production
|
| // (bigTableRowMaxBytes), but variable here to allow for testing to control.
|
| @@ -114,22 +120,25 @@ func New(ctx context.Context, o Options) (storage.Storage, error) {
|
| return nil, err
|
| }
|
|
|
| - return newBTStorage(ctx, o, client, admin), nil
|
| + return newBTStorage(ctx, o, client, admin, nil), nil
|
| }
|
|
|
| -func newBTStorage(ctx context.Context, o Options, client *bigtable.Client, adminClient *bigtable.AdminClient) *btStorage {
|
| +func newBTStorage(ctx context.Context, o Options, client *bigtable.Client, adminClient *bigtable.AdminClient, raw btTable) *btStorage {
|
| s := &btStorage{
|
| Options: &o,
|
| Context: ctx,
|
|
|
| client: client,
|
| adminClient: adminClient,
|
| + raw: raw,
|
| maxRowSize: bigTableRowMaxBytes,
|
| }
|
| if s.client != nil {
|
| s.logTable = s.client.Open(o.LogTable)
|
| }
|
| - s.raw = &btTableProd{s}
|
| + if s.raw == nil {
|
| + s.raw = &btTableProd{s}
|
| + }
|
| return s
|
| }
|
|
|
| @@ -299,8 +308,14 @@ func (s *btStorage) Tail(project config.ProjectName, path types.StreamPath) (*st
|
| "path": path,
|
| })
|
|
|
| + // Load the "last tail index" from cache. If we have no cache, start at 0.
|
| + var startIdx int64
|
| + if s.Cache != nil {
|
| + startIdx = getLastTailIndex(s, s.Cache, project, path)
|
| + }
|
| +
|
| // Iterate through all log keys in the stream. Record the latest one.
|
| - rk := newRowKey(string(project), string(path), 0, 0)
|
| + rk := newRowKey(string(project), string(path), startIdx, 0)
|
| var latest *rowKey
|
| err := s.raw.getLogData(ctx, rk, 0, true, func(rk *rowKey, data []byte) error {
|
| latest = rk
|
| @@ -320,6 +335,11 @@ func (s *btStorage) Tail(project config.ProjectName, path types.StreamPath) (*st
|
| return nil, storage.ErrDoesNotExist
|
| }
|
|
|
| + // Update our cache if the tail index has changed.
|
| + if s.Cache != nil && startIdx != latest.index {
|
| + putLastTailIndex(s, s.Cache, project, path, latest.index)
|
| + }
|
| +
|
| // Fetch the latest row's data.
|
| var d []byte
|
| err = s.raw.getLogData(ctx, latest, 1, false, func(rk *rowKey, data []byte) error {
|
|
|