Chromium Code Reviews| Index: server/logdog/storage/bigtable/bigtable.go |
| diff --git a/server/logdog/storage/bigtable/bigtable.go b/server/logdog/storage/bigtable/bigtable.go |
| index abada5f7689fe03e49496f336785518f97dff3e6..184d1e3ff04ff3fed096ebc13b79b64e0ef51ccf 100644 |
| --- a/server/logdog/storage/bigtable/bigtable.go |
| +++ b/server/logdog/storage/bigtable/bigtable.go |
| @@ -10,27 +10,29 @@ import ( |
| "github.com/luci/luci-go/common/errors" |
| "github.com/luci/luci-go/common/grpcutil" |
| - log "github.com/luci/luci-go/common/logging" |
| "github.com/luci/luci-go/server/logdog/storage" |
| "golang.org/x/net/context" |
| "google.golang.org/cloud/bigtable" |
| ) |
| -// errStop is an internal sentinel error used to communicate "stop iteration" |
| -// to btTable.getLogData. |
| -var errStop = errors.New("stop") |
| - |
| const ( |
| logColumnFamily = "log" |
| logColumn = "data" |
| ) |
| +// Limits taken from here: |
| +// https://cloud.google.com/bigtable/docs/schema-design |
| +const ( |
| + // bigTableRowMaxBytes is the maximum number of bytes that a single BigTable |
| + // row may hold. |
| + bigTableRowMaxBytes = 1024 * 1024 * 10 // 10MB |
| +) |
| + |
| // btGetCallback is a callback that is invoked for each log data row returned |
| // by getLogData. |
| // |
| // If an error is encountered, no more log data will be fetched. The error will |
| -// be propagated to the getLogData call unless the returned error is errStop, in |
| -// which case iteration will stop and getLogData will return nil. |
| +// be propagated to the getLogData call. |
| type btGetCallback func(*rowKey, []byte) error |
|
nodir
2016/03/30 18:29:42
[general feedback] you don't have to prefix types
dnj
2016/03/30 18:37:13
Yep, but in this case, this is a BigTable-specific
|
| // btTable is a general interface for BigTable operations intended to enable |
| @@ -65,17 +67,12 @@ type btTableProd struct { |
| } |
| func (t *btTableProd) putLogData(c context.Context, rk *rowKey, data []byte) error { |
| - table, err := t.getLogTable() |
| - if err != nil { |
| - return err |
| - } |
| - |
| m := bigtable.NewMutation() |
| m.Set(logColumnFamily, logColumn, bigtable.ServerTime, data) |
| cm := bigtable.NewCondMutation(bigtable.RowKeyFilter(rk.encode()), nil, m) |
| rowExists := false |
| - if err = table.Apply(c, rk.encode(), cm, bigtable.GetCondMutationResult(&rowExists)); err != nil { |
| + if err := t.logTable.Apply(c, rk.encode(), cm, bigtable.GetCondMutationResult(&rowExists)); err != nil { |
| return wrapTransient(err) |
| } |
| if rowExists { |
| @@ -85,10 +82,6 @@ func (t *btTableProd) putLogData(c context.Context, rk *rowKey, data []byte) err |
| } |
| func (t *btTableProd) getLogData(c context.Context, rk *rowKey, limit int, keysOnly bool, cb btGetCallback) error { |
| - table, err := t.getLogTable() |
| - if err != nil { |
| - return err |
| - } |
| // Construct read options based on Get request. |
| ropts := []bigtable.ReadOption{ |
| bigtable.RowFilter(bigtable.FamilyFilter(logColumnFamily)), |
| @@ -108,7 +101,7 @@ func (t *btTableProd) getLogData(c context.Context, rk *rowKey, limit int, keysO |
| rng := bigtable.NewRange(rk.encode(), rk.pathPrefixUpperBound()) |
| innerErr := error(nil) |
| - err = table.ReadRows(c, rng, func(row bigtable.Row) bool { |
| + err := t.logTable.ReadRows(c, rng, func(row bigtable.Row) bool { |
| data := []byte(nil) |
| if !keysOnly { |
| err := error(nil) |
| @@ -121,18 +114,12 @@ func (t *btTableProd) getLogData(c context.Context, rk *rowKey, limit int, keysO |
| drk, err := decodeRowKey(row.Key()) |
| if err != nil { |
| - log.Fields{ |
| - log.ErrorKey: err, |
| - "value": row.Key(), |
| - }.Warningf(c, "Failed to parse row key.") |
| - innerErr = storage.ErrBadData |
| + innerErr = err |
| return false |
| } |
| if err := cb(drk, data); err != nil { |
| - if err != errStop { |
| - innerErr = err |
| - } |
| + innerErr = err |
| return false |
| } |
| return true |
| @@ -144,16 +131,11 @@ func (t *btTableProd) getLogData(c context.Context, rk *rowKey, limit int, keysO |
| } |
| func (t *btTableProd) setMaxLogAge(c context.Context, d time.Duration) error { |
| - ac, err := t.getAdminClient() |
| - if err != nil { |
| - return err |
| - } |
| - |
| var logGCPolicy bigtable.GCPolicy |
| if d > 0 { |
| logGCPolicy = bigtable.MaxAgePolicy(d) |
| } |
| - if err := ac.SetGCPolicy(c, t.LogTable, logColumnFamily, logGCPolicy); err != nil { |
| + if err := t.adminClient.SetGCPolicy(c, t.LogTable, logColumnFamily, logGCPolicy); err != nil { |
| return wrapTransient(err) |
| } |
| return nil |