Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(432)

Side by Side Diff: server/logdog/storage/bigtable/storage.go

Issue 1872903002: LogDog: Enable keys-only BigTable queries. (Closed) Base URL: https://github.com/luci/luci-go@logdog-archive-v2
Patch Set: Rebase Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package bigtable 5 package bigtable
6 6
7 import ( 7 import (
8 "bytes" 8 "bytes"
9 "errors" 9 "errors"
10 "fmt" 10 "fmt"
(...skipping 29 matching lines...) Expand all
40 // 40 //
41 // This is simply the maximum number of rows (limit). The actual number of 41 // This is simply the maximum number of rows (limit). The actual number of
42 // rows will be further constrained by tailRowMaxSize. 42 // rows will be further constrained by tailRowMaxSize.
43 tailRowCount = 128 43 tailRowCount = 128
44 44
45 // tailRowMaxSize is the maximum number of bytes of tail row data that will be 45 // tailRowMaxSize is the maximum number of bytes of tail row data that will be
46 // buffered during Tail row reading. 46 // buffered during Tail row reading.
47 tailRowMaxSize = 1024 * 1024 * 16 47 tailRowMaxSize = 1024 * 1024 * 16
48 ) 48 )
49 49
50 // errStop is an internal sentinel error used to indicate "stop iteration" 50 var (
51 // to btTable.getLogData iterator. It will 51 » // errStop is an internal sentinel error used to indicate "stop iteration"
52 var errStop = errors.New("bigtable: stop iteration") 52 » // to btTable.getLogData iterator.
53 » errStop = errors.New("bigtable: stop iteration")
54
55 » // errRepeatRequest is a transition sentinel that instructs us to repeat
56 » // our query with KeysOnly set to false.
57 » //
58 » // This can be deleted once BigTable rows without a count have been aged
59 » // off.
60 » errRepeatRequest = errors.New("bigtable: repeat request")
61 )
53 62
54 // Options is a set of configuration options for BigTable storage. 63 // Options is a set of configuration options for BigTable storage.
55 type Options struct { 64 type Options struct {
56 // Project is the name of the project to connect to. 65 // Project is the name of the project to connect to.
57 Project string 66 Project string
58 // Zone is the name of the zone to connect to. 67 // Zone is the name of the zone to connect to.
59 Zone string 68 Zone string
60 // Cluster is the name of the cluster to connect to. 69 // Cluster is the name of the cluster to connect to.
61 Cluster string 70 Cluster string
62 // ClientOptions are additional client options to use when instantiating the 71 // ClientOptions are additional client options to use when instantiating the
(...skipping 119 matching lines...) Expand 10 before | Expand all | Expand 10 after
182 } 191 }
183 192
184 // Flush any buffered rows. 193 // Flush any buffered rows.
185 if _, err := rw.flush(s, s.raw, r.Index, r.Path); err != nil { 194 if _, err := rw.flush(s, s.raw, r.Index, r.Path); err != nil {
186 return err 195 return err
187 } 196 }
188 return nil 197 return nil
189 } 198 }
190 199
191 func (s *btStorage) Get(r storage.GetRequest, cb storage.GetCallback) error { 200 func (s *btStorage) Get(r storage.GetRequest, cb storage.GetCallback) error {
192 » startKey := newRowKey(string(r.Path), int64(r.Index)) 201 » startKey := newRowKey(string(r.Path), int64(r.Index), 0)
193 ctx := log.SetFields(s, log.Fields{ 202 ctx := log.SetFields(s, log.Fields{
194 "path": r.Path, 203 "path": r.Path,
195 "index": r.Index, 204 "index": r.Index,
205 "limit": r.Limit,
196 "startRowKey": startKey, 206 "startRowKey": startKey,
207 "keysOnly": r.KeysOnly,
197 }) 208 })
198 209
210 // If we issue a query and get back a legacy row, it will have no count
211 // associated with it. In a keys-only query we will fast-exit and repeat the request with KeysOnly set to false.
212
199 limit := r.Limit 213 limit := r.Limit
200 » err := s.raw.getLogData(ctx, startKey, r.Limit, false, func(rk *rowKey, data []byte) error { 214 » err := s.raw.getLogData(ctx, startKey, r.Limit, r.KeysOnly, func(rk *row Key, data []byte) error {
201 // Does this key match our requested log stream? If not, we've m oved past 215 // Does this key match our requested log stream? If not, we've m oved past
202 // this stream's records and must stop iteration. 216 // this stream's records and must stop iteration.
203 if !rk.sharesPathWith(startKey) { 217 if !rk.sharesPathWith(startKey) {
204 return errStop 218 return errStop
205 } 219 }
206 220
207 » » // We have a row. Split it into individual records. 221 » » // Split our data into records. Leave the records slice nil if w e're doing
208 » » records, err := recordio.Split(data) 222 » » // a keys-only get.
209 » » if err != nil { 223 » » var records [][]byte
224 » » if !r.KeysOnly {
225 » » » var err error
226 » » » if records, err = recordio.Split(data); err != nil {
227 » » » » return storage.ErrBadData
228 » » » }
229 » » }
230
231 » » switch {
232 » » case r.KeysOnly:
233 » » » // Keys only query, so count is the authority.
234 » » » if rk.count == 0 {
235 » » » » // If it's zero, we are dealing with a legacy row, before we started
236 » » » » // keeping count. A keys-only query is insufficient to get the
237 » » » » // information that we need, so repeat this Get request with KeysOnly
238 » » » » // set to false.
239 » » » » //
240 » » » » // NOTE: This logic can be removed once all uncounted rows are aged off.
241 » » » » r.KeysOnly = false
242 » » » » return errRepeatRequest
243 » » » }
244 » » » break
245
246 » » case rk.count == 0:
247 » » » // This is a non-keys-only query, but we are dealing with a legacy row.
248 » » » // Use the record count as the authority.
249 » » » //
250 » » » // NOTE: This logic can be removed once all uncounted rows are aged off.
251 » » » rk.count = int64(len(records))
252
253 » » case rk.count == int64(len(records)):
254 » » » // This is the expected case, so we're set.
255 » » » //
256 » » » // NOTE: Once uncounted rows are aged off, this is the only logic that
257 » » » // we need, in the form of a sanity assertion.
258 » » » break
259
260 » » default:
261 » » » log.Fields{
262 » » » » "count": rk.count,
263 » » » » "recordCount": len(records),
264 » » » }.Errorf(ctx, "Record count doesn't match declared count .")
210 return storage.ErrBadData 265 return storage.ErrBadData
211 } 266 }
212 267
213 // Issue our callback for each row. Since we index the row on th e LAST entry 268 // Issue our callback for each row. Since we index the row on th e LAST entry
214 // in the row, count backwards to get the index of the first ent ry. 269 // in the row, count backwards to get the index of the first ent ry.
215 » » firstIndex := types.MessageIndex(rk.index - int64(len(records)) + 1) 270 » » startIndex := rk.index - rk.count + 1
216 » » if firstIndex < 0 { 271 » » if startIndex < 0 {
217 return storage.ErrBadData 272 return storage.ErrBadData
218 } 273 }
219 » » for i, row := range records { 274
220 » » » index := firstIndex + types.MessageIndex(i) 275 » » // If we are indexed somewhere within this entry's records, disc ard any
221 » » » if index < r.Index { 276 » » // records before our index.
222 » » » » // An offset was specified, and this row is befo re it, so skip. 277 » » if discard := int64(r.Index) - startIndex; discard > 0 {
223 » » » » continue 278 » » » if discard > int64(len(records)) {
279 » » » » // This should never happen unless there is corrupt or conflicting data.
280 » » » » return nil
281 » » » }
282 » » » startIndex += discard
283 » » » records = records[discard:]
284 » » }
285
286 » » log.Fields{
287 » » » "rk": rk.encode(),
288 » » » "rkIndex": rk.index,
289 » » » "rkCount": rk.count,
290 » » » "startIndex": startIndex,
291 » » }.Debugf(ctx, "Punting row key range [%d - %d]...", startIndex, rk.index)
292
293 » » for index := startIndex; index <= rk.index; index++ {
294 » » » // If we're not doing keys-only, consume the row.
295 » » » var row []byte
296 » » » if !r.KeysOnly {
297 » » » » row, records = records[0], records[1:]
224 } 298 }
225 299
226 » » » if !cb(index, row) { 300 » » » if !cb(types.MessageIndex(index), row) {
227 return errStop 301 return errStop
228 } 302 }
303 r.Index = types.MessageIndex(index + 1)
229 304
230 // Artificially apply limit within our row records. 305 // Artificially apply limit within our row records.
231 if limit > 0 { 306 if limit > 0 {
232 limit-- 307 limit--
233 if limit == 0 { 308 if limit == 0 {
234 return errStop 309 return errStop
235 } 310 }
236 } 311 }
237 } 312 }
238 return nil 313 return nil
239 }) 314 })
240 315
241 switch err { 316 switch err {
242 case nil, errStop: 317 case nil, errStop:
243 return nil 318 return nil
244 319
320 case errRepeatRequest:
321 return s.Get(r, cb)
322
245 default: 323 default:
246 log.WithError(err).Errorf(ctx, "Failed to retrieve row range.") 324 log.WithError(err).Errorf(ctx, "Failed to retrieve row range.")
247 return err 325 return err
248 } 326 }
249 } 327 }
250 328
251 func (s *btStorage) Tail(p types.StreamPath) ([]byte, types.MessageIndex, error) { 329 func (s *btStorage) Tail(p types.StreamPath) ([]byte, types.MessageIndex, error) {
252 ctx := log.SetFields(s, log.Fields{ 330 ctx := log.SetFields(s, log.Fields{
253 "path": p, 331 "path": p,
254 }) 332 })
255 333
256 // Iterate through all log keys in the stream. Record the latest one. 334 // Iterate through all log keys in the stream. Record the latest one.
257 » rk := newRowKey(string(p), 0) 335 » rk := newRowKey(string(p), 0, 0)
258 var latest *rowKey 336 var latest *rowKey
259 err := s.raw.getLogData(ctx, rk, 0, true, func(rk *rowKey, data []byte) error { 337 err := s.raw.getLogData(ctx, rk, 0, true, func(rk *rowKey, data []byte) error {
260 latest = rk 338 latest = rk
261 return nil 339 return nil
262 }) 340 })
263 if err != nil { 341 if err != nil {
264 log.Fields{ 342 log.Fields{
265 log.ErrorKey: err, 343 log.ErrorKey: err,
266 "project": s.Project, 344 "project": s.Project,
267 "zone": s.Zone, 345 "zone": s.Zone,
(...skipping 68 matching lines...) Expand 10 before | Expand all | Expand 10 after
336 414
337 func (w *rowWriter) flush(ctx context.Context, raw btTable, index types.MessageI ndex, path types.StreamPath) (int, error) { 415 func (w *rowWriter) flush(ctx context.Context, raw btTable, index types.MessageI ndex, path types.StreamPath) (int, error) {
338 flushCount := w.count 416 flushCount := w.count
339 if flushCount == 0 { 417 if flushCount == 0 {
340 return 0, nil 418 return 0, nil
341 } 419 }
342 420
343 // Write the current set of buffered rows to the table. Index on the LAS T 421 // Write the current set of buffered rows to the table. Index on the LAS T
344 // row index. 422 // row index.
345 lastIndex := int64(index) + int64(flushCount) - 1 423 lastIndex := int64(index) + int64(flushCount) - 1
346 » rk := newRowKey(string(path), lastIndex) 424 » rk := newRowKey(string(path), lastIndex, int64(w.count))
347 425
348 log.Fields{ 426 log.Fields{
349 "rowKey": rk, 427 "rowKey": rk,
350 "path": path, 428 "path": path,
351 "index": index, 429 "index": index,
352 "lastIndex": lastIndex, 430 "lastIndex": lastIndex,
353 "count": w.count, 431 "count": w.count,
354 "size": w.buf.Len(), 432 "size": w.buf.Len(),
355 }.Debugf(ctx, "Adding entries to BigTable.") 433 }.Debugf(ctx, "Adding entries to BigTable.")
356 if err := raw.putLogData(ctx, rk, w.buf.Bytes()); err != nil { 434 if err := raw.putLogData(ctx, rk, w.buf.Bytes()); err != nil {
357 return 0, err 435 return 0, err
358 } 436 }
359 437
360 // Reset our buffer state. 438 // Reset our buffer state.
361 w.buf.Reset() 439 w.buf.Reset()
362 w.count = 0 440 w.count = 0
363 return flushCount, nil 441 return flushCount, nil
364 } 442 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698