Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(189)

Side by Side Diff: server/logdog/storage/bigtable/storage.go

Issue 1909943003: LogDog: Add project support to Storage. (Closed) Base URL: https://github.com/luci/luci-go@logdog-project-coordinator-services
Patch Set: Rebase? Created 4 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package bigtable 5 package bigtable
6 6
7 import ( 7 import (
8 "bytes" 8 "bytes"
9 "errors" 9 "errors"
10 "fmt" 10 "fmt"
11 11
12 "github.com/luci/luci-go/common/config"
12 "github.com/luci/luci-go/common/logdog/types" 13 "github.com/luci/luci-go/common/logdog/types"
13 log "github.com/luci/luci-go/common/logging" 14 log "github.com/luci/luci-go/common/logging"
14 "github.com/luci/luci-go/common/recordio" 15 "github.com/luci/luci-go/common/recordio"
15 "github.com/luci/luci-go/server/logdog/storage" 16 "github.com/luci/luci-go/server/logdog/storage"
16 "golang.org/x/net/context" 17 "golang.org/x/net/context"
17 "google.golang.org/cloud" 18 "google.golang.org/cloud"
18 "google.golang.org/cloud/bigtable" 19 "google.golang.org/cloud/bigtable"
19 ) 20 )
20 21
21 var ( 22 var (
(...skipping 26 matching lines...) Expand all
48 ) 49 )
49 50
50 var ( 51 var (
51 // errStop is an internal sentinel error used to indicate "stop iteratio n" 52 // errStop is an internal sentinel error used to indicate "stop iteratio n"
52 // to btTable.getLogData iterator. 53 // to btTable.getLogData iterator.
53 errStop = errors.New("bigtable: stop iteration") 54 errStop = errors.New("bigtable: stop iteration")
54 55
55 // errRepeatRequest is a transition sentinel that instructs us to repeat 56 // errRepeatRequest is a transition sentinel that instructs us to repeat
56 // our query with KeysOnly set to true. 57 // our query with KeysOnly set to true.
57 // 58 //
58 » // This can be deleted once BigTable rows without a count have been aged 59 » // TODO(dnj): This can be deleted once BigTable rows without a count hav e been
59 » // off. 60 » // aged off.
60 errRepeatRequest = errors.New("bigtable: repeat request") 61 errRepeatRequest = errors.New("bigtable: repeat request")
61 ) 62 )
62 63
63 // Options is a set of configuration options for BigTable storage. 64 // Options is a set of configuration options for BigTable storage.
64 type Options struct { 65 type Options struct {
65 // Project is the name of the project to connect to. 66 // Project is the name of the project to connect to.
66 Project string 67 Project string
67 // Zone is the name of the zone to connect to. 68 // Zone is the name of the zone to connect to.
68 Zone string 69 Zone string
69 // Cluster is the name of the cluster to connect to. 70 // Cluster is the name of the cluster to connect to.
(...skipping 94 matching lines...) Expand 10 before | Expand all | Expand 10 after
164 func (s *btStorage) Put(r storage.PutRequest) error { 165 func (s *btStorage) Put(r storage.PutRequest) error {
165 rw := rowWriter{ 166 rw := rowWriter{
166 threshold: s.maxRowSize, 167 threshold: s.maxRowSize,
167 } 168 }
168 169
169 for len(r.Values) > 0 { 170 for len(r.Values) > 0 {
170 // Add the next entry to the writer. 171 // Add the next entry to the writer.
171 if appended := rw.append(r.Values[0]); !appended { 172 if appended := rw.append(r.Values[0]); !appended {
172 // We have failed to append our maximum BigTable row siz e. Flush any 173 // We have failed to append our maximum BigTable row siz e. Flush any
173 // currently-buffered row data and try again with an emp ty buffer. 174 // currently-buffered row data and try again with an emp ty buffer.
174 » » » count, err := rw.flush(s, s.raw, r.Index, r.Path) 175 » » » count, err := rw.flush(s, s.raw, r.Index, r.Project, r.P ath)
175 if err != nil { 176 if err != nil {
176 return err 177 return err
177 } 178 }
178 179
179 if count == 0 { 180 if count == 0 {
180 // Nothing was buffered, but we still couldn't a ppend an entry. The 181 // Nothing was buffered, but we still couldn't a ppend an entry. The
181 // current entry is too large by itself, so we m ust fail. 182 // current entry is too large by itself, so we m ust fail.
182 return fmt.Errorf("single row entry exceeds maxi mum size (%d > %d)", len(r.Values[0]), bigTableRowMaxBytes) 183 return fmt.Errorf("single row entry exceeds maxi mum size (%d > %d)", len(r.Values[0]), bigTableRowMaxBytes)
183 } 184 }
184 185
185 r.Index += types.MessageIndex(count) 186 r.Index += types.MessageIndex(count)
186 continue 187 continue
187 } 188 }
188 189
189 // We successfully appended this entry, so advance. 190 // We successfully appended this entry, so advance.
190 r.Values = r.Values[1:] 191 r.Values = r.Values[1:]
191 } 192 }
192 193
193 // Flush any buffered rows. 194 // Flush any buffered rows.
194 » if _, err := rw.flush(s, s.raw, r.Index, r.Path); err != nil { 195 » if _, err := rw.flush(s, s.raw, r.Index, r.Project, r.Path); err != nil {
195 return err 196 return err
196 } 197 }
197 return nil 198 return nil
198 } 199 }
199 200
200 func (s *btStorage) Get(r storage.GetRequest, cb storage.GetCallback) error { 201 func (s *btStorage) Get(r storage.GetRequest, cb storage.GetCallback) error {
201 » startKey := newRowKey(string(r.Path), int64(r.Index), 0) 202 » startKey := newRowKey(string(r.Project), string(r.Path), int64(r.Index), 0)
202 ctx := log.SetFields(s, log.Fields{ 203 ctx := log.SetFields(s, log.Fields{
204 "project": r.Project,
203 "path": r.Path, 205 "path": r.Path,
204 "index": r.Index, 206 "index": r.Index,
205 "limit": r.Limit, 207 "limit": r.Limit,
206 "startRowKey": startKey, 208 "startRowKey": startKey,
207 "keysOnly": r.KeysOnly, 209 "keysOnly": r.KeysOnly,
208 }) 210 })
209 211
210 // If we issue a query and get back a legacy row, it will have no count 212 // If we issue a query and get back a legacy row, it will have no count
211 // associated with it. We will fast-exit 213 // associated with it. We will fast-exit
212 214
(...skipping 17 matching lines...) Expand all
230 232
231 switch { 233 switch {
232 case r.KeysOnly: 234 case r.KeysOnly:
233 // Keys only query, so count is the authority. 235 // Keys only query, so count is the authority.
234 if rk.count == 0 { 236 if rk.count == 0 {
235 // If it's zero, we are dealing with a legacy ro w, before we started 237 // If it's zero, we are dealing with a legacy ro w, before we started
236 // keeping count. A keys-only query is insuffici ent to get the 238 // keeping count. A keys-only query is insuffici ent to get the
237 // inforamtion that we need, so repeat this Get request with KeysOnly 239 // inforamtion that we need, so repeat this Get request with KeysOnly
238 // set to false. 240 // set to false.
239 // 241 //
240 » » » » // NOTE: This logic can be removed once all unco unted rows are aged off. 242 » » » » // TODO(dnj): This logic can be removed once all uncounted rows are aged
243 » » » » // off.
241 r.KeysOnly = false 244 r.KeysOnly = false
242 return errRepeatRequest 245 return errRepeatRequest
243 } 246 }
244 break 247 break
245 248
246 case rk.count == 0: 249 case rk.count == 0:
247 // This is a non-keys-only query, but we are dealing wit h a legacy row. 250 // This is a non-keys-only query, but we are dealing wit h a legacy row.
248 // Use the record count as the authority. 251 // Use the record count as the authority.
249 // 252 //
250 » » » // NOTE: This logic can be removed once all uncounted ro ws are aged off. 253 » » » // TODO(dnj): This logic can be removed once all uncount ed rows are aged
254 » » » // off.
251 rk.count = int64(len(records)) 255 rk.count = int64(len(records))
252 256
253 case rk.count == int64(len(records)): 257 case rk.count == int64(len(records)):
254 // This is the expected case, so we're set. 258 // This is the expected case, so we're set.
255 // 259 //
256 » » » // NOTE: Once uncounted rows are aged off, this is the o nly logic that 260 » » » // TODO(dnj): Once uncounted rows are aged off, this is the only logic
257 » » » // we need, in the form of a sanity assertion. 261 » » » // that we need, in the form of a sanity assertion.
258 break 262 break
259 263
260 default: 264 default:
261 log.Fields{ 265 log.Fields{
262 "count": rk.count, 266 "count": rk.count,
263 "recordCount": len(records), 267 "recordCount": len(records),
264 }.Errorf(ctx, "Record count doesn't match declared count .") 268 }.Errorf(ctx, "Record count doesn't match declared count .")
265 return storage.ErrBadData 269 return storage.ErrBadData
266 } 270 }
267 271
(...skipping 51 matching lines...) Expand 10 before | Expand all | Expand 10 after
319 323
320 case errRepeatRequest: 324 case errRepeatRequest:
321 return s.Get(r, cb) 325 return s.Get(r, cb)
322 326
323 default: 327 default:
324 log.WithError(err).Errorf(ctx, "Failed to retrieve row range.") 328 log.WithError(err).Errorf(ctx, "Failed to retrieve row range.")
325 return err 329 return err
326 } 330 }
327 } 331 }
328 332
329 func (s *btStorage) Tail(p types.StreamPath) ([]byte, types.MessageIndex, error) { 333 func (s *btStorage) Tail(project config.ProjectName, path types.StreamPath) ([]b yte, types.MessageIndex, error) {
330 ctx := log.SetFields(s, log.Fields{ 334 ctx := log.SetFields(s, log.Fields{
331 » » "path": p, 335 » » "project": project,
336 » » "path": path,
332 }) 337 })
333 338
334 // Iterate through all log keys in the stream. Record the latest one. 339 // Iterate through all log keys in the stream. Record the latest one.
335 » rk := newRowKey(string(p), 0, 0) 340 » rk := newRowKey(string(project), string(path), 0, 0)
336 var latest *rowKey 341 var latest *rowKey
337 err := s.raw.getLogData(ctx, rk, 0, true, func(rk *rowKey, data []byte) error { 342 err := s.raw.getLogData(ctx, rk, 0, true, func(rk *rowKey, data []byte) error {
338 latest = rk 343 latest = rk
339 return nil 344 return nil
340 }) 345 })
341 if err != nil { 346 if err != nil {
342 log.Fields{ 347 log.Fields{
343 log.ErrorKey: err, 348 log.ErrorKey: err,
344 "project": s.Project, 349 "project": s.Project,
345 "zone": s.Zone, 350 "zone": s.Zone,
(...skipping 59 matching lines...) Expand 10 before | Expand all | Expand 10 after
405 } 410 }
406 411
407 // If we have exceeded our threshold, report a failure. 412 // If we have exceeded our threshold, report a failure.
408 appended = (w.buf.Len() <= w.threshold) 413 appended = (w.buf.Len() <= w.threshold)
409 if appended { 414 if appended {
410 w.count++ 415 w.count++
411 } 416 }
412 return 417 return
413 } 418 }
414 419
415 func (w *rowWriter) flush(ctx context.Context, raw btTable, index types.MessageI ndex, path types.StreamPath) (int, error) { 420 func (w *rowWriter) flush(ctx context.Context, raw btTable, index types.MessageI ndex,
421 » project config.ProjectName, path types.StreamPath) (int, error) {
416 flushCount := w.count 422 flushCount := w.count
417 if flushCount == 0 { 423 if flushCount == 0 {
418 return 0, nil 424 return 0, nil
419 } 425 }
420 426
421 // Write the current set of buffered rows to the table. Index on the LAS T 427 // Write the current set of buffered rows to the table. Index on the LAS T
422 // row index. 428 // row index.
423 lastIndex := int64(index) + int64(flushCount) - 1 429 lastIndex := int64(index) + int64(flushCount) - 1
424 » rk := newRowKey(string(path), lastIndex, int64(w.count)) 430 » rk := newRowKey(string(project), string(path), lastIndex, int64(w.count) )
425 431
426 log.Fields{ 432 log.Fields{
427 "rowKey": rk, 433 "rowKey": rk,
434 "project": project,
428 "path": path, 435 "path": path,
429 "index": index, 436 "index": index,
430 "lastIndex": lastIndex, 437 "lastIndex": lastIndex,
431 "count": w.count, 438 "count": w.count,
432 "size": w.buf.Len(), 439 "size": w.buf.Len(),
433 }.Debugf(ctx, "Adding entries to BigTable.") 440 }.Debugf(ctx, "Adding entries to BigTable.")
434 if err := raw.putLogData(ctx, rk, w.buf.Bytes()); err != nil { 441 if err := raw.putLogData(ctx, rk, w.buf.Bytes()); err != nil {
435 return 0, err 442 return 0, err
436 } 443 }
437 444
438 // Reset our buffer state. 445 // Reset our buffer state.
439 w.buf.Reset() 446 w.buf.Reset()
440 w.count = 0 447 w.count = 0
441 return flushCount, nil 448 return flushCount, nil
442 } 449 }
OLDNEW
« no previous file with comments | « server/logdog/storage/bigtable/rowKey_test.go ('k') | server/logdog/storage/bigtable/storage_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698