| OLD | NEW | 
|---|
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be | 
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. | 
| 4 | 4 | 
| 5 package bigtable | 5 package bigtable | 
| 6 | 6 | 
| 7 import ( | 7 import ( | 
| 8         "bytes" | 8         "bytes" | 
| 9         "errors" | 9         "errors" | 
| 10         "fmt" | 10         "fmt" | 
| 11 | 11 | 
|  | 12         "github.com/luci/luci-go/common/config" | 
| 12         "github.com/luci/luci-go/common/logdog/types" | 13         "github.com/luci/luci-go/common/logdog/types" | 
| 13         log "github.com/luci/luci-go/common/logging" | 14         log "github.com/luci/luci-go/common/logging" | 
| 14         "github.com/luci/luci-go/common/recordio" | 15         "github.com/luci/luci-go/common/recordio" | 
| 15         "github.com/luci/luci-go/server/logdog/storage" | 16         "github.com/luci/luci-go/server/logdog/storage" | 
| 16         "golang.org/x/net/context" | 17         "golang.org/x/net/context" | 
| 17         "google.golang.org/cloud" | 18         "google.golang.org/cloud" | 
| 18         "google.golang.org/cloud/bigtable" | 19         "google.golang.org/cloud/bigtable" | 
| 19 ) | 20 ) | 
| 20 | 21 | 
| 21 var ( | 22 var ( | 
| (...skipping 26 matching lines...) Expand all  Loading... | 
| 48 ) | 49 ) | 
| 49 | 50 | 
| 50 var ( | 51 var ( | 
| 51         // errStop is an internal sentinel error used to indicate "stop iteratio
     n" | 52         // errStop is an internal sentinel error used to indicate "stop iteratio
     n" | 
| 52         // to btTable.getLogData iterator. | 53         // to btTable.getLogData iterator. | 
| 53         errStop = errors.New("bigtable: stop iteration") | 54         errStop = errors.New("bigtable: stop iteration") | 
| 54 | 55 | 
| 55         // errRepeatRequest is a transition sentinel that instructs us to repeat | 56         // errRepeatRequest is a transition sentinel that instructs us to repeat | 
| 56         // our query with KeysOnly set to true. | 57         // our query with KeysOnly set to true. | 
| 57         // | 58         // | 
| 58 »       // This can be deleted once BigTable rows without a count have been aged | 59 »       // TODO(dnj): This can be deleted once BigTable rows without a count hav
     e been | 
| 59 »       // off. | 60 »       // aged off. | 
| 60         errRepeatRequest = errors.New("bigtable: repeat request") | 61         errRepeatRequest = errors.New("bigtable: repeat request") | 
| 61 ) | 62 ) | 
| 62 | 63 | 
| 63 // Options is a set of configuration options for BigTable storage. | 64 // Options is a set of configuration options for BigTable storage. | 
| 64 type Options struct { | 65 type Options struct { | 
| 65         // Project is the name of the project to connect to. | 66         // Project is the name of the project to connect to. | 
| 66         Project string | 67         Project string | 
| 67         // Zone is the name of the zone to connect to. | 68         // Zone is the name of the zone to connect to. | 
| 68         Zone string | 69         Zone string | 
| 69         // Cluster is the name of the cluster to connect to. | 70         // Cluster is the name of the cluster to connect to. | 
| (...skipping 94 matching lines...) Expand 10 before | Expand all | Expand 10 after  Loading... | 
| 164 func (s *btStorage) Put(r storage.PutRequest) error { | 165 func (s *btStorage) Put(r storage.PutRequest) error { | 
| 165         rw := rowWriter{ | 166         rw := rowWriter{ | 
| 166                 threshold: s.maxRowSize, | 167                 threshold: s.maxRowSize, | 
| 167         } | 168         } | 
| 168 | 169 | 
| 169         for len(r.Values) > 0 { | 170         for len(r.Values) > 0 { | 
| 170                 // Add the next entry to the writer. | 171                 // Add the next entry to the writer. | 
| 171                 if appended := rw.append(r.Values[0]); !appended { | 172                 if appended := rw.append(r.Values[0]); !appended { | 
| 172                         // We have failed to append our maximum BigTable row siz
     e. Flush any | 173                         // We have failed to append our maximum BigTable row siz
     e. Flush any | 
| 173                         // currently-buffered row data and try again with an emp
     ty buffer. | 174                         // currently-buffered row data and try again with an emp
     ty buffer. | 
| 174 »       »       »       count, err := rw.flush(s, s.raw, r.Index, r.Path) | 175 »       »       »       count, err := rw.flush(s, s.raw, r.Index, r.Project, r.P
     ath) | 
| 175                         if err != nil { | 176                         if err != nil { | 
| 176                                 return err | 177                                 return err | 
| 177                         } | 178                         } | 
| 178 | 179 | 
| 179                         if count == 0 { | 180                         if count == 0 { | 
| 180                                 // Nothing was buffered, but we still couldn't a
     ppend an entry. The | 181                                 // Nothing was buffered, but we still couldn't a
     ppend an entry. The | 
| 181                                 // current entry is too large by itself, so we m
     ust fail. | 182                                 // current entry is too large by itself, so we m
     ust fail. | 
| 182                                 return fmt.Errorf("single row entry exceeds maxi
     mum size (%d > %d)", len(r.Values[0]), bigTableRowMaxBytes) | 183                                 return fmt.Errorf("single row entry exceeds maxi
     mum size (%d > %d)", len(r.Values[0]), bigTableRowMaxBytes) | 
| 183                         } | 184                         } | 
| 184 | 185 | 
| 185                         r.Index += types.MessageIndex(count) | 186                         r.Index += types.MessageIndex(count) | 
| 186                         continue | 187                         continue | 
| 187                 } | 188                 } | 
| 188 | 189 | 
| 189                 // We successfully appended this entry, so advance. | 190                 // We successfully appended this entry, so advance. | 
| 190                 r.Values = r.Values[1:] | 191                 r.Values = r.Values[1:] | 
| 191         } | 192         } | 
| 192 | 193 | 
| 193         // Flush any buffered rows. | 194         // Flush any buffered rows. | 
| 194 »       if _, err := rw.flush(s, s.raw, r.Index, r.Path); err != nil { | 195 »       if _, err := rw.flush(s, s.raw, r.Index, r.Project, r.Path); err != nil 
     { | 
| 195                 return err | 196                 return err | 
| 196         } | 197         } | 
| 197         return nil | 198         return nil | 
| 198 } | 199 } | 
| 199 | 200 | 
| 200 func (s *btStorage) Get(r storage.GetRequest, cb storage.GetCallback) error { | 201 func (s *btStorage) Get(r storage.GetRequest, cb storage.GetCallback) error { | 
| 201 »       startKey := newRowKey(string(r.Path), int64(r.Index), 0) | 202 »       startKey := newRowKey(string(r.Project), string(r.Path), int64(r.Index),
      0) | 
| 202         ctx := log.SetFields(s, log.Fields{ | 203         ctx := log.SetFields(s, log.Fields{ | 
|  | 204                 "project":     r.Project, | 
| 203                 "path":        r.Path, | 205                 "path":        r.Path, | 
| 204                 "index":       r.Index, | 206                 "index":       r.Index, | 
| 205                 "limit":       r.Limit, | 207                 "limit":       r.Limit, | 
| 206                 "startRowKey": startKey, | 208                 "startRowKey": startKey, | 
| 207                 "keysOnly":    r.KeysOnly, | 209                 "keysOnly":    r.KeysOnly, | 
| 208         }) | 210         }) | 
| 209 | 211 | 
| 210         // If we issue a query and get back a legacy row, it will have no count | 212         // If we issue a query and get back a legacy row, it will have no count | 
| 211         // associated with it. We will fast-exit | 213         // associated with it. We will fast-exit | 
| 212 | 214 | 
| (...skipping 17 matching lines...) Expand all  Loading... | 
| 230 | 232 | 
| 231                 switch { | 233                 switch { | 
| 232                 case r.KeysOnly: | 234                 case r.KeysOnly: | 
| 233                         // Keys only query, so count is the authority. | 235                         // Keys only query, so count is the authority. | 
| 234                         if rk.count == 0 { | 236                         if rk.count == 0 { | 
| 235                                 // If it's zero, we are dealing with a legacy ro
     w, before we started | 237                                 // If it's zero, we are dealing with a legacy ro
     w, before we started | 
| 236                                 // keeping count. A keys-only query is insuffici
     ent to get the | 238                                 // keeping count. A keys-only query is insuffici
     ent to get the | 
| 237                                 // inforamtion that we need, so repeat this Get 
     request with KeysOnly | 239                                 // inforamtion that we need, so repeat this Get 
     request with KeysOnly | 
| 238                                 // set to false. | 240                                 // set to false. | 
| 239                                 // | 241                                 // | 
| 240 »       »       »       »       // NOTE: This logic can be removed once all unco
     unted rows are aged off. | 242 »       »       »       »       // TODO(dnj): This logic can be removed once all
      uncounted rows are aged | 
|  | 243 »       »       »       »       // off. | 
| 241                                 r.KeysOnly = false | 244                                 r.KeysOnly = false | 
| 242                                 return errRepeatRequest | 245                                 return errRepeatRequest | 
| 243                         } | 246                         } | 
| 244                         break | 247                         break | 
| 245 | 248 | 
| 246                 case rk.count == 0: | 249                 case rk.count == 0: | 
| 247                         // This is a non-keys-only query, but we are dealing wit
     h a legacy row. | 250                         // This is a non-keys-only query, but we are dealing wit
     h a legacy row. | 
| 248                         // Use the record count as the authority. | 251                         // Use the record count as the authority. | 
| 249                         // | 252                         // | 
| 250 »       »       »       // NOTE: This logic can be removed once all uncounted ro
     ws are aged off. | 253 »       »       »       // TODO(dnj): This logic can be removed once all uncount
     ed rows are aged | 
|  | 254 »       »       »       // off. | 
| 251                         rk.count = int64(len(records)) | 255                         rk.count = int64(len(records)) | 
| 252 | 256 | 
| 253                 case rk.count == int64(len(records)): | 257                 case rk.count == int64(len(records)): | 
| 254                         // This is the expected case, so we're set. | 258                         // This is the expected case, so we're set. | 
| 255                         // | 259                         // | 
| 256 »       »       »       // NOTE: Once uncounted rows are aged off, this is the o
     nly logic that | 260 »       »       »       // TODO(dnj): Once uncounted rows are aged off, this is 
     the only logic | 
| 257 »       »       »       // we need, in the form of a sanity assertion. | 261 »       »       »       // that we need, in the form of a sanity assertion. | 
| 258                         break | 262                         break | 
| 259 | 263 | 
| 260                 default: | 264                 default: | 
| 261                         log.Fields{ | 265                         log.Fields{ | 
| 262                                 "count":       rk.count, | 266                                 "count":       rk.count, | 
| 263                                 "recordCount": len(records), | 267                                 "recordCount": len(records), | 
| 264                         }.Errorf(ctx, "Record count doesn't match declared count
     .") | 268                         }.Errorf(ctx, "Record count doesn't match declared count
     .") | 
| 265                         return storage.ErrBadData | 269                         return storage.ErrBadData | 
| 266                 } | 270                 } | 
| 267 | 271 | 
| (...skipping 51 matching lines...) Expand 10 before | Expand all | Expand 10 after  Loading... | 
| 319 | 323 | 
| 320         case errRepeatRequest: | 324         case errRepeatRequest: | 
| 321                 return s.Get(r, cb) | 325                 return s.Get(r, cb) | 
| 322 | 326 | 
| 323         default: | 327         default: | 
| 324                 log.WithError(err).Errorf(ctx, "Failed to retrieve row range.") | 328                 log.WithError(err).Errorf(ctx, "Failed to retrieve row range.") | 
| 325                 return err | 329                 return err | 
| 326         } | 330         } | 
| 327 } | 331 } | 
| 328 | 332 | 
| 329 func (s *btStorage) Tail(p types.StreamPath) ([]byte, types.MessageIndex, error)
      { | 333 func (s *btStorage) Tail(project config.ProjectName, path types.StreamPath) ([]b
     yte, types.MessageIndex, error) { | 
| 330         ctx := log.SetFields(s, log.Fields{ | 334         ctx := log.SetFields(s, log.Fields{ | 
| 331 »       »       "path": p, | 335 »       »       "project": project, | 
|  | 336 »       »       "path":    path, | 
| 332         }) | 337         }) | 
| 333 | 338 | 
| 334         // Iterate through all log keys in the stream. Record the latest one. | 339         // Iterate through all log keys in the stream. Record the latest one. | 
| 335 »       rk := newRowKey(string(p), 0, 0) | 340 »       rk := newRowKey(string(project), string(path), 0, 0) | 
| 336         var latest *rowKey | 341         var latest *rowKey | 
| 337         err := s.raw.getLogData(ctx, rk, 0, true, func(rk *rowKey, data []byte) 
     error { | 342         err := s.raw.getLogData(ctx, rk, 0, true, func(rk *rowKey, data []byte) 
     error { | 
| 338                 latest = rk | 343                 latest = rk | 
| 339                 return nil | 344                 return nil | 
| 340         }) | 345         }) | 
| 341         if err != nil { | 346         if err != nil { | 
| 342                 log.Fields{ | 347                 log.Fields{ | 
| 343                         log.ErrorKey: err, | 348                         log.ErrorKey: err, | 
| 344                         "project":    s.Project, | 349                         "project":    s.Project, | 
| 345                         "zone":       s.Zone, | 350                         "zone":       s.Zone, | 
| (...skipping 59 matching lines...) Expand 10 before | Expand all | Expand 10 after  Loading... | 
| 405         } | 410         } | 
| 406 | 411 | 
| 407         // If we have exceeded our threshold, report a failure. | 412         // If we have exceeded our threshold, report a failure. | 
| 408         appended = (w.buf.Len() <= w.threshold) | 413         appended = (w.buf.Len() <= w.threshold) | 
| 409         if appended { | 414         if appended { | 
| 410                 w.count++ | 415                 w.count++ | 
| 411         } | 416         } | 
| 412         return | 417         return | 
| 413 } | 418 } | 
| 414 | 419 | 
| 415 func (w *rowWriter) flush(ctx context.Context, raw btTable, index types.MessageI
     ndex, path types.StreamPath) (int, error) { | 420 func (w *rowWriter) flush(ctx context.Context, raw btTable, index types.MessageI
     ndex, | 
|  | 421 »       project config.ProjectName, path types.StreamPath) (int, error) { | 
| 416         flushCount := w.count | 422         flushCount := w.count | 
| 417         if flushCount == 0 { | 423         if flushCount == 0 { | 
| 418                 return 0, nil | 424                 return 0, nil | 
| 419         } | 425         } | 
| 420 | 426 | 
| 421         // Write the current set of buffered rows to the table. Index on the LAS
     T | 427         // Write the current set of buffered rows to the table. Index on the LAS
     T | 
| 422         // row index. | 428         // row index. | 
| 423         lastIndex := int64(index) + int64(flushCount) - 1 | 429         lastIndex := int64(index) + int64(flushCount) - 1 | 
| 424 »       rk := newRowKey(string(path), lastIndex, int64(w.count)) | 430 »       rk := newRowKey(string(project), string(path), lastIndex, int64(w.count)
     ) | 
| 425 | 431 | 
| 426         log.Fields{ | 432         log.Fields{ | 
| 427                 "rowKey":    rk, | 433                 "rowKey":    rk, | 
|  | 434                 "project":   project, | 
| 428                 "path":      path, | 435                 "path":      path, | 
| 429                 "index":     index, | 436                 "index":     index, | 
| 430                 "lastIndex": lastIndex, | 437                 "lastIndex": lastIndex, | 
| 431                 "count":     w.count, | 438                 "count":     w.count, | 
| 432                 "size":      w.buf.Len(), | 439                 "size":      w.buf.Len(), | 
| 433         }.Debugf(ctx, "Adding entries to BigTable.") | 440         }.Debugf(ctx, "Adding entries to BigTable.") | 
| 434         if err := raw.putLogData(ctx, rk, w.buf.Bytes()); err != nil { | 441         if err := raw.putLogData(ctx, rk, w.buf.Bytes()); err != nil { | 
| 435                 return 0, err | 442                 return 0, err | 
| 436         } | 443         } | 
| 437 | 444 | 
| 438         // Reset our buffer state. | 445         // Reset our buffer state. | 
| 439         w.buf.Reset() | 446         w.buf.Reset() | 
| 440         w.count = 0 | 447         w.count = 0 | 
| 441         return flushCount, nil | 448         return flushCount, nil | 
| 442 } | 449 } | 
| OLD | NEW | 
|---|