Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(110)

Side by Side Diff: logdog/common/storage/bigtable/storage.go

Issue 2435113002: LogDog: Add Storage-layer data caching. (Closed)
Patch Set: Fix byteLimit bug. Created 4 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The LUCI Authors. All rights reserved. 1 // Copyright 2015 The LUCI Authors. All rights reserved.
2 // Use of this source code is governed under the Apache License, Version 2.0 2 // Use of this source code is governed under the Apache License, Version 2.0
3 // that can be found in the LICENSE file. 3 // that can be found in the LICENSE file.
4 4
5 package bigtable 5 package bigtable
6 6
7 import ( 7 import (
8 "bytes" 8 "bytes"
9 "errors" 9 "errors"
10 "fmt" 10 "fmt"
11 11
12 "github.com/luci/luci-go/common/config" 12 "github.com/luci/luci-go/common/config"
13 "github.com/luci/luci-go/common/data/recordio" 13 "github.com/luci/luci-go/common/data/recordio"
14 log "github.com/luci/luci-go/common/logging" 14 log "github.com/luci/luci-go/common/logging"
15 "github.com/luci/luci-go/logdog/common/storage" 15 "github.com/luci/luci-go/logdog/common/storage"
16 "github.com/luci/luci-go/logdog/common/storage/caching"
16 "github.com/luci/luci-go/logdog/common/types" 17 "github.com/luci/luci-go/logdog/common/types"
17 18
18 "cloud.google.com/go/bigtable" 19 "cloud.google.com/go/bigtable"
19 "golang.org/x/net/context" 20 "golang.org/x/net/context"
20 "google.golang.org/api/option" 21 "google.golang.org/api/option"
21 ) 22 )
22 23
23 var ( 24 var (
24 // StorageScopes is the set of OAuth scopes needed to use the storage 25 // StorageScopes is the set of OAuth scopes needed to use the storage
25 // functionality. 26 // functionality.
(...skipping 34 matching lines...) Expand 10 before | Expand all | Expand 10 after
60 // Project is the name of the project to connect to. 61 // Project is the name of the project to connect to.
61 Project string 62 Project string
62 // Instance is the name of the instance to connect to. 63 // Instance is the name of the instance to connect to.
63 Instance string 64 Instance string
64 // ClientOptions are additional client options to use when instantiating the 65 // ClientOptions are additional client options to use when instantiating the
65 // client instance. 66 // client instance.
66 ClientOptions []option.ClientOption 67 ClientOptions []option.ClientOption
67 68
68 // Table is the name of the BigTable table to use for logs. 69 // Table is the name of the BigTable table to use for logs.
69 LogTable string 70 LogTable string
71
72 // Cache, if not nil, will be used to cache data.
73 Cache caching.Cache
70 } 74 }
71 75
72 func (o *Options) client(ctx context.Context) (*bigtable.Client, error) { 76 func (o *Options) client(ctx context.Context) (*bigtable.Client, error) {
73 return bigtable.NewClient(ctx, o.Project, o.Instance, o.ClientOptions... ) 77 return bigtable.NewClient(ctx, o.Project, o.Instance, o.ClientOptions... )
74 } 78 }
75 79
76 func (o *Options) adminClient(ctx context.Context) (*bigtable.AdminClient, error ) { 80 func (o *Options) adminClient(ctx context.Context) (*bigtable.AdminClient, error ) {
77 return bigtable.NewAdminClient(ctx, o.Project, o.Instance, o.ClientOptio ns...) 81 return bigtable.NewAdminClient(ctx, o.Project, o.Instance, o.ClientOptio ns...)
78 } 82 }
79 83
80 // btStorage is a storage.Storage implementation that uses Google Cloud BigTable 84 // btStorage is a storage.Storage implementation that uses Google Cloud BigTable
81 // as a backend. 85 // as a backend.
82 type btStorage struct { 86 type btStorage struct {
83 *Options 87 *Options
84 88
85 // Context is the bound supplied with New. It is retained (rather than 89 // Context is the bound supplied with New. It is retained (rather than
86 // supplied on a per-call basis) because a special Storage Context devoi d of 90 // supplied on a per-call basis) because a special Storage Context devoi d of
87 // gRPC metadata is needed for Storage calls. 91 // gRPC metadata is needed for Storage calls.
88 context.Context 92 context.Context
89 93
90 client *bigtable.Client 94 client *bigtable.Client
91 logTable *bigtable.Table 95 logTable *bigtable.Table
92 adminClient *bigtable.AdminClient 96 adminClient *bigtable.AdminClient
93 97
94 » // raw is the underlying btTable instance to use for raw operations. 98 » // raw, if not nil, is the raw BigTable interface to use. This is useful for
99 » // testing. If nil, this will default to the production isntance.
95 raw btTable 100 raw btTable
101
96 // maxRowSize is the maxmium number of bytes that can be stored in a sin gle 102 // maxRowSize is the maxmium number of bytes that can be stored in a sin gle
97 // BigTable row. This is a function of BigTable, and constant in product ion 103 // BigTable row. This is a function of BigTable, and constant in product ion
98 // (bigTableRowMaxBytes), but variable here to allow for testing to cont rol. 104 // (bigTableRowMaxBytes), but variable here to allow for testing to cont rol.
99 maxRowSize int 105 maxRowSize int
100 } 106 }
101 107
102 // New instantiates a new Storage instance connected to a BigTable instance. 108 // New instantiates a new Storage instance connected to a BigTable instance.
103 // 109 //
104 // The returned Storage instance will close the Client when its Close() method 110 // The returned Storage instance will close the Client when its Close() method
105 // is called. 111 // is called.
106 func New(ctx context.Context, o Options) (storage.Storage, error) { 112 func New(ctx context.Context, o Options) (storage.Storage, error) {
107 client, err := o.client(ctx) 113 client, err := o.client(ctx)
108 if err != nil { 114 if err != nil {
109 return nil, err 115 return nil, err
110 } 116 }
111 117
112 admin, err := o.adminClient(ctx) 118 admin, err := o.adminClient(ctx)
113 if err != nil { 119 if err != nil {
114 return nil, err 120 return nil, err
115 } 121 }
116 122
117 » return newBTStorage(ctx, o, client, admin), nil 123 » return newBTStorage(ctx, o, client, admin, nil), nil
118 } 124 }
119 125
120 func newBTStorage(ctx context.Context, o Options, client *bigtable.Client, admin Client *bigtable.AdminClient) *btStorage { 126 func newBTStorage(ctx context.Context, o Options, client *bigtable.Client, admin Client *bigtable.AdminClient, raw btTable) *btStorage {
121 s := &btStorage{ 127 s := &btStorage{
122 Options: &o, 128 Options: &o,
123 Context: ctx, 129 Context: ctx,
124 130
125 client: client, 131 client: client,
126 adminClient: adminClient, 132 adminClient: adminClient,
133 raw: raw,
127 maxRowSize: bigTableRowMaxBytes, 134 maxRowSize: bigTableRowMaxBytes,
128 } 135 }
129 if s.client != nil { 136 if s.client != nil {
130 s.logTable = s.client.Open(o.LogTable) 137 s.logTable = s.client.Open(o.LogTable)
131 } 138 }
132 » s.raw = &btTableProd{s} 139 » if s.raw == nil {
140 » » s.raw = &btTableProd{s}
141 » }
133 return s 142 return s
134 } 143 }
135 144
136 func (s *btStorage) Close() { 145 func (s *btStorage) Close() {
137 if s.client != nil { 146 if s.client != nil {
138 s.client.Close() 147 s.client.Close()
139 s.client = nil 148 s.client = nil
140 } 149 }
141 150
142 if s.adminClient != nil { 151 if s.adminClient != nil {
(...skipping 149 matching lines...) Expand 10 before | Expand all | Expand 10 after
292 return err 301 return err
293 } 302 }
294 } 303 }
295 304
296 func (s *btStorage) Tail(project config.ProjectName, path types.StreamPath) (*st orage.Entry, error) { 305 func (s *btStorage) Tail(project config.ProjectName, path types.StreamPath) (*st orage.Entry, error) {
297 ctx := log.SetFields(s, log.Fields{ 306 ctx := log.SetFields(s, log.Fields{
298 "project": project, 307 "project": project,
299 "path": path, 308 "path": path,
300 }) 309 })
301 310
311 // Load the "last tail index" from cache. If we have no cache, start at 0.
312 var startIdx int64
313 if s.Cache != nil {
314 startIdx = getLastTailIndex(s, s.Cache, project, path)
315 }
316
302 // Iterate through all log keys in the stream. Record the latest one. 317 // Iterate through all log keys in the stream. Record the latest one.
303 » rk := newRowKey(string(project), string(path), 0, 0) 318 » rk := newRowKey(string(project), string(path), startIdx, 0)
304 var latest *rowKey 319 var latest *rowKey
305 err := s.raw.getLogData(ctx, rk, 0, true, func(rk *rowKey, data []byte) error { 320 err := s.raw.getLogData(ctx, rk, 0, true, func(rk *rowKey, data []byte) error {
306 latest = rk 321 latest = rk
307 return nil 322 return nil
308 }) 323 })
309 if err != nil { 324 if err != nil {
310 log.Fields{ 325 log.Fields{
311 log.ErrorKey: err, 326 log.ErrorKey: err,
312 "project": s.Project, 327 "project": s.Project,
313 "instance": s.Instance, 328 "instance": s.Instance,
314 "table": s.LogTable, 329 "table": s.LogTable,
315 }.Errorf(ctx, "Failed to scan for tail.") 330 }.Errorf(ctx, "Failed to scan for tail.")
316 } 331 }
317 332
318 if latest == nil { 333 if latest == nil {
319 // No rows for the specified stream. 334 // No rows for the specified stream.
320 return nil, storage.ErrDoesNotExist 335 return nil, storage.ErrDoesNotExist
321 } 336 }
322 337
338 // Update our cache if the tail index has changed.
339 if s.Cache != nil && startIdx != latest.index {
340 putLastTailIndex(s, s.Cache, project, path, latest.index)
341 }
342
323 // Fetch the latest row's data. 343 // Fetch the latest row's data.
324 var d []byte 344 var d []byte
325 err = s.raw.getLogData(ctx, latest, 1, false, func(rk *rowKey, data []by te) error { 345 err = s.raw.getLogData(ctx, latest, 1, false, func(rk *rowKey, data []by te) error {
326 records, err := recordio.Split(data) 346 records, err := recordio.Split(data)
327 if err != nil || len(records) == 0 { 347 if err != nil || len(records) == 0 {
328 return storage.ErrBadData 348 return storage.ErrBadData
329 } 349 }
330 d = records[len(records)-1] 350 d = records[len(records)-1]
331 return errStop 351 return errStop
332 }) 352 })
(...skipping 68 matching lines...) Expand 10 before | Expand all | Expand 10 after
401 }.Debugf(ctx, "Adding entries to BigTable.") 421 }.Debugf(ctx, "Adding entries to BigTable.")
402 if err := raw.putLogData(ctx, rk, w.buf.Bytes()); err != nil { 422 if err := raw.putLogData(ctx, rk, w.buf.Bytes()); err != nil {
403 return 0, err 423 return 0, err
404 } 424 }
405 425
406 // Reset our buffer state. 426 // Reset our buffer state.
407 w.buf.Reset() 427 w.buf.Reset()
408 w.count = 0 428 w.count = 0
409 return flushCount, nil 429 return flushCount, nil
410 } 430 }
OLDNEW
« no previous file with comments | « logdog/common/storage/bigtable/initialize.go ('k') | logdog/common/storage/bigtable/storage_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698