| OLD | NEW |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 package bigtable | 5 package bigtable |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "bytes" | 8 "bytes" |
| 9 "errors" | 9 "errors" |
| 10 "fmt" | 10 "fmt" |
| (...skipping 29 matching lines...) Expand all Loading... |
| 40 // | 40 // |
| 41 // This is simply the maximum number of rows (limit). The actual number
of | 41 // This is simply the maximum number of rows (limit). The actual number
of |
| 42 // rows will be further constrained by tailRowMaxSize. | 42 // rows will be further constrained by tailRowMaxSize. |
| 43 tailRowCount = 128 | 43 tailRowCount = 128 |
| 44 | 44 |
| 45 // tailRowMaxSize is the maximum number of bytes of tail row data that w
ill be | 45 // tailRowMaxSize is the maximum number of bytes of tail row data that w
ill be |
| 46 // buffered during Tail row reading. | 46 // buffered during Tail row reading. |
| 47 tailRowMaxSize = 1024 * 1024 * 16 | 47 tailRowMaxSize = 1024 * 1024 * 16 |
| 48 ) | 48 ) |
| 49 | 49 |
| 50 // errStop is an internal sentinel error used to indicate "stop iteration" | 50 var ( |
| 51 // to btTable.getLogData iterator. It will | 51 » // errStop is an internal sentinel error used to indicate "stop iteratio
n" |
| 52 var errStop = errors.New("bigtable: stop iteration") | 52 » // to btTable.getLogData iterator. |
| 53 » errStop = errors.New("bigtable: stop iteration") |
| 54 |
| 55 » // errRepeatRequest is a transition sentinel that instructs us to repeat |
| 56 » // our query with KeysOnly set to true. |
| 57 » // |
| 58 » // This can be deleted once BigTable rows without a count have been aged |
| 59 » // off. |
| 60 » errRepeatRequest = errors.New("bigtable: repeat request") |
| 61 ) |
| 53 | 62 |
| 54 // Options is a set of configuration options for BigTable storage. | 63 // Options is a set of configuration options for BigTable storage. |
| 55 type Options struct { | 64 type Options struct { |
| 56 // Project is the name of the project to connect to. | 65 // Project is the name of the project to connect to. |
| 57 Project string | 66 Project string |
| 58 // Zone is the name of the zone to connect to. | 67 // Zone is the name of the zone to connect to. |
| 59 Zone string | 68 Zone string |
| 60 // Cluster is the name of the cluster to connect to. | 69 // Cluster is the name of the cluster to connect to. |
| 61 Cluster string | 70 Cluster string |
| 62 // ClientOptions are additional client options to use when instantiating
the | 71 // ClientOptions are additional client options to use when instantiating
the |
| (...skipping 119 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 182 } | 191 } |
| 183 | 192 |
| 184 // Flush any buffered rows. | 193 // Flush any buffered rows. |
| 185 if _, err := rw.flush(s, s.raw, r.Index, r.Path); err != nil { | 194 if _, err := rw.flush(s, s.raw, r.Index, r.Path); err != nil { |
| 186 return err | 195 return err |
| 187 } | 196 } |
| 188 return nil | 197 return nil |
| 189 } | 198 } |
| 190 | 199 |
| 191 func (s *btStorage) Get(r storage.GetRequest, cb storage.GetCallback) error { | 200 func (s *btStorage) Get(r storage.GetRequest, cb storage.GetCallback) error { |
| 192 » startKey := newRowKey(string(r.Path), int64(r.Index)) | 201 » startKey := newRowKey(string(r.Path), int64(r.Index), 0) |
| 193 ctx := log.SetFields(s, log.Fields{ | 202 ctx := log.SetFields(s, log.Fields{ |
| 194 "path": r.Path, | 203 "path": r.Path, |
| 195 "index": r.Index, | 204 "index": r.Index, |
| 205 "limit": r.Limit, |
| 196 "startRowKey": startKey, | 206 "startRowKey": startKey, |
| 207 "keysOnly": r.KeysOnly, |
| 197 }) | 208 }) |
| 198 | 209 |
| 210 » // If we issue a query and get back a legacy row, it will have no count |
| 211 » // associated with it. We will repeat the request with KeysOnly=false. |
| 212 |
| 199 limit := r.Limit | 213 limit := r.Limit |
| 200 » err := s.raw.getLogData(ctx, startKey, r.Limit, false, func(rk *rowKey,
data []byte) error { | 214 » err := s.raw.getLogData(ctx, startKey, r.Limit, r.KeysOnly, func(rk *row
Key, data []byte) error { |
| 201 // Does this key match our requested log stream? If not, we've m
oved past | 215 // Does this key match our requested log stream? If not, we've m
oved past |
| 202 // this stream's records and must stop iteration. | 216 // this stream's records and must stop iteration. |
| 203 if !rk.sharesPathWith(startKey) { | 217 if !rk.sharesPathWith(startKey) { |
| 204 return errStop | 218 return errStop |
| 205 } | 219 } |
| 206 | 220 |
| 207 » » // We have a row. Split it into individual records. | 221 » » // Split our data into records. Leave the records slice nil if w
e're doing |
| 208 » » records, err := recordio.Split(data) | 222 » » // a keys-only get. |
| 209 » » if err != nil { | 223 » » var records [][]byte |
| 224 » » if !r.KeysOnly { |
| 225 » » » var err error |
| 226 » » » if records, err = recordio.Split(data); err != nil { |
| 227 » » » » return storage.ErrBadData |
| 228 » » » } |
| 229 » » } |
| 230 |
| 231 » » switch { |
| 232 » » case r.KeysOnly: |
| 233 » » » // Keys only query, so count is the authority. |
| 234 » » » if rk.count == 0 { |
| 235 » » » » // If it's zero, we are dealing with a legacy ro
w, before we started |
| 236 » » » » // keeping count. A keys-only query is insuffici
ent to get the |
| 237 » » » » // information that we need, so repeat this Get
request with KeysOnly |
| 238 » » » » // set to false. |
| 239 » » » » // |
| 240 » » » » // NOTE: This logic can be removed once all unco
unted rows are aged off. |
| 241 » » » » r.KeysOnly = false |
| 242 » » » » return errRepeatRequest |
| 243 » » » } |
| 244 » » » break |
| 245 |
| 246 » » case rk.count == 0: |
| 247 » » » // This is a non-keys-only query, but we are dealing wit
h a legacy row. |
| 248 » » » // Use the record count as the authority. |
| 249 » » » // |
| 250 » » » // NOTE: This logic can be removed once all uncounted ro
ws are aged off. |
| 251 » » » rk.count = int64(len(records)) |
| 252 |
| 253 » » case rk.count == int64(len(records)): |
| 254 » » » // This is the expected case, so we're set. |
| 255 » » » // |
| 256 » » » // NOTE: Once uncounted rows are aged off, this is the o
nly logic that |
| 257 » » » // we need, in the form of a sanity assertion. |
| 258 » » » break |
| 259 |
| 260 » » default: |
| 261 » » » log.Fields{ |
| 262 » » » » "count": rk.count, |
| 263 » » » » "recordCount": len(records), |
| 264 » » » }.Errorf(ctx, "Record count doesn't match declared count
.") |
| 210 return storage.ErrBadData | 265 return storage.ErrBadData |
| 211 } | 266 } |
| 212 | 267 |
| 213 // Issue our callback for each row. Since we index the row on th
e LAST entry | 268 // Issue our callback for each row. Since we index the row on th
e LAST entry |
| 214 // in the row, count backwards to get the index of the first ent
ry. | 269 // in the row, count backwards to get the index of the first ent
ry. |
| 215 » » firstIndex := types.MessageIndex(rk.index - int64(len(records))
+ 1) | 270 » » startIndex := rk.index - rk.count + 1 |
| 216 » » if firstIndex < 0 { | 271 » » if startIndex < 0 { |
| 217 return storage.ErrBadData | 272 return storage.ErrBadData |
| 218 } | 273 } |
| 219 » » for i, row := range records { | 274 |
| 220 » » » index := firstIndex + types.MessageIndex(i) | 275 » » // If we are indexed somewhere within this entry's records, disc
ard any |
| 221 » » » if index < r.Index { | 276 » » // records before our index. |
| 222 » » » » // An offset was specified, and this row is befo
re it, so skip. | 277 » » if discard := int64(r.Index) - startIndex; discard > 0 { |
| 223 » » » » continue | 278 » » » if discard > int64(len(records)) { |
| 279 » » » » // This should never happen unless there is corr
upt or conflicting data. |
| 280 » » » » return nil |
| 281 » » » } |
| 282 » » » startIndex += discard |
| 283 » » » records = records[discard:] |
| 284 » » } |
| 285 |
| 286 » » log.Fields{ |
| 287 » » » "rk": rk.encode(), |
| 288 » » » "rkIndex": rk.index, |
| 289 » » » "rkCount": rk.count, |
| 290 » » » "startIndex": startIndex, |
| 291 » » }.Debugf(ctx, "Punting row key range [%d - %d]...", startIndex,
rk.index) |
| 292 |
| 293 » » for index := startIndex; index <= rk.index; index++ { |
| 294 » » » // If we're not doing keys-only, consume the row. |
| 295 » » » var row []byte |
| 296 » » » if !r.KeysOnly { |
| 297 » » » » row, records = records[0], records[1:] |
| 224 } | 298 } |
| 225 | 299 |
| 226 » » » if !cb(index, row) { | 300 » » » if !cb(types.MessageIndex(index), row) { |
| 227 return errStop | 301 return errStop |
| 228 } | 302 } |
| 303 r.Index = types.MessageIndex(index + 1) |
| 229 | 304 |
| 230 // Artificially apply limit within our row records. | 305 // Artificially apply limit within our row records. |
| 231 if limit > 0 { | 306 if limit > 0 { |
| 232 limit-- | 307 limit-- |
| 233 if limit == 0 { | 308 if limit == 0 { |
| 234 return errStop | 309 return errStop |
| 235 } | 310 } |
| 236 } | 311 } |
| 237 } | 312 } |
| 238 return nil | 313 return nil |
| 239 }) | 314 }) |
| 240 | 315 |
| 241 switch err { | 316 switch err { |
| 242 case nil, errStop: | 317 case nil, errStop: |
| 243 return nil | 318 return nil |
| 244 | 319 |
| 320 case errRepeatRequest: |
| 321 return s.Get(r, cb) |
| 322 |
| 245 default: | 323 default: |
| 246 log.WithError(err).Errorf(ctx, "Failed to retrieve row range.") | 324 log.WithError(err).Errorf(ctx, "Failed to retrieve row range.") |
| 247 return err | 325 return err |
| 248 } | 326 } |
| 249 } | 327 } |
| 250 | 328 |
| 251 func (s *btStorage) Tail(p types.StreamPath) ([]byte, types.MessageIndex, error)
{ | 329 func (s *btStorage) Tail(p types.StreamPath) ([]byte, types.MessageIndex, error)
{ |
| 252 ctx := log.SetFields(s, log.Fields{ | 330 ctx := log.SetFields(s, log.Fields{ |
| 253 "path": p, | 331 "path": p, |
| 254 }) | 332 }) |
| 255 | 333 |
| 256 // Iterate through all log keys in the stream. Record the latest one. | 334 // Iterate through all log keys in the stream. Record the latest one. |
| 257 » rk := newRowKey(string(p), 0) | 335 » rk := newRowKey(string(p), 0, 0) |
| 258 var latest *rowKey | 336 var latest *rowKey |
| 259 err := s.raw.getLogData(ctx, rk, 0, true, func(rk *rowKey, data []byte)
error { | 337 err := s.raw.getLogData(ctx, rk, 0, true, func(rk *rowKey, data []byte)
error { |
| 260 latest = rk | 338 latest = rk |
| 261 return nil | 339 return nil |
| 262 }) | 340 }) |
| 263 if err != nil { | 341 if err != nil { |
| 264 log.Fields{ | 342 log.Fields{ |
| 265 log.ErrorKey: err, | 343 log.ErrorKey: err, |
| 266 "project": s.Project, | 344 "project": s.Project, |
| 267 "zone": s.Zone, | 345 "zone": s.Zone, |
| (...skipping 68 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 336 | 414 |
| 337 func (w *rowWriter) flush(ctx context.Context, raw btTable, index types.MessageI
ndex, path types.StreamPath) (int, error) { | 415 func (w *rowWriter) flush(ctx context.Context, raw btTable, index types.MessageI
ndex, path types.StreamPath) (int, error) { |
| 338 flushCount := w.count | 416 flushCount := w.count |
| 339 if flushCount == 0 { | 417 if flushCount == 0 { |
| 340 return 0, nil | 418 return 0, nil |
| 341 } | 419 } |
| 342 | 420 |
| 343 // Write the current set of buffered rows to the table. Index on the LAS
T | 421 // Write the current set of buffered rows to the table. Index on the LAS
T |
| 344 // row index. | 422 // row index. |
| 345 lastIndex := int64(index) + int64(flushCount) - 1 | 423 lastIndex := int64(index) + int64(flushCount) - 1 |
| 346 » rk := newRowKey(string(path), lastIndex) | 424 » rk := newRowKey(string(path), lastIndex, int64(w.count)) |
| 347 | 425 |
| 348 log.Fields{ | 426 log.Fields{ |
| 349 "rowKey": rk, | 427 "rowKey": rk, |
| 350 "path": path, | 428 "path": path, |
| 351 "index": index, | 429 "index": index, |
| 352 "lastIndex": lastIndex, | 430 "lastIndex": lastIndex, |
| 353 "count": w.count, | 431 "count": w.count, |
| 354 "size": w.buf.Len(), | 432 "size": w.buf.Len(), |
| 355 }.Debugf(ctx, "Adding entries to BigTable.") | 433 }.Debugf(ctx, "Adding entries to BigTable.") |
| 356 if err := raw.putLogData(ctx, rk, w.buf.Bytes()); err != nil { | 434 if err := raw.putLogData(ctx, rk, w.buf.Bytes()); err != nil { |
| 357 return 0, err | 435 return 0, err |
| 358 } | 436 } |
| 359 | 437 |
| 360 // Reset our buffer state. | 438 // Reset our buffer state. |
| 361 w.buf.Reset() | 439 w.buf.Reset() |
| 362 w.count = 0 | 440 w.count = 0 |
| 363 return flushCount, nil | 441 return flushCount, nil |
| 364 } | 442 } |
| OLD | NEW |