Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(486)

Unified Diff: server/logdog/storage/bigtable/storage.go

Issue 1838803002: LogDog: BigTable batching schema. (Closed) Base URL: https://github.com/luci/luci-go@recordio-split
Patch Set: Minor comments and quality of code tweaks. Created 4 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « server/logdog/storage/bigtable/initialize.go ('k') | server/logdog/storage/bigtable/storage_test.go » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: server/logdog/storage/bigtable/storage.go
diff --git a/server/logdog/storage/bigtable/storage.go b/server/logdog/storage/bigtable/storage.go
index 3b0c117babf7fcbf101f230d23e5d6fe6b761ef7..fd5ef98b538f22eca3cc34e1d22b3c9cf3b465fa 100644
--- a/server/logdog/storage/bigtable/storage.go
+++ b/server/logdog/storage/bigtable/storage.go
@@ -5,10 +5,13 @@
package bigtable
import (
+ "bytes"
+ "errors"
"fmt"
"github.com/luci/luci-go/common/logdog/types"
log "github.com/luci/luci-go/common/logging"
+ "github.com/luci/luci-go/common/recordio"
"github.com/luci/luci-go/server/logdog/storage"
"golang.org/x/net/context"
"google.golang.org/cloud"
@@ -44,6 +47,10 @@ const (
tailRowMaxSize = 1024 * 1024 * 16
)
+// errStop is an internal sentinel error used to indicate "stop iteration"
+// to the btTable.getLogData iterator. It is consumed internally and never
+// returned to callers.
+var errStop = errors.New("stop")
+
// Options is a set of configuration options for BigTable storage.
type Options struct {
// Project is the name of the project to connect to.
@@ -60,30 +67,65 @@ type Options struct {
LogTable string
}
+func (o *Options) client(ctx context.Context) (*bigtable.Client, error) {
+ return bigtable.NewClient(ctx, o.Project, o.Zone, o.Cluster, o.ClientOptions...)
+}
+
+func (o *Options) adminClient(ctx context.Context) (*bigtable.AdminClient, error) {
+ return bigtable.NewAdminClient(ctx, o.Project, o.Zone, o.Cluster, o.ClientOptions...)
+}
+
// btStorage is a storage.Storage implementation that uses Google Cloud BigTable
// as a backend.
type btStorage struct {
*Options
- ctx context.Context
+ // Context is the Context supplied to New. It is retained (rather than
+ // supplied on a per-call basis) because a special Storage Context devoid of
+ // gRPC metadata is needed for Storage calls.
+ context.Context
client *bigtable.Client
logTable *bigtable.Table
adminClient *bigtable.AdminClient
- table btTable
+ // raw is the underlying btTable instance to use for raw operations.
+ raw btTable
+ // maxRowSize is the maximum number of bytes that can be stored in a single
+ // BigTable row. This is a function of BigTable, and constant in production
+ // (bigTableRowMaxBytes), but variable here so that tests can control it.
+ maxRowSize int
}
// New instantiates a new Storage instance connected to a BigTable cluster.
//
// The returned Storage instance will close the Client when its Close() method
// is called.
-func New(ctx context.Context, o Options) storage.Storage {
+func New(ctx context.Context, o Options) (storage.Storage, error) {
+ client, err := o.client(ctx)
+ if err != nil {
+ return nil, err
+ }
+
+ admin, err := o.adminClient(ctx)
+ if err != nil {
+ return nil, err
+ }
+
+ return newBTStorage(ctx, o, client, admin), nil
+}
+
+func newBTStorage(ctx context.Context, o Options, client *bigtable.Client, adminClient *bigtable.AdminClient) *btStorage {
st := &btStorage{
Options: &o,
- ctx: ctx,
+ Context: ctx,
+
+ client: client,
+ logTable: client.Open(o.LogTable),
+ adminClient: adminClient,
+ maxRowSize: bigTableRowMaxBytes,
}
- st.table = &btTableProd{st}
+ st.raw = &btTableProd{st}
return st
}
@@ -100,72 +142,121 @@ func (s *btStorage) Close() {
}
func (s *btStorage) Config(cfg storage.Config) error {
- if err := s.table.setMaxLogAge(s.ctx, cfg.MaxLogAge); err != nil {
- log.WithError(err).Errorf(s.ctx, "Failed to set 'log' GC policy.")
+ if err := s.raw.setMaxLogAge(s, cfg.MaxLogAge); err != nil {
+ log.WithError(err).Errorf(s, "Failed to set 'log' GC policy.")
return err
}
log.Fields{
"maxLogAge": cfg.MaxLogAge,
- }.Infof(s.ctx, "Set maximum log age.")
+ }.Infof(s, "Set maximum log age.")
return nil
}
-func (s *btStorage) Put(r *storage.PutRequest) error {
- rk := newRowKey(string(r.Path), int64(r.Index))
- ctx := log.SetFields(s.ctx, log.Fields{
- "rowKey": rk,
- "path": r.Path,
- "index": r.Index,
- "size": len(r.Value),
- })
- log.Debugf(ctx, "Adding entry to BigTable.")
+func (s *btStorage) Put(r storage.PutRequest) error {
+ rw := rowWriter{
+ threshold: s.maxRowSize,
+ }
+
+ for len(r.Values) > 0 {
+ // Add the next entry to the writer.
+ if appended := rw.append(r.Values[0]); !appended {
+ // We have failed to append our maximum BigTable row size. Flush any
+ // currently-buffered row data and try again with an empty buffer.
+ count, err := rw.flush(s, s.raw, r.Index, r.Path)
+ if err != nil {
+ return err
+ }
+
+ if count == 0 {
+ // Nothing was buffered, but we still couldn't append an entry. The
+ // current entry is too large by itself, so we must fail.
+ return fmt.Errorf("single row entry exceeds maximum size (%d > %d)", len(r.Values[0]), bigTableRowMaxBytes)
+ }
+
+ r.Index += types.MessageIndex(count)
+ continue
+ }
+
+ // We successfully appended this entry, so advance.
+ r.Values = r.Values[1:]
+ }
- return s.table.putLogData(ctx, rk, r.Value)
+ // Flush any buffered rows.
+ if _, err := rw.flush(s, s.raw, r.Index, r.Path); err != nil {
+ return err
+ }
+ return nil
}
-func (s *btStorage) Get(r *storage.GetRequest, cb storage.GetCallback) error {
+func (s *btStorage) Get(r storage.GetRequest, cb storage.GetCallback) error {
startKey := newRowKey(string(r.Path), int64(r.Index))
- c := log.SetFields(s.ctx, log.Fields{
+ ctx := log.SetFields(s, log.Fields{
"path": r.Path,
"index": r.Index,
"startRowKey": startKey,
})
- err := s.table.getLogData(c, startKey, r.Limit, false, func(rk *rowKey, data []byte) error {
+ limit := r.Limit
+ err := s.raw.getLogData(ctx, startKey, r.Limit, false, func(rk *rowKey, data []byte) error {
// Does this key match our requested log stream? If not, we've moved past
// this stream's records and must stop iteration.
if !rk.sharesPathWith(startKey) {
return errStop
}
- // We have a row. Invoke our callback.
- if !cb(types.MessageIndex(rk.index), data) {
- return errStop
+ // We have a row. Split it into individual records.
+ records, err := recordio.Split(data)
+ if err != nil {
+ return storage.ErrBadData
+ }
+
+ // Issue our callback for each row. Since we index the row on the LAST entry
+ // in the row, count backwards to get the index of the first entry.
+ firstIndex := types.MessageIndex(rk.index - int64(len(records)) + 1)
+ if firstIndex < 0 {
+ return storage.ErrBadData
+ }
+ for i, row := range records {
+ index := firstIndex + types.MessageIndex(i)
+ if index < r.Index {
+ // An offset was specified, and this row is before it, so skip.
+ continue
+ }
+
+ if !cb(index, row) {
+ return errStop
+ }
+
+ // Artificially apply limit within our row records.
+ if limit > 0 {
+ limit--
+ if limit == 0 {
+ return errStop
+ }
+ }
}
return nil
})
- if err != nil {
- log.Fields{
- log.ErrorKey: err,
- "project": s.Project,
- "zone": s.Zone,
- "cluster": s.Cluster,
- "table": s.LogTable,
- }.Errorf(c, "Failed to retrieve row range.")
+
+ switch err {
+ case nil, errStop:
+ return nil
+
+ default:
+ log.WithError(err).Errorf(ctx, "Failed to retrieve row range.")
return err
}
- return nil
}
func (s *btStorage) Tail(p types.StreamPath) ([]byte, types.MessageIndex, error) {
- c := log.SetFields(s.ctx, log.Fields{
+ ctx := log.SetFields(s, log.Fields{
"path": p,
})
// Iterate through all log keys in the stream. Record the latest one.
rk := newRowKey(string(p), 0)
var latest *rowKey
- err := s.table.getLogData(c, rk, 0, true, func(rk *rowKey, data []byte) error {
+ err := s.raw.getLogData(ctx, rk, 0, true, func(rk *rowKey, data []byte) error {
latest = rk
return nil
})
@@ -176,7 +267,7 @@ func (s *btStorage) Tail(p types.StreamPath) ([]byte, types.MessageIndex, error)
"zone": s.Zone,
"cluster": s.Cluster,
"table": s.LogTable,
- }.Errorf(c, "Failed to scan for tail.")
+ }.Errorf(ctx, "Failed to scan for tail.")
}
if latest == nil {
@@ -186,52 +277,88 @@ func (s *btStorage) Tail(p types.StreamPath) ([]byte, types.MessageIndex, error)
// Fetch the latest row's data.
var d []byte
- err = s.table.getLogData(c, latest, 1, false, func(rk *rowKey, data []byte) error {
- d = data
+ err = s.raw.getLogData(ctx, latest, 1, false, func(rk *rowKey, data []byte) error {
+ records, err := recordio.Split(data)
+ if err != nil || len(records) == 0 {
+ return storage.ErrBadData
+ }
+ d = records[len(records)-1]
return errStop
})
- if err != nil {
+ if err != nil && err != errStop {
log.Fields{
log.ErrorKey: err,
"project": s.Project,
"zone": s.Zone,
"cluster": s.Cluster,
"table": s.LogTable,
- }.Errorf(c, "Failed to retrieve tail row.")
+ }.Errorf(ctx, "Failed to retrieve tail row.")
}
return d, types.MessageIndex(latest.index), nil
}
-func (s *btStorage) getClient() (*bigtable.Client, error) {
- if s.client == nil {
- var err error
- if s.client, err = bigtable.NewClient(s.ctx, s.Project, s.Zone, s.Cluster, s.ClientOptions...); err != nil {
- return nil, fmt.Errorf("failed to create client: %s", err)
- }
- }
- return s.client, nil
+// rowWriter facilitates writing several consecutive data values to a single
+// BigTable row.
+type rowWriter struct {
+ // buf is the current set of buffered data.
+ buf bytes.Buffer
+
+ // count is the number of rows in the writer.
+ count int
+
+ // threshold is the maximum number of bytes that we can write.
+ threshold int
}
-func (s *btStorage) getAdminClient() (*bigtable.AdminClient, error) {
- if s.adminClient == nil {
- var err error
- if s.adminClient, err = bigtable.NewAdminClient(s.ctx, s.Project, s.Zone, s.Cluster, s.ClientOptions...); err != nil {
- return nil, fmt.Errorf("failed to create client: %s", err)
+func (w *rowWriter) append(d []byte) (appended bool) {
+ origSize := w.buf.Len()
+ defer func() {
+ // Restore our previous buffer state if we are reporting the write as
+ // failed.
+ if !appended {
+ w.buf.Truncate(origSize)
}
+ }()
+
+ // Serialize the next entry as a recordio blob.
+ if _, err := recordio.WriteFrame(&w.buf, d); err != nil {
+ return
+ }
+
+ // If we have exceeded our threshold, report a failure.
+ appended = (w.buf.Len() <= w.threshold)
+ if appended {
+ w.count++
}
- return s.adminClient, nil
+ return
}
-// getLogTable returns a btTable instance. If one is not already configured, a
-// production instance will be generated and cached.
-func (s *btStorage) getLogTable() (*bigtable.Table, error) {
- if s.logTable == nil {
- client, err := s.getClient()
- if err != nil {
- return nil, err
- }
- s.logTable = client.Open(s.LogTable)
+func (w *rowWriter) flush(ctx context.Context, raw btTable, index types.MessageIndex, path types.StreamPath) (int, error) {
+ flushCount := w.count
+ if flushCount == 0 {
+ return 0, nil
}
- return s.logTable, nil
+
+ // Write the current set of buffered rows to the table. Index on the LAST
+ // row index.
+ lastIndex := int64(index) + int64(flushCount) - 1
+ rk := newRowKey(string(path), lastIndex)
+
+ log.Fields{
+ "rowKey": rk,
+ "path": path,
+ "index": index,
+ "lastIndex": lastIndex,
+ "count": w.count,
+ "size": w.buf.Len(),
+ }.Debugf(ctx, "Adding entries to BigTable.")
+ if err := raw.putLogData(ctx, rk, w.buf.Bytes()); err != nil {
+ return 0, err
+ }
+
+ // Reset our buffer state.
+ w.buf.Reset()
+ w.count = 0
+ return flushCount, nil
}
« no previous file with comments | « server/logdog/storage/bigtable/initialize.go ('k') | server/logdog/storage/bigtable/storage_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698