| Index: server/logdog/storage/bigtable/bigtable.go
|
| diff --git a/server/logdog/storage/bigtable/bigtable.go b/server/logdog/storage/bigtable/bigtable.go
|
| index abada5f7689fe03e49496f336785518f97dff3e6..184d1e3ff04ff3fed096ebc13b79b64e0ef51ccf 100644
|
| --- a/server/logdog/storage/bigtable/bigtable.go
|
| +++ b/server/logdog/storage/bigtable/bigtable.go
|
| @@ -10,27 +10,29 @@ import (
|
|
|
| "github.com/luci/luci-go/common/errors"
|
| "github.com/luci/luci-go/common/grpcutil"
|
| - log "github.com/luci/luci-go/common/logging"
|
| "github.com/luci/luci-go/server/logdog/storage"
|
| "golang.org/x/net/context"
|
| "google.golang.org/cloud/bigtable"
|
| )
|
|
|
| -// errStop is an internal sentinel error used to communicate "stop iteration"
|
| -// to btTable.getLogData.
|
| -var errStop = errors.New("stop")
|
| -
|
| const (
|
| logColumnFamily = "log"
|
| logColumn = "data"
|
| )
|
|
|
| +// Limits taken from here:
|
| +// https://cloud.google.com/bigtable/docs/schema-design
|
| +const (
|
| + // bigTableRowMaxBytes is the maximum number of bytes that a single BigTable
|
| + // row may hold.
|
| + bigTableRowMaxBytes = 1024 * 1024 * 10 // 10MB
|
| +)
|
| +
|
| // btGetCallback is a callback that is invoked for each log data row returned
|
| // by getLogData.
|
| //
|
| // If an error is encountered, no more log data will be fetched. The error will
|
| -// be propagated to the getLogData call unless the returned error is errStop, in
|
| -// which case iteration will stop and getLogData will return nil.
|
| +// be propagated to the getLogData call.
|
| type btGetCallback func(*rowKey, []byte) error
|
|
|
| // btTable is a general interface for BigTable operations intended to enable
|
| @@ -65,17 +67,12 @@ type btTableProd struct {
|
| }
|
|
|
| func (t *btTableProd) putLogData(c context.Context, rk *rowKey, data []byte) error {
|
| - table, err := t.getLogTable()
|
| - if err != nil {
|
| - return err
|
| - }
|
| -
|
| m := bigtable.NewMutation()
|
| m.Set(logColumnFamily, logColumn, bigtable.ServerTime, data)
|
| cm := bigtable.NewCondMutation(bigtable.RowKeyFilter(rk.encode()), nil, m)
|
|
|
| rowExists := false
|
| - if err = table.Apply(c, rk.encode(), cm, bigtable.GetCondMutationResult(&rowExists)); err != nil {
|
| + if err := t.logTable.Apply(c, rk.encode(), cm, bigtable.GetCondMutationResult(&rowExists)); err != nil {
|
| return wrapTransient(err)
|
| }
|
| if rowExists {
|
| @@ -85,10 +82,6 @@ func (t *btTableProd) putLogData(c context.Context, rk *rowKey, data []byte) err
|
| }
|
|
|
| func (t *btTableProd) getLogData(c context.Context, rk *rowKey, limit int, keysOnly bool, cb btGetCallback) error {
|
| - table, err := t.getLogTable()
|
| - if err != nil {
|
| - return err
|
| - }
|
| // Construct read options based on Get request.
|
| ropts := []bigtable.ReadOption{
|
| bigtable.RowFilter(bigtable.FamilyFilter(logColumnFamily)),
|
| @@ -108,7 +101,7 @@ func (t *btTableProd) getLogData(c context.Context, rk *rowKey, limit int, keysO
|
| rng := bigtable.NewRange(rk.encode(), rk.pathPrefixUpperBound())
|
|
|
| innerErr := error(nil)
|
| - err = table.ReadRows(c, rng, func(row bigtable.Row) bool {
|
| + err := t.logTable.ReadRows(c, rng, func(row bigtable.Row) bool {
|
| data := []byte(nil)
|
| if !keysOnly {
|
| err := error(nil)
|
| @@ -121,18 +114,12 @@ func (t *btTableProd) getLogData(c context.Context, rk *rowKey, limit int, keysO
|
|
|
| drk, err := decodeRowKey(row.Key())
|
| if err != nil {
|
| - log.Fields{
|
| - log.ErrorKey: err,
|
| - "value": row.Key(),
|
| - }.Warningf(c, "Failed to parse row key.")
|
| - innerErr = storage.ErrBadData
|
| + innerErr = err
|
| return false
|
| }
|
|
|
| if err := cb(drk, data); err != nil {
|
| - if err != errStop {
|
| - innerErr = err
|
| - }
|
| + innerErr = err
|
| return false
|
| }
|
| return true
|
| @@ -144,16 +131,11 @@ func (t *btTableProd) getLogData(c context.Context, rk *rowKey, limit int, keysO
|
| }
|
|
|
| func (t *btTableProd) setMaxLogAge(c context.Context, d time.Duration) error {
|
| - ac, err := t.getAdminClient()
|
| - if err != nil {
|
| - return err
|
| - }
|
| -
|
| var logGCPolicy bigtable.GCPolicy
|
| if d > 0 {
|
| logGCPolicy = bigtable.MaxAgePolicy(d)
|
| }
|
| - if err := ac.SetGCPolicy(c, t.LogTable, logColumnFamily, logGCPolicy); err != nil {
|
| + if err := t.adminClient.SetGCPolicy(c, t.LogTable, logColumnFamily, logGCPolicy); err != nil {
|
| return wrapTransient(err)
|
| }
|
| return nil
|
|
|