Chromium Code Reviews| Index: server/logdog/storage/bigtable/storage.go |
| diff --git a/server/logdog/storage/bigtable/storage.go b/server/logdog/storage/bigtable/storage.go |
| index 3b0c117babf7fcbf101f230d23e5d6fe6b761ef7..0f472b21a52f2760ddacb21f98bf0ae50c7f903b 100644 |
| --- a/server/logdog/storage/bigtable/storage.go |
| +++ b/server/logdog/storage/bigtable/storage.go |
| @@ -5,10 +5,13 @@ |
| package bigtable |
| import ( |
| + "bytes" |
| + "errors" |
| "fmt" |
| "github.com/luci/luci-go/common/logdog/types" |
| log "github.com/luci/luci-go/common/logging" |
| + "github.com/luci/luci-go/common/recordio" |
| "github.com/luci/luci-go/server/logdog/storage" |
| "golang.org/x/net/context" |
| "google.golang.org/cloud" |
| @@ -44,6 +47,10 @@ const ( |
| tailRowMaxSize = 1024 * 1024 * 16 |
| ) |
| +// errStop is an internal sentinel error used to indicate "stop iteration" |
| +// to btTable.getLogData iterator. It will |
| +var errStop = errors.New("stop") |
| + |
| // Options is a set of configuration options for BigTable storage. |
| type Options struct { |
| // Project is the name of the project to connect to. |
| @@ -60,28 +67,59 @@ type Options struct { |
| LogTable string |
| } |
| +func (o *Options) client(ctx context.Context) (*bigtable.Client, error) { |
| + return bigtable.NewClient(ctx, o.Project, o.Zone, o.Cluster, o.ClientOptions...) |
| +} |
| + |
| +func (o *Options) adminClient(ctx context.Context) (*bigtable.AdminClient, error) { |
| + return bigtable.NewAdminClient(ctx, o.Project, o.Zone, o.Cluster, o.ClientOptions...) |
| +} |
| + |
| // btStorage is a storage.Storage implementation that uses Google Cloud BigTable |
| // as a backend. |
| type btStorage struct { |
| *Options |
| - ctx context.Context |
| + // Context is the bound supplied with New. It is retained (rather than |
| + // supplied on a per-call basis) because a special Storage Context is needed |
| + // for Storage calls). See GAE code for more information. |
|
nodir
2016/03/30 18:29:42
"GAE code" is abstract. Provide a link or say some
dnj
2016/03/30 18:37:13
Done.
|
| + context.Context |
| client *bigtable.Client |
| logTable *bigtable.Table |
| adminClient *bigtable.AdminClient |
| - table btTable |
| + table btTable |
| + maxRowSize int |
|
nodir
2016/03/30 18:29:42
a comment would be nice. Initially I thought it is
dnj
2016/03/30 18:37:13
Done.
|
| } |
| // New instantiates a new Storage instance connected to a BigTable cluster. |
| // |
| // The returned Storage instance will close the Client when its Close() method |
| // is called. |
| -func New(ctx context.Context, o Options) storage.Storage { |
| +func New(ctx context.Context, o Options) (storage.Storage, error) { |
| + client, err := o.client(ctx) |
| + if err != nil { |
| + return nil, err |
| + } |
| + |
| + admin, err := o.adminClient(ctx) |
| + if err != nil { |
| + return nil, err |
| + } |
| + |
| + return newBTStorage(ctx, o, client, admin), nil |
| +} |
| + |
| +func newBTStorage(ctx context.Context, o Options, client *bigtable.Client, adminClient *bigtable.AdminClient) *btStorage { |
| st := &btStorage{ |
| Options: &o, |
| - ctx: ctx, |
| + Context: ctx, |
| + |
| + client: client, |
| + logTable: client.Open(o.LogTable), |
| + adminClient: adminClient, |
| + maxRowSize: bigTableRowMaxBytes, |
| } |
| st.table = &btTableProd{st} |
| return st |
| @@ -100,72 +138,121 @@ func (s *btStorage) Close() { |
| } |
| func (s *btStorage) Config(cfg storage.Config) error { |
| - if err := s.table.setMaxLogAge(s.ctx, cfg.MaxLogAge); err != nil { |
| - log.WithError(err).Errorf(s.ctx, "Failed to set 'log' GC policy.") |
| + if err := s.table.setMaxLogAge(s, cfg.MaxLogAge); err != nil { |
| + log.WithError(err).Errorf(s, "Failed to set 'log' GC policy.") |
| return err |
| } |
| log.Fields{ |
| "maxLogAge": cfg.MaxLogAge, |
| - }.Infof(s.ctx, "Set maximum log age.") |
| + }.Infof(s, "Set maximum log age.") |
| return nil |
| } |
| -func (s *btStorage) Put(r *storage.PutRequest) error { |
| - rk := newRowKey(string(r.Path), int64(r.Index)) |
| - ctx := log.SetFields(s.ctx, log.Fields{ |
| - "rowKey": rk, |
| - "path": r.Path, |
| - "index": r.Index, |
| - "size": len(r.Value), |
| - }) |
| - log.Debugf(ctx, "Adding entry to BigTable.") |
| +func (s *btStorage) Put(r storage.PutRequest) error { |
| + rw := rowWriter{ |
| + threshold: s.maxRowSize, |
| + } |
| + |
| + for len(r.Values) > 0 { |
| + // Add the next entry to the writer. |
| + if appended := rw.append(r.Values[0]); !appended { |
| + // We have failed to append our maximum BigTable row size. Flush any |
| + // currently-buffered row data and try again with an empty buffer. |
| + count, err := rw.flush(s, s.table, r.Index, r.Path) |
| + if err != nil { |
| + return err |
| + } |
| + |
| + if count == 0 { |
| + // Nothing was buffered, but we still couldn't append an entry. The |
| + // current entry is too large by itself, so we must fail. |
| + return fmt.Errorf("single row entry exceeds maximum size (%d > %d)", len(r.Values[0]), bigTableRowMaxBytes) |
| + } |
| + |
| + r.Index += types.MessageIndex(count) |
| + continue |
| + } |
| + |
| + // We successfully appended this entry, so advance. |
| + r.Values = r.Values[1:] |
| + } |
| - return s.table.putLogData(ctx, rk, r.Value) |
| + // Flush any buffered rows. |
| + if _, err := rw.flush(s, s.table, r.Index, r.Path); err != nil { |
| + return err |
| + } |
| + return nil |
| } |
| -func (s *btStorage) Get(r *storage.GetRequest, cb storage.GetCallback) error { |
| +func (s *btStorage) Get(r storage.GetRequest, cb storage.GetCallback) error { |
| startKey := newRowKey(string(r.Path), int64(r.Index)) |
| - c := log.SetFields(s.ctx, log.Fields{ |
| + ctx := log.SetFields(s, log.Fields{ |
| "path": r.Path, |
| "index": r.Index, |
| "startRowKey": startKey, |
| }) |
| - err := s.table.getLogData(c, startKey, r.Limit, false, func(rk *rowKey, data []byte) error { |
| + limit := r.Limit |
| + err := s.table.getLogData(ctx, startKey, r.Limit, false, func(rk *rowKey, data []byte) error { |
| // Does this key match our requested log stream? If not, we've moved past |
| // this stream's records and must stop iteration. |
| if !rk.sharesPathWith(startKey) { |
| return errStop |
| } |
| - // We have a row. Invoke our callback. |
| - if !cb(types.MessageIndex(rk.index), data) { |
| - return errStop |
| + // We have a row. Split it into individual records. |
| + records, err := recordio.Split(data) |
| + if err != nil { |
| + return storage.ErrBadData |
| + } |
| + |
| + // Issue our callback for each row. Since we index the row on the LAST entry |
| + // in the row, count backwards to get the index of the first entry. |
| + firstIndex := types.MessageIndex(rk.index - int64(len(records)) + 1) |
| + if firstIndex < 0 { |
| + return storage.ErrBadData |
| + } |
| + for i, row := range records { |
| + index := firstIndex + types.MessageIndex(i) |
| + if index < r.Index { |
| + // An offset was specified, and this row is before it, so skip. |
| + continue |
| + } |
| + |
| + if !cb(index, row) { |
| + return errStop |
| + } |
| + |
| + // Artificially apply limit within our row records. |
| + if limit > 0 { |
| + limit-- |
| + if limit == 0 { |
| + return errStop |
| + } |
| + } |
| } |
| return nil |
| }) |
| - if err != nil { |
| - log.Fields{ |
| - log.ErrorKey: err, |
| - "project": s.Project, |
| - "zone": s.Zone, |
| - "cluster": s.Cluster, |
| - "table": s.LogTable, |
| - }.Errorf(c, "Failed to retrieve row range.") |
| + |
| + switch err { |
| + case nil, errStop: |
| + return nil |
| + |
| + default: |
| + log.WithError(err).Errorf(ctx, "Failed to retrieve row range.") |
| return err |
| } |
| - return nil |
| } |
| func (s *btStorage) Tail(p types.StreamPath) ([]byte, types.MessageIndex, error) { |
| - c := log.SetFields(s.ctx, log.Fields{ |
| + ctx := log.SetFields(s, log.Fields{ |
| "path": p, |
| }) |
| // Iterate through all log keys in the stream. Record the latest one. |
| rk := newRowKey(string(p), 0) |
| var latest *rowKey |
| - err := s.table.getLogData(c, rk, 0, true, func(rk *rowKey, data []byte) error { |
| + err := s.table.getLogData(ctx, rk, 0, true, func(rk *rowKey, data []byte) error { |
| latest = rk |
| return nil |
| }) |
| @@ -176,7 +263,7 @@ func (s *btStorage) Tail(p types.StreamPath) ([]byte, types.MessageIndex, error) |
| "zone": s.Zone, |
| "cluster": s.Cluster, |
| "table": s.LogTable, |
| - }.Errorf(c, "Failed to scan for tail.") |
| + }.Errorf(ctx, "Failed to scan for tail.") |
| } |
| if latest == nil { |
| @@ -186,52 +273,88 @@ func (s *btStorage) Tail(p types.StreamPath) ([]byte, types.MessageIndex, error) |
| // Fetch the latest row's data. |
| var d []byte |
| - err = s.table.getLogData(c, latest, 1, false, func(rk *rowKey, data []byte) error { |
| - d = data |
| + err = s.table.getLogData(ctx, latest, 1, false, func(rk *rowKey, data []byte) error { |
| + records, err := recordio.Split(data) |
| + if err != nil || len(records) == 0 { |
| + return storage.ErrBadData |
| + } |
| + d = records[len(records)-1] |
| return errStop |
| }) |
| - if err != nil { |
| + if err != nil && err != errStop { |
| log.Fields{ |
| log.ErrorKey: err, |
| "project": s.Project, |
| "zone": s.Zone, |
| "cluster": s.Cluster, |
| "table": s.LogTable, |
| - }.Errorf(c, "Failed to retrieve tail row.") |
| + }.Errorf(ctx, "Failed to retrieve tail row.") |
| } |
| return d, types.MessageIndex(latest.index), nil |
| } |
| -func (s *btStorage) getClient() (*bigtable.Client, error) { |
| - if s.client == nil { |
| - var err error |
| - if s.client, err = bigtable.NewClient(s.ctx, s.Project, s.Zone, s.Cluster, s.ClientOptions...); err != nil { |
| - return nil, fmt.Errorf("failed to create client: %s", err) |
| - } |
| - } |
| - return s.client, nil |
| +// rowWriter facilitates writing several consecutive data values to a single |
| +// BigTable row. |
| +type rowWriter struct { |
| + // buf is the current set of buffered data. |
| + buf bytes.Buffer |
| + |
| + // count is the number of rows in the writer. |
| + count int |
| + |
| + // threshold is the maximum number of bytes that we can write. |
| + threshold int |
| } |
| -func (s *btStorage) getAdminClient() (*bigtable.AdminClient, error) { |
| - if s.adminClient == nil { |
| - var err error |
| - if s.adminClient, err = bigtable.NewAdminClient(s.ctx, s.Project, s.Zone, s.Cluster, s.ClientOptions...); err != nil { |
| - return nil, fmt.Errorf("failed to create client: %s", err) |
| +func (w *rowWriter) append(d []byte) (appended bool) { |
| + origSize := w.buf.Len() |
| + defer func() { |
| + // Restore our previous buffer state if we are reporting the write as |
| + // failed. |
| + if !appended { |
| + w.buf.Truncate(origSize) |
| } |
| + }() |
| + |
| + // Serialize the next entry as a recordio blob. |
| + if _, err := recordio.WriteFrame(&w.buf, d); err != nil { |
| + return |
| + } |
| + |
| + // If we have exceeded our threshold, report a failure. |
| + appended = (w.buf.Len() <= w.threshold) |
| + if appended { |
| + w.count++ |
| } |
| - return s.adminClient, nil |
| + return |
| } |
| -// getLogTable returns a btTable instance. If one is not already configured, a |
| -// production instance will be generated and cached. |
| -func (s *btStorage) getLogTable() (*bigtable.Table, error) { |
| - if s.logTable == nil { |
| - client, err := s.getClient() |
| - if err != nil { |
| - return nil, err |
| - } |
| - s.logTable = client.Open(s.LogTable) |
| +func (w *rowWriter) flush(ctx context.Context, table btTable, index types.MessageIndex, path types.StreamPath) (int, error) { |
| + flushCount := w.count |
| + if flushCount == 0 { |
| + return 0, nil |
| } |
| - return s.logTable, nil |
| + |
| + // Write the current set of buffered rows to the table. Index on the LAST |
| + // row index. |
| + lastIndex := int64(index) + int64(flushCount) - 1 |
| + rk := newRowKey(string(path), lastIndex) |
| + |
| + log.Fields{ |
| + "rowKey": rk, |
| + "path": path, |
| + "index": index, |
| + "lastIndex": lastIndex, |
| + "count": w.count, |
| + "size": w.buf.Len(), |
| + }.Debugf(ctx, "Adding entries to BigTable.") |
| + if err := table.putLogData(ctx, rk, w.buf.Bytes()); err != nil { |
| + return 0, err |
| + } |
| + |
| + // Reset our buffer state. |
| + w.buf.Reset() |
| + w.count = 0 |
| + return flushCount, nil |
| } |