| OLD | NEW |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 package bigtable | 5 package bigtable |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "bytes" | 8 "bytes" |
| 9 "errors" | 9 "errors" |
| 10 "fmt" | 10 "fmt" |
| 11 | 11 |
| 12 "github.com/luci/luci-go/common/config" |
| 12 "github.com/luci/luci-go/common/logdog/types" | 13 "github.com/luci/luci-go/common/logdog/types" |
| 13 log "github.com/luci/luci-go/common/logging" | 14 log "github.com/luci/luci-go/common/logging" |
| 14 "github.com/luci/luci-go/common/recordio" | 15 "github.com/luci/luci-go/common/recordio" |
| 15 "github.com/luci/luci-go/server/logdog/storage" | 16 "github.com/luci/luci-go/server/logdog/storage" |
| 16 "golang.org/x/net/context" | 17 "golang.org/x/net/context" |
| 17 "google.golang.org/cloud" | 18 "google.golang.org/cloud" |
| 18 "google.golang.org/cloud/bigtable" | 19 "google.golang.org/cloud/bigtable" |
| 19 ) | 20 ) |
| 20 | 21 |
| 21 var ( | 22 var ( |
| (...skipping 26 matching lines...) |
| 48 ) | 49 ) |
| 49 | 50 |
| 50 var ( | 51 var ( |
| 51 // errStop is an internal sentinel error used to indicate "stop iteration" | 52 // errStop is an internal sentinel error used to indicate "stop iteration" |
| 52 // to btTable.getLogData iterator. | 53 // to btTable.getLogData iterator. |
| 53 errStop = errors.New("bigtable: stop iteration") | 54 errStop = errors.New("bigtable: stop iteration") |
| 54 | 55 |
| 55 // errRepeatRequest is a transition sentinel that instructs us to repeat | 56 // errRepeatRequest is a transition sentinel that instructs us to repeat |
| 56 // our query with KeysOnly set to true. | 57 // our query with KeysOnly set to true. |
| 57 // | 58 // |
| 58 » // This can be deleted once BigTable rows without a count have been aged | 59 » // TODO(dnj): This can be deleted once BigTable rows without a count have been |
| 59 » // off. | 60 » // aged off. |
| 60 errRepeatRequest = errors.New("bigtable: repeat request") | 61 errRepeatRequest = errors.New("bigtable: repeat request") |
| 61 ) | 62 ) |
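
The two sentinels above drive control flow through the `getLogData` callback: `errStop` ends iteration cleanly, while `errRepeatRequest` asks the caller to re-issue the query. Below is a minimal, self-contained sketch of the stop-iteration half of that pattern; `forEach` and its string items are illustrative stand-ins, not part of this CL.

```go
package main

import (
	"errors"
	"fmt"
)

// errStop mirrors the CL's sentinel: callbacks return it to halt iteration
// without signaling a real failure.
var errStop = errors.New("stop iteration")

// forEach is a hypothetical iterator. It invokes cb for each item and maps
// the errStop sentinel back to a nil (successful) return.
func forEach(items []string, cb func(string) error) error {
	for _, it := range items {
		if err := cb(it); err != nil {
			if err == errStop {
				return nil // sentinel: clean early exit
			}
			return err // genuine error: propagate
		}
	}
	return nil
}

func main() {
	// Stop as soon as we see "b".
	err := forEach([]string{"a", "b", "c"}, func(s string) error {
		fmt.Println("visit:", s)
		if s == "b" {
			return errStop
		}
		return nil
	})
	fmt.Println("err:", err) // err: <nil>
}
```
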
| 62 | 63 |
| 63 // Options is a set of configuration options for BigTable storage. | 64 // Options is a set of configuration options for BigTable storage. |
| 64 type Options struct { | 65 type Options struct { |
| 65 // Project is the name of the project to connect to. | 66 // Project is the name of the project to connect to. |
| 66 Project string | 67 Project string |
| 67 // Zone is the name of the zone to connect to. | 68 // Zone is the name of the zone to connect to. |
| 68 Zone string | 69 Zone string |
| 69 // Cluster is the name of the cluster to connect to. | 70 // Cluster is the name of the cluster to connect to. |
| (...skipping 94 matching lines...) |
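
For orientation, here is a hedged sketch of filling in the three `Options` fields visible above. The values are placeholders, and the real struct's remaining fields (elided by the diff) are omitted.

```go
package main

import "fmt"

// Options mirrors only the three fields visible in this hunk; the real
// struct has more fields that the diff skips over.
type Options struct {
	Project string // name of the project to connect to
	Zone    string // name of the zone to connect to
	Cluster string // name of the cluster to connect to
}

func main() {
	// Illustrative values only.
	opts := Options{
		Project: "example-cloud-project",
		Zone:    "us-central1-b",
		Cluster: "logdog-storage",
	}
	fmt.Printf("%+v\n", opts)
}
```
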
| 164 func (s *btStorage) Put(r storage.PutRequest) error { | 165 func (s *btStorage) Put(r storage.PutRequest) error { |
| 165 rw := rowWriter{ | 166 rw := rowWriter{ |
| 166 threshold: s.maxRowSize, | 167 threshold: s.maxRowSize, |
| 167 } | 168 } |
| 168 | 169 |
| 169 for len(r.Values) > 0 { | 170 for len(r.Values) > 0 { |
| 170 // Add the next entry to the writer. | 171 // Add the next entry to the writer. |
| 171 if appended := rw.append(r.Values[0]); !appended { | 172 if appended := rw.append(r.Values[0]); !appended { |
| 172 // We have failed to append our maximum BigTable row size. Flush any | 173 // We have failed to append our maximum BigTable row size. Flush any |
| 173 // currently-buffered row data and try again with an empty buffer. | 174 // currently-buffered row data and try again with an empty buffer. |
| 174 » » » count, err := rw.flush(s, s.raw, r.Index, r.Path) | 175 » » » count, err := rw.flush(s, s.raw, r.Index, r.Project, r.Path) |
| 175 if err != nil { | 176 if err != nil { |
| 176 return err | 177 return err |
| 177 } | 178 } |
| 178 | 179 |
| 179 if count == 0 { | 180 if count == 0 { |
| 180 // Nothing was buffered, but we still couldn't append an entry. The | 181 // Nothing was buffered, but we still couldn't append an entry. The |
| 181 // current entry is too large by itself, so we must fail. | 182 // current entry is too large by itself, so we must fail. |
| 182 return fmt.Errorf("single row entry exceeds maximum size (%d > %d)", len(r.Values[0]), bigTableRowMaxBytes) | 183 return fmt.Errorf("single row entry exceeds maximum size (%d > %d)", len(r.Values[0]), bigTableRowMaxBytes) |
| 183 } | 184 } |
| 184 | 185 |
| 185 r.Index += types.MessageIndex(count) | 186 r.Index += types.MessageIndex(count) |
| 186 continue | 187 continue |
| 187 } | 188 } |
| 188 | 189 |
| 189 // We successfully appended this entry, so advance. | 190 // We successfully appended this entry, so advance. |
| 190 r.Values = r.Values[1:] | 191 r.Values = r.Values[1:] |
| 191 } | 192 } |
| 192 | 193 |
| 193 // Flush any buffered rows. | 194 // Flush any buffered rows. |
| 194 » if _, err := rw.flush(s, s.raw, r.Index, r.Path); err != nil { | 195 » if _, err := rw.flush(s, s.raw, r.Index, r.Project, r.Path); err != nil { |
| 195 return err | 196 return err |
| 196 } | 197 } |
| 197 return nil | 198 return nil |
| 198 } | 199 } |
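
`Put` is a classic pack-and-flush loop: append values to a buffered row until one no longer fits, flush, then retry the same value against an empty buffer; only a value that cannot fit in an empty row is an error. The sketch below reproduces that control flow under simplified assumptions (no recordio framing, no stream indices); `packer`, `pack`, and the flush callback are illustrative names.

```go
package main

import (
	"bytes"
	"fmt"
)

// packer plays rowWriter's role under simplified assumptions: it
// accumulates raw bytes up to a size threshold.
type packer struct {
	buf       bytes.Buffer
	count     int
	threshold int
}

// append adds v if the buffer stays within threshold, rolling the write
// back otherwise so the caller can flush and retry.
func (p *packer) append(v []byte) bool {
	orig := p.buf.Len()
	p.buf.Write(v)
	if p.buf.Len() > p.threshold {
		p.buf.Truncate(orig)
		return false
	}
	p.count++
	return true
}

// pack mirrors Put's control flow: flush when an append fails, and error
// out only when a lone entry can never fit in an empty buffer.
func pack(p *packer, values [][]byte, flush func([]byte) error) error {
	for len(values) > 0 {
		if !p.append(values[0]) {
			if p.count == 0 {
				return fmt.Errorf("single entry exceeds maximum size (%d > %d)",
					len(values[0]), p.threshold)
			}
			if err := flush(p.buf.Bytes()); err != nil {
				return err
			}
			p.buf.Reset()
			p.count = 0
			continue // retry the same entry against an empty buffer
		}
		values = values[1:]
	}
	if p.count > 0 {
		return flush(p.buf.Bytes()) // flush any buffered remainder
	}
	return nil
}

func main() {
	p := &packer{threshold: 8}
	row := 0
	err := pack(p, [][]byte{[]byte("hello"), []byte("big"), []byte("table")},
		func(b []byte) error {
			row++
			fmt.Printf("row %d: %q\n", row, b)
			return nil
		})
	fmt.Println("err:", err)
}
```
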
| 199 | 200 |
| 200 func (s *btStorage) Get(r storage.GetRequest, cb storage.GetCallback) error { | 201 func (s *btStorage) Get(r storage.GetRequest, cb storage.GetCallback) error { |
| 201 » startKey := newRowKey(string(r.Path), int64(r.Index), 0) | 202 » startKey := newRowKey(string(r.Project), string(r.Path), int64(r.Index), 0) |
| 202 ctx := log.SetFields(s, log.Fields{ | 203 ctx := log.SetFields(s, log.Fields{ |
| 204 "project": r.Project, |
| 203 "path": r.Path, | 205 "path": r.Path, |
| 204 "index": r.Index, | 206 "index": r.Index, |
| 205 "limit": r.Limit, | 207 "limit": r.Limit, |
| 206 "startRowKey": startKey, | 208 "startRowKey": startKey, |
| 207 "keysOnly": r.KeysOnly, | 209 "keysOnly": r.KeysOnly, |
| 208 }) | 210 }) |
| 209 | 211 |
| 210 // If we issue a query and get back a legacy row, it will have no count | 212 // If we issue a query and get back a legacy row, it will have no count |
| 211 // associated with it. We will fast-exit | 213 // associated with it. We will fast-exit |
| 212 | 214 |
| (...skipping 17 matching lines...) |
| 230 | 232 |
| 231 switch { | 233 switch { |
| 232 case r.KeysOnly: | 234 case r.KeysOnly: |
| 233 // Keys only query, so count is the authority. | 235 // Keys only query, so count is the authority. |
| 234 if rk.count == 0 { | 236 if rk.count == 0 { |
| 235 // If it's zero, we are dealing with a legacy row, before we started | 237 // If it's zero, we are dealing with a legacy row, before we started |
| 236 // keeping count. A keys-only query is insufficient to get the | 238 // keeping count. A keys-only query is insufficient to get the |
| 237 // information that we need, so repeat this Get request with KeysOnly | 239 // information that we need, so repeat this Get request with KeysOnly |
| 238 // set to false. | 240 // set to false. |
| 239 // | 241 // |
| 240 » » » » // NOTE: This logic can be removed once all uncounted rows are aged off. | 242 » » » » // TODO(dnj): This logic can be removed once all uncounted rows are aged |
| 243 » » » » // off. |
| 241 r.KeysOnly = false | 244 r.KeysOnly = false |
| 242 return errRepeatRequest | 245 return errRepeatRequest |
| 243 } | 246 } |
| 244 break | 247 break |
| 245 | 248 |
| 246 case rk.count == 0: | 249 case rk.count == 0: |
| 247 // This is a non-keys-only query, but we are dealing with a legacy row. | 250 // This is a non-keys-only query, but we are dealing with a legacy row. |
| 248 // Use the record count as the authority. | 251 // Use the record count as the authority. |
| 249 // | 252 // |
| 250 » » » // NOTE: This logic can be removed once all uncounted rows are aged off. | 253 » » » // TODO(dnj): This logic can be removed once all uncounted rows are aged |
| 254 » » » // off. |
| 251 rk.count = int64(len(records)) | 255 rk.count = int64(len(records)) |
| 252 | 256 |
| 253 case rk.count == int64(len(records)): | 257 case rk.count == int64(len(records)): |
| 254 // This is the expected case, so we're set. | 258 // This is the expected case, so we're set. |
| 255 // | 259 // |
| 256 » » » // NOTE: Once uncounted rows are aged off, this is the only logic that | 260 » » » // TODO(dnj): Once uncounted rows are aged off, this is the only logic |
| 257 » » » // we need, in the form of a sanity assertion. | 261 » » » // that we need, in the form of a sanity assertion. |
| 258 break | 262 break |
| 259 | 263 |
| 260 default: | 264 default: |
| 261 log.Fields{ | 265 log.Fields{ |
| 262 "count": rk.count, | 266 "count": rk.count, |
| 263 "recordCount": len(records), | 267 "recordCount": len(records), |
| 264 }.Errorf(ctx, "Record count doesn't match declared count.") | 268 }.Errorf(ctx, "Record count doesn't match declared count.") |
| 265 return storage.ErrBadData | 269 return storage.ErrBadData |
| 266 } | 270 } |
| 267 | 271 |
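
The switch above reconciles a row key's declared entry count against the records actually decoded from the row. Distilled to its two non-keys-only branches, the logic looks like the sketch below; `reconcile` and `errBadData` are illustrative stand-ins for the in-line switch and `storage.ErrBadData`.

```go
package main

import (
	"errors"
	"fmt"
)

// errBadData stands in for storage.ErrBadData.
var errBadData = errors.New("bad data")

// reconcile distills the switch: a declared count of zero marks a legacy
// row, so the decoded record count becomes authoritative; otherwise the
// two counts must agree exactly.
func reconcile(declared int64, records int) (int64, error) {
	switch {
	case declared == 0:
		return int64(records), nil // legacy row: trust the decoded records
	case declared == int64(records):
		return declared, nil // modern row: counts agree
	default:
		return 0, errBadData // corrupt or truncated row data
	}
}

func main() {
	fmt.Println(reconcile(0, 5)) // 5 <nil>  (legacy row)
	fmt.Println(reconcile(5, 5)) // 5 <nil>
	fmt.Println(reconcile(5, 3)) // 0 bad data
}
```
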
| (...skipping 51 matching lines...) |
| 319 | 323 |
| 320 case errRepeatRequest: | 324 case errRepeatRequest: |
| 321 return s.Get(r, cb) | 325 return s.Get(r, cb) |
| 322 | 326 |
| 323 default: | 327 default: |
| 324 log.WithError(err).Errorf(ctx, "Failed to retrieve row range.") | 328 log.WithError(err).Errorf(ctx, "Failed to retrieve row range.") |
| 325 return err | 329 return err |
| 326 } | 330 } |
| 327 } | 331 } |
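
The `errRepeatRequest` arm gives the legacy-row fallback its shape: the callback mutates the request (clearing `KeysOnly`) and returns the sentinel, and `Get` re-issues itself once. A compact sketch of that round trip, with a toy `query` standing in for the BigTable scan:

```go
package main

import (
	"errors"
	"fmt"
)

var errRepeatRequest = errors.New("repeat request")

type request struct{ KeysOnly bool }

// query stands in for the BigTable scan; here it insists on a full
// (non-keys-only) read, the way a legacy row without a count does.
func query(r *request) error {
	if r.KeysOnly {
		r.KeysOnly = false
		return errRepeatRequest
	}
	return nil
}

func get(r *request) error {
	switch err := query(r); err {
	case nil:
		return nil
	case errRepeatRequest:
		return get(r) // re-issue with the mutated request
	default:
		return err
	}
}

func main() {
	r := &request{KeysOnly: true}
	fmt.Println(get(r), r.KeysOnly) // <nil> false
}
```
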
| 328 | 332 |
| 329 func (s *btStorage) Tail(p types.StreamPath) ([]byte, types.MessageIndex, error) { | 333 func (s *btStorage) Tail(project config.ProjectName, path types.StreamPath) ([]byte, types.MessageIndex, error) { |
| 330 ctx := log.SetFields(s, log.Fields{ | 334 ctx := log.SetFields(s, log.Fields{ |
| 331 » » "path": p, | 335 » » "project": project, |
| 336 » » "path": path, |
| 332 }) | 337 }) |
| 333 | 338 |
| 334 // Iterate through all log keys in the stream. Record the latest one. | 339 // Iterate through all log keys in the stream. Record the latest one. |
| 335 » rk := newRowKey(string(p), 0, 0) | 340 » rk := newRowKey(string(project), string(path), 0, 0) |
| 336 var latest *rowKey | 341 var latest *rowKey |
| 337 err := s.raw.getLogData(ctx, rk, 0, true, func(rk *rowKey, data []byte) error { | 342 err := s.raw.getLogData(ctx, rk, 0, true, func(rk *rowKey, data []byte) error { |
| 338 latest = rk | 343 latest = rk |
| 339 return nil | 344 return nil |
| 340 }) | 345 }) |
| 341 if err != nil { | 346 if err != nil { |
| 342 log.Fields{ | 347 log.Fields{ |
| 343 log.ErrorKey: err, | 348 log.ErrorKey: err, |
| 344 "project": s.Project, | 349 "project": s.Project, |
| 345 "zone": s.Zone, | 350 "zone": s.Zone, |
| (...skipping 59 matching lines...) |
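
`Tail` finds the last log entry by brute force: a keys-only scan from index 0 that overwrites `latest` on every visited key, leaving the final row key standing. A self-contained sketch of that scan shape follows; `keysOf` stands in for the keys-only `getLogData` call, and the key strings are fabricated placeholders, not the real row-key encoding.

```go
package main

import (
	"errors"
	"fmt"
)

// lastKey mirrors Tail's scan: visit every row key in stream order, keep
// overwriting latest, and the value left standing is the tail row's key.
func lastKey(keysOf func(visit func(key string) error) error) (string, error) {
	var latest string
	if err := keysOf(func(key string) error {
		latest = key
		return nil
	}); err != nil {
		return "", err
	}
	if latest == "" {
		return "", errors.New("stream has no rows")
	}
	return latest, nil
}

func main() {
	keys := []string{"row-00", "row-07", "row-12"}
	tail, err := lastKey(func(visit func(string) error) error {
		for _, k := range keys {
			if err := visit(k); err != nil {
				return err
			}
		}
		return nil
	})
	fmt.Println(tail, err) // row-12 <nil>
}
```
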
| 405 } | 410 } |
| 406 | 411 |
| 407 // If we have exceeded our threshold, report a failure. | 412 // If we have exceeded our threshold, report a failure. |
| 408 appended = (w.buf.Len() <= w.threshold) | 413 appended = (w.buf.Len() <= w.threshold) |
| 409 if appended { | 414 if appended { |
| 410 w.count++ | 415 w.count++ |
| 411 } | 416 } |
| 412 return | 417 return |
| 413 } | 418 } |
| 414 | 419 |
| 415 func (w *rowWriter) flush(ctx context.Context, raw btTable, index types.MessageIndex, path types.StreamPath) (int, error) { | 420 func (w *rowWriter) flush(ctx context.Context, raw btTable, index types.MessageIndex, |
| 421 » project config.ProjectName, path types.StreamPath) (int, error) { |
| 416 flushCount := w.count | 422 flushCount := w.count |
| 417 if flushCount == 0 { | 423 if flushCount == 0 { |
| 418 return 0, nil | 424 return 0, nil |
| 419 } | 425 } |
| 420 | 426 |
| 421 // Write the current set of buffered rows to the table. Index on the LAST | 427 // Write the current set of buffered rows to the table. Index on the LAST |
| 422 // row index. | 428 // row index. |
| 423 lastIndex := int64(index) + int64(flushCount) - 1 | 429 lastIndex := int64(index) + int64(flushCount) - 1 |
| 424 » rk := newRowKey(string(path), lastIndex, int64(w.count)) | 430 » rk := newRowKey(string(project), string(path), lastIndex, int64(w.count)) |
| 425 | 431 |
| 426 log.Fields{ | 432 log.Fields{ |
| 427 "rowKey": rk, | 433 "rowKey": rk, |
| 434 "project": project, |
| 428 "path": path, | 435 "path": path, |
| 429 "index": index, | 436 "index": index, |
| 430 "lastIndex": lastIndex, | 437 "lastIndex": lastIndex, |
| 431 "count": w.count, | 438 "count": w.count, |
| 432 "size": w.buf.Len(), | 439 "size": w.buf.Len(), |
| 433 }.Debugf(ctx, "Adding entries to BigTable.") | 440 }.Debugf(ctx, "Adding entries to BigTable.") |
| 434 if err := raw.putLogData(ctx, rk, w.buf.Bytes()); err != nil { | 441 if err := raw.putLogData(ctx, rk, w.buf.Bytes()); err != nil { |
| 435 return 0, err | 442 return 0, err |
| 436 } | 443 } |
| 437 | 444 |
| 438 // Reset our buffer state. | 445 // Reset our buffer state. |
| 439 w.buf.Reset() | 446 w.buf.Reset() |
| 440 w.count = 0 | 447 w.count = 0 |
| 441 return flushCount, nil | 448 return flushCount, nil |
| 442 } | 449 } |
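
Note the keying choice in `flush`: a buffered row holding `count` entries starting at `index` is keyed on its *last* entry, `lastIndex = index + count - 1`, along with the count. That is what lets a reader seeking entry `i` start a range scan at key(`i`) and land on the covering row, since the first row in key order whose last index is ≥ `i` must contain it. A small sketch of that lookup, under the assumption that keys sort by index; the `row` struct is illustrative, not the CL's actual type.

```go
package main

import (
	"fmt"
	"sort"
)

// row models the keying scheme: each BigTable row records the index of the
// LAST entry it holds plus how many entries it holds, so its first entry
// is last-count+1.
type row struct {
	last  int64
	count int64
}

// rowFor locates the row containing entry i: in key (last-index) order, the
// first row whose last index >= i covers it. This mirrors why flush keys
// on lastIndex rather than on the first index.
func rowFor(rows []row, i int64) (row, bool) {
	n := sort.Search(len(rows), func(j int) bool { return rows[j].last >= i })
	if n == len(rows) {
		return row{}, false // i is past the end of the stream
	}
	return rows[n], true
}

func main() {
	rows := []row{{last: 3, count: 4}, {last: 5, count: 2}, {last: 13, count: 8}}
	if r, ok := rowFor(rows, 7); ok {
		fmt.Printf("entry 7 is in the row spanning [%d..%d]\n", r.last-r.count+1, r.last)
	}
}
```
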