| OLD | NEW |
| 1 // Copyright 2015 The LUCI Authors. All rights reserved. | 1 // Copyright 2015 The LUCI Authors. All rights reserved. |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
| 3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
| 4 | 4 |
| 5 package bigtable | 5 package bigtable |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "bytes" | 8 "bytes" |
| 9 "errors" | 9 "errors" |
| 10 "fmt" | 10 "fmt" |
| 11 | 11 |
| 12 "github.com/luci/luci-go/common/config" | |
| 13 "github.com/luci/luci-go/common/data/recordio" | 12 "github.com/luci/luci-go/common/data/recordio" |
| 14 log "github.com/luci/luci-go/common/logging" | 13 log "github.com/luci/luci-go/common/logging" |
| 15 "github.com/luci/luci-go/logdog/common/storage" | 14 "github.com/luci/luci-go/logdog/common/storage" |
| 16 "github.com/luci/luci-go/logdog/common/storage/caching" | 15 "github.com/luci/luci-go/logdog/common/storage/caching" |
| 17 "github.com/luci/luci-go/logdog/common/types" | 16 "github.com/luci/luci-go/logdog/common/types" |
| 17 "github.com/luci/luci-go/luci_config/common/cfgtypes" |
| 18 | 18 |
| 19 "cloud.google.com/go/bigtable" | 19 "cloud.google.com/go/bigtable" |
| 20 "golang.org/x/net/context" | 20 "golang.org/x/net/context" |
| 21 "google.golang.org/api/option" | 21 "google.golang.org/api/option" |
| 22 ) | 22 ) |
| 23 | 23 |
| 24 var ( | 24 var ( |
| 25 // StorageScopes is the set of OAuth scopes needed to use the storage | 25 // StorageScopes is the set of OAuth scopes needed to use the storage |
| 26 // functionality. | 26 // functionality. |
| 27 StorageScopes = []string{ | 27 StorageScopes = []string{ |
| (...skipping 267 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 295 switch err { | 295 switch err { |
| 296 case nil, errStop: | 296 case nil, errStop: |
| 297 return nil | 297 return nil |
| 298 | 298 |
| 299 default: | 299 default: |
| 300 log.WithError(err).Errorf(ctx, "Failed to retrieve row range.") | 300 log.WithError(err).Errorf(ctx, "Failed to retrieve row range.") |
| 301 return err | 301 return err |
| 302 } | 302 } |
| 303 } | 303 } |
| 304 | 304 |
| 305 func (s *btStorage) Tail(project config.ProjectName, path types.StreamPath) (*st
orage.Entry, error) { | 305 func (s *btStorage) Tail(project cfgtypes.ProjectName, path types.StreamPath) (*
storage.Entry, error) { |
| 306 ctx := log.SetFields(s, log.Fields{ | 306 ctx := log.SetFields(s, log.Fields{ |
| 307 "project": project, | 307 "project": project, |
| 308 "path": path, | 308 "path": path, |
| 309 }) | 309 }) |
| 310 | 310 |
| 311 // Load the "last tail index" from cache. If we have no cache, start at
0. | 311 // Load the "last tail index" from cache. If we have no cache, start at
0. |
| 312 var startIdx int64 | 312 var startIdx int64 |
| 313 if s.Cache != nil { | 313 if s.Cache != nil { |
| 314 startIdx = getLastTailIndex(s, s.Cache, project, path) | 314 startIdx = getLastTailIndex(s, s.Cache, project, path) |
| 315 } | 315 } |
| (...skipping 87 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 403 | 403 |
| 404 // If we have exceeded our threshold, report a failure. | 404 // If we have exceeded our threshold, report a failure. |
| 405 appended = (w.buf.Len() <= w.threshold) | 405 appended = (w.buf.Len() <= w.threshold) |
| 406 if appended { | 406 if appended { |
| 407 w.count++ | 407 w.count++ |
| 408 } | 408 } |
| 409 return | 409 return |
| 410 } | 410 } |
| 411 | 411 |
| 412 func (w *rowWriter) flush(ctx context.Context, raw btTable, index types.MessageI
ndex, | 412 func (w *rowWriter) flush(ctx context.Context, raw btTable, index types.MessageI
ndex, |
| 413 » project config.ProjectName, path types.StreamPath) (int, error) { | 413 » project cfgtypes.ProjectName, path types.StreamPath) (int, error) { |
| 414 flushCount := w.count | 414 flushCount := w.count |
| 415 if flushCount == 0 { | 415 if flushCount == 0 { |
| 416 return 0, nil | 416 return 0, nil |
| 417 } | 417 } |
| 418 | 418 |
| 419 // Write the current set of buffered rows to the table. Index on the LAS
T | 419 // Write the current set of buffered rows to the table. Index on the LAS
T |
| 420 // row index. | 420 // row index. |
| 421 lastIndex := int64(index) + int64(flushCount) - 1 | 421 lastIndex := int64(index) + int64(flushCount) - 1 |
| 422 rk := newRowKey(string(project), string(path), lastIndex, int64(w.count)
) | 422 rk := newRowKey(string(project), string(path), lastIndex, int64(w.count)
) |
| 423 | 423 |
| 424 log.Fields{ | 424 log.Fields{ |
| 425 "rowKey": rk, | 425 "rowKey": rk, |
| 426 "project": project, | 426 "project": project, |
| 427 "path": path, | 427 "path": path, |
| 428 "index": index, | 428 "index": index, |
| 429 "lastIndex": lastIndex, | 429 "lastIndex": lastIndex, |
| 430 "count": w.count, | 430 "count": w.count, |
| 431 "size": w.buf.Len(), | 431 "size": w.buf.Len(), |
| 432 }.Debugf(ctx, "Adding entries to BigTable.") | 432 }.Debugf(ctx, "Adding entries to BigTable.") |
| 433 if err := raw.putLogData(ctx, rk, w.buf.Bytes()); err != nil { | 433 if err := raw.putLogData(ctx, rk, w.buf.Bytes()); err != nil { |
| 434 return 0, err | 434 return 0, err |
| 435 } | 435 } |
| 436 | 436 |
| 437 // Reset our buffer state. | 437 // Reset our buffer state. |
| 438 w.buf.Reset() | 438 w.buf.Reset() |
| 439 w.count = 0 | 439 w.count = 0 |
| 440 return flushCount, nil | 440 return flushCount, nil |
| 441 } | 441 } |
| OLD | NEW |