| OLD | NEW |
| 1 // Copyright 2015 The LUCI Authors. All rights reserved. | 1 // Copyright 2015 The LUCI Authors. All rights reserved. |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
| 3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
| 4 | 4 |
| 5 package bigtable | 5 package bigtable |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "bytes" | 8 "bytes" |
| 9 "errors" | 9 "errors" |
| 10 "fmt" | 10 "fmt" |
| 11 | 11 |
| 12 "github.com/luci/luci-go/common/config" | 12 "github.com/luci/luci-go/common/config" |
| 13 "github.com/luci/luci-go/common/data/recordio" | 13 "github.com/luci/luci-go/common/data/recordio" |
| 14 log "github.com/luci/luci-go/common/logging" | 14 log "github.com/luci/luci-go/common/logging" |
| 15 "github.com/luci/luci-go/logdog/common/storage" | 15 "github.com/luci/luci-go/logdog/common/storage" |
| 16 "github.com/luci/luci-go/logdog/common/storage/caching" |
| 16 "github.com/luci/luci-go/logdog/common/types" | 17 "github.com/luci/luci-go/logdog/common/types" |
| 17 | 18 |
| 18 "cloud.google.com/go/bigtable" | 19 "cloud.google.com/go/bigtable" |
| 19 "golang.org/x/net/context" | 20 "golang.org/x/net/context" |
| 20 "google.golang.org/api/option" | 21 "google.golang.org/api/option" |
| 21 ) | 22 ) |
| 22 | 23 |
| 23 var ( | 24 var ( |
| 24 // StorageScopes is the set of OAuth scopes needed to use the storage | 25 // StorageScopes is the set of OAuth scopes needed to use the storage |
| 25 // functionality. | 26 // functionality. |
| (...skipping 34 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 60 // Project is the name of the project to connect to. | 61 // Project is the name of the project to connect to. |
| 61 Project string | 62 Project string |
| 62 // Instance is the name of the instance to connect to. | 63 // Instance is the name of the instance to connect to. |
| 63 Instance string | 64 Instance string |
| 64 // ClientOptions are additional client options to use when instantiating
the | 65 // ClientOptions are additional client options to use when instantiating
the |
| 65 // client instance. | 66 // client instance. |
| 66 ClientOptions []option.ClientOption | 67 ClientOptions []option.ClientOption |
| 67 | 68 |
| 68 // Table is the name of the BigTable table to use for logs. | 69 // Table is the name of the BigTable table to use for logs. |
| 69 LogTable string | 70 LogTable string |
| 71 |
| 72 // Cache, if not nil, will be used to cache data. |
| 73 Cache caching.Cache |
| 70 } | 74 } |
| 71 | 75 |
| 72 func (o *Options) client(ctx context.Context) (*bigtable.Client, error) { | 76 func (o *Options) client(ctx context.Context) (*bigtable.Client, error) { |
| 73 return bigtable.NewClient(ctx, o.Project, o.Instance, o.ClientOptions...
) | 77 return bigtable.NewClient(ctx, o.Project, o.Instance, o.ClientOptions...
) |
| 74 } | 78 } |
| 75 | 79 |
| 76 func (o *Options) adminClient(ctx context.Context) (*bigtable.AdminClient, error
) { | 80 func (o *Options) adminClient(ctx context.Context) (*bigtable.AdminClient, error
) { |
| 77 return bigtable.NewAdminClient(ctx, o.Project, o.Instance, o.ClientOptio
ns...) | 81 return bigtable.NewAdminClient(ctx, o.Project, o.Instance, o.ClientOptio
ns...) |
| 78 } | 82 } |
| 79 | 83 |
| 80 // btStorage is a storage.Storage implementation that uses Google Cloud BigTable | 84 // btStorage is a storage.Storage implementation that uses Google Cloud BigTable |
| 81 // as a backend. | 85 // as a backend. |
| 82 type btStorage struct { | 86 type btStorage struct { |
| 83 *Options | 87 *Options |
| 84 | 88 |
| 85 // Context is the bound supplied with New. It is retained (rather than | 89 // Context is the bound supplied with New. It is retained (rather than |
| 86 // supplied on a per-call basis) because a special Storage Context devoi
d of | 90 // supplied on a per-call basis) because a special Storage Context devoi
d of |
| 87 // gRPC metadata is needed for Storage calls. | 91 // gRPC metadata is needed for Storage calls. |
| 88 context.Context | 92 context.Context |
| 89 | 93 |
| 90 client *bigtable.Client | 94 client *bigtable.Client |
| 91 logTable *bigtable.Table | 95 logTable *bigtable.Table |
| 92 adminClient *bigtable.AdminClient | 96 adminClient *bigtable.AdminClient |
| 93 | 97 |
| 94 » // raw is the underlying btTable instance to use for raw operations. | 98 » // raw, if not nil, is the raw BigTable interface to use. This is useful
for |
 | 99 » // testing. If nil, this will default to the production instance. |
| 95 raw btTable | 100 raw btTable |
| 101 |
 | 96 // maxRowSize is the maximum number of bytes that can be stored in a sin
gle | 102 // maxRowSize is the maximum number of bytes that can be stored in a sin
gle |
| 97 // BigTable row. This is a function of BigTable, and constant in product
ion | 103 // BigTable row. This is a function of BigTable, and constant in product
ion |
| 98 // (bigTableRowMaxBytes), but variable here to allow for testing to cont
rol. | 104 // (bigTableRowMaxBytes), but variable here to allow for testing to cont
rol. |
| 99 maxRowSize int | 105 maxRowSize int |
| 100 } | 106 } |
| 101 | 107 |
| 102 // New instantiates a new Storage instance connected to a BigTable instance. | 108 // New instantiates a new Storage instance connected to a BigTable instance. |
| 103 // | 109 // |
| 104 // The returned Storage instance will close the Client when its Close() method | 110 // The returned Storage instance will close the Client when its Close() method |
| 105 // is called. | 111 // is called. |
| 106 func New(ctx context.Context, o Options) (storage.Storage, error) { | 112 func New(ctx context.Context, o Options) (storage.Storage, error) { |
| 107 client, err := o.client(ctx) | 113 client, err := o.client(ctx) |
| 108 if err != nil { | 114 if err != nil { |
| 109 return nil, err | 115 return nil, err |
| 110 } | 116 } |
| 111 | 117 |
| 112 admin, err := o.adminClient(ctx) | 118 admin, err := o.adminClient(ctx) |
| 113 if err != nil { | 119 if err != nil { |
| 114 return nil, err | 120 return nil, err |
| 115 } | 121 } |
| 116 | 122 |
| 117 » return newBTStorage(ctx, o, client, admin), nil | 123 » return newBTStorage(ctx, o, client, admin, nil), nil |
| 118 } | 124 } |
| 119 | 125 |
| 120 func newBTStorage(ctx context.Context, o Options, client *bigtable.Client, admin
Client *bigtable.AdminClient) *btStorage { | 126 func newBTStorage(ctx context.Context, o Options, client *bigtable.Client, admin
Client *bigtable.AdminClient, raw btTable) *btStorage { |
| 121 s := &btStorage{ | 127 s := &btStorage{ |
| 122 Options: &o, | 128 Options: &o, |
| 123 Context: ctx, | 129 Context: ctx, |
| 124 | 130 |
| 125 client: client, | 131 client: client, |
| 126 adminClient: adminClient, | 132 adminClient: adminClient, |
| 133 raw: raw, |
| 127 maxRowSize: bigTableRowMaxBytes, | 134 maxRowSize: bigTableRowMaxBytes, |
| 128 } | 135 } |
| 129 if s.client != nil { | 136 if s.client != nil { |
| 130 s.logTable = s.client.Open(o.LogTable) | 137 s.logTable = s.client.Open(o.LogTable) |
| 131 } | 138 } |
| 132 » s.raw = &btTableProd{s} | 139 » if s.raw == nil { |
| 140 » » s.raw = &btTableProd{s} |
| 141 » } |
| 133 return s | 142 return s |
| 134 } | 143 } |
| 135 | 144 |
| 136 func (s *btStorage) Close() { | 145 func (s *btStorage) Close() { |
| 137 if s.client != nil { | 146 if s.client != nil { |
| 138 s.client.Close() | 147 s.client.Close() |
| 139 s.client = nil | 148 s.client = nil |
| 140 } | 149 } |
| 141 | 150 |
| 142 if s.adminClient != nil { | 151 if s.adminClient != nil { |
| (...skipping 149 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 292 return err | 301 return err |
| 293 } | 302 } |
| 294 } | 303 } |
| 295 | 304 |
| 296 func (s *btStorage) Tail(project config.ProjectName, path types.StreamPath) (*st
orage.Entry, error) { | 305 func (s *btStorage) Tail(project config.ProjectName, path types.StreamPath) (*st
orage.Entry, error) { |
| 297 ctx := log.SetFields(s, log.Fields{ | 306 ctx := log.SetFields(s, log.Fields{ |
| 298 "project": project, | 307 "project": project, |
| 299 "path": path, | 308 "path": path, |
| 300 }) | 309 }) |
| 301 | 310 |
| 311 // Load the "last tail index" from cache. If we have no cache, start at
0. |
| 312 var startIdx int64 |
| 313 if s.Cache != nil { |
| 314 startIdx = getLastTailIndex(s, s.Cache, project, path) |
| 315 } |
| 316 |
| 302 // Iterate through all log keys in the stream. Record the latest one. | 317 // Iterate through all log keys in the stream. Record the latest one. |
| 303 » rk := newRowKey(string(project), string(path), 0, 0) | 318 » rk := newRowKey(string(project), string(path), startIdx, 0) |
| 304 var latest *rowKey | 319 var latest *rowKey |
| 305 err := s.raw.getLogData(ctx, rk, 0, true, func(rk *rowKey, data []byte)
error { | 320 err := s.raw.getLogData(ctx, rk, 0, true, func(rk *rowKey, data []byte)
error { |
| 306 latest = rk | 321 latest = rk |
| 307 return nil | 322 return nil |
| 308 }) | 323 }) |
| 309 if err != nil { | 324 if err != nil { |
| 310 log.Fields{ | 325 log.Fields{ |
| 311 log.ErrorKey: err, | 326 log.ErrorKey: err, |
| 312 "project": s.Project, | 327 "project": s.Project, |
| 313 "instance": s.Instance, | 328 "instance": s.Instance, |
| 314 "table": s.LogTable, | 329 "table": s.LogTable, |
| 315 }.Errorf(ctx, "Failed to scan for tail.") | 330 }.Errorf(ctx, "Failed to scan for tail.") |
| 316 } | 331 } |
| 317 | 332 |
| 318 if latest == nil { | 333 if latest == nil { |
| 319 // No rows for the specified stream. | 334 // No rows for the specified stream. |
| 320 return nil, storage.ErrDoesNotExist | 335 return nil, storage.ErrDoesNotExist |
| 321 } | 336 } |
| 322 | 337 |
| 338 // Update our cache if the tail index has changed. |
| 339 if s.Cache != nil && startIdx != latest.index { |
| 340 putLastTailIndex(s, s.Cache, project, path, latest.index) |
| 341 } |
| 342 |
| 323 // Fetch the latest row's data. | 343 // Fetch the latest row's data. |
| 324 var d []byte | 344 var d []byte |
| 325 err = s.raw.getLogData(ctx, latest, 1, false, func(rk *rowKey, data []by
te) error { | 345 err = s.raw.getLogData(ctx, latest, 1, false, func(rk *rowKey, data []by
te) error { |
| 326 records, err := recordio.Split(data) | 346 records, err := recordio.Split(data) |
| 327 if err != nil || len(records) == 0 { | 347 if err != nil || len(records) == 0 { |
| 328 return storage.ErrBadData | 348 return storage.ErrBadData |
| 329 } | 349 } |
| 330 d = records[len(records)-1] | 350 d = records[len(records)-1] |
| 331 return errStop | 351 return errStop |
| 332 }) | 352 }) |
| (...skipping 68 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 401 }.Debugf(ctx, "Adding entries to BigTable.") | 421 }.Debugf(ctx, "Adding entries to BigTable.") |
| 402 if err := raw.putLogData(ctx, rk, w.buf.Bytes()); err != nil { | 422 if err := raw.putLogData(ctx, rk, w.buf.Bytes()); err != nil { |
| 403 return 0, err | 423 return 0, err |
| 404 } | 424 } |
| 405 | 425 |
| 406 // Reset our buffer state. | 426 // Reset our buffer state. |
| 407 w.buf.Reset() | 427 w.buf.Reset() |
| 408 w.count = 0 | 428 w.count = 0 |
| 409 return flushCount, nil | 429 return flushCount, nil |
| 410 } | 430 } |
| OLD | NEW |