| OLD | NEW |
| 1 // Copyright 2015 The LUCI Authors. All rights reserved. | 1 // Copyright 2015 The LUCI Authors. All rights reserved. |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
| 3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
| 4 | 4 |
| 5 package bigtable | 5 package bigtable |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "bytes" | 8 "bytes" |
| 9 "errors" | 9 "errors" |
| 10 "fmt" | 10 "fmt" |
| (...skipping 393 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 404 // If we have exceeded our threshold, report a failure. | 404 // If we have exceeded our threshold, report a failure. |
| 405 appended = (w.buf.Len() <= w.threshold) | 405 appended = (w.buf.Len() <= w.threshold) |
| 406 if appended { | 406 if appended { |
| 407 w.count++ | 407 w.count++ |
| 408 } | 408 } |
| 409 return | 409 return |
| 410 } | 410 } |
| 411 | 411 |
| 412 func (w *rowWriter) flush(ctx context.Context, raw btTable, index types.MessageI
ndex, | 412 func (w *rowWriter) flush(ctx context.Context, raw btTable, index types.MessageI
ndex, |
| 413 project cfgtypes.ProjectName, path types.StreamPath) (int, error) { | 413 project cfgtypes.ProjectName, path types.StreamPath) (int, error) { |
| 414 |
| 414 flushCount := w.count | 415 flushCount := w.count |
| 415 if flushCount == 0 { | 416 if flushCount == 0 { |
| 416 return 0, nil | 417 return 0, nil |
| 417 } | 418 } |
| 418 | 419 |
| 419 // Write the current set of buffered rows to the table. Index on the LAS
T | 420 // Write the current set of buffered rows to the table. Index on the LAS
T |
| 420 // row index. | 421 // row index. |
| 421 lastIndex := int64(index) + int64(flushCount) - 1 | 422 lastIndex := int64(index) + int64(flushCount) - 1 |
| 422 rk := newRowKey(string(project), string(path), lastIndex, int64(w.count)
) | 423 rk := newRowKey(string(project), string(path), lastIndex, int64(w.count)
) |
| 423 | 424 |
| 424 log.Fields{ | 425 log.Fields{ |
| 425 "rowKey": rk, | 426 "rowKey": rk, |
| 426 "project": project, | 427 "project": project, |
| 427 "path": path, | 428 "path": path, |
| 428 "index": index, | 429 "index": index, |
| 429 "lastIndex": lastIndex, | 430 "lastIndex": lastIndex, |
| 430 "count": w.count, | 431 "count": w.count, |
| 431 "size": w.buf.Len(), | 432 "size": w.buf.Len(), |
| 432 }.Debugf(ctx, "Adding entries to BigTable.") | 433 }.Debugf(ctx, "Adding entries to BigTable.") |
| 433 if err := raw.putLogData(ctx, rk, w.buf.Bytes()); err != nil { | 434 if err := raw.putLogData(ctx, rk, w.buf.Bytes()); err != nil { |
| 434 return 0, err | 435 return 0, err |
| 435 } | 436 } |
| 436 | 437 |
| 437 // Reset our buffer state. | 438 // Reset our buffer state. |
| 438 w.buf.Reset() | 439 w.buf.Reset() |
| 439 w.count = 0 | 440 w.count = 0 |
| 440 return flushCount, nil | 441 return flushCount, nil |
| 441 } | 442 } |
| OLD | NEW |